Skip to content

Commit fbc65dc

Browse files
committed
temp
1 parent b84d3e6 commit fbc65dc

File tree

15 files changed

+446
-15
lines changed

15 files changed

+446
-15
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6058,6 +6058,11 @@
60586058
"The replace function does not support nested column <colName>."
60596059
]
60606060
},
6061+
"SCHEMA_LEVEL_COLLATIONS" : {
6062+
"message" : [
6063+
"Default collation for the specified schema."
6064+
]
6065+
},
60616066
"SET_NAMESPACE_PROPERTY" : {
60626067
"message" : [
60636068
"<property> is a reserved namespace property, <msg>."

sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,11 +191,14 @@ statement
191191
| CREATE namespace (IF errorCapturingNot EXISTS)? identifierReference
192192
(commentSpec |
193193
locationSpec |
194+
collationSpec |
194195
(WITH (DBPROPERTIES | PROPERTIES) propertyList))* #createNamespace
195196
| ALTER namespace identifierReference
196197
SET (DBPROPERTIES | PROPERTIES) propertyList #setNamespaceProperties
197198
| ALTER namespace identifierReference
198199
UNSET (DBPROPERTIES | PROPERTIES) propertyList #unsetNamespaceProperties
200+
| ALTER namespace identifierReference
201+
collationSpec #setNamespaceCollation
199202
| ALTER namespace identifierReference
200203
SET locationSpec #setNamespaceLocation
201204
| DROP namespace (IF EXISTS)? identifierReference

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ public interface SupportsNamespaces extends CatalogPlugin {
5454
*/
5555
String PROP_COMMENT = "comment";
5656

57+
/**
58+
* A reserved property to specify the collation of the namespace. The collation
59+
* will be returned in the result of the "DESCRIBE NAMESPACE" command.
60+
*/
61+
String PROP_COLLATION = "collation";
62+
5763
/**
5864
* A reserved property to specify the owner of the namespace.
5965
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ApplyDefaultCollationToStringType.scala

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
package org.apache.spark.sql.catalyst.analysis
1919

2020
import org.apache.spark.sql.catalyst.expressions.{Cast, DefaultStringProducingExpression, Expression, Literal, SubqueryExpression}
21-
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterViewAs, ColumnDefinition, CreateTable, CreateTempView, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, ReplaceTable, V2CreateTablePlan}
21+
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterViewAs, ColumnDefinition, CreateTable, CreateTempView, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, ReplaceTable, TableSpec, V2CreateTablePlan}
2222
import org.apache.spark.sql.catalyst.rules.Rule
23-
import org.apache.spark.sql.connector.catalog.TableCatalog
23+
import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
24+
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_COLLATION
2425
import org.apache.spark.sql.types.{DataType, StringType}
2526

2627
/**
@@ -32,10 +33,12 @@ import org.apache.spark.sql.types.{DataType, StringType}
3233
*/
3334
object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
3435
def apply(plan: LogicalPlan): LogicalPlan = {
35-
fetchDefaultCollation(plan) match {
36+
val planWithResolvedDefaultCollation = resolveDefaultCollation(plan)
37+
38+
fetchDefaultCollation(planWithResolvedDefaultCollation) match {
3639
case Some(collation) =>
37-
transform(plan, StringType(collation))
38-
case None => plan
40+
transform(planWithResolvedDefaultCollation, StringType(collation))
41+
case None => planWithResolvedDefaultCollation
3942
}
4043
}
4144

@@ -44,8 +47,8 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
4447
*/
4548
private def fetchDefaultCollation(plan: LogicalPlan): Option[String] = {
4649
plan match {
47-
case createTable: CreateTable =>
48-
createTable.tableSpec.collation
50+
case CreateTable(_: ResolvedIdentifier, _, _, tableSpec: TableSpec, _) =>
51+
tableSpec.collation
4952

5053
// CreateView also handles CREATE OR REPLACE VIEW
5154
// Unlike for tables, CreateView also handles CREATE OR REPLACE VIEW
@@ -57,8 +60,8 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
5760
case createTempView: CreateTempView =>
5861
createTempView.collation
5962

60-
case replaceTable: ReplaceTable =>
61-
replaceTable.tableSpec.collation
63+
case ReplaceTable(_: ResolvedIdentifier, _, _, tableSpec: TableSpec, _) =>
64+
tableSpec.collation
6265

6366
// In `transform` we handle these 3 ALTER TABLE commands.
6467
case cmd: AddColumns => getCollationFromTableProps(cmd.table)
@@ -91,6 +94,50 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
9194
}
9295
}
9396

97+
/**
98+
* Determines the default collation for an object in the following order:
99+
* 1. Use the object's explicitly defined default collation, if available.
100+
* 2. Otherwise, use the default collation defined by the object's schema.
101+
* 3. If not defined in the schema, use the default collation from the object's catalog.
102+
*
103+
* If none of these collations are specified, None will be persisted as the default collation,
104+
* which means the system default collation `UTF8_BINARY` will be used and the plan will not be
105+
* changed.
106+
* This function applies to DDL commands. An object's default collation is persisted at the moment
107+
* of its creation, and altering the schema or catalog collation will not affect existing objects.
108+
*/
109+
def resolveDefaultCollation(plan: LogicalPlan): LogicalPlan = {
110+
try {
111+
plan match {
112+
case createTable@CreateTable(ResolvedIdentifier(
113+
catalog: SupportsNamespaces, identifier), _, _, tableSpec: TableSpec, _)
114+
if tableSpec.collation.isEmpty =>
115+
createTable.copy(tableSpec = tableSpec.copy(
116+
collation = getCollationFromSchemaMetadata(catalog, identifier.namespace())))
117+
case replaceTable@ReplaceTable(
118+
ResolvedIdentifier(catalog: SupportsNamespaces, identifier), _, _, tableSpec: TableSpec, _)
119+
if tableSpec.collation.isEmpty =>
120+
replaceTable.copy(tableSpec = tableSpec.copy(
121+
collation = getCollationFromSchemaMetadata(catalog, identifier.namespace())))
122+
case other =>
123+
other
124+
}
125+
} catch {
126+
case _: NoSuchNamespaceException =>
127+
plan
128+
}
129+
}
130+
131+
/**
132+
* Retrieves the schema's default collation from the metadata of the given catalog and schema
133+
* name. Returns None if the default collation is not specified for the schema.
134+
*/
135+
private def getCollationFromSchemaMetadata(
136+
catalog: SupportsNamespaces, schemaName: Array[String]): Option[String] = {
137+
val metadata = catalog.loadNamespaceMetadata(schemaName)
138+
Option(metadata.get(PROP_COLLATION))
139+
}
140+
94141
private def isCreateOrAlterPlan(plan: LogicalPlan): Boolean = plan match {
95142
// For CREATE TABLE, only v2 CREATE TABLE command is supported.
96143
// Also, table DEFAULT COLLATION cannot be specified through CREATE TABLE AS SELECT command.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4435,6 +4435,10 @@ class AstBuilder extends DataTypeAstBuilder
44354435
throw QueryParsingErrors.cannotCleanReservedNamespacePropertyError(
44364436
PROP_LOCATION, ctx, "please use the LOCATION clause to specify it")
44374437
case (PROP_LOCATION, _) => false
4438+
case (PROP_COLLATION, _) if !legacyOn =>
4439+
throw QueryParsingErrors.cannotCleanReservedNamespacePropertyError(
4440+
PROP_COLLATION, ctx, "please use the DEFAULT COLLATION clause to specify it")
4441+
case (PROP_COLLATION, _) => false
44384442
case (PROP_OWNER, _) if !legacyOn =>
44394443
throw QueryParsingErrors.cannotCleanReservedNamespacePropertyError(
44404444
PROP_OWNER, ctx, "it will be set to the current user")
@@ -4460,6 +4464,7 @@ class AstBuilder extends DataTypeAstBuilder
44604464
override def visitCreateNamespace(ctx: CreateNamespaceContext): LogicalPlan = withOrigin(ctx) {
44614465
import SupportsNamespaces._
44624466
checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx)
4467+
checkDuplicateClauses(ctx.collationSpec(), "DEFAULT COLLATION", ctx)
44634468
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
44644469
checkDuplicateClauses(ctx.PROPERTIES, "WITH PROPERTIES", ctx)
44654470
checkDuplicateClauses(ctx.DBPROPERTIES, "WITH DBPROPERTIES", ctx)
@@ -4478,6 +4483,14 @@ class AstBuilder extends DataTypeAstBuilder
44784483
properties += PROP_COMMENT -> _
44794484
}
44804485

4486+
// Schema-level default collation is under development and currently not enabled.
4487+
if (!SQLConf.get.schemaLevelCollationsEnabled && !ctx.collationSpec().isEmpty) {
4488+
throw QueryCompilationErrors.schemaLevelCollationsNotEnabledError()
4489+
}
4490+
visitCollationSpecList(ctx.collationSpec()).foreach {
4491+
properties += PROP_COLLATION -> _
4492+
}
4493+
44814494
visitLocationSpecList(ctx.locationSpec()).foreach {
44824495
properties += PROP_LOCATION -> _
44834496
}

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ private[sql] object CatalogV2Util {
7373
*/
7474
val NAMESPACE_RESERVED_PROPERTIES =
7575
Seq(SupportsNamespaces.PROP_COMMENT,
76+
SupportsNamespaces.PROP_COLLATION,
7677
SupportsNamespaces.PROP_LOCATION,
7778
SupportsNamespaces.PROP_OWNER)
7879

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
380380
)
381381
}
382382

383+
def schemaLevelCollationsNotEnabledError(): Throwable = {
384+
new AnalysisException(
385+
errorClass = "UNSUPPORTED_FEATURE.SCHEMA_LEVEL_COLLATIONS",
386+
messageParameters = Map.empty
387+
)
388+
}
389+
383390
def trimCollationNotEnabledError(): Throwable = {
384391
new AnalysisException(
385392
errorClass = "UNSUPPORTED_FEATURE.TRIM_COLLATION",

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,20 @@ object SQLConf {
949949
.booleanConf
950950
.createWithDefault(true)
951951

952+
lazy val SCHEMA_LEVEL_COLLATIONS_ENABLED =
953+
buildConf("spark.sql.collation.schemaLevel.enabled")
954+
.internal()
955+
.doc(
956+
"Schema level collations feature is under development and its use should be done " +
957+
"under this feature flag. The feature allows setting default collation for all " +
958+
"underlying objects within that schema, except the ones that were previously created. " +
959+
"An object with an explicitly set collation will not inherit the collation from the " +
960+
"schema."
961+
)
962+
.version("4.0.0")
963+
.booleanConf
964+
.createWithDefault(false)
965+
952966
lazy val TRIM_COLLATION_ENABLED =
953967
buildConf("spark.sql.collation.trim.enabled")
954968
.internal()
@@ -6177,6 +6191,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
61776191

61786192
def objectLevelCollationsEnabled: Boolean = getConf(OBJECT_LEVEL_COLLATIONS_ENABLED)
61796193

6194+
def schemaLevelCollationsEnabled: Boolean = getConf(SCHEMA_LEVEL_COLLATIONS_ENABLED)
6195+
61806196
def trimCollationEnabled: Boolean = getConf(TRIM_COLLATION_ENABLED)
61816197

61826198
def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,6 +1158,25 @@ class SparkSqlAstBuilder extends AstBuilder {
11581158
cleanedProperties)
11591159
}
11601160

1161+
/**
1162+
* Create a [[SetNamespaceCollation]] logical plan.
1163+
*
1164+
* For example:
1165+
* {{{
1166+
* ALTER (DATABASE|SCHEMA|NAMESPACE) namespace DEFAULT COLLATION collation;
1167+
* }}}
1168+
*/
1169+
override def visitSetNamespaceCollation(ctx: SetNamespaceCollationContext): LogicalPlan = {
1170+
withOrigin(ctx) {
1171+
if (!SQLConf.get.schemaLevelCollationsEnabled) {
1172+
throw QueryCompilationErrors.schemaLevelCollationsNotEnabledError()
1173+
}
1174+
SetNamespaceCollation(
1175+
withIdentClause(ctx.identifierReference, UnresolvedNamespace(_)),
1176+
visitCollationSpec(ctx.collationSpec()))
1177+
}
1178+
}
1179+
11611180
/**
11621181
* Create a [[DescribeColumn]] or [[DescribeRelation]] or [[DescribeRelationAsJsonCommand]]
11631182
* command.
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.command
19+
20+
import org.apache.spark.sql.{Row, SparkSession}
21+
import org.apache.spark.sql.catalyst.analysis.ResolvedNamespace
22+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
23+
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
24+
import org.apache.spark.sql.connector.catalog.NamespaceChange
25+
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_COLLATION
26+
27+
/**
28+
* The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... DEFAULT COLLATION command.
29+
*/
30+
case class SetNamespaceCollationCommand(
31+
namespace: LogicalPlan,
32+
collation: String) extends UnaryRunnableCommand {
33+
override def child: LogicalPlan = namespace
34+
override protected def withNewChildInternal(newChild: LogicalPlan): SetNamespaceCollationCommand =
35+
copy(namespace = newChild)
36+
37+
override def run(sparkSession: SparkSession): Seq[Row] = {
38+
val ResolvedNamespace(catalog, ns, _) = child
39+
val collationChange = NamespaceChange.setProperty(PROP_COLLATION, collation)
40+
catalog.asNamespaceCatalog.alterNamespace(ns.toArray, collationChange)
41+
42+
Seq.empty
43+
}
44+
}

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ import org.apache.spark.util.ArrayImplicits._
6666
* {{{
6767
* CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name
6868
* [COMMENT database_comment]
69+
* [DEFAULT COLLATION collation]
6970
* [LOCATION database_directory]
7071
* [WITH DBPROPERTIES (property_name=property_value, ...)];
7172
* }}}
@@ -143,6 +144,28 @@ case class AlterDatabasePropertiesCommand(
143144
}
144145
}
145146

147+
/**
148+
* A command for users to set a new default collation for a database.
149+
* If the database does not exist, an error message will be issued to indicate the database
150+
* does not exist.
151+
* The syntax of using this command in SQL is:
152+
* {{{
153+
* ALTER (DATABASE|SCHEMA) database_name DEFAULT COLLATION collation
154+
* }}}
155+
*/
156+
case class AlterDatabaseSetCollationCommand(databaseName: String, collation: String)
157+
extends LeafRunnableCommand {
158+
159+
override def run(sparkSession: SparkSession): Seq[Row] = {
160+
val catalog = sparkSession.sessionState.catalog
161+
val db: CatalogDatabase = catalog.getDatabaseMetadata(databaseName)
162+
val properties = db.properties ++ Map(PROP_COLLATION -> collation)
163+
catalog.alterDatabase(db.copy(properties = properties))
164+
165+
Seq.empty[Row]
166+
}
167+
}
168+
146169
/**
147170
* A command for users to set new location path for a database
148171
* If the database does not exist, an error message will be issued to indicate the database

0 commit comments

Comments
 (0)