Skip to content

Commit

Permalink
create tests
Browse files Browse the repository at this point in the history
  • Loading branch information
MindaugasN committed Jan 2, 2024
1 parent 0ee429a commit 7e0b601
Show file tree
Hide file tree
Showing 10 changed files with 1,072 additions and 516 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ jobs:
- name: Install dependencies
run: |
poetry install
- name: Start Cassandra Cluster
run: |
docker-compose up -d
- name: Test with pytest
run: |
poetry run pytest
11 changes: 11 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: '3.9'

services:
cassandra:
image: cassandra:latest
ports:
- 9042:9042
volumes:
- ~/apps/cassandra:/var/lib/cassandra
environment:
- CASSANDRA_CLUSTER_NAME=cassandra
34 changes: 34 additions & 0 deletions plugins/loaders/target-jsonl--andyh1203.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"plugin_type": "loaders",
"name": "target-jsonl",
"namespace": "target_jsonl",
"variant": "andyh1203",
"label": "JSON Lines (JSONL)",
"docs": "https://hub.meltano.com/loaders/target-jsonl--andyh1203",
"repo": "https://github.com/andyh1203/target-jsonl",
"pip_url": "target-jsonl",
"description": "JSONL loader",
"logo_url": "https://hub.meltano.com/assets/logos/loaders/jsonl.png",
"settings": [
{
"name": "destination_path",
"kind": "string",
"value": "output",
"label": "Destination Path",
"description": "Sets the destination path the JSONL files are written to, relative\nto the project root.\n\nThe directory needs to exist already, it will not be created\nautomatically.\n\nTo write JSONL files to the project root, set an empty string (`\"\"`).\n"
},
{
"name": "do_timestamp_file",
"kind": "boolean",
"value": false,
"label": "Include Timestamp in File Names",
"description": "Specifies if the files should get timestamped.\n\nBy default, the resulting file will not have a timestamp in the file name (i.e. `exchange_rate.jsonl`).\n\nIf this option gets set to `true`, the resulting file will have a timestamp associated with it (i.e. `exchange_rate-{timestamp}.jsonl`).\n"
},
{
"name": "custom_name",
"kind": "string",
"label": "Custom File Name Override",
"description": "Specifies a custom name for the filename, instead of the stream name.\n\nThe file name will be `{custom_name}-{timestamp}.jsonl`, if `do_timestamp_file` is `true`.\nOtherwise the file name will be `{custom_name}.jsonl`.\n\nIf custom name is not provided, the stream name will be used.\n"
}
]
}
1,369 changes: 888 additions & 481 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ license = "MIT"

[tool.poetry.dependencies]
python = "<3.12,>=3.7.1"
singer-sdk = { version="^0.28.0" }
singer-sdk = { version="^0.34.1" }
fs-s3fs = { version = "^1.1.1", optional = true }
cassandra-driver = "^3.28.0"

[tool.poetry.group.dev.dependencies]
pytest = "^7.2.1"
singer-sdk = { version="^0.28.0", extras = ["testing"] }
singer-sdk = { version="^0.34.1", extras = ["testing"] }

[tool.poetry.extras]
s3 = ["fs-s3fs"]
Expand Down
14 changes: 4 additions & 10 deletions tap_cassandra/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@ class CassandraConnector:
'list<': th.StringType, #
}

def __init__(
self,
config: dict | None,
) -> None:
def __init__(self, config):
"""Initialize the connector.
Args:
Expand Down Expand Up @@ -114,9 +111,10 @@ def cluster(self):
max_attempts=self.config.get('max_attempts')
),
auth_provider=self.auth_provider,
protocol_version=self.config.get('protocol_version'),
port=self.config.get('port')
)
if self.config.get('protocol_version'):
self._cluster.protocol_version = self.config.get('protocol_version')
return self._cluster

@property
Expand All @@ -137,11 +135,7 @@ def logger(self) -> logging.Logger:
return logging.getLogger("cassandra.connector")

@staticmethod
def get_fully_qualified_name(
table_name: str | None = None,
schema_name: str | None = None,
delimiter: str = ".",
) -> str:
def get_fully_qualified_name(table_name=None, schema_name=None, delimiter="."):
"""Concatenates a fully qualified name from the parts.
Args:
Expand Down
4 changes: 2 additions & 2 deletions tap_cassandra/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
class CassandraStream(SQLStream):
"""Stream class for Cassandra streams."""

def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
def get_records(self, context):
"""Return a generator of record-type dictionary objects.
If the stream has a replication_key value defined, records will be sorted by the
Expand All @@ -31,7 +31,7 @@ def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
raise NotImplementedError(msg)

selected_column_names = self.get_selected_schema()["properties"].keys()
selected_column_string = ','.join(selected_column_names)
selected_column_string = ','.join(selected_column_names) if selected_column_names else '*'

cql = f"select {selected_column_string} from {self.name.split('-')[1]}"
for record in self.connector.execute(cql):
Expand Down
6 changes: 2 additions & 4 deletions tap_cassandra/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TapCassandra(SQLTap):
th.Property(
"keyspace",
th.StringType,
required=True,
required=False,
description="Keyspace will be the default keyspace for operations on the Session.",
),
th.Property(
Expand All @@ -56,14 +56,12 @@ class TapCassandra(SQLTap):
"request_timeout",
th.IntegerType,
required=False,
default=100,
description="Request timeout used when not overridden in Session.execute().",
),
th.Property(
"local_dc",
th.StringType,
required=False,
default=None,
description="The local_dc parameter should be the name of the datacenter.",
),
th.Property(
Expand All @@ -84,7 +82,7 @@ class TapCassandra(SQLTap):
"protocol_version",
th.IntegerType,
required=False,
default=65,
default=None,
description="The maximum version of the native protocol to use.",
),
th.Property(
Expand Down
84 changes: 84 additions & 0 deletions tests/resources/catalog.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
{
"streams": [
{
"tap_stream_id": "test_schema-test_table",
"table_name": "test_table",
"replication_method": "",
"key_properties": [
"id"
],
"schema": {
"properties": {
"id": {
"type": [
"integer"
]
},
"updated_at": {
"format": "date-time",
"type": [
"string",
"null"
]
},
"name": {
"type": [
"string",
"null"
]
}
},
"type": "object",
"required": [
"id"
]
},
"stream": "test_schema-test_table",
"metadata": [
{
"breadcrumb": [
"properties",
"id"
],
"metadata": {
"inclusion": "available",
"selected": true
}
},
{
"breadcrumb": [
"properties",
"updated_at"
],
"metadata": {
"inclusion": "available",
"selected": true
}
},
{
"breadcrumb": [
"properties",
"name"
],
"metadata": {
"inclusion": "automatic",
"selected": true
}
},
{
"breadcrumb": [],
"metadata": {
"inclusion": "available",
"table-key-properties": [
"id"
],
"forced-replication-method": "",
"schema-name": "test_schema",
"selected": true
}
}
],
"selected": true
}
]
}
59 changes: 42 additions & 17 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,53 @@

import datetime

import pytest
from cassandra.cluster import Cluster
from singer_sdk.testing import get_tap_test_class

from tap_cassandra.tap import TapCassandra


SAMPLE_CONFIG = {
"hosts": ["10.0.0.1", "10.0.0.2"],
"port": 9042,
"keyspace": "my_keyspace",
"username": "test",
"password": "test",
"start_date": datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d"),
"request_timeout": 60,
"local_dc": "my_dc",
"reconnect_delay": 60,
"max_attempts": 5,
"protocol_version": 65,
"hosts": "127.0.0.1",
"username": "cassandra",
"password": "cassandra",
"keyspace": "test_schema"
}

# TODO: Create additional tests
# Create a docker image with local cassandra so we can test TapCassandra.
# Run standard built-in tap tests from the SDK:
# TestTapCassandra = get_tap_test_class(
# tap_class=TapCassandra,
# config=SAMPLE_CONFIG,
# )
PreTestTapCassandra = get_tap_test_class(
tap_class=TapCassandra,
config=SAMPLE_CONFIG,
catalog="tests/resources/catalog.json",
)

class TestTapCassandra(PreTestTapCassandra):
keyspace_name = 'test_schema'
table_name = 'test_table'

def _setup(self):
cluster = Cluster()
session = cluster.connect()

create_keyspace_command = f"""
CREATE KEYSPACE IF NOT EXISTS {self.keyspace_name}
WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }};
"""
session.execute(create_keyspace_command)
session.set_keyspace(self.keyspace_name)

create_table_command = f"""
CREATE TABLE IF NOT EXISTS {self.keyspace_name}.{self.table_name}
(id int, updated_at timestamp, name text, primary key (id));
"""
session.execute(create_table_command)

insert_command = f"""
INSERT INTO {self.keyspace_name}.{self.table_name} (id, updated_at, name) values (1, dateof(now()), 'test1');
"""
session.execute(insert_command)

@pytest.fixture(scope="class")
def resource(self):
self._setup()

0 comments on commit 7e0b601

Please sign in to comment.