Draft Common Message Queue #46694

Open: wants to merge 15 commits into base: main

vikramkoka
Contributor

Here is a very early draft PR to introduce and socialize the concept of a "common message queue" abstraction similar to the "Common SQL" and "Common IO" abstractions in Airflow.

This will be a provider package similar to those, intended initially as an abstraction over Apache Kafka, Amazon SQS, and Google Pub/Sub. It can then be expanded to other messaging systems based on community adoption.

The initial goal is to provide a simple abstraction for integrating the Event Driven Scheduling coming in Airflow 3 with message notification systems such as Kafka, which are currently used to publish data availability.

At this stage, this is very much a WIP draft intended to solicit input from the community.

Updated the Common Message Queue Readme with an example of an Event Driven Dag
Updated the message queue Operator and Sensor to fix an issue in my sync
Changed the Message Queue Sensor Operator to be a Deferrable Trigger
Fixed typos and import errors in the MsgQueueHook
@vincbeck
Contributor

Implementation-wise, here is my thinking. I am starting with MessageQueueTrigger.

Given msg_queue, MessageQueueTrigger needs to figure out which hook to use to poll/pop a message from the queue. Example: if msg_queue.startswith("https://sqs."): hook = SqsHook(...). We can then use the hook to retrieve the message. The hook will contain the logic for each provider (AWS, Google, Kafka, ...). This means the new provider will depend on all of these providers. Do you think this is an issue? Did you have something else in mind?
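
To make the dispatch idea above concrete, here is a minimal sketch of picking a backend from the queue identifier. It is only an illustration, not the code in this PR: `QueueBackend`, `BACKENDS_BY_PREFIX`, and `resolve_backend` are hypothetical names, the stand-in class replaces real provider hooks so the snippet runs without Airflow installed, and only the `https://sqs.` prefix comes from the comment above (the Kafka and Pub/Sub prefixes are made up).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class QueueBackend:
    """Stand-in for a provider hook; a real version would wrap e.g. an SQS hook."""

    name: str


# Hypothetical prefix table. Only "https://sqs." comes from the discussion;
# the other two prefixes are assumptions for illustration.
BACKENDS_BY_PREFIX = {
    "https://sqs.": QueueBackend("sqs"),
    "kafka://": QueueBackend("kafka"),
    "projects/": QueueBackend("pubsub"),
}


def resolve_backend(msg_queue: str) -> QueueBackend:
    """Return the backend whose prefix matches the queue identifier."""
    for prefix, backend in BACKENDS_BY_PREFIX.items():
        if msg_queue.startswith(prefix):
            return backend
    raise ValueError(f"No message queue backend registered for {msg_queue!r}")
```

A table like this keeps the matching logic in one place. Whether the real provider imports every backend eagerly or lazily is exactly the dependency question raised above, and this sketch does not settle it.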

Updated invocation of MsqQueueSensorTrigger to MsgQueueTrigger in example invocation
@vikramkoka
Contributor Author

You are right, Vincent. I did think about the "Composition vs. Inheritance" tradeoff.

The composition-style interface as defined here is easier for the DAG author, but more maintenance for us.
I talked about this with Ash and Jed as well, and because the underlying plumbing for finding connections and so on is already present in Airflow, this seemed a reasonable approach to make the end-user experience better.

Contributor

@jscheffl left a comment

In general this looks good. I have a few more nits on the Python code/interface, but they can wait until the PR is in real review.

Would be great to add an example DAG as well for the showcase.

@vincbeck
Contributor

I am iterating on that PR but the new provider is not recognized. I get:

ModuleNotFoundError: No module named 'airflow.providers.common.msgq'

With the new restructure, what is the process to add a new provider? Do I just need to create provider.yaml and pyproject.toml and it will be automatically detected/indexed? @potiuk

@vincbeck
Contributor

vincbeck commented Feb 21, 2025

I updated the PR. I focused only on the trigger side. Please let me know if this is what you had in mind for the trigger implementation. I really see it as a proxy for the provider triggers. I could not test it because the new provider is not recognized, but once that is solved I should be able to test it.
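
As a rough illustration of the "proxy" idea, the sketch below shows a trigger that simply forwards events from a provider-specific trigger. It is not the code in this PR and uses no Airflow classes: `FakeSqsTrigger`, `ProxyQueueTrigger`, `first_event`, and `collect_first_event` are hypothetical stand-ins, and the async `run()` generator shape is an assumption made for this example.

```python
import asyncio
from typing import AsyncIterator


class FakeSqsTrigger:
    """Stand-in for a provider-specific trigger (hypothetical)."""

    def __init__(self, queue: str) -> None:
        self.queue = queue

    async def run(self) -> AsyncIterator[str]:
        # A real trigger would poll the queue; here we yield one canned event.
        yield f"message from {self.queue}"


class ProxyQueueTrigger:
    """Forwards run() to whichever provider trigger matches the queue."""

    def __init__(self, queue: str) -> None:
        # Only one backend is wired up in this sketch.
        if not queue.startswith("https://sqs."):
            raise ValueError(f"Unsupported queue: {queue!r}")
        self._delegate = FakeSqsTrigger(queue)

    async def run(self) -> AsyncIterator[str]:
        # The proxy adds no logic of its own; it re-yields the delegate's events.
        async for event in self._delegate.run():
            yield event


async def first_event(trigger: ProxyQueueTrigger) -> str:
    """Return the first event emitted by the trigger."""
    async for event in trigger.run():
        return event
    raise RuntimeError("trigger finished without emitting an event")


def collect_first_event(queue: str) -> str:
    """Synchronous helper that runs the proxy until its first event."""
    return asyncio.run(first_event(ProxyQueueTrigger(queue)))
```

Calling `collect_first_event("https://sqs.example/queue")` drives the proxy once, which is roughly the manual test that could not be run while the provider was not being detected.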

@potiuk
Member

potiuk commented Feb 25, 2025

You need to look at the main pyproject.toml and add the provider in the same way as the others (there are a few places). After that, uv sync or an image build should work.

And yes, I updated https://github.com/apache/airflow/blob/main/providers/MANAGING_PROVIDERS_LIFECYCLE.rst#creating-a-new-community-provider with the new structure and how to add a new provider, but that part is likely missing, so after you figure it out, PRs there are most welcome.

BTW, it will likely change slightly in the future as we move airflow-core and others, but it would still be great to keep it updated.

@potiuk
Member

potiuk commented Feb 25, 2025

Generally @vincbeck -> look at everything below [dependency-groups] in the root pyproject.toml - the provider should be added in all those places.

@vincbeck
Contributor

Thank you :D

@vincbeck vincbeck marked this pull request as ready for review February 25, 2025 21:26
@vincbeck
Contributor

I am not sure what is going on 👀 check-providers-subpackages-init-file-exist is failing because it detects a missing __init__ file at /home/runner/work/airflow/airflow/providers/common/messaging/src/airflow/__init__.py, but this is the old provider directory. It no longer exists.

Added missing __init__.py file: /home/runner/work/airflow/airflow/providers/common/messaging/src/airflow/__init__.py
Added missing __init__.py file: /home/runner/work/airflow/airflow/providers/common/messaging/src/airflow/providers/__init__.py
Added missing __init__.py file: /home/runner/work/airflow/airflow/providers/common/messaging/src/airflow/providers/common/__init__.py
Added missing path extension to __init__.py file /home/runner/work/airflow/airflow/providers/common/messaging/src/airflow/__init__.py

.....

.. note::
This release of provider is only available for Airflow 2.9+ as explained in the
Member

This doesn't match the 3.0+ requirement.

Comment on lines +34 to +36
task = EmptyOperator(task_id="task")

chain(task)
Member

Suggested change
- task = EmptyOperator(task_id="task")
- chain(task)
+ EmptyOperator(task_id="task")

No need for a var and chain if there is a single task.

Member

This feels a little weird being in the test dir?
