Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Centralize JSON SerDe into helper functions #2259

Merged
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
7b50f09
moved deserialize_json and added serialize_json to _util
BuzzCutNorman Feb 20, 2024
e68e994
Merge branch 'main' into refactor-move-json-serde-to-helpers
BuzzCutNorman Feb 20, 2024
e739083
add serialize_json to _flatten_record
BuzzCutNorman Feb 20, 2024
52f8f05
updated test_flattening_record
BuzzCutNorman Feb 20, 2024
209fc7d
update flattening related snapshots
BuzzCutNorman Feb 20, 2024
c8d594b
updated Tap.catalog_json_text to use serialize_json
BuzzCutNorman Feb 20, 2024
417000c
updated SQLConnector serialize_json and desrialize_json to use _util …
BuzzCutNorman Feb 20, 2024
20260c6
updated JSONLinesBatcher.get_batches to use serialize_json
BuzzCutNorman Feb 20, 2024
29b9903
added kwargs to serde functions, updated read_json_file to use deseri…
BuzzCutNorman Feb 20, 2024
9e4b698
updated Sink.process_batch_files to use deserialize_json
BuzzCutNorman Feb 20, 2024
8dc53ea
changed attributes to match SQLConnector versions.
BuzzCutNorman Feb 21, 2024
6ece47d
Merge branch 'main' into refactor-move-json-serde-to-helpers
BuzzCutNorman Feb 21, 2024
efcc71e
moved type: ignore to correct line
BuzzCutNorman Feb 21, 2024
f11de62
Merge branch 'main' into refactor-move-json-serde-to-helpers
BuzzCutNorman Feb 23, 2024
b04b362
Merge branch 'main' into refactor-move-json-serde-to-helpers
BuzzCutNorman Feb 26, 2024
606387b
Merge branch 'main' into refactor-move-json-serde-to-helpers
BuzzCutNorman Apr 5, 2024
a1f5b94
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into refa…
BuzzCutNorman Jun 24, 2024
04d9370
applied a mypy suggestion by removing redundant dictionary cast
BuzzCutNorman Jun 24, 2024
1182f38
Merge branch 'main' into refactor-move-json-serde-to-helpers
edgarrmondragon Jul 12, 2024
5cfa21e
Move base SerDe to _singerlib
edgarrmondragon Jul 12, 2024
b9faa7f
Re-add `SingerReader.deserialize_json`
edgarrmondragon Jul 12, 2024
dc5c7a1
Implement naive central JSON loading and dumping functions
edgarrmondragon Jul 12, 2024
3f29a1f
Use util methods to SerDe to and from SQL
edgarrmondragon Jul 12, 2024
96525f1
Make the Singer writer and reader classes generic
edgarrmondragon Jul 13, 2024
af9b5d5
Move Singer IO to _singerlib
edgarrmondragon Jul 13, 2024
f65e2e0
Handle uncovered code
edgarrmondragon Jul 13, 2024
05d1ac1
Update docstrings
edgarrmondragon Jul 13, 2024
ac7be38
Move Singer exception catching to reader implementation
edgarrmondragon Jul 13, 2024
168485c
Increase encoder test coverage
edgarrmondragon Jul 13, 2024
8f390f6
Re-use message writing and formatting logic
edgarrmondragon Jul 13, 2024
a5f67a5
Test records with version
edgarrmondragon Jul 13, 2024
362ad22
Merge branch 'main' into refactor-move-json-serde-to-helpers
edgarrmondragon Jul 16, 2024
f848370
Move encodings to a private submodule
edgarrmondragon Jul 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[flake8]
max-line-length = 88
exclude = cookiecutter
ignore = E, W
ignore = E, F, W
per-file-ignores =
# Don't require docstrings conventions in private modules
singer_sdk/helpers/_*.py:DAR
Expand Down
4 changes: 4 additions & 0 deletions singer_sdk/_singerlib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from singer_sdk._singerlib import exceptions
from singer_sdk._singerlib.catalog import (
Catalog,
CatalogEntry,
Expand All @@ -16,6 +17,7 @@
SingerMessageType,
StateMessage,
exclude_null_dict,
format_message,
write_message,
)
from singer_sdk._singerlib.schema import Schema, resolve_schema_references
Expand All @@ -35,7 +37,9 @@
"SingerMessageType",
"StateMessage",
"StreamMetadata",
"exceptions",
"exclude_null_dict",
"format_message",
"resolve_schema_references",
"strftime",
"strptime_to_utc",
Expand Down
12 changes: 12 additions & 0 deletions singer_sdk/_singerlib/encoding/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from __future__ import annotations

from .base import GenericSingerReader, GenericSingerWriter, SingerMessageType
from .simple import SingerReader, SingerWriter

__all__ = [
"GenericSingerReader",
"GenericSingerWriter",
"SingerMessageType",
"SingerReader",
"SingerWriter",
]
144 changes: 144 additions & 0 deletions singer_sdk/_singerlib/encoding/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
"""Abstract base classes for all Singer messages IO operations."""

from __future__ import annotations

import abc
import logging
import typing as t
from collections import Counter, defaultdict

from singer_sdk._singerlib import exceptions
from singer_sdk._singerlib.messages import Message, SingerMessageType

logger = logging.getLogger(__name__)


# TODO: Use to default to 'str' here
# https://peps.python.org/pep-0696/
T = t.TypeVar("T", str, bytes)


class GenericSingerReader(t.Generic[T], metaclass=abc.ABCMeta):
"""Interface for all plugins reading Singer messages as strings or bytes."""

@t.final
def listen(self, file_input: t.IO[T] | None = None) -> None:
"""Read from input until all messages are processed.

Args:
file_input: Readable stream of messages. Defaults to standard in.
"""
self._process_lines(file_input or self.default_input)
self._process_endofpipe()

def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]:
"""Internal method to process jsonl lines from a Singer tap.

Args:
file_input: Readable stream of messages, each on a separate line.

Returns:
A counter object for the processed lines.
"""
stats: dict[str, int] = defaultdict(int)
for line in file_input:
line_dict = self.deserialize_json(line)
self._assert_line_requires(line_dict, requires={"type"})

record_type: SingerMessageType = line_dict["type"]
if record_type == SingerMessageType.SCHEMA:
self._process_schema_message(line_dict)

elif record_type == SingerMessageType.RECORD:
self._process_record_message(line_dict)

elif record_type == SingerMessageType.ACTIVATE_VERSION:
self._process_activate_version_message(line_dict)

elif record_type == SingerMessageType.STATE:
self._process_state_message(line_dict)

elif record_type == SingerMessageType.BATCH: # pragma: no cover
self._process_batch_message(line_dict)

else:
self._process_unknown_message(line_dict) # pragma: no cover

stats[record_type] += 1

return Counter(**stats)

@property
@abc.abstractmethod
def default_input(self) -> t.IO[T]: ...

@staticmethod
def _assert_line_requires(line_dict: dict, requires: set[str]) -> None:
"""Check if dictionary .

Args:
line_dict: TODO
requires: TODO

Raises:
InvalidInputLine: raised if any required keys are missing
"""
if not requires.issubset(line_dict):
missing = requires - set(line_dict)
msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}"
raise exceptions.InvalidInputLine(msg)

@abc.abstractmethod
def deserialize_json(self, line: T) -> dict: ...

@abc.abstractmethod
def _process_schema_message(self, message_dict: dict) -> None: ...

@abc.abstractmethod
def _process_record_message(self, message_dict: dict) -> None: ...

@abc.abstractmethod
def _process_state_message(self, message_dict: dict) -> None: ...

@abc.abstractmethod
def _process_activate_version_message(self, message_dict: dict) -> None: ...

@abc.abstractmethod
def _process_batch_message(self, message_dict: dict) -> None: ...

def _process_unknown_message(self, message_dict: dict) -> None: # noqa: PLR6301 # pragma: no cover
"""Internal method to process unknown message types from a Singer tap.

Args:
message_dict: Dictionary representation of the Singer message.

Raises:
ValueError: raised if a message type is not recognized
"""
record_type = message_dict["type"]
msg = f"Unknown message type '{record_type}' in message."
raise ValueError(msg)

def _process_endofpipe(self) -> None: # noqa: PLR6301
logger.debug("End of pipe reached")


class GenericSingerWriter(t.Generic[T], metaclass=abc.ABCMeta):
"""Interface for all plugins writing Singer messages as strings or bytes."""

def format_message(self, message: Message) -> T:
"""Format a message as a JSON string.

Args:
message: The message to format.

Returns:
The formatted message.
"""
return self.serialize_json(message.to_dict())

@abc.abstractmethod
def serialize_json(self, obj: object) -> T: ...

@abc.abstractmethod
def write_message(self, message: Message) -> None: ...
65 changes: 65 additions & 0 deletions singer_sdk/_singerlib/encoding/simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import annotations

import json
import logging
import sys
import typing as t

from singer_sdk._singerlib.exceptions import InvalidInputLine
from singer_sdk._singerlib.json import deserialize_json, serialize_json

from .base import GenericSingerReader, GenericSingerWriter

if t.TYPE_CHECKING:
from singer_sdk._singerlib.messages import Message

logger = logging.getLogger(__name__)


class SingerReader(GenericSingerReader[str]):
"""Base class for all plugins reading Singer messages as strings from stdin."""

default_input = sys.stdin

def deserialize_json(self, line: str) -> dict: # noqa: PLR6301
"""Deserialize a line of json.

Args:
line: A single line of json.

Returns:
A dictionary of the deserialized json.

Raises:
InvalidInputLine: If the line is not valid JSON.
"""
try:
return deserialize_json(line)
except json.decoder.JSONDecodeError as exc:
logger.exception("Unable to parse:\n%s", line)
msg = f"Unable to parse line as JSON: {line}"
raise InvalidInputLine(msg) from exc


class SingerWriter(GenericSingerWriter[str]):
"""Interface for all plugins writing Singer messages to stdout."""

def serialize_json(self, obj: object) -> str: # noqa: PLR6301
"""Serialize a dictionary into a line of json.

Args:
obj: A Python object usually a dict.

Returns:
A string of serialized json.
"""
return serialize_json(obj)

def write_message(self, message: Message) -> None:
"""Write a message to stdout.

Args:
message: The message to write.
"""
sys.stdout.write(self.format_message(message) + "\n")
sys.stdout.flush()
9 changes: 9 additions & 0 deletions singer_sdk/_singerlib/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from __future__ import annotations

__all__ = [
"InvalidInputLine",
]


class InvalidInputLine(Exception):
"""Raised when an input line is not a valid Singer message."""
61 changes: 61 additions & 0 deletions singer_sdk/_singerlib/json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from __future__ import annotations

import datetime
import decimal
import json
import typing as t

import simplejson

__all__ = [
"deserialize_json",
"serialize_json",
]


def _default_encoding(obj: t.Any) -> str: # noqa: ANN401
"""Default JSON encoder.

Args:
obj: The object to encode.

Returns:
The encoded object.
"""
return obj.isoformat(sep="T") if isinstance(obj, datetime.datetime) else str(obj)


def deserialize_json(json_str: str | bytes, **kwargs: t.Any) -> dict:
"""Deserialize a line of json.

Args:
json_str: A single line of json.
**kwargs: Optional key word arguments.

Returns:
A dictionary of the deserialized json.
"""
return json.loads( # type: ignore[no-any-return]
json_str,
parse_float=decimal.Decimal,
**kwargs,
)


def serialize_json(obj: object, **kwargs: t.Any) -> str:
"""Serialize a dictionary into a line of json.

Args:
obj: A Python object usually a dict.
**kwargs: Optional key word arguments.

Returns:
A string of serialized json.
"""
return simplejson.dumps(
obj,
use_decimal=True,
default=_default_encoding,
separators=(",", ":"),
**kwargs,
)
21 changes: 2 additions & 19 deletions singer_sdk/_singerlib/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone

import simplejson as json
from singer_sdk._singerlib.json import serialize_json

if sys.version_info < (3, 11):
from backports.datetime_fromisoformat import MonkeyPatch
Expand All @@ -26,18 +26,6 @@ class SingerMessageType(str, enum.Enum):
BATCH = "BATCH"


def _default_encoding(obj: t.Any) -> str: # noqa: ANN401
"""Default JSON encoder.

Args:
obj: The object to encode.

Returns:
The encoded object.
"""
return obj.isoformat(sep="T") if isinstance(obj, datetime) else str(obj)


def exclude_null_dict(pairs: list[tuple[str, t.Any]]) -> dict[str, t.Any]:
"""Exclude null values from a dictionary.

Expand Down Expand Up @@ -226,12 +214,7 @@ def format_message(message: Message) -> str:
Returns:
The formatted message.
"""
return json.dumps(
message.to_dict(),
use_decimal=True,
default=_default_encoding,
separators=(",", ":"),
)
return serialize_json(message.to_dict())


def write_message(message: Message) -> None:
Expand Down
Loading
Loading