diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index e65f577f2..c65bc87dc 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -880,6 +880,27 @@ def _generate_record_messages( time_extracted=utc_now(), ) + def _generate_batch_messages( + self, + encoding: BaseBatchFileEncoding, + manifest: list[str], + ) -> t.Generator[SDKBatchMessage, None, None]: + """Write out a BATCH message. + + Args: + encoding: The encoding to use for the batch. + manifest: A list of filenames for the batch. + + Yields: + Batch message objects. + """ + for stream_map in self.stream_maps: + yield SDKBatchMessage( + stream=stream_map.stream_alias, + encoding=encoding, + manifest=manifest, + ) + def _write_record_message(self, record: types.Record) -> None: """Write out a RECORD message. @@ -902,13 +923,9 @@ def _write_batch_message( encoding: The encoding to use for the batch. manifest: A list of filenames for the batch. """ - self._tap.write_message( - SDKBatchMessage( - stream=self.name, - encoding=encoding, - manifest=manifest, - ), - ) + for batch_message in self._generate_batch_messages(encoding, manifest): + self._tap.write_message(batch_message) + self._is_state_flushed = False def _log_metric(self, point: metrics.Point) -> None: diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 22ad28176..8907b6201 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -462,14 +462,16 @@ def _process_batch_message(self, message_dict: dict) -> None: Args: message_dict: TODO """ - sink = self.get_sink(message_dict["stream"]) - + stream_name = message_dict["stream"] encoding = BaseBatchFileEncoding.from_dict(message_dict["encoding"]) - sink.process_batch_files( - encoding, - message_dict["manifest"], - ) - self._handle_max_record_age() + + for stream_map in self.mapper.stream_maps[stream_name]: + sink = self.get_sink(stream_map.stream_alias) + sink.process_batch_files( + encoding, + message_dict["manifest"], + ) + self._handle_max_record_age() # Sink drain methods diff --git a/tests/core/test_mapper.py b/tests/core/test_mapper.py index 96c277436..abec1c7ff 100644 --- a/tests/core/test_mapper.py +++ b/tests/core/test_mapper.py @@ -657,6 +657,9 @@ def get_records(self, context): # noqa: ARG002 }, } + def get_batches(self, batch_config, context): # noqa: ARG002 + yield batch_config.encoding, ["file:///tmp/stream.json.gz"] + class MappedTap(Tap): """A tap with mapped streams.""" @@ -752,6 +755,19 @@ def discover_streams(self): "aliased_stream.jsonl", id="aliased_stream", ), + pytest.param( + {"mystream": {"__alias__": "aliased_stream"}}, + { + "flattening_enabled": False, + "flattening_max_depth": 0, + "batch_config": { + "encoding": {"format": "jsonl", "compression": "gzip"}, + "storage": {"root": "file:///tmp"}, + }, + }, + "aliased_stream_batch.jsonl", + id="aliased_stream_batch", + ), pytest.param( {}, {"flattening_enabled": True, "flattening_max_depth": 0}, diff --git a/tests/snapshots/mapped_stream/aliased_stream_batch.jsonl b/tests/snapshots/mapped_stream/aliased_stream_batch.jsonl new file mode 100644 index 000000000..c28e8b9e2 --- /dev/null +++ b/tests/snapshots/mapped_stream/aliased_stream_batch.jsonl @@ -0,0 +1,5 @@ +{"type":"STATE","value":{}} +{"type":"SCHEMA","stream":"aliased_stream","schema":{"properties":{"email":{"type":["string"]},"count":{"type":["integer","null"]},"user":{"properties":{"id":{"type":["integer","null"]},"sub":{"properties":{"num":{"type":["integer","null"]},"custom_obj":{"type":["string","null"]}},"type":["object","null"]},"some_numbers":{"items":{"type":["number"]},"type":["array","null"]}},"type":["object","null"]}},"type":"object","required":["email"]},"key_properties":[]} +{"type":"BATCH","stream":"aliased_stream","encoding":{"format":"jsonl","compression":"gzip"},"manifest":["file:///tmp/stream.json.gz"]} +{"type":"STATE","value":{}} +{"type":"STATE","value":{"bookmarks":{"mystream":{}}}}