[Coral-Hive] Extend VersionedSqlUserDefinedFunction to support version-specific function names (#508)
ljfgem authored Aug 27, 2024
1 parent 7908fdb commit a8b86e7
Showing 19 changed files with 308 additions and 135 deletions.
@@ -1,14 +1,12 @@
/**
* Copyright 2023 LinkedIn Corporation. All rights reserved.
* Copyright 2023-2024 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.common.transformers;

import org.apache.calcite.sql.SqlCall;

import static com.linkedin.coral.common.calcite.CalciteUtil.*;


/**
* This class is a subclass of {@link SqlCallTransformer} which transforms a function operator on SqlNode layer
@@ -1,5 +1,5 @@
/**
* Copyright 2018-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2018-2024 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
@@ -44,6 +44,7 @@
* Class to resolve hive function names in SQL to Function.
*/
public class HiveFunctionResolver {
private static final String VERSIONED_UDF_CLASS_NAME_PREFIX = "coral_udf_version_(\\d+|x)_(\\d+|x)_(\\d+|x)";

public final FunctionRegistry registry;
private final ConcurrentHashMap<String, Function> dynamicFunctionRegistry;
@@ -111,22 +112,23 @@ public SqlOperator resolveBinaryOperator(String name) {
* this attempts to match dali-style function names (DB_TABLE_VERSION_FUNCTION).
* Right now, this method does not validate parameters, leaving it to
* the subsequent validator and analyzer phases to validate parameter types.
* @param functionName hive function name
* @param originalViewTextFunctionName original function name in view text to resolve
* @param hiveTable handle to Hive table representing metastore information. This is used for resolving
* Dali function names, which are resolved using table parameters
* @param numOfOperands number of operands this function takes. This is needed to
* create SqlOperandTypeChecker to resolve Dali function dynamically
* @return resolved hive functions
* @throws UnknownSqlFunctionException if the function name can not be resolved.
*/
public Function tryResolve(@Nonnull String functionName, @Nullable Table hiveTable, int numOfOperands) {
checkNotNull(functionName);
Collection<Function> functions = registry.lookup(functionName);
public Function tryResolve(@Nonnull String originalViewTextFunctionName, @Nullable Table hiveTable,
int numOfOperands) {
checkNotNull(originalViewTextFunctionName);
Collection<Function> functions = registry.lookup(originalViewTextFunctionName);
if (functions.isEmpty() && hiveTable != null) {
functions = tryResolveAsDaliFunction(functionName, hiveTable, numOfOperands);
functions = tryResolveAsDaliFunction(originalViewTextFunctionName, hiveTable, numOfOperands);
}
if (functions.isEmpty()) {
throw new UnknownSqlFunctionException(functionName);
throw new UnknownSqlFunctionException(originalViewTextFunctionName);
}
if (functions.size() == 1) {
return functions.iterator().next();
@@ -160,66 +162,70 @@ public Collection<Function> resolve(String functionName) {
/**
* Tries to resolve function name as Dali function name using the provided Hive table catalog information.
* This uses table parameters 'function' property to resolve the function name to the implementing class.
* @param functionName function name to resolve
* @param originalViewTextFunctionName original function name in view text to resolve
* @param table Hive metastore table handle
* @param numOfOperands number of operands this function takes. This is needed to
* create SqlOperandTypeChecker to resolve Dali function dynamically
* @return list of matching Functions or empty list if the function name is not in the dali function name format
* of `databaseName_tableName_udfName` or `udfName` (without `databaseName_tableName_` prefix)
* @throws UnknownSqlFunctionException if the function name is in Dali function name format but there is no mapping
*/
public Collection<Function> tryResolveAsDaliFunction(String functionName, @Nonnull Table table, int numOfOperands) {
public Collection<Function> tryResolveAsDaliFunction(String originalViewTextFunctionName, @Nonnull Table table,
int numOfOperands) {
Preconditions.checkNotNull(table);
String functionPrefix = String.format("%s_%s_", table.getDbName(), table.getTableName());
if (!functionName.toLowerCase().startsWith(functionPrefix.toLowerCase())) {
// if functionName is not in `databaseName_tableName_udfName` format, we don't require the `databaseName_tableName_` prefix
if (!originalViewTextFunctionName.toLowerCase().startsWith(functionPrefix.toLowerCase())) {
// if originalViewTextFunctionName is not in `databaseName_tableName_udfName` format, we don't require the `databaseName_tableName_` prefix
functionPrefix = "";
}
String funcBaseName = functionName.substring(functionPrefix.length());
String funcBaseName = originalViewTextFunctionName.substring(functionPrefix.length());
HiveTable hiveTable = new HiveTable(table);
Map<String, String> functionParams = hiveTable.getDaliFunctionParams();
String funcClassName = functionParams.get(funcBaseName);
if (funcClassName == null) {
String functionClassName = functionParams.get(funcBaseName);
if (functionClassName == null) {
return ImmutableList.of();
}
final Collection<Function> Functions = registry.lookup(funcClassName);
if (Functions.size() == 0) {
// If the UDF class name is versioned, remove the versioning prefix, which allows users to
// register the unversioned UDF once and use different versioning prefixes in the view
final Collection<Function> functions = registry.lookup(removeVersioningPrefix(functionClassName));
if (functions.isEmpty()) {
Collection<Function> dynamicResolvedFunctions =
resolveDaliFunctionDynamically(functionName, funcClassName, hiveTable, numOfOperands);
resolveDaliFunctionDynamically(originalViewTextFunctionName, functionClassName, hiveTable, numOfOperands);

if (dynamicResolvedFunctions.size() == 0) {
if (dynamicResolvedFunctions.isEmpty()) {
// we want to see class name in the exception message for coverage testing
// so throw exception here
throw new UnknownSqlFunctionException(funcClassName);
throw new UnknownSqlFunctionException(functionClassName);
}

return dynamicResolvedFunctions;
}

return Functions.stream()
.map(f -> new Function(f.getFunctionName(), new VersionedSqlUserDefinedFunction(
(SqlUserDefinedFunction) f.getSqlOperator(), hiveTable.getDaliUdfDependencies(), functionName)))
return functions.stream()
.map(f -> new Function(f.getFunctionName(),
new VersionedSqlUserDefinedFunction((SqlUserDefinedFunction) f.getSqlOperator(),
hiveTable.getDaliUdfDependencies(), originalViewTextFunctionName, functionClassName)))
.collect(Collectors.toList());
}

public void addDynamicFunctionToTheRegistry(String funcClassName, Function function) {
if (!dynamicFunctionRegistry.contains(funcClassName)) {
dynamicFunctionRegistry.put(funcClassName, function);
public void addDynamicFunctionToTheRegistry(String functionClassName, Function function) {
if (!dynamicFunctionRegistry.contains(functionClassName)) {
dynamicFunctionRegistry.put(functionClassName, function);
}
}

private @Nonnull Collection<Function> resolveDaliFunctionDynamically(String functionName, String funcClassName,
HiveTable hiveTable, int numOfOperands) {
if (dynamicFunctionRegistry.contains(funcClassName)) {
return ImmutableList.of(dynamicFunctionRegistry.get(functionName));
private @Nonnull Collection<Function> resolveDaliFunctionDynamically(String originalViewTextFunctionName,
String functionClassName, HiveTable hiveTable, int numOfOperands) {
if (dynamicFunctionRegistry.contains(functionClassName)) {
return ImmutableList.of(dynamicFunctionRegistry.get(originalViewTextFunctionName));
}
Function function = new Function(funcClassName,
Function function = new Function(functionClassName,
new VersionedSqlUserDefinedFunction(
new SqlUserDefinedFunction(new SqlIdentifier(funcClassName, ZERO),
new HiveGenericUDFReturnTypeInference(funcClassName, hiveTable.getDaliUdfDependencies()), null,
new SqlUserDefinedFunction(new SqlIdentifier(functionClassName, ZERO),
new HiveGenericUDFReturnTypeInference(functionClassName, hiveTable.getDaliUdfDependencies()), null,
createSqlOperandTypeChecker(numOfOperands), null, null),
hiveTable.getDaliUdfDependencies(), functionName));
dynamicFunctionRegistry.put(funcClassName, function);
hiveTable.getDaliUdfDependencies(), originalViewTextFunctionName, functionClassName));
dynamicFunctionRegistry.put(functionClassName, function);
return ImmutableList.of(function);
}

@@ -238,4 +244,22 @@ public void addDynamicFunctionToTheRegistry(String funcClassName, Function funct

return sqlOperandTypeChecker;
}

/**
* Removes the versioning prefix from a given UDF class name if it is present.
* A class name is considered versioned if the prefix before the first dot
* follows the {@link HiveFunctionResolver#VERSIONED_UDF_CLASS_NAME_PREFIX} format.
*/
private String removeVersioningPrefix(String className) {
if (className != null && !className.isEmpty()) {
int firstDotIndex = className.indexOf('.');
if (firstDotIndex != -1) {
String prefix = className.substring(0, firstDotIndex);
if (prefix.matches(VERSIONED_UDF_CLASS_NAME_PREFIX)) {
return className.substring(firstDotIndex + 1);
}
}
}
return className;
}
}
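For readers skimming the diff, here is a minimal standalone sketch (not part of the commit) of the prefix-stripping rule that removeVersioningPrefix and the VERSIONED_UDF_CLASS_NAME_PREFIX pattern above implement. The regex and the CoralTestUDF class names are taken from this diff; the demo class itself is made up for illustration.

import java.util.regex.Pattern;

// Illustration only: mirrors HiveFunctionResolver#removeVersioningPrefix from the diff above.
public class VersioningPrefixDemo {
  // Same pattern as VERSIONED_UDF_CLASS_NAME_PREFIX: each version component is digits or the literal 'x'.
  private static final Pattern VERSION_PREFIX =
      Pattern.compile("coral_udf_version_(\\d+|x)_(\\d+|x)_(\\d+|x)");

  static String removeVersioningPrefix(String className) {
    if (className != null && !className.isEmpty()) {
      int firstDotIndex = className.indexOf('.');
      // Only the segment before the first dot is checked against the version pattern.
      if (firstDotIndex != -1 && VERSION_PREFIX.matcher(className.substring(0, firstDotIndex)).matches()) {
        return className.substring(firstDotIndex + 1);
      }
    }
    return className;
  }

  public static void main(String[] args) {
    // Versioned (shaded) class name resolves to the unversioned class used for registry lookup:
    System.out.println(removeVersioningPrefix(
        "coral_udf_version_0_1_x.com.linkedin.coral.hive.hive2rel.CoralTestUDF"));
    // prints: com.linkedin.coral.hive.hive2rel.CoralTestUDF

    // Unversioned class names pass through unchanged:
    System.out.println(removeVersioningPrefix("com.linkedin.coral.hive.hive2rel.CoralTestUDF"));
    // prints: com.linkedin.coral.hive.hive2rel.CoralTestUDF
  }
}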
@@ -1,13 +1,15 @@
/**
* Copyright 2019-2022 LinkedIn Corporation. All rights reserved.
* Copyright 2019-2024 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.hive.hive2rel.functions;

import java.util.List;
import java.util.Map;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.schema.Function;
@@ -23,61 +25,101 @@
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorScope;

import com.linkedin.coral.com.google.common.base.CaseFormat;
import com.linkedin.coral.com.google.common.base.Converter;


/**
* Class that represents Dali versioned UDFs
*/
public class VersionedSqlUserDefinedFunction extends SqlUserDefinedFunction {

// Predefined map that associates class names with their corresponding short function names.
private static final Map<String, String> SHORT_FUNC_NAME_MAP = ImmutableMap.<String, String> builder()
.put("com.linkedin.dali.udf.watbotcrawlerlookup.hive.WATBotCrawlerLookup", "wat_bot_crawler_lookup")
.put("com.linkedin.stdudfs.parsing.hive.Ip2Str", "ip2str")
.put("com.linkedin.stdudfs.parsing.hive.UserAgentParser", "useragentparser")
.put("com.linkedin.stdudfs.lookup.hive.BrowserLookup", "browserlookup")
.put("com.linkedin.jobs.udf.hive.ConvertIndustryCode", "converttoindustryv1")
.put("com.linkedin.stdudfs.urnextractor.hive.UrnExtractorFunctionWrapper", "urn_extractor")
.put("com.linkedin.stdudfs.hive.daliudfs.UrnExtractorFunctionWrapper", "urn_extractor")
.put("com.linkedin.groot.runtime.udf.spark.HasMemberConsentUDF", "has_member_consent")
.put("com.linkedin.groot.runtime.udf.spark.RedactFieldIfUDF", "redact_field_if")
.put("com.linkedin.groot.runtime.udf.spark.RedactSecondarySchemaFieldIfUDF", "redact_secondary_schema_field_if")
.put("com.linkedin.groot.runtime.udf.spark.GetMappedValueUDF", "get_mapped_value")
.put("com.linkedin.coral.hive.hive2rel.CoralTestUDF", "coral_test").build();

// The list of dependencies specified in the view's "dependencies" property.
// Example: "ivy://com.linkedin.udf-group:udf-artifact:0.1.8"
private final List<String> ivyDependencies;

// The view-dependent function name in the format of "dbName_viewName_functionName",
// where functionName is defined in the "functions" property of the view.
private final String viewDependentFunctionName;
private final String originalViewTextFunctionName;

// The UDF class name value defined in the "functions" property of the view.
// i.e. "functions = <originalViewTextFunctionName> : <functionClassName>"
private final String functionClassName;

private VersionedSqlUserDefinedFunction(SqlIdentifier opName, SqlReturnTypeInference returnTypeInference,
SqlOperandTypeInference operandTypeInference, SqlOperandTypeChecker operandTypeChecker,
List<RelDataType> paramTypes, Function function, List<String> ivyDependencies, String viewDependentFunctionName) {
List<RelDataType> paramTypes, Function function, List<String> ivyDependencies,
String originalViewTextFunctionName, String functionClassName) {
super(opName, returnTypeInference, operandTypeInference, operandTypeChecker, paramTypes, function,
SqlFunctionCategory.USER_DEFINED_FUNCTION);
this.ivyDependencies = ivyDependencies;
this.viewDependentFunctionName = viewDependentFunctionName;
}

public VersionedSqlUserDefinedFunction(String name, SqlReturnTypeInference returnTypeInference,
SqlOperandTypeChecker operandTypeChecker, List<RelDataType> paramTypes, Function function,
List<String> ivyDependencies, String viewDependentFunctionName) {
this(new SqlIdentifier(ImmutableList.of(name), SqlParserPos.ZERO), returnTypeInference, null, operandTypeChecker,
paramTypes, function, ivyDependencies, viewDependentFunctionName);
this.originalViewTextFunctionName = originalViewTextFunctionName;
this.functionClassName = functionClassName;
}

public VersionedSqlUserDefinedFunction(SqlUserDefinedFunction sqlUdf, List<String> ivyDependencies,
String viewDependentFunctionName) {
String originalViewTextFunctionName, String functionClassName) {
this(new SqlIdentifier(ImmutableList.of(sqlUdf.getName()), SqlParserPos.ZERO), sqlUdf.getReturnTypeInference(),
null, sqlUdf.getOperandTypeChecker(), sqlUdf.getParamTypes(), sqlUdf.getFunction(), ivyDependencies,
viewDependentFunctionName);
originalViewTextFunctionName, functionClassName);
}

public List<String> getIvyDependencies() {
return ivyDependencies;
}

public String getViewDependentFunctionName() {
return viewDependentFunctionName;
public String getOriginalViewTextFunctionName() {
return originalViewTextFunctionName;
}

/**
* Retrieves the short function name based on the class name. If the class name is found
* in the predefined {@link VersionedSqlUserDefinedFunction#SHORT_FUNC_NAME_MAP},
* the corresponding short name is returned. Otherwise, the method converts the last
* segment of the class name from UPPER_CAMEL to LOWER_UNDERSCORE format to generate
* the short function name.
*/
public String getShortFunctionName() {
// getName() returns the unversioned function class, which we use to identify the type inference.
// It's just a convention and other naming approaches are valid as long as they identify the type inference.
String unversionedClassName = getName();
if (SHORT_FUNC_NAME_MAP.containsKey(unversionedClassName)) {
return SHORT_FUNC_NAME_MAP.get(unversionedClassName);
}
Converter<String, String> caseConverter = CaseFormat.UPPER_CAMEL.converterTo(CaseFormat.LOWER_UNDERSCORE);
String[] nameSplit = unversionedClassName.split("\\.");
return caseConverter.convert(nameSplit[nameSplit.length - 1]);
}

public String getFunctionClassName() {
return functionClassName;
}

// This method is called during SQL validation. The super-class implementation resets the call's sqlOperator to one
// that is looked up from the StaticHiveFunctionRegistry or inferred dynamically if it's a Dali UDF. Since UDFs in the StaticHiveFunctionRegistry are not
// versioned, this method overrides the super-class implementation to properly restore the call's operator as
// a VersionedSqlUserDefinedFunction based on the already existing call's sqlOperator obtained from the
// StaticHiveFunctionRegistry, and hence preserve ivyDependencies and viewDependentFunctionName.
// StaticHiveFunctionRegistry, and hence preserve ivyDependencies and originalViewTextFunctionName.
@Override
public RelDataType deriveType(SqlValidator validator, SqlValidatorScope scope, SqlCall call) {
RelDataType relDataType = super.deriveType(validator, scope, call);
((SqlBasicCall) call).setOperator(new VersionedSqlUserDefinedFunction((SqlUserDefinedFunction) (call.getOperator()),
ivyDependencies, viewDependentFunctionName));
ivyDependencies, originalViewTextFunctionName, functionClassName));
return relDataType;
}
}
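As a quick illustration (not part of the commit) of the fallback naming rule in getShortFunctionName above: plain Guava CaseFormat is used here in place of Coral's shaded copy, and the UrnResolverWrapper class name is hypothetical.

import com.google.common.base.CaseFormat;

// Illustration only: the fallback used when a class is absent from SHORT_FUNC_NAME_MAP.
public class ShortFunctionNameDemo {
  static String shortName(String unversionedClassName) {
    String[] nameSplit = unversionedClassName.split("\\.");
    // Last segment of the class name, converted UPPER_CAMEL -> LOWER_UNDERSCORE.
    return CaseFormat.UPPER_CAMEL.converterTo(CaseFormat.LOWER_UNDERSCORE)
        .convert(nameSplit[nameSplit.length - 1]);
  }

  public static void main(String[] args) {
    System.out.println(shortName("com.example.udfs.UrnResolverWrapper")); // urn_resolver_wrapper
    // Consecutive capitals become separate words, e.g. CoralTestUDF -> coral_test_u_d_f,
    // which is presumably why such classes are pinned to explicit names in SHORT_FUNC_NAME_MAP.
    System.out.println(shortName("com.linkedin.coral.hive.hive2rel.CoralTestUDF"));
  }
}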
@@ -356,6 +356,14 @@ public void testDaliUDFCall() {
assertEquals(RelOptUtil.toString(rel), expectedPlan);
}

@Test
public void testVersioningUDF() {
RelNode rel = converter.convertView("test", "tableOneViewShadePrefixUDF");
String expectedPlan = "LogicalProject(EXPR$0=[com.linkedin.coral.hive.hive2rel.CoralTestUDF($0)])\n"
+ " LogicalTableScan(table=[[hive, test, tableone]])\n";
assertEquals(RelOptUtil.toString(rel), expectedPlan);
}

@Test(expectedExceptions = UnknownSqlFunctionException.class)
public void testUnresolvedUdfError() {
final String sql = "SELECT default_foo_IsTestMemberId(a) from foo";
@@ -194,6 +194,14 @@ public static TestHive setupDefaultHive(HiveConf conf) throws IOException {
throw new RuntimeException("Failed to setup view");
}

driver.run(
"create function lessThanHundred_with_versioning_prefix as 'coral_udf_version_0_1_x.com.linkedin.coral.hive.hive2rel.CoralTestUDF'");
response = driver.run(
"CREATE VIEW IF NOT EXISTS test.tableOneViewShadePrefixUDF as SELECT lessThanHundred_with_versioning_prefix(a) from test.tableOne");
if (response.getResponseCode() != 0) {
throw new RuntimeException("Failed to setup view");
}

driver.run(
"CREATE TABLE IF NOT EXISTS union_table(foo uniontype<int, double, array<string>, struct<a:int,b:string>>)");

@@ -234,8 +242,12 @@ public static TestHive setupDefaultHive(HiveConf conf) throws IOException {
setOrUpdateDaliFunction(tableOneView, "LessThanHundred", "com.linkedin.coral.hive.hive2rel.CoralTestUDF");
Table tableOneViewLateralUDTF = msc.getTable("test", "tableOneViewLateralUDTF");
setOrUpdateDaliFunction(tableOneViewLateralUDTF, "CountOfRow", "com.linkedin.coral.hive.hive2rel.CoralTestUDTF");
Table tableOneViewShadePrefixUDF = msc.getTable("test", "tableOneViewShadePrefixUDF");
setOrUpdateDaliFunction(tableOneViewShadePrefixUDF, "lessThanHundred_with_versioning_prefix",
"coral_udf_version_0_1_x.com.linkedin.coral.hive.hive2rel.CoralTestUDF");
msc.alter_table("test", "tableOneView", tableOneView);
msc.alter_table("test", "tableOneViewLateralUDTF", tableOneViewLateralUDTF);
msc.alter_table("test", "tableOneViewShadePrefixUDF", tableOneViewShadePrefixUDF);
hive = testHive;
return hive;
} catch (Exception e) {