diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index 3ec270da41cdf..8571d566e4284 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -254,12 +254,7 @@ public T copy(T from) { "Error during POJO copy, this should not happen since we check the fields before."); } } else if (actualType == clazz) { - T target; - try { - target = (T) from.getClass().newInstance(); - } catch (Throwable t) { - throw new RuntimeException("Cannot instantiate class.", t); - } + T target = instantiateRaw(); // no subclass try { for (int i = 0; i < numFields; i++) { @@ -424,27 +419,17 @@ public T deserialize(DataInputView source) throws IOException { return null; } - T target; - - Class actualSubclass = null; - TypeSerializer subclassSerializer = null; - if ((flags & IS_SUBCLASS) != 0) { String subclassName = source.readUTF(); - actualSubclass = getSubclassByName(subclassName); - subclassSerializer = getSubclassSerializer(actualSubclass); - target = (T) subclassSerializer.createInstance(); - // also initialize fields for which the subclass serializer is not responsible - initializeFields(target); - } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { + Class actualSubclass = getSubclassByName(subclassName); + TypeSerializer subclassSerializer = getSubclassSerializer(actualSubclass); + return (T) subclassSerializer.deserialize(source); + } + if ((flags & IS_TAGGED_SUBCLASS) != 0) { int subclassTag = source.readByte(); - subclassSerializer = registeredSerializers[subclassTag]; - target = (T) subclassSerializer.createInstance(); - // also initialize fields for which the subclass serializer is not responsible - initializeFields(target); - } else { - target = createInstance(); + TypeSerializer subclassSerializer = registeredSerializers[subclassTag]; + return (T) subclassSerializer.deserialize(source); } if (isRecord()) { @@ -456,8 +441,11 @@ public T deserialize(DataInputView source) throws IOException { builder.setField(i, fieldValue); } } - target = builder.build(); - } else if ((flags & NO_SUBCLASS) != 0) { + return builder.build(); + } + + if ((flags & NO_SUBCLASS) != 0) { + T target = instantiateRaw(); try { for (int i = 0; i < numFields; i++) { boolean isNull = source.readBoolean(); @@ -471,12 +459,10 @@ public T deserialize(DataInputView source) throws IOException { "Error during POJO copy, this should not happen since we check the fields before.", e); } - } else { - if (subclassSerializer != null) { - target = (T) subclassSerializer.deserialize(target, source); - } + return target; } - return target; + + throw new RuntimeException("Unknown POJO flags, this should not happen."); } @Override @@ -489,36 +475,36 @@ public T deserialize(T reuse, DataInputView source) throws IOException { return null; } - Class subclass = null; - TypeSerializer subclassSerializer = null; if ((flags & IS_SUBCLASS) != 0) { String subclassName = source.readUTF(); - subclass = getSubclassByName(subclassName); - subclassSerializer = getSubclassSerializer(subclass); + Class subclass = getSubclassByName(subclassName); + TypeSerializer subclassSerializer = getSubclassSerializer(subclass); if (reuse == null || subclass != reuse.getClass()) { // cannot reuse - reuse = (T) subclassSerializer.createInstance(); - // also initialize fields for which the subclass serializer is not responsible - initializeFields(reuse); + return (T) subclassSerializer.deserialize(source); + } else { + return (T) subclassSerializer.deserialize(reuse, source); } - } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { + } + + if ((flags & IS_TAGGED_SUBCLASS) != 0) { int subclassTag = source.readByte(); - subclassSerializer = registeredSerializers[subclassTag]; + TypeSerializer subclassSerializer = registeredSerializers[subclassTag]; if (reuse == null || ((PojoSerializer) subclassSerializer).clazz != reuse.getClass()) { // cannot reuse - reuse = (T) subclassSerializer.createInstance(); - // also initialize fields for which the subclass serializer is not responsible - initializeFields(reuse); - } - } else { - if (reuse == null || clazz != reuse.getClass()) { - reuse = createInstance(); + return (T) subclassSerializer.deserialize(source); + } else { + return (T) subclassSerializer.deserialize(reuse, source); } } if (isRecord()) { + if (reuse != null && clazz != reuse.getClass()) { + // cannot reuse, and cannot directly instantiate a record either + reuse = null; + } try { JavaRecordBuilderFactory.JavaRecordBuilder builder = recordFactory.newBuilder(); for (int i = 0; i < numFields; i++) { @@ -537,13 +523,19 @@ public T deserialize(T reuse, DataInputView source) throws IOException { } } - reuse = builder.build(); + return builder.build(); } catch (IllegalAccessException e) { throw new RuntimeException( "Error during POJO copy, this should not happen since we check the fields before.", e); } - } else if ((flags & NO_SUBCLASS) != 0) { + } + + if ((flags & NO_SUBCLASS) != 0) { + if (reuse == null || clazz != reuse.getClass()) { + // cannot reuse + reuse = instantiateRaw(); + } try { for (int i = 0; i < numFields; i++) { boolean isNull = source.readBoolean(); @@ -564,13 +556,10 @@ public T deserialize(T reuse, DataInputView source) throws IOException { "Error during POJO copy, this should not happen since we check the fields before.", e); } - } else { - if (subclassSerializer != null) { - reuse = (T) subclassSerializer.deserialize(reuse, source); - } + return reuse; } - return reuse; + throw new RuntimeException("Unknown POJO flags, this should not happen."); } private Object deserializeField(Object reuseField, int i, DataInputView source) @@ -888,4 +877,12 @@ private static PojoSerializerSnapshot buildSnapshot( nonRegisteredSubclassSerializerCache, serializerConfig); } + + private T instantiateRaw() { + try { + return clazz.newInstance(); + } catch (Exception e) { + throw new RuntimeException("Cannot instantiate class.", e); + } + } }