Skip to content

Commit

Permalink
[bugfix] Limit number of clustering keys to 4 (BQ limit) (#133)
Browse files Browse the repository at this point in the history
* 🐛 Added limit of 4 for the number of clustering keys defined by a stream's primary keys

* 📝 Added documentation on clustering behaviour

* 🔖 Bump version to 1.4.1
  • Loading branch information
balmasi authored Aug 8, 2022
1 parent 66f777f commit 9d5d909
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 21 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
1.4.1 (2022-08-02)
-------------------
- Fix: Instead of failing, use only the first 4 primary keys when clustering if more than 4 primary keys are defined by the tap stream's schema.

1.4.0 (2022-07-22)
-------------------
- Refactoring code to make it easier to add improvements
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,18 @@ data only after the change.
you need to resync the table.


#### Column clustering

This target tries to speed up queries on the resulting tables by clustering each
table on the primary key columns of the stream.

The clustering keys are chosen and ordered to match the `key_properties` columns
listed in the stream's `SCHEMA` messages.

BigQuery places a limit on the number of clustering keys (4 as of 2022-08-02), so if the
stream defines more than 4 key columns, this target will simply use the first 4
columns listed in the `key_properties` property.

### To run tests:

1. Define the environment variables required to run the tests
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
long_description = f.read()

setup(name="pipelinewise-target-bigquery",
version="1.4.0",
version="1.4.1",
description="Singer.io target for loading data to BigQuery - PipelineWise compatible",
long_description=long_description,
long_description_content_type='text/markdown',
Expand Down
18 changes: 13 additions & 5 deletions target_bigquery/db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

logger = singer.get_logger()

BIGQUERY_NUM_CLUSTERED_COLUMNS_LIMIT = 4
PRECISION = 38
SCALE = 9
getcontext().prec = PRECISION
Expand Down Expand Up @@ -525,13 +526,20 @@ def update_clustering_fields(self):
new_clustering_fields = [
self.renamed_columns.get(c, c) for c in primary_column_names(self.stream_schema_message)
]
if new_clustering_fields and not table.clustering_fields:
logger.info('Clustering table on fields: {}'.format(new_clustering_fields))
table.clustering_fields = new_clustering_fields

new_clustering_fields_limited = new_clustering_fields[0:BIGQUERY_NUM_CLUSTERED_COLUMNS_LIMIT]

if len(new_clustering_fields) > BIGQUERY_NUM_CLUSTERED_COLUMNS_LIMIT:
logger.info(f"The number of clustering fields ({len(new_clustering_fields)}) is greater than the limit of {BIGQUERY_NUM_CLUSTERED_COLUMNS_LIMIT} allowed by BigQuery. Using only the first {BIGQUERY_NUM_CLUSTERED_COLUMNS_LIMIT} columns to define clustering keys as ordered by the stream schema's 'key_properties' property.")

if new_clustering_fields_limited and not table.clustering_fields:
logger.info('Clustering table on fields: {}'.format(new_clustering_fields_limited))
table.clustering_fields = new_clustering_fields_limited
self.client.update_table(table, ['clustering_fields'])

# avoid changing existing clusters so it's possible to manually change clustering of a table outside of this target
elif table.clustering_fields != new_clustering_fields:
logger.info('Primary key fields have changed. Uncluster the table to allow the change: {}'.format(new_clustering_fields))
elif table.clustering_fields != new_clustering_fields_limited:
logger.info('Primary key fields have changed. Uncluster the table to allow the change: {}'.format(new_clustering_fields_limited))

def version_column(self, field, stream):
column = sql_utils.safe_column_name(field.name, quotes=False)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_cluster_multi", "bookmarks": {"tap_mysql_test-test_table_cluster_multi": {"initial_full_table_complete": true}}}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_cluster_multi", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_int_2": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_int_3": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk", "c_varchar", "c_int", "c_int_2", "c_int_3"]}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_cluster_multi", "version": 3}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_cluster_multi", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_int_2": 22, "c_int_3": 222, "c_date": "2019-02-12 02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_cluster_multi", "record": {"c_pk": 3, "c_varchar": "2", "c_int": 3, "c_int_2": 33, "c_int_3": 333, "c_date": "2019-02-15 02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_cluster_multi", "bookmarks": {"tap_mysql_test-test_table_cluster_multi": {"initial_full_table_complete": true}}}}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_cluster_multi", "bookmarks": {"tap_mysql_test-test_table_cluster_multi": {"initial_full_table_complete": true}}}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_cluster_multi", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": []}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_cluster_multi", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_int_2": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_int_3": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk", "c_varchar", "c_date"]}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_cluster_multi", "version": 3}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_cluster_multi", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_date": "2019-02-12 02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_cluster_multi", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_int_2": 22, "c_int_3": 222, "c_date": "2019-02-12 02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_cluster_multi", "record": {"c_pk": 3, "c_varchar": "2", "c_int": 3, "c_int_2": 33, "c_int_3": 333, "c_date": "2019-02-15 02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_cluster_multi", "bookmarks": {"tap_mysql_test-test_table_cluster_multi": {"initial_full_table_complete": true}}}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_cluster_multi", "bookmarks": {"tap_mysql_test-test_table_cluster_multi": {"initial_full_table_complete": true}}}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_cluster_multi", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": []}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_cluster_multi", "version": 3}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_cluster_multi", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_date": "2019-02-12 02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_cluster_multi", "bookmarks": {"tap_mysql_test-test_table_cluster_multi": {"initial_full_table_complete": true}}}}
65 changes: 52 additions & 13 deletions tests/integration/test_target_bigquery_cluster.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import datetime
import json
import os
import unittest.mock as mock
from datetime import timezone
from decimal import Decimal, getcontext

import target_bigquery
from target_bigquery.db_sync import DbSync, PRECISION

try:
Expand Down Expand Up @@ -46,7 +41,7 @@ def test_table_with_pk_adds_clustering(self):
cluster_columns = query(bigquery, "SELECT clustering_ordinal_position, column_name FROM {}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'test_table_cluster' AND clustering_ordinal_position > 0 ORDER BY 1".format(target_schema))

# ----------------------------------------------------------------------
# Check rows in table
# Check that rows in the stream are present
# ----------------------------------------------------------------------
expected_table = [
{'c_pk': 2, 'c_int': 2, 'c_varchar': '2', 'c_date': datetime.datetime(2019, 2, 12, 2, 0, 0, tzinfo=timezone.utc)},
Expand All @@ -61,7 +56,7 @@ def test_table_with_pk_adds_clustering(self):
self.assertEqual(cluster_columns, expected_cluster_columns)

# ----------------------------------------------------------------------
# Change the primary key and check if clustering stayed unchanged
# Change the primary key and expect that clustering stays unchanged
# ----------------------------------------------------------------------
tap_lines = test_utils.get_test_tap_lines('table_with_pk_cluster_changed.json')
self.persist_lines(tap_lines)
Expand All @@ -77,8 +72,54 @@ def test_table_with_pk_adds_clustering(self):
self.assertEqual(self.remove_metadata_columns_from_rows(table_changed), expected_table_changed)
self.assertEqual(cluster_columns_changed, expected_cluster_columns)


def test_table_with_pk_limits_clustering_keys(self):
"""Tests table with a primary key gets clustered on those fields, up to the
maximum number of clustering keys allowed by bigquery"""

tap_lines = test_utils.get_test_tap_lines('table_with_multi_pk_cluster_beyond_limit.json')
self.persist_lines(tap_lines)

# Get loaded rows from tables
bigquery = DbSync(self.config)
target_schema = self.config.get('default_target_schema', '')
table = query(bigquery, "SELECT * FROM {}.test_table_cluster_multi ORDER BY c_pk".format(target_schema))
cluster_columns = query(bigquery, "SELECT clustering_ordinal_position, column_name FROM {}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'test_table_cluster_multi' AND clustering_ordinal_position > 0 ORDER BY 1".format(target_schema))

# ----------------------------------------------------------------------
# Check that rows in the stream are present and clustered on the first 4 keys
# ----------------------------------------------------------------------
expected_table = [
{'c_pk': 2, 'c_int': 2, 'c_int_2': 22, 'c_int_3': 222, 'c_varchar': '2', 'c_date': datetime.datetime(2019, 2, 12, 2, 0, 0, tzinfo=timezone.utc)},
{'c_pk': 3, 'c_int': 3, 'c_int_2': 33, 'c_int_3': 333, 'c_varchar': '2', 'c_date': datetime.datetime(2019, 2, 15, 2, 0, 0, tzinfo=timezone.utc)}
]

expected_cluster_columns = [
{'clustering_ordinal_position': 1, 'column_name': 'c_pk'},
{'clustering_ordinal_position': 2, 'column_name': 'c_varchar'},
{'clustering_ordinal_position': 3, 'column_name': 'c_int'},
{'clustering_ordinal_position': 4, 'column_name': 'c_int_2'},
]

self.assertEqual(self.remove_metadata_columns_from_rows(table), expected_table)
self.assertEqual(cluster_columns, expected_cluster_columns)

# ----------------------------------------------------------------------
# Change the primary key and expect that clustering stays unchanged
# ----------------------------------------------------------------------

tap_lines = test_utils.get_test_tap_lines('table_with_multi_pk_cluster_changed.json')
self.persist_lines(tap_lines)

table_changed = query(bigquery, "SELECT * FROM {}.test_table_cluster_multi ORDER BY c_pk".format(target_schema))
cluster_columns_changed = query(bigquery, "SELECT clustering_ordinal_position, column_name FROM {}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'test_table_cluster_multi' AND clustering_ordinal_position > 0 ORDER BY 1".format(target_schema))

self.assertEqual(self.remove_metadata_columns_from_rows(table_changed), expected_table)
self.assertEqual(cluster_columns_changed, expected_cluster_columns)


def test_table_with_pk_multi_column_removed(self):
"""Test table with a pk with multiple columns gets clustered by those and removing the pk doesnt cause errors"""
"""Test table with a pk with multiple columns gets clustered by those and removing the pk doesn't cause errors"""
tap_lines = test_utils.get_test_tap_lines('table_with_multi_pk_cluster.json')
self.persist_lines(tap_lines)

Expand All @@ -89,7 +130,7 @@ def test_table_with_pk_multi_column_removed(self):
cluster_columns = query(bigquery, "SELECT clustering_ordinal_position, column_name FROM {}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = 'test_table_cluster_multi' AND clustering_ordinal_position > 0 ORDER BY 1".format(target_schema))

# ----------------------------------------------------------------------
# Check rows in table
# Check that rows in the stream are present and clustered on the primary keys
# ----------------------------------------------------------------------
expected_table = [
{'c_pk': 2, 'c_int': 2, 'c_varchar': '2', 'c_date': datetime.datetime(2019, 2, 12, 2, 0, 0, tzinfo=timezone.utc)},
Expand All @@ -105,10 +146,10 @@ def test_table_with_pk_multi_column_removed(self):
self.assertEqual(cluster_columns, expected_cluster_columns)

# ----------------------------------------------------------------------
# Remove the primary key and check if clustering stayed unchanged
# Remove the primary key and expect that clustering stays unchanged
# ----------------------------------------------------------------------
self.config['primary_key_required'] = False
tap_lines = test_utils.get_test_tap_lines('table_with_multi_pk_cluster_changed.json')
tap_lines = test_utils.get_test_tap_lines('table_with_multi_pk_cluster_changed_pk_removed.json')
self.persist_lines(tap_lines)

table_changed = query(bigquery, "SELECT * FROM {}.test_table_cluster_multi ORDER BY c_pk".format(target_schema))
Expand All @@ -120,7 +161,5 @@ def test_table_with_pk_multi_column_removed(self):
{'c_pk': 3, 'c_int': 3, 'c_varchar': '2', 'c_date': datetime.datetime(2019, 2, 15, 2, 0, 0, tzinfo=timezone.utc)}
]

expected_cluster_columns_changed = []

self.assertEqual(self.remove_metadata_columns_from_rows(table_changed), expected_table_changed)
self.assertEqual(cluster_columns_changed, expected_cluster_columns)

0 comments on commit 9d5d909

Please sign in to comment.