From 9d2c88485a2b7bd7610d761ac158c76c57a10385 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mindaugas=20Ni=C5=BEauskas?= Date: Wed, 11 Oct 2023 06:45:17 +0300 Subject: [PATCH 1/4] create fetch_size config --- tap_cassandra/client.py | 12 +++++------- tap_cassandra/tap.py | 7 +++++++ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/tap_cassandra/client.py b/tap_cassandra/client.py index 2a02719..849abb1 100644 --- a/tap_cassandra/client.py +++ b/tap_cassandra/client.py @@ -16,8 +16,6 @@ from cassandra.query import dict_factory, SimpleStatement -BATCH_SIZE = 10000 - class CassandraConnector: """Connects to the Cassandra source.""" @@ -178,10 +176,10 @@ def get_fully_qualified_name( return delimiter.join(parts) @staticmethod - def query_statement(cql): + def query_statement(cql, fetch_size): """Create a simple query statement with batch size defined.""" - return SimpleStatement(cql, fetch_size=BATCH_SIZE) + return SimpleStatement(cql, fetch_size=fetch_size) def _is_connected(self): """Method to check if connection to Cassandra cluster.""" @@ -198,7 +196,7 @@ def execute(self, query): """Method to execute the query and return the output.""" try: - res = self.session.execute(self.query_statement(query)) + res = self.session.execute(self.query_statement(query, self.config.get('fetch_size'))) while res.has_more_pages or res.current_rows: batch = res.current_rows self.logger.info(f'{len(batch)} row(s) fetched.') @@ -235,7 +233,7 @@ def discover_catalog_entry( ''' # Initialize columns list - for row in self.session.execute(self.query_statement(schema_query)): + for row in self.session.execute(self.query_statement(schema_query, self.config.get('fetch_size'))): row_column_name = row['column_name'] dtype = row['type'] @@ -313,7 +311,7 @@ def discover_catalog_entries(self) -> list[dict]: from system_schema.tables where keyspace_name = '{self.config.get('keyspace')}' ''' - for table in self.session.execute(self.query_statement(table_query)): + for table in self.session.execute(self.query_statement(table_query, self.config.get('fetch_size'))): catalog_entry = self.discover_catalog_entry(table['table_name']) result.append(catalog_entry.to_dict()) diff --git a/tap_cassandra/tap.py b/tap_cassandra/tap.py index 0d2544c..b0e6782 100644 --- a/tap_cassandra/tap.py +++ b/tap_cassandra/tap.py @@ -87,6 +87,13 @@ class TapCassandra(SQLTap): default=65, description="The maximum version of the native protocol to use.", ), + th.Property( + "fetch_size", + th.IntegerType, + required=False, + default=10000, + description="The fetch size when syncing data from Cassandra.", + ), ).to_dict() @property From 01b126a3612d93483d4fd423f94d4ea8fcb49533 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mindaugas=20Ni=C5=BEauskas?= Date: Fri, 29 Dec 2023 08:50:14 +0200 Subject: [PATCH 2/4] add fetch_size to readme --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 30dc10f..f5673b2 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,7 @@ pipx install git+https://github.com/datarts-tech/tap-cassandra.git@main | reconnect_delay | False | 60 | Floating point number of seconds to wait inbetween each attempt. | | max_attempts | False | 5 | Should be a total number of attempts to be made before giving up. | | protocol_version | False | 65 | The maximum version of the native protocol to use. | +| fetch_size | False | 10000 | The fetch size when syncing data from Cassandra. | | stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | | stream_map_config | False | None | User-defined config values to be used within map expressions. | | flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | From dc7eb740d40eacd49b80e9e4e2ed54af282d7ba4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mindaugas=20Ni=C5=BEauskas?= Date: Fri, 29 Dec 2023 08:52:20 +0200 Subject: [PATCH 3/4] rm fetch_size from readme --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index f5673b2..30dc10f 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,6 @@ pipx install git+https://github.com/datarts-tech/tap-cassandra.git@main | reconnect_delay | False | 60 | Floating point number of seconds to wait inbetween each attempt. | | max_attempts | False | 5 | Should be a total number of attempts to be made before giving up. | | protocol_version | False | 65 | The maximum version of the native protocol to use. | -| fetch_size | False | 10000 | The fetch size when syncing data from Cassandra. | | stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | | stream_map_config | False | None | User-defined config values to be used within map expressions. | | flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | From 0ee429a7031966231eb8d05cff1f48d7d463b817 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mindaugas=20Ni=C5=BEauskas?= Date: Fri, 29 Dec 2023 08:54:58 +0200 Subject: [PATCH 4/4] add fetch_size to readme --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 30dc10f..f5673b2 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,7 @@ pipx install git+https://github.com/datarts-tech/tap-cassandra.git@main | reconnect_delay | False | 60 | Floating point number of seconds to wait inbetween each attempt. | | max_attempts | False | 5 | Should be a total number of attempts to be made before giving up. | | protocol_version | False | 65 | The maximum version of the native protocol to use. | +| fetch_size | False | 10000 | The fetch size when syncing data from Cassandra. | | stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | | stream_map_config | False | None | User-defined config values to be used within map expressions. | | flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |