diff --git a/docs/conf.py b/docs/conf.py index 9a50bf165..d613ef14a 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -152,6 +152,7 @@ # -- Options for intersphinx ----------------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/extensions/intersphinx.html#configuration intersphinx_mapping = { + "blinker": ("https://blinker.readthedocs.io/en/stable/", None), "requests": ("https://requests.readthedocs.io/en/latest/", None), "python": ("https://docs.python.org/3/", None), } diff --git a/docs/guides/index.md b/docs/guides/index.md index e86aa149c..f4739d106 100644 --- a/docs/guides/index.md +++ b/docs/guides/index.md @@ -8,4 +8,5 @@ The following pages contain useful information for developers building on top of porting pagination-classes custom-clis +signals ``` diff --git a/docs/guides/signals.md b/docs/guides/signals.md new file mode 100644 index 000000000..795ce3d9f --- /dev/null +++ b/docs/guides/signals.md @@ -0,0 +1,49 @@ +# Signals + +This guide will show you how to use the built-in [Blinker](inv:blinker:std:doc#index) signals in the Singer SDK. + +## Settings write-back + +The SDK provides a signal that allows you to write back settings to the configuration file. This is useful if you want to update the configuration file with new settings that were set during the run, like a `refresh_token`. + +```python +import requests +from singer_sdk.authenticators import OAuthAuthenticator +from singer_sdk.plugin_base import PluginBase + + +class RefreshTokenAuthenticator(OAuthAuthenticator): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.refresh_token = self.config["refresh_token"] + + @property + def oauth_request_body(self): + return { + "client_id": self.config["client_id"], + "client_secret": self.config["client_secret"], + "grant_type": "refresh_token", + "refresh_token": self.refresh_token, + "user_type": "Location", + } + + def update_access_token(self): + token_response = requests.post( + self.auth_endpoint, + headers=self._oauth_headers, + data=auth_request_payload, + timeout=60, + ) + token_response.raise_for_status() + token_json = token_response.json() + + self.access_token = token_json["access_token"] + self.refresh_token = token_json["refresh_token"] + PluginBase.config_updated.send(self, refresh_token=self.refresh_token) +``` + +In the example above, the `RefreshTokenAuthenticator` class is a subclass of `OAuthAuthenticator` that calls `PluginBase.config_updated.send` to send a signal to update the `refresh_token` in tap's configuration. + +```{note} +Only when a single file is passed via the `--config` command line option, the SDK will write back the settings to the same file. +``` diff --git a/poetry.lock b/poetry.lock index 8363c718d..609b0af43 100644 --- a/poetry.lock +++ b/poetry.lock @@ -158,6 +158,17 @@ charset-normalizer = ["charset-normalizer"] html5lib = ["html5lib"] lxml = ["lxml"] +[[package]] +name = "blinker" +version = "1.7.0" +description = "Fast, simple object-to-object and broadcast signaling" +optional = false +python-versions = ">=3.8" +files = [ + {file = "blinker-1.7.0-py3-none-any.whl", hash = "sha256:c3f865d4d54db7abc53758a01601cf343fe55b84c1de4e3fa910e420b438d5b9"}, + {file = "blinker-1.7.0.tar.gz", hash = "sha256:e6820ff6fa4e4d1d8e2747c2283749c3f547e4fee112b98555cdcdae32996182"}, +] + [[package]] name = "boto3" version = "1.34.100" @@ -2667,4 +2678,4 @@ testing = ["pytest", "pytest-durations"] [metadata] lock-version = "2.0" python-versions = ">=3.8" -content-hash = "6ce3b4821061e0e086e451a5d652882a18f669bb5685ad8029188f54b17243fd" +content-hash = "65ede351f294da59f85e9f1138c91b09dabce7ce11d89af3502996d599e8c746" diff --git a/pyproject.toml b/pyproject.toml index 221ca67e2..ace561d09 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ license = "Apache-2.0" python = ">=3.8" backoff = { version = ">=2.0.0", python = "<4" } backports-datetime-fromisoformat = { version = ">=2.0.1", python = "<3.11" } +blinker = ">=1.7.0" click = "~=8.0" cryptography = ">=3.4.6" fs = ">=2.4.16" diff --git a/singer_sdk/authenticators.py b/singer_sdk/authenticators.py index 6d9c7303f..38fd81a62 100644 --- a/singer_sdk/authenticators.py +++ b/singer_sdk/authenticators.py @@ -7,7 +7,6 @@ import typing as t import warnings from datetime import timedelta -from types import MappingProxyType from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit import requests @@ -16,6 +15,7 @@ if t.TYPE_CHECKING: import logging + from types import MappingProxyType from pendulum import DateTime @@ -90,19 +90,19 @@ def __init__(self, stream: RESTStream) -> None: stream: A stream for a RESTful endpoint. """ self.tap_name: str = stream.tap_name - self._config: dict[str, t.Any] = dict(stream.config) + self._config = stream.config self._auth_headers: dict[str, t.Any] = {} self._auth_params: dict[str, t.Any] = {} self.logger: logging.Logger = stream.logger @property - def config(self) -> t.Mapping[str, t.Any]: + def config(self) -> MappingProxyType: """Get stream or tap config. Returns: A frozen (read-only) config dictionary map. """ - return MappingProxyType(self._config) + return self._config @property def auth_headers(self) -> dict: diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index 1564558cf..bf894e0c3 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -3,6 +3,7 @@ from __future__ import annotations import abc +import json import logging import os import sys @@ -13,6 +14,7 @@ from types import MappingProxyType import click +from blinker import Signal from jsonschema import Draft7Validator from singer_sdk import about, metrics @@ -98,6 +100,9 @@ class PluginBase(metaclass=abc.ABCMeta): # noqa: PLR0904 _config: dict + # Signals + config_updated = Signal() + @classproperty def logger(cls) -> logging.Logger: # noqa: N805 """Get logger. @@ -134,10 +139,43 @@ def __init__( it can be a predetermined config dict. parse_env_config: True to parse settings from env vars. validate_config: True to require validation of config settings. + """ + self._config, self._config_path = self._process_config( + config=config, + parse_env_config=parse_env_config, + ) + metrics._setup_logging(self.config) # noqa: SLF001 + self.metrics_logger = metrics.get_metrics_logger() + + self._validate_config(raise_errors=validate_config) + self._mapper: PluginMapper | None = None + + # Initialization timestamp + self.__initialized_at = int(time.time() * 1000) + + self.config_updated.connect(self.update_config) + + def _process_config( + self, + *, + config: dict | PurePath | str | list[PurePath | str] | None = None, + parse_env_config: bool = False, + ) -> tuple[dict[str, t.Any], PurePath | str | None]: + """Process the plugin configuration. + + Args: + config: May be one or more paths, either as str or PurePath objects, or + it can be a predetermined config dict. + parse_env_config: True to parse settings from env vars. + + Returns: + A tuple containing the config dictionary and the config write-back path. Raises: ValueError: If config is not a dict or path string. """ + config_path = None + if not config: config_dict = {} elif isinstance(config, (str, PurePath)): @@ -148,28 +186,29 @@ def __init__( # Read each config file sequentially. Settings from files later in the # list will override those of earlier ones. config_dict.update(read_json_file(config_path)) + + if len(config) == 1 and not parse_env_config: + config_path = config[0] + elif isinstance(config, dict): config_dict = config - else: + else: # pragma: no cover msg = f"Error parsing config of type '{type(config).__name__}'." raise ValueError(msg) + + # Parse env var settings if parse_env_config: self.logger.info("Parsing env var for settings config...") config_dict.update(self._env_var_config) else: self.logger.info("Skipping parse of env var settings...") + + # Handle sensitive settings for k, v in config_dict.items(): if self._is_secret_config(k): config_dict[k] = SecretString(v) - self._config = config_dict - metrics._setup_logging(self.config) # noqa: SLF001 - self.metrics_logger = metrics.get_metrics_logger() - self._validate_config(raise_errors=validate_config) - self._mapper: PluginMapper | None = None - - # Initialization timestamp - self.__initialized_at = int(time.time() * 1000) + return config_dict, config_path def setup_mapper(self) -> None: """Initialize the plugin mapper for this tap.""" @@ -336,13 +375,28 @@ def state(self) -> dict: # Core plugin config: @property - def config(self) -> t.Mapping[str, t.Any]: + def config(self) -> MappingProxyType: """Get config. Returns: A frozen (read-only) config dictionary map. """ - return t.cast(dict, MappingProxyType(self._config)) + return MappingProxyType(self._config) + + def update_config(self, sender: t.Any, **settings: t.Any) -> None: # noqa: ANN401, ARG002 + """Update the config with new settings. + + This is a :external+blinker:std:doc:`Blinker ` signal receiver. + + Args: + sender: The sender of the signal. + **settings: New settings to update the config with. + """ + self._config.update(**settings) + if self._config_path is not None: # pragma: no cover + self.logger.info("Updating config file: %s", self._config_path) + with Path(self._config_path).open("w") as f: + json.dump(self._config, f) @staticmethod def _is_secret_config(config_key: str) -> bool: diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 886466c5d..9abd9a5a5 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -10,7 +10,6 @@ import typing as t from os import PathLike from pathlib import Path -from types import MappingProxyType import pendulum @@ -58,6 +57,7 @@ if t.TYPE_CHECKING: import logging + from types import MappingProxyType from singer_sdk.helpers._compat import Traversable from singer_sdk.tap_base import Tap @@ -135,7 +135,6 @@ def __init__( self.logger: logging.Logger = tap.logger.getChild(self.name) self.metrics_logger = tap.metrics_logger self.tap_name: str = tap.name - self._config: dict = dict(tap.config) self._tap = tap self._tap_state = tap.state self._tap_input_catalog: singer.Catalog | None = None @@ -602,13 +601,13 @@ def _singer_catalog(self) -> singer.Catalog: return singer.Catalog([(self.tap_stream_id, self._singer_catalog_entry)]) @property - def config(self) -> t.Mapping[str, t.Any]: + def config(self) -> MappingProxyType[str, t.Any]: """Get stream configuration. Returns: A frozen (read-only) config dictionary map. """ - return MappingProxyType(self._config) + return self._tap.config @property def tap_stream_id(self) -> str: diff --git a/tests/core/test_plugin_base.py b/tests/core/test_plugin_base.py index 04b6a1665..92cee623b 100644 --- a/tests/core/test_plugin_base.py +++ b/tests/core/test_plugin_base.py @@ -57,3 +57,11 @@ def test_mapper_not_initialized(): def test_supported_python_versions(): """Test that supported python versions are correctly parsed.""" assert PluginBase._get_supported_python_versions(SDK_PACKAGE_NAME) + + +def test_config_updated_signal(): + plugin = PluginTest(config={"prop1": "hello"}) + assert plugin.config == {"prop1": "hello"} + + PluginBase.config_updated.send(prop2="abc") + assert plugin.config == {"prop1": "hello", "prop2": "abc"}