Skip to content

Commit

Permalink
first version of the Loader/Storer
Browse files Browse the repository at this point in the history
  • Loading branch information
julienledem committed Oct 4, 2012
1 parent ab7df25 commit 2302fe4
Show file tree
Hide file tree
Showing 35 changed files with 2,519 additions and 70 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
*.class
.project
.classpath
.settings
target
# Package Files #
*.jar
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@
<header>license.txt</header>
<strictCheck>true</strictCheck>
</configuration>
<executions>
<!--executions>
<execution>
<phase>test</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</executions-->
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
20 changes: 20 additions & 0 deletions redelm-column/src/main/java/redelm/column/mem/MemColumn.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,24 @@ public int memSize() {
}
}

public byte[] getData() {
initiateRead();
return data;
}

public ColumnDescriptor getDescriptor() {
return path;
}

public void set(byte[] data, int recordCount) {
this.data = data;
this.out = null;
this.in = new ByteArrayInputStream(data);
this.readRecords = 0;
this.recordCount = recordCount;
}

public int getRecordCount() {
return recordCount;
}
}
23 changes: 23 additions & 0 deletions redelm-column/src/main/java/redelm/column/mem/MemColumnsStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import redelm.column.ColumnReader;
import redelm.column.ColumnWriter;
import redelm.column.ColumnsStore;
import redelm.schema.PrimitiveType.Primitive;


public class MemColumnsStore extends ColumnsStore {
Expand Down Expand Up @@ -179,4 +180,26 @@ public int maxColMemSize() {
}
return max;
}

public Collection<MemColumn> getColumns() {
return columns.values();
}

public void setColumn(String[] path, Primitive primitive, byte[] data, int recordCount) {
ColumnDescriptor descriptor = new ColumnDescriptor(path, primitive);
MemColumn col = getColumn(descriptor);
col.set(data, recordCount);
columns.put(descriptor, col);

}

public boolean isFullyConsumed() {
for (MemColumn c : columns.values()) {
if (c.isFullyConsumed()) {
return true;
}

}
return false;
}
}
4 changes: 4 additions & 0 deletions redelm-column/src/main/java/redelm/schema/GroupType.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public String getFieldName(int index) {
return fields.get(index).getName();
}

public boolean containsField(String name) {
return indexByName.containsKey(name);
}

public int getFieldIndex(String name) {
if (!indexByName.containsKey(name)) {
throw new RuntimeException(name + " not found in " + this);
Expand Down
63 changes: 63 additions & 0 deletions redelm-column/src/main/java/redelm/schema/MessageType.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,71 @@
*/
package redelm.schema;

import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import redelm.schema.PrimitiveType.Primitive;


public class MessageType extends GroupType {


public static MessageType parse(String schemaString) {

StringTokenizer st = new StringTokenizer(schemaString, " ;{}\n\t", true);
String t = nextToken(st);
check(t, "message", "start with 'message'");
String name = nextToken(st);
Type[] fields = readGroupTypeFields(st);
return new MessageType(name, fields);
}

private static Type[] readGroupTypeFields(StringTokenizer st) {
List<Type> types = new ArrayList<Type>();
String t = nextToken(st);
check(t, "{", "start of message");
while (!(t = nextToken(st)).equals("}")) {
types.add(readType(t, st));
}
return types.toArray(new Type[types.size()]);
}

private static Type readType(String t, StringTokenizer st) {
Repetition r = Repetition.valueOf(t.toUpperCase());
t = nextToken(st);
String name = nextToken(st);
if (t.equalsIgnoreCase("group")) {
Type[] fields = readGroupTypeFields(st);
check(nextToken(st), ";", "field ended by ;");
return new GroupType(r, name, fields);
} else {
Primitive p = Primitive.valueOf(t.toUpperCase());
check(nextToken(st), ";", "field ended by ;");
return new PrimitiveType(r, p, name);
}
}

private static void check(String t, String expected, String message) {
if (!t.equalsIgnoreCase(expected)) {
throw new IllegalArgumentException("expected: "+message+ ": '" + expected + "' got '" + t + "'");
}
}

private static String nextToken(StringTokenizer st) {
while (st.hasMoreTokens()) {
String t = st.nextToken();
if (!isWhitespace(t)) {
return t;
}
}
throw new IllegalArgumentException("unexpected end of schema");
}

private static boolean isWhitespace(String t) {
return t.equals(" ") || t.equals("\t") || t.equals("\n");
}

public MessageType(String name, Type... fields) {
super(Repetition.REPEATED, name, fields);
setFieldPath(new String[0]);
Expand All @@ -34,4 +96,5 @@ public String toString() {
+ membersDisplayString(" ")
+"}\n";
}

}
26 changes: 26 additions & 0 deletions redelm-column/src/main/java/redelm/schema/PrimitiveType.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,35 @@ public void addValueToRecordConsumer(
int field, int index) {
recordConsumer.addBinary(group.getBinary(field, index));
}
},
FLOAT {
@Override
public void writeValueToColumn(GroupValueSource parent, String field,
int index, int r, int d, ColumnWriter columnWriter) {
throw new UnsupportedOperationException();
}

@Override
public String toString(ColumnReader columnReader) {
throw new UnsupportedOperationException();
}

@Override
public void addValueToRecordConsumer(RecordConsumer recordConsumer,
ColumnReader columnReader) {
throw new UnsupportedOperationException();
}

@Override
public void addValueToRecordConsumer(RecordConsumer recordConsumer,
Group group, int field, int index) {
throw new UnsupportedOperationException();
}

};



abstract public void writeValueToColumn(GroupValueSource parent, String field, int index, int r, int d, ColumnWriter columnWriter);

abstract public String toString(ColumnReader columnReader);
Expand Down
15 changes: 15 additions & 0 deletions redelm-column/src/test/java/redelm/schema/TestMessageType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package redelm.schema;

import junit.framework.Assert;
import redelm.data.simple.example.Paper;

import org.junit.Test;

public class TestMessageType {
@Test
public void test() {
System.out.println(Paper.schema.toString());
MessageType schema = MessageType.parse(Paper.schema.toString());
Assert.assertEquals(schema.toString(), Paper.schema.toString());
}
}
23 changes: 22 additions & 1 deletion redelm-pig/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,33 @@
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>0.10.0</version>
<version>0.11.0+59</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>11.0</version>
</dependency>

</dependencies>
</project>
52 changes: 52 additions & 0 deletions redelm-pig/src/main/java/redelm/pig/BlockMetaData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package redelm.pig;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BlockMetaData implements Serializable {
private static final long serialVersionUID = 1L;

private final long startIndex;
private long endIndex;
private List<ColumnMetaData> columns = new ArrayList<ColumnMetaData>();
private int recordCount;

public BlockMetaData(long startIndex) {
this.startIndex = startIndex;
}

public void setEndIndex(long endIndex) {
this.endIndex = endIndex;
}

public void addColumn(ColumnMetaData column) {
columns.add(column);
}

public long getStartIndex() {
return startIndex;
}

public long getEndIndex() {
return endIndex;
}

public List<ColumnMetaData> getColumns() {
return columns;
}

@Override
public String toString() {
return "BlockMetaData{" + startIndex + ", " + endIndex + " " + columns + "}";
}

public int getRecordCount() {
return recordCount;
}

public void setRecordCount(int recordCount) {
this.recordCount = recordCount;
}
}
28 changes: 28 additions & 0 deletions redelm-pig/src/main/java/redelm/pig/ColumnData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package redelm.pig;

import java.util.Arrays;

public class ColumnData {

private final String[] path;
private byte[] data;

public ColumnData(String[] path, byte[] data) {
super();
this.path = path;
this.data = data;
}

public String[] getPath() {
return path;
}

public byte[] getData() {
return data;
}

@Override
public String toString() {
return "ColumnData{"+Arrays.toString(path)+" " + data.length + "B}";
}
}
56 changes: 56 additions & 0 deletions redelm-pig/src/main/java/redelm/pig/ColumnMetaData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package redelm.pig;

import java.io.Serializable;
import java.util.Arrays;

import redelm.schema.PrimitiveType.Primitive;

public class ColumnMetaData implements Serializable {
private static final long serialVersionUID = 1L;

private final long startIndex;
private long endIndex;
private final String[] path;
private final Primitive type;
private int recordCount;

public ColumnMetaData(long startIndex, String[] path, Primitive type) {
this.startIndex = startIndex;
this.path = path;
this.type = type;
}

public void setEndIndex(long endIndex) {
this.endIndex = endIndex;
}

public long getStartIndex() {
return startIndex;
}

public long getEndIndex() {
return endIndex;
}

public String[] getPath() {
return path;
}

public Primitive getType() {
return type;
}

public int getRecordCount() {
return recordCount;
}

public void setRecordCount(int recordCount) {
this.recordCount = recordCount;
}

@Override
public String toString() {
return "ColumnMetaData{" + startIndex + ", " + endIndex + " " + Arrays.toString(path) + "}";
}

}
Loading

0 comments on commit 2302fe4

Please sign in to comment.