From 72a7ebb2a51743ed8c162be8b6cfb2f4e0562e63 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 7 Mar 2024 13:08:02 -0800 Subject: [PATCH 1/6] added TAP_MAX_PARALLELISM_CONFIG to capabilities --- singer_sdk/helpers/capabilities.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py index f76400c5a..4240eae87 100644 --- a/singer_sdk/helpers/capabilities.py +++ b/singer_sdk/helpers/capabilities.py @@ -159,6 +159,13 @@ description="Maximum number of rows in each batch.", ), ).to_dict() +TAP_MAX_PARALLELISM_CONFIG = PropertiesList( + Property( + "max_parallelism", + IntegerType, + description="Max number of streams that can sync in parallel.", + ), +).to_dict() class TargetLoadMethods(str, Enum): From 6720090db1c16b941bdfec5d7a4afac6247f38c3 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 7 Mar 2024 13:11:00 -0800 Subject: [PATCH 2/6] added TAP_MAX_PARALLELISM_CONFIG capability to Tap Class --- singer_sdk/tap_base.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index caf016cd0..ad12076f2 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -24,6 +24,7 @@ from singer_sdk.helpers._util import read_json_file from singer_sdk.helpers.capabilities import ( BATCH_CONFIG, + TAP_MAX_PARALLELISM_CONFIG, CapabilitiesEnum, PluginCapabilities, TapCapabilities, @@ -218,6 +219,9 @@ def append_builtin_config(cls: type[PluginBase], config_jsonschema: dict) -> Non capabilities = cls.capabilities if PluginCapabilities.BATCH in capabilities: merge_missing_config_jsonschema(BATCH_CONFIG, config_jsonschema) + merge_missing_config_jsonschema( + TAP_MAX_PARALLELISM_CONFIG, config_jsonschema + ) # Connection and sync tests: From 1421227e44bcd29489ba90c8a5b279e23a3d33d0 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 7 Mar 2024 13:22:13 -0800 Subject: [PATCH 3/6] added max_parallelism property to Tap class --- singer_sdk/tap_base.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index ad12076f2..ce31540d4 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -97,6 +97,7 @@ def __init__( self._input_catalog: Catalog | None = None self._state: dict[str, Stream] = {} self._catalog: Catalog | None = None # Tap's working catalog + self._max_parallelism: int | None = self.config.get("max_parallelism") # Process input catalog if isinstance(catalog, Catalog): @@ -181,6 +182,20 @@ def setup_mapper(self) -> None: super().setup_mapper() self.mapper.register_raw_streams_from_catalog(self.catalog) + @property + def max_parallelism(self) -> int: + """Get max parallel sinks. + + The default is None if not overridden. + + Returns: + Max number of streams that can be synced in parallel. + """ + if self._max_parallelism in (0, 1): + self._max_parallelism = None + + return self._max_parallelism + @classproperty def capabilities(self) -> list[CapabilitiesEnum]: """Get tap capabilities. From b488a1f7baeafcd1b88930cf38050b1880d31116 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 7 Mar 2024 13:31:22 -0800 Subject: [PATCH 4/6] added sync_one method and strated utilizing it in sync_all --- singer_sdk/tap_base.py | 44 +++++++++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index ce31540d4..72bfb70b7 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -461,6 +461,33 @@ def _set_compatible_replication_methods(self) -> None: # Sync methods + @t.final + def sync_one( + self, + stream: Stream, + ) -> None: + """Sync a single stream. + + Args: + stream: The stream that your would like to sync. + """ + if not stream.selected and not stream.has_selected_descendents: + self.logger.info("Skipping deselected stream '%s'.", stream.name) + return + + if stream.parent_stream_type: + self.logger.debug( + "Child stream '%s' is expected to be called " + "by parent stream '%s'. " + "Skipping direct invocation.", + type(stream).__name__, + stream.parent_stream_type.__name__, + ) + return + + stream.sync() + stream.finalize_state_progress_markers() + @t.final def sync_all(self) -> None: """Sync all streams.""" @@ -470,22 +497,7 @@ def sync_all(self) -> None: stream: Stream for stream in self.streams.values(): - if not stream.selected and not stream.has_selected_descendents: - self.logger.info("Skipping deselected stream '%s'.", stream.name) - continue - - if stream.parent_stream_type: - self.logger.debug( - "Child stream '%s' is expected to be called " - "by parent stream '%s'. " - "Skipping direct invocation.", - type(stream).__name__, - stream.parent_stream_type.__name__, - ) - continue - - stream.sync() - stream.finalize_state_progress_markers() + self.sync_one(stream=stream) # this second loop is needed for all streams to print out their costs # including child streams which are otherwise skipped in the loop above From 51019783c1790abd79b9f8d8932380f07622b74b Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 7 Mar 2024 13:59:31 -0800 Subject: [PATCH 5/6] add option to sync_all for joblib parallel using loky to call sync_one --- singer_sdk/tap_base.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index 72bfb70b7..c50dde248 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -10,6 +10,7 @@ from enum import Enum import click +from joblib import Parallel, delayed, parallel_config from singer_sdk._singerlib import Catalog, StateMessage from singer_sdk.configuration._dict_config import merge_missing_config_jsonschema @@ -495,9 +496,17 @@ def sync_all(self) -> None: self._set_compatible_replication_methods() self.write_message(StateMessage(value=self.state)) - stream: Stream - for stream in self.streams.values(): - self.sync_one(stream=stream) + if self.max_parallelism is None: + stream: Stream + for stream in self.streams.values(): + self.sync_one(stream=stream) + else: + with parallel_config( + backend="loky", prefer="processes", n_jobs=self.max_parallelism + ), Parallel() as parallel: + parallel( + delayed(self.sync_one)(stream) for stream in self.streams.values() + ) # this second loop is needed for all streams to print out their costs # including child streams which are otherwise skipped in the loop above From 502b36df7c29d8008cbf364b26c132d93df18d82 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 7 Mar 2024 15:02:20 -0800 Subject: [PATCH 6/6] fixed missing console logging from stream processes --- singer_sdk/tap_base.py | 48 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index c50dde248..f7759ffda 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -6,8 +6,12 @@ import abc import contextlib import json +import logging +import sys import typing as t from enum import Enum +from logging.handlers import QueueHandler, QueueListener +from multiprocessing import Manager, Queue import click from joblib import Parallel, delayed, parallel_config @@ -466,12 +470,26 @@ def _set_compatible_replication_methods(self) -> None: def sync_one( self, stream: Stream, + log_level: logging.Logger | None = None, + log_queue: Queue | None = None, ) -> None: """Sync a single stream. Args: stream: The stream that your would like to sync. + log_level: The logging level used by Tap.logger. + log_queue: Multiprocess Queue used by the listener. + + This is a link to a logging example for joblib. + https://github.com/joblib/joblib/issues/1017 """ + if self.max_parallelism is not None and not self.logger.hasHandlers(): + queue_handler = QueueHandler(log_queue) + self.logger.addHandler(queue_handler) + self.logger.setLevel(log_level) + self.metrics_logger.addHandler(queue_handler) + self.metrics_logger.setLevel(log_level) + if not stream.selected and not stream.has_selected_descendents: self.logger.info("Skipping deselected stream '%s'.", stream.name) return @@ -501,12 +519,32 @@ def sync_all(self) -> None: for stream in self.streams.values(): self.sync_one(stream=stream) else: - with parallel_config( - backend="loky", prefer="processes", n_jobs=self.max_parallelism - ), Parallel() as parallel: - parallel( - delayed(self.sync_one)(stream) for stream in self.streams.values() + with Manager() as manager: + # Prepare logger for parallel processes + console_handler = logging.StreamHandler(sys.stderr) + console_formatter = logging.Formatter( + fmt="{asctime:23s} | {levelname:8s} | {name:20s} | {message}", + style="{", ) + console_handler.setFormatter(console_formatter) + self.logger.addHandler(console_handler) + log_queue = manager.Queue() + listener = QueueListener(log_queue, *self.logger.handlers) + listener.start() + with parallel_config( + backend="loky", + prefer="processes", + n_jobs=self.max_parallelism, + ), Parallel() as parallel: + parallel( + delayed(self.sync_one)( + stream, + log_queue=log_queue, + log_level=self.logger.getEffectiveLevel(), + ) + for stream in self.streams.values() + ) + listener.stop() # this second loop is needed for all streams to print out their costs # including child streams which are otherwise skipped in the loop above