diff --git a/docs/builtin.rst b/docs/builtin.rst new file mode 100644 index 000000000..80299bd62 --- /dev/null +++ b/docs/builtin.rst @@ -0,0 +1,36 @@ +Built-in Settings and Capabilities +================================== + +.. currentmodule:: singer_sdk.helpers.capabilities + +The Singer SDK library provides a number of built-in settings and capabilities. + +.. autodata:: ACTIVATE_VERSION + :no-value: + +.. autodata:: ADD_RECORD_METADATA + :no-value: + +.. autodata:: BATCH + :no-value: + +.. autodata:: FLATTENING + :no-value: + +.. autodata:: STREAM_MAPS + :no-value: + +.. autodata:: TARGET_BATCH_SIZE_ROWS + :no-value: + +.. autodata:: TARGET_HARD_DELETE + :no-value: + +.. autodata:: TARGET_LOAD_METHOD + :no-value: + +.. autodata:: TARGET_SCHEMA + :no-value: + +.. autodata:: TARGET_VALIDATE_RECORDS + :no-value: diff --git a/docs/index.rst b/docs/index.rst index 081c56f24..a9c89fbb8 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -57,6 +57,7 @@ within the `#singer-tap-development`_ and `#singer-target-development`_ Slack ch implementation/index typing capabilities + builtin .. toctree:: :caption: Advanced Concepts diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 540de023e..7bc8c6efe 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -17,8 +17,8 @@ from singer_sdk import typing as th from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema from singer_sdk.exceptions import ConfigValidationError +from singer_sdk.helpers import capabilities from singer_sdk.helpers._util import dump_json, load_json -from singer_sdk.helpers.capabilities import TargetLoadMethods if sys.version_info < (3, 13): from typing_extensions import deprecated @@ -1247,7 +1247,7 @@ def prepare_table( as_temp_table=as_temp_table, ) return - if self.config["load_method"] == TargetLoadMethods.OVERWRITE: + if self.config["load_method"] == capabilities.TargetLoadMethods.OVERWRITE: self.get_table(full_table_name=full_table_name).drop(self._engine) self.create_empty_table( full_table_name=full_table_name, diff --git a/singer_sdk/helpers/capabilities/__init__.py b/singer_sdk/helpers/capabilities/__init__.py new file mode 100644 index 000000000..2db0a849b --- /dev/null +++ b/singer_sdk/helpers/capabilities/__init__.py @@ -0,0 +1,191 @@ +"""Module with helpers to declare capabilities and plugin behavior.""" + +from __future__ import annotations + +from singer_sdk.helpers.capabilities import _schema as schema +from singer_sdk.helpers.capabilities._builtin import Builtin +from singer_sdk.helpers.capabilities._config_property import ConfigProperty +from singer_sdk.helpers.capabilities._enum import ( + CapabilitiesEnum, + PluginCapabilities, + TapCapabilities, + TargetCapabilities, + TargetLoadMethods, +) + +__all__ = [ + "ADD_RECORD_METADATA", + "BATCH", + "FLATTENING", + "STREAM_MAPS", + "TARGET_BATCH_SIZE_ROWS", + "TARGET_HARD_DELETE", + "TARGET_LOAD_METHOD", + "TARGET_SCHEMA", + "TARGET_VALIDATE_RECORDS", + "CapabilitiesEnum", + "ConfigProperty", + "PluginCapabilities", + "TapCapabilities", + "TargetCapabilities", + "TargetLoadMethods", +] + +#: Support the `ACTIVATE_VERSION `_ +#: extension. +#: +#: Example: +#: +#: .. code-block:: json +#: +#: { +#: "activate_version": true +#: } +#: +ACTIVATE_VERSION = Builtin( + schema=schema.ACTIVATE_VERSION_CONFIG, + capability=PluginCapabilities.ACTIVATE_VERSION, +) + +#: Add metadata to records. +#: +#: Example: +#: +#: .. code-block:: json +#: +#: { +#: "add_record_metadata": true +#: } +#: +ADD_RECORD_METADATA = Builtin(schema=schema.ADD_RECORD_METADATA_CONFIG) + +#: For taps, support emitting BATCH messages. For targets, support consuming BATCH +#: messages. +#: +#: Example: +#: +#: .. code-block:: json +#: +#: { +#: "batch_config": { +#: "encoding": { +#: "format": "jsonl", +#: "compression": "gzip" +#: }, +#: "storage": { +#: "type": "root", +#: "root": "file:///path/to/batch/files", +#: "prefix": "batch-" +#: } +#: } +#: } +#: +BATCH = Builtin( + schema=schema.BATCH_CONFIG, + capability=PluginCapabilities.BATCH, +) + +#: Support schema flattening, aka de-nesting of complex properties. +#: +#: Example: +#: +#: .. code-block:: json +#: +#: { +#: "flattening_enabled": true, +#: "flattening_max_depth": 3 +#: } +#: +FLATTENING = Builtin( + schema=schema.FLATTENING_CONFIG, + capability=PluginCapabilities.FLATTENING, +) + +#: Support inline stream map transforms. +#: +#: Example: +#: +#: .. code-block:: json +#: +#: { +#: "stream_maps": { +#: "users": { +#: "id": "id", +#: "fields": "[f for f in fields if f['key'] != 'age']" +#: } +#: } +#: } +#: +STREAM_MAPS = Builtin( + schema.STREAM_MAPS_CONFIG, + capability=PluginCapabilities.STREAM_MAPS, +) + +#: Target batch size in rows. +#: +#: Example: +#: +#: .. code-block:: json +#: +#: { +#: "batch_size_rows": 10000 +#: } +#: +TARGET_BATCH_SIZE_ROWS = Builtin(schema=schema.TARGET_BATCH_SIZE_ROWS_CONFIG) + +#: Support hard delete capability. +#: +#: Example: +#: +#: .. code-block:: json +#: +#: { +#: "hard_delete": true +#: } +#: +TARGET_HARD_DELETE = Builtin( + schema=schema.TARGET_HARD_DELETE_CONFIG, + capability=TargetCapabilities.HARD_DELETE, +) + +#: Target load method. +#: +#: Example: +#: +#: .. code-block:: json +#: +#: { +#: "load_method": "upsert" +#: } +#: +TARGET_LOAD_METHOD = Builtin(schema=schema.TARGET_LOAD_METHOD_CONFIG) + +#: Allow setting the target schema. +#: +#: Example: +#: +#: .. code-block:: json +#: +#: { +#: "default_target_schema": "my_schema" +#: } +#: +TARGET_SCHEMA = Builtin( + schema=schema.TARGET_SCHEMA_CONFIG, + capability=TargetCapabilities.TARGET_SCHEMA, +) + +#: Validate incoming records against their declared schema. +#: +#: Example: +#: +#: .. code-block:: json +#: +#: { +#: "validate_records": true +#: } +#: +TARGET_VALIDATE_RECORDS = Builtin( + schema=schema.TARGET_VALIDATE_RECORDS_CONFIG, + capability=TargetCapabilities.VALIDATE_RECORDS, +) diff --git a/singer_sdk/helpers/capabilities/_builtin.py b/singer_sdk/helpers/capabilities/_builtin.py new file mode 100644 index 000000000..b3a2c1479 --- /dev/null +++ b/singer_sdk/helpers/capabilities/_builtin.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +import typing as t + +from ._config_property import ConfigProperty + +if t.TYPE_CHECKING: + from ._enum import CapabilitiesEnum + +_T = t.TypeVar("_T") + + +class Builtin: + """Use this class to define built-in setting(s) for a plugin.""" + + def __init__( + self, + schema: dict[str, t.Any], + *, + capability: CapabilitiesEnum | None = None, + **kwargs: t.Any, + ): + """Initialize the descriptor. + + Args: + schema: The JSON schema for the setting. + capability: The capability that the setting is associated with. + kwargs: Additional keyword arguments. + """ + self.schema = schema + self.capability = capability + self.kwargs = kwargs + + def attribute( # noqa: PLR6301 + self, + custom_key: str | None = None, + *, + default: _T | None = None, + ) -> ConfigProperty[_T]: + """Generate a class attribute for the setting. + + Args: + custom_key: Custom key to use in the config. + default: Default value for the setting. + + Returns: + Class attribute for the setting. + """ + return ConfigProperty(custom_key=custom_key, default=default) diff --git a/singer_sdk/helpers/capabilities/_config_property.py b/singer_sdk/helpers/capabilities/_config_property.py new file mode 100644 index 000000000..ef3e35b56 --- /dev/null +++ b/singer_sdk/helpers/capabilities/_config_property.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import typing as t + +T = t.TypeVar("T") + + +class ConfigProperty(t.Generic[T]): + """A descriptor that gets a value from a named key of the config attribute.""" + + def __init__(self, custom_key: str | None = None, *, default: T | None = None): + """Initialize the descriptor. + + Args: + custom_key: The key to get from the config attribute instead of the + attribute name. + default: The default value if the key is not found. + """ + self.key = custom_key + self.default = default + + def __set_name__(self, owner, name: str) -> None: # noqa: ANN001 + """Set the name of the attribute. + + Args: + owner: The class of the object. + name: The name of the attribute. + """ + self.key = self.key or name + + def __get__(self, instance, owner) -> T | None: # noqa: ANN001 + """Get the value from the instance's config attribute. + + Args: + instance: The instance of the object. + owner: The class of the object. + + Returns: + The value from the config attribute. + """ + return instance.config.get(self.key, self.default) # type: ignore[no-any-return] diff --git a/singer_sdk/helpers/capabilities/_enum.py b/singer_sdk/helpers/capabilities/_enum.py new file mode 100644 index 000000000..a6bbc46bc --- /dev/null +++ b/singer_sdk/helpers/capabilities/_enum.py @@ -0,0 +1,197 @@ +from __future__ import annotations + +import enum +import typing as t +import warnings + +_EnumMemberT = t.TypeVar("_EnumMemberT") + + +class TargetLoadMethods(str, enum.Enum): + """Target-specific capabilities.""" + + # always write all input records whether that records already exists or not + APPEND_ONLY = "append-only" + + # update existing records and insert new records + UPSERT = "upsert" + + # delete all existing records and insert all input records + OVERWRITE = "overwrite" + + +class DeprecatedEnum(enum.Enum): + """Base class for capabilities enumeration.""" + + def __new__( + cls, + value: _EnumMemberT, + deprecation: str | None = None, + ) -> DeprecatedEnum: + """Create a new enum member. + + Args: + value: Enum member value. + deprecation: Deprecation message. + + Returns: + An enum member value. + """ + member: DeprecatedEnum = object.__new__(cls) + member._value_ = value + member.deprecation = deprecation + return member + + @property + def deprecation_message(self) -> str | None: + """Get deprecation message. + + Returns: + Deprecation message. + """ + self.deprecation: str | None + return self.deprecation + + def emit_warning(self) -> None: + """Emit deprecation warning.""" + warnings.warn( + f"{self.name} is deprecated. {self.deprecation_message}", + DeprecationWarning, + stacklevel=3, + ) + + +class DeprecatedEnumMeta(enum.EnumMeta): + """Metaclass for enumeration with deprecation support.""" + + def __getitem__(cls, name: str) -> t.Any: # noqa: ANN401 + """Retrieve mapping item. + + Args: + name: Item name. + + Returns: + Enum member. + """ + obj: enum.Enum = super().__getitem__(name) + if isinstance(obj, DeprecatedEnum) and obj.deprecation_message: + obj.emit_warning() + return obj + + def __getattribute__(cls, name: str) -> t.Any: # noqa: ANN401 + """Retrieve enum attribute. + + Args: + name: Attribute name. + + Returns: + Attribute. + """ + obj = super().__getattribute__(name) + if isinstance(obj, DeprecatedEnum) and obj.deprecation_message: + obj.emit_warning() + return obj + + def __call__(cls, *args: t.Any, **kwargs: t.Any) -> t.Any: # noqa: ANN401 + """Call enum member. + + Args: + args: Positional arguments. + kwargs: Keyword arguments. + + Returns: + Enum member. + """ + obj = super().__call__(*args, **kwargs) + if isinstance(obj, DeprecatedEnum) and obj.deprecation_message: + obj.emit_warning() + return obj + + +class CapabilitiesEnum(DeprecatedEnum, metaclass=DeprecatedEnumMeta): + """Base capabilities enumeration.""" + + def __str__(self) -> str: + """String representation. + + Returns: + Stringified enum value. + """ + return str(self.value) + + def __repr__(self) -> str: + """String representation. + + Returns: + Stringified enum value. + """ + return str(self.value) + + +class PluginCapabilities(CapabilitiesEnum): + """Core capabilities which can be supported by taps and targets.""" + + #: Support plugin capability and setting discovery. + ABOUT = "about" + + #: Support :doc:`inline stream map transforms`. + STREAM_MAPS = "stream-maps" + + #: Support schema flattening, aka de-nesting of complex properties. + FLATTENING = "schema-flattening" + + #: Support the + #: `ACTIVATE_VERSION `_ + #: extension. + ACTIVATE_VERSION = "activate-version" + + #: Input and output from + #: `batched files `_. + #: A.K.A ``FAST_SYNC``. + BATCH = "batch" + + +class TapCapabilities(CapabilitiesEnum): + """Tap-specific capabilities.""" + + #: Generate a catalog with `--discover`. + DISCOVER = "discover" + + #: Accept input catalog, apply metadata and selection rules. + CATALOG = "catalog" + + #: Incremental refresh by means of state tracking. + STATE = "state" + + #: Automatic connectivity and stream init test via :ref:`--test`. + TEST = "test" + + #: Support for ``replication_method: LOG_BASED``. You can read more about this + #: feature in `MeltanoHub `_. + LOG_BASED = "log-based" + + #: Deprecated. Please use :attr:`~TapCapabilities.CATALOG` instead. + PROPERTIES = "properties", "Please use CATALOG instead." + + +class TargetCapabilities(CapabilitiesEnum): + """Target-specific capabilities.""" + + #: Allows a ``soft_delete=True`` config option. + #: Requires a tap stream supporting :attr:`PluginCapabilities.ACTIVATE_VERSION` + #: and/or :attr:`TapCapabilities.LOG_BASED`. + SOFT_DELETE = "soft-delete" + + #: Allows a ``hard_delete=True`` config option. + #: Requires a tap stream supporting :attr:`PluginCapabilities.ACTIVATE_VERSION` + #: and/or :attr:`TapCapabilities.LOG_BASED`. + HARD_DELETE = "hard-delete" + + #: Fail safe for unknown JSON Schema types. + DATATYPE_FAILSAFE = "datatype-failsafe" + + #: Allow setting the target schema. + TARGET_SCHEMA = "target-schema" + + #: Validate the schema of the incoming records. + VALIDATE_RECORDS = "validate-records" diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities/_schema.py similarity index 53% rename from singer_sdk/helpers/capabilities.py rename to singer_sdk/helpers/capabilities/_schema.py index a679bfd84..c6e25f748 100644 --- a/singer_sdk/helpers/capabilities.py +++ b/singer_sdk/helpers/capabilities/_schema.py @@ -1,11 +1,7 @@ -"""Module with helpers to declare capabilities and plugin behavior.""" +"""Default JSON Schema to support config for built-in capabilities.""" from __future__ import annotations -import typing as t -from enum import Enum, EnumMeta -from warnings import warn - from singer_sdk.typing import ( ArrayType, BooleanType, @@ -18,9 +14,7 @@ StringType, ) -_EnumMemberT = t.TypeVar("_EnumMemberT") - -# Default JSON Schema to support config for built-in capabilities: +from ._enum import TargetLoadMethods STREAM_MAPS_CONFIG = PropertiesList( Property( @@ -65,11 +59,12 @@ description=( "Config for the [`Faker`](https://faker.readthedocs.io/en/master/) " "instance variable `fake` used within map expressions. Only applicable if " - "the plugin specifies `faker` as an addtional dependency (through the " + "the plugin specifies `faker` as an additional dependency (through the " "`singer-sdk` `faker` extra or directly)." ), ), ).to_dict() + FLATTENING_CONFIG = PropertiesList( Property( "flattening_enabled", @@ -87,6 +82,7 @@ description="The max depth to flatten schemas.", ), ).to_dict() + BATCH_CONFIG = PropertiesList( Property( "batch_config", @@ -136,6 +132,7 @@ ), ), ).to_dict() + TARGET_SCHEMA_CONFIG = PropertiesList( Property( "default_target_schema", @@ -144,6 +141,7 @@ description="The default target database schema name to use for all streams.", ), ).to_dict() + ACTIVATE_VERSION_CONFIG = PropertiesList( Property( "activate_version", @@ -153,6 +151,7 @@ description="Whether to process `ACTIVATE_VERSION` messages.", ), ).to_dict() + ADD_RECORD_METADATA_CONFIG = PropertiesList( Property( "add_record_metadata", @@ -161,6 +160,7 @@ description="Whether to add metadata fields to records.", ), ).to_dict() + TARGET_HARD_DELETE_CONFIG = PropertiesList( Property( "hard_delete", @@ -170,6 +170,7 @@ default=False, ), ).to_dict() + TARGET_VALIDATE_RECORDS_CONFIG = PropertiesList( Property( "validate_records", @@ -179,6 +180,7 @@ default=True, ), ).to_dict() + TARGET_BATCH_SIZE_ROWS_CONFIG = PropertiesList( Property( "batch_size_rows", @@ -188,20 +190,6 @@ ), ).to_dict() - -class TargetLoadMethods(str, Enum): - """Target-specific capabilities.""" - - # always write all input records whether that records already exists or not - APPEND_ONLY = "append-only" - - # update existing records and insert new records - UPSERT = "upsert" - - # delete all existing records and insert all input records - OVERWRITE = "overwrite" - - TARGET_LOAD_METHOD_CONFIG = PropertiesList( Property( "load_method", @@ -221,180 +209,3 @@ class TargetLoadMethods(str, Enum): default=TargetLoadMethods.APPEND_ONLY, ), ).to_dict() - - -class DeprecatedEnum(Enum): - """Base class for capabilities enumeration.""" - - def __new__( - cls, - value: _EnumMemberT, - deprecation: str | None = None, - ) -> DeprecatedEnum: - """Create a new enum member. - - Args: - value: Enum member value. - deprecation: Deprecation message. - - Returns: - An enum member value. - """ - member: DeprecatedEnum = object.__new__(cls) - member._value_ = value - member.deprecation = deprecation - return member - - @property - def deprecation_message(self) -> str | None: - """Get deprecation message. - - Returns: - Deprecation message. - """ - self.deprecation: str | None - return self.deprecation - - def emit_warning(self) -> None: - """Emit deprecation warning.""" - warn( - f"{self.name} is deprecated. {self.deprecation_message}", - DeprecationWarning, - stacklevel=3, - ) - - -class DeprecatedEnumMeta(EnumMeta): - """Metaclass for enumeration with deprecation support.""" - - def __getitem__(cls, name: str) -> t.Any: # noqa: ANN401 - """Retrieve mapping item. - - Args: - name: Item name. - - Returns: - Enum member. - """ - obj: Enum = super().__getitem__(name) - if isinstance(obj, DeprecatedEnum) and obj.deprecation_message: - obj.emit_warning() - return obj - - def __getattribute__(cls, name: str) -> t.Any: # noqa: ANN401 - """Retrieve enum attribute. - - Args: - name: Attribute name. - - Returns: - Attribute. - """ - obj = super().__getattribute__(name) - if isinstance(obj, DeprecatedEnum) and obj.deprecation_message: - obj.emit_warning() - return obj - - def __call__(cls, *args: t.Any, **kwargs: t.Any) -> t.Any: # noqa: ANN401 - """Call enum member. - - Args: - args: Positional arguments. - kwargs: Keyword arguments. - - Returns: - Enum member. - """ - obj = super().__call__(*args, **kwargs) - if isinstance(obj, DeprecatedEnum) and obj.deprecation_message: - obj.emit_warning() - return obj - - -class CapabilitiesEnum(DeprecatedEnum, metaclass=DeprecatedEnumMeta): - """Base capabilities enumeration.""" - - def __str__(self) -> str: - """String representation. - - Returns: - Stringified enum value. - """ - return str(self.value) - - def __repr__(self) -> str: - """String representation. - - Returns: - Stringified enum value. - """ - return str(self.value) - - -class PluginCapabilities(CapabilitiesEnum): - """Core capabilities which can be supported by taps and targets.""" - - #: Support plugin capability and setting discovery. - ABOUT = "about" - - #: Support :doc:`inline stream map transforms`. - STREAM_MAPS = "stream-maps" - - #: Support schema flattening, aka denesting of complex properties. - FLATTENING = "schema-flattening" - - #: Support the - #: `ACTIVATE_VERSION `_ - #: extension. - ACTIVATE_VERSION = "activate-version" - - #: Input and output from - #: `batched files `_. - #: A.K.A ``FAST_SYNC``. - BATCH = "batch" - - -class TapCapabilities(CapabilitiesEnum): - """Tap-specific capabilities.""" - - #: Generate a catalog with `--discover`. - DISCOVER = "discover" - - #: Accept input catalog, apply metadata and selection rules. - CATALOG = "catalog" - - #: Incremental refresh by means of state tracking. - STATE = "state" - - #: Automatic connectivity and stream init test via :ref:`--test`. - TEST = "test" - - #: Support for ``replication_method: LOG_BASED``. You can read more about this - #: feature in `MeltanoHub `_. - LOG_BASED = "log-based" - - #: Deprecated. Please use :attr:`~TapCapabilities.CATALOG` instead. - PROPERTIES = "properties", "Please use CATALOG instead." - - -class TargetCapabilities(CapabilitiesEnum): - """Target-specific capabilities.""" - - #: Allows a ``soft_delete=True`` config option. - #: Requires a tap stream supporting :attr:`PluginCapabilities.ACTIVATE_VERSION` - #: and/or :attr:`TapCapabilities.LOG_BASED`. - SOFT_DELETE = "soft-delete" - - #: Allows a ``hard_delete=True`` config option. - #: Requires a tap stream supporting :attr:`PluginCapabilities.ACTIVATE_VERSION` - #: and/or :attr:`TapCapabilities.LOG_BASED`. - HARD_DELETE = "hard-delete" - - #: Fail safe for unknown JSON Schema types. - DATATYPE_FAILSAFE = "datatype-failsafe" - - #: Allow setting the target schema. - TARGET_SCHEMA = "target-schema" - - #: Validate the schema of the incoming records. - VALIDATE_RECORDS = "validate-records" diff --git a/singer_sdk/mapper.py b/singer_sdk/mapper.py index 004d0f60a..2e27b8093 100644 --- a/singer_sdk/mapper.py +++ b/singer_sdk/mapper.py @@ -654,7 +654,7 @@ def _init_faker_instance(self) -> Faker | None: class PluginMapper: - """Inline map tranformer.""" + """Inline map transformer.""" def __init__( self, diff --git a/singer_sdk/mapper_base.py b/singer_sdk/mapper_base.py index 650552f5f..82ceb74ca 100644 --- a/singer_sdk/mapper_base.py +++ b/singer_sdk/mapper_base.py @@ -7,7 +7,6 @@ import click -from singer_sdk.helpers._classproperty import classproperty from singer_sdk.helpers.capabilities import CapabilitiesEnum, PluginCapabilities from singer_sdk.io_base import SingerReader, SingerWriter from singer_sdk.plugin_base import PluginBase @@ -19,16 +18,10 @@ class InlineMapper(PluginBase, SingerReader, SingerWriter, metaclass=abc.ABCMeta): """Abstract base class for inline mappers.""" - @classproperty - def capabilities(self) -> list[CapabilitiesEnum]: # noqa: PLR6301 - """Get capabilities. - - Returns: - A list of plugin capabilities. - """ - return [ - PluginCapabilities.STREAM_MAPS, - ] + #: A list of plugin capabilities. + capabilities: t.ClassVar[list[CapabilitiesEnum]] = [ + PluginCapabilities.STREAM_MAPS, + ] def _write_messages(self, messages: t.Iterable[singer.Message]) -> None: for message in messages: diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index 62f72f804..7feed79c0 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -22,16 +22,11 @@ parse_environment_config, ) from singer_sdk.exceptions import ConfigValidationError +from singer_sdk.helpers import capabilities from singer_sdk.helpers._classproperty import classproperty from singer_sdk.helpers._compat import SingerSDKDeprecationWarning from singer_sdk.helpers._secrets import SecretString, is_common_secret_key from singer_sdk.helpers._util import read_json_file -from singer_sdk.helpers.capabilities import ( - FLATTENING_CONFIG, - STREAM_MAPS_CONFIG, - CapabilitiesEnum, - PluginCapabilities, -) from singer_sdk.mapper import PluginMapper from singer_sdk.typing import ( DEFAULT_JSONSCHEMA_VALIDATOR, @@ -108,7 +103,7 @@ def _format_validation_error(error: ValidationError) -> str: return result -class PluginBase(metaclass=abc.ABCMeta): # noqa: PLR0904 +class PluginBase(metaclass=abc.ABCMeta): """Abstract base class for taps.""" #: The executable name of the tap or target plugin. e.g. tap-foo @@ -122,6 +117,14 @@ class PluginBase(metaclass=abc.ABCMeta): # noqa: PLR0904 _config: dict + #: Advertised built-in plugin capabilities. Developers may override this property + #: in order to add or remove advertised capabilities for this plugin. + capabilities: t.ClassVar[list[capabilities.CapabilitiesEnum]] = [ + capabilities.PluginCapabilities.STREAM_MAPS, + capabilities.PluginCapabilities.FLATTENING, + capabilities.PluginCapabilities.BATCH, + ] + @classproperty def logger(cls) -> logging.Logger: # noqa: N805 """Get logger. @@ -248,22 +251,6 @@ def initialized_at(self) -> int: """ return self.__initialized_at - @classproperty - def capabilities(self) -> list[CapabilitiesEnum]: # noqa: PLR6301 - """Get capabilities. - - Developers may override this property in oder to add or remove - advertised capabilities for this plugin. - - Returns: - A list of plugin capabilities. - """ - return [ - PluginCapabilities.STREAM_MAPS, - PluginCapabilities.FLATTENING, - PluginCapabilities.BATCH, - ] - @classproperty def _env_var_prefix(cls) -> str: # noqa: N805 return f"{cls.name.upper().replace('-', '_')}_" @@ -487,12 +474,17 @@ def append_builtin_config(cls: type[PluginBase], config_jsonschema: dict) -> Non Args: config_jsonschema: [description] """ - capabilities = cls.capabilities - if PluginCapabilities.STREAM_MAPS in capabilities: - merge_missing_config_jsonschema(STREAM_MAPS_CONFIG, config_jsonschema) + if capabilities.STREAM_MAPS.capability in cls.capabilities: + merge_missing_config_jsonschema( + capabilities.STREAM_MAPS.schema, + config_jsonschema, + ) - if PluginCapabilities.FLATTENING in capabilities: - merge_missing_config_jsonschema(FLATTENING_CONFIG, config_jsonschema) + if capabilities.FLATTENING.capability in cls.capabilities: + merge_missing_config_jsonschema( + capabilities.FLATTENING.schema, + config_jsonschema, + ) @classmethod def print_about( diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 7d9d24a3c..02f2aa2f3 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -8,7 +8,6 @@ import importlib.util import time import typing as t -from functools import cached_property from gzip import GzipFile from gzip import open as gzip_open from types import MappingProxyType @@ -24,6 +23,7 @@ InvalidRecord, MissingKeyPropertiesError, ) +from singer_sdk.helpers import capabilities from singer_sdk.helpers._batch import ( BaseBatchFileEncoding, BatchConfig, @@ -144,6 +144,22 @@ class Sink(metaclass=abc.ABCMeta): # noqa: PLR0904 fail_on_record_validation_exception: bool = True """Interrupt the target execution when a record fails schema validation.""" + #: Enable JSON schema record validation. + validate_schema = capabilities.TARGET_VALIDATE_RECORDS.attribute( + "validate_records", + default=True, + ) + + include_sdc_metadata_properties = capabilities.ADD_RECORD_METADATA.attribute( + "add_record_metadata", + default=False, + ) + + process_activate_version_messages = capabilities.ACTIVATE_VERSION.attribute( + "activate_version", + default=True, + ) + def __init__( self, target: Target, @@ -220,15 +236,6 @@ def batch_processing_timer(self) -> metrics.Timer: """ return self._batch_timer - @cached_property - def validate_schema(self) -> bool: - """Enable JSON schema record validation. - - Returns: - True if JSON schema validation is enabled. - """ - return self.config.get("validate_records", True) - def get_validator(self) -> BaseJSONSchemaValidator | None: """Get a record validator for this sink. @@ -390,24 +397,6 @@ def batch_config(self) -> BatchConfig | None: raw = self.config.get("batch_config") return BatchConfig.from_dict(raw) if raw else None - @property - def include_sdc_metadata_properties(self) -> bool: - """Check if metadata columns should be added. - - Returns: - True if metadata columns should be added. - """ - return self.config.get("add_record_metadata", False) - - @property - def process_activate_version_messages(self) -> bool: - """Check if activate version messages should be processed. - - Returns: - True if activate version messages should be processed. - """ - return self.config.get("activate_version", True) - @property def datetime_error_treatment(self) -> DatetimeErrorTreatmentEnum: """Return a treatment to use for datetime parse errors: ERROR. MAX, or NULL. diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index cccbe4fd5..56edcfb70 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -18,17 +18,10 @@ AbortedSyncPausedException, ConfigValidationError, ) -from singer_sdk.helpers import _state -from singer_sdk.helpers._classproperty import classproperty +from singer_sdk.helpers import _state, capabilities from singer_sdk.helpers._compat import SingerSDKDeprecationWarning from singer_sdk.helpers._state import write_stream_state from singer_sdk.helpers._util import dump_json, read_json_file -from singer_sdk.helpers.capabilities import ( - BATCH_CONFIG, - CapabilitiesEnum, - PluginCapabilities, - TapCapabilities, -) from singer_sdk.io_base import SingerWriter from singer_sdk.plugin_base import PluginBase @@ -61,6 +54,17 @@ class Tap(PluginBase, SingerWriter, metaclass=abc.ABCMeta): # noqa: PLR0904 """Whether the tap's catalog is dynamic. Set to True if the catalog is generated dynamically (e.g. by querying a database's system tables).""" + #: A list of capabilities supported by this tap. + capabilities: t.ClassVar[list[capabilities.CapabilitiesEnum]] = [ + capabilities.TapCapabilities.CATALOG, + capabilities.TapCapabilities.STATE, + capabilities.TapCapabilities.DISCOVER, + capabilities.PluginCapabilities.ABOUT, + capabilities.PluginCapabilities.STREAM_MAPS, + capabilities.PluginCapabilities.FLATTENING, + capabilities.PluginCapabilities.BATCH, + ] + # Constructor def __init__( @@ -193,23 +197,6 @@ def setup_mapper(self) -> None: super().setup_mapper() self.mapper.register_raw_streams_from_catalog(self.catalog) - @classproperty - def capabilities(self) -> list[CapabilitiesEnum]: # noqa: PLR6301 - """Get tap capabilities. - - Returns: - A list of capabilities supported by this tap. - """ - return [ - TapCapabilities.CATALOG, - TapCapabilities.STATE, - TapCapabilities.DISCOVER, - PluginCapabilities.ABOUT, - PluginCapabilities.STREAM_MAPS, - PluginCapabilities.FLATTENING, - PluginCapabilities.BATCH, - ] - @classmethod def append_builtin_config(cls: type[PluginBase], config_jsonschema: dict) -> None: """Appends built-in config to `config_jsonschema` if not already set. @@ -228,9 +215,11 @@ def append_builtin_config(cls: type[PluginBase], config_jsonschema: dict) -> Non """ PluginBase.append_builtin_config(config_jsonschema) - capabilities = cls.capabilities - if PluginCapabilities.BATCH in capabilities: - merge_missing_config_jsonschema(BATCH_CONFIG, config_jsonschema) + if capabilities.BATCH.capability in cls.capabilities: # pragma: no branch + merge_missing_config_jsonschema( + capabilities.BATCH.schema, + config_jsonschema, + ) # Connection and sync tests: diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index c30c2084a..e7d925cff 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -13,21 +13,8 @@ from joblib import Parallel, delayed, parallel_config from singer_sdk.exceptions import RecordsWithoutSchemaException +from singer_sdk.helpers import capabilities from singer_sdk.helpers._batch import BaseBatchFileEncoding -from singer_sdk.helpers._classproperty import classproperty -from singer_sdk.helpers.capabilities import ( - ACTIVATE_VERSION_CONFIG, - ADD_RECORD_METADATA_CONFIG, - BATCH_CONFIG, - TARGET_BATCH_SIZE_ROWS_CONFIG, - TARGET_HARD_DELETE_CONFIG, - TARGET_LOAD_METHOD_CONFIG, - TARGET_SCHEMA_CONFIG, - TARGET_VALIDATE_RECORDS_CONFIG, - CapabilitiesEnum, - PluginCapabilities, - TargetCapabilities, -) from singer_sdk.io_base import SingerMessageType, SingerReader from singer_sdk.plugin_base import PluginBase @@ -57,6 +44,14 @@ class Target(PluginBase, SingerReader, metaclass=abc.ABCMeta): # Required if `Target.get_sink_class()` is not defined. default_sink_class: type[Sink] + #: Target capabilities. + capabilities: t.ClassVar[list[capabilities.CapabilitiesEnum]] = [ + capabilities.PluginCapabilities.ABOUT, + capabilities.PluginCapabilities.STREAM_MAPS, + capabilities.PluginCapabilities.FLATTENING, + capabilities.TargetCapabilities.VALIDATE_RECORDS, + ] + def __init__( self, *, @@ -96,20 +91,6 @@ def __init__( if setup_mapper: self.setup_mapper() - @classproperty - def capabilities(self) -> list[CapabilitiesEnum]: # noqa: PLR6301 - """Get target capabilities. - - Returns: - A list of capabilities supported by this target. - """ - return [ - PluginCapabilities.ABOUT, - PluginCapabilities.STREAM_MAPS, - PluginCapabilities.FLATTENING, - TargetCapabilities.VALIDATE_RECORDS, - ] - @property def max_parallelism(self) -> int: """Get max parallel sinks. @@ -629,20 +610,21 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: if k not in target_jsonschema["properties"]: target_jsonschema["properties"][k] = v - _merge_missing(ADD_RECORD_METADATA_CONFIG, config_jsonschema) - _merge_missing(TARGET_LOAD_METHOD_CONFIG, config_jsonschema) - _merge_missing(TARGET_BATCH_SIZE_ROWS_CONFIG, config_jsonschema) - - capabilities = cls.capabilities + _merge_missing(capabilities.ADD_RECORD_METADATA.schema, config_jsonschema) + _merge_missing(capabilities.TARGET_LOAD_METHOD.schema, config_jsonschema) + _merge_missing(capabilities.TARGET_BATCH_SIZE_ROWS.schema, config_jsonschema) - if PluginCapabilities.ACTIVATE_VERSION in capabilities: - _merge_missing(ACTIVATE_VERSION_CONFIG, config_jsonschema) + if capabilities.ACTIVATE_VERSION.capability in cls.capabilities: + _merge_missing(capabilities.ACTIVATE_VERSION.schema, config_jsonschema) - if PluginCapabilities.BATCH in capabilities: - _merge_missing(BATCH_CONFIG, config_jsonschema) + if capabilities.BATCH.capability in cls.capabilities: + _merge_missing(capabilities.BATCH.schema, config_jsonschema) - if TargetCapabilities.VALIDATE_RECORDS in capabilities: - _merge_missing(TARGET_VALIDATE_RECORDS_CONFIG, config_jsonschema) + if capabilities.TARGET_VALIDATE_RECORDS.capability in cls.capabilities: + _merge_missing( + capabilities.TARGET_VALIDATE_RECORDS.schema, + config_jsonschema, + ) super().append_builtin_config(config_jsonschema) @@ -654,6 +636,14 @@ class SQLTarget(Target): default_sink_class: type[SQLSink] + #: Target capabilities. + capabilities: t.ClassVar[list[capabilities.CapabilitiesEnum]] = [ + *Target.capabilities, + capabilities.PluginCapabilities.ACTIVATE_VERSION, + capabilities.TargetCapabilities.TARGET_SCHEMA, + capabilities.TargetCapabilities.HARD_DELETE, + ] + @property def target_connector(self) -> SQLConnector: """The connector object. @@ -667,24 +657,6 @@ def target_connector(self) -> SQLConnector: ) return self._target_connector - @classproperty - def capabilities(self) -> list[CapabilitiesEnum]: - """Get target capabilities. - - Returns: - A list of capabilities supported by this target. - """ - sql_target_capabilities: list[CapabilitiesEnum] = super().capabilities - sql_target_capabilities.extend( - [ - PluginCapabilities.ACTIVATE_VERSION, - TargetCapabilities.TARGET_SCHEMA, - TargetCapabilities.HARD_DELETE, - ] - ) - - return sql_target_capabilities - @classmethod def append_builtin_config(cls: type[SQLTarget], config_jsonschema: dict) -> None: """Appends built-in config to `config_jsonschema` if not already set. @@ -708,13 +680,11 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: if k not in target_jsonschema["properties"]: target_jsonschema["properties"][k] = v - capabilities = cls.capabilities - - if TargetCapabilities.TARGET_SCHEMA in capabilities: - _merge_missing(TARGET_SCHEMA_CONFIG, config_jsonschema) + if capabilities.TARGET_SCHEMA.capability in cls.capabilities: + _merge_missing(capabilities.TARGET_SCHEMA.schema, config_jsonschema) - if TargetCapabilities.HARD_DELETE in capabilities: - _merge_missing(TARGET_HARD_DELETE_CONFIG, config_jsonschema) + if capabilities.TARGET_HARD_DELETE.capability in cls.capabilities: + _merge_missing(capabilities.TARGET_HARD_DELETE.schema, config_jsonschema) super().append_builtin_config(config_jsonschema) diff --git a/tests/core/helpers/__init__.py b/tests/core/helpers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/core/helpers/capabilities/__init__.py b/tests/core/helpers/capabilities/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/core/test_capabilities.py b/tests/core/helpers/capabilities/test_capabilities.py similarity index 63% rename from tests/core/test_capabilities.py rename to tests/core/helpers/capabilities/test_capabilities.py index 49149469a..b4297c8d5 100644 --- a/tests/core/test_capabilities.py +++ b/tests/core/helpers/capabilities/test_capabilities.py @@ -16,10 +16,36 @@ class DummyCapabilitiesEnum(CapabilitiesEnum): def test_deprecated_capabilities(): + # Dictionary access + with warnings.catch_warnings(): + warnings.simplefilter("error") + _ = DummyCapabilitiesEnum["MY_SUPPORTED_FEATURE"] + + # Call + with warnings.catch_warnings(): + warnings.simplefilter("error") + _ = DummyCapabilitiesEnum("supported") + + # Attribute access with warnings.catch_warnings(): warnings.simplefilter("error") _ = DummyCapabilitiesEnum.MY_SUPPORTED_FEATURE + # Dictionary access + with pytest.warns( + DeprecationWarning, + match="is deprecated. No longer supported", + ) as record: + _ = DummyCapabilitiesEnum["MY_DEPRECATED_FEATURE"] + + # Call + with pytest.warns( + DeprecationWarning, + match="is deprecated. No longer supported", + ) as record: + DummyCapabilitiesEnum("deprecated") + + # Attribute access with pytest.warns( DeprecationWarning, match="is deprecated. No longer supported", diff --git a/tests/core/helpers/capabilities/test_config_property.py b/tests/core/helpers/capabilities/test_config_property.py new file mode 100644 index 000000000..10165dcc5 --- /dev/null +++ b/tests/core/helpers/capabilities/test_config_property.py @@ -0,0 +1,33 @@ +"""Test the BuiltinSetting descriptor.""" + +from __future__ import annotations + +from singer_sdk.helpers.capabilities import ConfigProperty + + +def test_builtin_setting_descriptor(): + class ObjWithConfig: + example = ConfigProperty(default=1) + + def __init__(self): + self.config = {"example": 1} + + obj = ObjWithConfig() + assert obj.example == 1 + + obj.config["example"] = 2 + assert obj.example == 2 + + +def test_builtin_setting_descriptor_custom_key(): + class ObjWithConfig: + my_attr = ConfigProperty("example", default=1) + + def __init__(self): + self.config = {"example": 1} + + obj = ObjWithConfig() + assert obj.my_attr == 1 + + obj.config["example"] = 2 + assert obj.my_attr == 2