From 9d5d909f3c8ae8856b5708c8bade864effc21b6c Mon Sep 17 00:00:00 2001 From: Borna Almasi Date: Mon, 8 Aug 2022 11:41:04 -0400 Subject: [PATCH] [bugfix] Limit number of clustering keys to 4 (BQ limit) (#133) * :bug: Added limit of 4 for the number of clustering keys defined by a stream's primary keys * :memo: Added documentation on clustering behaviour * :bookmark: Bump version to 1.4.1 --- CHANGELOG.md | 4 ++ README.md | 12 ++++ setup.py | 2 +- target_bigquery/db_sync.py | 18 +++-- ...le_with_multi_pk_cluster_beyond_limit.json | 6 ++ .../table_with_multi_pk_cluster_changed.json | 5 +- ...h_multi_pk_cluster_changed_pk_removed.json | 5 ++ .../test_target_bigquery_cluster.py | 65 +++++++++++++++---- 8 files changed, 96 insertions(+), 21 deletions(-) create mode 100644 tests/integration/resources/table_with_multi_pk_cluster_beyond_limit.json create mode 100644 tests/integration/resources/table_with_multi_pk_cluster_changed_pk_removed.json diff --git a/CHANGELOG.md b/CHANGELOG.md index 673cbde..5c1c8eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +1.4.1 (2022-08-02) +------------------- +- Fix: Instead of failing, use only the first 4 primary keys when clustering if more than 4 primary keys are defined by the tap stream's schema. + 1.4.0 (2022-07-22) ------------------- - Refactoring code to make it easier to add improvements diff --git a/README.md b/README.md index f38e7f2..f0a8cb0 100644 --- a/README.md +++ b/README.md @@ -114,6 +114,18 @@ data only after the change. you need to resync the table. +#### Column clustering + +This target tries to speed up the querying of the resulting tables by clustering the +columns in each table by the primary key of the stream. + +The choice and ordering of the clustering keys are defined in the same order as the +`key_properties` columns in the stream's `SCHEMA` messages. 
+ +BigQuery places a limit on the number of clustering keys (4 as of 2022-08-02), so if the +number of clustering keys is greater than 4, this target will simply use the first 4 +columns defined in the `key_properties` property. + ### To run tests: 1. Define environment variables that requires running the tests diff --git a/setup.py b/setup.py index 2c2d745..d561ccf 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ long_description = f.read() setup(name="pipelinewise-target-bigquery", - version="1.4.0", + version="1.4.1", description="Singer.io target for loading data to BigQuery - PipelineWise compatible", long_description=long_description, long_description_content_type='text/markdown', diff --git a/target_bigquery/db_sync.py b/target_bigquery/db_sync.py index 2ffae8d..fe45d63 100644 --- a/target_bigquery/db_sync.py +++ b/target_bigquery/db_sync.py @@ -17,6 +17,7 @@ logger = singer.get_logger() +BIGQUERY_NUM_CLUSTERED_COLUMNS_LIMIT = 4 PRECISION = 38 SCALE = 9 getcontext().prec = PRECISION @@ -525,13 +526,20 @@ def update_clustering_fields(self): new_clustering_fields = [ self.renamed_columns.get(c, c) for c in primary_column_names(self.stream_schema_message) ] - if new_clustering_fields and not table.clustering_fields: - logger.info('Clustering table on fields: {}'.format(new_clustering_fields)) - table.clustering_fields = new_clustering_fields + + new_clustering_fields_limited = new_clustering_fields[0:BIGQUERY_NUM_CLUSTERED_COLUMNS_LIMIT] + + if len(new_clustering_fields) > BIGQUERY_NUM_CLUSTERED_COLUMNS_LIMIT: + logger.info(f"The number of clustering fields ({len(new_clustering_fields)}) is greater than the limit of {BIGQUERY_NUM_CLUSTERED_COLUMNS_LIMIT} allowed by BigQuery. 
Using only the first {BIGQUERY_NUM_CLUSTERED_COLUMNS_LIMIT} columns to define clustering keys as ordered by the stream schema's 'key_properties' property.") + + if new_clustering_fields_limited and not table.clustering_fields: + logger.info('Clustering table on fields: {}'.format(new_clustering_fields_limited)) + table.clustering_fields = new_clustering_fields_limited self.client.update_table(table, ['clustering_fields']) + # avoid changing existing clusters so its possible to manually change clustering of a table outside of this target - elif table.clustering_fields != new_clustering_fields: - logger.info('Primary key fields have changed. Uncluster the table to allow the change: {}'.format(new_clustering_fields)) + elif table.clustering_fields != new_clustering_fields_limited: + logger.info('Primary key fields have changed. Uncluster the table to allow the change: {}'.format(new_clustering_fields_limited)) def version_column(self, field, stream): column = sql_utils.safe_column_name(field.name, quotes=False) diff --git a/tests/integration/resources/table_with_multi_pk_cluster_beyond_limit.json b/tests/integration/resources/table_with_multi_pk_cluster_beyond_limit.json new file mode 100644 index 0000000..07e3a3d --- /dev/null +++ b/tests/integration/resources/table_with_multi_pk_cluster_beyond_limit.json @@ -0,0 +1,6 @@ +{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_cluster_multi", "bookmarks": {"tap_mysql_test-test_table_cluster_multi": {"initial_full_table_complete": true}}}} +{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_cluster_multi", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_int_2": {"inclusion": "available", "minimum": 
-2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_int_3": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk", "c_varchar", "c_int", "c_int_2", "c_int_3"]} +{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_cluster_multi", "version": 3} +{"type": "RECORD", "stream": "tap_mysql_test-test_table_cluster_multi", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_int_2": 22, "c_int_3": 222, "c_date": "2019-02-12 02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_table_cluster_multi", "record": {"c_pk": 3, "c_varchar": "2", "c_int": 3, "c_int_2": 33, "c_int_3": 333, "c_date": "2019-02-15 02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"} +{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_cluster_multi", "bookmarks": {"tap_mysql_test-test_table_cluster_multi": {"initial_full_table_complete": true}}}} diff --git a/tests/integration/resources/table_with_multi_pk_cluster_changed.json b/tests/integration/resources/table_with_multi_pk_cluster_changed.json index d8c9d24..ca15f85 100644 --- a/tests/integration/resources/table_with_multi_pk_cluster_changed.json +++ b/tests/integration/resources/table_with_multi_pk_cluster_changed.json @@ -1,5 +1,6 @@ {"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_cluster_multi", "bookmarks": {"tap_mysql_test-test_table_cluster_multi": {"initial_full_table_complete": true}}}} -{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_cluster_multi", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, 
"c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": []} +{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_cluster_multi", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_int_2": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_int_3": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk", "c_varchar", "c_date"]} {"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_cluster_multi", "version": 3} -{"type": "RECORD", "stream": "tap_mysql_test-test_table_cluster_multi", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_date": "2019-02-12 02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_table_cluster_multi", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_int_2": 22, "c_int_3": 222, "c_date": "2019-02-12 02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"} +{"type": "RECORD", "stream": "tap_mysql_test-test_table_cluster_multi", "record": {"c_pk": 3, "c_varchar": "2", "c_int": 3, "c_int_2": 33, "c_int_3": 333, "c_date": "2019-02-15 02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"} {"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_cluster_multi", "bookmarks": 
{"tap_mysql_test-test_table_cluster_multi": {"initial_full_table_complete": true}}}} diff --git a/tests/integration/resources/table_with_multi_pk_cluster_changed_pk_removed.json b/tests/integration/resources/table_with_multi_pk_cluster_changed_pk_removed.json new file mode 100644 index 0000000..d8c9d24 --- /dev/null +++ b/tests/integration/resources/table_with_multi_pk_cluster_changed_pk_removed.json @@ -0,0 +1,5 @@ +{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_cluster_multi", "bookmarks": {"tap_mysql_test-test_table_cluster_multi": {"initial_full_table_complete": true}}}} +{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_cluster_multi", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": []} +{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_cluster_multi", "version": 3} +{"type": "RECORD", "stream": "tap_mysql_test-test_table_cluster_multi", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_date": "2019-02-12 02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"} +{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_cluster_multi", "bookmarks": {"tap_mysql_test-test_table_cluster_multi": {"initial_full_table_complete": true}}}} diff --git a/tests/integration/test_target_bigquery_cluster.py b/tests/integration/test_target_bigquery_cluster.py index 47d523b..9da0abc 100644 --- a/tests/integration/test_target_bigquery_cluster.py +++ b/tests/integration/test_target_bigquery_cluster.py @@ -1,11 +1,6 @@ import datetime -import json -import os -import unittest.mock 
as mock from datetime import timezone -from decimal import Decimal, getcontext -import target_bigquery from target_bigquery.db_sync import DbSync, PRECISION try: @@ -46,7 +41,7 @@ def test_table_with_pk_adds_clustering(self): cluster_columns = query(bigquery, "SELECT clustering_ordinal_position, column_name FROM {}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'test_table_cluster' AND clustering_ordinal_position > 0 ORDER BY 1".format(target_schema)) # ---------------------------------------------------------------------- - # Check rows in table + # Check that rows in the stream are present # ---------------------------------------------------------------------- expected_table = [ {'c_pk': 2, 'c_int': 2, 'c_varchar': '2', 'c_date': datetime.datetime(2019, 2, 12, 2, 0, 0, tzinfo=timezone.utc)}, @@ -61,7 +56,7 @@ def test_table_with_pk_adds_clustering(self): self.assertEqual(cluster_columns, expected_cluster_columns) # ---------------------------------------------------------------------- - # Change the primary key and check if clustering stayed unchanged + # Change the primary key and expect that clustering stays unchanged # ---------------------------------------------------------------------- tap_lines = test_utils.get_test_tap_lines('table_with_pk_cluster_changed.json') self.persist_lines(tap_lines) @@ -77,8 +72,54 @@ def test_table_with_pk_adds_clustering(self): self.assertEqual(self.remove_metadata_columns_from_rows(table_changed), expected_table_changed) self.assertEqual(cluster_columns_changed, expected_cluster_columns) + + def test_table_with_pk_limits_clustering_keys(self): + """Tests table with a primary key gets clustered on those fields, up to the + maximum number of clustering keys allowed by bigquery""" + + tap_lines = test_utils.get_test_tap_lines('table_with_multi_pk_cluster_beyond_limit.json') + self.persist_lines(tap_lines) + + # Get loaded rows from tables + bigquery = DbSync(self.config) + target_schema = self.config.get('default_target_schema', 
'') + table = query(bigquery, "SELECT * FROM {}.test_table_cluster_multi ORDER BY c_pk".format(target_schema)) + cluster_columns = query(bigquery, "SELECT clustering_ordinal_position, column_name FROM {}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'test_table_cluster_multi' AND clustering_ordinal_position > 0 ORDER BY 1".format(target_schema)) + + # ---------------------------------------------------------------------- + # Check that rows in the stream are present and clustered on the first 4 keys + # ---------------------------------------------------------------------- + expected_table = [ + {'c_pk': 2, 'c_int': 2, 'c_int_2': 22, 'c_int_3': 222, 'c_varchar': '2', 'c_date': datetime.datetime(2019, 2, 12, 2, 0, 0, tzinfo=timezone.utc)}, + {'c_pk': 3, 'c_int': 3, 'c_int_2': 33, 'c_int_3': 333, 'c_varchar': '2', 'c_date': datetime.datetime(2019, 2, 15, 2, 0, 0, tzinfo=timezone.utc)} + ] + + expected_cluster_columns = [ + {'clustering_ordinal_position': 1, 'column_name': 'c_pk'}, + {'clustering_ordinal_position': 2, 'column_name': 'c_varchar'}, + {'clustering_ordinal_position': 3, 'column_name': 'c_int'}, + {'clustering_ordinal_position': 4, 'column_name': 'c_int_2'}, + ] + + self.assertEqual(self.remove_metadata_columns_from_rows(table), expected_table) + self.assertEqual(cluster_columns, expected_cluster_columns) + + # ---------------------------------------------------------------------- + # Change the primary key and expect that clustering stays unchanged + # ---------------------------------------------------------------------- + + tap_lines = test_utils.get_test_tap_lines('table_with_multi_pk_cluster_changed.json') + self.persist_lines(tap_lines) + + table_changed = query(bigquery, "SELECT * FROM {}.test_table_cluster_multi ORDER BY c_pk".format(target_schema)) + cluster_columns_changed = query(bigquery, "SELECT clustering_ordinal_position, column_name FROM {}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'test_table_cluster_multi' AND 
clustering_ordinal_position > 0 ORDER BY 1".format(target_schema)) + + self.assertEqual(self.remove_metadata_columns_from_rows(table_changed), expected_table) + self.assertEqual(cluster_columns_changed, expected_cluster_columns) + + def test_table_with_pk_multi_column_removed(self): - """Test table with a pk with multiple columns gets clustered by those and removing the pk doesnt cause errors""" + """Test table with a pk with multiple columns gets clustered by those and removing the pk doesn't cause errors""" tap_lines = test_utils.get_test_tap_lines('table_with_multi_pk_cluster.json') self.persist_lines(tap_lines) @@ -89,7 +130,7 @@ def test_table_with_pk_multi_column_removed(self): cluster_columns = query(bigquery, "SELECT clustering_ordinal_position, column_name FROM {}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'test_table_cluster_multi' AND clustering_ordinal_position > 0 ORDER BY 1".format(target_schema)) # ---------------------------------------------------------------------- - # Check rows in table + # Check that rows in the stream are present and clustered on the primary keys # ---------------------------------------------------------------------- expected_table = [ {'c_pk': 2, 'c_int': 2, 'c_varchar': '2', 'c_date': datetime.datetime(2019, 2, 12, 2, 0, 0, tzinfo=timezone.utc)}, @@ -105,10 +146,10 @@ def test_table_with_pk_multi_column_removed(self): self.assertEqual(cluster_columns, expected_cluster_columns) # ---------------------------------------------------------------------- - # Remove the primary key and check if clustering stayed unchanged + # Remove the primary key and expect that clustering stays unchanged # ---------------------------------------------------------------------- self.config['primary_key_required'] = False - tap_lines = test_utils.get_test_tap_lines('table_with_multi_pk_cluster_changed.json') + tap_lines = test_utils.get_test_tap_lines('table_with_multi_pk_cluster_changed_pk_removed.json') self.persist_lines(tap_lines) 
table_changed = query(bigquery, "SELECT * FROM {}.test_table_cluster_multi ORDER BY c_pk".format(target_schema)) @@ -120,7 +161,5 @@ def test_table_with_pk_multi_column_removed(self): {'c_pk': 3, 'c_int': 3, 'c_varchar': '2', 'c_date': datetime.datetime(2019, 2, 15, 2, 0, 0, tzinfo=timezone.utc)} ] - expected_cluster_columns_changed = [] - self.assertEqual(self.remove_metadata_columns_from_rows(table_changed), expected_table_changed) self.assertEqual(cluster_columns_changed, expected_cluster_columns)