From fef0bd36064c0e8279d28a32739420b833a62da2 Mon Sep 17 00:00:00 2001 From: ilicmarkodb Date: Sat, 17 May 2025 19:08:38 +0200 Subject: [PATCH] temp --- .../resources/error/error-conditions.json | 5 + .../sql/catalyst/parser/SqlBaseParser.g4 | 3 + .../connector/catalog/SupportsNamespaces.java | 6 + .../ApplyDefaultCollationToStringType.scala | 65 ++++- .../sql/catalyst/parser/AstBuilder.scala | 13 + .../sql/connector/catalog/CatalogV2Util.scala | 1 + .../sql/errors/QueryCompilationErrors.scala | 7 + .../apache/spark/sql/internal/SQLConf.scala | 16 ++ .../spark/sql/execution/SparkSqlParser.scala | 19 ++ .../command/SetNamespaceCollation.scala | 44 ++++ .../spark/sql/execution/command/ddl.scala | 1 + .../collation/DefaultCollationTestSuite.scala | 231 +++++++++++++++++- .../errors/QueryCompilationErrorsSuite.scala | 14 ++ ...AlterNamespaceSetPropertiesSuiteBase.scala | 3 +- ...terNamespaceUnsetPropertiesSuiteBase.scala | 3 +- 15 files changed, 418 insertions(+), 13 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetNamespaceCollation.scala diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 68234c4402354..dc740c6e9a8cb 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -6058,6 +6058,11 @@ "The replace function does not support nested column ." ] }, + "SCHEMA_LEVEL_COLLATIONS" : { + "message" : [ + "Default collation for the specified schema." + ] + }, "SET_NAMESPACE_PROPERTY" : { "message" : [ " is a reserved namespace property, ." 
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index d95a32de33854..08f222b2f4124 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -191,11 +191,14 @@ statement | CREATE namespace (IF errorCapturingNot EXISTS)? identifierReference (commentSpec | locationSpec | + collationSpec | (WITH (DBPROPERTIES | PROPERTIES) propertyList))* #createNamespace | ALTER namespace identifierReference SET (DBPROPERTIES | PROPERTIES) propertyList #setNamespaceProperties | ALTER namespace identifierReference UNSET (DBPROPERTIES | PROPERTIES) propertyList #unsetNamespaceProperties + | ALTER namespace identifierReference + collationSpec #setNamespaceCollation | ALTER namespace identifierReference SET locationSpec #setNamespaceLocation | DROP namespace (IF EXISTS)? identifierReference diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java index c1a4960068d24..49d4b0e6609b8 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java @@ -54,6 +54,12 @@ public interface SupportsNamespaces extends CatalogPlugin { */ String PROP_COMMENT = "comment"; + /** + * A reserved property to specify the collation of the namespace. The collation + * will be returned in the result of "DESCRIBE NAMESPACE" command. + */ + String PROP_COLLATION = "collation"; + /** * A reserved property to specify the owner of the namespace. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ApplyDefaultCollationToStringType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ApplyDefaultCollationToStringType.scala index 0082c65ab8b2c..b0133524ca40c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ApplyDefaultCollationToStringType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ApplyDefaultCollationToStringType.scala @@ -18,9 +18,10 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.expressions.{Cast, DefaultStringProducingExpression, Expression, Literal, SubqueryExpression} -import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterViewAs, ColumnDefinition, CreateTable, CreateTempView, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, ReplaceTable, V2CreateTablePlan} +import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterViewAs, ColumnDefinition, CreateTable, CreateTempView, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, ReplaceTable, TableSpec, V2CreateTablePlan} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.catalog.TableCatalog +import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog} +import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_COLLATION import org.apache.spark.sql.types.{DataType, StringType} /** @@ -32,10 +33,12 @@ import org.apache.spark.sql.types.{DataType, StringType} */ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = { - fetchDefaultCollation(plan) match { + val planWithResolvedDefaultCollation = resolveDefaultCollation(plan) + + fetchDefaultCollation(planWithResolvedDefaultCollation) match { case Some(collation) => - transform(plan, StringType(collation)) - case None => plan + 
transform(planWithResolvedDefaultCollation, StringType(collation)) + case None => planWithResolvedDefaultCollation } } @@ -44,8 +47,8 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] { */ private def fetchDefaultCollation(plan: LogicalPlan): Option[String] = { plan match { - case createTable: CreateTable => - createTable.tableSpec.collation + case CreateTable(_: ResolvedIdentifier, _, _, tableSpec: TableSpec, _) => + tableSpec.collation // CreateView also handles CREATE OR REPLACE VIEW // Unlike for tables, CreateView also handles CREATE OR REPLACE VIEW @@ -57,8 +60,8 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] { case createTempView: CreateTempView => createTempView.collation - case replaceTable: ReplaceTable => - replaceTable.tableSpec.collation + case ReplaceTable(_: ResolvedIdentifier, _, _, tableSpec: TableSpec, _) => + tableSpec.collation // In `transform` we handle these 3 ALTER TABLE commands. case cmd: AddColumns => getCollationFromTableProps(cmd.table) @@ -91,6 +94,50 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] { } } + /** + * Determines the default collation for an object in the following order: + * 1. Use the object's explicitly defined default collation, if available. + * 2. Otherwise, use the default collation defined by the object's schema. + * 3. If not defined in the schema, use the default collation from the object's catalog. + * + * If none of these collations are specified, None will be persisted as the default collation, + * which means the system default collation `UTF8_BINARY` will be used and the plan will not be + * changed. + * This function applies to DDL commands. An object's default collation is persisted at the moment + * of its creation, and altering the schema or catalog collation will not affect existing objects. 
+ */ + def resolveDefaultCollation(plan: LogicalPlan): LogicalPlan = { + try { + plan match { + case createTable@CreateTable(ResolvedIdentifier( + catalog: SupportsNamespaces, identifier), _, _, tableSpec: TableSpec, _) + if tableSpec.collation.isEmpty => + createTable.copy(tableSpec = tableSpec.copy( + collation = getCollationFromSchemaMetadata(catalog, identifier.namespace()))) + case replaceTable@ReplaceTable( + ResolvedIdentifier(catalog: SupportsNamespaces, identifier), _, _, tableSpec: TableSpec, _) + if tableSpec.collation.isEmpty => + replaceTable.copy(tableSpec = tableSpec.copy( + collation = getCollationFromSchemaMetadata(catalog, identifier.namespace()))) + case other => + other + } + } catch { + case _: NoSuchNamespaceException => + plan + } + } + + /** + * Retrieves the schema's default collation from the metadata of the given catalog and schema + * name. Returns None if the default collation is not specified for the schema. + */ + private def getCollationFromSchemaMetadata( + catalog: SupportsNamespaces, schemaName: Array[String]): Option[String] = { + val metadata = catalog.loadNamespaceMetadata(schemaName) + Option(metadata.get(PROP_COLLATION)) + } + private def isCreateOrAlterPlan(plan: LogicalPlan): Boolean = plan match { // For CREATE TABLE, only v2 CREATE TABLE command is supported. // Also, table DEFAULT COLLATION cannot be specified through CREATE TABLE AS SELECT command. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index df348f1f6051e..9b9ff2175457b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -4435,6 +4435,10 @@ class AstBuilder extends DataTypeAstBuilder throw QueryParsingErrors.cannotCleanReservedNamespacePropertyError( PROP_LOCATION, ctx, "please use the LOCATION clause to specify it") case (PROP_LOCATION, _) => false + case (PROP_COLLATION, _) if !legacyOn => + throw QueryParsingErrors.cannotCleanReservedNamespacePropertyError( + PROP_COLLATION, ctx, "please use the DEFAULT COLLATION clause to specify it") + case (PROP_COLLATION, _) => false case (PROP_OWNER, _) if !legacyOn => throw QueryParsingErrors.cannotCleanReservedNamespacePropertyError( PROP_OWNER, ctx, "it will be set to the current user") @@ -4460,6 +4464,7 @@ class AstBuilder extends DataTypeAstBuilder override def visitCreateNamespace(ctx: CreateNamespaceContext): LogicalPlan = withOrigin(ctx) { import SupportsNamespaces._ checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx) + checkDuplicateClauses(ctx.collationSpec(), "DEFAULT COLLATION", ctx) checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) checkDuplicateClauses(ctx.PROPERTIES, "WITH PROPERTIES", ctx) checkDuplicateClauses(ctx.DBPROPERTIES, "WITH DBPROPERTIES", ctx) @@ -4478,6 +4483,14 @@ class AstBuilder extends DataTypeAstBuilder properties += PROP_COMMENT -> _ } + // Schema-level default collation is under development and currently not enabled. 
+ if (!SQLConf.get.schemaLevelCollationsEnabled && !ctx.collationSpec().isEmpty) { + throw QueryCompilationErrors.schemaLevelCollationsNotEnabledError() + } + visitCollationSpecList(ctx.collationSpec()).foreach { + properties += PROP_COLLATION -> _ + } + visitLocationSpecList(ctx.locationSpec()).foreach { properties += PROP_LOCATION -> _ } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 527972dd0a364..ded42a2b0f87e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -73,6 +73,7 @@ private[sql] object CatalogV2Util { */ val NAMESPACE_RESERVED_PROPERTIES = Seq(SupportsNamespaces.PROP_COMMENT, + SupportsNamespaces.PROP_COLLATION, SupportsNamespaces.PROP_LOCATION, SupportsNamespaces.PROP_OWNER) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 76a5a73659722..9de52533dbbb8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -380,6 +380,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat ) } + def schemaLevelCollationsNotEnabledError(): Throwable = { + new AnalysisException( + errorClass = "UNSUPPORTED_FEATURE.SCHEMA_LEVEL_COLLATIONS", + messageParameters = Map.empty + ) + } + def trimCollationNotEnabledError(): Throwable = { new AnalysisException( errorClass = "UNSUPPORTED_FEATURE.TRIM_COLLATION", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 
b8b1ef4e135a2..7335d525c2a2b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -949,6 +949,20 @@ object SQLConf { .booleanConf .createWithDefault(true) + lazy val SCHEMA_LEVEL_COLLATIONS_ENABLED = + buildConf("spark.sql.collation.schemaLevel.enabled") + .internal() + .doc( + "Schema level collations feature is under development and its use should be done " + + "under this feature flag. The feature allows setting default collation for all " + + "underlying objects within that schema, except the ones that were previously created. " + + "An object with an explicitly set collation will not inherit the collation from the " + + "schema." + ) + .version("4.0.0") + .booleanConf + .createWithDefault(false) + lazy val TRIM_COLLATION_ENABLED = buildConf("spark.sql.collation.trim.enabled") .internal() @@ -6177,6 +6191,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def objectLevelCollationsEnabled: Boolean = getConf(OBJECT_LEVEL_COLLATIONS_ENABLED) + def schemaLevelCollationsEnabled: Boolean = getConf(SCHEMA_LEVEL_COLLATIONS_ENABLED) + def trimCollationEnabled: Boolean = getConf(TRIM_COLLATION_ENABLED) def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 9565bc9296a6a..0d218b6e183c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1158,6 +1158,25 @@ class SparkSqlAstBuilder extends AstBuilder { cleanedProperties) } + /** + * Create a [[SetNamespaceCollationCommand]] logical plan. 
+ * + * For example: + * {{{ + * ALTER (DATABASE|SCHEMA|NAMESPACE) namespace DEFAULT COLLATION collation; + * }}} + */ + override def visitSetNamespaceCollation(ctx: SetNamespaceCollationContext): LogicalPlan = { + withOrigin(ctx) { + if (!SQLConf.get.schemaLevelCollationsEnabled) { + throw QueryCompilationErrors.schemaLevelCollationsNotEnabledError() + } + SetNamespaceCollationCommand( + withIdentClause(ctx.identifierReference, UnresolvedNamespace(_)), + visitCollationSpec(ctx.collationSpec())) + } + } + /** * Create a [[DescribeColumn]] or [[DescribeRelation]] or [[DescribeRelationAsJsonCommand]] * command. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetNamespaceCollation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetNamespaceCollation.scala new file mode 100644 index 0000000000000..e434ec53ce934 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetNamespaceCollation.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.analysis.ResolvedNamespace +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper +import org.apache.spark.sql.connector.catalog.NamespaceChange +import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_COLLATION + +/** + * The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... DEFAULT COLLATION command. + */ +case class SetNamespaceCollationCommand( + namespace: LogicalPlan, + collation: String) extends UnaryRunnableCommand { + override def child: LogicalPlan = namespace + override protected def withNewChildInternal(newChild: LogicalPlan): SetNamespaceCollationCommand = + copy(namespace = newChild) + + override def run(sparkSession: SparkSession): Seq[Row] = { + val ResolvedNamespace(catalog, ns, _) = child + val collationChange = NamespaceChange.setProperty(PROP_COLLATION, collation) + catalog.asNamespaceCatalog.alterNamespace(ns.toArray, collationChange) + + Seq.empty + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 6eb81e6ec670b..13994c58f1206 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -66,6 +66,7 @@ import org.apache.spark.util.ArrayImplicits._ * {{{ * CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name * [COMMENT database_comment] + * [DEFAULT COLLATION collation] * [LOCATION database_directory] * [WITH DBPROPERTIES (property_name=property_value, ...)]; * }}} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/DefaultCollationTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/DefaultCollationTestSuite.scala index 
031af8200d7e9..9a2665e3054f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/collation/DefaultCollationTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/collation/DefaultCollationTestSuite.scala @@ -18,13 +18,25 @@ package org.apache.spark.sql.collation import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} +import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE import org.apache.spark.sql.catalyst.util.CollationFactory import org.apache.spark.sql.connector.DatasourceV2SQLBase +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StringType abstract class DefaultCollationTestSuite extends QueryTest with SharedSparkSession { + override def beforeEach(): Unit = { + super.beforeEach() + spark.conf.set(SQLConf.SCHEMA_LEVEL_COLLATIONS_ENABLED, true) + } + + override def afterEach(): Unit = { + spark.conf.set(SQLConf.SCHEMA_LEVEL_COLLATIONS_ENABLED, false) + super.afterEach() + } + val defaultStringProducingExpressions: Seq[String] = Seq( "current_timezone()", "current_database()", "md5('Spark' collate unicode)", "soundex('Spark' collate unicode)", "url_encode('https://spark.apache.org' collate unicode)", @@ -33,6 +45,7 @@ abstract class DefaultCollationTestSuite extends QueryTest with SharedSparkSessi ) def dataSource: String = "parquet" + def testSchema: String = "test_schema" def testTable: String = "test_tbl" def testView: String = "test_view" protected val fullyQualifiedPrefix = s"${CollationFactory.CATALOG}.${CollationFactory.SCHEMA}." 
@@ -129,6 +142,91 @@ abstract class DefaultCollationTestSuite extends QueryTest with SharedSparkSessi } } + Seq( + // (schemaDefaultCollation, tableDefaultCollation) + ("UTF8_BINARY", None), + ("UTF8_LCASE", None), + ("UNICODE", None), + ("DE", None), + ("UTF8_BINARY", Some("UTF8_BINARY")), + ("UTF8_BINARY", Some("UTF8_LCASE")), + ("UTF8_BINARY", Some("DE")), + ("UTF8_LCASE", Some("UTF8_BINARY")), + ("UTF8_LCASE", Some("UTF8_LCASE")), + ("UTF8_LCASE", Some("DE")) + ).foreach { + case (schemaDefaultCollation, tableDefaultCollation) => + test( + s"""CREATE table with schema level collation + | (schema default collation = $schemaDefaultCollation, + | table default collation = $tableDefaultCollation)""".stripMargin) { + testCreateTableWithSchemaLevelCollation( + schemaDefaultCollation, tableDefaultCollation) + } + + test( + s"""ALTER table with schema level collation + | (schema default collation = $schemaDefaultCollation, + | table default collation = $tableDefaultCollation)""".stripMargin) { + testAlterTableWithSchemaLevelCollation( + schemaDefaultCollation, tableDefaultCollation) + } + } + + Seq( + // (schemaOldCollation, schemaNewCollation) + (None, "UTF8_BINARY"), + (None, "UTF8_LCASE"), + (None, "DE"), + (Some("UTF8_BINARY"), "UTF8_BINARY"), + (Some("UTF8_BINARY"), "UTF8_LCASE"), + (Some("UTF8_BINARY"), "DE"), + (Some("UTF8_LCASE"), "UTF8_BINARY"), + (Some("UTF8_LCASE"), "UTF8_LCASE"), + (Some("UTF8_LCASE"), "DE") + ).foreach { + case (schemaOldCollation, schemaNewCollation) => + val schemaOldCollationDefaultClause = + if (schemaOldCollation.isDefined) { + s"DEFAULT COLLATION ${schemaOldCollation.get}" + } else { + "" + } + + test( + s"""ALTER schema default collation (old schema default collation = $schemaOldCollation, + | new schema default collation = $schemaNewCollation)""".stripMargin) { + withDatabase(testSchema) { + sql(s"CREATE SCHEMA $testSchema $schemaOldCollationDefaultClause") + sql(s"USE $testSchema") + + withTable(testTable) { + sql(s"CREATE TABLE 
$testTable (c1 STRING, c2 STRING COLLATE SR_AI)") + val tableDefaultCollation = + if (schemaOldCollation.isDefined) { + schemaOldCollation.get + } else { + "UTF8_BINARY" + } + + // ALTER SCHEMA + sql(s"ALTER SCHEMA $testSchema DEFAULT COLLATION $schemaNewCollation") + + // Altering schema default collation should not affect existing objects. + addAndAlterColumns(tableDefaultCollation = tableDefaultCollation) + } + + withTable(testTable) { + sql(s"CREATE TABLE $testTable " + + s"(c1 STRING, c2 STRING COLLATE SR_AI, c3 STRING COLLATE UTF8_BINARY)") + assertTableColumnCollation(testTable, "c1", schemaNewCollation) + assertTableColumnCollation(testTable, "c2", "SR_AI") + assertTableColumnCollation(testTable, "c3", "UTF8_BINARY") + } + } + } + } + test("create table as select") { // literals in select do not pick up session collation withTable(testTable) { @@ -263,6 +361,72 @@ abstract class DefaultCollationTestSuite extends QueryTest with SharedSparkSessi } // endregion + + protected def testCreateTableWithSchemaLevelCollation( + schemaDefaultCollation: String, + tableDefaultCollation: Option[String] = None, + replaceTable: Boolean = false): Unit = { + val (tableDefaultCollationClause, resolvedDefaultCollation) = + if (tableDefaultCollation.isDefined) { + (s"DEFAULT COLLATION ${tableDefaultCollation.get}", tableDefaultCollation.get) + } else { + ("", schemaDefaultCollation) + } + val replace = if (replaceTable) "OR REPLACE" else "" + + withDatabase(testSchema) { + sql(s"CREATE SCHEMA $testSchema DEFAULT COLLATION $schemaDefaultCollation") + sql(s"USE $testSchema") + withTable(testTable) { + sql(s"CREATE $replace TABLE $testTable " + + s"(c1 STRING, c2 STRING COLLATE SR_AI, c3 STRING COLLATE UTF8_BINARY) " + + s"$tableDefaultCollationClause") + assertTableColumnCollation(testTable, "c1", resolvedDefaultCollation) + assertTableColumnCollation(testTable, "c2", "SR_AI") + assertTableColumnCollation(testTable, "c3", "UTF8_BINARY") + } + } + } + + def 
testAlterTableWithSchemaLevelCollation( + schemaDefaultCollation: String, tableDefaultCollation: Option[String] = None): Unit = { + val (tableDefaultCollationClause, resolvedDefaultCollation) = + if (tableDefaultCollation.isDefined) { + (s"DEFAULT COLLATION ${tableDefaultCollation.get}", tableDefaultCollation.get) + } else { + ("", schemaDefaultCollation) + } + + withDatabase(testSchema) { + sql(s"CREATE SCHEMA $testSchema DEFAULT COLLATION $schemaDefaultCollation") + sql(s"USE $testSchema") + + withTable(testTable) { + sql(s"CREATE TABLE $testTable (c1 STRING, c2 STRING COLLATE SR_AI) " + + s"$tableDefaultCollationClause") + + addAndAlterColumns(tableDefaultCollation = resolvedDefaultCollation) + } + } + } + + private def addAndAlterColumns(tableDefaultCollation: String): Unit = { + // ADD COLUMN + sql(s"ALTER TABLE $testTable ADD COLUMN c3 STRING") + sql(s"ALTER TABLE $testTable ADD COLUMN c4 STRING COLLATE SR_AI") + sql(s"ALTER TABLE $testTable ADD COLUMN c5 STRING COLLATE UTF8_BINARY") + assertTableColumnCollation(testTable, "c3", tableDefaultCollation) + assertTableColumnCollation(testTable, "c4", "SR_AI") + assertTableColumnCollation(testTable, "c5", "UTF8_BINARY") + + // ALTER COLUMN + sql(s"ALTER TABLE $testTable ALTER COLUMN c1 TYPE STRING COLLATE UNICODE") + sql(s"ALTER TABLE $testTable ALTER COLUMN c2 TYPE STRING") + sql(s"ALTER TABLE $testTable ALTER COLUMN c3 TYPE STRING COLLATE UTF8_BINARY") + assertTableColumnCollation(testTable, "c1", "UNICODE") + assertTableColumnCollation(testTable, "c2", tableDefaultCollation) + assertTableColumnCollation(testTable, "c3", "UTF8_BINARY") + } } class DefaultCollationTestSuiteV1 extends DefaultCollationTestSuite { @@ -391,8 +555,14 @@ class DefaultCollationTestSuiteV1 extends DefaultCollationTestSuite { } class DefaultCollationTestSuiteV2 extends DefaultCollationTestSuite with DatasourceV2SQLBase { - override def testTable: String = s"testcat.${super.testTable}" - override def testView: String = 
s"testcat.${super.testView}" + override def testSchema: String = s"testcat.${super.testSchema}" + + override def beforeEach(): Unit = { + super.beforeEach() + sql("USE testcat") + sql(s"CREATE SCHEMA IF NOT EXISTS $DEFAULT_DATABASE") + sql(s"USE testcat.$DEFAULT_DATABASE") + } test("inline table in RTAS") { withTable(testTable) { @@ -427,4 +597,61 @@ class DefaultCollationTestSuiteV2 extends DefaultCollationTestSuite with Datasou checkAnswer(sql(s"SELECT DISTINCT COLLATION(c2) FROM $testTable"), Row(s"$prefix.UTF8_LCASE")) } } + + Seq( + // (schemaDefaultCollation, tableDefaultCollation) + ("UTF8_BINARY", None), + ("UTF8_LCASE", None), + ("UNICODE", None), + ("DE", None), + ("UTF8_BINARY", Some("UTF8_BINARY")), + ("UTF8_BINARY", Some("UTF8_LCASE")), + ("UTF8_BINARY", Some("DE")), + ("UTF8_LCASE", Some("UTF8_BINARY")), + ("UTF8_LCASE", Some("UTF8_LCASE")), + ("UTF8_LCASE", Some("DE")) + ).foreach { + case (schemaDefaultCollation, tableDefaultCollation) => + test( + s"""CREATE OR REPLACE table with schema level collation + | (schema default collation = $schemaDefaultCollation, + | table default collation = $tableDefaultCollation)""".stripMargin) { + testCreateTableWithSchemaLevelCollation( + schemaDefaultCollation, tableDefaultCollation, replaceTable = true) + } + + test( + s"""REPLACE COLUMNS with schema level collation + | (schema default collation = $schemaDefaultCollation, + | table default collation = $tableDefaultCollation""".stripMargin) { + testReplaceColumns( + schemaDefaultCollation, tableDefaultCollation) + } + } + + private def testReplaceColumns( + schemaDefaultCollation: String, tableDefaultCollation: Option[String] = None): Unit = { + val (tableDefaultCollationClause, resolvedDefaultCollation) = + if (tableDefaultCollation.isDefined) { + (s"DEFAULT COLLATION ${tableDefaultCollation.get}", tableDefaultCollation.get) + } else { + ("", schemaDefaultCollation) + } + + withDatabase(testSchema) { + sql(s"CREATE SCHEMA $testSchema DEFAULT COLLATION 
$schemaDefaultCollation") + sql(s"USE $testSchema") + + withTable(testTable) { + sql(s"CREATE TABLE $testTable (c1 STRING, c2 STRING COLLATE SR_AI) " + + s"$tableDefaultCollationClause") + + sql(s"ALTER TABLE $testTable REPLACE COLUMNS " + + "(c1 STRING COLLATE UNICODE, c2 STRING, c3 STRING COLLATE UTF8_BINARY)") + assertTableColumnCollation(testTable, "c1", "UNICODE") + assertTableColumnCollation(testTable, "c2", resolvedDefaultCollation) + assertTableColumnCollation(testTable, "c3", "UTF8_BINARY") + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala index be83d2c653235..110347e766e80 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala @@ -918,6 +918,20 @@ class QueryCompilationErrorsSuite } } + test("SPARK-52219: the schema level collations feature is unsupported") { + // TODO: when schema level collations are supported, change this test to set the flag to false + Seq( + "CREATE SCHEMA test_schema DEFAULT COLLATION UNICODE", + "ALTER SCHEMA test_schema DEFAULT COLLATION UNICODE" + ).foreach { + sqlText => + checkError( + exception = intercept[AnalysisException](sql(sqlText)), + condition = "UNSUPPORTED_FEATURE.SCHEMA_LEVEL_COLLATIONS" + ) + } + } + test("UNSUPPORTED_CALL: call the unsupported method update()") { checkError( exception = intercept[SparkUnsupportedOperationException] { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterNamespaceSetPropertiesSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterNamespaceSetPropertiesSuiteBase.scala index 3b0ac1d408234..f42fd131adc46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterNamespaceSetPropertiesSuiteBase.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterNamespaceSetPropertiesSuiteBase.scala @@ -100,7 +100,8 @@ trait AlterNamespaceSetPropertiesSuiteBase extends QueryTest with DDLCommandTest } } withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) { - CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => + CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES + .filterNot(prop => prop == PROP_COLLATION || prop == PROP_COMMENT).foreach { key => withNamespace(ns) { // Set the location explicitly because v2 catalog may not set the default location. // Without this, `meta.get(key)` below may return null. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterNamespaceUnsetPropertiesSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterNamespaceUnsetPropertiesSuiteBase.scala index 42550ef844361..593726526e4ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterNamespaceUnsetPropertiesSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterNamespaceUnsetPropertiesSuiteBase.scala @@ -102,7 +102,8 @@ trait AlterNamespaceUnsetPropertiesSuiteBase extends QueryTest with DDLCommandTe } } withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) { - CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key => + CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES + .filterNot(prop => prop == PROP_COLLATION || prop == PROP_COMMENT).foreach { key => withNamespace(ns) { // Set the location explicitly because v2 catalog may not set the default location. // Without this, `meta.get(key)` below may return null.