Skip to content

Commit

Permalink
Merge pull request #10 from datarts-tech/parametrize_batch_size
Browse files Browse the repository at this point in the history
create fetch_size config
  • Loading branch information
MindaugasN authored Dec 29, 2023
2 parents 5154e58 + 0ee429a commit 867fd23
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 7 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pipx install git+https://github.com/datarts-tech/tap-cassandra.git@main
| reconnect_delay | False | 60 | Floating-point number of seconds to wait between attempts. |
| max_attempts | False | 5 | Total number of attempts to make before giving up. |
| protocol_version | False | 65 | The maximum version of the native protocol to use. |
| fetch_size | False | 10000 | Number of rows to fetch per page when syncing data from Cassandra. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
Expand Down
12 changes: 5 additions & 7 deletions tap_cassandra/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
from cassandra.query import dict_factory, SimpleStatement


BATCH_SIZE = 10000

class CassandraConnector:
"""Connects to the Cassandra source."""

Expand Down Expand Up @@ -178,10 +176,10 @@ def get_fully_qualified_name(
return delimiter.join(parts)

@staticmethod
def query_statement(cql):
def query_statement(cql, fetch_size):
"""Create a simple query statement with batch size defined."""

return SimpleStatement(cql, fetch_size=BATCH_SIZE)
return SimpleStatement(cql, fetch_size=fetch_size)

def _is_connected(self):
"""Method to check if connection to Cassandra cluster."""
Expand All @@ -198,7 +196,7 @@ def execute(self, query):
"""Method to execute the query and return the output."""

try:
res = self.session.execute(self.query_statement(query))
res = self.session.execute(self.query_statement(query, self.config.get('fetch_size')))
while res.has_more_pages or res.current_rows:
batch = res.current_rows
self.logger.info(f'{len(batch)} row(s) fetched.')
Expand Down Expand Up @@ -235,7 +233,7 @@ def discover_catalog_entry(
'''

# Initialize columns list
for row in self.session.execute(self.query_statement(schema_query)):
for row in self.session.execute(self.query_statement(schema_query, self.config.get('fetch_size'))):
row_column_name = row['column_name']

dtype = row['type']
Expand Down Expand Up @@ -313,7 +311,7 @@ def discover_catalog_entries(self) -> list[dict]:
from system_schema.tables
where keyspace_name = '{self.config.get('keyspace')}'
'''
for table in self.session.execute(self.query_statement(table_query)):
for table in self.session.execute(self.query_statement(table_query, self.config.get('fetch_size'))):
catalog_entry = self.discover_catalog_entry(table['table_name'])
result.append(catalog_entry.to_dict())

Expand Down
7 changes: 7 additions & 0 deletions tap_cassandra/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ class TapCassandra(SQLTap):
default=65,
description="The maximum version of the native protocol to use.",
),
th.Property(
"fetch_size",
th.IntegerType,
required=False,
default=10000,
description="The fetch size when syncing data from Cassandra.",
),
).to_dict()

@property
Expand Down

0 comments on commit 867fd23

Please sign in to comment.