Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(java): use varint for jdk compatible serializers #1960

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,24 @@
import java.io.InputStream;
import java.io.ObjectInput;
import org.apache.fury.Fury;
import org.apache.fury.config.LongEncoding;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.serializer.PrimitiveSerializers.LongSerializer;
import org.apache.fury.serializer.StringSerializer;
import org.apache.fury.util.Preconditions;

/** ObjectInput based on {@link Fury} and {@link MemoryBuffer}. */
public class MemoryBufferObjectInput extends InputStream implements ObjectInput {
private final Fury fury;
private final boolean compressInt;
private final LongEncoding longEncoding;
private MemoryBuffer buffer;
private final StringSerializer stringSerializer;

public MemoryBufferObjectInput(Fury fury, MemoryBuffer buffer) {
this.fury = fury;
this.compressInt = fury.compressInt();
this.longEncoding = fury.longEncoding();
this.buffer = buffer;
this.stringSerializer = new StringSerializer(fury);
}
Expand Down Expand Up @@ -134,12 +140,12 @@ public char readChar() throws IOException {

@Override
public int readInt() throws IOException {
return buffer.readInt32();
return compressInt ? buffer.readVarInt32() : buffer.readInt32();
}

@Override
public long readLong() throws IOException {
return buffer.readInt64();
return LongSerializer.readInt64(buffer, longEncoding);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,28 @@

package org.apache.fury.io;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutput;
import java.io.OutputStream;
import org.apache.fury.Fury;
import org.apache.fury.config.LongEncoding;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.serializer.PrimitiveSerializers.LongSerializer;
import org.apache.fury.serializer.StringSerializer;
import org.apache.fury.util.Preconditions;

/** ObjectOutput based on {@link Fury} and {@link MemoryBuffer}. */
public class MemoryBufferObjectOutput extends OutputStream implements ObjectOutput {
private final Fury fury;
private final DataOutputStream utf8out = new DataOutputStream(this);
private final boolean compressInt;
private final LongEncoding longEncoding;
private final StringSerializer stringSerializer;
private MemoryBuffer buffer;

public MemoryBufferObjectOutput(Fury fury, MemoryBuffer buffer) {
this.fury = fury;
this.compressInt = fury.compressInt();
this.longEncoding = fury.longEncoding();
this.buffer = buffer;
this.stringSerializer = new StringSerializer(fury);
}
Expand Down Expand Up @@ -91,12 +95,16 @@ public void writeChar(int v) throws IOException {

@Override
public void writeInt(int v) throws IOException {
buffer.writeInt32(v);
if (compressInt) {
buffer.writeVarInt32(v);
} else {
buffer.writeInt32(v);
}
}

@Override
public void writeLong(long v) throws IOException {
buffer.writeInt64(v);
LongSerializer.writeInt64(buffer, v, longEncoding);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ public String toString() {
*/
private static class FuryObjectOutputStream extends ObjectOutputStream {
private final Fury fury;
private final boolean compressInt;
private final SlotsInfo slotsInfo;
private MemoryBuffer buffer;
private Object targetObject;
Expand All @@ -422,6 +423,7 @@ protected FuryObjectOutputStream(SlotsInfo slotsInfo) throws IOException {
super();
this.slotsInfo = slotsInfo;
this.fury = slotsInfo.slotsSerializer.fury;
this.compressInt = fury.compressInt();
}

@Override
Expand Down Expand Up @@ -628,12 +630,16 @@ public void writeChar(int v) throws IOException {

@Override
public void writeInt(int v) throws IOException {
buffer.writeInt32(v);
if (compressInt) {
buffer.writeVarInt32(v);
} else {
buffer.writeInt32(v);
}
}

@Override
public void writeLong(long v) throws IOException {
buffer.writeInt64(v);
fury.writeInt64(buffer, v);
}

@Override
Expand Down Expand Up @@ -692,6 +698,7 @@ public void close() throws IOException {}
*/
private static class FuryObjectInputStream extends ObjectInputStream {
private final Fury fury;
private final boolean compressInt;
private final SlotsInfo slotsInfo;
private MemoryBuffer buffer;
private Object targetObject;
Expand All @@ -701,6 +708,7 @@ private static class FuryObjectInputStream extends ObjectInputStream {

protected FuryObjectInputStream(SlotsInfo slotsInfo) throws IOException {
this.fury = slotsInfo.slotsSerializer.fury;
this.compressInt = fury.compressInt();
this.slotsInfo = slotsInfo;
}

Expand Down Expand Up @@ -933,12 +941,12 @@ public char readChar() throws IOException {

@Override
public int readInt() throws IOException {
return buffer.readInt32();
return compressInt ? buffer.readVarInt32() : buffer.readInt32();
}

@Override
public long readLong() throws IOException {
return buffer.readInt64();
return fury.readInt64(buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,24 @@

import java.io.IOException;
import org.apache.fury.Fury;
import org.apache.fury.FuryTestBase;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.memory.MemoryUtils;
import org.testng.annotations.Test;

public class MemoryBufferObjectInputTest {
public class MemoryBufferObjectInputTest extends FuryTestBase {

@Test
public void testFuryObjectInput() throws IOException {
Fury fury = Fury.builder().build();
@Test(dataProvider = "compressNumber")
public void testFuryObjectInput(boolean compressNumber) throws IOException {
Fury fury = Fury.builder().withNumberCompressed(compressNumber).build();
MemoryBuffer buffer = MemoryUtils.buffer(32);
buffer.writeByte(1);
buffer.writeInt32(2);
buffer.writeInt64(3);
if (compressNumber) {
buffer.writeVarInt32(2);
} else {
buffer.writeInt32(2);
}
fury.writeInt64(buffer, 3);
buffer.writeBoolean(true);
buffer.writeFloat32(4.1f);
buffer.writeFloat64(4.2);
Expand Down
Loading