bug: support lag for non-UTC datetime cursor fields #2170

Conversation

hairrrrr
Contributor

This PR:

  • Adds ensure_pendulum_datetime_non_utc to parse datetime strings into non-UTC datetime objects.
  • Adds _datetime_obj_to_str to preserve the colon in the timezone offset when converting datetime objects back to strings.
  • Skips writing back state if the transformer finds no valid rows for last_value, which could otherwise cause incorrect behavior.
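
To illustrate the first two points using only the standard library (a minimal sketch, not the code from this PR; `subtract_lag_keep_offset` is a hypothetical name): parsing an ISO 8601 string with a non-UTC offset keeps that offset, and `isoformat()` writes it back with a colon, whereas `strftime("%z")` drops the colon.

```python
from datetime import datetime, timedelta


def subtract_lag_keep_offset(value: str, lag_seconds: float) -> str:
    """Hypothetical helper: apply a lag to an ISO 8601 string without converting to UTC."""
    parsed = datetime.fromisoformat(value)  # the +08:00 offset is kept as tzinfo
    lagged = parsed - timedelta(seconds=lag_seconds)  # arithmetic preserves tzinfo
    return lagged.isoformat()  # the offset is rendered with a colon


original = "2024-12-20T10:00:00+08:00"
lagged = subtract_lag_keep_offset(original, 3600)

# For comparison: %z formats the same offset without a colon.
no_colon = datetime.fromisoformat(original).strftime("%Y-%m-%dT%H:%M:%S%z")
```

Here `lagged` keeps the `+08:00` suffix, while `no_colon` ends in `+0800`, which no longer matches the format of the original cursor value.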

Fixes issue: #2169

Please let me know if there are any problems with my code. Thank you!

netlify bot commented Dec 20, 2024

Deploy Preview for dlt-hub-docs canceled.

Name Link
🔨 Latest commit e18fb40
🔍 Latest deploy log https://app.netlify.com/sites/dlt-hub-docs/deploys/67691a4bbec71a00083b99fe

@hairrrrr hairrrrr force-pushed the fix/support-lag-for-non-utc-datatime-cursor-field branch 3 times, most recently from 5235731 to ba4151e Compare December 21, 2024 02:36
@rudolfix rudolfix self-requested a review December 22, 2024 19:27
Collaborator

@rudolfix rudolfix left a comment

thanks for catching edge cases for the lag! the problem you've found is deeper than just empty results. please see my comments.

tests/common/test_time.py (outdated, resolved)
# write back state
self._cached_state["last_value"] = transformer.last_value
# Skip writing back state if no valid rows for last_value in transformer may be wrong
if rows is not None:
Collaborator

@rudolfix rudolfix Dec 22, 2024

this is really a good catch but the problem is deeper than this. we cannot let the state go back and indeed when there's a lag this can unfortunately happen (like in your test, where the "last value" record is missing in the second read).

so IMO this code will fix it

        if self.lag and (cached_last_value := self._cached_state.get("last_value")):
            transformer.last_value = self.last_value_func(transformer.last_value, cached_last_value)
        self._cached_state["last_value"] = transformer.last_value

it uses last_value_func to make sure that "highest" last_value is preserved ("highest" per order established by the last_value_func)

rows is of TDataItems type: so it may be a single item or a list of items. items may be dicts, pandas frames, arrow tables or batches.

so for example rows may be a pandas frame or a list of pandas frames. that's why the checks below are not sufficient. but as I said the problem is deeper (but the solution is simpler in the end)
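
The idea behind the snippet above (never letting the stored last_value move backwards) can be sketched in isolation. This is a hedged illustration, not dlt's implementation: `merge_last_value` is a hypothetical helper, and it assumes `last_value_func` accepts a sequence of candidates, as the built-in `max` and `min` do.

```python
from typing import Any, Callable, Optional, Sequence


def merge_last_value(
    new_value: Optional[Any],
    cached_value: Optional[Any],
    last_value_func: Callable[[Sequence[Any]], Any] = max,
) -> Optional[Any]:
    """Hypothetical helper: keep whichever value is "highest" per last_value_func."""
    if cached_value is None:
        return new_value
    if new_value is None:
        return cached_value
    # Assumption: last_value_func takes a sequence, like max((a, b)) or min((a, b)).
    return last_value_func((new_value, cached_value))


# A lagged re-read returned only older rows, so the cached value wins.
kept = merge_last_value(new_value=5, cached_value=10)
# A newer row appeared, so the state advances.
advanced = merge_last_value(new_value=12, cached_value=10)
# With min as the ordering function, "highest" means smallest.
descending = merge_last_value(new_value=5, cached_value=10, last_value_func=min)
```

Because the comparison is delegated to `last_value_func`, the same merge works for ascending and descending cursors.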

Contributor Author

Thanks, @rudolfix! I think this modification aligns well with the dlt code style. As you mentioned, there might be deeper issues that I’m not aware of at the moment.

so for example rows may be a pandas frame or a list of pandas frames. that's why the checks below are not sufficient. but as I said the problem is deeper (but the solution is simpler in the end)

I have a question. If rows is an empty list of pandas DataFrames, I think the code rows != [] will handle this condition. This is because of the following code:

        if isinstance(rows, list):
            rows = [
                item
                for item in (self._transform_item(transformer, row) for row in rows)
                if item is not None
            ]
        else:
            rows = self._transform_item(transformer, rows)
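
A stripped-down sketch of the filtering above shows the two shapes an "empty" result can take. The names `transform_rows`, `drop_everything`, and `is_effectively_empty` are hypothetical stand-ins, not dlt functions, and plain integers stand in for real data items.

```python
from typing import Any, Callable, Optional


def transform_rows(rows: Any, transform_item: Callable[[Any], Optional[Any]]) -> Any:
    """Hypothetical stand-in mirroring the list / single-item branches above."""
    if isinstance(rows, list):
        return [
            item
            for item in (transform_item(row) for row in rows)
            if item is not None
        ]
    return transform_item(rows)


def drop_everything(row: Any) -> None:
    """Stand-in transformer that filters out every row."""
    return None


def is_effectively_empty(rows: Any) -> bool:
    """Check emptiness by type and length, without comparing items to []."""
    return rows is None or (isinstance(rows, list) and len(rows) == 0)


from_list = transform_rows([1, 2, 3], drop_everything)  # list branch
from_single = transform_rows(1, drop_everything)        # single-item branch
```

The list branch yields an empty list while the single-item branch yields `None`, so a check on the result has to cover both cases.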

@hairrrrr hairrrrr force-pushed the fix/support-lag-for-non-utc-datatime-cursor-field branch 3 times, most recently from 00e2cd4 to ead27f8 Compare December 23, 2024 07:30
@hairrrrr hairrrrr force-pushed the fix/support-lag-for-non-utc-datatime-cursor-field branch from ead27f8 to e18fb40 Compare December 23, 2024 08:07
@hairrrrr
Contributor Author

Hi, @rudolfix. I have resolved all the changes you asked for! Please review again. Thank you!

Collaborator

@rudolfix rudolfix left a comment

LGTM!

@rudolfix rudolfix merged commit e91de1f into dlt-hub:devel Dec 28, 2024
59 checks passed
@jrhorne

jrhorne commented Jan 2, 2025

Apologies if this is the wrong place, but I was reading through the threads on timezones for the lag parameter.

I'm trying to add a lag to an incremental load that did not previously have one. So I changed:

s3_alerts = filesystem(
    bucket_url=f"s3://{s3_bucket_name}/",
    file_glob=alert_glob_expression,
    extract_content=False,
    files_per_page=10000
)

s3_alerts.apply_hints(incremental=dlt.sources.incremental("modification_date"))

to now have the hints be:
s3_alerts.apply_hints(incremental=dlt.sources.incremental("modification_date", lag=3*24*60*60))

Is this the correct way to do this for the s3 filesystem type? I'm now getting this traceback:

  File "/app/.venv/lib/python3.10/site-packages/dlt/extract/incremental/__init__.py", line 378, in last_value
    last_value = apply_lag(
  File "/app/.venv/lib/python3.10/site-packages/dlt/extract/incremental/lag.py", line 70, in apply_lag
    and last_value_func((initial_value, lagged_last_value)) == initial_value
TypeError: can't compare offset-naive and offset-aware datetimes
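
The TypeError in this traceback can be reproduced with plain `datetime` objects. The sketch below is only an illustration of the Python behaviour, not a diagnosis of what dlt stores in state; `assume_utc` is a hypothetical helper, and treating a naive value as UTC is an assumption that may not hold for every source.

```python
from datetime import datetime, timezone

naive = datetime(2025, 1, 2, 12, 0, 0)                       # no tzinfo
aware = datetime(2025, 1, 2, 12, 0, 0, tzinfo=timezone.utc)  # offset-aware

# Comparing the two directly raises the same error as in the traceback.
try:
    max((naive, aware))
    error_message = ""
except TypeError as exc:
    error_message = str(exc)


def assume_utc(value: datetime) -> datetime:
    """Hypothetical helper: attach UTC to naive values, leave aware values untouched."""
    return value if value.tzinfo is not None else value.replace(tzinfo=timezone.utc)


# Once both values are offset-aware, the comparison succeeds.
latest = max((assume_utc(naive), aware))
```

In other words, the error suggests that one of the two values being compared carries timezone information and the other does not.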
