Skip to content

Commit

Permalink
Move things around
Browse files (browse the repository at this point in the history)
  • Loading branch information
edgarrmondragon committed Dec 4, 2023
1 parent 59d9309 commit 4487f56
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 76 deletions.
10 changes: 9 additions & 1 deletion singer_sdk/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,12 @@ class ConformedNameClashException(Exception):


class MissingKeyPropertiesError(Exception):
"""Raised when a recieved (and/or transformed) record is missing key properties."""
"""Raised when a received (and/or transformed) record is missing key properties."""


class InvalidJSONSchema(Exception):
"""Raised when a JSON schema is invalid."""


class InvalidRecord(Exception):
"""Raised when a stream record is invalid according to its declared schema."""
4 changes: 2 additions & 2 deletions singer_sdk/io_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class SingerReader(metaclass=abc.ABCMeta):
def listen(self, file_input: t.IO[str] | None = None) -> None:
"""Read from input until all messages are processed.
This method is internal to the SDK and should not need to be overridden.
Args:
file_input: Readable stream of messages. Defaults to standard in.
This method is internal to the SDK and should not need to be overridden.
"""
if not file_input:
file_input = sys.stdin
Expand Down
84 changes: 35 additions & 49 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@

import fastjsonschema

from singer_sdk.exceptions import MissingKeyPropertiesError
from singer_sdk.exceptions import (
InvalidJSONSchema,
InvalidRecord,
MissingKeyPropertiesError,
)
from singer_sdk.helpers._batch import (
BaseBatchFileEncoding,
BatchConfig,
Expand All @@ -40,19 +44,11 @@
from singer_sdk.target_base import Target


class InvalidJSONSchema(Exception):
"""JSONSchema validation exception."""


class InvalidRecord(Exception):
"""JSONSchema invalid record data exception."""


class BaseJSONSchemaValidator(abc.ABC):
"""Abstract base class for JSONSchema validator."""

def __init__(self, schema: dict[str, t.Any]) -> None:
"""Initialize the validator.
"""Initialize the record validator.
Args:
schema: Schema of the stream to sink.
Expand All @@ -63,44 +59,42 @@ def __init__(self, schema: dict[str, t.Any]) -> None:
def validate(self, record: dict[str, t.Any]) -> None:
"""Validate a record message.
This method MUST raise an ``InvalidRecord`` exception if the record is invalid.
Args:
record: Record message to validate.
Returns: Record validation results.
"""
...


class FastJSONSchemaValidator(BaseJSONSchemaValidator):
"""Class to Interface with fastjsonschema validator."""
"""Validate records using the ``fastjsonschema`` library."""

def __init__(
self,
schema: dict,
*,
validate_formats: bool = False,
format_validators: dict[str, t.Callable],
format_validators: dict[str, t.Callable] | None = None,
):
"""Initialize the validator.
Args:
schema: Schema of the stream to sink.
validate_formats: Whether JSON string formats
(e.g. ``date-time``) should be validated.
format_validators: User defined format validators.
validate_formats: Whether JSON string formats (e.g. ``date-time``) should
be validated.
format_validators: User-defined format validators.
Raises:
InvalidJSONSchema: If the schema provided from tap or mapper is invalid.
"""
super().__init__(schema)
try:
self.validator = fastjsonschema.compile(
self.schema,
use_formats=validate_formats,
formats=format_validators,
formats=format_validators or {},
)
except fastjsonschema.exceptions.JsonSchemaDefinitionException as e:
except fastjsonschema.JsonSchemaDefinitionException as e:
error_message = f"Schema Validation Error: {e}"
raise InvalidJSONSchema(error_message) from e

Expand All @@ -110,10 +104,8 @@ def validate(self, record: dict): # noqa: ANN201
Args:
record: Record message to validate.
Returns: Record validation results.
Raises:
InvalidRecord: If the data in an element is formatted improperly.
InvalidRecord: If the record is invalid.
"""
try:
self.validator(record)
Expand All @@ -131,11 +123,11 @@ class Sink(metaclass=abc.ABCMeta):

MAX_SIZE_DEFAULT = 10000

record_validator = FastJSONSchemaValidator
validate_schema = True
"""Enable jsonschema validation, for example {"type": "integer"}."""
"""Enable JSON schema record validation."""

validate_field_string_format = False
"""Enable jsonschema format validation, for example `date-time` string fields."""
"""Enable JSON schema format validation, for example `date-time` string fields."""

def __init__(
self,
Expand Down Expand Up @@ -181,32 +173,24 @@ def __init__(
self._batch_records_read: int = 0
self._batch_dupe_records_merged: int = 0

self._validator: BaseJSONSchemaValidator | None = (
self.get_validator() if self.validate_schema else None
)
self._validator: BaseJSONSchemaValidator | None = self.get_validator()

def get_validator(self) -> BaseJSONSchemaValidator | None:
"""Get a record validator for this sink.
Override this method to use a custom format validator
or disable jsonschema validator, by returning `None`.
Override this method to use a custom format validator, or disable record
validation by returning `None`.
Returns:
An instance of a subclass of ``BaseJSONSchemaValidator``, or ``None`` if record validation is disabled.
"""
return FastJSONSchemaValidator(
self.schema,
validate_formats=self.validate_field_string_format,
format_validators=self.get_validator_formats(),
)

def get_validator_formats(self) -> dict:
"""Get formats for JSON schema validator.
Returns:
A dictionary containing regex strings and callables
"""
return {}
if self.validate_schema:
return FastJSONSchemaValidator(
self.schema,
validate_formats=self.validate_field_string_format,
format_validators={},
)
return None

def _get_context(self, record: dict) -> dict: # noqa: ARG002
"""Return an empty dictionary by default.
Expand Down Expand Up @@ -437,12 +421,14 @@ def _validate_and_parse(self, record: dict) -> dict:
Returns:
TODO
"""
if self._validator:
if self._validator is not None:
# TODO: Check the performance impact of this try/except block. It runs
# on every record, so it's probably bad and should be moved up the stack.
try:
self._validator.validate(record)
except InvalidRecord:
message = "Record message validation failed"
self.logger.exception(message)
except InvalidRecord as e:
self.logger.exception("Record validation failed %s", e)

self._parse_timestamps_in_record(
record=record,
schema=self.schema,
Expand Down
29 changes: 15 additions & 14 deletions tests/core/sinks/test_format_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import typing as t

import pytest
from typing_extensions import override

from singer_sdk.sinks.core import InvalidRecord, Sink
from singer_sdk.sinks.core import FastJSONSchemaValidator, InvalidRecord, Sink
from singer_sdk.target_base import Target

if t.TYPE_CHECKING:
Expand Down Expand Up @@ -78,19 +79,19 @@ class CustomSink(Sink):

validate_field_string_format = True

def get_validator_formats(self) -> dict:
"""Get formats for JSON schema validator.
Returns:
A dictionary containing regex strings and callables
"""
return {
"date-time": (
r"^(-?(?:[1-9][0-9]*)?[0-9]{4})-(1[0-2]|0[1-9])-(3[01]|0[1-9]|[12][0-9])"
r"T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])(\.[0-9]+)?"
r"(Z|[+-](?:2[0-3]|[01][0-9]):[0-5][0-9])?$"
),
}
@override
def get_validator(self) -> BaseJSONSchemaValidator | None:
return FastJSONSchemaValidator(
self.schema,
validate_formats=self.validate_field_string_format,
format_validators={
"date-time": (
r"^(-?(?:[1-9][0-9]*)?[0-9]{4})-(1[0-2]|0[1-9])-(3[01]|0[1-9]|[12][0-9])"
r"T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])(\.[0-9]+)?"
r"(Z|[+-](?:2[0-3]|[01][0-9]):[0-5][0-9])?$"
),
},
)

def process_batch(self, context: dict) -> None:
pass
Expand Down
7 changes: 1 addition & 6 deletions tests/core/sinks/test_type_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,4 @@ def process_batch(self, context: dict) -> None:
def process_record(self, record: dict, context: dict) -> None:
pass

try:
CustomSink(target, "test_stream", test_schema_invalid, None)
except InvalidJSONSchema:
pytest.fail("InvalidJSONSchema should not have been raised")
else:
pass
CustomSink(target, "test_stream", test_schema_invalid, None)
5 changes: 1 addition & 4 deletions tests/core/sinks/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,7 @@ def test_validate_record_jsonschema_format_checking_enabled(capsys, draft7_sink)
)
assert updated_record["missing_datetime"] == "2021-01-01T00:00:00+00:00"
assert updated_record["invalid_datetime"] == "9999-12-31 23:59:59.999999"
assert (
"Record Message Validation Error: data.invalid_datetime must be date-time"
in captured.err
)
assert "data.invalid_datetime must be date-time" in captured.err


@pytest.fixture
Expand Down

0 comments on commit 4487f56

Please sign in to comment.