Skip to content

Commit 1a66b0a

Browse files
committed
temp
1 parent 702738b commit 1a66b0a

File tree

31 files changed

+480
-32
lines changed

31 files changed

+480
-32
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6030,6 +6030,11 @@
60306030
"The replace function does not support nested column <colName>."
60316031
]
60326032
},
6033+
"SCHEMA_LEVEL_COLLATIONS" : {
6034+
"message" : [
6035+
"Default collation for the specified schema."
6036+
]
6037+
},
60336038
"SET_NAMESPACE_PROPERTY" : {
60346039
"message" : [
60356040
"<property> is a reserved namespace property, <msg>."

sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,11 +191,14 @@ statement
191191
| CREATE namespace (IF errorCapturingNot EXISTS)? identifierReference
192192
(commentSpec |
193193
locationSpec |
194+
collationSpec |
194195
(WITH (DBPROPERTIES | PROPERTIES) propertyList))* #createNamespace
195196
| ALTER namespace identifierReference
196197
SET (DBPROPERTIES | PROPERTIES) propertyList #setNamespaceProperties
197198
| ALTER namespace identifierReference
198199
UNSET (DBPROPERTIES | PROPERTIES) propertyList #unsetNamespaceProperties
200+
| ALTER namespace identifierReference
201+
collationSpec #setNamespaceCollation
199202
| ALTER namespace identifierReference
200203
SET locationSpec #setNamespaceLocation
201204
| DROP namespace (IF EXISTS)? identifierReference

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ public interface SupportsNamespaces extends CatalogPlugin {
5454
*/
5555
String PROP_COMMENT = "comment";
5656

57+
/**
58+
* A reserved property to specify the collation of the namespace. The collation
59+
* will be returned in the result of "DESCRIBE NAMESPACE" command.
60+
*/
61+
String PROP_COLLATION = "collation";
62+
5763
/**
5864
* A reserved property to specify the owner of the namespace.
5965
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ApplyDefaultCollationToStringType.scala

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
2020
import org.apache.spark.sql.catalyst.expressions.{Cast, DefaultStringProducingExpression, Expression, Literal, SubqueryExpression}
2121
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterViewAs, ColumnDefinition, CreateTable, CreateTempView, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, ReplaceTable, V2CreateTablePlan}
2222
import org.apache.spark.sql.catalyst.rules.Rule
23-
import org.apache.spark.sql.connector.catalog.TableCatalog
23+
import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
24+
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_COLLATION
2425
import org.apache.spark.sql.types.{DataType, StringType}
2526

2627
/**
@@ -32,17 +33,23 @@ import org.apache.spark.sql.types.{DataType, StringType}
3233
*/
3334
object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
3435
def apply(plan: LogicalPlan): LogicalPlan = {
35-
fetchDefaultCollation(plan) match {
36+
val defaultCollation = fetchObjectLevelDefaultCollation(plan)
37+
.orElse(fetchSchemaLevelDefaultCollation(plan))
38+
39+
defaultCollation match {
3640
case Some(collation) =>
37-
transform(plan, StringType(collation))
41+
val planWithResolvedCollation =
42+
inheritCollationIfMissing(plan, collation)
43+
transform(planWithResolvedCollation, StringType(collation))
3844
case None => plan
3945
}
4046
}
4147

42-
/** Returns the default collation that should be applied to the plan
43-
* if specified; otherwise, returns None.
48+
/**
49+
* Fetches the default collation specified at the object level (e.g., table, view, or temporary
50+
* view) in the given logical plan. If no object-level collation is specified, returns None.
4451
*/
45-
private def fetchDefaultCollation(plan: LogicalPlan): Option[String] = {
52+
private def fetchObjectLevelDefaultCollation(plan: LogicalPlan): Option[String] = {
4653
plan match {
4754
case createTable: CreateTable =>
4855
createTable.tableSpec.collation
@@ -82,6 +89,55 @@ object ApplyDefaultCollationToStringType extends Rule[LogicalPlan] {
8289
}
8390
}
8491

92+
/**
93+
* Fetches the default collation specified at the schema level in the given logical plan.
94+
* If no schema-level collation is specified, returns None. We don't match for non-DDL commands
95+
* because schema/catalog default collation affects objects only during their creation.
96+
*/
97+
private def fetchSchemaLevelDefaultCollation(plan: LogicalPlan): Option[String] = {
98+
try {
99+
plan match {
100+
case CreateTable(ResolvedIdentifier(catalog: SupportsNamespaces, identifier), _, _, _, _) =>
101+
getCollationFromSchemaMetadata(catalog, identifier.namespace())
102+
case ReplaceTable(
103+
ResolvedIdentifier(catalog: SupportsNamespaces, identifier), _, _, _, _) =>
104+
getCollationFromSchemaMetadata(catalog, identifier.namespace())
105+
case _ =>
106+
None
107+
}
108+
} catch {
109+
case _: NoSuchNamespaceException =>
110+
None
111+
}
112+
}
113+
114+
/**
115+
* Inherits the provided collation if the logical plan does not already specify one. This applies
116+
* only to DDL commands, as the schema/catalog default collation affects objects only during their
117+
* creation.
118+
*/
119+
private def inheritCollationIfMissing(
120+
plan: LogicalPlan, collation: String): LogicalPlan = {
121+
plan match {
122+
case createTable@CreateTable(_, _, _, tableSpec, _) if tableSpec.collation.isEmpty =>
123+
createTable.copy(tableSpec = tableSpec.withNewCollation(Some(collation)))
124+
case replaceTable@ReplaceTable(_, _, _, tableSpec, _) if tableSpec.collation.isEmpty =>
125+
replaceTable.copy(tableSpec = tableSpec.withNewCollation(Some(collation)))
126+
case other =>
127+
other
128+
}
129+
}
130+
131+
/**
132+
* Retrieves the schema's default collation from the metadata of the given catalog and schema
133+
* name. Returns None if the default collation is not specified for the schema.
134+
*/
135+
private def getCollationFromSchemaMetadata(
136+
catalog: SupportsNamespaces, schemaName: Array[String]): Option[String] = {
137+
val metadata = catalog.loadNamespaceMetadata(schemaName)
138+
Option(metadata.get(PROP_COLLATION))
139+
}
140+
85141
private def getCollationFromTableProps(t: LogicalPlan): Option[String] = {
86142
t match {
87143
case resolvedTbl: ResolvedTable

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,7 @@ object CatalogTableType {
10321032
case class CatalogDatabase(
10331033
name: String,
10341034
description: String,
1035+
collation: Option[String] = None,
10351036
locationUri: URI,
10361037
properties: Map[String, String])
10371038

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4446,6 +4446,7 @@ class AstBuilder extends DataTypeAstBuilder
44464446
override def visitCreateNamespace(ctx: CreateNamespaceContext): LogicalPlan = withOrigin(ctx) {
44474447
import SupportsNamespaces._
44484448
checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx)
4449+
checkDuplicateClauses(ctx.collationSpec(), "DEFAULT COLLATION", ctx)
44494450
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
44504451
checkDuplicateClauses(ctx.PROPERTIES, "WITH PROPERTIES", ctx)
44514452
checkDuplicateClauses(ctx.DBPROPERTIES, "WITH DBPROPERTIES", ctx)
@@ -4464,6 +4465,14 @@ class AstBuilder extends DataTypeAstBuilder
44644465
properties += PROP_COMMENT -> _
44654466
}
44664467

4468+
// Schema-level default collation is under development and currently not enabled.
4469+
if (!SQLConf.get.schemaLevelCollationsEnabled && !ctx.collationSpec().isEmpty) {
4470+
throw QueryCompilationErrors.schemaLevelCollationsNotEnabledError()
4471+
}
4472+
visitCollationSpecList(ctx.collationSpec()).foreach {
4473+
properties += PROP_COLLATION -> _
4474+
}
4475+
44674476
visitLocationSpecList(ctx.locationSpec()).foreach {
44684477
properties += PROP_LOCATION -> _
44694478
}
@@ -4509,6 +4518,25 @@ class AstBuilder extends DataTypeAstBuilder
45094518
}
45104519
}
45114520

4521+
/**
4522+
* Create a [[SetNamespaceCollation]] logical plan.
4523+
*
4524+
* For example:
4525+
* {{{
4526+
* ALTER (DATABASE|SCHEMA|NAMESPACE) namespace DEFAULT COLLATION collation;
4527+
* }}}
4528+
*/
4529+
override def visitSetNamespaceCollation(ctx: SetNamespaceCollationContext): LogicalPlan = {
4530+
withOrigin(ctx) {
4531+
if (!SQLConf.get.schemaLevelCollationsEnabled) {
4532+
throw QueryCompilationErrors.schemaLevelCollationsNotEnabledError()
4533+
}
4534+
SetNamespaceCollation(
4535+
withIdentClause(ctx.identifierReference, UnresolvedNamespace(_)),
4536+
visitCollationSpec(ctx.collationSpec()))
4537+
}
4538+
}
4539+
45124540
/**
45134541
* Create a [[SetNamespaceLocation]] logical plan.
45144542
*

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,17 @@ case class SetNamespaceProperties(
646646
copy(namespace = newChild)
647647
}
648648

649+
/**
650+
* The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... DEFAULT COLLATION command.
651+
*/
652+
case class SetNamespaceCollation(
653+
namespace: LogicalPlan,
654+
collation: String) extends UnaryCommand {
655+
override def child: LogicalPlan = namespace
656+
override protected def withNewChildInternal(newChild: LogicalPlan): SetNamespaceCollation =
657+
copy(namespace = newChild)
658+
}
659+
649660
/**
650661
* The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET LOCATION command.
651662
*/
@@ -1500,6 +1511,8 @@ trait TableSpecBase {
15001511
def collation: Option[String]
15011512
def serde: Option[SerdeInfo]
15021513
def external: Boolean
1514+
1515+
def withNewCollation(collation: Option[String]): TableSpecBase
15031516
}
15041517

15051518
case class UnresolvedTableSpec(
@@ -1531,6 +1544,10 @@ case class UnresolvedTableSpec(
15311544
optionExpression = newChildren.head.asInstanceOf[OptionList],
15321545
constraints = newChildren.tail.asInstanceOf[Seq[TableConstraint]])
15331546
}
1547+
1548+
override def withNewCollation(collation: Option[String]): UnresolvedTableSpec = {
1549+
copy(collation = collation)
1550+
}
15341551
}
15351552

15361553
/**
@@ -1570,6 +1587,10 @@ case class TableSpec(
15701587
TableSpec(properties, provider, options, newLocation,
15711588
comment, collation, serde, external, constraints)
15721589
}
1590+
1591+
override def withNewCollation(collation: Option[String]): TableSpec = {
1592+
copy(collation = collation)
1593+
}
15731594
}
15741595

15751596
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
380380
)
381381
}
382382

383+
def schemaLevelCollationsNotEnabledError(): Throwable = {
384+
new AnalysisException(
385+
errorClass = "UNSUPPORTED_FEATURE.SCHEMA_LEVEL_COLLATIONS",
386+
messageParameters = Map.empty
387+
)
388+
}
389+
383390
def trimCollationNotEnabledError(): Throwable = {
384391
new AnalysisException(
385392
errorClass = "UNSUPPORTED_FEATURE.TRIM_COLLATION",

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -921,6 +921,20 @@ object SQLConf {
921921
.booleanConf
922922
.createWithDefault(true)
923923

924+
lazy val SCHEMA_LEVEL_COLLATIONS_ENABLED =
925+
buildConf("spark.sql.collation.schemaLevel.enabled")
926+
.internal()
927+
.doc(
928+
"Schema level collations feature is under development and its use should be done " +
929+
"under this feature flag. The feature allows setting default collation for all " +
930+
"underlying objects within that schema, except the ones that were previously created. " +
931+
"An object with an explicitly set collation will not inherit the collation from the " +
932+
"schema."
933+
)
934+
.version("4.0.0")
935+
.booleanConf
936+
.createWithDefault(false)
937+
924938
lazy val TRIM_COLLATION_ENABLED =
925939
buildConf("spark.sql.collation.trim.enabled")
926940
.internal()
@@ -6147,6 +6161,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
61476161

61486162
def objectLevelCollationsEnabled: Boolean = getConf(OBJECT_LEVEL_COLLATIONS_ENABLED)
61496163

6164+
def schemaLevelCollationsEnabled: Boolean = getConf(SCHEMA_LEVEL_COLLATIONS_ENABLED)
6165+
61506166
def trimCollationEnabled: Boolean = getConf(TRIM_COLLATION_ENABLED)
61516167

61526168
def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExternalCatalogSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class AnalysisExternalCatalogSuite extends AnalysisTest with Matchers {
3333
private def getAnalyzer(externCatalog: ExternalCatalog, databasePath: File): Analyzer = {
3434
val catalog = new SessionCatalog(externCatalog, FunctionRegistry.builtin)
3535
catalog.createDatabase(
36-
CatalogDatabase("default", "", databasePath.toURI, Map.empty),
36+
CatalogDatabase("default", "", collation = None, databasePath.toURI, Map.empty),
3737
ignoreIfExists = false)
3838
catalog.createTable(
3939
CatalogTable(
@@ -66,7 +66,7 @@ class AnalysisExternalCatalogSuite extends AnalysisTest with Matchers {
6666
val externCatalog = spy[InMemoryCatalog](inMemoryCatalog)
6767
val catalog = new SessionCatalog(externCatalog, FunctionRegistry.builtin)
6868
catalog.createDatabase(
69-
CatalogDatabase("default", "", new URI(tempDir.toString), Map.empty),
69+
CatalogDatabase("default", "", collation = None, new URI(tempDir.toString), Map.empty),
7070
ignoreIfExists = false)
7171
reset(externCatalog)
7272
catalog.functionExists(FunctionIdentifier("sum"))

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ trait AnalysisTest extends PlanTest {
7777
val catalog = new SessionCatalog(
7878
new InMemoryCatalog, FunctionRegistry.builtin, TableFunctionRegistry.builtin)
7979
catalog.createDatabase(
80-
CatalogDatabase("default", "", new URI("loc"), Map.empty),
80+
CatalogDatabase("default", "", collation = None, new URI("loc"), Map.empty),
8181
ignoreIfExists = false)
8282
createTempView(catalog, "TaBlE", TestRelations.testRelation, overrideIfExists = true)
8383
createTempView(catalog, "TaBlE2", TestRelations.testRelation2, overrideIfExists = true)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ class LookupFunctionsSuite extends PlanTest {
4040
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
4141
val externalCatalog = new CustomInMemoryCatalog
4242
externalCatalog.createDatabase(
43-
CatalogDatabase("default", "", new URI("loc1"), Map.empty),
43+
CatalogDatabase("default", "", collation = None, new URI("loc1"), Map.empty),
4444
ignoreIfExists = false)
4545
externalCatalog.createDatabase(
46-
CatalogDatabase("db1", "", new URI("loc2"), Map.empty),
46+
CatalogDatabase("db1", "", collation = None, new URI("loc2"), Map.empty),
4747
ignoreIfExists = false)
4848
val catalog = new SessionCatalog(externalCatalog, new SimpleFunctionRegistry)
4949
val catalogManager = new CatalogManager(new CustomV2SessionCatalog(catalog), catalog)
@@ -79,7 +79,7 @@ class LookupFunctionsSuite extends PlanTest {
7979
val catalogManager = new CatalogManager(new CustomV2SessionCatalog(catalog), catalog)
8080
val analyzer = {
8181
catalog.createDatabase(
82-
CatalogDatabase("default", "", new URI("loc"), Map.empty),
82+
CatalogDatabase("default", "", collation = None, new URI("loc"), Map.empty),
8383
ignoreIfExists = false)
8484
new Analyzer(catalogManager)
8585
}
@@ -104,7 +104,7 @@ class LookupFunctionsSuite extends PlanTest {
104104
val catalogManager = new CatalogManager(new CustomV2SessionCatalog(catalog), catalog)
105105
val analyzer = {
106106
catalog.createDatabase(
107-
CatalogDatabase("default", "", new URI("loc"), Map.empty),
107+
CatalogDatabase("default", "", collation = None, new URI("loc"), Map.empty),
108108
ignoreIfExists = false)
109109
new Analyzer(catalogManager)
110110
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class TableLookupCacheSuite extends AnalysisTest with Matchers {
3737
private def getAnalyzer(externalCatalog: ExternalCatalog, databasePath: File): Analyzer = {
3838
val v1Catalog = new SessionCatalog(externalCatalog, FunctionRegistry.builtin)
3939
v1Catalog.createDatabase(
40-
CatalogDatabase("default", "", databasePath.toURI, Map.empty),
40+
CatalogDatabase("default", "", collation = None, databasePath.toURI, Map.empty),
4141
ignoreIfExists = false)
4242
v1Catalog.createTable(
4343
CatalogTable(

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
4747
}
4848

4949
private def createDbDefinition(uri: URI): CatalogDatabase = {
50-
CatalogDatabase(name = "db5", description = "", locationUri = uri, Map.empty)
50+
CatalogDatabase(name = "db5", description = "", collation = None, locationUri = uri, Map.empty)
5151
}
5252

5353
private def createDbDefinition(): CatalogDatabase = {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1065,7 +1065,7 @@ abstract class CatalogTestUtils {
10651065
}
10661066

10671067
def newDb(name: String): CatalogDatabase = {
1068-
CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
1068+
CatalogDatabase(name, name + " description", collation = None, newUriForDatabase(), Map.empty)
10691069
}
10701070

10711071
def newTable(name: String, db: String): CatalogTable = newTable(name, Some(db))

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ class CatalogManagerSuite extends SparkFunSuite with SQLHelper {
3333
private def createSessionCatalog(): SessionCatalog = {
3434
val catalog = new V1InMemoryCatalog()
3535
catalog.createDatabase(
36-
CatalogDatabase(SessionCatalog.DEFAULT_DATABASE, "", new URI("fake"), Map.empty),
36+
CatalogDatabase(
37+
SessionCatalog.DEFAULT_DATABASE, "", collation = None, new URI("fake"), Map.empty),
3738
ignoreIfExists = true)
3839
new SessionCatalog(catalog, EmptyFunctionRegistry)
3940
}
@@ -91,7 +92,7 @@ class CatalogManagerSuite extends SparkFunSuite with SQLHelper {
9192
val v1SessionCatalog = createSessionCatalog()
9293
v1SessionCatalog.createDatabase(
9394
CatalogDatabase(
94-
"test", "", v1SessionCatalog.getDefaultDBPath("test"), Map.empty),
95+
"test", "", collation = None, v1SessionCatalog.getDefaultDBPath("test"), Map.empty),
9596
ignoreIfExists = false)
9697
val catalogManager = new CatalogManager(FakeV2SessionCatalog, v1SessionCatalog)
9798

0 commit comments

Comments
 (0)