Skip to content

Commit

Permalink
Merge pull request h2oai#246 from h2oai/michalk_fix-parquet-na
Browse files Browse the repository at this point in the history
HEXDEV-634: Fixes handling of NAs in Parquet Parser
  • Loading branch information
michalkurka authored Sep 28, 2016
2 parents 1f9d084 + 4d38a5e commit b20615d
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
*/
class ChunkConverter extends GroupConverter {

private final ParseWriter _writer;
private final WriterDelegate _writer;
private final Converter[] _converters;

private int _currentRecordIdx = -1;

ChunkConverter(MessageType parquetSchema, byte[] chunkSchema, ParseWriter writer) {
_writer = writer;
_writer = new WriterDelegate(writer, chunkSchema.length);
int colIdx = 0;
_converters = new Converter[chunkSchema.length];
for (Type parquetField : parquetSchema.getFields()) {
Expand All @@ -49,11 +49,12 @@ public Converter getConverter(int fieldIndex) {
@Override
public void start() {
_currentRecordIdx++;
_writer.startLine();
}

@Override
public void end() {
_writer.newLine();
_writer.endLine();
assert _writer.lineNum() - 1 == _currentRecordIdx;
}

Expand Down Expand Up @@ -85,11 +86,11 @@ private static class StringConverter extends PrimitiveConverter {

private final BufferedString _bs = new BufferedString();
private final int _colIdx;
private final ParseWriter _writer;
private final WriterDelegate _writer;
private final boolean _dictionarySupport;
private String[] _dict;

StringConverter(ParseWriter writer, int colIdx, boolean dictionarySupport) {
StringConverter(WriterDelegate writer, int colIdx, boolean dictionarySupport) {
_colIdx = colIdx;
_writer = writer;
_dictionarySupport = dictionarySupport;
Expand Down Expand Up @@ -124,10 +125,10 @@ public void addValueFromDictionary(int dictionaryId) {
private static class NumberConverter extends PrimitiveConverter {

private final int _colIdx;
private final ParseWriter _writer;
private final WriterDelegate _writer;
private final BufferedString _bs = new BufferedString();

NumberConverter(int _colIdx, ParseWriter _writer) {
NumberConverter(int _colIdx, WriterDelegate _writer) {
this._colIdx = _colIdx;
this._writer = _writer;
}
Expand Down Expand Up @@ -167,9 +168,9 @@ public void addBinary(Binary value) {
private static class TimestampConverter extends PrimitiveConverter {

private final int _colIdx;
private final ParseWriter _writer;
private final WriterDelegate _writer;

TimestampConverter(int _colIdx, ParseWriter _writer) {
TimestampConverter(int _colIdx, WriterDelegate _writer) {
this._colIdx = _colIdx;
this._writer = _writer;
}
Expand All @@ -180,4 +181,49 @@ public void addLong(long value) {
}
}

private static class WriterDelegate {

private final ParseWriter _writer;
private final int _numCols;
private int _col;

WriterDelegate(ParseWriter writer, int numCols) {
_writer = writer;
_numCols = numCols;
_col = Integer.MIN_VALUE;
}

void startLine() {
_col = -1;
}

void endLine() {
moveToCol(_numCols);
_writer.newLine();
}

int moveToCol(int colIdx) {
for (int c = _col + 1; c < colIdx; c++) _writer.addInvalidCol(c);
_col = colIdx;
return _col;
}

void addNumCol(int colIdx, long number, int exp) {
_writer.addNumCol(moveToCol(colIdx), number, exp);
}

void addNumCol(int colIdx, double d) {
_writer.addNumCol(moveToCol(colIdx), d);
}

void addStrCol(int colIdx, BufferedString str) {
_writer.addStrCol(moveToCol(colIdx), str);
}

long lineNum() {
return _writer.lineNum();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import water.TestUtil;
import water.fvec.Frame;
import water.fvec.RollupStatsHelpers;
import water.fvec.Vec;
import water.parser.BufferedString;

Expand Down Expand Up @@ -123,6 +124,30 @@ public void testParseTimestamps() {
assertFrameAssertion(assertion);
}

@Test
public void testParseSparseColumns() {
FrameAssertion assertion = new GenFrameAssertion("sparseColumns.parquet", TestUtil.ari(4, 100)) {
@Override protected File prepareFile() throws IOException { return ParquetFileGenerator.generateSparseParquetFile(Files.createTempDir(), file, nrows()); }
@Override public void check(Frame f) {
assertArrayEquals("Column names need to match!", ar("int32_field", "string_field", "row", "int32_field2"), f.names());
assertArrayEquals("Column types need to match!", ar(Vec.T_NUM, Vec.T_CAT, Vec.T_NUM, Vec.T_NUM), f.types());
for (int row = 0; row < nrows(); row++) {
if (row % 10 == 0) {
assertEquals("Value in column int32_field", row, f.vec(0).at8(row));
assertEquals("Value in column string_field", "CAT_" + (row % 10), f.vec(1).factor(f.vec(1).at8(row)));
assertEquals("Value in column int32_field2", row, f.vec(3).at8(row));
} else {
assertTrue(f.vec(0).isNA(row));
assertTrue(f.vec(1).isNA(row));
assertTrue(f.vec(3).isNA(row));
}
assertEquals("Value in column row", row, f.vec(2).at8(row));
}
}
};
assertFrameAssertion(assertion);
}

}

class ParquetFileGenerator {
Expand Down Expand Up @@ -186,4 +211,28 @@ static File generateParquetFile(File parentDir, String filename, int nrows, Date
return f;
}

static File generateSparseParquetFile(File parentDir, String filename, int nrows) throws IOException {
File f = new File(parentDir, filename);

Configuration conf = new Configuration();
MessageType schema = parseMessageType(
"message test { optional int32 int32_field; optional binary string_field (UTF8); required int32 row; optional int32 int32_field2; } ");
GroupWriteSupport.setSchema(schema, conf);
SimpleGroupFactory fact = new SimpleGroupFactory(schema);
ParquetWriter<Group> writer = new ParquetWriter<Group>(new Path(f.getPath()), new GroupWriteSupport(),
UNCOMPRESSED, 1024, 1024, 512, true, false, ParquetProperties.WriterVersion.PARQUET_2_0, conf);
try {
for (int i = 0; i < nrows; i++) {
Group g = fact.newGroup();
if (i % 10 == 0) { g = g.append("int32_field", i); }
if (i % 10 == 0) { g = g.append("string_field", "CAT_" + (i % 10)); }
if (i % 10 == 0) { g = g.append("int32_field2", i); }
writer.write(g.append("row", i));
}
} finally {
writer.close();
}
return f;
}

}

0 comments on commit b20615d

Please sign in to comment.