-
Notifications
You must be signed in to change notification settings - Fork 192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
convert add_limit to pipe step based limiting #2131
Conversation
✅ Deploy Preview for dlt-hub-docs canceled.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks so simple now and I think implementation is right. two things:
- we have a test failing in a curious way. apparently we call rest API twice even if the limit is 1. why? we count items at the end of the pipe but there's just a single pipe. we must investigate
- by counting at end of the pipe we change the behavior. I think it makes sense... but maybe we can add sticky flag as an argument to add_limit? so people can still stick it to gen object and count unfiltered items as before?
dlt/extract/items.py
Outdated
return self | ||
|
||
def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: | ||
if self.count == self.max_items: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure that is enough. we should close gen when we reach max items. but in this implementation we close gen at the end of the pipe. not all steps are sync steps, there are for example steps that yield. (or maybe even async steps, I do not remember). we should still return None when count > max_items.
I think we need to add more tests.
- what happens if we do add_yield_map?
- are we testing limit for async generators / iterators
- any differences for round robin / fifo?
- make sure that all expected items are pushed to transformer (this happens via special ForkStep)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is all tested now, except for round robin and fifo, but I am quite sure that this will not make a difference, since rr and fifo only apply at the get_source_item level and there is no async stuff going on in the add_limit (it's all taken care of already in other places)
dlt/extract/resource.py
Outdated
if validator: | ||
self.add_step(validator, insert_at=step_no if step_no >= 0 else None) | ||
self.add_step(validator) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NOTE: i remove inserting at the same position in favor of automatic resolution via placement affinity. I think this makes more sense, I can revert to the old behavior though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO we should put it exactly at previous place. users may want to transform/filter data items before or after this step. and that must be preserved
dlt/extract/items.py
Outdated
class LimitItem(ItemTransform[TDataItem]): | ||
placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental | ||
|
||
def __init__( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have moved the time limit and rate limiting over from the other PR. I think the time limit is fairly uncontroversial, the rate limiting is a little bit sketchy. I think it would be really cool to implement this with this PR, but we could also add a kind of global rate limiting on the Pipeiterator level that gets handed over from the DltSource to the Pipeiterator and is applied in the _get_source_item function to only extract a new item from any pipe if a min amount of time has passed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update: only keeping the time limit here which is very straightforward to implement and I think pretty useful.
related to #2142 |
closes #2142 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still missing two things:
- actual test where limit is used with incremental to do the backfilling. my take is to add this to sql_database tests and (remember to add row_order)
- having this example I'd add it to performance guide and say how to split large backfils. esp. mentioning that records should be ordered and not to retake data twice (ie. via WHERE clause)
- on Py 3.11 some tests are not passing consistently. pls take a look
dlt/extract/resource.py
Outdated
if validator: | ||
self.add_step(validator, insert_at=step_no if step_no >= 0 else None) | ||
self.add_step(validator) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO we should put it exactly at previous place. users may want to transform/filter data items before or after this step. and that must be preserved
add some convenience methods for pipe step management
3fc4d90
to
f109a87
Compare
I have added a general test that combines incremental and add_limit. I will also add a nice example using the rfam database I think but will have to do this tomorrow. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure we should always wrap iterators. See my comment
@rudolfix I have delayed the iterator wrapping to the LimitItem binding now, I agree that this is probably a good idea. That said, the typing/importing is a bit messy now imho.. |
tests/extract/test_incremental.py
Outdated
|
||
resource.add_limit(10) | ||
|
||
p = dlt.pipeline(pipeline_name="incremtal_limit", destination="duckdb", dev_mode=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
p = dlt.pipeline(pipeline_name="incremtal_limit", destination="duckdb", dev_mode=True) | |
p = dlt.pipeline(pipeline_name="incremental_limit", destination="duckdb", dev_mode=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Description
Up until now we were managing limits inside a somewhat conflated function that was wrapping generators. The problem was that limits where applied before incrementals where and that we had a certain amount of code duplication with respect to wrapping async iterators. This PRs solves this.