diff --git a/.gitignore b/.gitignore index c80c514bf2..18748a7b45 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,5 @@ target *.orig *.rej dependency-reduced-pom.xml -.idea/* \ No newline at end of file +.idea/* +target/ diff --git a/parquet-protobuf/README.md b/parquet-protobuf/README.md new file mode 100644 index 0000000000..7b57d5f376 --- /dev/null +++ b/parquet-protobuf/README.md @@ -0,0 +1,4 @@ +parquet-protobuf +================ + +protobuffer support for Parquet columnar format diff --git a/parquet-protobuf/pom.xml b/parquet-protobuf/pom.xml new file mode 100644 index 0000000000..6e2a87eb70 --- /dev/null +++ b/parquet-protobuf/pom.xml @@ -0,0 +1,157 @@ + + + com.twitter + parquet + ../pom.xml + 1.2.10-SNAPSHOT + + + 4.0.0 + + parquet-protobuf + jar + + + 3.0.8 + + + + Parquet Protobuf + https://github.com/lukasnalezenec/parquet-protobuf.git + + + + com.google.protobuf + protobuf-java + 2.4.1 + + + com.twitter + parquet-hadoop + ${project.version} + + + com.twitter.elephantbird + elephant-bird-core + ${elephant-bird.version} + + + org.apache.hadoop + hadoop-core + + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + + + + + lukasnalezenec + Lukas Nalezenec + + + + + + + org.apache.maven.plugins + maven-jar-plugin + ${maven-jar-plugin.version} + + + + ${buildNumber} + + + + + + + test-jar + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + + + org.codehaus.jackson:jackson-mapper-asl + org.codehaus.jackson:jackson-core-asl + + + + + org.codehaus.jackson + parquet.org.codehaus.jackson + + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.8 + + + add-test-sources + generate-test-sources + + add-test-source + + + + ${project.build.directory}/generated-test-sources + + + + + + + + maven-antrun-plugin + + + generate-sources + generate-test-sources + + + + + + + + + + + src/main/java + target/generated-sources/java + + + run + + + + + + + + + diff --git 
a/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetInputFormat.java b/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetInputFormat.java new file mode 100644 index 0000000000..497b9d1e18 --- /dev/null +++ b/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetInputFormat.java @@ -0,0 +1,34 @@ +/** + * Copyright 2013 Lukas Nalezenec + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package parquet.proto; + +import com.google.protobuf.MessageOrBuilder; +import org.apache.hadoop.conf.Configuration; +import parquet.hadoop.ParquetInputFormat; + +/** + * A Hadoop {@link org.apache.hadoop.mapreduce.InputFormat} for Parquet files. 
+ */
+public class ProtoParquetInputFormat extends ParquetInputFormat {
+
+  /**
+   * Records the requested projection in the given configuration by delegating to
+   * {@link ProtoReadSupport#setRequestedProjection(Configuration, String)}.
+   *
+   * @param configuration configuration that receives the projection
+   * @param requestedProjection projection string, passed through unchanged
+   */
+  public static void setRequestedProjection(Configuration configuration, String requestedProjection) {
+    ProtoReadSupport.setRequestedProjection(configuration, requestedProjection);
+  }
+
+  /**
+   * Creates an input format whose read support class is {@link ProtoReadSupport}.
+   */
+  public ProtoParquetInputFormat() {
+    super(ProtoReadSupport.class);
+  }
+
+}
diff --git a/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetOutputFormat.java b/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetOutputFormat.java
new file mode 100644
index 0000000000..4c401d3222
--- /dev/null
+++ b/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetOutputFormat.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright 2013 Lukas Nalezenec
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.proto;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.MessageOrBuilder;
+import org.apache.hadoop.mapreduce.Job;
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.util.ContextUtil;
+
+/**
+ * A Hadoop {@link org.apache.hadoop.mapreduce.OutputFormat} for Protobuffer Parquet files.
+ *

+ * Usage: + *

+ *

+ * {@code
+ * final Job job = new Job(conf, "Parquet writing job");
+ * job.setOutputFormatClass(ProtoParquetOutputFormat.class);
+ * ProtoParquetOutputFormat.setOutputPath(job, parquetPath);
+ * ProtoParquetOutputFormat.setProtobufferClass(job, YourProtobuffer.class);
+ * }
+ * 
+ *
+ * @author Lukas Nalezenec
+ */
+public class ProtoParquetOutputFormat extends ParquetOutputFormat {
+
+  /**
+   * Creates an output format backed by a {@link ProtoWriteSupport} that was built
+   * without a message class.
+   */
+  public ProtoParquetOutputFormat() {
+    super(new ProtoWriteSupport());
+  }
+
+  /**
+   * Creates an output format backed by a {@link ProtoWriteSupport} built for the given class.
+   *
+   * @param msg protobuf class handed to the write support
+   */
+  public ProtoParquetOutputFormat(Class msg) {
+    super(new ProtoWriteSupport(msg));
+  }
+
+  /**
+   * Registers the protobuf class for a job by storing it in the job's configuration
+   * through {@link ProtoWriteSupport#setSchema}.
+   *
+   * @param job job whose configuration is updated
+   * @param protoClass protobuf class to register
+   */
+  public static void setProtobufferClass(Job job, Class protoClass) {
+    ProtoWriteSupport.setSchema(ContextUtil.getConfiguration(job), protoClass);
+  }
+
+}
diff --git a/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetReader.java b/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetReader.java
new file mode 100644
index 0000000000..fe4609adac
--- /dev/null
+++ b/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetReader.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.proto;
+
+import com.google.protobuf.MessageOrBuilder;
+import org.apache.hadoop.fs.Path;
+import parquet.filter.UnboundRecordFilter;
+import parquet.hadoop.ParquetReader;
+import parquet.hadoop.api.ReadSupport;
+
+import java.io.IOException;
+
+/**
+ * Read Protocol Buffers records from a Parquet file.
+ */
+public class ProtoParquetReader extends ParquetReader {
+
+  /**
+   * Opens a reader over the given file using a new {@link ProtoReadSupport}.
+   *
+   * @param file Parquet file to read
+   * @throws IOException propagated from the {@link ParquetReader} constructor
+   */
+  public ProtoParquetReader(Path file) throws IOException {
+    super(file, (ReadSupport) new ProtoReadSupport());
+  }
+
+  /**
+   * Opens a reader over the given file using a new {@link ProtoReadSupport} and a record filter.
+   *
+   * @param file Parquet file to read
+   * @param recordFilter filter forwarded to the {@link ParquetReader} constructor
+   * @throws IOException propagated from the {@link ParquetReader} constructor
+   */
+  public ProtoParquetReader(Path file, UnboundRecordFilter recordFilter) throws IOException {
+    super(file, (ReadSupport) new ProtoReadSupport(), recordFilter);
+  }
+
+  //TODO here should be option to override pb from file
+}
diff --git a/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetWriter.java b/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetWriter.java
new file mode 100644
index 0000000000..c0f1b28473
--- /dev/null
+++ b/parquet-protobuf/src/main/java/parquet/proto/ProtoParquetWriter.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.proto;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.MessageOrBuilder;
+import org.apache.hadoop.fs.Path;
+import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.api.WriteSupport;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+
+/**
+ * Write Protobuffer records to a Parquet file.
+ */
+public class ProtoParquetWriter extends ParquetWriter {
+
+  /**
+   * Creates a writer with {@link CompressionCodecName#UNCOMPRESSED} and the inherited
+   * DEFAULT_BLOCK_SIZE and DEFAULT_PAGE_SIZE constants. The earlier comment gave those
+   * defaults as 50 MB and 1 MB; the values are not defined in this class, so confirm
+   * them against {@link ParquetWriter}.
+   *
+   * @param file The file name to write to.
+   * @param protoMessage Protobuf class passed to {@link ProtoWriteSupport}.
+   * @throws IOException propagated from the delegated constructor
+   */
+  public ProtoParquetWriter(Path file, Class protoMessage) throws IOException {
+    this(file, protoMessage, CompressionCodecName.UNCOMPRESSED, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
+  }
+
+  /**
+   * Creates a writer with explicit compression, block size and page size.
+   *
+   * @param file The file name to write to.
+   * @param protoMessage Protobuf class passed to {@link ProtoWriteSupport}.
+   * @param compressionCodecName Compression codec to use, or CompressionCodecName.UNCOMPRESSED
+   * @param blockSize HDFS block size
+   * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes.
+   * @throws IOException propagated from the {@link ParquetWriter} constructor
+   */
+  public ProtoParquetWriter(Path file, Class protoMessage, CompressionCodecName compressionCodecName,
+                            int blockSize, int pageSize) throws IOException {
+    super(file, (WriteSupport) new ProtoWriteSupport(protoMessage), compressionCodecName, blockSize, pageSize);
+  }
+
+  /**
+   * Creates a writer with explicit compression, block size, page size and dictionary setting.
+   *
+   * @param file The file name to write to.
+   * @param protoMessage Protobuf class passed to {@link ProtoWriteSupport}.
+   * @param compressionCodecName Compression codec to use, or CompressionCodecName.UNCOMPRESSED
+   * @param blockSize HDFS block size
+   * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes.
+   * @param enableDictionary Whether to use a dictionary to compress columns.
+   * @throws IOException propagated from the {@link ParquetWriter} constructor
+   */
+  public ProtoParquetWriter(Path file, Class protoMessage, CompressionCodecName compressionCodecName,
+                            int blockSize, int pageSize, boolean enableDictionary) throws IOException {
+    // NOTE(review): the trailing "false" is presumably ParquetWriter's validation flag - confirm.
+    super(file, (WriteSupport) new ProtoWriteSupport(protoMessage), compressionCodecName, blockSize, pageSize,
+        enableDictionary, false);
+  }
+
+}
diff --git a/parquet-protobuf/src/main/java/parquet/proto/ProtoReadSupport.java b/parquet-protobuf/src/main/java/parquet/proto/ProtoReadSupport.java
new file mode 100644
index 0000000000..edf06480bf
--- /dev/null
+++ b/parquet-protobuf/src/main/java/parquet/proto/ProtoReadSupport.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.proto;
+
+import com.google.protobuf.Message;
+import com.twitter.elephantbird.util.Protobufs;
+import org.apache.hadoop.conf.Configuration;
+import parquet.hadoop.api.InitContext;
+import parquet.hadoop.api.ReadSupport;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+import java.util.Map;
+
+
+/**
+ * Read support that materializes records through {@link ProtoRecordMaterializer}.
+ * The protobuf class name is looked up under {@link #PB_CLASS} in the key/value
+ * metadata passed to {@link #prepareForRead}.
+ *
+ * @author Lukas Nalezenec
+ */
+public class ProtoReadSupport extends ReadSupport {
+
+  /** Configuration key holding the requested projection string. */
+  public static final String PB_REQUESTED_PROJECTION = "parquet.proto.projection";
+
+  /** Metadata key holding the protobuf class name. */
+  public static final String PB_CLASS = "parquet.proto.class";
+  /** Metadata key holding the serialized protobuf descriptor. */
+  public static final String PB_DESCRIPTOR = "parquet.proto.descriptor";
+
+  /**
+   * Stores the requested projection under {@link #PB_REQUESTED_PROJECTION}.
+   *
+   * @param configuration configuration that receives the projection
+   * @param requestedProjection projection string to store
+   */
+  public static void setRequestedProjection(Configuration configuration, String requestedProjection) {
+    configuration.set(PB_REQUESTED_PROJECTION, requestedProjection);
+  }
+
+  /**
+   * Chooses the schema to read: the projection from the configuration when one is set
+   * and non-blank, otherwise the full file schema.
+   *
+   * @param context supplies the configuration and the file schema
+   * @return read context wrapping the chosen schema
+   */
+  @Override
+  public ReadContext init(InitContext context) {
+    String requestedProjectionString = context.getConfiguration().get(PB_REQUESTED_PROJECTION);
+    boolean noProjection = requestedProjectionString == null || requestedProjectionString.trim().isEmpty();
+    if (noProjection) {
+      return new ReadContext(context.getFileSchema());
+    }
+    // getSchemaForRead is not defined in this class; presumably inherited from ReadSupport.
+    MessageType requestedProjection = getSchemaForRead(context.getFileSchema(), requestedProjectionString);
+    return new ReadContext(requestedProjection);
+  }
+
+  /**
+   * Builds the record materializer for the requested schema.
+   *
+   * @param configuration not used by this implementation
+   * @param keyValueMetaData metadata map that must contain {@link #PB_CLASS}
+   * @param fileSchema not used by this implementation
+   * @param readContext supplies the requested schema
+   * @return materializer for the protobuf class named in the metadata
+   * @throws RuntimeException if the metadata has no {@link #PB_CLASS} entry
+   */
+  @Override
+  public RecordMaterializer prepareForRead(Configuration configuration, Map keyValueMetaData, MessageType fileSchema, ReadContext readContext) {
+    String strProtoClass = keyValueMetaData.get(PB_CLASS);
+
+    if (strProtoClass == null) {
+      // The class name comes from the metadata map, not from a user-supplied parameter.
+      throw new RuntimeException("Key/value metadata is missing \"" + PB_CLASS
+          + "\"; it must name the protobuf message class to materialize");
+    }
+
+    return new ProtoRecordMaterializer(readContext.getRequestedSchema(), Protobufs.getProtobufClass(strProtoClass));
+  }
+
+
+}
diff --git a/parquet-protobuf/src/main/java/parquet/proto/ProtoRecordMaterializer.java b/parquet-protobuf/src/main/java/parquet/proto/ProtoRecordMaterializer.java
new file mode 100644
index 0000000000..f88b829429
--- /dev/null
+++ b/parquet-protobuf/src/main/java/parquet/proto/ProtoRecordMaterializer.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.proto;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.MessageOrBuilder;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+/**
+ * Record materializer whose root converter is a {@link ProtobufferRecordConverter};
+ * the current record is whatever that converter reports.
+ */
+class ProtoRecordMaterializer extends RecordMaterializer {
+
+  private final ProtobufferRecordConverter rootConverter;
+
+  /**
+   * @param requestedSchema Parquet schema handed to the root converter
+   * @param protobufClass protobuf class handed to the root converter
+   */
+  public ProtoRecordMaterializer(MessageType requestedSchema, Class protobufClass) {
+    rootConverter = new ProtobufferRecordConverter(protobufClass, requestedSchema);
+  }
+
+  /** Returns the root converter created in the constructor. */
+  @Override
+  public GroupConverter getRootConverter() {
+    return rootConverter;
+  }
+
+  /** Returns the record currently held by the root converter. */
+  @Override
+  public T getCurrentRecord() {
+    return rootConverter.getCurrentRecord();
+  }
+}
diff --git a/parquet-protobuf/src/main/java/parquet/proto/ProtoSchemaConverter.java b/parquet-protobuf/src/main/java/parquet/proto/ProtoSchemaConverter.java
new file mode 100644
index 0000000000..c409bb9185
--- /dev/null
+++ b/parquet-protobuf/src/main/java/parquet/proto/ProtoSchemaConverter.java
@@ -0,0 +1,136 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package parquet.proto; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.twitter.elephantbird.util.Protobufs; +import parquet.Log; +import parquet.schema.ConversionPatterns; +import parquet.schema.GroupType; +import parquet.schema.MessageType; +import parquet.schema.OriginalType; +import parquet.schema.PrimitiveType; +import parquet.schema.Type; + +import java.util.ArrayList; +import java.util.List; + +import static parquet.schema.OriginalType.ENUM; +import static parquet.schema.OriginalType.UTF8; +import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; +import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; + +/** + *

+ * Converts a Protobuffer Descriptor into a Parquet schema.
+ *
+ * @author Lukas Nalezenec
+ */
+public class ProtoSchemaConverter {
+
+  private static final Log LOG = Log.getLog(ProtoSchemaConverter.class);
+
+  /**
+   * Builds the Parquet message type for a protobuf class. The message type is named
+   * after the descriptor's full name and contains one converted type per descriptor field.
+   *
+   * @param protobufClass protobuf class whose descriptor is converted
+   * @return the resulting Parquet schema
+   */
+  public MessageType convert(Class protobufClass) {
+    LOG.debug("Converting protobuffer class \"" + protobufClass + "\" to parquet schema.");
+    Descriptors.Descriptor descriptor = Protobufs.getMessageDescriptor(protobufClass);
+
+    String schemaName = descriptor.getFullName();
+    List rootFields = convertFields(descriptor.getFields());
+    MessageType messageType = new MessageType(schemaName, rootFields);
+
+    LOG.debug("Convertor info:\n " + descriptor.toProto() + " was converted to \n" + messageType);
+    return messageType;
+  }
+
+  /**
+   * Converts every field descriptor in the list. A repeated field becomes an optional
+   * list type wrapping a repeated element named after the field plus "_tuple"; any
+   * other field is converted directly with its own repetition.
+   */
+  private List convertFields(List fieldDescriptors) {
+    List types = new ArrayList();
+
+    for (Descriptors.FieldDescriptor fieldDescriptor : fieldDescriptors) {
+      String fieldName = fieldDescriptor.getName();
+
+      if (fieldDescriptor.isRepeated()) {
+        Type nestedType = convertScalarField(fieldName + "_tuple", fieldDescriptor, Type.Repetition.REPEATED);
+        types.add(ConversionPatterns.listType(Type.Repetition.OPTIONAL, fieldName, nestedType));
+      } else {
+        types.add(convertScalarField(fieldName, fieldDescriptor, getRepetition(fieldDescriptor)));
+      }
+    }
+    return types;
+  }
+
+  /** Maps a field to REQUIRED, REPEATED or OPTIONAL, checked in that order. */
+  private Type.Repetition getRepetition(Descriptors.FieldDescriptor descriptor) {
+    if (descriptor.isRequired()) {
+      return Type.Repetition.REQUIRED;
+    }
+    if (descriptor.isRepeated()) {
+      return Type.Repetition.REPEATED;
+    }
+    return Type.Repetition.OPTIONAL;
+  }
+
+  /**
+   * Converts one field, ignoring whether it is repeated, into a Parquet type with the
+   * given name and repetition. Message fields become groups built from their own fields.
+   *
+   * @throws UnsupportedOperationException if the Java type is not handled
+   */
+  private Type convertScalarField(String fieldName, Descriptors.FieldDescriptor descriptor, Type.Repetition repetition) {
+    Descriptors.FieldDescriptor.JavaType javaType = descriptor.getJavaType();
+
+    // Case labels are JavaType constants. The same words inside the calls are the
+    // statically imported PrimitiveTypeName / OriginalType constants.
+    switch (javaType) {
+      case BOOLEAN:
+        return primitive(fieldName, BOOLEAN, repetition);
+      case INT:
+        return primitive(fieldName, INT32, repetition);
+      case LONG:
+        return primitive(fieldName, INT64, repetition);
+      case FLOAT:
+        return primitive(fieldName, FLOAT, repetition);
+      case DOUBLE:
+        return primitive(fieldName, DOUBLE, repetition);
+      case BYTE_STRING:
+        return primitive(fieldName, BINARY, repetition);
+      case STRING:
+        return primitive(fieldName, BINARY, repetition, UTF8);
+      case MESSAGE:
+        Descriptors.Descriptor messageDescriptor = descriptor.getMessageType();
+        List fields = convertFields(messageDescriptor.getFields());
+        return new GroupType(repetition, fieldName, fields);
+      case ENUM:
+        return primitive(fieldName, BINARY, repetition, ENUM);
+      default:
+        throw new UnsupportedOperationException("Cannot convert Protobuffer: unknown type " + javaType + " fieldName " + fieldName);
+    }
+  }
+
+  /**
+   * Makes primitive type with additional information. Used for String and Binary types
+   */
+  private Type primitive(String name, PrimitiveType.PrimitiveTypeName primitive,
+                         Type.Repetition repetition, OriginalType originalType) {
+    return new PrimitiveType(repetition, primitive, name, originalType);
+  }
+
+  /** Makes a primitive type without an original type annotation. */
+  private PrimitiveType primitive(String name, PrimitiveType.PrimitiveTypeName primitive,
+                                  Type.Repetition repetition) {
+    return new PrimitiveType(repetition, primitive, name, null);
+  }
+
+}
\ No newline at end of file
diff --git a/parquet-protobuf/src/main/java/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/parquet/proto/ProtoWriteSupport.java
new file mode 100644
index 0000000000..dd283db622
--- /dev/null
+++ b/parquet-protobuf/src/main/java/parquet/proto/ProtoWriteSupport.java
@@ -0,0 +1,193 @@
+/**
+ * Copyright 2013 Lukas Nalezenec
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.proto;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.MessageOrBuilder;
+import com.twitter.elephantbird.util.Protobufs;
+import org.apache.hadoop.conf.Configuration;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.api.Binary;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.Type;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Write support that emits protobuf messages to a {@link RecordConsumer}. The protobuf
+ * class comes either from the constructor or from the configuration key
+ * {@link #PB_CLASS_WRITE}; its Parquet schema is produced by {@link ProtoSchemaConverter}.
+ */
+public class ProtoWriteSupport extends WriteSupport {
+
+  static final String PB_CLASS_WRITE = "parquet.proto.writeClass";
+
+  private RecordConsumer recordConsumer;
+  private MessageType rootSchema;
+  private Class protoMessage;
+
+  /** Creates a write support whose protobuf class is resolved later, in {@link #init}. */
+  public ProtoWriteSupport() {
+  }
+
+  /**
+   * Creates a write support for the given class and converts its schema immediately.
+   *
+   * @param protobufClass protobuf class to write
+   */
+  public ProtoWriteSupport(Class protobufClass) {
+    protoMessage = protobufClass;
+    rootSchema = new ProtoSchemaConverter().convert(protobufClass);
+  }
+
+  /**
+   * Stores the protobuf class in the configuration under {@link #PB_CLASS_WRITE}.
+   *
+   * @param configuration configuration that receives the class
+   * @param protoClass protobuf class to store
+   */
+  public static void setSchema(Configuration configuration, Class protoClass) {
+    configuration.setClass(PB_CLASS_WRITE, protoClass, Message.class);
+  }
+
+  /**
+   * Resolves the protobuf class if the constructor did not receive one, then returns a
+   * write context carrying the schema plus the class name and descriptor text as metadata.
+   *
+   * @throws RuntimeException if no class was given and none is stored in the configuration
+   */
+  @Override
+  public WriteContext init(Configuration configuration) {
+
+    if (protoMessage == null) {
+      // Nothing was passed to the constructor: fall back to the class stored by setSchema.
+      Class pbClass = configuration.getClass(PB_CLASS_WRITE, null, Message.class);
+      if (pbClass == null) {
+        String msg = "Protobuffer class not specified.";
+        String hint = " Please use method ProtoParquetOutputFormat.setProtobufferClass(...) or other similar method.";
+        throw new RuntimeException(msg + hint);
+      }
+      protoMessage = pbClass;
+      rootSchema = new ProtoSchemaConverter().convert(pbClass);
+    }
+
+    Map extraMetaData = new HashMap();
+    extraMetaData.put(ProtoReadSupport.PB_CLASS, protoMessage.getName());
+    extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, serializeDescriptor(protoMessage));
+
+    // TODO add also pig descriptor
+    // see Thrift code ThriftWriteSupport
+    return new WriteContext(rootSchema, extraMetaData);
+  }
+
+  /** Returns the text form of the class's message descriptor. */
+  private String serializeDescriptor(Class protoClass) {
+    DescriptorProtos.DescriptorProto asProto = Protobufs.getMessageDescriptor(protoClass).toProto();
+    return asProto.toString();
+  }
+
+  /** Remembers the consumer that all later writes go to. */
+  @Override
+  public void prepareForWrite(RecordConsumer recordConsumer) {
+    this.recordConsumer = recordConsumer;
+  }
+
+  /** Writes one record as a complete message using the root schema. */
+  @Override
+  public void write(T record) {
+    recordConsumer.startMessage();
+    writeRecordFields(rootSchema, record);
+    recordConsumer.endMessage();
+  }
+
+  /** Writes a nested message as a group. */
+  private void writeMessage(GroupType schema, MessageOrBuilder message) {
+    recordConsumer.startGroup();
+    writeRecordFields(schema, message);
+    recordConsumer.endGroup();
+  }
+
+  /**
+   * Writes every entry returned by the record's getAllFields(). The Parquet type is
+   * looked up by the descriptor's index; the field index given to the consumer is
+   * looked up by name.
+   */
+  private void writeRecordFields(GroupType parquetSchema, MessageOrBuilder record) {
+    List fields = parquetSchema.getFields();
+    Map pbFields = record.getAllFields();
+
+    for (Map.Entry entry : pbFields.entrySet()) {
+      Descriptors.FieldDescriptor fieldDescriptor = entry.getKey();
+      Type fieldType = fields.get(fieldDescriptor.getIndex());
+      Object value = entry.getValue();
+
+      if (value == null) {
+        if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+          throw new RuntimeException("Null-value for required field: " + fieldDescriptor.getName());
+        }
+        continue;
+      }
+
+      int parquetIndex = parquetSchema.getFieldIndex(fieldDescriptor.getName());
+
+      recordConsumer.startField(fieldType.getName(), parquetIndex);
+      if (fieldDescriptor.isRepeated()) {
+        writeArray(fieldType.asGroupType(), fieldDescriptor, (List) value);
+      } else {
+        writeScalarValue(fieldType, fieldDescriptor, value);
+      }
+      recordConsumer.endField(fieldType.getName(), parquetIndex);
+    }
+  }
+
+  /**
+   * Writes a repeated field as a group. A non-empty list is written as field 0 of that
+   * group, one value per element; an empty list produces an empty group.
+   */
+  private void writeArray(GroupType schema, Descriptors.FieldDescriptor fieldDescriptor,
+                          List array) {
+    String arrayType = schema.getName();
+    if (!arrayType.equals(fieldDescriptor.getName())) {
+      throw new RuntimeException("Mismatch");//TODO remove me
+    }
+
+    recordConsumer.startGroup();
+    if (!array.isEmpty()) {
+      recordConsumer.startField(arrayType, 0);
+      Type elementType = schema.getType(0);
+      for (Object elt : array) {
+        writeScalarValue(elementType, fieldDescriptor, elt); // patch
+      }
+      recordConsumer.endField(arrayType, 0);
+    }
+    recordConsumer.endGroup();
+  }
+
+  /**
+   * Writes a single value according to the field's protobuf Java type.
+   *
+   * @throws RuntimeException if the Java type is not handled
+   */
+  private void writeScalarValue(Type type, Descriptors.FieldDescriptor fieldDescriptor, Object value) {
+
+    Descriptors.FieldDescriptor.JavaType javaType = fieldDescriptor.getJavaType();
+
+    switch (javaType) {
+      case BOOLEAN:
+        recordConsumer.addBoolean((Boolean) value);
+        break;
+      case INT:
+        recordConsumer.addInteger(((Number) value).intValue());
+        break;
+      case LONG:
+        recordConsumer.addLong(((Number) value).longValue());
+        break;
+      case FLOAT:
+        recordConsumer.addFloat(((Number) value).floatValue());
+        break;
+      case DOUBLE:
+        recordConsumer.addDouble(((Number) value).doubleValue());
+        break;
+      case BYTE_STRING:
+        recordConsumer.addBinary(Binary.fromByteArray(((ByteString) value).toByteArray()));
+        break;
+      case STRING:
+        recordConsumer.addBinary(Binary.fromString(value.toString()));
+        break;
+      case MESSAGE:
+        writeMessage(type.asGroupType(), (MessageOrBuilder) value);
+        break;
+      case ENUM:
+        // Enum values are stored by name.
+        recordConsumer.addBinary(Binary.fromString(((Descriptors.EnumValueDescriptor) value).getName()));
+        break;
+      default:
+        throw new RuntimeException("Cannot write " + value + " with descriptor " + fieldDescriptor + " and type " + javaType);
+    }
+  }
+
+}
diff --git a/parquet-protobuf/src/main/java/parquet/proto/ProtobufferRecordConverter.java b/parquet-protobuf/src/main/java/parquet/proto/ProtobufferRecordConverter.java
new file mode 100644
index 0000000000..f9a7a3e192
--- /dev/null
+++ b/parquet-protobuf/src/main/java/parquet/proto/ProtobufferRecordConverter.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright 2013 Lukas Nalezenec
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.proto;
+
+
+import com.google.protobuf.Message;
+import com.google.protobuf.MessageOrBuilder;
+import parquet.proto.converters.ParentValueContainer;
+import parquet.schema.MessageType;
+
+/**
+ * Root converter used when reading: it fills a reused protobuf builder with the
+ * content of one Parquet record. Conversion of the inner fields is inherited from
+ * ProtoMessageConverter. The schema itself is converted in ProtoSchemaConverter.
+ *
+ * @author Lukas Nalezenec
+ */
+class ProtobufferRecordConverter extends parquet.proto.converters.ProtoMessageConverter {
+
+  final Message.Builder reusedBuilder;
+  // Never assigned in this class, so it keeps its default (false) unless set from outside.
+  boolean buildBefore;
+
+  /**
+   * Placeholder parent for the root converter: the top-level message is not added
+   * to any parent, so a call to add is treated as a programming error.
+   */
+  private static class SkipParentValueContainer extends ParentValueContainer {
+    @Override
+    public void add(Object value) {
+      throw new RuntimeException("Should never happen");
+    }
+  }
+
+
+  /**
+   * @param protoclass protobuf class passed on to the superclass
+   * @param parquetSchema Parquet schema passed on to the superclass
+   */
+  public ProtobufferRecordConverter(Class protoclass, MessageType parquetSchema) {
+    super(new SkipParentValueContainer(), protoclass, parquetSchema);
+    reusedBuilder = getBuilder();
+  }
+
+
+  /** Clears the reused builder before the superclass starts a new record. */
+  @Override
+  public void start() {
+    reusedBuilder.clear();
+    super.start();
+  }
+
+  /** Intentionally empty: the parent container is not called at top level. */
+  @Override
+  public void end() {
+  }
+
+  /** Returns the built message when buildBefore is set, otherwise the builder itself. */
+  T getCurrentRecord() {
+    if (!buildBefore) {
+      return (T) reusedBuilder;
+    }
+    return (T) reusedBuilder.build();
+  }
+
+}
diff --git a/parquet-protobuf/src/main/java/parquet/proto/converters/ParentValueContainer.java b/parquet-protobuf/src/main/java/parquet/proto/converters/ParentValueContainer.java
new file mode 100644
index 0000000000..33a0f4d431
--- /dev/null
+++ b/parquet-protobuf/src/main/java/parquet/proto/converters/ParentValueContainer.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright 2013 Lukas Nalezenec.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parquet.proto.converters; + +public abstract class ParentValueContainer { + + /** + * Adds the value to the parent; each concrete subclass defines what adding means. + */ + public abstract void add(Object value); + +} \ No newline at end of file diff --git a/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoArrayConverter.java b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoArrayConverter.java new file mode 100644 index 0000000000..e74d62fc41 --- /dev/null +++ b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoArrayConverter.java @@ -0,0 +1,44 @@ +/** + * Copyright 2013 Lukas Nalezenec. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package parquet.proto.converters; + +import parquet.io.api.Converter; +import parquet.io.api.GroupConverter; + +public class ProtoArrayConverter extends GroupConverter { + + private final Converter converter; + + public ProtoArrayConverter(Converter innerConverter) { + converter = innerConverter; + } + + @Override + public Converter getConverter(int i) { + return converter; + } + + @Override + public void start() { + + } + + @Override + public void end() { + + } +} diff --git a/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoBinaryConverter.java b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoBinaryConverter.java new file mode 100644 index 0000000000..eb2a95c678 --- /dev/null +++ b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoBinaryConverter.java @@ -0,0 +1,36 @@ +/** + * Copyright 2013 Lukas Nalezenec. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package parquet.proto.converters; + +import com.google.protobuf.ByteString; +import parquet.io.api.Binary; +import parquet.io.api.PrimitiveConverter; + +public final class ProtoBinaryConverter extends PrimitiveConverter { + + final ParentValueContainer parent; + + public ProtoBinaryConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + public void addBinary(Binary binary) { + ByteString byteString = ByteString.copyFrom(binary.toByteBuffer()); + parent.add(byteString); + } +} diff --git a/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoBooleanConverter.java b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoBooleanConverter.java new file mode 100644 index 0000000000..b3cba6266f --- /dev/null +++ b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoBooleanConverter.java @@ -0,0 +1,34 @@ +/** + * Copyright 2013 Lukas Nalezenec. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parquet.proto.converters; + +import parquet.io.api.PrimitiveConverter; + +public final class ProtoBooleanConverter extends PrimitiveConverter { + + final ParentValueContainer parent; + + public ProtoBooleanConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addBoolean(boolean value) { + parent.add(value ? 
Boolean.TRUE : Boolean.FALSE); + } + +} diff --git a/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoDoubleConverter.java b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoDoubleConverter.java new file mode 100644 index 0000000000..6921cc1f8a --- /dev/null +++ b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoDoubleConverter.java @@ -0,0 +1,33 @@ +/** + * Copyright 2013 Lukas Nalezenec. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parquet.proto.converters; + +import parquet.io.api.PrimitiveConverter; + +public final class ProtoDoubleConverter extends PrimitiveConverter { + + final ParentValueContainer parent; + + public ProtoDoubleConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + public void addDouble(double value) { + parent.add(value); + } +} diff --git a/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoEnumConverter.java b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoEnumConverter.java new file mode 100644 index 0000000000..1bea9a6f3f --- /dev/null +++ b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoEnumConverter.java @@ -0,0 +1,69 @@ +/** + * Copyright 2013 Lukas Nalezenec. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parquet.proto.converters; + +import com.google.protobuf.Descriptors; +import parquet.io.api.Binary; +import parquet.io.api.PrimitiveConverter; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public final class ProtoEnumConverter extends PrimitiveConverter { + + private final Descriptors.FieldDescriptor fieldType; + private final Map enumLookup; + private final ParentValueContainer parent; + + public ProtoEnumConverter(ParentValueContainer parent, Descriptors.FieldDescriptor fieldType) { + this.parent = parent; + this.fieldType = fieldType; + this.enumLookup = makeLookupStructure(fieldType); + } + + Map makeLookupStructure(Descriptors.FieldDescriptor enumFieldType) { + Descriptors.EnumDescriptor enumType = enumFieldType.getEnumType(); + Map lookupStructure = new HashMap(); + + List enumValues = enumType.getValues(); + + for (Descriptors.EnumValueDescriptor value : enumValues) { + String name = value.getName(); + lookupStructure.put(Binary.fromString(name), enumType.findValueByName(name)); + } + + return lookupStructure; + } + + @Override + final public void addBinary(Binary binaryValue) { + Descriptors.EnumValueDescriptor protoValue = enumLookup.get(binaryValue); + + if (protoValue == null) { + Set knownValues = enumLookup.keySet(); + String msg = "Illegal enum value \"" + binaryValue + "\"" + + " in protoBuffer \"" + fieldType.getFullName() + "\"" + + " legal values are: \"" + knownValues + "\""; + throw new RuntimeException(msg); + } + + parent.add(protoValue); + } + +} diff 
--git a/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoFloatConverter.java b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoFloatConverter.java new file mode 100644 index 0000000000..2bb9fde6f6 --- /dev/null +++ b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoFloatConverter.java @@ -0,0 +1,33 @@ +/** + * Copyright 2013 Lukas Nalezenec. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parquet.proto.converters; + +import parquet.io.api.PrimitiveConverter; + +public final class ProtoFloatConverter extends PrimitiveConverter { + + final ParentValueContainer parent; + + public ProtoFloatConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + public void addFloat(float value) { + parent.add(value); + } +} diff --git a/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoIntConverter.java b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoIntConverter.java new file mode 100644 index 0000000000..04ae624327 --- /dev/null +++ b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoIntConverter.java @@ -0,0 +1,33 @@ +/** + * Copyright 2013 Lukas Nalezenec. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parquet.proto.converters; + +import parquet.io.api.PrimitiveConverter; + +public final class ProtoIntConverter extends PrimitiveConverter { + + final ParentValueContainer parent; + + public ProtoIntConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + public void addInt(int value) { + parent.add(value); + } +} diff --git a/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoLongConverter.java b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoLongConverter.java new file mode 100644 index 0000000000..ae76f08aa2 --- /dev/null +++ b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoLongConverter.java @@ -0,0 +1,33 @@ +/** + * Copyright 2013 Lukas Nalezenec. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package parquet.proto.converters; + +import parquet.io.api.PrimitiveConverter; + +public final class ProtoLongConverter extends PrimitiveConverter { + + final ParentValueContainer parent; + + public ProtoLongConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + public void addLong(long value) { + parent.add(value); + } +} diff --git a/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoMessageConverter.java new file mode 100644 index 0000000000..27fc0d76dd --- /dev/null +++ b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtoMessageConverter.java @@ -0,0 +1,183 @@ +/** + * Copyright 2013 Lukas Nalezenec. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package parquet.proto.converters; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.twitter.elephantbird.util.Protobufs; +import parquet.io.api.Converter; +import parquet.io.api.GroupConverter; +import parquet.schema.GroupType; +import parquet.schema.Type; + +/** + * @author Lukas Nalezenec + */ +public class ProtoMessageConverter extends GroupConverter { + + private final Converter[] converters; + private final ParentValueContainer parent; + private final Message.Builder myBuilder; + + // used in record converter + public ProtoMessageConverter(ParentValueContainer pvc, Class protoClass, GroupType parquetSchema) { + this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema); + } + + + // For usage in message arrays + public ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema) { + + int schemaSize = parquetSchema.getFieldCount(); + converters = new Converter[schemaSize]; + + this.parent = pvc; + int parquetFieldIndex = 1; + + if (pvc == null) { + throw new IllegalStateException("Missing parent value container"); + } + + myBuilder = builder; + + Descriptors.Descriptor protoDescriptor = builder.getDescriptorForType(); + + for (Type parquetField : parquetSchema.getFields()) { + Descriptors.FieldDescriptor protoField = protoDescriptor.findFieldByName(parquetField.getName()); + + if (parquetField.isRepetition(Type.Repetition.REPEATED)) { + GroupType groupType = parquetField.asGroupType(); + if (groupType.getFieldCount() != 1) throw new RuntimeException("One field expected but found " + groupType); + + // TODO find this hack in avro and Thrift + parquetField = groupType.getType(0); + protoField = protoDescriptor.findFieldByName(parquetField.getName()); + } + + if (protoField == null) { + throw new RuntimeException("Cant find " + parquetField.getName() + " Scheme mismatch \n\"" + parquetField + "\"\n proto descriptor:\n" + protoDescriptor.toProto()); + } + + 
converters[parquetFieldIndex - 1] = newMessageConverter(myBuilder, protoField, parquetField); + + parquetFieldIndex++; + } + } + + + @Override + public Converter getConverter(int fieldIndex) { + return converters[fieldIndex]; + } + + @Override + public void start() { + + } + + @Override + public void end() { + parent.add(myBuilder.build()); + //todo myBuilder.clear(); + // TODO should be overriden in MessageRecordConverter + } + + private Converter newMessageConverter(final Message.Builder parentBuilder, final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { + + boolean isRepeated = fieldDescriptor.isRepeated(); + + ParentValueContainer parent; + + if (isRepeated) { + parent = new ParentValueContainer() { + @Override + public void add(Object value) { + parentBuilder.addRepeatedField(fieldDescriptor, value); + } + }; + } else { + parent = new ParentValueContainer() { + @Override + public void add(Object value) { + parentBuilder.setField(fieldDescriptor, value); + } + }; + } + + if (isRepeated) { + parquetType = parquetType.asGroupType().getType(0); + } + + Converter innerConverter = newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType); + + if (isRepeated) { + return new ProtoArrayConverter(innerConverter); + } else { + return innerConverter; + } + } + + + private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) { + + Descriptors.FieldDescriptor.JavaType javaType = fieldDescriptor.getJavaType(); + + boolean isMessage = javaType.equals(Descriptors.FieldDescriptor.JavaType.MESSAGE); + + if (isMessage) { + + if (!fieldDescriptor.getContainingType().equals(parentBuilder.getDescriptorForType())) { + throw new RuntimeException(fieldDescriptor.getFullName() + " is not inside " + parentBuilder.getDescriptorForType().getFullName()); + } + + GroupType parquetSubType = parquetType.asGroupType(); + Message.Builder subBuilder; + if 
(fieldDescriptor.isRepeated()) { + subBuilder = parentBuilder.newBuilderForField(fieldDescriptor); + } else { + subBuilder = parentBuilder.newBuilderForField(fieldDescriptor); + } + + return new ProtoMessageConverter(pvc, subBuilder, parquetSubType); + } else { + if (javaType.equals(Descriptors.FieldDescriptor.JavaType.STRING)) { + return new ProtobufStringConverter(pvc); + } else if (javaType.equals(Descriptors.FieldDescriptor.JavaType.FLOAT)) { + return new ProtoFloatConverter(pvc); + } else if (javaType.equals(Descriptors.FieldDescriptor.JavaType.DOUBLE)) { + return new ProtoDoubleConverter(pvc); + } else if (javaType.equals(Descriptors.FieldDescriptor.JavaType.BOOLEAN)) { + return new ProtoBooleanConverter(pvc); + } else if (javaType.equals(Descriptors.FieldDescriptor.JavaType.BYTE_STRING)) { + return new ProtoBinaryConverter(pvc); + } else if (javaType.equals(Descriptors.FieldDescriptor.JavaType.ENUM)) { + return new ProtoEnumConverter(pvc, fieldDescriptor); + } else if (javaType.equals(Descriptors.FieldDescriptor.JavaType.INT)) { + return new ProtoIntConverter(pvc); + } else if (javaType.equals(Descriptors.FieldDescriptor.JavaType.LONG)) { + return new ProtoLongConverter(pvc); + } + } + + throw new UnsupportedOperationException(String.format("Cannot convert type: %s" + + " (Parquet type: %s) ", javaType, parquetType)); + } + + public Message.Builder getBuilder() { + return myBuilder; + } +} diff --git a/parquet-protobuf/src/main/java/parquet/proto/converters/ProtobufStringConverter.java b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtobufStringConverter.java new file mode 100644 index 0000000000..53805ecaa8 --- /dev/null +++ b/parquet-protobuf/src/main/java/parquet/proto/converters/ProtobufStringConverter.java @@ -0,0 +1,37 @@ +/** + * Copyright 2013 Lukas Nalezenec. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package parquet.proto.converters; + +import parquet.io.api.Binary; +import parquet.io.api.PrimitiveConverter; + +public final class ProtobufStringConverter extends PrimitiveConverter { + + final ParentValueContainer parent; + + public ProtobufStringConverter(ParentValueContainer parent) { + this.parent = parent; + } + + + @Override + public void addBinary(Binary binary) { + String str = binary.toStringUsingUTF8(); + parent.add(str); + } + +} diff --git a/parquet-protobuf/src/test/java/parquet/proto/ProtoInputOutputFormatTest.java b/parquet-protobuf/src/test/java/parquet/proto/ProtoInputOutputFormatTest.java new file mode 100644 index 0000000000..3b276fd857 --- /dev/null +++ b/parquet-protobuf/src/test/java/parquet/proto/ProtoInputOutputFormatTest.java @@ -0,0 +1,102 @@ +/** + * Copyright 2013 Lukas Nalezenec + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package parquet.proto; + +import com.google.protobuf.Message; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.junit.Test; +import parquet.Log; +import parquet.proto.utils.ReadUsingMR; +import parquet.proto.utils.WriteUsingMR; +import parquet.protobuf.test.TestProtobuf; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class ProtoInputOutputFormatTest { + + private static final Log LOG = Log.getLog(ProtoInputOutputFormatTest.class); + + /** + * Writes protobuffer using first MR job, reads written file using + * second job and compares input and output. + */ + @Test + public void testInputOutput() throws Exception { + TestProtobuf.IOFormatMessage input; + { + TestProtobuf.IOFormatMessage.Builder msg = TestProtobuf.IOFormatMessage.newBuilder(); + msg.setOptionalDouble(666); + msg.addRepeatedString("Msg1"); + msg.addRepeatedString("Msg2"); + msg.getMsgBuilder().setSomeId(323); + input = msg.build(); + } + + List result = runMRJobs(TestProtobuf.IOFormatMessage.class, input); + + assertEquals(1, result.size()); + TestProtobuf.IOFormatMessage output = (TestProtobuf.IOFormatMessage) result.get(0); + + assertEquals(666, output.getOptionalDouble(), 0.00001); + assertEquals(323, output.getMsg().getSomeId()); + assertEquals("Msg1", output.getRepeatedString(0)); + assertEquals("Msg2", output.getRepeatedString(1)); + + assertEquals(input, output); + + } + + + @Test + public void testProjection() throws Exception { + + TestProtobuf.Document.Builder writtenDocument = TestProtobuf.Document.newBuilder(); + writtenDocument.setDocId(12345); + writtenDocument.addNameBuilder().setUrl("http://goout.cz/"); + + Path outputPath = new WriteUsingMR().write(TestProtobuf.Document.class, writtenDocument.build()); + + //lets prepare reading with schema + ReadUsingMR reader = new ReadUsingMR(); + + Configuration conf = new Configuration(); + 
reader.setConfiguration(conf); + String projection = "message Document {required int64 DocId; }"; + ProtoParquetInputFormat.setRequestedProjection(conf, projection); + + List output = reader.read(outputPath); + TestProtobuf.Document readDocument = (TestProtobuf.Document) output.get(0); + + + //test that only requested fields were deserialized + assertTrue(readDocument.hasDocId()); + assertTrue("Found data outside projection.", readDocument.getNameCount() == 0); + } + + /** + * Runs job that writes input to file and then job reading data back. + */ + public static List runMRJobs(Class pbClass, Message... messages) throws Exception { + Path outputPath = new WriteUsingMR().write(pbClass, messages); + List result = new ReadUsingMR().read(outputPath); + return result; + } +} diff --git a/parquet-protobuf/src/test/java/parquet/proto/ProtoSchemaConverterTest.java b/parquet-protobuf/src/test/java/parquet/proto/ProtoSchemaConverterTest.java new file mode 100644 index 0000000000..ebf11fee46 --- /dev/null +++ b/parquet-protobuf/src/test/java/parquet/proto/ProtoSchemaConverterTest.java @@ -0,0 +1,99 @@ +/** + * Copyright 2013 Lukas Nalezenec + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package parquet.proto; + +import com.google.protobuf.Message; +import org.junit.Test; +import parquet.protobuf.test.TestProtobuf; +import parquet.schema.MessageType; +import parquet.schema.MessageTypeParser; + +import static org.junit.Assert.assertEquals; + +public class ProtoSchemaConverterTest { + + /** + * Converts given pbClass to parquet schema and compares it with expected parquet schema. + */ + private void testConversion(Class pbClass, String parquetSchemaString) throws + Exception { + ProtoSchemaConverter protoSchemaConverter = new ProtoSchemaConverter(); + MessageType schema = protoSchemaConverter.convert(pbClass); + MessageType expectedMT = MessageTypeParser.parseMessageType(parquetSchemaString); + assertEquals(expectedMT.toString(), schema.toString()); + } + + + /** + * Tests that all protobuffer datatypes are converted to correct parquet datatypes. + */ + @Test + public void testConvertAllDatatypes() throws Exception { + String expectedSchema = + "message TestProtobuf.SchemaConverterAllDatatypes {\n" + + " optional double optionalDouble;\n" + + " optional float optionalFloat;\n" + + " optional int32 optionalInt32;\n" + + " optional int64 optionalInt64;\n" + + " optional int32 optionalUInt32;\n" + + " optional int64 optionalUInt64;\n" + + " optional int32 optionalSInt32;\n" + + " optional int64 optionalSInt64;\n" + + " optional int32 optionalFixed32;\n" + + " optional int64 optionalFixed64;\n" + + " optional int32 optionalSFixed32;\n" + + " optional int64 optionalSFixed64;\n" + + " optional boolean optionalBool;\n" + + " optional binary optionalString (UTF8);\n" + + " optional binary optionalBytes;\n" + + " optional group optionalMessage {\n" + + " optional int32 someId;\n" + + " }\n" + + " optional group pbgroup {\n" + + " optional int32 groupInt;\n" + + " }\n" + + " optional binary optionalEnum (ENUM);" + + "}"; + + testConversion(TestProtobuf.SchemaConverterAllDatatypes.class, expectedSchema); + } + + @Test + public void 
testConvertRepetition() throws Exception { + String expectedSchema = + "message TestProtobuf.SchemaConverterRepetition {\n" + + " optional int32 optionalPrimitive;\n" + + " required int32 requiredPrimitive;\n" + + " optional group repeatedPrimitive (LIST) {\n" + + " repeated int32 repeatedPrimitive_tuple;\n" + + " }\n" + + " optional group optionalMessage {\n" + + " optional int32 someId;\n" + + " }\n" + + " required group requiredMessage {" + + " optional int32 someId;\n" + + " }\n" + + " optional group repeatedMessage (LIST) {" + + " repeated group repeatedMessage_tuple {\n" + + " optional int32 someId;\n" + + " }\n" + + " }\n" + + "}"; + + testConversion(TestProtobuf.SchemaConverterRepetition.class, expectedSchema); + } +} diff --git a/parquet-protobuf/src/test/java/parquet/proto/ProtobufferRecordConverterTest.java b/parquet-protobuf/src/test/java/parquet/proto/ProtobufferRecordConverterTest.java new file mode 100644 index 0000000000..f552393327 --- /dev/null +++ b/parquet-protobuf/src/test/java/parquet/proto/ProtobufferRecordConverterTest.java @@ -0,0 +1,134 @@ +/** + * Copyright 2013 Lukas Nalezenec + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package parquet.proto; + +import com.google.protobuf.ByteString; +import org.junit.Test; +import parquet.protobuf.test.TestProtobuf; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static parquet.proto.TestUtils.testData; +import static parquet.protobuf.test.TestProtobuf.SchemaConverterAllDatatypes; + +public class ProtobufferRecordConverterTest { + + @Test + public void testAllTypes() throws Exception { + SchemaConverterAllDatatypes.Builder data; + data = SchemaConverterAllDatatypes.newBuilder(); + + data.setOptionalBool(true); + data.setOptionalBytes(ByteString.copyFrom("someText", "UTF-8")); + data.setOptionalDouble(0.577); + data.setOptionalFloat(3.1415f); + data.setOptionalEnum(SchemaConverterAllDatatypes.TestEnum.FIRST); + data.setOptionalFixed32(1000 * 1000 * 1); + data.setOptionalFixed64(1000 * 1000 * 1000 * 2); + data.setOptionalInt32(1000 * 1000 * 3); + data.setOptionalInt64(1000L * 1000 * 1000 * 4); + data.setOptionalSFixed32(1000 * 1000 * 5); + data.setOptionalSFixed64(1000L * 1000 * 1000 * 6); + data.setOptionalSInt32(1000 * 1000 * 56); + data.setOptionalSInt64(1000L * 1000 * 1000 * 7); + data.setOptionalString("Good Will Hunting"); + data.setOptionalUInt32(1000 * 1000 * 8); + data.setOptionalUInt64(1000L * 1000 * 1000 * 9); + data.getOptionalMessageBuilder().setSomeId(1984); + data.getPbGroupBuilder().setGroupInt(1492); + + SchemaConverterAllDatatypes dataBuilt = data.build(); + data.clear(); + + List result; + result = testData(dataBuilt); + + //data are fully checked in testData function. Lets do one more check. 
+ SchemaConverterAllDatatypes o = result.get(0); + assertEquals("Good Will Hunting", o.getOptionalString()); + + assertEquals(true, o.getOptionalBool()); + assertEquals(ByteString.copyFrom("someText", "UTF-8"), o.getOptionalBytes()); + assertEquals(0.577, o.getOptionalDouble(), 0.00001); + assertEquals(3.1415f, o.getOptionalFloat(), 0.00001); + assertEquals(SchemaConverterAllDatatypes.TestEnum.FIRST, o.getOptionalEnum()); + assertEquals(1000 * 1000 * 1, o.getOptionalFixed32()); + assertEquals(1000 * 1000 * 1000 * 2, o.getOptionalFixed64()); + assertEquals(1000 * 1000 * 3, o.getOptionalInt32()); + assertEquals(1000L * 1000 * 1000 * 4, o.getOptionalInt64()); + assertEquals(1000 * 1000 * 5, o.getOptionalSFixed32()); + assertEquals(1000L * 1000 * 1000 * 6, o.getOptionalSFixed64()); + assertEquals(1000 * 1000 * 56, o.getOptionalSInt32()); + assertEquals(1000L * 1000 * 1000 * 7, o.getOptionalSInt64()); + assertEquals(1000 * 1000 * 8, o.getOptionalUInt32()); + assertEquals(1000L * 1000 * 1000 * 9, o.getOptionalUInt64()); + assertEquals(1984, o.getOptionalMessage().getSomeId()); + assertEquals(1492, o.getPbGroup().getGroupInt()); + } + + @Test + public void testAllTypesMultiple() throws Exception { + int count = 100; + SchemaConverterAllDatatypes[] input = new SchemaConverterAllDatatypes[count]; + + for (int i = 0; i < count; i++) { + SchemaConverterAllDatatypes.Builder d = SchemaConverterAllDatatypes.newBuilder(); + + if (i % 2 != 0) d.setOptionalBool(true); + if (i % 3 != 0) d.setOptionalBytes(ByteString.copyFrom("someText " + i, "UTF-8")); + if (i % 4 != 0) d.setOptionalDouble(0.577 * i); + if (i % 5 != 0) d.setOptionalFloat(3.1415f * i); + if (i % 6 != 0) d.setOptionalEnum(SchemaConverterAllDatatypes.TestEnum.FIRST); + if (i % 7 != 0) d.setOptionalFixed32(1000 * i * 1); + if (i % 8 != 0) d.setOptionalFixed64(1000 * i * 1000 * 2); + if (i % 9 != 0) d.setOptionalInt32(1000 * i * 3); + if (i % 2 != 1) d.setOptionalSFixed32(1000 * i * 5); + if (i % 3 != 1) 
d.setOptionalSFixed64(1000 * i * 1000 * 6); + if (i % 4 != 1) d.setOptionalSInt32(1000 * i * 56); + if (i % 5 != 1) d.setOptionalSInt64(1000 * i * 1000 * 7); + if (i % 6 != 1) d.setOptionalString("Good Will Hunting " + i); + if (i % 7 != 1) d.setOptionalUInt32(1000 * i * 8); + if (i % 8 != 1) d.setOptionalUInt64(1000 * i * 1000 * 9); + if (i % 9 != 1) d.getOptionalMessageBuilder().setSomeId(1984 * i); + if (i % 2 != 1) d.getPbGroupBuilder().setGroupInt(1492 * i); + if (i % 3 != 1) d.setOptionalInt64(1000 * i * 1000 * 4); + input[i] = d.build(); + } + + List result; + result = testData(input); + + //data are fully checked in testData function. Lets do one more check. + assertEquals("Good Will Hunting 0", result.get(0).getOptionalString()); + assertEquals("Good Will Hunting 90", result.get(90).getOptionalString()); + } + + + @Test + public void testDefaults() throws Exception { + SchemaConverterAllDatatypes.Builder data; + data = SchemaConverterAllDatatypes.newBuilder(); + + List result = testData(data.build()); + SchemaConverterAllDatatypes message = result.get(0); + assertEquals("", message.getOptionalString()); + assertEquals(false, message.getOptionalBool()); + assertEquals(0, message.getOptionalFixed32()); + } +} diff --git a/parquet-protobuf/src/test/java/parquet/proto/TestUtils.java b/parquet-protobuf/src/test/java/parquet/proto/TestUtils.java new file mode 100644 index 0000000000..3e97a76356 --- /dev/null +++ b/parquet-protobuf/src/test/java/parquet/proto/TestUtils.java @@ -0,0 +1,174 @@ +package parquet.proto; + +import com.google.protobuf.Message; +import com.google.protobuf.MessageOrBuilder; +import org.apache.hadoop.fs.Path; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public class TestUtils { + + public static Path someTemporaryFilePath() throws IOException { + 
File tmp = File.createTempFile("ParquetProtobuf_unitTest", ".tmp"); + tmp.deleteOnExit(); + tmp.delete(); + return new Path(tmp.getPath()); + } + + public static T writeAndReadSingle(T records) throws IOException { + return writeAndRead(records).get(0); + } + + public static List writeAndRead(T... records) throws IOException { + Class cls = inferRecordsClass(records); + + Path file = writeMessages(cls, records); + + return readMessages(file); + } + + private static Class inferRecordsClass(MessageOrBuilder[] records) { + Class cls = null; + + for (MessageOrBuilder record : records) { + Class recordClass; + if (record instanceof Message.Builder) { + recordClass = ((Message.Builder) record).build().getClass(); + } else if (record instanceof Message) { + recordClass = ((Message) record).getClass(); + } else { + throw new RuntimeException("Illegal class " + record); + } + + if (cls == null) { + cls = recordClass; + } else if (!cls.equals(recordClass)) { + throw new RuntimeException("Wrong class " + cls + ", expected protobuffer."); + } + } + return cls; + } + + /** + * Writes messages to file, reads messages from file and checks if everything is OK. + */ + public static List testData(T... 
messages) throws IOException { + + checkSameBuilderInstance(messages); + + List input = cloneList(messages); + + List output = (List) writeAndRead(messages); + + List outputAsMessages = asMessages(output); + assertEquals("The protobuffers are not same:\n", asMessages(input), outputAsMessages); + return (List) outputAsMessages; + } + + private static List cloneList(MessageOrBuilder[] messages) { + List result = new ArrayList(); + + for (MessageOrBuilder mob : messages) { + result.add(asMessage(mob)); + } + + return result; + } + + public static List asMessages(List mobs) { + List result = new ArrayList(); + for (MessageOrBuilder messageOrBuilder : mobs) { + result.add(asMessage(messageOrBuilder)); + } + + return result; + } + + /** + * Given message or builder returns same data as message + */ + public static Message asMessage(MessageOrBuilder mob) { + Message message; + if (mob instanceof Message.Builder) { + message = ((Message.Builder) mob).build(); + } else { + message = (Message) mob; + } + return message; + } + + /** + * Fails if some instance of builder is two times in list. + */ + private static void checkSameBuilderInstance(MessageOrBuilder[] messages) { + for (int i = 0; i < messages.length; i++) { + MessageOrBuilder firstMessage = messages[i]; + boolean isBuilder = firstMessage instanceof Message.Builder; + + if (isBuilder) { + for (int j = 0; j < messages.length; j++) { + MessageOrBuilder secondMessage = messages[j]; + + if (i != j) { + boolean isSame = secondMessage == firstMessage; + if (isSame) { + fail("Data contains two references to same instance." + secondMessage); + } + } + } + } + } + } + + /** + * Reads messages from given file. 
The file could/should be created by method writeMessages + */ + public static List readMessages(Path file) throws IOException { + ProtoParquetReader reader = new ProtoParquetReader(file); + + List result = new ArrayList(); + boolean hasNext = true; + while (hasNext) { + T item = reader.read(); + if (item == null) { + hasNext = false; + } else { + assertNotNull(item); + // It makes sense to return message but production code wont work with messages + result.add((T) asMessage(item).toBuilder()); + } + } + reader.close(); + return result; + } + + /** + * Writes messages to temporary file and returns its path. + */ + public static Path writeMessages(MessageOrBuilder... records) throws IOException { + return writeMessages(inferRecordsClass(records), records); + } + + public static Path writeMessages(Class cls, MessageOrBuilder... records) throws IOException { + Path file = someTemporaryFilePath(); + + ProtoParquetWriter writer = + new ProtoParquetWriter(file, cls); + + for (MessageOrBuilder record : records) { + writer.write(record); + } + + writer.close(); + + return file; + } + +} diff --git a/parquet-protobuf/src/test/java/parquet/proto/utils/ReadUsingMR.java b/parquet-protobuf/src/test/java/parquet/proto/utils/ReadUsingMR.java new file mode 100644 index 0000000000..87394f4ce5 --- /dev/null +++ b/parquet-protobuf/src/test/java/parquet/proto/utils/ReadUsingMR.java @@ -0,0 +1,78 @@ +/** + * Copyright 2013 Lukas Nalezenec + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package parquet.proto.utils; + +import com.google.protobuf.Message; +import com.google.protobuf.MessageOrBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import parquet.proto.ProtoParquetInputFormat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + + +/** + * Reads data from given parquet file using MapReduce job. + */ +public class ReadUsingMR { + + private static List outputMessages; + + Configuration conf; + + public void setConfiguration(Configuration conf) { + this.conf = conf; + } + + public static class ReadingMapper extends Mapper { + protected void map(Void key, MessageOrBuilder value, Context context) throws IOException, InterruptedException { + Message clone = ((Message.Builder) value).build(); + outputMessages.add(clone); + } + } + + public List read(Path parquetPath) throws Exception { + + synchronized (ReadUsingMR.class) { + outputMessages = new ArrayList(); + + if (conf == null) conf = new Configuration(); + + final Job job = new Job(conf, "read"); + job.setInputFormatClass(ProtoParquetInputFormat.class); + ProtoParquetInputFormat.setInputPaths(job, parquetPath); + + job.setMapperClass(ReadingMapper.class); + job.setNumReduceTasks(0); + + job.setOutputFormatClass(NullOutputFormat.class); + + WriteUsingMR.waitForJob(job); + + List result = Collections.unmodifiableList(outputMessages); + outputMessages = null; + return result; + } + } + +} diff --git a/parquet-protobuf/src/test/java/parquet/proto/utils/WriteUsingMR.java b/parquet-protobuf/src/test/java/parquet/proto/utils/WriteUsingMR.java new file mode 100644 index 0000000000..22119ac677 --- 
/dev/null +++ b/parquet-protobuf/src/test/java/parquet/proto/utils/WriteUsingMR.java @@ -0,0 +1,113 @@ +/** + * Copyright 2013 Lukas Nalezenec + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package parquet.proto.utils; + +import com.google.protobuf.Message; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import parquet.Log; +import parquet.proto.ProtoParquetOutputFormat; +import parquet.proto.TestUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static java.lang.Thread.sleep; + +/** + * Writes data to parquet file using MapReduce job. 
+ */ +public class WriteUsingMR { + + private static final Log LOG = Log.getLog(WriteUsingMR.class); + Configuration conf; + private static List inputMessages; + Path outputPath; + + public void setConfiguration(Configuration conf) { + this.conf = conf; + } + + public static class WritingMapper extends Mapper { + + public void run(Context context) throws IOException, InterruptedException { + if (inputMessages == null || inputMessages.size() == 0) { + throw new RuntimeException("No mock data given"); + } else { + for (Message msg : inputMessages) { + context.write(null, msg); + LOG.debug("Reading msg from mock writing mapper" + msg); + } + } + } + } + + public Path write(Class pbClass, Message... messages) throws Exception { + + synchronized (WriteUsingMR.class) { + + outputPath = TestUtils.someTemporaryFilePath(); + if (conf == null) conf = new Configuration(); + final Path inputPath = new Path("src/test/java/parquet/proto/ProtoInputOutputFormatTest.java"); + + inputMessages = new ArrayList(); + + for (Message m : messages) { + inputMessages.add(m); + } + + inputMessages = Collections.unmodifiableList(inputMessages); + + + final Job job = new Job(conf, "write"); + + // input not really used + TextInputFormat.addInputPath(job, inputPath); + job.setInputFormatClass(TextInputFormat.class); + + job.setMapperClass(WritingMapper.class); + job.setNumReduceTasks(0); + + job.setOutputFormatClass(ProtoParquetOutputFormat.class); + ProtoParquetOutputFormat.setOutputPath(job, outputPath); + ProtoParquetOutputFormat.setProtobufferClass(job, pbClass); + + waitForJob(job); + + inputMessages = null; + return outputPath; + } + } + + static void waitForJob(Job job) throws Exception { + job.submit(); + while (!job.isComplete()) { + LOG.debug("waiting for job " + job.getJobName()); + sleep(50); + } + LOG.info("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? 
"SUCCESS" : "FAILURE")); + if (!job.isSuccessful()) { + throw new RuntimeException("job failed " + job.getJobName()); + } + } +} diff --git a/parquet-protobuf/src/test/resources/TestProtobuf.proto b/parquet-protobuf/src/test/resources/TestProtobuf.proto new file mode 100644 index 0000000000..2362d53ed1 --- /dev/null +++ b/parquet-protobuf/src/test/resources/TestProtobuf.proto @@ -0,0 +1,94 @@ +package TestProtobuf; + +option java_package = "parquet.protobuf.test"; + +// original dremel paper structures: Original paper used groups, not internal +// messages but groups were depricated. + +message Document { + required int64 DocId = 1; + optional Links links = 32; + repeated group Name = 24 { + repeated Language name = 4; + optional string url = 5; + } +} + +message Language { + required string code = 12; + optional string Country = 14; +} + +message Links { + repeated int64 Backward = 1; + repeated int64 Forward = 2; +} + + +// begin - protobuffers for ProtoSchemaConverterTest + + message SchemaConverterSimpleMessage { + optional int32 someId = 3; + } + + message SchemaConverterAllDatatypes { + optional double optionalDouble = 1; + optional float optionalFloat = 2; + optional int32 optionalInt32 = 3; + optional int64 optionalInt64 = 4; + optional uint32 optionalUInt32 = 5; + optional uint64 optionalUInt64 = 6; + optional sint32 optionalSInt32 = 7; + optional sint64 optionalSInt64 = 8; + optional fixed32 optionalFixed32 = 9; + optional fixed64 optionalFixed64 = 10; + optional sfixed32 optionalSFixed32 = 11; + optional sfixed64 optionalSFixed64 = 12; + optional bool optionalBool = 13; + optional string optionalString = 14; + optional bytes optionalBytes = 15; + optional SchemaConverterSimpleMessage optionalMessage = 16; + optional group PbGroup = 17 { + optional int32 groupInt = 2; + } + enum TestEnum { + FIRST = 0; + SECOND = 1; + } + optional TestEnum optionalEnum = 18; + } + +message RequiredWithDefaultMessage{ + // tohe je asi spatne, required nemuzou mit defaulty 
+  required int32 optionalWD = 18 [default = 10];
+ }
+
+ // Declares one int32 field and one SchemaConverterSimpleMessage field for
+ // each of the three labels: optional, required and repeated.
+ message SchemaConverterRepetition {
+     optional int32 optionalPrimitive = 1;
+     required int32 requiredPrimitive = 2;
+     repeated int32 repeatedPrimitive = 3;
+     optional SchemaConverterSimpleMessage optionalMessage = 7;
+     required SchemaConverterSimpleMessage requiredMessage = 8;
+     repeated SchemaConverterSimpleMessage repeatedMessage = 9;
+ }
+
+// end - protobuffers for ProtoSchemaConverterTest
+
+
+//begin protobuffers for ProtoInputOutputFormatTest
+
+// Message with a single optional int32 field; used as the nested `msg` field
+// of IOFormatMessage.
+message InputOutputMsgFormat {
+    optional int32 someId = 3;
+}
+
+// Combines an optional double, a repeated string and an optional nested
+// InputOutputMsgFormat message.
+message IOFormatMessage {
+    optional double optionalDouble = 1;
+    repeated string repeatedString = 2;
+    optional InputOutputMsgFormat msg = 3;
+ }
+
+//end protobuffers for ProtoInputOutputFormatTest
+
+
+//please place your unit test protobuffer definitions here.
+