Merge pull request #233 from wepay/DI-2995
Support non-existent topics in KCBQ.
Bingqin Zhou authored Jan 3, 2020
2 parents 77d2b6e + 4049471 commit 755b866
Showing 14 changed files with 171 additions and 288 deletions.
kcbq-connector/quickstart/properties/connector.properties (1 change: 0 additions & 1 deletion)
@@ -20,7 +20,6 @@ tasks.max=1
topics=kcbq-quickstart
sanitizeTopics=true

-autoCreateTables=true
autoUpdateSchemas=true

schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java
@@ -24,7 +24,6 @@
import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;

import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
-import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConnectorConfig;

import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;

@@ -76,7 +75,7 @@ public BigQuerySinkConnector() {
    this.testSchemaManager = schemaManager;
  }

-  private BigQuerySinkConnectorConfig config;
+  private BigQuerySinkConfig config;
  private Map<String, String> configProperties;

  private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkConnector.class);
@@ -108,58 +107,35 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
    return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, kafkaKeyFieldName, kafkaDataFieldName);
  }

-  private void ensureExistingTables(
-      BigQuery bigQuery,
-      SchemaManager schemaManager,
-      Map<String, TableId> topicsToTableIds) {
-    for (Map.Entry<String, TableId> topicToTableId : topicsToTableIds.entrySet()) {
-      String topic = topicToTableId.getKey();
-      TableId tableId = topicToTableId.getValue();
-      if (bigQuery.getTable(tableId) == null) {
-        logger.info("Table {} does not exist; attempting to create", tableId);
-        schemaManager.createTable(tableId, topic);
-      }
-    }
-  }
-
-  private void ensureExistingTables(
-      BigQuery bigQuery,
-      Map<String, TableId> topicsToTableIds) {
+  private void ensureExistingTables() {
+    BigQuery bigQuery = getBigQuery();
+    Map<String, TableId> topicsToTableIds = TopicToTableResolver.getTopicsToTables(config);
    for (TableId tableId : topicsToTableIds.values()) {
      if (bigQuery.getTable(tableId) == null) {
        logger.warn(
-        "You may want to enable auto table creation by setting {}=true in the properties file",
-        config.TABLE_CREATE_CONFIG);
+            "You may want to enable auto table creation by setting {}=true in the properties file",
+            config.TABLE_CREATE_CONFIG);
        throw new BigQueryConnectException("Table '" + tableId + "' does not exist");
      }
    }
  }
-
-  private void ensureExistingTables() {
-    BigQuery bigQuery = getBigQuery();
-    Map<String, TableId> topicsToTableIds = TopicToTableResolver.getTopicsToTables(config);
-    if (config.getBoolean(config.TABLE_CREATE_CONFIG)) {
-      SchemaManager schemaManager = getSchemaManager(bigQuery);
-      ensureExistingTables(bigQuery, schemaManager, topicsToTableIds);
-    } else {
-      ensureExistingTables(bigQuery, topicsToTableIds);
-    }
-  }

  @Override
  public void start(Map<String, String> properties) {
    logger.trace("connector.start()");
    try {
      configProperties = properties;
-      config = new BigQuerySinkConnectorConfig(properties);
+      config = new BigQuerySinkConfig(properties);
    } catch (ConfigException err) {
      throw new SinkConfigConnectException(
          "Couldn't start BigQuerySinkConnector due to configuration error",
          err
      );
    }

-    ensureExistingTables();
+    if (!config.getBoolean(config.TABLE_CREATE_CONFIG)) {
+      ensureExistingTables();
+    }
  }

  @Override
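Taken together with the quickstart change above (autoCreateTables no longer needs to be set explicitly because its new default is true, defined in BigQuerySinkConfig below), start() now performs the up-front table existence check only when auto-creation is disabled. A minimal sketch of driving that path follows; it is not part of the PR, the topics/project/datasets values are hypothetical stand-ins, and actually executing it requires real BigQuery credentials plus whatever other settings your connector deployment needs.

import com.wepay.kafka.connect.bigquery.BigQuerySinkConnector;

import java.util.HashMap;
import java.util.Map;

public class StartBehaviorSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("topics", "kcbq-quickstart");   // hypothetical
    props.put("project", "my-gcp-project");   // hypothetical
    props.put("datasets", ".*=my_dataset");   // hypothetical
    props.put("autoCreateTables", "false");   // opt back in to the up-front check

    BigQuerySinkConnector connector = new BigQuerySinkConnector();
    // With autoCreateTables=false, start() resolves each topic's table and
    // throws BigQueryConnectException if any table is missing; with the new
    // default of true, the check is skipped and tables are created lazily
    // as records arrive (see BigQuerySinkTask below).
    connector.start(props);
  }
}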
kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
@@ -89,6 +89,7 @@ public class BigQuerySinkTask extends SinkTask {

  private final BigQuery testBigQuery;
  private final Storage testGcs;
+  private final SchemaManager testSchemaManager;

  private final UUID uuid = UUID.randomUUID();
  private ScheduledExecutorService gcsLoadExecutor;
@@ -100,6 +101,7 @@ public BigQuerySinkTask() {
    testBigQuery = null;
    schemaRetriever = null;
    testGcs = null;
+    testSchemaManager = null;
  }

  /**
@@ -108,12 +110,14 @@ public BigQuerySinkTask() {
   * @param testBigQuery {@link BigQuery} to use for testing (likely a mock)
   * @param schemaRetriever {@link SchemaRetriever} to use for testing (likely a mock)
   * @param testGcs {@link Storage} to use for testing (likely a mock)
+   * @param testSchemaManager {@link SchemaManager} to use for testing (likely a mock)
   * @see BigQuerySinkTask#BigQuerySinkTask()
   */
-  public BigQuerySinkTask(BigQuery testBigQuery, SchemaRetriever schemaRetriever, Storage testGcs) {
+  public BigQuerySinkTask(BigQuery testBigQuery, SchemaRetriever schemaRetriever, Storage testGcs, SchemaManager testSchemaManager) {
    this.testBigQuery = testBigQuery;
    this.schemaRetriever = schemaRetriever;
    this.testGcs = testGcs;
+    this.testSchemaManager = testSchemaManager;
  }

  @Override
@@ -137,6 +141,8 @@ private PartitionedTableId getRecordTable(SinkRecord record) {

    TableId baseTableId = topicsToBaseTableIds.get(record.topic());

+    maybeCreateTable(record, baseTableId);
+
    PartitionedTableId.Builder builder = new PartitionedTableId.Builder(baseTableId);
    if (useMessageTimeDatePartitioning) {
      if (record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) {
@@ -152,6 +158,20 @@ private PartitionedTableId getRecordTable(SinkRecord record) {
    return builder.build();
  }

+  /**
+   * Create the table for a record's topic in BigQuery, if it does not already exist and the autoCreateTables config is set to true.
+   * @param record Kafka sink record to be streamed into BigQuery.
+   * @param baseTableId base TableId of the record's destination table in BigQuery.
+   */
+  private void maybeCreateTable(SinkRecord record, TableId baseTableId) {
+    BigQuery bigQuery = getBigQuery();
+    boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
+    if (autoCreateTables && bigQuery.getTable(baseTableId) == null) {
+      getSchemaManager(bigQuery).createTable(baseTableId, record.topic());
+      logger.info("Table {} does not exist, auto-created table for topic {}", baseTableId, record.topic());
+    }
+  }
+
  private RowToInsert getRecordRow(SinkRecord record) {
    Map<String, Object> convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE);
    Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();
@@ -248,6 +268,9 @@ private BigQuery getBigQuery() {
  }

  private SchemaManager getSchemaManager(BigQuery bigQuery) {
+    if (testSchemaManager != null) {
+      return testSchemaManager;
+    }
    schemaRetriever = config.getSchemaRetriever();
    SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter =
        config.getSchemaConverter();
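The testSchemaManager hook added above lets tests drive the new lazy-creation path in maybeCreateTable() without a real BigQuery backend. Here is a minimal sketch of that check-then-create step, assuming Mockito for the mocks; the dataset, table, and topic names are hypothetical, while SchemaManager.createTable(TableId, String) is the signature used in the diff.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.TableId;
import com.wepay.kafka.connect.bigquery.SchemaManager;

public class MaybeCreateTableSketch {
  public static void main(String[] args) {
    BigQuery bigQuery = mock(BigQuery.class);
    SchemaManager schemaManager = mock(SchemaManager.class);
    TableId tableId = TableId.of("my_dataset", "my_topic"); // hypothetical

    when(bigQuery.getTable(tableId)).thenReturn(null); // table does not exist

    // The same check-then-create step maybeCreateTable() performs when
    // autoCreateTables is enabled:
    if (bigQuery.getTable(tableId) == null) {
      schemaManager.createTable(tableId, "my_topic");
    }

    verify(schemaManager).createTable(tableId, "my_topic");
  }
}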
kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java
@@ -32,6 +32,8 @@
import org.apache.kafka.common.config.ConfigException;

import org.apache.kafka.connect.sink.SinkConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -52,6 +54,7 @@
public class BigQuerySinkConfig extends AbstractConfig {
  private static final ConfigDef config;
  private static final Validator validator = new Validator();
+  private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkConfig.class);

  // Values taken from https://github.com/apache/kafka/blob/1.1.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java#L33
  public static final String TOPICS_CONFIG = SinkConnector.TOPICS_CONFIG;
@@ -209,6 +212,13 @@ public class BigQuerySinkConfig extends AbstractConfig {
      "If true, no fields in any produced BigQuery schema will be REQUIRED. All "
      + "non-nullable avro fields will be translated as NULLABLE (or REPEATED, if arrays).";

+  public static final String TABLE_CREATE_CONFIG = "autoCreateTables";
+  private static final ConfigDef.Type TABLE_CREATE_TYPE = ConfigDef.Type.BOOLEAN;
+  public static final boolean TABLE_CREATE_DEFAULT = true;
+  private static final ConfigDef.Importance TABLE_CREATE_IMPORTANCE = ConfigDef.Importance.HIGH;
+  private static final String TABLE_CREATE_DOC =
+      "Automatically create BigQuery tables if they don't already exist";
+
  static {
    config = new ConfigDef()
        .define(
@@ -324,7 +334,13 @@ public class BigQuerySinkConfig extends AbstractConfig {
            CONVERT_DOUBLE_SPECIAL_VALUES_DEFAULT,
            CONVERT_DOUBLE_SPECIAL_VALUES_IMPORTANCE,
            CONVERT_DOUBLE_SPECIAL_VALUES_DOC
-        );
+        ).define(
+            TABLE_CREATE_CONFIG,
+            TABLE_CREATE_TYPE,
+            TABLE_CREATE_DEFAULT,
+            TABLE_CREATE_IMPORTANCE,
+            TABLE_CREATE_DOC
+        );
  }

  @SuppressWarnings("unchecked")
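For reference, a self-contained sketch (not from the PR) of how the autoCreateTables key defined above behaves once registered on a Kafka ConfigDef: string values are parsed to booleans, and TABLE_CREATE_DEFAULT (true) applies when the key is absent.

import org.apache.kafka.common.config.ConfigDef;

import java.util.Collections;
import java.util.Map;

public class TableCreateConfigSketch {
  public static void main(String[] args) {
    ConfigDef def = new ConfigDef().define(
        "autoCreateTables",                  // TABLE_CREATE_CONFIG
        ConfigDef.Type.BOOLEAN,              // TABLE_CREATE_TYPE
        true,                                // TABLE_CREATE_DEFAULT
        ConfigDef.Importance.HIGH,           // TABLE_CREATE_IMPORTANCE
        "Automatically create BigQuery tables if they don't already exist");

    // No value supplied: the default of true applies.
    Map<String, Object> parsed = def.parse(Collections.emptyMap());
    System.out.println(parsed.get("autoCreateTables")); // true

    // Explicit opt-out, which the connector's start() path now checks for.
    parsed = def.parse(Collections.singletonMap("autoCreateTables", "false"));
    System.out.println(parsed.get("autoCreateTables")); // false
  }
}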
@@ -608,6 +624,18 @@ private void verifyBucketSpecified() throws ConfigException {
    }
  }

+  private void checkAutoCreateTables() {
+
+    Class<?> schemaRetriever = getClass(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG);
+    boolean autoCreateTables = getBoolean(TABLE_CREATE_CONFIG);
+
+    if (autoCreateTables && schemaRetriever == null) {
+      throw new ConfigException(
+          "Cannot specify automatic table creation without a schema retriever"
+      );
+    }
+  }
+
  /**
   * Return the ConfigDef object used to define this config's fields.
   *
@@ -625,6 +653,7 @@ protected BigQuerySinkConfig(ConfigDef config, Map<String, String> properties) {
  public BigQuerySinkConfig(Map<String, String> properties) {
    super(config, properties);
    verifyBucketSpecified();
+    checkAutoCreateTables();
  }

}
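Putting the new config key and checkAutoCreateTables() together, a hedged sketch of the fail-fast behavior this file now has: with autoCreateTables left at its default of true and no schemaRetriever configured, construction throws. The topics/project/datasets values are hypothetical stand-ins for the connector's other required settings.

import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;

import org.apache.kafka.common.config.ConfigException;

import java.util.HashMap;
import java.util.Map;

public class AutoCreateValidationSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("topics", "kcbq-quickstart");  // hypothetical
    props.put("project", "my-gcp-project");  // hypothetical
    props.put("datasets", ".*=my_dataset");  // hypothetical
    // autoCreateTables defaults to true and no schemaRetriever is set,
    // so checkAutoCreateTables() rejects the configuration.
    try {
      new BigQuerySinkConfig(props);
    } catch (ConfigException e) {
      // "Cannot specify automatic table creation without a schema retriever"
      System.out.println(e.getMessage());
    }
  }
}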

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConnectorConfig.java
This file was deleted.
