Add ability to pull XCom when passing multiple map_indexes #46728

Open
insomnes wants to merge 3 commits into main from xcom-pull-map-indexes

Conversation

insomnes
Contributor

This PR adds the ability to pull XCom when passing multiple map_indexes on RuntimeTaskInstance.

  • Update description in doc string
  • Pull XCom for each value from the product of all task ids and map indexes
  • Fix None case for task id
  • Expand original tests
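The second bullet can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: `pull_all`, `fetch_one`, and the in-memory `_store` are hypothetical names standing in for a single-result call to the execution API.

```python
from itertools import product
from typing import Any, Callable, Iterable, Optional


def pull_all(
    fetch_one: Callable[[str, Optional[int]], Any],
    task_ids: Iterable[str],
    map_indexes: Iterable[Optional[int]],
    default: Any = None,
) -> list:
    """Call fetch_one once per (task_id, map_index) pair, in product order."""
    results = []
    for task_id, map_index in product(task_ids, map_indexes):
        value = fetch_one(task_id, map_index)
        # Fall back to the default when nothing was stored for this pair.
        results.append(default if value is None else value)
    return results


# Hypothetical in-memory stand-in for the execution API (assumption).
_store = {("identity", 0): 1, ("identity", 2): 3}


def fake_fetch(task_id: str, map_index: Optional[int]) -> Any:
    return _store.get((task_id, map_index))


values = pull_all(fake_fetch, ["identity", "foo"], [0, 2])
```

Because each pair triggers its own fetch, the number of calls grows as the number of task ids times the number of map indexes.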

closes: #46665


@insomnes
Contributor Author

insomnes commented Feb 13, 2025

I am still not sure whether this is the intended implementation, but it is limited by the execution API's current restriction of one XCom result per response. I left that restriction untouched so as not to break anything.

Prints from original DAG example:

    @dag.task
    def xcom_pull(**context):
        ti: TaskInstance = context["ti"]

        # [1, 3, None, None, None, None]
        for x in [
            ti.xcom_pull(task_ids=["identity", "foo", "plusfive"], map_indexes=[0, 2]),
        ]:
            print(x)
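To read the printed list, it may help to pair each position with its (task_id, map_index) combination. The sketch below assumes the results come back in `itertools.product(task_ids, map_indexes)` order, as the PR description suggests; the variable names are illustrative only.

```python
from itertools import product

task_ids = ["identity", "foo", "plusfive"]
map_indexes = [0, 2]

# Values copied from the comment in the DAG example above.
printed = [1, 3, None, None, None, None]

# Assumption: results follow product order, with task_ids as the outer loop.
pairs = list(product(task_ids, map_indexes))
labelled = dict(zip(pairs, printed))

for (task_id, map_index), value in labelled.items():
    print(f"{task_id}[{map_index}] -> {value!r}")
```

Under that assumption, the two non-None values belong to `identity` at map indexes 0 and 2, and the remaining four positions are the pairs for which nothing was pulled.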

@insomnes
Contributor Author

If this PR is not heading in the right direction, I am OK with a full rework. I would just need to understand what was planned for this feature.

@insomnes insomnes force-pushed the xcom-pull-map-indexes branch from 42bea72 to dbb0021 Compare February 18, 2025 18:18
@amoghrajesh
Contributor

Hello @insomnes, apologies for the delayed review. Taking a look at it.

Contributor

@amoghrajesh amoghrajesh left a comment

Initial set of comments, WDYT @ashb?

Comment on lines 282 to 283
instance is used. Setting ``map_indexes`` to *None* will pull XCom as from
not mapped task.
Contributor

Suggested change
- instance is used. Setting ``map_indexes`` to *None* will pull XCom as from
- not mapped task.
+ instance is used. Setting ``map_indexes`` to *None* will pull XCom as it would from
+ a non mapped task.

Contributor Author

Changed as requested

@@ -289,27 +296,46 @@ def xcom_pull(

     if task_ids is None:
         # default to the current task if not provided
-        task_ids = self.task_id
+        task_ids = [self.task_id]
Contributor

Nice!

     else:
         map_indexes_iterable = map_indexes
Contributor

Can we be specific with an elif here?

Contributor Author

Sure. I also added a specific error on the final else branch; if it's not needed, I can drop it.
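For readers following along, the explicit `elif` chain with an error on the final `else` might look something like the sketch below. The branch conditions, the `NOT_SET` sentinel, and the function name `normalize_map_indexes` are assumptions for illustration; the PR's actual code is not shown in this thread.

```python
from collections.abc import Iterable

# Hypothetical sentinel meaning "map_indexes was not passed" (assumption).
NOT_SET = object()


def normalize_map_indexes(map_indexes, current_map_index):
    """Turn the accepted input shapes into a list of map indexes."""
    if map_indexes is NOT_SET:
        # Not provided: fall back to the current task instance's map index.
        return [current_map_index]
    elif map_indexes is None or isinstance(map_indexes, int):
        # A single value: None stands for a non mapped task.
        return [map_indexes]
    elif isinstance(map_indexes, Iterable) and not isinstance(map_indexes, (str, bytes)):
        # Several indexes: materialise so the result can be reused.
        return list(map_indexes)
    else:
        # Anything else is rejected explicitly instead of failing later.
        raise TypeError(
            f"Invalid type for map_indexes: {type(map_indexes).__name__}"
        )
```

Spelling out each accepted shape keeps unsupported inputs, such as a string, from being silently iterated character by character.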

Comment on lines +315 to +334
# TODO: Execution API only allows working with a single map_index at a time
# this is inefficient and leads to task_id * map_index requests to the API.
# And we can't achieve the original behavior of XCom pull with multiple tasks
# directly now.
# Original behavior may be achieved after `LazyXComSequence` is finished?
#
# Original description:
#
# When pulling one single task (``task_id`` is *None* or a str) without
# specifying ``map_indexes``, the return value is inferred from whether
# the specified task is mapped. If not, value from the one single task
# instance is returned. If the task to pull is mapped, an iterator (not a
# list) yielding XComs from mapped task instances is returned. In either
# case, ``default`` (*None* if not specified) is returned if no matching
# XComs are found.
for t_id, m_idx in product(task_ids, map_indexes_iterable):
Contributor

cc: @ashb as we had some discussion re this in the past

@insomnes
Contributor Author

@amoghrajesh thanks for the review! No worries about the delay!
I appreciate your time and I’m glad I could contribute to Task SDK development.

@insomnes insomnes force-pushed the xcom-pull-map-indexes branch 2 times, most recently from 2fdc2a9 to 6a31d6d Compare February 21, 2025 18:14
@insomnes
Contributor Author

Up. Would be nice to know what's next.

@insomnes insomnes force-pushed the xcom-pull-map-indexes branch 2 times, most recently from ca39daf to 9e7a14c Compare February 25, 2025 17:42
@insomnes insomnes force-pushed the xcom-pull-map-indexes branch from 9e7a14c to 80488d5 Compare February 25, 2025 20:25
Successfully merging this pull request may close these issues.

Not able to pull XCom when passing multiple map_indexes
2 participants