From 015c272721df5095bfb1e0c2dc188f27739782cf Mon Sep 17 00:00:00 2001
From: Zachary Povey
Date: Tue, 12 May 2020 18:02:45 +0100
Subject: [PATCH 1/2] Only list gcs folder blobs in GCSToBQLoadRunnable

---
 .../com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java | 4 +++-
 .../wepay/kafka/connect/bigquery/GCSToBQLoadRunnable.java  | 7 +++++--
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
index bf4b43d6d..8bf1065ff 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
@@ -350,6 +350,7 @@ private void startGCSToBQLoadTask() {
     logger.info("Attempting to start GCS Load Executor.");
     gcsLoadExecutor = Executors.newScheduledThreadPool(1);
     String bucketName = config.getString(config.GCS_BUCKET_NAME_CONFIG);
+    String folderName = config.getString(config.GCS_FOLDER_NAME_CONFIG);
     Storage gcs = getGcs();
     // get the bucket, or create it if it does not exist.
     Bucket bucket = gcs.get(bucketName);
@@ -359,7 +360,8 @@ private void startGCSToBQLoadTask() {
       BucketInfo bucketInfo = BucketInfo.of(bucketName);
       bucket = gcs.create(bucketInfo);
     }
-    GCSToBQLoadRunnable loadRunnable = new GCSToBQLoadRunnable(getBigQuery(), bucket);
+    Storage.BlobListOption folderPrefixOption = Storage.BlobListOption.prefix(folderName);
+    GCSToBQLoadRunnable loadRunnable = new GCSToBQLoadRunnable(getBigQuery(), bucket, folderPrefixOption);
     int intervalSec = config.getInt(BigQuerySinkConfig.BATCH_LOAD_INTERVAL_SEC_CONFIG);
     gcsLoadExecutor.scheduleAtFixedRate(loadRunnable, intervalSec, intervalSec, TimeUnit.SECONDS);
diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/GCSToBQLoadRunnable.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/GCSToBQLoadRunnable.java
index 348fccb29..74ab476ab 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/GCSToBQLoadRunnable.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/GCSToBQLoadRunnable.java
@@ -29,6 +29,7 @@
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.Bucket;
 import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.Storage.BlobListOption;
 
 import com.wepay.kafka.connect.bigquery.write.row.GCSToBQWriter;
 
@@ -58,6 +59,7 @@ public class GCSToBQLoadRunnable implements Runnable {
 
   private final BigQuery bigQuery;
   private final Bucket bucket;
+  private final BlobListOption folderPrefixOption;
   private final Map<Job, List<BlobId>> activeJobs;
   private final Set<BlobId> claimedBlobIds;
   private final Set<BlobId> deletableBlobIds;
@@ -79,9 +81,10 @@ public class GCSToBQLoadRunnable implements Runnable {
    * @param bigQuery the {@link BigQuery} instance.
    * @param bucket the GCS bucket to read from.
   */
-  public GCSToBQLoadRunnable(BigQuery bigQuery, Bucket bucket) {
+  public GCSToBQLoadRunnable(BigQuery bigQuery, Bucket bucket, BlobListOption folderPrefixOption) {
     this.bigQuery = bigQuery;
     this.bucket = bucket;
+    this.folderPrefixOption = folderPrefixOption;
     this.activeJobs = new HashMap<>();
     this.claimedBlobIds = new HashSet<>();
     this.deletableBlobIds = new HashSet<>();
@@ -101,7 +104,7 @@ private Map<TableId, List<Blob>> getBlobsUpToLimit() {
     Map<TableId, Long> tableToCurrentLoadSize = new HashMap<>();
 
     logger.trace("Starting GCS bucket list");
-    Page<Blob> list = bucket.list();
+    Page<Blob> list = bucket.list(folderPrefixOption);
     logger.trace("Finished GCS bucket list");
 
     for (Blob blob : list.iterateAll()) {

From 90e63b61255102ccc5f2a8403abf64327252b3d8 Mon Sep 17 00:00:00 2001
From: Zachary Povey
Date: Mon, 8 Jun 2020 17:20:24 +0100
Subject: [PATCH 2/2] Test gcs folder isolation

---
 build.gradle                                 |   3 +
 .../it/BigQueryConnectorIntegrationTest.java |  11 ++
 .../bigquery/it/utils/BucketClearer.java     | 136 ++++++++++++++----
 kcbq-connector/test/integrationtest.sh       |   2 +
 4 files changed, 123 insertions(+), 29 deletions(-)

diff --git a/build.gradle b/build.gradle
index 1a824d4e9..7bb31163d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -164,12 +164,15 @@ project(':kcbq-connector') {
   }
 
   task integrationTestBucketPrep(type: JavaExec) {
+    dependsOn 'integrationTestTablePrep'
     main = 'com.wepay.kafka.connect.bigquery.it.utils.BucketClearer'
     classpath = sourceSets.integrationTest.runtimeClasspath
     args findProperty('kcbq_test_keyfile') ?: ''
     args findProperty('kcbq_test_project') ?: ''
     args findProperty('kcbq_test_bucket') ?: ''
     args findProperty('kcbq_test_keysource') ?: ''
+    args findProperty('kcbq_test_folder') ?: ''
+    args findProperty('kcbq_test_dataset') ?: ''
   }
 
   task integrationTest(type: Test) {
diff --git a/kcbq-connector/src/integration-test/java/com/wepay/kafka/connect/bigquery/it/BigQueryConnectorIntegrationTest.java b/kcbq-connector/src/integration-test/java/com/wepay/kafka/connect/bigquery/it/BigQueryConnectorIntegrationTest.java
index 8a6c6505d..50066798b 100644
--- a/kcbq-connector/src/integration-test/java/com/wepay/kafka/connect/bigquery/it/BigQueryConnectorIntegrationTest.java
+++ b/kcbq-connector/src/integration-test/java/com/wepay/kafka/connect/bigquery/it/BigQueryConnectorIntegrationTest.java
@@ -185,6 +185,11 @@ private List<Object> convertRow(List<Field> rowSchema, List<Object> row) {
 
   private List<List<Object>> readAllRows(String tableName) {
     Table table = bigQuery.getTable(dataset, tableName);
+
+    if (table == null) {
+      return new ArrayList<>(); // if the table doesn't exist, return an empty list of rows
+    }
+
     Schema schema = table.getDefinition().getSchema();
 
     List<List<Object>> rows = new ArrayList<>();
@@ -216,6 +221,12 @@ public void testNull() {
     testRows(expectedRows, readAllRows("kcbq_test_nulls"));
   }
 
+  @Test
+  public void testAdjacentFolder() {
+    List<List<Object>> populatedRows = readAllRows("kcbq_test_shouldnt_populate");
+    assertEquals("Table using adjacent gcs folder shouldn't populate", 0, populatedRows.size());
+  }
+
   @Test
   public void testMatryoshka() {
     List<List<Object>> expectedRows = new ArrayList<>();
diff --git a/kcbq-connector/src/integration-test/java/com/wepay/kafka/connect/bigquery/it/utils/BucketClearer.java b/kcbq-connector/src/integration-test/java/com/wepay/kafka/connect/bigquery/it/utils/BucketClearer.java
index b5bcb4317..4d4995dae 100644
--- a/kcbq-connector/src/integration-test/java/com/wepay/kafka/connect/bigquery/it/utils/BucketClearer.java
+++ b/kcbq-connector/src/integration-test/java/com/wepay/kafka/connect/bigquery/it/utils/BucketClearer.java
@@ -18,43 +18,121 @@
  */
-import com.google.cloud.storage.Bucket;
-
-import com.google.cloud.storage.Storage;
+import com.google.cloud.bigquery.*;
+import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
+import com.google.cloud.storage.*;
+import com.google.gson.Gson;
+import com.wepay.kafka.connect.bigquery.BigQueryHelper;
 import com.wepay.kafka.connect.bigquery.GCSBuilder;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+import static com.wepay.kafka.connect.bigquery.write.row.GCSToBQWriter.GCS_METADATA_TABLE_KEY;
+
 public class BucketClearer {
-  private static final Logger logger = LoggerFactory.getLogger(BucketClearer.class);
-  private static String keySource;
+  private static final Logger logger = LoggerFactory.getLogger(BucketClearer.class);
+
+  /**
+   * Clears the given GCS bucket using a provided JSON service account key and, when a folder is set, seeds an adjacent folder with dummy data that must not load.
+   */
+  public static void main(String[] args) {
+    if (args.length < 6) {
+      usage();
+    }
+
+    String keyfile = args[0];
+    String project = args[1];
+    String bucketName = args[2];
+    String keySource = args[3];
+    String folder = args[4];
+    String dataset = args[5];
+
+    Storage gcs = new GCSBuilder(project).setKey(keyfile).setKeySource(keySource).build();
+    Bucket bucket = getBucket(bucketName, gcs);
+
+    // Empty the bucket
+    logger.info("Emptying bucket {}", bucketName);
+    for (Blob blob : bucket.list().iterateAll()) {
+      gcs.delete(blob.getBlobId());
+    }
+
+    // If using a folder, add some dummy data to an adjacent folder to ensure it doesn't get loaded
+    if (folder != null && !folder.equals("")) {
+      BlobInfo blobInAdjacentFolderInfo = getBlobInfo(project, dataset, keySource, keyfile, bucketName);
+      byte[] blobInAdjacentFolderContent = getBlobContent();
+      gcs.create(blobInAdjacentFolderInfo, blobInAdjacentFolderContent);
+    }
+  }
+
+  private static Bucket getBucket(String bucketName, Storage gcs) {
+    Bucket bucket = gcs.get(bucketName);
+    if (bucket == null) {
+      BucketInfo bucketInfo = BucketInfo.of(bucketName);
+      bucket = gcs.create(bucketInfo);
+    }
+    return bucket;
+  }
+
+  private static BlobInfo getBlobInfo(String project, String dataset, String keySource, String keyfile, String bucketName) {
+    BlobId blobInAdjacentFolderId = BlobId.of(bucketName, "adjacentFolder/dataThatShouldNotLoad");
 
-  /**
-   * Clears tables in the given project and dataset, using a provided JSON service account key.
-   */
-  public static void main(String[] args) {
-    if (args.length < 4) {
-      usage();
+    // create the BQ table that the adjacent-folder blob's metadata points at
+    TableId tableId = TableId.of(project, dataset, "kcbq_test_shouldnt_populate");
+
+    Field[] fields = {
+        Field.of("dummy_field", LegacySQLTypeName.STRING)
+    };
+    TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(Schema.of(fields))).build();
+
+    BigQuery bq = new BigQueryHelper().setKeySource(keySource).connect(project, keyfile);
+    bq.create(tableInfo);
+    // the GCS-to-BQ loader would populate this table if folder isolation failed
+
+    Map<String, String> metadata = getMetadata(tableId);
+
+    return BlobInfo.newBuilder(blobInAdjacentFolderId).setContentType("text/json").setMetadata(metadata).build();
+  }
+
+  private static Map<String, String> getMetadata(TableId tableId) {
+    StringBuilder sb = new StringBuilder();
+    if (tableId.getProject() != null) {
+      sb.append(tableId.getProject()).append(":");
+    }
+    String serializedTableId =
+        sb.append(tableId.getDataset()).append(".").append(tableId.getTable()).toString();
+
+    return Collections.singletonMap(GCS_METADATA_TABLE_KEY, serializedTableId);
+  }
+
+  private static byte[] getBlobContent() {
+    Map<String, Object> content = new HashMap<>();
+    content.put("dummy_field", "dummy_value");
+    List<RowToInsert> blobInAdjacentFolderContent = new ArrayList<>();
+    blobInAdjacentFolderContent.add(RowToInsert.of(UUID.randomUUID().toString(), content));
+    return toJson(blobInAdjacentFolderContent).getBytes(StandardCharsets.UTF_8);
     }
-    if (args.length == 4) {
-      keySource = args[3];
+
+    private static String toJson(List<RowToInsert> rows) {
+      Gson gson = new Gson();
+
+      StringBuilder jsonRecordsBuilder = new StringBuilder();
+      for (RowToInsert row : rows) {
+        Map<String, Object> record = row.getContent();
+        jsonRecordsBuilder.append(gson.toJson(record));
+        jsonRecordsBuilder.append("\n");
+      }
+      return jsonRecordsBuilder.toString();
     }
-    Storage gcs = new GCSBuilder(args[1]).setKey(args[0]).setKeySource(keySource).build();
-
-    // if bucket exists, delete it.
-    String bucketName = args[2];
-    if (gcs.delete(bucketName)) {
-      logger.info("Bucket {} deleted successfully", bucketName);
-    } else {
-      logger.info("Bucket {} does not exist", bucketName);
+
+    private static void usage() {
+      System.err.println(
+          "usage: BucketClearer <key file> <project name> <bucket name> <key source> <folder name> <dataset name>"
+      );
+      System.exit(1);
     }
-  }
-
-  private static void usage() {
-    System.err.println(
-        "usage: BucketClearer <key file> <project name> <bucket name> <key source>"
-    );
-    System.exit(1);
-  }
 }
diff --git a/kcbq-connector/test/integrationtest.sh b/kcbq-connector/test/integrationtest.sh
index a88c5518b..8c65efd37 100755
--- a/kcbq-connector/test/integrationtest.sh
+++ b/kcbq-connector/test/integrationtest.sh
@@ -218,6 +218,7 @@ for file in "$BASE_DIR"/resources/test_schemas/*; do
   test_tables+="${test_tables:+ }kcbq_test_$(basename "${file/-/_}")"
   test_topics+="${test_topics:+,}kcbq_test_$(basename "$file")"
 done
+test_tables+="${test_tables:+ }kcbq_test_shouldnt_populate"
 
 "$GRADLEW" -p "$BASE_DIR/.." \
     -Pkcbq_test_keyfile="$KCBQ_TEST_KEYFILE" \
@@ -226,6 +227,7 @@
     -Pkcbq_test_tables="$test_tables" \
    -Pkcbq_test_bucket="$KCBQ_TEST_BUCKET" \
    -Pkcbq_test_keysource="$KCBQ_TEST_KEYSOURCE" \
+   -Pkcbq_test_folder="$KCBQ_TEST_FOLDER" \
    integrationTestPrep

####################################################################################################