Skip to content

[SPARK-52219][SQL] Schema level collation support for tables #50937

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -6058,6 +6058,11 @@
"The replace function does not support nested column <colName>."
]
},
"SCHEMA_LEVEL_COLLATIONS" : {
"message" : [
"Default collation for the specified schema."
]
},
"SET_NAMESPACE_PROPERTY" : {
"message" : [
"<property> is a reserved namespace property, <msg>."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,14 @@ statement
| CREATE namespace (IF errorCapturingNot EXISTS)? identifierReference
(commentSpec |
locationSpec |
collationSpec |
(WITH (DBPROPERTIES | PROPERTIES) propertyList))* #createNamespace
| ALTER namespace identifierReference
SET (DBPROPERTIES | PROPERTIES) propertyList #setNamespaceProperties
| ALTER namespace identifierReference
UNSET (DBPROPERTIES | PROPERTIES) propertyList #unsetNamespaceProperties
| ALTER namespace identifierReference
collationSpec #setNamespaceCollation
| ALTER namespace identifierReference
SET locationSpec #setNamespaceLocation
| DROP namespace (IF EXISTS)? identifierReference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public interface SupportsNamespaces extends CatalogPlugin {
*/
String PROP_COMMENT = "comment";

/**
* A reserved property to specify the collation of the namespace. The collation
* will be returned in the result of "DESCRIBE NAMESPACE" command.
*/
String PROP_COLLATION = "collation";

/**
* A reserved property to specify the owner of the namespace.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions.{Cast, DefaultStringProducingExpression, Expression, Literal, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterViewAs, ColumnDefinition, CreateTable, CreateTempView, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, ReplaceTable, V2CreateTablePlan}
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterViewAs, ColumnDefinition, CreateTable, CreateTempView, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, ReplaceTable, TableSpec, V2CreateTablePlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_COLLATION
import org.apache.spark.sql.types.{DataType, StringType}

/**
Expand All @@ -32,10 +33,12 @@ import org.apache.spark.sql.types.{DataType, StringType}
*/
object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
fetchDefaultCollation(plan) match {
val planWithResolvedDefaultCollation = resolveDefaultCollation(plan)

fetchDefaultCollation(planWithResolvedDefaultCollation) match {
case Some(collation) =>
transform(plan, StringType(collation))
case None => plan
transform(planWithResolvedDefaultCollation, StringType(collation))
case None => planWithResolvedDefaultCollation
}
}

Expand All @@ -44,8 +47,8 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
*/
private def fetchDefaultCollation(plan: LogicalPlan): Option[String] = {
plan match {
case createTable: CreateTable =>
createTable.tableSpec.collation
case CreateTable(_: ResolvedIdentifier, _, _, tableSpec: TableSpec, _) =>
tableSpec.collation

// CreateView also handles CREATE OR REPLACE VIEW
// Unlike for tables, CreateView also handles CREATE OR REPLACE VIEW
Expand All @@ -57,8 +60,8 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
case createTempView: CreateTempView =>
createTempView.collation

case replaceTable: ReplaceTable =>
replaceTable.tableSpec.collation
case ReplaceTable(_: ResolvedIdentifier, _, _, tableSpec: TableSpec, _) =>
tableSpec.collation

// In `transform` we handle these 3 ALTER TABLE commands.
case cmd: AddColumns => getCollationFromTableProps(cmd.table)
Expand Down Expand Up @@ -91,6 +94,50 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
}
}

/**
 * Fills in the default collation of a CREATE TABLE / REPLACE TABLE plan whose statement did not
 * specify one explicitly.
 *
 * The intended lookup order for an object's default collation is:
 *   1. the default collation explicitly defined on the object, if present;
 *   2. otherwise, the default collation defined by the object's schema;
 *   3. otherwise, the default collation of the object's catalog.
 * NOTE(review): this method only performs step 2 (reading the schema metadata); the catalog-level
 * fallback of step 3 is not visible here - confirm where it is handled.
 *
 * When no collation is found, None stays in the table spec, which means the system default
 * collation `UTF8_BINARY` will be used. Plans other than CreateTable / ReplaceTable over a
 * resolved identifier in a [[SupportsNamespaces]] catalog are returned unchanged, and so is the
 * input plan when the namespace lookup raises NoSuchNamespaceException.
 *
 * This applies to DDL commands: an object's default collation is persisted at the moment of its
 * creation, so later changes to the schema or catalog collation do not affect existing objects.
 */
def resolveDefaultCollation(plan: LogicalPlan): LogicalPlan = {
  try {
    plan match {
      case create @ CreateTable(
          ResolvedIdentifier(nsCatalog: SupportsNamespaces, ident), _, _, spec: TableSpec, _)
          if spec.collation.isEmpty =>
        // No table-level collation given: inherit whatever the schema declares (possibly None).
        val inherited = getCollationFromSchemaMetadata(nsCatalog, ident.namespace())
        create.copy(tableSpec = spec.copy(collation = inherited))

      case replace @ ReplaceTable(
          ResolvedIdentifier(nsCatalog: SupportsNamespaces, ident), _, _, spec: TableSpec, _)
          if spec.collation.isEmpty =>
        val inherited = getCollationFromSchemaMetadata(nsCatalog, ident.namespace())
        replace.copy(tableSpec = spec.copy(collation = inherited))

      case _ =>
        plan
    }
  } catch {
    // A missing namespace is not reported from here; the plan is left untouched.
    case _: NoSuchNamespaceException => plan
  }
}

/**
 * Reads the schema's default collation from the namespace metadata of the given catalog.
 *
 * @param catalog catalog used to load the namespace metadata
 * @param schemaName multi-part name of the schema (namespace) to look up
 * @return the value stored under `PROP_COLLATION`, or None when the metadata has no such entry
 */
private def getCollationFromSchemaMetadata(
    catalog: SupportsNamespaces,
    schemaName: Array[String]): Option[String] =
  // Option(...) turns the null returned for an absent key into None.
  Option(catalog.loadNamespaceMetadata(schemaName).get(PROP_COLLATION))

private def isCreateOrAlterPlan(plan: LogicalPlan): Boolean = plan match {
// For CREATE TABLE, only v2 CREATE TABLE command is supported.
// Also, table DEFAULT COLLATION cannot be specified through CREATE TABLE AS SELECT command.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4435,6 +4435,10 @@ class AstBuilder extends DataTypeAstBuilder
throw QueryParsingErrors.cannotCleanReservedNamespacePropertyError(
PROP_LOCATION, ctx, "please use the LOCATION clause to specify it")
case (PROP_LOCATION, _) => false
case (PROP_COLLATION, _) if !legacyOn =>
throw QueryParsingErrors.cannotCleanReservedNamespacePropertyError(
PROP_COLLATION, ctx, "please use the DEFAULT COLLATION clause to specify it")
case (PROP_COLLATION, _) => false
case (PROP_OWNER, _) if !legacyOn =>
throw QueryParsingErrors.cannotCleanReservedNamespacePropertyError(
PROP_OWNER, ctx, "it will be set to the current user")
Expand All @@ -4460,6 +4464,7 @@ class AstBuilder extends DataTypeAstBuilder
override def visitCreateNamespace(ctx: CreateNamespaceContext): LogicalPlan = withOrigin(ctx) {
import SupportsNamespaces._
checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx)
checkDuplicateClauses(ctx.collationSpec(), "DEFAULT COLLATION", ctx)
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
checkDuplicateClauses(ctx.PROPERTIES, "WITH PROPERTIES", ctx)
checkDuplicateClauses(ctx.DBPROPERTIES, "WITH DBPROPERTIES", ctx)
Expand All @@ -4478,6 +4483,14 @@ class AstBuilder extends DataTypeAstBuilder
properties += PROP_COMMENT -> _
}

// Schema-level default collation is under development and currently not enabled.
if (!SQLConf.get.schemaLevelCollationsEnabled && !ctx.collationSpec().isEmpty) {
throw QueryCompilationErrors.schemaLevelCollationsNotEnabledError()
}
visitCollationSpecList(ctx.collationSpec()).foreach {
properties += PROP_COLLATION -> _
}

visitLocationSpecList(ctx.locationSpec()).foreach {
properties += PROP_LOCATION -> _
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ private[sql] object CatalogV2Util {
*/
val NAMESPACE_RESERVED_PROPERTIES = Seq(
  SupportsNamespaces.PROP_COMMENT,
  SupportsNamespaces.PROP_COLLATION,
  SupportsNamespaces.PROP_LOCATION,
  SupportsNamespaces.PROP_OWNER)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
)
}

/**
 * Builds the error reported when schema-level default collation syntax is used while the
 * feature is not enabled. The error condition takes no message parameters.
 */
def schemaLevelCollationsNotEnabledError(): Throwable = {
  val noParameters = Map.empty[String, String]
  new AnalysisException(
    errorClass = "UNSUPPORTED_FEATURE.SCHEMA_LEVEL_COLLATIONS",
    messageParameters = noParameters)
}

def trimCollationNotEnabledError(): Throwable = {
new AnalysisException(
errorClass = "UNSUPPORTED_FEATURE.TRIM_COLLATION",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,20 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

// Internal feature flag for schema-level default collations; disabled by default.
lazy val SCHEMA_LEVEL_COLLATIONS_ENABLED =
  buildConf("spark.sql.collation.schemaLevel.enabled")
    .internal()
    .doc(
      // Each fragment ends with a space so the concatenated sentences do not run together.
      "Schema level collations feature is under development and its use should be done " +
      "under this feature flag. The feature allows setting default collation for all " +
      "underlying objects within that schema, except the ones that were previously created. " +
      "An object with an explicitly set collation will not inherit the collation from the " +
      "schema."
    )
    .version("4.0.0")
    .booleanConf
    .createWithDefault(false)

lazy val TRIM_COLLATION_ENABLED =
buildConf("spark.sql.collation.trim.enabled")
.internal()
Expand Down Expand Up @@ -6177,6 +6191,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def objectLevelCollationsEnabled: Boolean = getConf(OBJECT_LEVEL_COLLATIONS_ENABLED)

// Current value of the SCHEMA_LEVEL_COLLATIONS_ENABLED feature flag.
def schemaLevelCollationsEnabled: Boolean = getConf(SCHEMA_LEVEL_COLLATIONS_ENABLED)

def trimCollationEnabled: Boolean = getConf(TRIM_COLLATION_ENABLED)

def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,25 @@ class SparkSqlAstBuilder extends AstBuilder {
cleanedProperties)
}

/**
 * Create a [[SetNamespaceCollationCommand]] logical plan.
 *
 * For example:
 * {{{
 *   ALTER (DATABASE|SCHEMA|NAMESPACE) namespace DEFAULT COLLATION collation;
 * }}}
 *
 * Throws the "schema level collations not enabled" error when the feature flag is off.
 */
override def visitSetNamespaceCollation(ctx: SetNamespaceCollationContext): LogicalPlan =
  withOrigin(ctx) {
    // The statement is rejected up front while the feature is disabled.
    val featureEnabled = SQLConf.get.schemaLevelCollationsEnabled
    if (!featureEnabled) {
      throw QueryCompilationErrors.schemaLevelCollationsNotEnabledError()
    }

    val targetNamespace: LogicalPlan =
      withIdentClause(ctx.identifierReference, UnresolvedNamespace(_))
    val collationName = visitCollationSpec(ctx.collationSpec())
    SetNamespaceCollationCommand(targetNamespace, collationName)
  }

/**
* Create a [[DescribeColumn]] or [[DescribeRelation]] or [[DescribeRelationAsJsonCommand]]
* command.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.ResolvedNamespace
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.connector.catalog.NamespaceChange
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_COLLATION

/**
 * The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... DEFAULT COLLATION command.
 *
 * @param namespace the plan identifying the namespace; `run` expects it to be a
 *                  [[ResolvedNamespace]] by the time the command executes
 * @param collation the collation name to store as the namespace's default collation
 */
case class SetNamespaceCollationCommand(
    namespace: LogicalPlan,
    collation: String) extends UnaryRunnableCommand {

  override def child: LogicalPlan = namespace

  override protected def withNewChildInternal(
      newChild: LogicalPlan): SetNamespaceCollationCommand = copy(namespace = newChild)

  /**
   * Stores the collation under the reserved `PROP_COLLATION` namespace property through the
   * catalog's `alterNamespace`, and produces no output rows.
   */
  override def run(sparkSession: SparkSession): Seq[Row] = child match {
    case ResolvedNamespace(nsCatalog, nsParts, _) =>
      val setCollation = NamespaceChange.setProperty(PROP_COLLATION, collation)
      nsCatalog.asNamespaceCatalog.alterNamespace(nsParts.toArray, setCollation)
      Seq.empty
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import org.apache.spark.util.ArrayImplicits._
* {{{
* CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name
* [COMMENT database_comment]
* [DEFAULT COLLATION collation]
* [LOCATION database_directory]
* [WITH DBPROPERTIES (property_name=property_value, ...)];
* }}}
Expand Down
Loading