Skip to content

[FLINK-37722] Eliminate redundant field initialization of PojoSerializer #26513

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,7 @@ public T copy(T from) {
"Error during POJO copy, this should not happen since we check the fields before.");
}
} else if (actualType == clazz) {
T target;
try {
target = (T) from.getClass().newInstance();
} catch (Throwable t) {
throw new RuntimeException("Cannot instantiate class.", t);
}
T target = instantiateRaw();
// no subclass
try {
for (int i = 0; i < numFields; i++) {
Expand Down Expand Up @@ -424,27 +419,17 @@ public T deserialize(DataInputView source) throws IOException {
return null;
}

T target;

Class<?> actualSubclass = null;
TypeSerializer subclassSerializer = null;

if ((flags & IS_SUBCLASS) != 0) {
String subclassName = source.readUTF();
actualSubclass = getSubclassByName(subclassName);
subclassSerializer = getSubclassSerializer(actualSubclass);
target = (T) subclassSerializer.createInstance();
// also initialize fields for which the subclass serializer is not responsible
initializeFields(target);
} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
Class<?> actualSubclass = getSubclassByName(subclassName);
TypeSerializer subclassSerializer = getSubclassSerializer(actualSubclass);
return (T) subclassSerializer.deserialize(source);
}

if ((flags & IS_TAGGED_SUBCLASS) != 0) {
int subclassTag = source.readByte();
subclassSerializer = registeredSerializers[subclassTag];
target = (T) subclassSerializer.createInstance();
// also initialize fields for which the subclass serializer is not responsible
initializeFields(target);
} else {
target = createInstance();
TypeSerializer subclassSerializer = registeredSerializers[subclassTag];
return (T) subclassSerializer.deserialize(source);
}

if (isRecord()) {
Expand All @@ -456,8 +441,11 @@ public T deserialize(DataInputView source) throws IOException {
builder.setField(i, fieldValue);
}
}
target = builder.build();
} else if ((flags & NO_SUBCLASS) != 0) {
return builder.build();
}

if ((flags & NO_SUBCLASS) != 0) {
T target = instantiateRaw();
try {
for (int i = 0; i < numFields; i++) {
boolean isNull = source.readBoolean();
Expand All @@ -471,12 +459,10 @@ public T deserialize(DataInputView source) throws IOException {
"Error during POJO copy, this should not happen since we check the fields before.",
e);
}
} else {
if (subclassSerializer != null) {
target = (T) subclassSerializer.deserialize(target, source);
}
return target;
}
return target;

throw new RuntimeException("Unknown POJO flags, this should not happen.");
}

@Override
Expand All @@ -489,36 +475,36 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
return null;
}

Class<?> subclass = null;
TypeSerializer subclassSerializer = null;
if ((flags & IS_SUBCLASS) != 0) {
String subclassName = source.readUTF();
subclass = getSubclassByName(subclassName);
subclassSerializer = getSubclassSerializer(subclass);
Class<?> subclass = getSubclassByName(subclassName);
TypeSerializer subclassSerializer = getSubclassSerializer(subclass);

if (reuse == null || subclass != reuse.getClass()) {
// cannot reuse
reuse = (T) subclassSerializer.createInstance();
// also initialize fields for which the subclass serializer is not responsible
initializeFields(reuse);
return (T) subclassSerializer.deserialize(source);
} else {
return (T) subclassSerializer.deserialize(reuse, source);
}
} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
}

if ((flags & IS_TAGGED_SUBCLASS) != 0) {
int subclassTag = source.readByte();
subclassSerializer = registeredSerializers[subclassTag];
TypeSerializer subclassSerializer = registeredSerializers[subclassTag];

if (reuse == null || ((PojoSerializer) subclassSerializer).clazz != reuse.getClass()) {
// cannot reuse
reuse = (T) subclassSerializer.createInstance();
// also initialize fields for which the subclass serializer is not responsible
initializeFields(reuse);
}
} else {
if (reuse == null || clazz != reuse.getClass()) {
reuse = createInstance();
return (T) subclassSerializer.deserialize(source);
} else {
return (T) subclassSerializer.deserialize(reuse, source);
}
}

if (isRecord()) {
if (reuse != null && clazz != reuse.getClass()) {
// cannot reuse, and cannot directly instantiate a record either
reuse = null;
}
try {
JavaRecordBuilderFactory<T>.JavaRecordBuilder builder = recordFactory.newBuilder();
for (int i = 0; i < numFields; i++) {
Expand All @@ -537,13 +523,19 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
}
}

reuse = builder.build();
return builder.build();
} catch (IllegalAccessException e) {
throw new RuntimeException(
"Error during POJO copy, this should not happen since we check the fields before.",
e);
}
} else if ((flags & NO_SUBCLASS) != 0) {
}

if ((flags & NO_SUBCLASS) != 0) {
if (reuse == null || clazz != reuse.getClass()) {
// cannot reuse
reuse = instantiateRaw();
}
try {
for (int i = 0; i < numFields; i++) {
boolean isNull = source.readBoolean();
Expand All @@ -564,13 +556,10 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
"Error during POJO copy, this should not happen since we check the fields before.",
e);
}
} else {
if (subclassSerializer != null) {
reuse = (T) subclassSerializer.deserialize(reuse, source);
}
return reuse;
}

return reuse;
throw new RuntimeException("Unknown POJO flags, this should not happen.");
}

private Object deserializeField(Object reuseField, int i, DataInputView source)
Expand Down Expand Up @@ -888,4 +877,12 @@ private static <T> PojoSerializerSnapshot<T> buildSnapshot(
nonRegisteredSubclassSerializerCache,
serializerConfig);
}

private T instantiateRaw() {
try {
return clazz.newInstance();
} catch (Exception e) {
throw new RuntimeException("Cannot instantiate class.", e);
}
}
}