From 547a8018683fd72b50165f72961965e5585a9df5 Mon Sep 17 00:00:00 2001 From: Reuben Frankel Date: Fri, 13 Sep 2024 01:05:21 +0100 Subject: [PATCH 1/3] Support stream aliasing of `BATCH` messages via stream maps --- singer_sdk/streams/core.py | 31 ++++++++++++++----- tests/core/test_mapper.py | 16 ++++++++++ .../mapped_stream/aliased_stream_batch.jsonl | 5 +++ 3 files changed, 45 insertions(+), 7 deletions(-) create mode 100644 tests/snapshots/mapped_stream/aliased_stream_batch.jsonl diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index e65f577f2..c65bc87dc 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -880,6 +880,27 @@ def _generate_record_messages( time_extracted=utc_now(), ) + def _generate_batch_messages( + self, + encoding: BaseBatchFileEncoding, + manifest: list[str], + ) -> t.Generator[SDKBatchMessage, None, None]: + """Write out a BATCH message. + + Args: + encoding: The encoding to use for the batch. + manifest: A list of filenames for the batch. + + Yields: + Batch message objects. + """ + for stream_map in self.stream_maps: + yield SDKBatchMessage( + stream=stream_map.stream_alias, + encoding=encoding, + manifest=manifest, + ) + def _write_record_message(self, record: types.Record) -> None: """Write out a RECORD message. @@ -902,13 +923,9 @@ def _write_batch_message( encoding: The encoding to use for the batch. manifest: A list of filenames for the batch. """ - self._tap.write_message( - SDKBatchMessage( - stream=self.name, - encoding=encoding, - manifest=manifest, - ), - ) + for batch_message in self._generate_batch_messages(encoding, manifest): + self._tap.write_message(batch_message) + self._is_state_flushed = False def _log_metric(self, point: metrics.Point) -> None: diff --git a/tests/core/test_mapper.py b/tests/core/test_mapper.py index 96c277436..c70a0949e 100644 --- a/tests/core/test_mapper.py +++ b/tests/core/test_mapper.py @@ -657,6 +657,9 @@ def get_records(self, context): # noqa: ARG002 }, } + def get_batches(self, batch_config, context): + yield batch_config.encoding, ["file:///tmp/stream.json.gz"] + class MappedTap(Tap): """A tap with mapped streams.""" @@ -752,6 +755,19 @@ def discover_streams(self): "aliased_stream.jsonl", id="aliased_stream", ), + pytest.param( + {"mystream": {"__alias__": "aliased_stream"}}, + { + "flattening_enabled": False, + "flattening_max_depth": 0, + "batch_config": { + "encoding": {"format": "jsonl", "compression": "gzip"}, + "storage": {"root": "file:///tmp"}, + }, + }, + "aliased_stream_batch.jsonl", + id="aliased_stream_batch", + ), pytest.param( {}, {"flattening_enabled": True, "flattening_max_depth": 0}, diff --git a/tests/snapshots/mapped_stream/aliased_stream_batch.jsonl b/tests/snapshots/mapped_stream/aliased_stream_batch.jsonl new file mode 100644 index 000000000..c28e8b9e2 --- /dev/null +++ b/tests/snapshots/mapped_stream/aliased_stream_batch.jsonl @@ -0,0 +1,5 @@ +{"type":"STATE","value":{}} +{"type":"SCHEMA","stream":"aliased_stream","schema":{"properties":{"email":{"type":["string"]},"count":{"type":["integer","null"]},"user":{"properties":{"id":{"type":["integer","null"]},"sub":{"properties":{"num":{"type":["integer","null"]},"custom_obj":{"type":["string","null"]}},"type":["object","null"]},"some_numbers":{"items":{"type":["number"]},"type":["array","null"]}},"type":["object","null"]}},"type":"object","required":["email"]},"key_properties":[]} +{"type":"BATCH","stream":"aliased_stream","encoding":{"format":"jsonl","compression":"gzip"},"manifest":["file:///tmp/stream.json.gz"]} +{"type":"STATE","value":{}} +{"type":"STATE","value":{"bookmarks":{"mystream":{}}}} From 75853f46d91c6e4febc391b72f0ec79fc470aa71 Mon Sep 17 00:00:00 2001 From: Reuben Frankel Date: Fri, 13 Sep 2024 01:18:40 +0100 Subject: [PATCH 2/3] Ensure support at target --- singer_sdk/target_base.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 22ad28176..8907b6201 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -462,14 +462,16 @@ def _process_batch_message(self, message_dict: dict) -> None: Args: message_dict: TODO """ - sink = self.get_sink(message_dict["stream"]) - + stream_name = message_dict["stream"] encoding = BaseBatchFileEncoding.from_dict(message_dict["encoding"]) - sink.process_batch_files( - encoding, - message_dict["manifest"], - ) - self._handle_max_record_age() + + for stream_map in self.mapper.stream_maps[stream_name]: + sink = self.get_sink(stream_map.stream_alias) + sink.process_batch_files( + encoding, + message_dict["manifest"], + ) + self._handle_max_record_age() # Sink drain methods From 82290ba2dabbda587166cc720b34f2b3719a450c Mon Sep 17 00:00:00 2001 From: Reuben Frankel Date: Fri, 13 Sep 2024 01:22:03 +0100 Subject: [PATCH 3/3] Satisfy pre-commit --- tests/core/test_mapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/core/test_mapper.py b/tests/core/test_mapper.py index c70a0949e..abec1c7ff 100644 --- a/tests/core/test_mapper.py +++ b/tests/core/test_mapper.py @@ -657,7 +657,7 @@ def get_records(self, context): # noqa: ARG002 }, } - def get_batches(self, batch_config, context): + def get_batches(self, batch_config, context): # noqa: ARG002 yield batch_config.encoding, ["file:///tmp/stream.json.gz"]