Skip to content

Commit

Permalink
Fixed PUBDEV-3281: ARFF parser parses attached file incorrectly (h2oa…
Browse files Browse the repository at this point in the history
…i#975)

* Fixed PUBDEV-3281

The parser did not skip lines with spaces in ARFF header.
Fixed that, and extracted the related functionality into helper methods.
Tests added.
Also added some statistics evaluator, so testing is easier now.
ARFF sample test file is in smalldata; how can I get it on s3?

* Found another dirty bug.
Patching it for now, but have to investigate.

* Matching did not always work.
Switched to using a string as the match pattern.

* Commented out debugging data.

* Investigating weird cli test failure in ARFF

* Update ParserTestARFF.java

(renamed test file)

* Fix file lookup.

* So, that failure is fixed now.

* Restored the logic of skipping the header.
The solution to failing test was increasing the min number of lines per chunk.
Headers can be long.

Will have to work on a better algorithm, though.

* Fixed Java6 compatibility error.

* Fixed rest_api_py test failure (chunk size calculations)

* (oops, typo)

* reverted H2OTestRunner
  • Loading branch information
vpatryshev authored and mmalohlava committed Mar 31, 2017
1 parent 9608d12 commit 2d0a7a3
Show file tree
Hide file tree
Showing 12 changed files with 545 additions and 262 deletions.
10 changes: 5 additions & 5 deletions h2o-core/src/main/java/water/fvec/FileVec.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public static int calcOptimalChunkSize(long totalSize, int numCols, long maxLine
}
else {
// New Heuristic
int minNumberRows = 10; // need at least 10 rows (lines) per chunk (core)
int minNumberRows = 200; // need at least 200 rows (lines) per chunk (core) - that's needed for files with headers
int perNodeChunkCountLimit = 1<<21; // don't create more than 2M Chunk POJOs per node
int minParseChunkSize = 1<<12; // don't read less than this many bytes
int maxParseChunkSize = (1<<28)-1; // don't read more than this many bytes per map() thread (needs to fit into a Value object)
Expand All @@ -204,15 +204,15 @@ public static int calcOptimalChunkSize(long totalSize, int numCols, long maxLine

// Super small data check - file size is smaller than 64kB
if (totalSize <= 1<<16) {
chunkSize = Math.max(DFLT_CHUNK_SIZE, (int) (minNumberRows * maxLineLength));
chunkSize = Math.min(maxParseChunkSize, Math.max(DFLT_CHUNK_SIZE, (int) (minNumberRows * maxLineLength)));
} else {

//round down to closest power of 2
// chunkSize = 1L << MathUtils.log2(chunkSize);

// Small data check
if (chunkSize < DFLT_CHUNK_SIZE && (localParseSize / chunkSize) * numCols < perNodeChunkCountLimit) {
chunkSize = Math.max((int)chunkSize, (int) (minNumberRows * maxLineLength));
chunkSize = Math.min(maxParseChunkSize, Math.max((int)chunkSize, (int) (minNumberRows * maxLineLength)));
} else {
// Adjust chunkSize such that we don't create too many chunks
int chunkCount = cores * 4 * numCols;
Expand All @@ -233,8 +233,8 @@ public static int calcOptimalChunkSize(long totalSize, int numCols, long maxLine
}
}
}
assert(chunkSize >= minParseChunkSize);
assert(chunkSize <= maxParseChunkSize);
assert chunkSize >= minParseChunkSize : "Chunk size " + chunkSize + ", min " + minParseChunkSize;
assert chunkSize <= maxParseChunkSize : "Chunk size " + chunkSize + ", max " + maxParseChunkSize;
if (verbose)
Log.info("ParseSetup heuristic: "
+ "cloudSize: " + cloudsize
Expand Down
27 changes: 16 additions & 11 deletions h2o-core/src/main/java/water/parser/ARFFParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import water.Key;
import water.fvec.Vec;
import water.util.ArrayUtils;
import water.util.BytesStats;

import static water.parser.DefaultParserProviders.ARFF_INFO;

Expand All @@ -18,19 +20,19 @@ static ParseSetup guessSetup(byte[] bits, byte sep, boolean singleQuotes, String
if (columnNames != null) throw new UnsupportedOperationException("ARFFParser doesn't accept columnNames.");

// Parse all lines starting with @ until EOF or @DATA
boolean haveData = false;
int offset = 0;
String[][] data = new String[0][];;
String[] labels;
String[][] domains;
String[] headerlines = new String[0];
byte[] ctypes;

int numLines = 0;

// header section
ArrayList<String> header = new ArrayList<>();
offset = readArffHeader(offset, header, bits, singleQuotes);
if (offset < bits.length && !CsvParser.isEOL(bits[offset]))
haveData = true; //more than just the header
int dataOffset = readArffHeader(0, header, bits, singleQuotes);

boolean haveData = dataOffset < bits.length;

if (header.size() == 0)
throw new ParseDataset.H2OParseException("No data!");
Expand All @@ -48,6 +50,7 @@ static ParseSetup guessSetup(byte[] bits, byte sep, boolean singleQuotes, String
if (haveData) {
String[] datalines = new String[0];
ArrayList<String> datablock = new ArrayList<>();
int offset = dataOffset;
while (offset < bits.length) {
int lineStart = offset;
while (offset < bits.length && !CsvParser.isEOL(bits[offset])) ++offset;
Expand All @@ -65,6 +68,7 @@ static ParseSetup guessSetup(byte[] bits, byte sep, boolean singleQuotes, String
if (datablock.size() == 0)
throw new ParseDataset.H2OParseException("Unexpected line.");
datalines = datablock.toArray(datalines);
numLines = datalines.length;

// process data section
int nlines2 = Math.min(10, datalines.length);
Expand Down Expand Up @@ -98,9 +102,14 @@ static ParseSetup guessSetup(byte[] bits, byte sep, boolean singleQuotes, String
}

// Return the final setup
return new ParseSetup(ARFF_INFO, sep, singleQuotes, ParseSetup.NO_HEADER, ncols, labels, ctypes, domains, naStrings, data);
final ParseSetup parseSetup = new ParseSetup(ARFF_INFO, sep, singleQuotes, ParseSetup.NO_HEADER, ncols, labels, ctypes, domains, naStrings, data);
parseSetup.tentativeNumLines = numLines;
parseSetup.dataOffset = dataOffset;
return parseSetup;
}

private final static String DATA_MARKER = "@DATA";

private static int readArffHeader(int offset, ArrayList<String> header, byte[] bits, boolean singleQuotes) {
while (offset < bits.length) {
int lineStart = offset;
Expand All @@ -112,11 +121,7 @@ private static int readArffHeader(int offset, ArrayList<String> header, byte[] b
if (bits[lineStart] == '#') continue; // Ignore comment lines
if (bits[lineStart] == '%') continue; // Ignore ARFF comment lines
if (lineEnd > lineStart) {
if (bits[lineStart] == '@' &&
(bits[lineStart+1] == 'D' || bits[lineStart+1] =='d' ) &&
(bits[lineStart+2] == 'A' || bits[lineStart+2] =='a' ) &&
(bits[lineStart+3] == 'T' || bits[lineStart+3] =='t' ) &&
(bits[lineStart+4] == 'A' || bits[lineStart+4] =='a' )){
if (ArrayUtils.matchesInUpperCase(bits, lineStart, DATA_MARKER)) {
break;
}
String str = new String(bits, lineStart, lineEnd - lineStart).trim();
Expand Down
45 changes: 23 additions & 22 deletions h2o-core/src/main/java/water/parser/CsvParser.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package water.parser;

import org.apache.commons.lang.math.NumberUtils;
import org.apache.http.ParseException;
import water.fvec.Chunk;
import water.fvec.Vec;
import water.fvec.FileVec;
import water.Key;
Expand All @@ -22,7 +22,19 @@ class CsvParser extends Parser {
private static final int HAS_HEADER = ParseSetup.HAS_HEADER;

CsvParser( ParseSetup ps, Key jobKey ) { super(ps, jobKey); }


// TODO(vlad): make it package-private, add a test
// Advances past header/comment lines at the start of the buffer: any line whose
// first non-space byte is a comment character (per isCommentChar) is skipped
// entirely. Returns the offset of the first line that is not such a line.
private static int skipHeader(byte[] bits, int offset) {
  for (int probe = skipSpaces(bits, offset);
       probe < bits.length && isCommentChar(bits[probe]);
       probe = skipSpaces(bits, offset)) {
    offset = skipToNewLine(bits, probe) + 1;
  }
  return offset;
}

private int patience = 20;

// Parse this one Chunk (in parallel with other Chunks)
@SuppressWarnings("fallthrough")
@Override public ParseWriter parseChunk(int cidx, final ParseReader din, final ParseWriter dout) {
Expand Down Expand Up @@ -57,25 +69,13 @@ class CsvParser extends Parser {
int fractionDigits = 0;
int tokenStart = 0; // used for numeric token to backtrace if not successful
int colIdx = 0;
byte c = bits[offset];
// skip comments for the first chunk (or if not a chunk)
if( cidx == 0 ) {
while ( c == '#'
|| isEOL(c)
|| c == '@' /*also treat as comments leading '@' from ARFF format*/
|| c == '%' /*also treat as comments leading '%' from ARFF format*/) {
while ((offset < bits.length) && (bits[offset] != CHAR_CR) && (bits[offset ] != CHAR_LF)) {
// System.out.print(String.format("%c",bits[offset]));
++offset;
}
if ((offset + 1 < bits.length) && (bits[offset] == CHAR_CR) && (bits[offset + 1] == CHAR_LF)) ++offset;
++offset;
// System.out.println();
if (offset >= bits.length)
return dout;
c = bits[offset];
}
if (cidx == 0) {
offset = skipHeader(bits, offset);
if (offset >= bits.length) return dout;
}

byte c = bits[offset];
dout.newLine();

final boolean forceable = dout instanceof FVecParseWriter && ((FVecParseWriter)dout)._ctypes != null && _setup._column_types != null;
Expand All @@ -87,12 +87,13 @@ class CsvParser extends Parser {
switch (state) {
// ---------------------------------------------------------------------
case SKIP_LINE:
if (isEOL(c)) {
offset = skipToNewLine(bits, offset);
if (offset >= bits.length || isEOL(bits[offset])) {
state = EOL;
continue MAIN_LOOP;
} else {
break;
}
continue MAIN_LOOP;
// ---------------------------------------------------------------------
case EXPECT_COND_LF:
state = POSSIBLE_EMPTY_LINE;
Expand Down Expand Up @@ -201,7 +202,7 @@ class CsvParser extends Parser {
dout.addInvalidCol(colIdx++);
break;
} else if (isEOL(c)) {
dout.addInvalidCol(colIdx++);
// ignore empty strings dout.addInvalidCol(colIdx++);
state = EOL;
continue MAIN_LOOP;
}
Expand Down
62 changes: 16 additions & 46 deletions h2o-core/src/main/java/water/parser/ParseSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import water.exceptions.H2OIllegalArgumentException;
import water.fvec.*;
import water.util.ArrayUtils;
import water.util.BytesStats;
import water.util.FileUtils;
import water.util.Log;

Expand All @@ -27,6 +28,8 @@ public class ParseSetup extends Iced {
public static final int HAS_HEADER = 1;
public static final int GUESS_COL_CNT = -1;

public BytesStats bytesStats;

ParserInfo _parse_type; // CSV, XLS, XSLX, SVMLight, Auto, ARFF, ORC
byte _separator; // Field separator, usually comma ',' or TAB or space ' '
// Whether or not single-quotes quote a field. E.g. how do we parse:
Expand All @@ -42,6 +45,9 @@ public class ParseSetup extends Iced {
String[][] _na_strings; // Strings for NA in a given column
String[][] _data; // First few rows of parsed/tokenized data

int tentativeNumLines = -1;
int dataOffset = -1;

String [] _fileNames = new String[]{"unknown"};

public void setFileName(String name) {_fileNames[0] = name;}
Expand Down Expand Up @@ -346,13 +352,14 @@ public GuessSetupTsk(ParseSetup userSetup) {
_maxLineLength = maxLineLength(bits);
if (_maxLineLength==-1) throw new H2OIllegalArgumentException("The first 4MB of the data don't contain any line breaks. Cannot parse.");

// only preview 1 DFLT_CHUNK_SIZE for ByteVecs, UploadFileVecs, compressed, and small files
/* if (ice instanceof ByteVec
|| ((Frame)ice).vecs()[0] instanceof UploadFileVec
|| bv.length() <= FileVec.DFLT_CHUNK_SIZE
|| decompRatio > 1.0) { */
try {
_gblSetup = guessSetup(bv, bits, _userSetup);
_gblSetup.bytesStats = new BytesStats(bits);
{
if (_maxLineLength != _gblSetup.bytesStats.maxWidth) {
//TODO(vlad): investigate! the length produced by maxLineLength is 1.3 meg, and the line consists of zero chars throw new IllegalStateException("wtf");
}
}
for(ParseWriter.ParseErr e:_gblSetup._errs) {
e._byteOffset += e._cidx*Parser.StreamData.bufSz;
e._cidx = 0;
Expand All @@ -361,46 +368,6 @@ public GuessSetupTsk(ParseSetup userSetup) {
} catch (ParseDataset.H2OParseException pse) {
throw pse.resetMsg(pse.getMessage()+" for "+key);
}
/* } else { // file is aun uncompressed NFSFileVec or HDFSFileVec & larger than the DFLT_CHUNK_SIZE
FileVec fv = (FileVec) ((Frame) ice).vecs()[0];
// reset chunk size to 1M (uncompressed)
int chkSize = (int) ((1<<20) /decompRatio);
fv.setChunkSize((Frame) ice, chkSize);
// guessSetup from first chunk
_gblSetup = guessSetup(fv.getPreviewChunkBytes(0), _userSetup);
_userSetup._check_header = -1; // remaining chunks shouldn't check for header
_userSetup._parse_type = _gblSetup._parse_type; // or guess parse type
//preview 1M data every 100M
int numChunks = fv.nChunks();
for (int i=100; i < numChunks;i += 100) {
bits = fv.getPreviewChunkBytes(i);
if (bits != null)
_gblSetup = mergeSetups(_gblSetup, guessSetup(bits, _userSetup));
}
// grab sample at end of file (if not done by prev loop)
if (numChunks % 100 > 1){
bits = fv.getPreviewChunkBytes(numChunks - 1);
if (bits != null)
_gblSetup = mergeSetups(_gblSetup, guessSetup(bits, _userSetup));
}
// return chunk size to DFLT
fv.setChunkSize((Frame) ice, FileVec.DFLT_CHUNK_SIZE);
} */
// report if multiple files exist in zip archive
/* if (ZipUtil.getFileCount(bv) > 1) {
if (_gblSetup._errors != null)
_gblSetup._errors = Arrays.copyOf(_gblSetup._errors, _gblSetup._errors.length + 1);
else
_gblSetup._errors = new String[1];
_gblSetup._errors[_gblSetup._errors.length - 1] = "Only single file zip " +
"archives are currently supported, only the first file has been parsed. " +
"Remaining files have been ignored.";
}*/
}
if (_gblSetup==null)
throw new RuntimeException("This H2O node couldn't find the file(s) to parse. Please check files and/or working directories.");
Expand Down Expand Up @@ -630,7 +597,10 @@ private static final long maxLineLength(byte[] bytes) {
while(true) {
line = br.readLine();
if (line == null) break;
maxLineLength = Math.max(line.length(), maxLineLength);
int ll = line.length();
if (ll > maxLineLength) {
maxLineLength = ll;
}
}
} catch (IOException e) {
return -1;
Expand Down
26 changes: 26 additions & 0 deletions h2o-core/src/main/java/water/parser/Parser.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import water.Iced;
import water.Job;
import water.Key;
import water.util.ArrayUtils;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -32,6 +33,8 @@ public abstract class Parser extends Iced {
// Byte constants for characters the parsers treat as structurally significant.
static final byte CHAR_TAB = '\t';
static final byte CHAR_CR = 13;   // carriage return, '\r'
static final byte CHAR_LF = 10;   // line feed, '\n'
// NOTE(review): the two names below look swapped relative to their values —
// CRLF holds LF+CR and LFCR holds CR+LF. isCRLF() checks both patterns, so
// behavior is unaffected here, but the naming is misleading; confirm intent
// before reusing these constants elsewhere.
static final String CRLF = "\n\r";
static final String LFCR = "\r\n";
static final byte CHAR_SPACE = ' ';
static final byte CHAR_DOUBLE_QUOTE = '"';
static final byte CHAR_SINGLE_QUOTE = '\'';
Expand Down Expand Up @@ -62,8 +65,31 @@ public abstract class Parser extends Iced {
protected final byte CHAR_SEPARATOR;

protected static final long LARGEST_DIGIT_NUMBER = Long.MAX_VALUE/10;

// True when the byte is a single-byte line terminator (CR or LF).
protected static boolean isEOL(byte c) {
  return c == CHAR_CR || c == CHAR_LF;
}
// True when a two-byte end-of-line pair ("\n\r" or "\r\n") starts at offset.
// Out-of-range offsets are handled by ArrayUtils.matches and yield false.
protected static boolean isCRLF(byte[] bytes, int offset) {
  if (ArrayUtils.matches(bytes, offset, CRLF)) return true;
  return ArrayUtils.matches(bytes, offset, LFCR);
}

// Bytes that introduce a line the CSV parser should skip: '#', the ARFF
// markers '@' and '%', or any end-of-line byte.
protected static boolean isCommentChar(byte c) {
  switch (c) {
    case '#':
    case '@':  // leading '@' from ARFF format is treated as a comment here
    case '%':  // leading '%' from ARFF format is treated as a comment here
      return true;
    default:
      return isEOL(c);
  }
}

/**
 * Returns the offset of the first non-space byte at or after {@code offset},
 * or {@code bits.length} if only spaces remain.
 *
 * The original clamped the result with {@code Math.max(0, ...)}, implying
 * negative offsets were anticipated — but a negative offset would throw
 * ArrayIndexOutOfBoundsException in the scan loop before that clamp could
 * ever apply. Clamp at entry instead so the implied contract actually holds.
 */
protected static int skipSpaces(byte[] bits, int offset) {
  if (offset < 0) offset = 0;  // clamp up front; the loop would otherwise throw
  while (offset < bits.length && bits[offset] == CHAR_SPACE) offset++;
  return offset;  // loop invariant guarantees 0 <= offset <= bits.length
}

// Scans forward to the end of the current line. Returns the offset of the
// line's EOL byte — advanced by one when the EOL is a two-byte pair, so the
// result points at the pair's second byte — or bits.length at end of input.
protected static int skipToNewLine(byte[] bits, int offset) {
  int pos = offset;
  while (pos < bits.length && !isEOL(bits[pos])) {
    pos++;
  }
  if (isCRLF(bits, pos)) {
    pos++;
  }
  return Math.min(Math.max(pos, 0), bits.length);
}

protected final ParseSetup _setup;
protected final Key<Job> _jobKey;
protected Parser( ParseSetup setup, Key<Job> jobKey ) { _setup = setup; CHAR_SEPARATOR = setup._separator; _jobKey = jobKey;}
Expand Down
31 changes: 31 additions & 0 deletions h2o-core/src/main/java/water/util/ArrayUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -1675,4 +1675,35 @@ public static boolean isSorted(int[] vals) {
if (vals[i - 1] > vals[i]) return false;
return true;
}

/**
 * Tells whether the bytes starting at {@code offset} spell out
 * {@code toMatch} exactly (byte-for-byte, case-sensitive).
 *
 * @param bytes buffer to examine
 * @param offset position in {@code bytes} where the comparison starts
 * @param toMatch the text to match
 * @return true iff bytes[offset .. offset+toMatch.length()-1] equals toMatch;
 *         false when the window falls outside the buffer
 */
public static boolean matches(byte[] bytes, int offset, String toMatch) {
  int len = toMatch.length();
  if (offset < 0 || offset > bytes.length - len) return false;
  int i = 0;
  while (i < len) {
    if (bytes[offset + i] != toMatch.charAt(i)) return false;
    i++;
  }
  return true;
}

/**
 * Tells whether the bytes starting at {@code offset} spell out
 * {@code toMatch}, ignoring the case of the buffer's bytes.
 *
 * @param bytes buffer to examine
 * @param offset position in {@code bytes} where the comparison starts
 * @param toMatch the text to match; must already be UPPER CASE
 * @return true iff bytes[offset .. offset+toMatch.length()-1], upper-cased
 *         byte by byte, equals toMatch; false when the window falls outside
 *         the buffer
 */
public static boolean matchesInUpperCase(byte[] bytes, int offset, String toMatch) {
  int len = toMatch.length();
  if (offset < 0 || offset > bytes.length - len) return false;
  for (int i = len - 1; i >= 0; i--) {
    if (Character.toUpperCase(bytes[offset + i]) != toMatch.charAt(i)) return false;
  }
  return true;
}
}
Loading

0 comments on commit 2d0a7a3

Please sign in to comment.