diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java index 6ecc9a0f086db..9fb1787dcd56b 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java @@ -191,6 +191,7 @@ public Set> optionalOptions() { options.add(MAP_NULL_KEY_LITERAL); options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER); options.add(ENCODE_IGNORE_NULL_FIELDS); + options.add(DECODE_JSON_PARSER_ENABLED); return options; } @@ -202,6 +203,7 @@ public Set> forwardOptions() { options.add(MAP_NULL_KEY_LITERAL); options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER); options.add(ENCODE_IGNORE_NULL_FIELDS); + options.add(DECODE_JSON_PARSER_ENABLED); return options; } } diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java index 4430203b28e46..f118343609961 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java @@ -125,6 +125,13 @@ void testLowerCaseOptionForMapNullKeyMode() { testSchemaDeserializationSchema(tableOptions); } + @Test + void testDecodeJsonParseEnabled() { + testJsonParserConfiguration(true, JsonParserRowDataDeserializationSchema.class); + + testJsonParserConfiguration(false, JsonRowDataDeserializationSchema.class); + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ @@ -229,6 +236,38 @@ private Map getAllOptions() { options.put("json.map-null-key.literal", "null"); options.put("json.encode.decimal-as-plain-number", "true"); options.put("json.encode.ignore-null-fields", "true"); + options.put("json.decode.json-parser.enabled", "true"); return options; } + + private void testJsonParserConfiguration(boolean enabled, Class expectedClass) { + Map options = + getModifyOptions( + opt -> opt.put("json.decode.json-parser.enabled", String.valueOf(enabled))); + + DeserializationSchema actualDeser = + createTableSource(options) + .valueFormat + .createRuntimeDecoder( + ScanRuntimeProviderContext.INSTANCE, + SCHEMA.toPhysicalRowDataType()); + + DeserializationSchema expectedDeser = + enabled + ? new JsonParserRowDataDeserializationSchema( + PHYSICAL_TYPE, + InternalTypeInfo.of(PHYSICAL_TYPE), + false, + true, + TimestampFormat.ISO_8601) + : new JsonRowDataDeserializationSchema( + PHYSICAL_TYPE, + InternalTypeInfo.of(PHYSICAL_TYPE), + false, + true, + TimestampFormat.ISO_8601); + + assertThat(actualDeser).isInstanceOf(expectedClass); + assertThat(actualDeser).isEqualTo(expectedDeser); + } }