Skip to content

[FLINK-37817] Adds bundled aggregates for group by (FLIP-491) #26580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>table.exec.async-agg.buffer-capacity</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
<td>The max number of async i/o operations that the async table function can trigger.</td>
</tr>
<tr>
<td><h5>table.exec.async-agg.timeout</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">3 min</td>
<td>Duration</td>
<td>The async timeout for the asynchronous operation to complete.</td>
</tr>
<tr>
<td><h5>table.exec.async-lookup.buffer-capacity</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">100</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static org.apache.flink.shaded.asm9.org.objectweb.asm.Type.getConstructorDescriptor;
import static org.apache.flink.shaded.asm9.org.objectweb.asm.Type.getMethodDescriptor;
Expand Down Expand Up @@ -373,4 +374,28 @@ public static void validateLambdaType(Class<?> baseClass, Type t) {
+ "Otherwise the type has to be specified explicitly using type information.");
}
}

/**
 * Checks whether the given type is either the given class itself or a parameterized type whose
 * raw type is the given class.
 *
 * <p>For example, both {@code CompletableFuture.class} and the generic type {@code
 * CompletableFuture<String>} match {@code CompletableFuture.class}. The comparison uses
 * equality, so subtypes of {@code clazz} do not match. A {@code null} type never matches.
 *
 * @param clazz The generic class to check against
 * @param type The type to be checked, may be {@code null}
 * @return true if {@code type} equals {@code clazz} or is a parameterization of it
 */
public static boolean isGenericOfClass(Class<?> clazz, Type type) {
    if (clazz.equals(type)) {
        return true;
    }
    // instanceof is false for null, so a null type simply does not match instead of throwing.
    return type instanceof ParameterizedType
            && clazz.equals(((ParameterizedType) type).getRawType());
}

/**
 * Returns the given type as a {@link ParameterizedType}, if that's what the type is.
 *
 * <p>A {@code null} type yields an empty optional rather than a {@link NullPointerException}.
 *
 * @param type The type to check, may be {@code null}
 * @return optional which is present if the type is a ParameterizedType
 */
public static Optional<ParameterizedType> getParameterizedType(Type type) {
    // instanceof is false for null, so no separate null check is required.
    return type instanceof ParameterizedType
            ? Optional.of((ParameterizedType) type)
            : Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public abstract class AbstractStreamOperator<OUT>

private transient @Nullable MailboxExecutor mailboxExecutor;

private transient @Nullable MailboxWatermarkProcessor watermarkProcessor;
protected transient @Nullable MailboxWatermarkProcessor watermarkProcessor;

// ---------------- key/value state ------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,30 @@ public MailboxWatermarkProcessor(
}

/**
 * Processes the given watermark and, once it can be emitted, forwards it via {@code
 * output::emitWatermark}.
 *
 * <p>Convenience overload that delegates to {@link #emitWatermarkInsideMailbox(Watermark,
 * WatermarkEmitter)}.
 *
 * @param mark the incoming watermark
 * @throws Exception propagated from the delegated call
 */
public void emitWatermarkInsideMailbox(Watermark mark) throws Exception {
emitWatermarkInsideMailbox(mark, output::emitWatermark);
}

/**
 * Raises {@code maxInputWatermark} to the larger of its current timestamp and the timestamp of
 * the given watermark, then attempts to advance and emit it.
 *
 * @param mark the incoming watermark
 * @param watermarkEmitter callback used to emit the watermark
 * @throws Exception propagated from advancing or emitting the watermark
 */
public void emitWatermarkInsideMailbox(Watermark mark, WatermarkEmitter watermarkEmitter)
throws Exception {
// Keep the maximum seen so far, so an older watermark never moves the tracked value backwards.
maxInputWatermark =
new Watermark(Math.max(maxInputWatermark.getTimestamp(), mark.getTimestamp()));
emitWatermarkInsideMailbox();
emitWatermarkInsideMailbox(watermarkEmitter);
}

private void emitWatermarkInsideMailbox() throws Exception {
private void emitWatermarkInsideMailbox(WatermarkEmitter watermarkEmitter) throws Exception {
// Try to progress min watermark as far as we can.
if (internalTimeServiceManager.tryAdvanceWatermark(
maxInputWatermark, mailboxExecutor::shouldInterrupt)) {
// In case output watermark has fully progressed emit it downstream.
output.emitWatermark(maxInputWatermark);
watermarkEmitter.emitWatermark(maxInputWatermark);
} else if (!progressWatermarkScheduled) {
progressWatermarkScheduled = true;
// We still have work to do, but we need to let other mails to be processed first.
mailboxExecutor.execute(
MailboxExecutor.MailOptions.deferrable(),
() -> {
progressWatermarkScheduled = false;
emitWatermarkInsideMailbox();
emitWatermarkInsideMailbox(watermarkEmitter);
},
"emitWatermarkInsideMailbox");
} else {
Expand All @@ -91,4 +96,12 @@ private void emitWatermarkInsideMailbox() throws Exception {
LOG.debug("emitWatermarkInsideMailbox is already scheduled, skipping.");
}
}

/**
 * Interface to emit a watermark after all the timers have been fired.
 *
 * <p>It declares a single abstract method so that callers can supply a lambda or a method
 * reference such as {@code output::emitWatermark}.
 */
@Internal
@FunctionalInterface
public interface WatermarkEmitter {

    /**
     * Emit a watermark.
     *
     * @param watermark the watermark to emit
     * @throws Exception if emitting the watermark fails
     */
    void emitWatermark(Watermark watermark) throws Exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,25 @@ public class ExecutionConfigOptions {
"The max number of async retry attempts to make before task "
+ "execution is failed.");

// ------------------------------------------------------------------------
// Bundled Aggregate Options
// ------------------------------------------------------------------------
/**
 * Integer option {@code table.exec.async-agg.buffer-capacity} with a default of 10, documented
 * for streaming execution mode.
 *
 * <p>NOTE(review): the description text refers to "the async table function" although the key
 * is in the {@code async-agg} namespace; the wording looks carried over from the async table
 * options. Confirm the intended wording.
 */
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_AGG_BUFFER_CAPACITY =
key("table.exec.async-agg.buffer-capacity")
.intType()
.defaultValue(10)
.withDescription(
"The max number of async i/o operations that the async table function can trigger.");

/**
 * Duration option {@code table.exec.async-agg.timeout} with a default of 3 minutes, documented
 * for streaming execution mode. Its description states that it is the timeout for the
 * asynchronous operation to complete.
 */
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_AGG_TIMEOUT =
key("table.exec.async-agg.timeout")
.durationType()
.defaultValue(Duration.ofMinutes(3))
.withDescription(
"The async timeout for the asynchronous operation to complete.");

// ------------------------------------------------------------------------
// MiniBatch Options
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.functions;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.functions.agg.BundledKeySegment;
import org.apache.flink.table.functions.agg.BundledKeySegmentApplied;

import java.util.concurrent.CompletableFuture;

/**
 * The bundled interface to be implemented by {@link AggregateFunction}s that may support
 * bundling.
 */
@PublicEvolving
public interface BundledAggregateFunction extends FunctionDefinition {

/**
 * Whether the implementor supports bundling. This allows them to programmatically decide
 * whether to use the bundling or non-bundling interface.
 *
 * @return true if the bundled call should be used
 */
boolean canBundle();

/**
 * Whether the implementor supports retraction.
 *
 * @return false by default; implementations may override
 */
default boolean canRetract() {
return false;
}

/**
 * The bundled call, taking one {@link BundledKeySegment} and a future for the corresponding
 * {@link BundledKeySegmentApplied}.
 *
 * <p>The default implementation always throws, so implementors that support bundling must
 * override it. NOTE(review): presumably implementations complete {@code future} with the
 * result of applying the segment's rows; confirm against the calling operator.
 *
 * @param future the future for the applied result of the segment
 * @param segment the segment of rows, all for the same key
 * @throws UnsupportedOperationException if the method is not overridden
 * @throws Exception declared so that implementations may throw
 */
default void bundledAccumulateRetract(
CompletableFuture<BundledKeySegmentApplied> future, BundledKeySegment segment)
throws Exception {
throw new UnsupportedOperationException(
"This aggregate function does not support bundled calls.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.flink.table.functions.SpecializedFunction.ExpressionEvaluator;
import org.apache.flink.table.functions.SpecializedFunction.ExpressionEvaluatorFactory;
import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
import org.apache.flink.table.functions.agg.BundledKeySegment;
import org.apache.flink.table.functions.agg.BundledKeySegmentApplied;
import org.apache.flink.table.functions.python.utils.PythonFunctionUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.extraction.ExtractionUtils;
Expand All @@ -50,11 +52,14 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getParameterizedType;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.isGenericOfClass;
import static org.apache.flink.util.Preconditions.checkState;

/**
Expand All @@ -81,6 +86,8 @@ public final class UserDefinedFunctionHelper {

public static final String AGGREGATE_MERGE = "merge";

public static final String AGGREGATE_BUNDLED = "bundledAccumulateRetract";

public static final String TABLE_AGGREGATE_ACCUMULATE = "accumulate";

public static final String TABLE_AGGREGATE_RETRACT = "retract";
Expand Down Expand Up @@ -483,9 +490,14 @@ private static void validateImplementationMethods(
} else if (AsyncTableFunction.class.isAssignableFrom(functionClass)) {
validateImplementationMethod(functionClass, true, false, ASYNC_TABLE_EVAL);
} else if (AggregateFunction.class.isAssignableFrom(functionClass)) {
validateImplementationMethod(functionClass, true, false, AGGREGATE_ACCUMULATE);
validateImplementationMethod(functionClass, true, true, AGGREGATE_RETRACT);
validateImplementationMethod(functionClass, true, true, AGGREGATE_MERGE);
if (BundledAggregateFunction.class.isAssignableFrom(functionClass)) {
validateImplementationMethod(functionClass, true, false, AGGREGATE_BUNDLED);
validateBundledImplementationMethod(functionClass, AGGREGATE_BUNDLED);
} else {
validateImplementationMethod(functionClass, true, false, AGGREGATE_ACCUMULATE);
validateImplementationMethod(functionClass, true, true, AGGREGATE_RETRACT);
validateImplementationMethod(functionClass, true, true, AGGREGATE_MERGE);
}
} else if (TableAggregateFunction.class.isAssignableFrom(functionClass)) {
validateImplementationMethod(functionClass, true, false, TABLE_AGGREGATE_ACCUMULATE);
validateImplementationMethod(functionClass, true, true, TABLE_AGGREGATE_RETRACT);
Expand Down Expand Up @@ -540,6 +552,50 @@ private static void validateImplementationMethod(
}
}

/**
 * Validates the shape of every declared method of {@code clazz} whose name is one of {@code
 * methodNameOptions}.
 *
 * <p>Each such method must return {@code void} and take exactly two parameters: a {@link
 * CompletableFuture} whose first type argument is {@link BundledKeySegmentApplied}, followed by
 * a {@link BundledKeySegment}. Methods with other names are ignored.
 *
 * @param clazz the function class to inspect
 * @param methodNameOptions the method names to validate
 * @throws ValidationException if a matching method is not void or has a different signature
 */
private static void validateBundledImplementationMethod(
        Class<? extends UserDefinedFunction> clazz, String... methodNameOptions) {
    final Set<String> candidateNames = new HashSet<>(Arrays.asList(methodNameOptions));
    for (Method candidate : getAllDeclaredMethods(clazz)) {
        if (!candidateNames.contains(candidate.getName())) {
            continue;
        }

        if (!Void.TYPE.equals(candidate.getReturnType())) {
            throw new ValidationException(
                    String.format(
                            "Method '%s' of function class '%s' must be void.",
                            candidate.getName(), clazz.getName()));
        }

        boolean signatureMatches = false;
        if (candidate.getParameterCount() == 2) {
            final Type[] genericParams = candidate.getGenericParameterTypes();
            final Type futureParam = genericParams[0];
            final Type segmentParam = genericParams[1];
            // The future must be parameterized and carry BundledKeySegmentApplied as its
            // first type argument; a raw CompletableFuture is rejected.
            signatureMatches =
                    isGenericOfClass(CompletableFuture.class, futureParam)
                            && isGenericOfClass(BundledKeySegment.class, segmentParam)
                            && getParameterizedType(futureParam)
                                    .map(ParameterizedType::getActualTypeArguments)
                                    .filter(typeArgs -> typeArgs.length > 0)
                                    .map(typeArgs -> typeArgs[0])
                                    .filter(BundledKeySegmentApplied.class::equals)
                                    .isPresent();
        }

        if (signatureMatches) {
            continue;
        }
        throw new ValidationException(
                String.format(
                        "Method '%s' of function class '%s' must have signature "
                                + "void bundledAccumulateRetract(CompletableFuture<BundledKeySegmentApplied> future, BundledKeySegment segment).",
                        candidate.getName(), clazz.getName()));
    }
}

private static void validateAsyncImplementationMethod(
Class<? extends UserDefinedFunction> clazz, String... methodNameOptions) {
final Set<String> nameSet = new HashSet<>(Arrays.asList(methodNameOptions));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.functions.agg;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;

/**
 * A slice of a bundled aggregate call in which every row belongs to one and the same key.
 */
@PublicEvolving
public class BundledKeySegment {

    /** Key shared by all rows of this segment. */
    private final RowData key;

    /** Rows of this segment; all of them are for the shared key. */
    private final List<RowData> rows;

    /**
     * Accumulators for the key. The list itself is never null: it is empty when no accumulator
     * was supplied and holds exactly one element otherwise.
     */
    private final List<RowData> accumulators;

    /**
     * If set, the updated value is returned after each row is applied rather than only the
     * final value.
     */
    private final boolean updatedValuesAfterEachRow;

    /**
     * Creates a segment.
     *
     * @param key the common key of the segment
     * @param rows the rows for the key
     * @param accumulator the accumulator for the key, or null if there is none
     * @param updatedValuesAfterEachRow whether updated values are returned after each row
     */
    public BundledKeySegment(
            RowData key,
            List<RowData> rows,
            @Nullable RowData accumulator,
            boolean updatedValuesAfterEachRow) {
        this.key = key;
        this.rows = rows;
        if (accumulator == null) {
            this.accumulators = Collections.emptyList();
        } else {
            this.accumulators = Collections.singletonList(accumulator);
        }
        this.updatedValuesAfterEachRow = updatedValuesAfterEachRow;
    }

    /** Static factory equivalent to calling the constructor with the same arguments. */
    public static BundledKeySegment of(
            RowData key,
            List<RowData> rows,
            @Nullable RowData accumulator,
            boolean updatedValuesAfterEachRow) {
        return new BundledKeySegment(key, rows, accumulator, updatedValuesAfterEachRow);
    }

    /** Returns the common key of the segment. */
    public RowData getKey() {
        return key;
    }

    /** Returns the rows of the segment, as passed at construction. */
    public List<RowData> getRows() {
        return rows;
    }

    /**
     * Returns the single accumulator, or null if none was supplied.
     *
     * @throws IllegalStateException if more than one accumulator is held
     */
    @Nullable
    public RowData getAccumulator() {
        Preconditions.checkState(accumulators.size() <= 1);
        if (accumulators.isEmpty()) {
            return null;
        }
        return accumulators.get(0);
    }

    /** Returns the accumulators as a list, which is empty if no accumulator was supplied. */
    public List<RowData> getAccumulatorsToMerge() {
        return accumulators;
    }

    /** Returns whether updated values are returned after each row rather than only at the end. */
    public boolean getUpdatedValuesAfterEachRow() {
        return updatedValuesAfterEachRow;
    }
}
Loading