Skip to content

Commit

Permalink
feat: Stream sync context is now available to all instances methods a…
Browse files Browse the repository at this point in the history
…s a `Stream.context` attribute
  • Loading branch information
edgarrmondragon committed Jul 10, 2024
1 parent 0b22840 commit b1db975
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
10 changes: 9 additions & 1 deletion singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@


class Stream(metaclass=abc.ABCMeta): # noqa: PLR0904
"""Abstract base class for tap streams."""
"""Abstract base class for tap streams.
:ivar context: Stream partition or context dictionary.
.. versionadded:: 0.39.0
"""

STATE_MSG_FREQUENCY = 10000
"""Number of records between state messages."""
Expand Down Expand Up @@ -134,6 +139,8 @@ def __init__(
self.logger: logging.Logger = tap.logger.getChild(self.name)
self.metrics_logger = tap.metrics_logger
self.tap_name: str = tap.name
self.context: Context | None = None

self._config: dict = dict(tap.config)
self._tap = tap
self._tap_state = tap.state
Expand Down Expand Up @@ -1173,6 +1180,7 @@ def sync(self, context: Context | None = None) -> None:
if context:
msg += f" with context: {context}"
self.logger.info("%s...", msg)
self.context = context

# Use a replication signpost, if available
signpost = self.get_replication_key_signpost(context)
Expand Down
7 changes: 2 additions & 5 deletions singer_sdk/streams/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ def request_records(self, context: Context | None) -> t.Iterable[dict]:
Yields:
An item for every record in the response.
"""
paginator = self.get_new_paginator(context=context)
paginator = self.get_new_paginator()
decorated_request = self.request_decorator(self._request)
pages = 0

Expand Down Expand Up @@ -519,12 +519,9 @@ def prepare_request_payload(
next page of data.
"""

def get_new_paginator(self, *, context: Context | None = None) -> BaseAPIPaginator: # noqa: ARG002
def get_new_paginator(self) -> BaseAPIPaginator:
"""Get a fresh paginator for this API endpoint.
Args:
context: Stream partition or context dictionary.
Returns:
A paginator instance.
"""
Expand Down

0 comments on commit b1db975

Please sign in to comment.