Skip to content

[FLINK-37817] Adds bundled aggregates for group by (FLIP-491) #26580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

AlanConfluent
Copy link
Contributor

@AlanConfluent AlanConfluent commented May 21, 2025

What is the purpose of the change

This PR implements one of the first portions of FLIP-491. It has a couple components:

  • Adds a new operator KeyedAsyncWaitOperator which is similar to AsyncWaitOperator, but has keyed state.
    • Adds KeyedAsyncFunction which extends AsyncFunction, but will has it's own context.
      • Future use will expose timer related methods (e.g. setting row time timers, proc timers)
      • Note that it exposes runOnMailboxThread rather than just using the complete(SupplierWithException...) because anything requiring multiple async calls will need to processed async responses outside of a pure output result, so a more generic callback had to be introduced. Group by doesn't require this, but over operators will.
    • Currently supports ORDERED output. Followup version will also support ROW_TIME, to support over operators.
  • The main group by implementation of a KeyedAsyncFunction

Brief change log

  • Adds KeyedAsyncWaitOperator and KeyedAsyncFunction.
  • Adds BundledAggregateAsyncFunction which handled group by

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

This change added tests and can be verified as follows:

(example:)

  • Added unit testing for KeyedAsyncWaitOperator and KeyedStreamElementQueueImpl
  • Added unit testing for KeyedAsyncFunctionCommon
  • Added ITCase BundledAggregateITCase which tests the whole stack together.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
    • Just for the new bundled type
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
    • Just for the new configs.
    • Will add more if necessary, though this is an internal-ish feature
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented May 21, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants