From 547a8018683fd72b50165f72961965e5585a9df5 Mon Sep 17 00:00:00 2001 From: Reuben Frankel Date: Fri, 13 Sep 2024 01:05:21 +0100 Subject: [PATCH] Support stream aliasing of `BATCH` messages via stream maps --- singer_sdk/streams/core.py | 31 ++++++++++++++----- tests/core/test_mapper.py | 16 ++++++++++ .../mapped_stream/aliased_stream_batch.jsonl | 5 +++ 3 files changed, 45 insertions(+), 7 deletions(-) create mode 100644 tests/snapshots/mapped_stream/aliased_stream_batch.jsonl diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index e65f577f2..c65bc87dc 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -880,6 +880,27 @@ def _generate_record_messages( time_extracted=utc_now(), ) + def _generate_batch_messages( + self, + encoding: BaseBatchFileEncoding, + manifest: list[str], + ) -> t.Generator[SDKBatchMessage, None, None]: + """Write out a BATCH message. + + Args: + encoding: The encoding to use for the batch. + manifest: A list of filenames for the batch. + + Yields: + Batch message objects. + """ + for stream_map in self.stream_maps: + yield SDKBatchMessage( + stream=stream_map.stream_alias, + encoding=encoding, + manifest=manifest, + ) + def _write_record_message(self, record: types.Record) -> None: """Write out a RECORD message. @@ -902,13 +923,9 @@ def _write_batch_message( encoding: The encoding to use for the batch. manifest: A list of filenames for the batch. """ - self._tap.write_message( - SDKBatchMessage( - stream=self.name, - encoding=encoding, - manifest=manifest, - ), - ) + for batch_message in self._generate_batch_messages(encoding, manifest): + self._tap.write_message(batch_message) + self._is_state_flushed = False def _log_metric(self, point: metrics.Point) -> None: diff --git a/tests/core/test_mapper.py b/tests/core/test_mapper.py index 96c277436..c70a0949e 100644 --- a/tests/core/test_mapper.py +++ b/tests/core/test_mapper.py @@ -657,6 +657,9 @@ def get_records(self, context): # noqa: ARG002 }, } + def get_batches(self, batch_config, context): + yield batch_config.encoding, ["file:///tmp/stream.json.gz"] + class MappedTap(Tap): """A tap with mapped streams.""" @@ -752,6 +755,19 @@ def discover_streams(self): "aliased_stream.jsonl", id="aliased_stream", ), + pytest.param( + {"mystream": {"__alias__": "aliased_stream"}}, + { + "flattening_enabled": False, + "flattening_max_depth": 0, + "batch_config": { + "encoding": {"format": "jsonl", "compression": "gzip"}, + "storage": {"root": "file:///tmp"}, + }, + }, + "aliased_stream_batch.jsonl", + id="aliased_stream_batch", + ), pytest.param( {}, {"flattening_enabled": True, "flattening_max_depth": 0}, diff --git a/tests/snapshots/mapped_stream/aliased_stream_batch.jsonl b/tests/snapshots/mapped_stream/aliased_stream_batch.jsonl new file mode 100644 index 000000000..c28e8b9e2 --- /dev/null +++ b/tests/snapshots/mapped_stream/aliased_stream_batch.jsonl @@ -0,0 +1,5 @@ +{"type":"STATE","value":{}} +{"type":"SCHEMA","stream":"aliased_stream","schema":{"properties":{"email":{"type":["string"]},"count":{"type":["integer","null"]},"user":{"properties":{"id":{"type":["integer","null"]},"sub":{"properties":{"num":{"type":["integer","null"]},"custom_obj":{"type":["string","null"]}},"type":["object","null"]},"some_numbers":{"items":{"type":["number"]},"type":["array","null"]}},"type":["object","null"]}},"type":"object","required":["email"]},"key_properties":[]} +{"type":"BATCH","stream":"aliased_stream","encoding":{"format":"jsonl","compression":"gzip"},"manifest":["file:///tmp/stream.json.gz"]} +{"type":"STATE","value":{}} +{"type":"STATE","value":{"bookmarks":{"mystream":{}}}}