From 6517a067a78d1af5080490b67cd8f23ae35f705b Mon Sep 17 00:00:00 2001
From: Bingqin Zhou
Date: Thu, 19 Dec 2019 14:29:39 -0800
Subject: [PATCH 1/8] Support non-existent topics in KCBQ.

---
 .../bigquery/BigQuerySinkConnector.java       | 42 ++--
 .../connect/bigquery/BigQuerySinkTask.java    | 12 ++-
 .../bigquery/config/BigQuerySinkConfig.java   | 36 +++++++-
 .../config/BigQuerySinkConnectorConfig.java   | 82 ------------------
 .../bigquery/BigQuerySinkConnectorTest.java   |  9 +-
 .../SinkConnectorPropertiesFactory.java       |  6 +-
 .../BigQuerySinkConnectorConfigTest.java      | 53 ------------
 .../convert/BigQueryRecordConverterTest.java  |  8 +-
 8 files changed, 64 insertions(+), 184 deletions(-)
 delete mode 100644 kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConnectorConfig.java
 delete mode 100644 kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConnectorConfigTest.java

diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java
index 7593e51cd..82b174bf1 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java
@@ -24,7 +24,6 @@

 import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
 import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
-import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConnectorConfig;

 import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;

@@ -76,7 +75,7 @@ public BigQuerySinkConnector() {
     this.testSchemaManager = schemaManager;
   }

-  private BigQuerySinkConnectorConfig config;
+  private BigQuerySinkConfig config;
   private Map configProperties;

   private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkConnector.class);
@@ -108,50 +107,25 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
     return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, kafkaKeyFieldName, kafkaDataFieldName);
   }

-  private void ensureExistingTables(
-      BigQuery bigQuery,
-      SchemaManager schemaManager,
-      Map topicsToTableIds) {
-    for (Map.Entry topicToTableId : topicsToTableIds.entrySet()) {
-      String topic = topicToTableId.getKey();
-      TableId tableId = topicToTableId.getValue();
-      if (bigQuery.getTable(tableId) == null) {
-        logger.info("Table {} does not exist; attempting to create", tableId);
-        schemaManager.createTable(tableId, topic);
-      }
-    }
-  }
-
-  private void ensureExistingTables(
-      BigQuery bigQuery,
-      Map topicsToTableIds) {
+  private void ensureExistingTables() {
+    BigQuery bigQuery = getBigQuery();
+    Map topicsToTableIds = TopicToTableResolver.getTopicsToTables(config);
     for (TableId tableId : topicsToTableIds.values()) {
-      if (bigQuery.getTable(tableId) == null) {
+      if (!config.getBoolean(config.TABLE_CREATE_CONFIG) && bigQuery.getTable(tableId) == null) {
         logger.warn(
-          "You may want to enable auto table creation by setting {}=true in the properties file",
-          config.TABLE_CREATE_CONFIG);
+            "You may want to enable auto table creation by setting {}=true in the properties file",
+            config.TABLE_CREATE_CONFIG);
         throw new BigQueryConnectException("Table '" + tableId + "' does not exist");
       }
     }
   }

-  private void ensureExistingTables() {
-    BigQuery bigQuery = getBigQuery();
-    Map topicsToTableIds = TopicToTableResolver.getTopicsToTables(config);
-    if (config.getBoolean(config.TABLE_CREATE_CONFIG)) {
- 
SchemaManager schemaManager = getSchemaManager(bigQuery); - ensureExistingTables(bigQuery, schemaManager, topicsToTableIds); - } else { - ensureExistingTables(bigQuery, topicsToTableIds); - } - } - @Override public void start(Map properties) { logger.trace("connector.start()"); try { configProperties = properties; - config = new BigQuerySinkConnectorConfig(properties); + config = new BigQuerySinkConfig(properties); } catch (ConfigException err) { throw new SinkConfigConnectException( "Couldn't start BigQuerySinkConnector due to configuration error", diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index c9bf38c0e..9101a547a 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -127,7 +127,7 @@ public void flush(Map offsets) { topicPartitionManager.resumeAll(); } - private PartitionedTableId getRecordTable(SinkRecord record) { + private PartitionedTableId getRecordTable(boolean autoCreateTables, SinkRecord record) { // Dynamically update topicToBaseTableIds mapping. topicToBaseTableIds was used to be // constructed when connector starts hence new topic configuration needed connector to restart. // Dynamic update shall not require connector restart and shall compute table id in runtime. @@ -137,6 +137,12 @@ private PartitionedTableId getRecordTable(SinkRecord record) { TableId baseTableId = topicsToBaseTableIds.get(record.topic()); + BigQuery bigQuery = getBigQuery(); + if (autoCreateTables && bigQuery.getTable(baseTableId) == null) { + getSchemaManager(bigQuery).createTable(baseTableId, record.topic()); + logger.info("Table {} does not exist, auto-created table for topic {}", baseTableId, record.topic()); + } + PartitionedTableId.Builder builder = new PartitionedTableId.Builder(baseTableId); if (useMessageTimeDatePartitioning) { if (record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) { @@ -182,9 +188,11 @@ public void put(Collection records) { // create tableWriters Map tableWriterBuilders = new HashMap<>(); + boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG); + for (SinkRecord record : records) { if (record.value() != null) { - PartitionedTableId table = getRecordTable(record); + PartitionedTableId table = getRecordTable(autoCreateTables, record); if (schemaRetriever != null) { schemaRetriever.setLastSeenSchema(table.getBaseTableId(), record.topic(), diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java index 08e2da703..8df075ea2 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java @@ -32,6 +32,8 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.sink.SinkConnector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -52,6 +54,7 @@ public class BigQuerySinkConfig extends AbstractConfig { private static final ConfigDef config; private static final Validator validator = new Validator(); + private static final Logger logger = 
LoggerFactory.getLogger(BigQuerySinkConfig.class); // Values taken from https://github.com/apache/kafka/blob/1.1.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java#L33 public static final String TOPICS_CONFIG = SinkConnector.TOPICS_CONFIG; @@ -209,6 +212,13 @@ public class BigQuerySinkConfig extends AbstractConfig { "If true, no fields in any produced BigQuery schema will be REQUIRED. All " + "non-nullable avro fields will be translated as NULLABLE (or REPEATED, if arrays)."; + public static final String TABLE_CREATE_CONFIG = "autoCreateTables"; + private static final ConfigDef.Type TABLE_CREATE_TYPE = ConfigDef.Type.BOOLEAN; + public static final boolean TABLE_CREATE_DEFAULT = false; + private static final ConfigDef.Importance TABLE_CREATE_IMPORTANCE = ConfigDef.Importance.HIGH; + private static final String TABLE_CREATE_DOC = + "Automatically create BigQuery tables if they don't already exist"; + static { config = new ConfigDef() .define( @@ -324,7 +334,13 @@ public class BigQuerySinkConfig extends AbstractConfig { CONVERT_DOUBLE_SPECIAL_VALUES_DEFAULT, CONVERT_DOUBLE_SPECIAL_VALUES_IMPORTANCE, CONVERT_DOUBLE_SPECIAL_VALUES_DOC - ); + ).define( + TABLE_CREATE_CONFIG, + TABLE_CREATE_TYPE, + TABLE_CREATE_DEFAULT, + TABLE_CREATE_IMPORTANCE, + TABLE_CREATE_DOC + ); } @SuppressWarnings("unchecked") @@ -608,6 +624,23 @@ private void verifyBucketSpecified() throws ConfigException { } } + private void checkAutoCreateTables() { + Class schemaRetriever = getClass(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG); + + boolean autoCreateTables = getBoolean(TABLE_CREATE_CONFIG); + if (autoCreateTables && schemaRetriever == null) { + throw new ConfigException( + "Cannot specify automatic table creation without a schema retriever" + ); + } + + if (schemaRetriever == null) { + logger.warn( + "No schema retriever class provided; auto table creation is impossible" + ); + } + } + /** * Return the ConfigDef object used to define this config's fields. * @@ -625,6 +658,7 @@ protected BigQuerySinkConfig(ConfigDef config, Map properties) { public BigQuerySinkConfig(Map properties) { super(config, properties); verifyBucketSpecified(); + checkAutoCreateTables(); } } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConnectorConfig.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConnectorConfig.java deleted file mode 100644 index 3d8c8b95a..000000000 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConnectorConfig.java +++ /dev/null @@ -1,82 +0,0 @@ -package com.wepay.kafka.connect.bigquery.config; - -/* - * Copyright 2016 WePay, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -/** - * Class for connector-specific configuration properties. 
- */ -public class BigQuerySinkConnectorConfig extends BigQuerySinkConfig { - private static final ConfigDef config; - private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkConnectorConfig.class); - - public static final String TABLE_CREATE_CONFIG = "autoCreateTables"; - private static final ConfigDef.Type TABLE_CREATE_TYPE = ConfigDef.Type.BOOLEAN; - public static final boolean TABLE_CREATE_DEFAULT = false; - private static final ConfigDef.Importance TABLE_CREATE_IMPORTANCE = ConfigDef.Importance.HIGH; - private static final String TABLE_CREATE_DOC = - "Automatically create BigQuery tables if they don't already exist"; - - static { - config = BigQuerySinkConfig.getConfig() - .define( - TABLE_CREATE_CONFIG, - TABLE_CREATE_TYPE, - TABLE_CREATE_DEFAULT, - TABLE_CREATE_IMPORTANCE, - TABLE_CREATE_DOC - ); - } - - private void checkAutoCreateTables() { - Class schemaRetriever = getClass(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG); - - boolean autoCreateTables = getBoolean(TABLE_CREATE_CONFIG); - if (autoCreateTables && schemaRetriever == null) { - throw new ConfigException( - "Cannot specify automatic table creation without a schema retriever" - ); - } - - if (schemaRetriever == null) { - logger.warn( - "No schema retriever class provided; auto table creation is impossible" - ); - } - } - - public static ConfigDef getConfig() { - return config; - } - - /** - * @param properties A Map detailing configuration properties and their respective values. - */ - public BigQuerySinkConnectorConfig(Map properties) { - super(config, properties); - checkAutoCreateTables(); - } -} diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java index 0c127a117..82740c5ea 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java @@ -37,7 +37,6 @@ import com.wepay.kafka.connect.bigquery.api.SchemaRetriever; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; -import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConnectorConfig; import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException; @@ -86,7 +85,7 @@ public void testAutoCreateTables() { final TableId nonExistingTable = TableId.of(dataset, "topic_without_existing_table"); Map properties = propertiesFactory.getProperties(); - properties.put(BigQuerySinkConnectorConfig.TABLE_CREATE_CONFIG, "true"); + properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "true"); properties.put(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG, MockSchemaRetriever.class.getName()); properties.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true"); properties.put(BigQuerySinkConfig.DATASETS_CONFIG, String.format(".*=%s", dataset)); @@ -118,7 +117,7 @@ public void testNonAutoCreateTables() { final String[] tables = new String[] { "topic_one", "topicTwo", "TOPIC_THREE", "topic_four" }; Map properties = propertiesFactory.getProperties(); - properties.put(BigQuerySinkConnectorConfig.TABLE_CREATE_CONFIG, "false"); + properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false"); properties.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true"); properties.put(BigQuerySinkConfig.DATASETS_CONFIG, String.format(".*=%s", dataset)); properties.put(BigQuerySinkConfig.TOPICS_CONFIG, 
String.join(",", topics)); @@ -150,7 +149,7 @@ public void testNonAutoCreateTablesFailure() { final TableId nonExistingTable = TableId.of(dataset, "topic_without_existing_table"); Map properties = propertiesFactory.getProperties(); - properties.put(BigQuerySinkConnectorConfig.TABLE_CREATE_CONFIG, "false"); + properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false"); properties.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true"); properties.put(BigQuerySinkConfig.DATASETS_CONFIG, String.format(".*=%s", dataset)); properties.put( @@ -216,7 +215,7 @@ public void testTaskConfigs() { @Test public void testConfig() { - assertEquals(BigQuerySinkConnectorConfig.getConfig(), new BigQuerySinkConnector().config()); + assertEquals(BigQuerySinkConfig.getConfig(), new BigQuerySinkConnector().config()); } // Make sure that a config exception is properly translated into a SinkConfigConnectException diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkConnectorPropertiesFactory.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkConnectorPropertiesFactory.java index 8da3ca467..e47dd8ac2 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkConnectorPropertiesFactory.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkConnectorPropertiesFactory.java @@ -18,7 +18,7 @@ */ -import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConnectorConfig; +import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; import java.util.Map; @@ -27,7 +27,7 @@ public class SinkConnectorPropertiesFactory extends SinkPropertiesFactory { public Map getProperties() { Map properties = super.getProperties(); - properties.put(BigQuerySinkConnectorConfig.TABLE_CREATE_CONFIG, "false"); + properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false"); return properties; } @@ -37,7 +37,7 @@ public Map getProperties() { * * @param config The config object to test */ - public void testProperties(BigQuerySinkConnectorConfig config) { + public void testProperties(BigQuerySinkConfig config) { super.testProperties(config); config.getBoolean(config.TABLE_CREATE_CONFIG); diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConnectorConfigTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConnectorConfigTest.java deleted file mode 100644 index a27c149e4..000000000 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConnectorConfigTest.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.wepay.kafka.connect.bigquery.config; - -/* - * Copyright 2016 WePay, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - -import com.wepay.kafka.connect.bigquery.SinkConnectorPropertiesFactory; - -import org.apache.kafka.common.config.ConfigException; - -import org.junit.Before; -import org.junit.Test; - -import java.util.Map; - -public class BigQuerySinkConnectorConfigTest { - private SinkConnectorPropertiesFactory propertiesFactory; - - @Before - public void initializePropertiesFactory() { - propertiesFactory = new SinkConnectorPropertiesFactory(); - } - - @Test - public void metaTestBasicConfigProperties() { - Map basicConfigProperties = propertiesFactory.getProperties(); - BigQuerySinkConnectorConfig config = new BigQuerySinkConnectorConfig(basicConfigProperties); - propertiesFactory.testProperties(config); - } - - @Test(expected = ConfigException.class) - public void testAutoTableCreateWithoutRetriever() { - Map badConfigProperties = propertiesFactory.getProperties(); - badConfigProperties.remove(BigQuerySinkConnectorConfig.SCHEMA_RETRIEVER_CONFIG); - badConfigProperties.put(BigQuerySinkConnectorConfig.TABLE_CREATE_CONFIG, "true"); - - new BigQuerySinkConnectorConfig(badConfigProperties); - } -} diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverterTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverterTest.java index 177f4cc33..2f22abaa4 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverterTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverterTest.java @@ -613,8 +613,8 @@ public void testInvalidMapSchemalessNullValue() { put("f3", null); }}; - SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap); - Map stringObjectMap = new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord); + SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap, false); + Map stringObjectMap = new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); Assert.assertEquals(kafkaConnectMap, stringObjectMap ); } @@ -630,9 +630,9 @@ public void testInvalidMapSchemalessNestedMapNullValue() { }}); }}; - SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap); + SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap, true); Map stringObjectMap = new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE) - .convertRecord(kafkaConnectRecord); + .convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); Assert.assertEquals(kafkaConnectMap, stringObjectMap); } From 35c81ffa96873ac005cc1a4a540856f20aecacef Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Thu, 19 Dec 2019 17:38:42 -0800 Subject: [PATCH 2/8] Fix unit tests. 
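
Since the previous commit moved table creation into the task's put() path,
the auto-create tests move from BigQuerySinkConnectorTest into
BigQuerySinkTaskTest, and the task's test-only constructor gains a fourth
argument so tests can inject a SchemaManager. A typical setup now looks like
this (sketch mirroring the hunks below, not an additional change):

    BigQuery bigQuery = mock(BigQuery.class);
    Storage storage = mock(Storage.class);
    SchemaManager schemaManager = mock(SchemaManager.class);
    // The injected SchemaManager lets tests verify createTable() calls
    // without touching a real BigQuery instance.
    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, schemaManager);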
--- .../connect/bigquery/BigQuerySinkTask.java | 8 +- .../bigquery/BigQuerySinkConnectorTest.java | 90 --------------- .../bigquery/BigQuerySinkTaskTest.java | 108 +++++++++++++++--- .../write/row/BigQueryWriterTest.java | 6 +- .../bigquery/write/row/GCSToBQWriterTest.java | 6 +- 5 files changed, 107 insertions(+), 111 deletions(-) diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index 9101a547a..048effe97 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -89,6 +89,7 @@ public class BigQuerySinkTask extends SinkTask { private final BigQuery testBigQuery; private final Storage testGcs; + private final SchemaManager testSchemaManager; private final UUID uuid = UUID.randomUUID(); private ScheduledExecutorService gcsLoadExecutor; @@ -100,6 +101,7 @@ public BigQuerySinkTask() { testBigQuery = null; schemaRetriever = null; testGcs = null; + testSchemaManager = null; } /** @@ -110,10 +112,11 @@ public BigQuerySinkTask() { * @param testGcs {@link Storage} to use for testing (likely a mock) * @see BigQuerySinkTask#BigQuerySinkTask() */ - public BigQuerySinkTask(BigQuery testBigQuery, SchemaRetriever schemaRetriever, Storage testGcs) { + public BigQuerySinkTask(BigQuery testBigQuery, SchemaRetriever schemaRetriever, Storage testGcs, SchemaManager testSchemaManager) { this.testBigQuery = testBigQuery; this.schemaRetriever = schemaRetriever; this.testGcs = testGcs; + this.testSchemaManager = testSchemaManager; } @Override @@ -256,6 +259,9 @@ private BigQuery getBigQuery() { } private SchemaManager getSchemaManager(BigQuery bigQuery) { + if (testSchemaManager != null) { + return testSchemaManager; + } schemaRetriever = config.getSchemaRetriever(); SchemaConverter schemaConverter = config.getSchemaConverter(); diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java index 82740c5ea..fd946b507 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java @@ -76,96 +76,6 @@ public static void initializePropertiesFactory() { propertiesFactory = new SinkConnectorPropertiesFactory(); } - @Test - public void testAutoCreateTables() { - final String dataset = "scratch"; - final String existingTableTopic = "topic-with-existing-table"; - final String nonExistingTableTopic = "topic-without-existing-table"; - final TableId existingTable = TableId.of(dataset, "topic_with_existing_table"); - final TableId nonExistingTable = TableId.of(dataset, "topic_without_existing_table"); - - Map properties = propertiesFactory.getProperties(); - properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "true"); - properties.put(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG, MockSchemaRetriever.class.getName()); - properties.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true"); - properties.put(BigQuerySinkConfig.DATASETS_CONFIG, String.format(".*=%s", dataset)); - properties.put( - BigQuerySinkConfig.TOPICS_CONFIG, - String.format("%s, %s", existingTableTopic, nonExistingTableTopic) - ); - - BigQuery bigQuery = mock(BigQuery.class); - Table fakeTable = mock(Table.class); - 
when(bigQuery.getTable(existingTable)).thenReturn(fakeTable); - when(bigQuery.getTable(nonExistingTable)).thenReturn(null); - - SchemaManager schemaManager = mock(SchemaManager.class); - - BigQuerySinkConnector testConnector = new BigQuerySinkConnector(bigQuery, schemaManager); - testConnector.start(properties); - - verify(bigQuery).getTable(existingTable); - verify(bigQuery).getTable(nonExistingTable); - verify(schemaManager, never()).createTable(existingTable, existingTableTopic); - verify(schemaManager).createTable(nonExistingTable, nonExistingTableTopic); - } - - @Test - public void testNonAutoCreateTables() { - final String dataset = "scratch"; - final String[] topics = new String[] { "topic-one", "topicTwo", "TOPIC_THREE", "topic.four" }; - final String[] tables = new String[] { "topic_one", "topicTwo", "TOPIC_THREE", "topic_four" }; - - Map properties = propertiesFactory.getProperties(); - properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false"); - properties.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true"); - properties.put(BigQuerySinkConfig.DATASETS_CONFIG, String.format(".*=%s", dataset)); - properties.put(BigQuerySinkConfig.TOPICS_CONFIG, String.join(",", topics)); - - BigQuery bigQuery = mock(BigQuery.class); - for (String table : tables) { - Table fakeTable = mock(Table.class); - when(bigQuery.getTable(TableId.of(dataset, table))).thenReturn(fakeTable); - } - - SchemaManager schemaManager = mock(SchemaManager.class); - - BigQuerySinkConnector testConnector = new BigQuerySinkConnector(bigQuery); - testConnector.start(properties); - - verify(schemaManager, never()).createTable(any(TableId.class), any(String.class)); - - for (String table : tables) { - verify(bigQuery).getTable(TableId.of(dataset, table)); - } - } - - @Test(expected = BigQueryConnectException.class) - public void testNonAutoCreateTablesFailure() { - final String dataset = "scratch"; - final String existingTableTopic = "topic-with-existing-table"; - final String nonExistingTableTopic = "topic-without-existing-table"; - final TableId existingTable = TableId.of(dataset, "topic_with_existing_table"); - final TableId nonExistingTable = TableId.of(dataset, "topic_without_existing_table"); - - Map properties = propertiesFactory.getProperties(); - properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false"); - properties.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true"); - properties.put(BigQuerySinkConfig.DATASETS_CONFIG, String.format(".*=%s", dataset)); - properties.put( - BigQuerySinkConfig.TOPICS_CONFIG, - String.format("%s, %s", existingTableTopic, nonExistingTableTopic) - ); - - BigQuery bigQuery = mock(BigQuery.class); - Table fakeTable = mock(Table.class); - when(bigQuery.getTable(existingTable)).thenReturn(fakeTable); - when(bigQuery.getTable(nonExistingTable)).thenReturn(null); - - BigQuerySinkConnector testConnector = new BigQuerySinkConnector(bigQuery); - testConnector.start(properties); - } - @Test public void testTaskClass() { assertEquals(BigQuerySinkTask.class, new BigQuerySinkConnector().taskClass()); diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java index 4fefede72..75000901e 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.times; import static 
org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.never; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryError; @@ -34,6 +35,7 @@ import com.google.cloud.bigquery.InsertAllRequest; import com.google.cloud.bigquery.InsertAllResponse; import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.Table; import com.google.cloud.storage.Storage; import com.wepay.kafka.connect.bigquery.api.SchemaRetriever; @@ -85,7 +87,7 @@ public void testSimplePut() { when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse); when(insertAllResponse.hasErrors()).thenReturn(false); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); testTask.initialize(sinkTaskContext); testTask.start(properties); @@ -112,7 +114,7 @@ public void testSimplePutWhenSchemaRetrieverIsNotNull() { SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, null); testTask.initialize(sinkTaskContext); testTask.start(properties); @@ -123,13 +125,91 @@ public void testSimplePutWhenSchemaRetrieverIsNotNull() { any(String.class), any(Schema.class)); } + @Test + public void testAutoCreateTables() { + final String dataset = "scratch"; + final String existingTableTopic = "topic-with-existing-table"; + final String nonExistingTableTopic = "topic-without-existing-table"; + final TableId existingTable = TableId.of(dataset, "topic_with_existing_table"); + final TableId nonExistingTable = TableId.of(dataset, "topic_without_existing_table"); + + Map properties = propertiesFactory.getProperties(); + properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "true"); + properties.put(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG, BigQuerySinkConnectorTest.MockSchemaRetriever.class.getName()); + properties.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true"); + properties.put(BigQuerySinkConfig.DATASETS_CONFIG, String.format(".*=%s", dataset)); + properties.put(BigQuerySinkConfig.TOPICS_CONFIG, existingTableTopic); + + BigQuery bigQuery = mock(BigQuery.class); + Table fakeTable = mock(Table.class); + when(bigQuery.getTable(existingTable)).thenReturn(fakeTable); + when(bigQuery.getTable(nonExistingTable)).thenReturn(null); + InsertAllResponse insertAllResponse = mock(InsertAllResponse.class); + when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse); + when(insertAllResponse.hasErrors()).thenReturn(false); + + Storage storage = mock(Storage.class); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); + testTask.initialize(sinkTaskContext); + testTask.start(properties); + + testTask.put(Collections.singletonList(spoofSinkRecord(nonExistingTableTopic))); + testTask.flush(Collections.emptyMap()); + + verify(schemaManager, never()).createTable(existingTable, existingTableTopic); + verify(schemaManager).createTable(nonExistingTable, nonExistingTableTopic); + } + + @Test + public void testNonAutoCreateTables() { + final String dataset = "scratch"; + final String existingTableTopic = "topic-with-existing-table"; + final 
String nonExistingTableTopic = "topic-without-existing-table";
+    final TableId existingTable = TableId.of(dataset, "topic_with_existing_table");
+    final TableId nonExistingTable = TableId.of(dataset, "topic_without_existing_table");
+
+    Map properties = propertiesFactory.getProperties();
+    properties.put(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG, BigQuerySinkConnectorTest.MockSchemaRetriever.class.getName());
+    properties.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true");
+    properties.put(BigQuerySinkConfig.DATASETS_CONFIG, String.format(".*=%s", dataset));
+    properties.put(BigQuerySinkConfig.TOPICS_CONFIG, existingTableTopic);
+
+    BigQuery bigQuery = mock(BigQuery.class);
+    Table fakeTable = mock(Table.class);
+    when(bigQuery.getTable(existingTable)).thenReturn(fakeTable);
+    when(bigQuery.getTable(nonExistingTable)).thenReturn(null);
+    InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);
+    when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse);
+    when(insertAllResponse.hasErrors()).thenReturn(false);
+
+    Storage storage = mock(Storage.class);
+    SchemaRetriever schemaRetriever = mock(SchemaRetriever.class);
+    SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
+    SchemaManager schemaManager = mock(SchemaManager.class);
+
+    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager);
+    testTask.initialize(sinkTaskContext);
+    testTask.start(properties);
+
+    testTask.put(Collections.singletonList(spoofSinkRecord(nonExistingTableTopic)));
+    testTask.flush(Collections.emptyMap());
+
+    verify(schemaManager, never()).createTable(existingTable, existingTableTopic);
+    verify(schemaManager, never()).createTable(nonExistingTable, nonExistingTableTopic);
+  }
+
+
   @Test
   public void testEmptyPut() {
     Map properties = propertiesFactory.getProperties();
     BigQuery bigQuery = mock(BigQuery.class);
     Storage storage = mock(Storage.class);

-    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage);
+    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null);
     testTask.start(properties);

     testTask.put(Collections.emptyList());
@@ -148,7 +228,7 @@ public void testEmptyRecordPut() {
     BigQuery bigQuery = mock(BigQuery.class);
     Storage storage = mock(Storage.class);

-    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage);
+    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null);
     testTask.start(properties);

     SinkRecord emptyRecord = spoofSinkRecord(topic, simpleSchema, null);
@@ -175,7 +255,7 @@ public void testPutWhenPartitioningOnMessageTime() {
     when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse);
     when(insertAllResponse.hasErrors()).thenReturn(false);

-    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage);
+    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null);
     testTask.initialize(sinkTaskContext);
     testTask.start(properties);

@@ -206,7 +286,7 @@ public void testPutWhenPartitioningOnMessageTimeWhenNoTimestampType() {
     when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse);
     when(insertAllResponse.hasErrors()).thenReturn(false);

-    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage);
+    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null);
     testTask.initialize(sinkTaskContext);
     testTask.start(properties);

@@ -232,7 +312,7 @@ public void testBufferClearOnFlushError() {
         .thenThrow(new RuntimeException("This is a test"));

     SinkTaskContext 
sinkTaskContext = mock(SinkTaskContext.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); testTask.initialize(sinkTaskContext); testTask.start(properties); @@ -253,7 +333,7 @@ public void testEmptyFlush() { Storage storage = mock(Storage.class); SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); testTask.initialize(sinkTaskContext); testTask.start(properties); @@ -284,7 +364,7 @@ public void testBigQuery5XXRetry() { SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put(Collections.singletonList(spoofSinkRecord(topic))); @@ -318,7 +398,7 @@ public void testBigQuery403Retry() { SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put(Collections.singletonList(spoofSinkRecord(topic))); @@ -349,7 +429,7 @@ public void testBigQueryRetryExceeded() { SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put(Collections.singletonList(spoofSinkRecord(topic))); @@ -374,7 +454,7 @@ public void testInterruptedException() { when(bigQuery.insertAll(any(InsertAllRequest.class))).thenReturn(fakeResponse); SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); testTask.initialize(sinkTaskContext); testTask.start(properties); @@ -393,7 +473,7 @@ public void testConfigException() { badProperties.remove(BigQuerySinkConfig.TOPICS_CONFIG); BigQuerySinkTask testTask = - new BigQuerySinkTask(mock(BigQuery.class), null, mock(Storage.class)); + new BigQuerySinkTask(mock(BigQuery.class), null, mock(Storage.class), null); testTask.start(badProperties); } @@ -421,7 +501,7 @@ public void testStop() { when(insertAllResponse.hasErrors()).thenReturn(false); Storage storage = mock(Storage.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put(Collections.singletonList(spoofSinkRecord(topic))); diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java index 5ff18335d..d33287640 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java @@ -85,7 +85,7 @@ public void 
testBigQueryNoFailure() {
     SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
     Storage storage = mock(Storage.class);

-    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage);
+    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null);
     testTask.initialize(sinkTaskContext);
     testTask.start(properties);
     testTask.put(
@@ -133,7 +133,7 @@ public void testBigQueryPartialFailure() {
     SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
     Storage storage = mock(Storage.class);

-    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage);
+    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null);
     testTask.initialize(sinkTaskContext);
     testTask.start(properties);
     testTask.put(sinkRecordList);
@@ -182,7 +182,7 @@ public void testBigQueryCompleteFailure() {
     SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
     Storage storage = mock(Storage.class);

-    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage);
+    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null);
     testTask.initialize(sinkTaskContext);
     testTask.start(properties);
     testTask.put(sinkRecordList);
diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriterTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriterTest.java
index a8c8486c3..a381f860f 100644
--- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriterTest.java
+++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriterTest.java
@@ -68,7 +68,7 @@ public void testGCSNoFailure(){
     Storage storage = mock(Storage.class);
     SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);

-    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage);
+    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null);
     testTask.initialize(sinkTaskContext);
     testTask.start(properties);
     testTask.put(
@@ -94,7 +94,7 @@ public void testGCSSomeFailures(){
         .thenThrow(new StorageException(500, "internal server error")) // throw first time
         .thenReturn(null); // return second time. (we don't care about the result.)

-    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage);
+    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null);
     testTask.initialize(sinkTaskContext);
     testTask.start(properties);
     testTask.put(
@@ -119,7 +119,7 @@ public void testGCSAllFailures(){
     when(storage.create((BlobInfo)anyObject(), (byte[])anyObject()))
         .thenThrow(new StorageException(500, "internal server error"));

-    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage);
+    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null);
     testTask.initialize(sinkTaskContext);
     testTask.start(properties);
     testTask.put(

From d4201c0e32904692377fbe589da37d930f5aba9a Mon Sep 17 00:00:00 2001
From: Bingqin Zhou
Date: Fri, 20 Dec 2019 10:57:37 -0800
Subject: [PATCH 3/8] Add missing param to doc.
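
The previous commit added the testSchemaManager constructor argument without
documenting it; this adds the missing Javadoc line. After the change the
constructor's parameter docs cover all four injected test doubles:

    * @param testBigQuery {@link BigQuery} to use for testing (likely a mock)
    * @param schemaRetriever {@link SchemaRetriever} to use for testing (likely a mock)
    * @param testGcs {@link Storage} to use for testing (likely a mock)
    * @param testSchemaManager {@link SchemaManager} to use for testing (likely a mock)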
---
 .../java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
index 048effe97..f183d5d6f 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
@@ -110,6 +110,7 @@ public BigQuerySinkTask() {
    * @param testBigQuery {@link BigQuery} to use for testing (likely a mock)
    * @param schemaRetriever {@link SchemaRetriever} to use for testing (likely a mock)
    * @param testGcs {@link Storage} to use for testing (likely a mock)
+   * @param testSchemaManager {@link SchemaManager} to use for testing (likely a mock)
    * @see BigQuerySinkTask#BigQuerySinkTask()
    */
   public BigQuerySinkTask(BigQuery testBigQuery, SchemaRetriever schemaRetriever, Storage testGcs, SchemaManager testSchemaManager) {

From 414f6a844018a23ef38a28a90548a8de9ccd1f6a Mon Sep 17 00:00:00 2001
From: Bingqin Zhou
Date: Fri, 20 Dec 2019 14:27:23 -0800
Subject: [PATCH 4/8] Skip existing-table checks when auto-creation is enabled.

---
 .../wepay/kafka/connect/bigquery/BigQuerySinkConnector.java | 6 ++++--
 .../com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java  | 4 ++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java
index 82b174bf1..af0786eb6 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java
@@ -111,7 +111,7 @@ private void ensureExistingTables() {
     BigQuery bigQuery = getBigQuery();
     Map topicsToTableIds = TopicToTableResolver.getTopicsToTables(config);
     for (TableId tableId : topicsToTableIds.values()) {
-      if (!config.getBoolean(config.TABLE_CREATE_CONFIG) && bigQuery.getTable(tableId) == null) {
+      if (bigQuery.getTable(tableId) == null) {
         logger.warn(
             "You may want to enable auto table creation by setting {}=true in the properties file",
             config.TABLE_CREATE_CONFIG);
@@ -133,7 +133,9 @@ public void start(Map properties) {
       );
     }

-    ensureExistingTables();
+    if (!config.getBoolean(config.TABLE_CREATE_CONFIG)) {
+      ensureExistingTables();
+    }
   }

   @Override
diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
index f183d5d6f..30fb1898c 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
@@ -131,7 +131,7 @@ public void flush(Map offsets) {
     topicPartitionManager.resumeAll();
   }

-  private PartitionedTableId getRecordTable(boolean autoCreateTables, SinkRecord record) {
+  private PartitionedTableId getRecordTable(SinkRecord record, boolean autoCreateTables) {
     // Dynamically update topicToBaseTableIds mapping. topicToBaseTableIds was used to be
     // constructed when connector starts hence new topic configuration needed connector to restart.
     // Dynamic update shall not require connector restart and shall compute table id in runtime.
@@ -196,7 +196,7 @@ public void put(Collection records) {

     for (SinkRecord record : records) {
       if (record.value() != null) {
-        PartitionedTableId table = getRecordTable(autoCreateTables, record);
+        PartitionedTableId table = getRecordTable(record, autoCreateTables);
         if (schemaRetriever != null) {
           schemaRetriever.setLastSeenSchema(table.getBaseTableId(),
                                             record.topic(),

From 8cb0857d768db197d53194ed022a98307ef95ae3 Mon Sep 17 00:00:00 2001
From: Bingqin Zhou
Date: Fri, 20 Dec 2019 15:20:37 -0800
Subject: [PATCH 5/8] Extract maybeCreateTable and simplify the auto-create check.

---
 .../bigquery/BigQuerySinkConnector.java       |  4 ++--
 .../connect/bigquery/BigQuerySinkTask.java    | 20 ++++++++++++++-----
 .../bigquery/config/BigQuerySinkConfig.java   | 11 +++-------
 3 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java
index af0786eb6..1986b709a 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java
@@ -113,8 +113,8 @@ private void ensureExistingTables() {
     for (TableId tableId : topicsToTableIds.values()) {
       if (bigQuery.getTable(tableId) == null) {
         logger.warn(
-            "You may want to enable auto table creation by setting {}=true in the properties file",
-            config.TABLE_CREATE_CONFIG);
+          "You may want to enable auto table creation by setting {}=true in the properties file",
+          config.TABLE_CREATE_CONFIG);
         throw new BigQueryConnectException("Table '" + tableId + "' does not exist");
       }
     }
diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
index 30fb1898c..a744692c7 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
@@ -141,11 +141,7 @@ private PartitionedTableId getRecordTable(SinkRecord record, boolean autoCreateT
     TableId baseTableId = topicsToBaseTableIds.get(record.topic());

-    BigQuery bigQuery = getBigQuery();
-    if (autoCreateTables && bigQuery.getTable(baseTableId) == null) {
-      getSchemaManager(bigQuery).createTable(baseTableId, record.topic());
-      logger.info("Table {} does not exist, auto-created table for topic {}", baseTableId, record.topic());
-    }
+    maybeCreateTable(record, baseTableId, autoCreateTables);

     PartitionedTableId.Builder builder = new PartitionedTableId.Builder(baseTableId);
     if (useMessageTimeDatePartitioning) {
@@ -162,6 +158,20 @@ private PartitionedTableId getRecordTable(SinkRecord record, boolean autoCreateT
     return builder.build();
   }
+  /**
+   * Create the table for a record's topic in BigQuery if it doesn't already exist and autoCreateTables is set to true.
+   * @param record Kafka Sink Record to be streamed into BigQuery.
+   * @param baseTableId BaseTableId in BigQuery.
+   * @param autoCreateTables If set to true, automatically create the table if it doesn't exist.
+ */ + private void maybeCreateTable(SinkRecord record, TableId baseTableId, boolean autoCreateTables) { + BigQuery bigQuery = getBigQuery(); + if (autoCreateTables && bigQuery.getTable(baseTableId) == null) { + getSchemaManager(bigQuery).createTable(baseTableId, record.topic()); + logger.info("Table {} does not exist, auto-created table for topic {}", baseTableId, record.topic()); + } + } + private RowToInsert getRecordRow(SinkRecord record) { Map convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE); Optional kafkaKeyFieldName = config.getKafkaKeyFieldName(); diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java index 8df075ea2..4700005b9 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java @@ -625,18 +625,13 @@ private void verifyBucketSpecified() throws ConfigException { } private void checkAutoCreateTables() { - Class schemaRetriever = getClass(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG); + Class schemaRetriever = getClass(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG); boolean autoCreateTables = getBoolean(TABLE_CREATE_CONFIG); + if (autoCreateTables && schemaRetriever == null) { throw new ConfigException( - "Cannot specify automatic table creation without a schema retriever" - ); - } - - if (schemaRetriever == null) { - logger.warn( - "No schema retriever class provided; auto table creation is impossible" + "Cannot specify automatic table creation without a schema retriever" ); } } From e810f7991ba4109b124d95e5f199594ff5944a5f Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Thu, 2 Jan 2020 11:30:42 -0800 Subject: [PATCH 6/8] Avoid passing an extra autoCreateTables to getRecordTable function. --- .../kafka/connect/bigquery/BigQuerySinkTask.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index a744692c7..da452b4fa 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -131,7 +131,7 @@ public void flush(Map offsets) { topicPartitionManager.resumeAll(); } - private PartitionedTableId getRecordTable(SinkRecord record, boolean autoCreateTables) { + private PartitionedTableId getRecordTable(SinkRecord record) { // Dynamically update topicToBaseTableIds mapping. topicToBaseTableIds was used to be // constructed when connector starts hence new topic configuration needed connector to restart. // Dynamic update shall not require connector restart and shall compute table id in runtime. 
@@ -141,7 +141,7 @@ private PartitionedTableId getRecordTable(SinkRecord record, boolean autoCreateT
     TableId baseTableId = topicsToBaseTableIds.get(record.topic());

-    maybeCreateTable(record, baseTableId, autoCreateTables);
+    maybeCreateTable(record, baseTableId);

     PartitionedTableId.Builder builder = new PartitionedTableId.Builder(baseTableId);
     if (useMessageTimeDatePartitioning) {
@@ -162,10 +162,10 @@ private PartitionedTableId getRecordTable(SinkRecord record, boolean autoCreateT
    * Create the table for a record's topic in BigQuery if it doesn't already exist and autoCreateTables is set to true.
    * @param record Kafka Sink Record to be streamed into BigQuery.
    * @param baseTableId BaseTableId in BigQuery.
-   * @param autoCreateTables If set to true, automatically create the table if it doesn't exist.
    */
-  private void maybeCreateTable(SinkRecord record, TableId baseTableId, boolean autoCreateTables) {
+  private void maybeCreateTable(SinkRecord record, TableId baseTableId) {
     BigQuery bigQuery = getBigQuery();
+    boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
     if (autoCreateTables && bigQuery.getTable(baseTableId) == null) {
       getSchemaManager(bigQuery).createTable(baseTableId, record.topic());
       logger.info("Table {} does not exist, auto-created table for topic {}", baseTableId, record.topic());
@@ -202,11 +202,9 @@ public void put(Collection records) {
     // create tableWriters
     Map tableWriterBuilders = new HashMap<>();

-    boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
-
     for (SinkRecord record : records) {
       if (record.value() != null) {
-        PartitionedTableId table = getRecordTable(record, autoCreateTables);
+        PartitionedTableId table = getRecordTable(record);
         if (schemaRetriever != null) {
           schemaRetriever.setLastSeenSchema(table.getBaseTableId(),
                                             record.topic(),

From 5c6805a4bcffdeb7bee717d295bd6bec627b64cf Mon Sep 17 00:00:00 2001
From: Bingqin Zhou
Date: Thu, 2 Jan 2020 17:15:36 -0800
Subject: [PATCH 7/8] Default autoCreateTables to true.
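
Auto-creation becomes the default, so the quickstart and integration-test
property files no longer need to set it explicitly. Deployments that want
the old fail-fast behavior can pin it in their connector properties
(illustrative line for a user's config, not part of this diff):

    autoCreateTables=false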
--- kcbq-connector/quickstart/properties/connector.properties | 1 - .../wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java | 2 +- kcbq-connector/test/resources/connector-template.properties | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/kcbq-connector/quickstart/properties/connector.properties b/kcbq-connector/quickstart/properties/connector.properties index 05ee82fb1..5eb5fc3b3 100644 --- a/kcbq-connector/quickstart/properties/connector.properties +++ b/kcbq-connector/quickstart/properties/connector.properties @@ -20,7 +20,6 @@ tasks.max=1 topics=kcbq-quickstart sanitizeTopics=true -autoCreateTables=true autoUpdateSchemas=true schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java index 4700005b9..d045f1b63 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java @@ -214,7 +214,7 @@ public class BigQuerySinkConfig extends AbstractConfig { public static final String TABLE_CREATE_CONFIG = "autoCreateTables"; private static final ConfigDef.Type TABLE_CREATE_TYPE = ConfigDef.Type.BOOLEAN; - public static final boolean TABLE_CREATE_DEFAULT = false; + public static final boolean TABLE_CREATE_DEFAULT = true; private static final ConfigDef.Importance TABLE_CREATE_IMPORTANCE = ConfigDef.Importance.HIGH; private static final String TABLE_CREATE_DOC = "Automatically create BigQuery tables if they don't already exist"; diff --git a/kcbq-connector/test/resources/connector-template.properties b/kcbq-connector/test/resources/connector-template.properties index b4503ddbe..20897982c 100644 --- a/kcbq-connector/test/resources/connector-template.properties +++ b/kcbq-connector/test/resources/connector-template.properties @@ -17,7 +17,6 @@ name=bigquery-connector connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector tasks.max=1 -autoCreateTables=true autoUpdateSchemas=true sanitizeTopics=true From 40494712947ca23045c0902b5e15f05db51ec2c5 Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Thu, 2 Jan 2020 18:05:07 -0800 Subject: [PATCH 8/8] Fix unit tests. 
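
The previous commit flipped TABLE_CREATE_DEFAULT to true, which would
silently enable auto-creation in every existing test. Pin it off in the
shared property factories, as the hunks below do:

    properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false");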
--- .../com/wepay/kafka/connect/bigquery/SinkPropertiesFactory.java | 1 + .../wepay/kafka/connect/bigquery/SinkTaskPropertiesFactory.java | 2 ++ 2 files changed, 3 insertions(+) diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkPropertiesFactory.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkPropertiesFactory.java index a87e1cb64..3daa31315 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkPropertiesFactory.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkPropertiesFactory.java @@ -33,6 +33,7 @@ public class SinkPropertiesFactory { public Map getProperties() { Map properties = new HashMap<>(); + properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false"); properties.put(BigQuerySinkConfig.TOPICS_CONFIG, "kcbq-test"); properties.put(BigQuerySinkConfig.PROJECT_CONFIG, "test-project"); properties.put(BigQuerySinkConfig.DATASETS_CONFIG, ".*=test"); diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkTaskPropertiesFactory.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkTaskPropertiesFactory.java index cda1d6c94..453e05a7c 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkTaskPropertiesFactory.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkTaskPropertiesFactory.java @@ -18,6 +18,7 @@ */ +import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; import java.util.Map; @@ -28,6 +29,7 @@ public Map getProperties() { Map properties = super.getProperties(); properties.put(BigQuerySinkTaskConfig.SCHEMA_UPDATE_CONFIG, "false"); + properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false"); return properties; }
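
For reference, the table-creation path that this series converges on in
BigQuerySinkTask (assembled from the hunks above, not a new change) is:

    private void maybeCreateTable(SinkRecord record, TableId baseTableId) {
      BigQuery bigQuery = getBigQuery();
      boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
      // Create the table only when auto-creation is enabled and BigQuery
      // reports that it does not exist yet.
      if (autoCreateTables && bigQuery.getTable(baseTableId) == null) {
        getSchemaManager(bigQuery).createTable(baseTableId, record.topic());
        logger.info("Table {} does not exist, auto-created table for topic {}", baseTableId, record.topic());
      }
    }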