Skip to content

Commit f9a2c92

Browse files
committed
[FLINK-34467] add lineage integration for jdbc connector
1 parent 140f179 commit f9a2c92

File tree

57 files changed

+1480
-16
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+1480
-16
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ scalastyle-output.xml
77
.metadata
88
.settings
99
.project
10+
.java-version
1011
.version.properties
1112
filter.properties
1213
logs.zip
@@ -27,7 +28,7 @@ out/
2728
/docs/api
2829
/docs/.bundle
2930
/docs/.rubydeps
30-
/docs/ruby2/.bundle
31+
/docs/ruby2/.bundle
3132
/docs/ruby2/.rubydeps
3233
/docs/.jekyll-metadata
3334
*.ipr
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Method <org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource.getLineageVertex()> calls method <org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator.getSqlTemplate()> in (JdbcSource.java:215)
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
org.apache.flink.connector.jdbc.lineage.DefaultJdbcExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
2+
org.apache.flink.connector.jdbc.lineage.JdbcLocation$Builder.build(): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
3+
org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
4+
org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated

flink-connector-jdbc-core/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ under the License.
5656
<optional>true</optional>
5757
</dependency>
5858

59+
<dependency>
60+
<groupId>io.openlineage</groupId>
61+
<artifactId>openlineage-sql-java</artifactId>
62+
</dependency>
63+
64+
<dependency>
65+
<groupId>io.openlineage</groupId>
66+
<artifactId>openlineage-java</artifactId>
67+
</dependency>
68+
5969
<!-- Tests -->
6070

6171
<dependency>

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,23 @@
2424
import org.apache.flink.api.common.io.InputFormat;
2525
import org.apache.flink.api.common.io.RichInputFormat;
2626
import org.apache.flink.api.common.io.statistics.BaseStatistics;
27+
import org.apache.flink.api.connector.source.Boundedness;
2728
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
2829
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2930
import org.apache.flink.configuration.Configuration;
3031
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
3132
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
3233
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
3334
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
35+
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
36+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
3437
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
3538
import org.apache.flink.core.io.GenericInputSplit;
3639
import org.apache.flink.core.io.InputSplit;
3740
import org.apache.flink.core.io.InputSplitAssigner;
41+
import org.apache.flink.streaming.api.lineage.LineageDataset;
42+
import org.apache.flink.streaming.api.lineage.LineageVertex;
43+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
3844
import org.apache.flink.types.Row;
3945
import org.apache.flink.util.Preconditions;
4046

@@ -53,6 +59,8 @@
5359
import java.sql.Time;
5460
import java.sql.Timestamp;
5561
import java.util.Arrays;
62+
import java.util.Collections;
63+
import java.util.Optional;
5664

5765
/**
5866
* InputFormat to read data from a database and generate Rows. The InputFormat has to be configured
@@ -107,7 +115,7 @@
107115
@Deprecated
108116
@Experimental
109117
public class JdbcInputFormat extends RichInputFormat<Row, InputSplit>
110-
implements ResultTypeQueryable<Row> {
118+
implements LineageVertexProvider, ResultTypeQueryable<Row> {
111119

112120
protected static final long serialVersionUID = 2L;
113121
protected static final Logger LOG = LoggerFactory.getLogger(JdbcInputFormat.class);
@@ -344,6 +352,19 @@ public static JdbcInputFormatBuilder buildJdbcInputFormat() {
344352
return new JdbcInputFormatBuilder();
345353
}
346354

355+
@Override
356+
public LineageVertex getLineageVertex() {
357+
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
358+
new DefaultTypeDatasetFacet(getProducedType());
359+
Optional<String> nameOpt = LineageUtils.tableNameOf(queryTemplate, true);
360+
String namespace = LineageUtils.namespaceOf(connectionProvider);
361+
LineageDataset dataset =
362+
LineageUtils.datasetOf(
363+
nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet));
364+
return LineageUtils.sourceLineageVertexOf(
365+
Boundedness.BOUNDED, Collections.singleton(dataset));
366+
}
367+
347368
/** Builder for {@link JdbcInputFormat}. */
348369
public static class JdbcInputFormatBuilder {
349370
private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,16 @@
3737
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
3838
import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
3939
import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
40+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
4041
import org.apache.flink.core.io.SimpleVersionedSerializer;
42+
import org.apache.flink.streaming.api.lineage.LineageDataset;
43+
import org.apache.flink.streaming.api.lineage.LineageVertex;
44+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
4145

4246
import java.io.IOException;
4347
import java.util.Collection;
4448
import java.util.Collections;
49+
import java.util.Optional;
4550

4651
/**
4752
* Flink Sink to produce data into a jdbc database.
@@ -50,7 +55,8 @@
5055
*/
5156
@PublicEvolving
5257
public class JdbcSink<IN>
53-
implements Sink<IN>,
58+
implements LineageVertexProvider,
59+
Sink<IN>,
5460
SupportsWriterState<IN, JdbcWriterState>,
5561
SupportsCommitter<JdbcCommitable> {
5662

@@ -120,4 +126,13 @@ public JdbcWriter<IN> restoreWriter(
120126
public SimpleVersionedSerializer<JdbcWriterState> getWriterStateSerializer() {
121127
return new JdbcWriterStateSerializer();
122128
}
129+
130+
@Override
131+
public LineageVertex getLineageVertex() {
132+
Optional<String> nameOpt = LineageUtils.tableNameOf(queryStatement.query(), false);
133+
String namespace = LineageUtils.namespaceOf(connectionProvider);
134+
LineageDataset dataset =
135+
LineageUtils.datasetOf(nameOpt.orElse(""), namespace, Collections.emptyList());
136+
return LineageUtils.lineageVertexOf(Collections.singleton(dataset));
137+
}
123138
}

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,35 @@
3535
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator;
3636
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState;
3737
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase;
38+
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
3839
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader;
3940
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader;
4041
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
4142
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
4243
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer;
4344
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
45+
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
46+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
4447
import org.apache.flink.core.io.SimpleVersionedSerializer;
48+
import org.apache.flink.streaming.api.lineage.LineageDataset;
49+
import org.apache.flink.streaming.api.lineage.LineageVertex;
50+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
4551
import org.apache.flink.util.Preconditions;
4652

4753
import javax.annotation.Nullable;
4854

4955
import java.io.Serializable;
5056
import java.util.ArrayList;
57+
import java.util.Arrays;
58+
import java.util.Collections;
5159
import java.util.Objects;
60+
import java.util.Optional;
5261

5362
/** JDBC source. */
5463
@PublicEvolving
5564
public class JdbcSource<OUT>
56-
implements Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
65+
implements LineageVertexProvider,
66+
Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
5767
ResultTypeQueryable<OUT> {
5868

5969
private final Boundedness boundedness;
@@ -195,4 +205,18 @@ public boolean equals(Object o) {
195205
&& deliveryGuarantee == that.deliveryGuarantee
196206
&& Objects.equals(continuousUnBoundingSettings, that.continuousUnBoundingSettings);
197207
}
208+
209+
@Override
210+
public LineageVertex getLineageVertex() {
211+
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
212+
new DefaultTypeDatasetFacet(getTypeInformation());
213+
SqlTemplateSplitEnumerator enumerator =
214+
(SqlTemplateSplitEnumerator) sqlSplitEnumeratorProvider.create();
215+
Optional<String> nameOpt = LineageUtils.tableNameOf(enumerator.getSqlTemplate(), true);
216+
String namespace = LineageUtils.namespaceOf(connectionProvider);
217+
LineageDataset dataset =
218+
LineageUtils.datasetOf(
219+
nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet));
220+
return LineageUtils.sourceLineageVertexOf(boundedness, Collections.singleton(dataset));
221+
}
198222
}

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,22 @@
2424
import org.apache.flink.api.common.io.RichInputFormat;
2525
import org.apache.flink.api.common.io.statistics.BaseStatistics;
2626
import org.apache.flink.api.common.typeinfo.TypeInformation;
27+
import org.apache.flink.api.connector.source.Boundedness;
2728
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
2829
import org.apache.flink.configuration.Configuration;
2930
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
3031
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
3132
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
3233
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
34+
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
35+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
3336
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
3437
import org.apache.flink.core.io.GenericInputSplit;
3538
import org.apache.flink.core.io.InputSplit;
3639
import org.apache.flink.core.io.InputSplitAssigner;
40+
import org.apache.flink.streaming.api.lineage.LineageDataset;
41+
import org.apache.flink.streaming.api.lineage.LineageVertex;
42+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
3743
import org.apache.flink.table.data.RowData;
3844
import org.apache.flink.util.Preconditions;
3945

@@ -51,11 +57,13 @@
5157
import java.sql.Time;
5258
import java.sql.Timestamp;
5359
import java.util.Arrays;
60+
import java.util.Collections;
61+
import java.util.Optional;
5462

5563
/** InputFormat for {@link JdbcDynamicTableSource}. */
5664
@Internal
5765
public class JdbcRowDataInputFormat extends RichInputFormat<RowData, InputSplit>
58-
implements ResultTypeQueryable<RowData> {
66+
implements LineageVertexProvider, ResultTypeQueryable<RowData> {
5967

6068
private static final long serialVersionUID = 2L;
6169
private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataInputFormat.class);
@@ -296,6 +304,19 @@ public static Builder builder() {
296304
return new Builder();
297305
}
298306

307+
@Override
308+
public LineageVertex getLineageVertex() {
309+
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
310+
new DefaultTypeDatasetFacet(getProducedType());
311+
Optional<String> nameOpt = LineageUtils.tableNameOf(queryTemplate, true);
312+
String namespace = LineageUtils.namespaceOf(connectionProvider);
313+
LineageDataset dataset =
314+
LineageUtils.datasetOf(
315+
nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet));
316+
return LineageUtils.sourceLineageVertexOf(
317+
Boundedness.BOUNDED, Collections.singleton(dataset));
318+
}
319+
299320
/** Builder for {@link JdbcRowDataInputFormat}. */
300321
public static class Builder {
301322
private JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,26 @@
2020

2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.annotation.VisibleForTesting;
23+
import org.apache.flink.api.connector.source.Boundedness;
2324
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
2425
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
2526
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
2627
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
2728
import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
29+
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
30+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
2831
import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
32+
import org.apache.flink.streaming.api.lineage.LineageDataset;
33+
import org.apache.flink.streaming.api.lineage.LineageVertex;
34+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
2935
import org.apache.flink.table.data.RowData;
3036
import org.apache.flink.table.functions.FunctionContext;
3137
import org.apache.flink.table.functions.LookupFunction;
3238
import org.apache.flink.table.types.DataType;
3339
import org.apache.flink.table.types.logical.LogicalType;
3440
import org.apache.flink.table.types.logical.RowType;
41+
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
42+
import org.apache.flink.table.types.utils.TypeConversions;
3543

3644
import org.slf4j.Logger;
3745
import org.slf4j.LoggerFactory;
@@ -46,14 +54,15 @@
4654
import java.util.Collection;
4755
import java.util.Collections;
4856
import java.util.List;
57+
import java.util.Optional;
4958
import java.util.stream.Collectors;
5059

5160
import static org.apache.flink.util.Preconditions.checkArgument;
5261
import static org.apache.flink.util.Preconditions.checkNotNull;
5362

5463
/** A lookup function for {@link JdbcDynamicTableSource}. */
5564
@Internal
56-
public class JdbcRowDataLookupFunction extends LookupFunction {
65+
public class JdbcRowDataLookupFunction extends LookupFunction implements LineageVertexProvider {
5766

5867
private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataLookupFunction.class);
5968
private static final long serialVersionUID = 2L;
@@ -67,6 +76,7 @@ public class JdbcRowDataLookupFunction extends LookupFunction {
6776

6877
private final List<String> resolvedPredicates;
6978
private final Serializable[] pushdownParams;
79+
private final RowType producedType;
7080

7181
private transient FieldNamedPreparedStatement statement;
7282

@@ -106,12 +116,12 @@ public JdbcRowDataLookupFunction(
106116
.getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
107117
JdbcDialect jdbcDialect = options.getDialect();
108118
this.jdbcDialectConverter = jdbcDialect.getRowConverter(rowType);
109-
this.lookupKeyRowConverter =
110-
jdbcDialect.getRowConverter(
111-
RowType.of(
112-
Arrays.stream(keyTypes)
113-
.map(DataType::getLogicalType)
114-
.toArray(LogicalType[]::new)));
119+
this.producedType =
120+
RowType.of(
121+
Arrays.stream(keyTypes)
122+
.map(DataType::getLogicalType)
123+
.toArray(LogicalType[]::new));
124+
this.lookupKeyRowConverter = jdbcDialect.getRowConverter(producedType);
115125
this.resolvedPredicates = resolvedPredicates;
116126
this.pushdownParams = pushdownParams;
117127
}
@@ -224,4 +234,19 @@ public void close() throws IOException {
224234
public Connection getDbConnection() {
225235
return connectionProvider.getConnection();
226236
}
237+
238+
@Override
239+
public LineageVertex getLineageVertex() {
240+
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
241+
new DefaultTypeDatasetFacet(
242+
LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
243+
TypeConversions.fromLogicalToDataType(producedType)));
244+
Optional<String> nameOpt = LineageUtils.tableNameOf(query, true);
245+
String namespace = LineageUtils.namespaceOf(connectionProvider);
246+
LineageDataset dataset =
247+
LineageUtils.datasetOf(
248+
nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet));
249+
return LineageUtils.sourceLineageVertexOf(
250+
Boundedness.BOUNDED, Collections.singleton(dataset));
251+
}
227252
}

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,4 +149,8 @@ public Connection reestablishConnection() throws SQLException, ClassNotFoundExce
149149
closeConnection();
150150
return getOrEstablishConnection();
151151
}
152+
153+
public String getDbURL() {
154+
return this.jdbcOptions.getDbURL();
155+
}
152156
}

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.flink.runtime.state.FunctionSnapshotContext;
2828
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
2929
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
30+
import org.apache.flink.streaming.api.lineage.LineageVertex;
31+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
3032
import org.apache.flink.util.Preconditions;
3133

3234
import javax.annotation.Nonnull;
@@ -36,7 +38,7 @@
3638
/** A generic SinkFunction for JDBC. */
3739
@Internal
3840
public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
39-
implements CheckpointedFunction, InputTypeConfigurable {
41+
implements LineageVertexProvider, CheckpointedFunction, InputTypeConfigurable {
4042
private final JdbcOutputFormat<T, ?, ?> outputFormat;
4143
private JdbcOutputSerializer<T> serializer;
4244

@@ -78,4 +80,9 @@ public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfi
7880
((TypeInformation<T>) type)
7981
.createSerializer(executionConfig.getSerializerConfig()));
8082
}
83+
84+
@Override
85+
public LineageVertex getLineageVertex() {
86+
return outputFormat.getLineageVertex();
87+
}
8188
}

0 commit comments

Comments
 (0)