diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html index 770f2f5b52499..710ba45503ca1 100644 --- a/docs/layouts/shortcodes/generated/execution_config_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html @@ -8,6 +8,18 @@
+For example, imagine the following query: SELECT B1_SUM(a), B2_COUNT(b), SUM(c), COUNT(d) FROM + * T; + * + *
Here, B1_SUM and B2_COUNT are bundled aggregate functions, and SUM and COUNT are non-bundled
+ * aggregates. The bundled calls will be separate while both the non-bundled are produced using the
+ * conventional {@link org.apache.flink.table.runtime.generated.AggsHandleFunction}. The result of
+ * all three are then combined to a single {@link BundledKeySegmentApplied}.
+ */
+public class BundledResultCombiner implements Serializable {
+ private static final long serialVersionUID = -3486860542451993040L;
+
+ private final RowType accTypeInfo;
+ private final RowType valueType;
+
+ /**
+ * Creates a new {@link Combiner} factory.
+ *
+ * @param accTypeInfo the accumulator type information
+ * @param valueType the value type information
+ */
+ public BundledResultCombiner(RowType accTypeInfo, RowType valueType) {
+ this.accTypeInfo = accTypeInfo;
+ this.valueType = valueType;
+ }
+
+ /** Creates a new {@link Combiner} instance. */
+ public Combiner newCombiner() {
+ return new Combiner(accTypeInfo, valueType);
+ }
+
+ /**
+ * Combines all kinds of aggregates into a single result conforming to the bundled interface, as
+ * if it were not bundled at all.
+ */
+ public static class Combiner
+ implements Serializable, SupplierWithException The operator offers different output modes depending on the chosen {@link
+ * KeyedAsyncOutputMode}. In order to give exactly once processing guarantees, the operator stores
+ * all currently in-flight {@link StreamElement} in per-key state. Upon recovery the recorded set of
+ * stream elements is replayed.
+ *
+ * Because {@link KeyedAsyncFunction}s can utilize row-based timers, retries are not supported by
+ * this operator. Moving the current watermark forward prevents timers from being retried. If
+ * retries are desired, they should be internal to the {@link KeyedAsyncFunction}.
+ *
+ * In case of chaining of this operator, it has to be made sure that the operators in the chain
+ * are opened tail to head. The reason for this is that an opened {@link KeyedAsyncWaitOperator}
+ * starts already emitting recovered {@link StreamElement} to downstream operators.
+ *
+ * @param Between two insertion attempts, this method yields the execution to the mailbox, such that
+ * events as well as asynchronous results can be processed.
+ *
+ * @param streamElement to add to the operator's queue
+ * @throws InterruptedException if the current thread has been interrupted while yielding to
+ * mailbox
+ * @return a handle that allows to set the result of the async computation for the given
+ * element.
+ */
+ private ResultFuture This method will be called from {@link #processWatermark(Watermark)} and from a mail
+ * processing the result of an async function call.
+ */
+ private void outputCompletedElement() {
+ if (queue.hasCompletedElements()) {
+ // emit only one element to not block the mailbox thread unnecessarily
+ queue.emitCompletedElement(timestampedCollector);
+
+ // if there are more completed elements, emit them with subsequent mails
+ if (queue.hasCompletedElements()) {
+ try {
+ mailboxExecutor.execute(
+ this::outputCompletedElement,
+ "AsyncWaitOperator#outputCompletedElement");
+ } catch (RejectedExecutionException mailboxClosedException) {
+ // This exception can only happen if the operator is cancelled which means all
+ // pending records can be safely ignored since they will be processed one more
+ // time after recovery.
+ LOG.debug(
+ "Attempt to complete element is ignored since the mailbox rejected the execution.",
+ mailboxClosedException);
+ }
+ }
+ }
+ }
+
+ /** Utility method to register timeout timer. */
+ private ScheduledFuture> registerTimer(
+ ProcessingTimeService processingTimeService,
+ long timeout,
+ ThrowingConsumer> updatedValuesAfterEachRow = new ArrayList<>();
+ for (int i = 0; i < ithEntries.size(); i++) {
+ BundledKeySegmentApplied update = ithEntries.get(i);
+ // Non bundled accumulators are already wrapped in a row to contain them, so should not
+ // create another layer.
+ boolean avoidWrappingInRow = !isBundled.get(i);
+ accs.add(
+ avoidWrappingInRow
+ ? update.getAccumulator()
+ : GenericRowData.of(update.getAccumulator()));
+
+ if (shouldIncludeValues.get(i)) {
+ RowData startingValue = update.getStartingValue();
+ RowData finalValue = update.getFinalValue();
+
+ startingValues.add(startingValue);
+ finalValues.add(finalValue);
+
+ if (updatedValuesAfterEachRow.isEmpty()) {
+ for (int j = 0; j < update.getUpdatedValuesAfterEachRow().size(); j++) {
+ updatedValuesAfterEachRow.add(new ArrayList<>(ithEntries.size()));
+ updatedValuesAfterEachRow
+ .get(j)
+ .add(update.getUpdatedValuesAfterEachRow().get(j));
+ }
+ } else {
+ Preconditions.checkState(
+ updatedValuesAfterEachRow.size()
+ == update.getUpdatedValuesAfterEachRow().size());
+ for (int j = 0; j < update.getUpdatedValuesAfterEachRow().size(); j++) {
+ updatedValuesAfterEachRow
+ .get(j)
+ .add(update.getUpdatedValuesAfterEachRow().get(j));
+ }
+ }
+ }
+ }
+
+ final List