Skip to content

Commit

Permalink
Move tests
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Jul 17, 2024
1 parent e09bd59 commit 4febeba
Show file tree
Hide file tree
Showing 4 changed files with 328 additions and 1 deletion.
2 changes: 1 addition & 1 deletion singer_sdk/_singerlib/encoding/_msgspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,4 @@ def write_message(self, message: Message) -> None:
message: The message to write.
"""
sys.stdout.buffer.write(self.format_message(message) + b"\n")
sys.stdout.buffer.flush()
sys.stdout.flush()
35 changes: 35 additions & 0 deletions tests/_singerlib/encoding/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from __future__ import annotations # noqa: INP001

import json

import pytest

from singer_sdk._singerlib import RecordMessage


@pytest.fixture
def bench_record():
return {
"stream": "users",
"type": "RECORD",
"record": {
"Id": 1,
"created_at": "2021-01-01T00:08:00-07:00",
"updated_at": "2022-01-02T00:09:00-07:00",
"deleted_at": "2023-01-03T00:10:00-07:00",
"value": 1.23,
"RelatedId": 32412,
"TypeId": 1,
},
"time_extracted": "2023-01-01T11:00:00.00000-07:00",
}


@pytest.fixture
def bench_record_message(bench_record):
return RecordMessage.from_dict(bench_record)


@pytest.fixture
def bench_encoded_record(bench_record):
return json.dumps(bench_record)
167 changes: 167 additions & 0 deletions tests/_singerlib/encoding/test_msgspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""Test IO operations for msgspec Singer reader and writer.""" # noqa: INP001

from __future__ import annotations

import decimal
import io
import itertools
from contextlib import nullcontext, redirect_stdout
from textwrap import dedent

import pytest

from singer_sdk._singerlib import RecordMessage
from singer_sdk._singerlib.encoding._msgspec import (
MsgSpecReader,
MsgSpecWriter,
dec_hook,
enc_hook,
)
from singer_sdk._singerlib.exceptions import InvalidInputLine


@pytest.mark.parametrize(
"test_type,test_value,expected_value,expected_type",
[
pytest.param(
int,
1,
"1",
str,
id="int-to-str",
),
],
)
def test_dec_hook(test_type, test_value, expected_value, expected_type):
returned = dec_hook(type=test_type, obj=test_value)
returned_type = type(returned)

assert returned == expected_value
assert returned_type == expected_type


@pytest.mark.parametrize(
"test_value,expected_value",
[
pytest.param(
1,
"1",
id="int-to-str",
),
],
)
def test_enc_hook(test_value, expected_value):
returned = enc_hook(obj=test_value)

assert returned == expected_value


class DummyReader(MsgSpecReader):
def _process_activate_version_message(self, message_dict: dict) -> None:
pass

def _process_batch_message(self, message_dict: dict) -> None:
pass

def _process_record_message(self, message_dict: dict) -> None:
pass

def _process_schema_message(self, message_dict: dict) -> None:
pass

def _process_state_message(self, message_dict: dict) -> None:
pass


@pytest.mark.parametrize(
"line,expected,exception",
[
pytest.param(
"not-valid-json",
None,
pytest.raises(InvalidInputLine),
id="unparsable",
),
pytest.param(
'{"type": "RECORD", "stream": "users", "record": {"id": 1, "value": 1.23}}',
{
"type": "RECORD",
"stream": "users",
"record": {"id": 1, "value": decimal.Decimal("1.23")},
},
nullcontext(),
id="record",
),
],
)
def test_deserialize(line, expected, exception):
reader = DummyReader()
with exception:
assert reader.deserialize_json(line) == expected


def test_listen():
reader = DummyReader()
input_lines = io.StringIO(
dedent("""\
{"type": "SCHEMA", "stream": "users", "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}}
{"type": "RECORD", "stream": "users", "record": {"id": 1, "value": 1.23}}
{"type": "RECORD", "stream": "users", "record": {"id": 2, "value": 2.34}}
{"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}}}}
{"type": "SCHEMA", "stream": "batches", "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}}
{"type": "BATCH", "stream": "batches", "encoding": {"format": "jsonl", "compression": "gzip"}, "manifest": ["file1.jsonl.gz", "file2.jsonl.gz"]}
{"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}, "batches": {"id": 1000000}}}}
""") # noqa: E501
)
reader.listen(input_lines)


def test_listen_unknown_message():
reader = DummyReader()
input_lines = io.StringIO('{"type": "UNKNOWN"}\n')
with pytest.raises(ValueError, match="Unknown message type"):
reader.listen(input_lines)


def test_write_message():
writer = MsgSpecWriter()
message = RecordMessage(
stream="test",
record={"id": 1, "name": "test"},
)
with redirect_stdout(io.TextIOWrapper(io.BytesIO())) as out: # noqa: PLW1514
writer.write_message(message)

out.seek(0)
assert out.read() == (
'{"type":"RECORD","stream":"test","record":{"id":1,"name":"test"}}\n'
)


# Benchmark Tests


def test_bench_format_message(benchmark, bench_record_message: RecordMessage):
"""Run benchmark for Sink._validator method validate."""
number_of_runs = 1000

writer = MsgSpecWriter()

def run_format_message():
for record in itertools.repeat(bench_record_message, number_of_runs):
writer.format_message(record)

benchmark(run_format_message)


def test_bench_deserialize_json(benchmark, bench_encoded_record: str):
"""Run benchmark for Sink._validator method validate."""
number_of_runs = 1000

reader = DummyReader()

def run_deserialize_json():
for record in itertools.repeat(bench_encoded_record, number_of_runs):
reader.deserialize_json(record)

benchmark(run_deserialize_json)
125 changes: 125 additions & 0 deletions tests/_singerlib/encoding/test_simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""Test IO operations for simple Singer reader and writer.""" # noqa: INP001

from __future__ import annotations

import decimal
import io
import itertools
from contextlib import nullcontext, redirect_stdout
from textwrap import dedent

import pytest

from singer_sdk._singerlib import RecordMessage
from singer_sdk._singerlib.encoding._simple import SingerReader, SingerWriter
from singer_sdk._singerlib.exceptions import InvalidInputLine


class DummyReader(SingerReader):
def _process_activate_version_message(self, message_dict: dict) -> None:
pass

def _process_batch_message(self, message_dict: dict) -> None:
pass

def _process_record_message(self, message_dict: dict) -> None:
pass

def _process_schema_message(self, message_dict: dict) -> None:
pass

def _process_state_message(self, message_dict: dict) -> None:
pass


@pytest.mark.parametrize(
"line,expected,exception",
[
pytest.param(
"not-valid-json",
None,
pytest.raises(InvalidInputLine),
id="unparsable",
),
pytest.param(
'{"type": "RECORD", "stream": "users", "record": {"id": 1, "value": 1.23}}',
{
"type": "RECORD",
"stream": "users",
"record": {"id": 1, "value": decimal.Decimal("1.23")},
},
nullcontext(),
id="record",
),
],
)
def test_deserialize(line, expected, exception):
reader = DummyReader()
with exception:
assert reader.deserialize_json(line) == expected


def test_listen():
reader = DummyReader()
input_lines = io.StringIO(
dedent("""\
{"type": "SCHEMA", "stream": "users", "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}}
{"type": "RECORD", "stream": "users", "record": {"id": 1, "value": 1.23}}
{"type": "RECORD", "stream": "users", "record": {"id": 2, "value": 2.34}}
{"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}}}}
{"type": "SCHEMA", "stream": "batches", "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}}
{"type": "BATCH", "stream": "batches", "encoding": {"format": "jsonl", "compression": "gzip"}, "manifest": ["file1.jsonl.gz", "file2.jsonl.gz"]}
{"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}, "batches": {"id": 1000000}}}}
""") # noqa: E501
)
reader.listen(input_lines)


def test_listen_unknown_message():
reader = DummyReader()
input_lines = io.StringIO('{"type": "UNKNOWN"}\n')
with pytest.raises(ValueError, match="Unknown message type"):
reader.listen(input_lines)


def test_write_message():
writer = SingerWriter()
message = RecordMessage(
stream="test",
record={"id": 1, "name": "test"},
)
with redirect_stdout(io.StringIO()) as out:
writer.write_message(message)

assert out.getvalue() == (
'{"type":"RECORD","stream":"test","record":{"id":1,"name":"test"}}\n'
)


# Benchmark Tests


def test_bench_format_message(benchmark, bench_record_message: RecordMessage):
"""Run benchmark for Sink._validator method validate."""
number_of_runs = 1000

writer = SingerWriter()

def run_format_message():
for record in itertools.repeat(bench_record_message, number_of_runs):
writer.format_message(record)

benchmark(run_format_message)


def test_bench_deserialize_json(benchmark, bench_encoded_record: str):
"""Run benchmark for Sink._validator method validate."""
number_of_runs = 1000

reader = DummyReader()

def run_deserialize_json():
for record in itertools.repeat(bench_encoded_record, number_of_runs):
reader.deserialize_json(record)

benchmark(run_deserialize_json)

0 comments on commit 4febeba

Please sign in to comment.