Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support SQL column with NOT NULL constraint #581

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ com.spotify.dbeam.options.JdbcExportPipelineOptions:
--useAvroLogicalTypes=<Boolean>
Default: false
Controls whether generated Avro schema will contain logicalTypes or not.
--useAvroNotNullTypes=<Boolean>
Default: false
Controls whether SQL columns declared NOT NULL are generated as non-nullable Avro fields.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a breaking change? i.e. when people simply upgrade DBeam will the auto generated schema that had nullable fields become (possibly) non nullable?

We should avoid breaking changes like this in the auto-generated schema.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a breaking change?

No, by default it has the same behaviour (backwards compatible).

```

#### Input Avro schema file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public abstract class JdbcExportArgs implements Serializable {

public abstract Boolean useAvroLogicalTypes();

public abstract Boolean useAvroNotNullTypes();

public abstract Duration exportTimeout();

public abstract Optional<Schema> inputAvroSchema();
Expand All @@ -64,6 +66,8 @@ abstract static class Builder {

abstract Builder setUseAvroLogicalTypes(Boolean useAvroLogicalTypes);

abstract Builder setUseAvroNotNullTypes(Boolean useAvroNotNullTypes);

abstract Builder setExportTimeout(Duration exportTimeout);

abstract Builder setInputAvroSchema(Optional<Schema> inputAvroSchema);
Expand All @@ -81,6 +85,7 @@ static JdbcExportArgs create(
Optional.empty(),
Optional.empty(),
false,
false,
Duration.ofDays(7),
Optional.empty());
}
Expand All @@ -92,6 +97,7 @@ public static JdbcExportArgs create(
final Optional<String> avroSchemaName,
final Optional<String> avroDoc,
final Boolean useAvroLogicalTypes,
final Boolean useAvroNotNullTypes,
final Duration exportTimeout,
final Optional<Schema> inputAvroSchema) {
return new AutoValue_JdbcExportArgs.Builder()
Expand All @@ -101,6 +107,7 @@ public static JdbcExportArgs create(
.setAvroSchemaName(avroSchemaName)
.setAvroDoc(avroDoc)
.setUseAvroLogicalTypes(useAvroLogicalTypes)
.setUseAvroNotNullTypes(useAvroNotNullTypes)
.setExportTimeout(exportTimeout)
.setInputAvroSchema(inputAvroSchema)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

public class BeamJdbcAvroSchema {

private static Logger LOGGER = LoggerFactory.getLogger(BeamJdbcAvroSchema.class);
private static final Logger LOGGER = LoggerFactory.getLogger(BeamJdbcAvroSchema.class);

/**
* Generate Avro schema by reading one row. Expose Beam metrics via a Beam PTransform.
Expand Down Expand Up @@ -90,7 +90,8 @@ private static Schema generateAvroSchema(final JdbcExportArgs args, final Connec
args.avroSchemaNamespace(),
args.avroSchemaName(),
avroDoc,
args.useAvroLogicalTypes());
args.useAvroLogicalTypes(),
args.useAvroNotNullTypes());
}

public static Optional<Schema> parseOptionalInputAvroSchemaFile(final String filename)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*-
* -\-\-
* DBeam Core
* --
* Copyright (C) 2016 - 2023 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* -/-/-
*/

package com.spotify.dbeam.avro;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

/**
 * Static helpers that complete an Avro field definition for a given primitive type.
 *
 * <p>Every helper takes the {@link SchemaBuilder.FieldTypeBuilder} of a field that is being
 * declared and a {@code useNotNullTypes} flag. When the flag is {@code true} the field is declared
 * as the plain type with no default value; otherwise it is declared as a union of {@code null} and
 * the type, with {@code null} as the default.
 */
public final class FieldTypeHelper {

  /** Not instantiable: this class only hosts static helpers. */
  private FieldTypeHelper() {}

  /**
   * Completes the field as an Avro {@code string}.
   *
   * @param field type builder of the field being declared
   * @param useNotNullTypes {@code true} for a plain type, {@code false} for a null union
   * @return the record's field assembler, ready for the next field
   */
  public static SchemaBuilder.FieldAssembler<Schema> setStringType(
      final SchemaBuilder.FieldTypeBuilder<Schema> field, final boolean useNotNullTypes) {
    if (useNotNullTypes) {
      return field.stringType().noDefault();
    }
    return field.unionOf().nullBuilder().endNull().and().stringType().endUnion().nullDefault();
  }

  /**
   * Completes the field as an Avro {@code int}.
   *
   * @param field type builder of the field being declared
   * @param useNotNullTypes {@code true} for a plain type, {@code false} for a null union
   * @return the record's field assembler, ready for the next field
   */
  public static SchemaBuilder.FieldAssembler<Schema> setIntType(
      final SchemaBuilder.FieldTypeBuilder<Schema> field, final boolean useNotNullTypes) {
    if (useNotNullTypes) {
      return field.intType().noDefault();
    }
    return field.unionOf().nullBuilder().endNull().and().intType().endUnion().nullDefault();
  }

  /**
   * Completes the field as an Avro {@code long}.
   *
   * @param field type builder of the field being declared
   * @param useNotNullTypes {@code true} for a plain type, {@code false} for a null union
   * @return the record's field assembler, ready for the next field
   */
  public static SchemaBuilder.FieldAssembler<Schema> setLongType(
      final SchemaBuilder.FieldTypeBuilder<Schema> field, final boolean useNotNullTypes) {
    if (useNotNullTypes) {
      return field.longType().noDefault();
    }
    return field.unionOf().nullBuilder().endNull().and().longType().endUnion().nullDefault();
  }

  /**
   * Completes the field as an Avro {@code long} carrying the {@code logicalType} property {@code
   * timestamp-millis}.
   *
   * @param field type builder of the field being declared
   * @param useNotNullTypes {@code true} for a plain type, {@code false} for a null union
   * @return the record's field assembler, ready for the next field
   */
  public static SchemaBuilder.FieldAssembler<Schema> setLongLogicalType(
      final SchemaBuilder.FieldTypeBuilder<Schema> field, final boolean useNotNullTypes) {
    if (useNotNullTypes) {
      return field.longBuilder().prop("logicalType", "timestamp-millis").endLong().noDefault();
    }
    return field
        .unionOf()
        .nullBuilder()
        .endNull()
        .and()
        .longBuilder()
        .prop("logicalType", "timestamp-millis")
        .endLong()
        .endUnion()
        .nullDefault();
  }

  /**
   * Completes the field as Avro {@code bytes}.
   *
   * @param field type builder of the field being declared
   * @param useNotNullTypes {@code true} for a plain type, {@code false} for a null union
   * @return the record's field assembler, ready for the next field
   */
  public static SchemaBuilder.FieldAssembler<Schema> setBytesType(
      final SchemaBuilder.FieldTypeBuilder<Schema> field, final boolean useNotNullTypes) {
    if (useNotNullTypes) {
      return field.bytesType().noDefault();
    }
    return field.unionOf().nullBuilder().endNull().and().bytesType().endUnion().nullDefault();
  }

  /**
   * Completes the field as an Avro {@code boolean}.
   *
   * @param field type builder of the field being declared
   * @param useNotNullTypes {@code true} for a plain type, {@code false} for a null union
   * @return the record's field assembler, ready for the next field
   */
  public static SchemaBuilder.FieldAssembler<Schema> setBooleanType(
      final SchemaBuilder.FieldTypeBuilder<Schema> field, final boolean useNotNullTypes) {
    if (useNotNullTypes) {
      return field.booleanType().noDefault();
    }
    return field.unionOf().nullBuilder().endNull().and().booleanType().endUnion().nullDefault();
  }

  /**
   * Completes the field as an Avro {@code float}.
   *
   * @param field type builder of the field being declared
   * @param useNotNullTypes {@code true} for a plain type, {@code false} for a null union
   * @return the record's field assembler, ready for the next field
   */
  public static SchemaBuilder.FieldAssembler<Schema> setFloatType(
      final SchemaBuilder.FieldTypeBuilder<Schema> field, final boolean useNotNullTypes) {
    if (useNotNullTypes) {
      return field.floatType().noDefault();
    }
    return field.unionOf().nullBuilder().endNull().and().floatType().endUnion().nullDefault();
  }

  /**
   * Completes the field as an Avro {@code double}.
   *
   * @param field type builder of the field being declared
   * @param useNotNullTypes {@code true} for a plain type, {@code false} for a null union
   * @return the record's field assembler, ready for the next field
   */
  public static SchemaBuilder.FieldAssembler<Schema> setDoubleType(
      final SchemaBuilder.FieldTypeBuilder<Schema> field, final boolean useNotNullTypes) {
    if (useNotNullTypes) {
      return field.doubleType().noDefault();
    }
    return field.unionOf().nullBuilder().endNull().and().doubleType().endUnion().nullDefault();
  }
}
103 changes: 57 additions & 46 deletions dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public static Schema createSchemaByReadingOneRow(
final String avroSchemaNamespace,
final Optional<String> schemaName,
final String avroDoc,
final boolean useLogicalTypes)
final boolean useLogicalTypes,
final boolean useNotNullTypes)
throws SQLException {
LOGGER.debug("Creating Avro schema based on the first read row from the database");
try (Statement statement = connection.createStatement()) {
Expand All @@ -81,7 +82,8 @@ public static Schema createSchemaByReadingOneRow(
connection.getMetaData().getURL(),
schemaName,
avroDoc,
useLogicalTypes);
useLogicalTypes,
useNotNullTypes);
LOGGER.info("Schema created successfully. Generated schema: {}", schema.toString());
return schema;
}
Expand All @@ -93,7 +95,8 @@ public static Schema createAvroSchema(
final String connectionUrl,
final Optional<String> maybeSchemaName,
final String avroDoc,
final boolean useLogicalTypes)
final boolean useLogicalTypes,
final boolean useNotNullTypes)
throws SQLException {

final ResultSetMetaData meta = resultSet.getMetaData();
Expand All @@ -107,7 +110,7 @@ public static Schema createAvroSchema(
.prop("tableName", tableName)
.prop("connectionUrl", connectionUrl)
.fields();
return createAvroFields(meta, builder, useLogicalTypes).endRecord();
return createAvroFields(meta, builder, useLogicalTypes, useNotNullTypes).endRecord();
}

static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLException {
Expand All @@ -125,20 +128,16 @@ static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLExcep
private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
final ResultSetMetaData meta,
final SchemaBuilder.FieldAssembler<Schema> builder,
final boolean useLogicalTypes)
final boolean useLogicalTypes,
final boolean useNotNullTypes)
throws SQLException {

for (int i = 1; i <= meta.getColumnCount(); i++) {

final String columnName;
if (meta.getColumnName(i).isEmpty()) {
columnName = meta.getColumnLabel(i);
} else {
columnName = meta.getColumnName(i);
}
final String columnName = getColumnName(meta, i);

final int columnType = meta.getColumnType(i);
final String typeName = JDBCType.valueOf(columnType).getName();
final String typeName = getSqlTypeName(columnType);
final String columnClassName = meta.getColumnClassName(i);
final SchemaBuilder.FieldBuilder<Schema> field =
builder
Expand All @@ -149,23 +148,38 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
.prop("typeName", typeName)
.prop("columnClassName", columnClassName);

final SchemaBuilder.BaseTypeBuilder<
SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>>
fieldSchemaBuilder = field.type().unionOf().nullBuilder().endNull().and();
final boolean isNullTypeSupported = isNotNullColumn(useNotNullTypes, meta.isNullable(i));

final SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>> schemaFieldAssembler =
setAvroColumnType(
columnType,
meta.getPrecision(i),
columnClassName,
useLogicalTypes,
fieldSchemaBuilder);
final SchemaBuilder.FieldTypeBuilder<Schema> fieldSchemaBuilder = field.type();

schemaFieldAssembler.endUnion().nullDefault();
setAvroColumnType(
columnType,
meta.getPrecision(i),
columnClassName,
useLogicalTypes,
isNullTypeSupported,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there is no need to pass isNullTypeSupported boolean around.

if (isNullTypeSupported) {
  fieldSchemaBuilder = field.type().unionOf().nullBuilder().endNull().and();
} else {
  fieldSchemaBuilder = field.type();
}

// then later
if/else .endUnion().nullDefault();

Also, in that case no need for FieldTypeHelper and setAvroColumnType() can stay the way it is.. Doing so, this PR can be simpler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish it would be that easy.
Unfortunately, in Avro these two similar methods type() belong to unrelated classes
BaseFieldTypeBuilder and BaseTypeBuilder.

fieldSchemaBuilder);
}
return builder;
}

/**
 * Resolves the name of the {@link JDBCType} that corresponds to a {@code java.sql.Types} code.
 *
 * @param columnType the JDBC type code of a column
 * @return the name of the matching {@link JDBCType} constant
 * @throws IllegalArgumentException if {@code columnType} does not map to any {@link JDBCType}
 */
public static String getSqlTypeName(final int columnType) {
  final JDBCType jdbcType = JDBCType.valueOf(columnType);
  return jdbcType.getName();
}

/**
 * Picks the name to use for a column: its column name, or its column label when the column name
 * is empty.
 *
 * @param meta metadata of the result set being inspected
 * @param columnIndex index of the column, passed straight through to {@code meta}
 * @return the column name, or the column label if the name is an empty string
 * @throws SQLException if the metadata lookup fails
 */
private static String getColumnName(final ResultSetMetaData meta, final int columnIndex)
    throws SQLException {
  if (meta.getColumnName(columnIndex).isEmpty()) {
    return meta.getColumnLabel(columnIndex);
  }
  return meta.getColumnName(columnIndex);
}

/**
 * Decides whether a column should be treated as NOT NULL.
 *
 * @param globalSettingUseNotNullTypes whether not-null handling is switched on at all
 * @param columnNullabilityStatus nullability constant reported for the column
 * @return {@code true} only if the setting is on and the status equals {@link
 *     ResultSetMetaData#columnNoNulls}
 */
private static boolean isNotNullColumn(
    final boolean globalSettingUseNotNullTypes, final int columnNullabilityStatus) {
  if (!globalSettingUseNotNullTypes) {
    return false;
  }
  return columnNullabilityStatus == ResultSetMetaData.columnNoNulls;
}

/**
* Creates Avro field schema based on JDBC MetaData
*
Expand All @@ -176,68 +190,65 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
* <li>{@link com.mysql.cj.MysqlType }
* <li>{@link org.h2.value.TypeInfo }
* </ul>
*
*/
private static SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>
setAvroColumnType(
final int columnType,
final int precision,
final String columnClassName,
final boolean useLogicalTypes,
final SchemaBuilder.BaseTypeBuilder<
SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>>
field) {
private static SchemaBuilder.FieldAssembler<Schema> setAvroColumnType(
final int columnType,
final int precision,
final String columnClassName,
final boolean useLogicalTypes,
final boolean useNotNullTypes,
final SchemaBuilder.FieldTypeBuilder<Schema> field) {
switch (columnType) {
case VARCHAR:
case CHAR:
case CLOB:
case LONGNVARCHAR:
case LONGVARCHAR:
case NCHAR:
return field.stringType();
return FieldTypeHelper.setStringType(field, useNotNullTypes);
case BIGINT:
return field.longType();
return FieldTypeHelper.setLongType(field, useNotNullTypes);
case INTEGER:
case SMALLINT:
case TINYINT:
if (Long.class.getCanonicalName().equals(columnClassName)) {
return field.longType();
return FieldTypeHelper.setLongType(field, useNotNullTypes);
} else {
return field.intType();
return FieldTypeHelper.setIntType(field, useNotNullTypes);
}
case TIMESTAMP:
case DATE:
case TIME:
case TIME_WITH_TIMEZONE:
if (useLogicalTypes) {
return field.longBuilder().prop("logicalType", "timestamp-millis").endLong();
return FieldTypeHelper.setLongLogicalType(field, useNotNullTypes);
} else {
return field.longType();
return FieldTypeHelper.setLongType(field, useNotNullTypes);
}
case BOOLEAN:
return field.booleanType();
return FieldTypeHelper.setBooleanType(field, useNotNullTypes);
case BIT:
// Note that bit types can take a param/typemod qualifying its length
// some further docs:
// https://www.postgresql.org/docs/8.2/datatype-bit.html
if (precision <= 1) {
return field.booleanType();
return FieldTypeHelper.setBooleanType(field, useNotNullTypes);
} else {
return field.bytesType();
return FieldTypeHelper.setBytesType(field, useNotNullTypes);
}
case BINARY:
case VARBINARY:
case LONGVARBINARY:
case ARRAY:
case BLOB:
return field.bytesType();
return FieldTypeHelper.setBytesType(field, useNotNullTypes);
case DOUBLE:
return field.doubleType();
return FieldTypeHelper.setDoubleType(field, useNotNullTypes);
case FLOAT:
case REAL:
return field.floatType();
return FieldTypeHelper.setFloatType(field, useNotNullTypes);
default:
return field.stringType();
return FieldTypeHelper.setStringType(field, useNotNullTypes);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public static JdbcExportArgs fromPipelineOptions(final PipelineOptions options)
Optional.ofNullable(exportOptions.getAvroSchemaName()),
Optional.ofNullable(exportOptions.getAvroDoc()),
exportOptions.isUseAvroLogicalTypes(),
exportOptions.isUseAvroNotNullTypes(),
Duration.parse(exportOptions.getExportTimeout()),
BeamJdbcAvroSchema.parseOptionalInputAvroSchemaFile(exportOptions.getAvroSchemaFilePath()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ public interface JdbcExportPipelineOptions extends DBeamPipelineOptions {

void setUseAvroLogicalTypes(Boolean value);

/**
 * Whether the generated Avro schema may contain not-null (non-union) field types. Defaults to
 * {@code false}.
 */
@Default.Boolean(false)
@Description("Controls whether generated Avro schema will contain not null types.")
Boolean isUseAvroNotNullTypes();

void setUseAvroNotNullTypes(Boolean value);

@Default.Integer(10000)
@Description("Configures JDBC Statement fetch size.")
Integer getFetchSize();
Expand Down
Loading