From 93dcdb13fc40080049bf382b5befa2f855aed6f2 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Thu, 15 Feb 2024 14:44:53 -0800 Subject: [PATCH] added code for batch_size_rows --- singer_sdk/sinks/core.py | 38 ++++++++++++++++++++++++++++---------- singer_sdk/target_base.py | 3 ++- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index e35353789..5226eae5e 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -182,6 +182,11 @@ def __init__( self._batch_records_read: int = 0 self._batch_dupe_records_merged: int = 0 + # Batch full markers + self._batch_size_rows: int | None = target.config.get( + "batch_size_rows", + ) + self._validator: BaseJSONSchemaValidator | None = self.get_validator() @cached_property @@ -249,15 +254,6 @@ def _get_context(self, record: dict) -> dict: # noqa: ARG002 # Size properties - @property - def max_size(self) -> int: - """Get max batch size. - - Returns: - Max number of records to batch before `is_full=True` - """ - return self.MAX_SIZE_DEFAULT - @property def current_size(self) -> int: """Get current batch size. @@ -269,13 +265,35 @@ def current_size(self) -> int: @property def is_full(self) -> bool: - """Check against size limit. + """Calls the size limit check function. Returns: True if the sink needs to be drained. """ return self.current_size >= self.max_size + @property + def batch_size_rows(self) -> int | None: + """Get batch_size_rows object. + + Returns: + A batch_size_rows object. + """ + return self._batch_size_rows + + @property + def max_size(self) -> int: + """Get max batch size. 
+ + Returns: + Max number of records to batch before `is_full=True` + """ + return ( + self.batch_size_rows + if self.batch_size_rows is not None + else self.MAX_SIZE_DEFAULT + ) + # Tally methods @t.final diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index d24b5e863..3c0d37234 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -364,8 +364,9 @@ def _process_record_message(self, message_dict: dict) -> None: if sink.is_full: self.logger.info( - "Target sink for '%s' is full. Draining...", + "Target sink for '%s' is full. Current size is '%s'. Draining...", sink.stream_name, + sink.current_size, ) self.drain_one(sink)