Merge pull request #12 from datarts-tech/introduce_try_catch_for_full_table_syncs

create tests
MindaugasN authored Jan 11, 2024
2 parents 867fd23 + bf07eaf commit 7512935
Showing 11 changed files with 1,130 additions and 738 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/test.yml
@@ -12,7 +12,7 @@ jobs:
GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}}
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
python-version: ["3.9", "3.10", "3.11"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
@@ -25,6 +25,11 @@ jobs:
- name: Install dependencies
run: |
poetry install
- name: Start Cassandra Cluster
run: |
docker-compose up -d
- name: Wait for Cassandra Cluster
run: sleep 60s
- name: Test with pytest
run: |
poetry run pytest
1 change: 1 addition & 0 deletions README.md
@@ -52,6 +52,7 @@ pipx install git+https://github.com/datarts-tech/tap-cassandra.git@main
| max_attempts | False | 5 | The total number of attempts to be made before giving up. |
| protocol_version | False | 65 | The maximum version of the native protocol to use. |
| fetch_size | False | 10000 | The fetch size when syncing data from Cassandra. |
| skip_hot_partitions | False | False | When set to `True`, skip partitions that raise ReadTimeout or ReadFailure errors. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
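For reference, a minimal sketch of enabling the new option in a Meltano project (illustrative only: the keyspace value is a placeholder, and only settings that appear in this tap's documentation are used):

plugins:
  extractors:
  - name: tap-cassandra
    config:
      keyspace: my_keyspace        # placeholder keyspace name
      port: 9042                   # default CQL port
      fetch_size: 10000
      skip_hot_partitions: true    # skip partitions that time out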
11 changes: 11 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,11 @@
version: '3.9'

services:
cassandra:
image: cassandra:latest
ports:
- 9042:9042
volumes:
- ~/apps/cassandra:/var/lib/cassandra
environment:
- CASSANDRA_CLUSTER_NAME=cassandra
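With the compose file in place, the cluster can be started and checked locally before running the test suite, mirroring the CI steps above (a sketch; cqlsh is bundled in the official cassandra image):

docker-compose up -d
# Check that the node answers CQL (the CI workflow instead sleeps 60s).
docker exec $(docker-compose ps -q cassandra) cqlsh -e 'DESCRIBE KEYSPACES'
poetry run pytest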
2 changes: 2 additions & 0 deletions meltano.yml
@@ -29,6 +29,8 @@ plugins:
- name: reconnect_delay
- name: max_attempts
- name: protocol_version
- name: fetch_size
- name: skip_hot_partitions
loaders:
- name: target-jsonl
variant: andyh1203
1,601 changes: 905 additions & 696 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pyproject.toml
@@ -11,14 +11,14 @@ keywords = [
license = "MIT"

[tool.poetry.dependencies]
python = "<3.12,>=3.7.1"
singer-sdk = { version="^0.28.0" }
python = "<3.12,>=3.9.1"
singer-sdk = { version="^0.34.1" }
fs-s3fs = { version = "^1.1.1", optional = true }
cassandra-driver = "^3.28.0"

[tool.poetry.group.dev.dependencies]
pytest = "^7.2.1"
singer-sdk = { version="^0.28.0", extras = ["testing"] }
singer-sdk = { version="^0.34.1", extras = ["testing"] }

[tool.poetry.extras]
s3 = ["fs-s3fs"]
71 changes: 58 additions & 13 deletions tap_cassandra/client.py
@@ -1,10 +1,12 @@
"""Custom client handling, including CassandraStream base class."""

import time
import logging

from singer_sdk import typing as th
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema

from cassandra import ReadFailure, ReadTimeout
from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import (
@@ -50,10 +52,7 @@ class CassandraConnector:
'list<': th.StringType, #
}

def __init__(
self,
config: dict | None,
) -> None:
def __init__(self, config):
"""Initialize the connector.
Args:
@@ -114,9 +113,10 @@ def cluster(self):
max_attempts=self.config.get('max_attempts')
),
auth_provider=self.auth_provider,
protocol_version=self.config.get('protocol_version'),
port=self.config.get('port')
)
if self.config.get('protocol_version'):
self._cluster.protocol_version = self.config.get('protocol_version')
return self._cluster

@property
@@ -137,11 +137,7 @@ def logger(self) -> logging.Logger:
return logging.getLogger("cassandra.connector")

@staticmethod
def get_fully_qualified_name(
table_name: str | None = None,
schema_name: str | None = None,
delimiter: str = ".",
) -> str:
def get_fully_qualified_name(table_name=None, schema_name=None, delimiter="."):
"""Concatenates a fully qualified name from the parts.
Args:
@@ -193,8 +189,12 @@ def _disconnect(self):
self.cluster.shutdown()

def execute(self, query):
"""Method to execute the query and return the output."""

"""Method to execute the query and return the output.
Args:
query: Cassandra CQL query to execute
"""

try:
res = self.session.execute(self.query_statement(query, self.config.get('fetch_size')))
while res.has_more_pages or res.current_rows:
@@ -208,6 +208,50 @@ def execute(self, query):
finally:
self._disconnect()

def execute_with_skip(self, query, key_col):
"""Method to execute the query and return the output.
Handles ReadTimeout and ReadFailure to skip hot partitions.
Args:
query: Cassandra CQL query to execute
key_col: first partition_key of a table
"""

# Retry for ReadTimeout and ReadFailure
sleep_time_seconds = 30
retry = 0
max_retries = 3
while retry < max_retries:
try:
batch = None
res = self.session.execute(self.query_statement(query, self.config.get('fetch_size')))
while res.has_more_pages or res.current_rows:
batch = res.current_rows
self.logger.info(f'{len(batch)} row(s) fetched.')
for row in batch:
yield row
res.fetch_next_page()
self._disconnect()
break
except (ReadTimeout, ReadFailure) as re:
retry += 1
if not batch:
res = self.session.execute(self.query_statement(query, 1))
batch = res.current_rows
self.logger.info(f'{len(batch)} row(s) fetched.')
last_key = batch[-1][key_col]
self.logger.info(f'Skipping {key_col} = {last_key}')
# Remove any filters done for a query
base_query = query.lower().split('where')[0].rstrip()
query = base_query + f" where token({key_col}) > token({last_key})"
self.logger.info(f'Sleeping for {sleep_time_seconds} before retry {retry} out of {max_retries}.')
time.sleep(sleep_time_seconds)
except Exception as e:
self._disconnect()
raise

def discover_catalog_entry(
self,
table_name: str
@@ -220,7 +264,7 @@ def discover_catalog_entry(
Returns:
`CatalogEntry` object for the given table
"""

self.logger.info('discover_catalog_entry called.')
table_schema = th.PropertiesList()
partition_keys = list()
clustering_keys = list()
@@ -312,6 +356,7 @@ def discover_catalog_entries(self) -> list[dict]:
where keyspace_name = '{self.config.get('keyspace')}'
'''
for table in self.session.execute(self.query_statement(table_query, self.config.get('fetch_size'))):
catalog_entry = self.discover_catalog_entry(table['table_name'])
result.append(catalog_entry.to_dict())

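The heart of execute_with_skip is the rewrite it applies after a ReadTimeout or ReadFailure: any existing WHERE clause is stripped and the scan resumes past the offending partition via token(). A standalone sketch of that rewrite, mirroring the logic above (the events table and user_id key are hypothetical):

def skip_past_partition(query, key_col, last_key):
    """Rebuild the CQL so the scan resumes after the hot partition."""
    base_query = query.lower().split('where')[0].rstrip()
    return base_query + f" where token({key_col}) > token({last_key})"

# A timeout while reading partition user_id = 42 turns
#   'select id, name from events'
# into
#   'select id, name from events where token(user_id) > token(42)'
print(skip_past_partition('select id, name from events', 'user_id', 42))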
13 changes: 9 additions & 4 deletions tap_cassandra/streams.py
@@ -8,7 +8,7 @@
class CassandraStream(SQLStream):
"""Stream class for Cassandra streams."""

def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
def get_records(self, context):
"""Return a generator of record-type dictionary objects.
If the stream has a replication_key value defined, records will be sorted by the
@@ -31,8 +31,13 @@ def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
raise NotImplementedError(msg)

selected_column_names = self.get_selected_schema()["properties"].keys()
selected_column_string = ','.join(selected_column_names)
selected_column_string = ','.join(selected_column_names) if selected_column_names else '*'

cql = f"select {selected_column_string} from {self.name.split('-')[1]}"
for record in self.connector.execute(cql):
yield record

if self.config.get('skip_hot_partitions'):
for row in self.connector.execute_with_skip(cql, self.catalog_entry['key_properties'][0]):
yield row
else:
for row in self.connector.execute(cql):
yield row
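For the test catalog below, self.name is test_schema-test_table, so the generated statement is along the lines of select id,updated_at,name from test_table; with skip_hot_partitions enabled, rows are fetched through execute_with_skip with id, the first key property, as the skip column.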
13 changes: 9 additions & 4 deletions tap_cassandra/tap.py
@@ -30,7 +30,7 @@ class TapCassandra(SQLTap):
th.Property(
"keyspace",
th.StringType,
required=True,
required=False,
description="Keyspace will be the default keyspace for operations on the Session.",
),
th.Property(
@@ -56,14 +56,12 @@ class TapCassandra(SQLTap):
"request_timeout",
th.IntegerType,
required=False,
default=100,
description="Request timeout used when not overridden in Session.execute().",
),
th.Property(
"local_dc",
th.StringType,
required=False,
default=None,
description="The local_dc parameter should be the name of the datacenter.",
),
th.Property(
@@ -84,7 +82,7 @@
"protocol_version",
th.IntegerType,
required=False,
default=65,
default=None,
description="The maximum version of the native protocol to use.",
),
th.Property(
@@ -94,6 +92,13 @@
default=10000,
description="The fetch size when syncing data from Cassandra.",
),
th.Property(
"skip_hot_partitions",
th.BooleanType,
required=False,
default=False,
description="When set to `True` skipping partitions when faced ReadTimout or ReadFailure errors.",
),
).to_dict()

@property
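Note how these default changes dovetail with the cluster property in client.py above: with protocol_version now defaulting to None, the connector only pins cluster.protocol_version when the option is explicitly set and otherwise lets the driver negotiate; request_timeout and local_dc likewise drop their hard-coded defaults.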
84 changes: 84 additions & 0 deletions tests/resources/catalog.json
@@ -0,0 +1,84 @@
{
"streams": [
{
"tap_stream_id": "test_schema-test_table",
"table_name": "test_table",
"replication_method": "",
"key_properties": [
"id"
],
"schema": {
"properties": {
"id": {
"type": [
"integer"
]
},
"updated_at": {
"format": "date-time",
"type": [
"string",
"null"
]
},
"name": {
"type": [
"string",
"null"
]
}
},
"type": "object",
"required": [
"id"
]
},
"stream": "test_schema-test_table",
"metadata": [
{
"breadcrumb": [
"properties",
"id"
],
"metadata": {
"inclusion": "available",
"selected": true
}
},
{
"breadcrumb": [
"properties",
"updated_at"
],
"metadata": {
"inclusion": "available",
"selected": true
}
},
{
"breadcrumb": [
"properties",
"name"
],
"metadata": {
"inclusion": "automatic",
"selected": true
}
},
{
"breadcrumb": [],
"metadata": {
"inclusion": "available",
"table-key-properties": [
"id"
],
"forced-replication-method": "",
"schema-name": "test_schema",
"selected": true
}
}
],
"selected": true
}
]
}
