PUBDEV-1612: orc parser. (h2oai#93)
Built on top of Michal M's avro-parser.
Uses Tomas's parser setup, with modifications to support ORC parsing.
Additional tests added by Nidhi; she also fixed the R unit tests on HDFS.
Extra help from Jeff G was needed on a number of setup issues.

Includes:
  - Added HDFS tests corresponding to the pyunit tests.
  - Removed bad tests.
tomasnykodym authored and mmalohlava committed Aug 22, 2016
1 parent 619d974 commit 3e42434
Showing 75 changed files with 3,391 additions and 107 deletions.
4 changes: 3 additions & 1 deletion build.gradle
@@ -54,7 +54,8 @@ ext {
project(':h2o-persist-s3'),
project(':h2o-genmodel'),
project(':h2o-bindings'),
project(':h2o-avro-parser')
project(':h2o-avro-parser'),
project(':h2o-orc-parser'),
]

javaProjects = [
@@ -69,6 +70,7 @@ ext {
project(':h2o-genmodel'),
project(':h2o-bindings'),
project(':h2o-avro-parser'),
project(':h2o-orc-parser'),
]

scalaProjects = [
16 changes: 15 additions & 1 deletion gradle.properties
@@ -10,5 +10,19 @@ doJava6Bytecode=auto
# Run animal sniffer - by default false, but if java6 bytecode is requested
# then animal sniffer is run
doAnimalSniffer=false
# Increase PermGen size for build
# The flag to include ORC support inside default h2o.jar.
# WARNING: this will upgrade default Hadoop client version to one supporting ORC
doIncludeOrc=false
#
# Version of hadoop dependency which is used for jUnit test execution
#
orcDefaultHadoopClientVersion=2.6.0-cdh5.4.0
orcDefaultHiveExecVersion=1.1.0-cdh5.4.0
#
# Default hadoop client version
#
defaultHadoopClientVersion=2.0.0-cdh4.3.0
#
# Gradle arguments
#
org.gradle.jvmargs='-XX:MaxPermSize=384m'
2 changes: 2 additions & 0 deletions h2o-app/build.gradle
@@ -9,5 +9,7 @@ dependencies {
compile project(":h2o-core")
compile project(":h2o-genmodel")
compile project(":h2o-avro-parser")
// Note: orc parser is included at the assembly level for each
// Hadoop distribution
}

3 changes: 3 additions & 0 deletions h2o-assembly/build.gradle
@@ -7,6 +7,9 @@ dependencies {
compile project(":h2o-app")
compile project(":h2o-persist-s3")
compile project(":h2o-persist-hdfs")
if (project.hasProperty("doIncludeOrc") && project.doIncludeOrc == "true") {
compile project(":h2o-orc-parser")
}
compile "org.slf4j:slf4j-log4j12:1.7.5"
}

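A note on enabling this: since gradle.properties defines doIncludeOrc=false, the hasProperty check above always passes and the string comparison alone decides. To bake the ORC parser into the default h2o.jar for a single build, override the property on the Gradle command line (standard -P syntax, e.g. gradlew build -PdoIncludeOrc=true) or flip the value in gradle.properties; per the warning there, this also upgrades the default Hadoop client version. Otherwise the ORC parser ships only in the per-distribution Hadoop assemblies, as the h2o-app comment above says.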
16 changes: 13 additions & 3 deletions h2o-core/src/main/java/water/api/ParseSetupHandler.java
@@ -9,7 +9,9 @@
import water.Key;
import water.api.schemas3.ParseSetupV3;
import water.exceptions.H2OIllegalArgumentException;
import water.parser.ParseDataset;
import water.parser.ParseSetup;
import water.util.DistributedException;
import water.util.PojoUtils;

import static water.parser.DefaultParserProviders.GUESS_INFO;
@@ -33,9 +35,17 @@ public ParseSetupV3 guessSetup(int version, ParseSetupV3 p) {
if (p.na_strings != null)
for(int i = 0; i < p.na_strings.length; i++)
if (p.na_strings[i] != null && p.na_strings[i].length == 0) p.na_strings[i] = null;

ParseSetup ps = ParseSetup.guessSetup(fkeys, new ParseSetup(p));

ParseSetup ps;
try{
ps = ParseSetup.guessSetup(fkeys, new ParseSetup(p));
} catch(Throwable ex) {
Throwable ex2 = ex;
if(ex instanceof DistributedException)
ex2 = ex.getCause();
if(ex2 instanceof ParseDataset.H2OParseException)
throw new H2OIllegalArgumentException(ex2.getMessage());
throw ex;
}
if(ps._errs != null && ps._errs.length > 0) {
p.warnings = new String[ps._errs.length];
for (int i = 0; i < ps._errs.length; ++i)
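The try/catch added above implements a small unwrap-and-translate pattern: failures from remote nodes arrive wrapped in a DistributedException, so the handler peels one layer and converts a known parse failure into a user-facing illegal-argument error while rethrowing everything else. A self-contained sketch of the same pattern, using stand-in exception classes rather than H2O's real water.util.DistributedException and ParseDataset.H2OParseException:

// Stand-ins for illustration only; not H2O's actual classes.
class DistributedException extends RuntimeException {
  DistributedException(Throwable cause) { super(cause); }
}

class H2OParseException extends RuntimeException {
  H2OParseException(String msg) { super(msg); }
}

class ErrorTranslation {
  // Peel the distributed wrapper; surface parse errors as argument errors,
  // pass anything else through unchanged.
  static RuntimeException toUserError(Throwable ex) {
    Throwable cause = (ex instanceof DistributedException) ? ex.getCause() : ex;
    if (cause instanceof H2OParseException)
      return new IllegalArgumentException(cause.getMessage());
    return (ex instanceof RuntimeException) ? (RuntimeException) ex
                                            : new RuntimeException(ex);
  }
}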
23 changes: 22 additions & 1 deletion h2o-core/src/main/java/water/fvec/FileVec.java
@@ -9,20 +9,36 @@ public abstract class FileVec extends ByteVec {
long _len; // File length
final byte _be;

// Returns String with path for given key.
public static String getPathForKey(Key k) {
final int off = k._kb[0]==Key.CHK || k._kb[0]==Key.VEC ? Vec.KEY_PREFIX_LEN : 0;
String p = new String(k._kb,off,k._kb.length-off);

if(p.startsWith("nfs:/"))
p = p.substring("nfs:/".length());
else if (p.startsWith("nfs:\\"))
p = p.substring("nfs:\\".length());

return p;
}
/** Log-2 of Chunk size. */
public static final int DFLT_LOG2_CHUNK_SIZE = 20/*1Meg*/+2/*4Meg*/;
/** Default Chunk size in bytes, useful when breaking up large arrays into
* "bite-sized" chunks. Bigger increases batch sizes, lowers overhead
* costs, lower increases fine-grained parallelism. */
public static final int DFLT_CHUNK_SIZE = 1 << DFLT_LOG2_CHUNK_SIZE;
public int _chunkSize = DFLT_CHUNK_SIZE;
public int _nChunks = -1;

protected FileVec(Key key, long len, byte be) {
super(key,-1/*no rowLayout*/);
_len = len;
_be = be;
}

public void setNChunks(int n){
_nChunks = n;
setChunkSize((int)length()/n);
}
/**
* Chunk size must be positive, 1G or less, and a power of two.
* Any values that aren't a power of two will be reduced to the
@@ -36,6 +52,7 @@ protected FileVec(Key key, long len, byte be) {
* @return actual _chunkSize setting
*/
public int setChunkSize(int chunkSize) { return setChunkSize(null, chunkSize); }

public int setChunkSize(Frame fr, int chunkSize) {
// Clear cached chunks first
// Peeking into a file before the chunkSize has been set
@@ -63,7 +80,11 @@ public int setChunkSize(Frame fr, int chunkSize) {
}

@Override public long length() { return _len; }


@Override public int nChunks() {
if(_nChunks != -1) // number of chunks can be set explicitly
return _nChunks;
return (int)Math.max(1,_len / _chunkSize + ((_len % _chunkSize != 0)?1:0));
}
@Override public boolean writable() { return false; }
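The new nChunks()/setNChunks() pair above is plain ceiling division with a floor of one chunk; setNChunks(n) inverts it by deriving the chunk size from a requested chunk count. A minimal standalone sketch of the arithmetic (illustration only, not H2O code):

public class ChunkMath {
  // Mirrors FileVec.nChunks(): ceil(len / chunkSize), never less than 1.
  static int nChunks(long len, int chunkSize) {
    return (int) Math.max(1, len / chunkSize + ((len % chunkSize != 0) ? 1 : 0));
  }

  public static void main(String[] args) {
    int dflt = 1 << (20 + 2);                        // DFLT_CHUNK_SIZE: 2^22 = 4 MB
    System.out.println(nChunks(10_000_000L, dflt));  // 3: two full 4 MB chunks plus a remainder
    System.out.println(nChunks(0L, dflt));           // 1: an empty file still gets one chunk
  }
}

Letting a parser fix the chunk count directly (rather than the size) is presumably what allows the ORC reader to align H2O chunks with each file's own internal layout.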
17 changes: 9 additions & 8 deletions h2o-core/src/main/java/water/parser/DefaultParserProviders.java
@@ -6,6 +6,7 @@

import water.Job;
import water.Key;
import water.fvec.ByteVec;
import water.util.Log;

/**
@@ -23,7 +24,7 @@ public final class DefaultParserProviders {
public static final ParserInfo SVMLight_INFO = new ParserInfo("SVMLight", 1000, true);
public static final ParserInfo CSV_INFO = new ParserInfo("CSV", Integer.MAX_VALUE, true);
public static final ParserInfo GUESS_INFO = new ParserInfo("GUESS", -10000, false);
/** Priority of non-core parsers shoudl begin here.*/
/** Priority of non-core parsers should begin here.*/
public static final int MAX_CORE_PRIO = 10000;

public final static class ArffParserProvider extends AbstractParserProvide {
@@ -39,7 +40,7 @@ public Parser createParser(ParseSetup setup, Key<Job> jobKey) {
}

@Override
public ParseSetup guessSetup(byte[] bits, byte sep, int ncols, boolean singleQuotes,
public ParseSetup guessSetup(ByteVec bv, byte[] bits, byte sep, int ncols, boolean singleQuotes,
int checkHeader, String[] columnNames, byte[] columnTypes,
String[][] domains, String[][] naStrings) {
return ARFFParser.guessSetup(bits, sep, singleQuotes, columnNames, naStrings);
@@ -59,7 +60,7 @@ public Parser createParser(ParseSetup setup, Key<Job> jobKey) {
}

@Override
public ParseSetup guessSetup(byte[] bits, byte sep, int ncols, boolean singleQuotes,
public ParseSetup guessSetup(ByteVec bv, byte[] bits, byte sep, int ncols, boolean singleQuotes,
int checkHeader, String[] columnNames, byte[] columnTypes,
String[][] domains, String[][] naStrings) {
return XlsParser.guessSetup(bits);
@@ -79,7 +80,7 @@ public Parser createParser(ParseSetup setup, Key<Job> jobKey) {
}

@Override
public ParseSetup guessSetup(byte[] bits, byte sep, int ncols, boolean singleQuotes,
public ParseSetup guessSetup(ByteVec bv, byte[] bits, byte sep, int ncols, boolean singleQuotes,
int checkHeader, String[] columnNames, byte[] columnTypes,
String[][] domains, String[][] naStrings) {
return SVMLightParser.guessSetup(bits);
@@ -99,7 +100,7 @@ public Parser createParser(ParseSetup setup, Key<Job> jobKey) {
}

@Override
public ParseSetup guessSetup(byte[] bits, byte sep, int ncols, boolean singleQuotes,
public ParseSetup guessSetup(ByteVec bv, byte[] bits, byte sep, int ncols, boolean singleQuotes,
int checkHeader, String[] columnNames, byte[] columnTypes,
String[][] domains, String[][] naStrings) {
return CsvParser.guessSetup(bits, sep, ncols, singleQuotes, checkHeader, columnNames, columnTypes, naStrings);
@@ -119,7 +120,7 @@ public Parser createParser(ParseSetup setup, Key<Job> jobKey) {
}

@Override
public ParseSetup guessSetup(byte[] bits, byte sep, int ncols, boolean singleQuotes,
public ParseSetup guessSetup(ByteVec bv, byte[] bits, byte sep, int ncols, boolean singleQuotes,
int checkHeader, String[] columnNames, byte[] columnTypes,
String[][] domains, String[][] naStrings) {
List<ParserProvider> pps = ParserService.INSTANCE.getAllProviders(true); // Sort them based on priorities
@@ -129,7 +130,7 @@ public ParseSetup guessSetup(byte[] bits, byte sep, int ncols, boolean singleQuo
if (pp == this || pp.info().equals(GUESS_INFO)) continue;
// Else try to guess with given provider
try {
ParseSetup ps = pp.guessSetup(bits, sep, ncols, singleQuotes, checkHeader, columnNames, columnTypes, domains, naStrings);
ParseSetup ps = pp.guessSetup(bv, bits, sep, ncols, singleQuotes, checkHeader, columnNames, columnTypes, domains, naStrings);
if( ps != null) {
return ps;
}
@@ -142,7 +143,7 @@ public ParseSetup guessSetup(byte[] bits, byte sep, int ncols, boolean singleQuo
}
}

static abstract class AbstractParserProvide implements ParserProvider {
static abstract class AbstractParserProvide extends ParserProvider {

@Override
public ParseSetup createParserSetup(Key[] inputs, ParseSetup requiredSetup) {
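The GUESS provider's loop above asks every registered parser, in priority order, to recognize the input; the first non-null setup wins, and a provider that throws simply doesn't match. A simplified self-contained sketch of that strategy (toy types, not the real ParserProvider API):

import java.util.List;

interface Guesser {
  Object guess(byte[] bits);  // returns a setup, or null/throws if the format doesn't match
}

class GuessLoop {
  static Object guessSetup(List<Guesser> providersByPriority, byte[] bits) {
    for (Guesser p : providersByPriority) {
      try {
        Object setup = p.guess(bits);
        if (setup != null) return setup;  // first successful guess wins
      } catch (Throwable ignored) {
        // a failed guess just means "not my format"; fall through to the next provider
      }
    }
    throw new IllegalArgumentException("Cannot determine file type");
  }
}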
5 changes: 4 additions & 1 deletion h2o-core/src/main/java/water/parser/FVecParseWriter.java
@@ -169,7 +169,10 @@ else if(_errs.length < 20)
_errCnt++;
}

@Override public void setIsAllASCII(int colIdx, boolean b) {_nvs[colIdx]._isAllASCII = b;}
@Override public void setIsAllASCII(int colIdx, boolean b) {
if(colIdx < _nvs.length)
_nvs[colIdx]._isAllASCII = b;
}

@Override
public boolean hasErrors() {
7 changes: 0 additions & 7 deletions h2o-core/src/main/java/water/parser/ORCParser.java

This file was deleted.

47 changes: 28 additions & 19 deletions h2o-core/src/main/java/water/parser/ParseDataset.java
@@ -139,17 +139,22 @@ public static ParseDataset forkParseDataset(final Key<Frame> dest, final Key[] k
}
Log.info("Total file size: "+ PrettyPrint.bytes(totalParseSize));

// set the parse chunk size for files
for( int i = 0; i < keys.length; ++i ) {
Iced ice = DKV.getGet(keys[i]);
if(ice instanceof FileVec) {
((FileVec) ice).setChunkSize(setup._chunk_size);
Log.info("Parse chunk size " + setup._chunk_size);
} else if(ice instanceof Frame && ((Frame)ice).vec(0) instanceof FileVec) {
((FileVec) ((Frame) ice).vec(0)).setChunkSize((Frame) ice, setup._chunk_size);
Log.info("Parse chunk size " + setup._chunk_size);

// no need to set this for ORC, it is already done:
if (!setup.getParseType().name().contains("ORC")) {
for( int i = 0; i < keys.length; ++i ) {
Iced ice = DKV.getGet(keys[i]);

// set the parse chunk size for files
if (ice instanceof FileVec) {
((FileVec) ice).setChunkSize(setup._chunk_size);
Log.info("Parse chunk size " + setup._chunk_size);
} else if (ice instanceof Frame && ((Frame) ice).vec(0) instanceof FileVec) {
((FileVec) ((Frame) ice).vec(0)).setChunkSize((Frame) ice, setup._chunk_size);
Log.info("Parse chunk size " + setup._chunk_size);
}
}
}
} else Log.info("Orc Parse chunk sizes may be different across files");

long memsz = H2O.CLOUD.free_mem();
if( totalParseSize > memsz*4 )
@@ -909,7 +914,7 @@ private FVecParseWriter streamParse(final InputStream is, final ParseSetup local

// ------------------------------------------------------------------------
private static class DistributedParse extends MRTask<DistributedParse> {
private final ParseSetup _setup;
private ParseSetup _setup;
private final int _vecIdStart;
private final int _startChunkIdx; // for multifile parse, offset of the first chunk in the final dataset
private final VectorGroup _vg;
@@ -938,9 +943,10 @@ private static class DistributedParse extends MRTask<DistributedParse> {
super.setupLocal();
_visited = new NonBlockingSetInt();
_espc = MemoryManager.malloc8(_nchunks);
_setup = ParserService.INSTANCE.getByInfo(_setup._parse_type).setupLocal(_fr.anyVec(),_setup);
}
@Override public void map( Chunk in ) {
if( _jobKey.get().stop_requested() ) return;
if( _jobKey.get().stop_requested() ) throw new Job.JobCancelledException();
AppendableVec [] avs = new AppendableVec[_setup._number_columns];
for(int i = 0; i < avs.length; ++i)
if (_setup._column_types == null) // SVMLight
@@ -956,22 +962,24 @@ private static class DistributedParse extends MRTask<DistributedParse> {
case "ARFF":
case "CSV":
Categorical [] categoricals = categoricals(_cKey, _setup._number_columns);
dout = new FVecParseWriter(_vg,_startChunkIdx + in.cidx(), categoricals, _setup._column_types, _setup._chunk_size, avs); //TODO: use _setup._domains instead of categoricals
dout = new FVecParseWriter(_vg,_startChunkIdx + in.cidx(), categoricals, _setup._column_types,
_setup._chunk_size, avs); //TODO: use _setup._domains instead of categoricals
break;
case "SVMLight":
dout = new SVMLightFVecParseWriter(_vg, _vecIdStart, in.cidx() + _startChunkIdx, _setup._chunk_size, avs);
break;
case "ORC": // setup special case for ORC
Categorical [] orc_categoricals = categoricals(_cKey, _setup._number_columns);
dout = new FVecParseWriter(_vg, in.cidx() + _startChunkIdx, orc_categoricals, _setup._column_types,
_setup._chunk_size, avs);
break;
default: // FIXME: should not be default and creation strategy should be forwarded to ParserProvider
dout = new FVecParseWriter(_vg, in.cidx() + _startChunkIdx, null, _setup._column_types, _setup._chunk_size, avs);
dout = new FVecParseWriter(_vg, in.cidx() + _startChunkIdx, null, _setup._column_types,
_setup._chunk_size, avs);
break;
}
p.parseChunk(in.cidx(), din, dout);
(_dout = dout).close(_fs);
if(_dout.hasErrors())
for(ParseWriter.ParseErr err:_dout._errs) {
assert err != null : "Parse error cannot be null!";
err._file = _srckey.toString();
}
Job.update(in._len, _jobKey); // Record bytes parsed
// remove parsed data right away
freeMem(in);
@@ -1007,6 +1015,7 @@ private void freeMem(Chunk in) {
_outerMFPT._dout[_outerMFPT._lo] = _dout;
if(_dout.hasErrors()) {
ParseWriter.ParseErr [] errs = _dout.removeErrors();
for(ParseWriter.ParseErr err:errs)err._file = FileVec.getPathForKey(_srckey).toString();
Arrays.sort(errs, new Comparator<ParseWriter.ParseErr>() {
@Override
public int compare(ParseWriter.ParseErr o1, ParseWriter.ParseErr o2) {
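Why the chunk-size loop above skips ORC: the new FileVec.setNChunks() introduced in this commit lets a parser fix the chunk count per file up front, and the ORC reader appears to rely on that, so chunk boundaries follow each file's own layout rather than the uniform _chunk_size; hence the "Orc Parse chunk sizes may be different across files" log line.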