-
Notifications
You must be signed in to change notification settings - Fork 71
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
e09bd59
commit cbe10bd
Showing
5 changed files
with
328 additions
and
42 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
from __future__ import annotations # noqa: INP001 | ||
|
||
import json | ||
|
||
import pytest | ||
|
||
from singer_sdk._singerlib import RecordMessage | ||
|
||
|
||
@pytest.fixture | ||
def bench_record(): | ||
return { | ||
"stream": "users", | ||
"type": "RECORD", | ||
"record": { | ||
"Id": 1, | ||
"created_at": "2021-01-01T00:08:00-07:00", | ||
"updated_at": "2022-01-02T00:09:00-07:00", | ||
"deleted_at": "2023-01-03T00:10:00-07:00", | ||
"value": 1.23, | ||
"RelatedId": 32412, | ||
"TypeId": 1, | ||
}, | ||
"time_extracted": "2023-01-01T11:00:00.00000-07:00", | ||
} | ||
|
||
|
||
@pytest.fixture | ||
def bench_record_message(bench_record): | ||
return RecordMessage.from_dict(bench_record) | ||
|
||
|
||
@pytest.fixture | ||
def bench_encoded_record(bench_record): | ||
return json.dumps(bench_record) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
"""Test IO operations for msgspec Singer reader and writer.""" # noqa: INP001 | ||
|
||
from __future__ import annotations | ||
|
||
import decimal | ||
import io | ||
import itertools | ||
from contextlib import nullcontext, redirect_stdout | ||
from textwrap import dedent | ||
|
||
import pytest | ||
|
||
from singer_sdk._singerlib import RecordMessage | ||
from singer_sdk._singerlib.encoding._msgspec import ( | ||
MsgSpecReader, | ||
MsgSpecWriter, | ||
dec_hook, | ||
enc_hook, | ||
) | ||
from singer_sdk._singerlib.exceptions import InvalidInputLine | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"test_type,test_value,expected_value,expected_type", | ||
[ | ||
pytest.param( | ||
int, | ||
1, | ||
"1", | ||
str, | ||
id="int-to-str", | ||
), | ||
], | ||
) | ||
def test_dec_hook(test_type, test_value, expected_value, expected_type): | ||
returned = dec_hook(type=test_type, obj=test_value) | ||
returned_type = type(returned) | ||
|
||
assert returned == expected_value | ||
assert returned_type == expected_type | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"test_value,expected_value", | ||
[ | ||
pytest.param( | ||
1, | ||
"1", | ||
id="int-to-str", | ||
), | ||
], | ||
) | ||
def test_enc_hook(test_value, expected_value): | ||
returned = enc_hook(obj=test_value) | ||
|
||
assert returned == expected_value | ||
|
||
|
||
class DummyReader(MsgSpecReader): | ||
def _process_activate_version_message(self, message_dict: dict) -> None: | ||
pass | ||
|
||
def _process_batch_message(self, message_dict: dict) -> None: | ||
pass | ||
|
||
def _process_record_message(self, message_dict: dict) -> None: | ||
pass | ||
|
||
def _process_schema_message(self, message_dict: dict) -> None: | ||
pass | ||
|
||
def _process_state_message(self, message_dict: dict) -> None: | ||
pass | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"line,expected,exception", | ||
[ | ||
pytest.param( | ||
"not-valid-json", | ||
None, | ||
pytest.raises(InvalidInputLine), | ||
id="unparsable", | ||
), | ||
pytest.param( | ||
'{"type": "RECORD", "stream": "users", "record": {"id": 1, "value": 1.23}}', | ||
{ | ||
"type": "RECORD", | ||
"stream": "users", | ||
"record": {"id": 1, "value": decimal.Decimal("1.23")}, | ||
}, | ||
nullcontext(), | ||
id="record", | ||
), | ||
], | ||
) | ||
def test_deserialize(line, expected, exception): | ||
reader = DummyReader() | ||
with exception: | ||
assert reader.deserialize_json(line) == expected | ||
|
||
|
||
def test_listen(): | ||
reader = DummyReader() | ||
input_lines = io.StringIO( | ||
dedent("""\ | ||
{"type": "SCHEMA", "stream": "users", "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}} | ||
{"type": "RECORD", "stream": "users", "record": {"id": 1, "value": 1.23}} | ||
{"type": "RECORD", "stream": "users", "record": {"id": 2, "value": 2.34}} | ||
{"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}}}} | ||
{"type": "SCHEMA", "stream": "batches", "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}} | ||
{"type": "BATCH", "stream": "batches", "encoding": {"format": "jsonl", "compression": "gzip"}, "manifest": ["file1.jsonl.gz", "file2.jsonl.gz"]} | ||
{"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}, "batches": {"id": 1000000}}}} | ||
""") # noqa: E501 | ||
) | ||
reader.listen(input_lines) | ||
|
||
|
||
def test_listen_unknown_message(): | ||
reader = DummyReader() | ||
input_lines = io.StringIO('{"type": "UNKNOWN"}\n') | ||
with pytest.raises(ValueError, match="Unknown message type"): | ||
reader.listen(input_lines) | ||
|
||
|
||
def test_write_message(): | ||
writer = MsgSpecWriter() | ||
message = RecordMessage( | ||
stream="test", | ||
record={"id": 1, "name": "test"}, | ||
) | ||
with redirect_stdout(io.TextIOWrapper(io.BytesIO())) as out: # noqa: PLW1514 | ||
writer.write_message(message) | ||
|
||
out.seek(0) | ||
assert out.read() == ( | ||
'{"type":"RECORD","stream":"test","record":{"id":1,"name":"test"}}\n' | ||
) | ||
|
||
|
||
# Benchmark Tests | ||
|
||
|
||
def test_bench_format_message(benchmark, bench_record_message: RecordMessage): | ||
"""Run benchmark for Sink._validator method validate.""" | ||
number_of_runs = 1000 | ||
|
||
writer = MsgSpecWriter() | ||
|
||
def run_format_message(): | ||
for record in itertools.repeat(bench_record_message, number_of_runs): | ||
writer.format_message(record) | ||
|
||
benchmark(run_format_message) | ||
|
||
|
||
def test_bench_deserialize_json(benchmark, bench_encoded_record: str): | ||
"""Run benchmark for Sink._validator method validate.""" | ||
number_of_runs = 1000 | ||
|
||
reader = DummyReader() | ||
|
||
def run_deserialize_json(): | ||
for record in itertools.repeat(bench_encoded_record, number_of_runs): | ||
reader.deserialize_json(record) | ||
|
||
benchmark(run_deserialize_json) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
"""Test IO operations for simple Singer reader and writer.""" # noqa: INP001 | ||
|
||
from __future__ import annotations | ||
|
||
import decimal | ||
import io | ||
import itertools | ||
from contextlib import nullcontext, redirect_stdout | ||
from textwrap import dedent | ||
|
||
import pytest | ||
|
||
from singer_sdk._singerlib import RecordMessage | ||
from singer_sdk._singerlib.encoding._simple import SingerReader, SingerWriter | ||
from singer_sdk._singerlib.exceptions import InvalidInputLine | ||
|
||
|
||
class DummyReader(SingerReader): | ||
def _process_activate_version_message(self, message_dict: dict) -> None: | ||
pass | ||
|
||
def _process_batch_message(self, message_dict: dict) -> None: | ||
pass | ||
|
||
def _process_record_message(self, message_dict: dict) -> None: | ||
pass | ||
|
||
def _process_schema_message(self, message_dict: dict) -> None: | ||
pass | ||
|
||
def _process_state_message(self, message_dict: dict) -> None: | ||
pass | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"line,expected,exception", | ||
[ | ||
pytest.param( | ||
"not-valid-json", | ||
None, | ||
pytest.raises(InvalidInputLine), | ||
id="unparsable", | ||
), | ||
pytest.param( | ||
'{"type": "RECORD", "stream": "users", "record": {"id": 1, "value": 1.23}}', | ||
{ | ||
"type": "RECORD", | ||
"stream": "users", | ||
"record": {"id": 1, "value": decimal.Decimal("1.23")}, | ||
}, | ||
nullcontext(), | ||
id="record", | ||
), | ||
], | ||
) | ||
def test_deserialize(line, expected, exception): | ||
reader = DummyReader() | ||
with exception: | ||
assert reader.deserialize_json(line) == expected | ||
|
||
|
||
def test_listen(): | ||
reader = DummyReader() | ||
input_lines = io.StringIO( | ||
dedent("""\ | ||
{"type": "SCHEMA", "stream": "users", "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}} | ||
{"type": "RECORD", "stream": "users", "record": {"id": 1, "value": 1.23}} | ||
{"type": "RECORD", "stream": "users", "record": {"id": 2, "value": 2.34}} | ||
{"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}}}} | ||
{"type": "SCHEMA", "stream": "batches", "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}} | ||
{"type": "BATCH", "stream": "batches", "encoding": {"format": "jsonl", "compression": "gzip"}, "manifest": ["file1.jsonl.gz", "file2.jsonl.gz"]} | ||
{"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}, "batches": {"id": 1000000}}}} | ||
""") # noqa: E501 | ||
) | ||
reader.listen(input_lines) | ||
|
||
|
||
def test_listen_unknown_message(): | ||
reader = DummyReader() | ||
input_lines = io.StringIO('{"type": "UNKNOWN"}\n') | ||
with pytest.raises(ValueError, match="Unknown message type"): | ||
reader.listen(input_lines) | ||
|
||
|
||
def test_write_message(): | ||
writer = SingerWriter() | ||
message = RecordMessage( | ||
stream="test", | ||
record={"id": 1, "name": "test"}, | ||
) | ||
with redirect_stdout(io.StringIO()) as out: | ||
writer.write_message(message) | ||
|
||
assert out.getvalue() == ( | ||
'{"type":"RECORD","stream":"test","record":{"id":1,"name":"test"}}\n' | ||
) | ||
|
||
|
||
# Benchmark Tests | ||
|
||
|
||
def test_bench_format_message(benchmark, bench_record_message: RecordMessage): | ||
"""Run benchmark for Sink._validator method validate.""" | ||
number_of_runs = 1000 | ||
|
||
writer = SingerWriter() | ||
|
||
def run_format_message(): | ||
for record in itertools.repeat(bench_record_message, number_of_runs): | ||
writer.format_message(record) | ||
|
||
benchmark(run_format_message) | ||
|
||
|
||
def test_bench_deserialize_json(benchmark, bench_encoded_record: str): | ||
"""Run benchmark for Sink._validator method validate.""" | ||
number_of_runs = 1000 | ||
|
||
reader = DummyReader() | ||
|
||
def run_deserialize_json(): | ||
for record in itertools.repeat(bench_encoded_record, number_of_runs): | ||
reader.deserialize_json(record) | ||
|
||
benchmark(run_deserialize_json) |