From 6b1d4d46eb3bcfa22914931d8d40f469aacd8422 Mon Sep 17 00:00:00 2001 From: Alejandro del Castillo Date: Fri, 12 Jun 2020 14:27:48 -0500 Subject: [PATCH] add bestEffortDeduplication config option The connector is currently adding an InsertId per row, which is used by BigQuery to dedupe rows that have the same insertId (in a 1 minute window). Using insertIds throttles the ingestion rate to a maximum of 100k rows per second & 100 MB/s. Insertions without an insertId disable best effort de-duplication [1], which increases the ingestion quota to a maximum of 1 GB/s. For high throughput applications, it's desirable to disable dedupe, handling duplication on the query side. [1] https://cloud.google.com/bigquery/streaming-data-into-bigquery#disabling_best_effort_de-duplication Signed-off-by: Alejandro del Castillo --- .../connect/bigquery/BigQuerySinkTask.java | 8 +++-- .../bigquery/config/BigQuerySinkConfig.java | 17 +++++++++- .../bigquery/BigQuerySinkTaskTest.java | 32 +++++++++++++++++++ 3 files changed, 54 insertions(+), 3 deletions(-) diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index bf4b43d6d..8c66a15d7 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -173,10 +173,14 @@ private RowToInsert getRecordRow(SinkRecord record) { if (config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG)) { convertedRecord = FieldNameSanitizer.replaceInvalidKeys(convertedRecord); } - return RowToInsert.of(getRowId(record), convertedRecord); + if (config.getBoolean(config.BEST_EFFORT_DEDUPLICATION_CONFIG)) { + return RowToInsert.of(getRowId(record), convertedRecord); + } else { + return RowToInsert.of(convertedRecord); + } } - private String getRowId(SinkRecord record) { + String getRowId(SinkRecord record) { return 
String.format("%s-%d-%d", record.topic(), record.kafkaPartition(), diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java index d4362c381..06ecce9b4 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java @@ -233,6 +233,15 @@ public class BigQuerySinkConfig extends AbstractConfig { private static final String TABLE_CREATE_DOC = "Automatically create BigQuery tables if they don't already exist"; + public static final String BEST_EFFORT_DEDUPLICATION_CONFIG = "bestEffortDeduplication"; + private static final ConfigDef.Type BEST_EFFORT_DEDUPLICATION_TYPE = ConfigDef.Type.BOOLEAN; + public static final Boolean BEST_EFFORT_DEDUPLICATION_DEFAULT = true; + private static final ConfigDef.Importance BEST_EFFORT_DEDUPLICATION_IMPORTANCE = + ConfigDef.Importance.MEDIUM; + private static final String BEST_EFFORT_DEDUPLICATION_DOC = + "If false, Big Query best effort de-duplication will be disabled, which increases " + + "the streaming ingest quota, at the expense of not checking for duplicates"; + static { config = new ConfigDef() .define( @@ -365,7 +374,13 @@ public class BigQuerySinkConfig extends AbstractConfig { TABLE_CREATE_DEFAULT, TABLE_CREATE_IMPORTANCE, TABLE_CREATE_DOC - ); + ).define( + BEST_EFFORT_DEDUPLICATION_CONFIG, + BEST_EFFORT_DEDUPLICATION_TYPE, + BEST_EFFORT_DEDUPLICATION_DEFAULT, + BEST_EFFORT_DEDUPLICATION_IMPORTANCE, + BEST_EFFORT_DEDUPLICATION_DOC + ); } /** * Throw an exception if the passed-in properties do not constitute a valid sink. 
diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java index af6547568..df9b3d8d7 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java @@ -24,6 +24,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -98,6 +99,37 @@ public void testSimplePut() { verify(bigQuery, times(1)).insertAll(any(InsertAllRequest.class)); } + @Test + public void testPutWhenBestEffortDeduplicationIsSetToFalse() { + final String topic = "test-topic"; + + Map properties = propertiesFactory.getProperties(); + properties.put(BigQuerySinkConfig.TOPICS_CONFIG, topic); + properties.put(BigQuerySinkConfig.DATASETS_CONFIG, ".*=scratch"); + properties.put(BigQuerySinkTaskConfig.BEST_EFFORT_DEDUPLICATION_CONFIG, "false"); + + BigQuery bigQuery = mock(BigQuery.class); + Storage storage = mock(Storage.class); + + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + InsertAllResponse insertAllResponse = mock(InsertAllResponse.class); + + when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse); + when(insertAllResponse.hasErrors()).thenReturn(false); + + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + BigQuerySinkTask testTask = spy(new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager)); + testTask.initialize(sinkTaskContext); + testTask.start(properties); + + testTask.put(Collections.singletonList(spoofSinkRecord(topic))); + testTask.flush(Collections.emptyMap()); + 
verify(bigQuery, times(1)).insertAll(any(InsertAllRequest.class)); + verify(testTask, times(0)).getRowId(any(SinkRecord.class)); + } + @Test public void testSimplePutWhenSchemaRetrieverIsNotNull() { final String topic = "test-topic";