Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create fetch_size config #10

Merged
merged 4 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pipx install git+https://github.com/datarts-tech/tap-cassandra.git@main
| reconnect_delay | False | 60 | Floating point number of seconds to wait between attempts. |
| max_attempts | False | 5 | Total number of attempts to make before giving up. |
| protocol_version | False | 65 | The maximum version of the native protocol to use. |
| fetch_size | False | 10000 | The fetch size when syncing data from Cassandra. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
Expand Down
12 changes: 5 additions & 7 deletions tap_cassandra/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
from cassandra.query import dict_factory, SimpleStatement


BATCH_SIZE = 10000

class CassandraConnector:
"""Connects to the Cassandra source."""

Expand Down Expand Up @@ -178,10 +176,10 @@ def get_fully_qualified_name(
return delimiter.join(parts)

@staticmethod
def query_statement(cql):
def query_statement(cql, fetch_size):
"""Create a simple query statement with batch size defined."""

return SimpleStatement(cql, fetch_size=BATCH_SIZE)
return SimpleStatement(cql, fetch_size=fetch_size)

def _is_connected(self):
"""Method to check if connection to Cassandra cluster."""
Expand All @@ -198,7 +196,7 @@ def execute(self, query):
"""Method to execute the query and return the output."""

try:
res = self.session.execute(self.query_statement(query))
res = self.session.execute(self.query_statement(query, self.config.get('fetch_size')))
while res.has_more_pages or res.current_rows:
batch = res.current_rows
self.logger.info(f'{len(batch)} row(s) fetched.')
Expand Down Expand Up @@ -235,7 +233,7 @@ def discover_catalog_entry(
'''

# Initialize columns list
for row in self.session.execute(self.query_statement(schema_query)):
for row in self.session.execute(self.query_statement(schema_query, self.config.get('fetch_size'))):
row_column_name = row['column_name']

dtype = row['type']
Expand Down Expand Up @@ -313,7 +311,7 @@ def discover_catalog_entries(self) -> list[dict]:
from system_schema.tables
where keyspace_name = '{self.config.get('keyspace')}'
'''
for table in self.session.execute(self.query_statement(table_query)):
for table in self.session.execute(self.query_statement(table_query, self.config.get('fetch_size'))):
catalog_entry = self.discover_catalog_entry(table['table_name'])
result.append(catalog_entry.to_dict())

Expand Down
7 changes: 7 additions & 0 deletions tap_cassandra/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ class TapCassandra(SQLTap):
default=65,
description="The maximum version of the native protocol to use.",
),
th.Property(
"fetch_size",
th.IntegerType,
required=False,
default=10000,
description="The fetch size when syncing data from Cassandra.",
),
).to_dict()

@property
Expand Down
Loading