Skip to content

Commit

Permalink
added code for batch_size_rows
Browse files Browse the repository at this point in the history
  • Loading branch information
BuzzCutNorman committed Feb 15, 2024
1 parent f8caa5a commit 93dcdb1
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 11 deletions.
38 changes: 28 additions & 10 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ def __init__(
self._batch_records_read: int = 0
self._batch_dupe_records_merged: int = 0

# Batch full markers
self._batch_size_rows: int | None = target.config.get(
"batch_size_rows",
)

self._validator: BaseJSONSchemaValidator | None = self.get_validator()

@cached_property
Expand Down Expand Up @@ -249,15 +254,6 @@ def _get_context(self, record: dict) -> dict: # noqa: ARG002

# Size properties

@property
def max_size(self) -> int:
"""Get max batch size.
Returns:
Max number of records to batch before `is_full=True`
"""
return self.MAX_SIZE_DEFAULT

@property
def current_size(self) -> int:
"""Get current batch size.
Expand All @@ -269,13 +265,35 @@ def current_size(self) -> int:

@property
def is_full(self) -> bool:
"""Check against size limit.
"""Calls the size limit check funtion.
Returns:
True if the sink needs to be drained.
"""
return self.current_size >= self.max_size

@property
def batch_size_rows(self) -> int | None:
"""Get batch_size_rows object.
Returns:
A batch_size_rows object.
"""
return self._batch_size_rows

@property
def max_size(self) -> int:
"""Get max batch size.
Returns:
Max number of records to batch before `is_full=True`
"""
return (
self.batch_size_rows
if self.batch_size_rows is not None
else self.MAX_SIZE_DEFAULT
)

# Tally methods

@t.final
Expand Down
3 changes: 2 additions & 1 deletion singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,9 @@ def _process_record_message(self, message_dict: dict) -> None:

if sink.is_full:
self.logger.info(
"Target sink for '%s' is full. Draining...",
"Target sink for '%s' is full. Current size is '%s'. Draining...",
sink.stream_name,
sink.current_size,
)
self.drain_one(sink)

Expand Down

0 comments on commit 93dcdb1

Please sign in to comment.