From cbcdd8dbfe2daf8f8542c7a042691460e5557be8 Mon Sep 17 00:00:00 2001
From: davidyuan <yuanfuyuan@mafengwo.com>
Date: Wed, 12 Mar 2025 23:27:28 +0800
Subject: [PATCH 1/4] producers

---
 .../main/resources/table_command_spec.json    | 16 ++++++
 .../spark/authz/gen/PaimonCommands.scala      |  9 ++-
 ...imonCatalogRangerSparkExtensionSuite.scala | 55 +++++++++++++++++++
 3 files changed, 79 insertions(+), 1 deletion(-)

diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 2c7f2d47272..ba018c64209 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -2589,6 +2589,22 @@
     "isInput" : false,
     "comment" : "Delta"
   } ]
+}, {
+  "classname" : "org.apache.paimon.spark.catalyst.plans.logical.PaimonCallCommand",
+  "tableDescs" : [ {
+    "fieldName" : "args",
+    "fieldExtractor" : "ExpressionSeqTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : null,
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false,
+    "comment" : "Paimon"
+  } ],
+  "opType" : "ALTERTABLE_PROPERTIES",
+  "queryDescs" : [ ],
+  "uriDescs" : [ ]
 }, {
   "classname" : "org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand",
   "tableDescs" : [ {
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/PaimonCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/PaimonCommands.scala
index 4d3a111591a..c16c13e040f 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/PaimonCommands.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/PaimonCommands.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.plugin.spark.authz.gen
 
+import org.apache.kyuubi.plugin.spark.authz.OperationType
 import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
 import org.apache.kyuubi.plugin.spark.authz.serde._
 
@@ -56,8 +57,14 @@ object PaimonCommands extends CommandSpecs[TableCommandSpec] {
     TableCommandSpec(cmd, Seq(targetTableDesc, sourceTableDesc))
   }
 
+  val PaimonCallCommand = {
+    val cmd = "org.apache.paimon.spark.catalyst.plans.logical.PaimonCallCommand"
+    val td = TableDesc("args", classOf[ExpressionSeqTableExtractor], comment = "Paimon")
+    TableCommandSpec(cmd, Seq(td), opType = OperationType.ALTERTABLE_PROPERTIES)
+  }
   override def specs: Seq[TableCommandSpec] = Seq(
     UpdatePaimonTable,
     DeleteFromPaimonTable,
-    MergeIntoPaimonTable)
+    MergeIntoPaimonTable,
+    PaimonCallCommand)
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
index 4248ea44160..6bcc914219b 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
@@ -531,6 +531,61 @@ class PaimonCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
     }
   }
 
+  test("Producers") {
+    withCleanTmpResources(Seq(
+      (s"$catalogV2.$namespace1.$table1", "table"))) {
+      try {
+        doAs(admin, sql(createTableSql(namespace1, table1)))
+        doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (1, 'a'), (2, 'b')"))
+
+        var currentCatalogName =
+          doAs(admin, spark.sessionState.catalogManager.currentCatalog.name())
+
+        doAs(admin, sql(s"use $catalogV2"))
+        currentCatalogName = doAs(admin, spark.sessionState.catalogManager.currentCatalog.name())
+        assert(currentCatalogName.equals(catalogV2))
+
+        // Create Tag
+        val createTagSql = s"Call sys.create_tag(table =>" +
+          s"'$catalogV2.$namespace1.$table1', tag => 'test_tag', snapshot => 1)"
+        interceptEndsWith[AccessControlException] {
+          doAs(table1OnlyUserForNs, sql(createTagSql))
+        }(s"does not have [alter] privilege on [$namespace1/$table1]")
+        interceptEndsWith[AccessControlException] {
+          doAs(someone, sql(createTagSql))
+        }(s"does not have [alter] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(createTagSql))
+
+        // Delete Tag
+        val deleteTagSql = s"Call sys.delete_tag(table =>" +
+          s"'$catalogV2.$namespace1.$table1', tag => 'test_tag')"
+        interceptEndsWith[AccessControlException] {
+          doAs(table1OnlyUserForNs, sql(deleteTagSql))
+        }(s"does not have [alter] privilege on [$namespace1/$table1]")
+        interceptEndsWith[AccessControlException] {
+          doAs(someone, sql(deleteTagSql))
+        }(s"does not have [alter] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(deleteTagSql))
+
+        // Rollback
+        doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (3, 'a'), (4, 'b')"))
+        doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (5, 'a'), (6, 'b')"))
+        val rollbackTagSql = s"Call sys.rollback(table =>" +
+          s"'$catalogV2.$namespace1.$table1', version => '2')"
+        interceptEndsWith[AccessControlException] {
+          doAs(table1OnlyUserForNs, sql(rollbackTagSql))
+        }(s"does not have [alter] privilege on [$namespace1/$table1]")
+        interceptEndsWith[AccessControlException] {
+          doAs(someone, sql(rollbackTagSql))
+        }(s"does not have [alter] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(rollbackTagSql))
+
+      } finally {
+        doAs(admin, sql(s"use spark_catalog"))
+      }
+    }
+  }
+
   def createTableSql(namespace: String, table: String): String =
     s"""
        |CREATE TABLE IF NOT EXISTS $catalogV2.$namespace.$table

From 57aac600bf4553c85e01abd308d6b32fa38bf7cd Mon Sep 17 00:00:00 2001
From: davidyuan <yuanfuyuan@mafengwo.com>
Date: Fri, 14 Mar 2025 17:15:08 +0800
Subject: [PATCH 2/4] update

---
 ...imonCatalogRangerSparkExtensionSuite.scala | 102 +++++++++---------
 1 file changed, 52 insertions(+), 50 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
index 6bcc914219b..665396d2b0b 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
@@ -532,56 +532,58 @@ class PaimonCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
   }
 
   test("Producers") {
-    withCleanTmpResources(Seq(
-      (s"$catalogV2.$namespace1.$table1", "table"))) {
-      try {
-        doAs(admin, sql(createTableSql(namespace1, table1)))
-        doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (1, 'a'), (2, 'b')"))
-
-        var currentCatalogName =
-          doAs(admin, spark.sessionState.catalogManager.currentCatalog.name())
-
-        doAs(admin, sql(s"use $catalogV2"))
-        currentCatalogName = doAs(admin, spark.sessionState.catalogManager.currentCatalog.name())
-        assert(currentCatalogName.equals(catalogV2))
-
-        // Create Tag
-        val createTagSql = s"Call sys.create_tag(table =>" +
-          s"'$catalogV2.$namespace1.$table1', tag => 'test_tag', snapshot => 1)"
-        interceptEndsWith[AccessControlException] {
-          doAs(table1OnlyUserForNs, sql(createTagSql))
-        }(s"does not have [alter] privilege on [$namespace1/$table1]")
-        interceptEndsWith[AccessControlException] {
-          doAs(someone, sql(createTagSql))
-        }(s"does not have [alter] privilege on [$namespace1/$table1]")
-        doAs(admin, sql(createTagSql))
-
-        // Delete Tag
-        val deleteTagSql = s"Call sys.delete_tag(table =>" +
-          s"'$catalogV2.$namespace1.$table1', tag => 'test_tag')"
-        interceptEndsWith[AccessControlException] {
-          doAs(table1OnlyUserForNs, sql(deleteTagSql))
-        }(s"does not have [alter] privilege on [$namespace1/$table1]")
-        interceptEndsWith[AccessControlException] {
-          doAs(someone, sql(deleteTagSql))
-        }(s"does not have [alter] privilege on [$namespace1/$table1]")
-        doAs(admin, sql(deleteTagSql))
-
-        // Rollback
-        doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (3, 'a'), (4, 'b')"))
-        doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (5, 'a'), (6, 'b')"))
-        val rollbackTagSql = s"Call sys.rollback(table =>" +
-          s"'$catalogV2.$namespace1.$table1', version => '2')"
-        interceptEndsWith[AccessControlException] {
-          doAs(table1OnlyUserForNs, sql(rollbackTagSql))
-        }(s"does not have [alter] privilege on [$namespace1/$table1]")
-        interceptEndsWith[AccessControlException] {
-          doAs(someone, sql(rollbackTagSql))
-        }(s"does not have [alter] privilege on [$namespace1/$table1]")
-        doAs(admin, sql(rollbackTagSql))
-
-      } finally {
-        doAs(admin, sql(s"use spark_catalog"))
+    if (isSparkV34OrGreater) {
+      withCleanTmpResources(Seq(
+        (s"$catalogV2.$namespace1.$table1", "table"))) {
+        try {
+          doAs(admin, sql(createTableSql(namespace1, table1)))
+          doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (1, 'a'), (2, 'b')"))
+
+          var currentCatalogName =
+            doAs(admin, spark.sessionState.catalogManager.currentCatalog.name())
+
+          doAs(admin, sql(s"use $catalogV2"))
+          currentCatalogName = doAs(admin, spark.sessionState.catalogManager.currentCatalog.name())
+          assert(currentCatalogName.equals(catalogV2))
+
+          // Create Tag
+          val createTagSql = s"Call sys.create_tag(table =>" +
+            s"'$catalogV2.$namespace1.$table1', tag => 'test_tag', snapshot => 1)"
+          interceptEndsWith[AccessControlException] {
+            doAs(table1OnlyUserForNs, sql(createTagSql))
+          }(s"does not have [alter] privilege on [$namespace1/$table1]")
+          interceptEndsWith[AccessControlException] {
+            doAs(someone, sql(createTagSql))
+          }(s"does not have [alter] privilege on [$namespace1/$table1]")
+          doAs(admin, sql(createTagSql))
+
+          // Delete Tag
+          val deleteTagSql = s"Call sys.delete_tag(table =>" +
+            s"'$catalogV2.$namespace1.$table1', tag => 'test_tag')"
+          interceptEndsWith[AccessControlException] {
+            doAs(table1OnlyUserForNs, sql(deleteTagSql))
+          }(s"does not have [alter] privilege on [$namespace1/$table1]")
+          interceptEndsWith[AccessControlException] {
+            doAs(someone, sql(deleteTagSql))
+          }(s"does not have [alter] privilege on [$namespace1/$table1]")
+          doAs(admin, sql(deleteTagSql))
+
+          // Rollback
+          doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (3, 'a'), (4, 'b')"))
+          doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (5, 'a'), (6, 'b')"))
+          val rollbackTagSql = s"Call sys.rollback(table =>" +
+            s"'$catalogV2.$namespace1.$table1', version => '2')"
+          interceptEndsWith[AccessControlException] {
+            doAs(table1OnlyUserForNs, sql(rollbackTagSql))
+          }(s"does not have [alter] privilege on [$namespace1/$table1]")
+          interceptEndsWith[AccessControlException] {
+            doAs(someone, sql(rollbackTagSql))
+          }(s"does not have [alter] privilege on [$namespace1/$table1]")
+          doAs(admin, sql(rollbackTagSql))
+
+        } finally {
+          doAs(admin, sql(s"use spark_catalog"))
+        }
       }
     }
   }

From f68edef41cd5d3d8c68e07765669841c465abef0 Mon Sep 17 00:00:00 2001
From: davidyuan <yuanfuyuan@mafengwo.com>
Date: Sat, 22 Mar 2025 02:49:14 +0800
Subject: [PATCH 3/4] producers

---
 ...imonCatalogRangerSparkExtensionSuite.scala | 104 +++++++++---------
 pom.xml                                       |   2 +-
 2 files changed, 53 insertions(+), 53 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
index 665396d2b0b..04055e6892f 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
@@ -532,60 +532,60 @@ class PaimonCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
   }
 
   test("Producers") {
-    if (isSparkV34OrGreater) {
-      withCleanTmpResources(Seq(
-        (s"$catalogV2.$namespace1.$table1", "table"))) {
-        try {
-          doAs(admin, sql(createTableSql(namespace1, table1)))
-          doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (1, 'a'), (2, 'b')"))
-
-          var currentCatalogName =
-            doAs(admin, spark.sessionState.catalogManager.currentCatalog.name())
-
-          doAs(admin, sql(s"use $catalogV2"))
-          currentCatalogName = doAs(admin, spark.sessionState.catalogManager.currentCatalog.name())
-          assert(currentCatalogName.equals(catalogV2))
-
-          // Create Tag
-          val createTagSql = s"Call sys.create_tag(table =>" +
-            s"'$catalogV2.$namespace1.$table1', tag => 'test_tag', snapshot => 1)"
-          interceptEndsWith[AccessControlException] {
-            doAs(table1OnlyUserForNs, sql(createTagSql))
-          }(s"does not have [alter] privilege on [$namespace1/$table1]")
-          interceptEndsWith[AccessControlException] {
-            doAs(someone, sql(createTagSql))
-          }(s"does not have [alter] privilege on [$namespace1/$table1]")
-          doAs(admin, sql(createTagSql))
-
-          // Delete Tag
-          val deleteTagSql = s"Call sys.delete_tag(table =>" +
-            s"'$catalogV2.$namespace1.$table1', tag => 'test_tag')"
-          interceptEndsWith[AccessControlException] {
-            doAs(table1OnlyUserForNs, sql(deleteTagSql))
-          }(s"does not have [alter] privilege on [$namespace1/$table1]")
-          interceptEndsWith[AccessControlException] {
-            doAs(someone, sql(deleteTagSql))
-          }(s"does not have [alter] privilege on [$namespace1/$table1]")
-          doAs(admin, sql(deleteTagSql))
-
-          // Rollback
-          doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (3, 'a'), (4, 'b')"))
-          doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (5, 'a'), (6, 'b')"))
-          val rollbackTagSql = s"Call sys.rollback(table =>" +
-            s"'$catalogV2.$namespace1.$table1', version => '2')"
-          interceptEndsWith[AccessControlException] {
-            doAs(table1OnlyUserForNs, sql(rollbackTagSql))
-          }(s"does not have [alter] privilege on [$namespace1/$table1]")
-          interceptEndsWith[AccessControlException] {
-            doAs(someone, sql(rollbackTagSql))
-          }(s"does not have [alter] privilege on [$namespace1/$table1]")
-          doAs(admin, sql(rollbackTagSql))
-
-        } finally {
-          doAs(admin, sql(s"use spark_catalog"))
-        }
+
+    withCleanTmpResources(Seq(
+      (s"$catalogV2.$namespace1.$table1", "table"))) {
+      try {
+        doAs(admin, sql(createTableSql(namespace1, table1)))
+        doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (1, 'a'), (2, 'b')"))
+
+        var currentCatalogName =
+          doAs(admin, spark.sessionState.catalogManager.currentCatalog.name())
+
+        doAs(admin, sql(s"use $catalogV2"))
+        currentCatalogName = doAs(admin, spark.sessionState.catalogManager.currentCatalog.name())
+        assert(currentCatalogName.equals(catalogV2))
+
+        // Create Tag
+        val createTagSql = s"Call sys.create_tag(table =>" +
+          s"'$catalogV2.$namespace1.$table1', tag => 'test_tag', snapshot => 1)"
+        interceptEndsWith[AccessControlException] {
+          doAs(table1OnlyUserForNs, sql(createTagSql))
+        }(s"does not have [alter] privilege on [$namespace1/$table1]")
+        interceptEndsWith[AccessControlException] {
+          doAs(someone, sql(createTagSql))
+        }(s"does not have [alter] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(createTagSql))
+
+        // Delete Tag
+        val deleteTagSql = s"Call sys.delete_tag(table =>" +
+          s"'$catalogV2.$namespace1.$table1', tag => 'test_tag')"
+        interceptEndsWith[AccessControlException] {
+          doAs(table1OnlyUserForNs, sql(deleteTagSql))
+        }(s"does not have [alter] privilege on [$namespace1/$table1]")
+        interceptEndsWith[AccessControlException] {
+          doAs(someone, sql(deleteTagSql))
+        }(s"does not have [alter] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(deleteTagSql))
+
+        // Rollback
+        doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (3, 'a'), (4, 'b')"))
+        doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (5, 'a'), (6, 'b')"))
+        val rollbackTagSql = s"Call sys.rollback(table =>" +
+          s"'$catalogV2.$namespace1.$table1', version => '2')"
+        interceptEndsWith[AccessControlException] {
+          doAs(table1OnlyUserForNs, sql(rollbackTagSql))
+        }(s"does not have [alter] privilege on [$namespace1/$table1]")
+        interceptEndsWith[AccessControlException] {
+          doAs(someone, sql(rollbackTagSql))
+        }(s"does not have [alter] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(rollbackTagSql))
+
+      } finally {
+        doAs(admin, sql(s"use spark_catalog"))
       }
     }
+
   }
 
   def createTableSql(namespace: String, table: String): String =
diff --git a/pom.xml b/pom.xml
index 7d3a9dba9e5..49d04844a51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -183,7 +183,7 @@
         <netty.version>4.1.108.Final</netty.version>
         <openai.java.version>0.12.0</openai.java.version>
         <retrofit.version>2.9.0</retrofit.version>
-        <paimon.version>0.8.2</paimon.version>
+        <paimon.version>0.9.0</paimon.version>
         <paimon.artifact>paimon-spark-${spark.binary.version}</paimon.artifact>
         <phoenix.version>6.0.0</phoenix.version>
         <postgresql.version>42.7.2</postgresql.version>

From 90f367c6a43fc8f6e14c6d435d044182d687d901 Mon Sep 17 00:00:00 2001
From: davidyuan <yuanfuyuan@mafengwo.com>
Date: Wed, 26 Mar 2025 10:45:12 +0800
Subject: [PATCH 4/4] update

---
 ...imonCatalogRangerSparkExtensionSuite.scala | 104 +++++++++---------
 pom.xml                                       |   2 +-
 2 files changed, 53 insertions(+), 53 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
index b35b1b379db..efaf28df8e0 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/PaimonCatalogRangerSparkExtensionSuite.scala
@@ -569,60 +569,60 @@ class PaimonCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
   }
 
   test("Producers") {
-
-    withCleanTmpResources(Seq(
-      (s"$catalogV2.$namespace1.$table1", "table"))) {
-      try {
-        doAs(admin, sql(createTableSql(namespace1, table1)))
-        doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (1, 'a'), (2, 'b')"))
-
-        var currentCatalogName =
-          doAs(admin, spark.sessionState.catalogManager.currentCatalog.name())
-
-        doAs(admin, sql(s"use $catalogV2"))
-        currentCatalogName = doAs(admin, spark.sessionState.catalogManager.currentCatalog.name())
-        assert(currentCatalogName.equals(catalogV2))
-
-        // Create Tag
-        val createTagSql = s"Call sys.create_tag(table =>" +
-          s"'$catalogV2.$namespace1.$table1', tag => 'test_tag', snapshot => 1)"
-        interceptEndsWith[AccessControlException] {
-          doAs(table1OnlyUserForNs, sql(createTagSql))
-        }(s"does not have [alter] privilege on [$namespace1/$table1]")
-        interceptEndsWith[AccessControlException] {
-          doAs(someone, sql(createTagSql))
-        }(s"does not have [alter] privilege on [$namespace1/$table1]")
-        doAs(admin, sql(createTagSql))
-
-        // Delete Tag
-        val deleteTagSql = s"Call sys.delete_tag(table =>" +
-          s"'$catalogV2.$namespace1.$table1', tag => 'test_tag')"
-        interceptEndsWith[AccessControlException] {
-          doAs(table1OnlyUserForNs, sql(deleteTagSql))
-        }(s"does not have [alter] privilege on [$namespace1/$table1]")
-        interceptEndsWith[AccessControlException] {
-          doAs(someone, sql(deleteTagSql))
-        }(s"does not have [alter] privilege on [$namespace1/$table1]")
-        doAs(admin, sql(deleteTagSql))
-
-        // Rollback
-        doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (3, 'a'), (4, 'b')"))
-        doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (5, 'a'), (6, 'b')"))
-        val rollbackTagSql = s"Call sys.rollback(table =>" +
-          s"'$catalogV2.$namespace1.$table1', version => '2')"
-        interceptEndsWith[AccessControlException] {
-          doAs(table1OnlyUserForNs, sql(rollbackTagSql))
-        }(s"does not have [alter] privilege on [$namespace1/$table1]")
-        interceptEndsWith[AccessControlException] {
-          doAs(someone, sql(rollbackTagSql))
-        }(s"does not have [alter] privilege on [$namespace1/$table1]")
-        doAs(admin, sql(rollbackTagSql))
-
-      } finally {
-        doAs(admin, sql(s"use spark_catalog"))
+    if (isSparkV34OrGreater) {
+      withCleanTmpResources(Seq(
+        (s"$catalogV2.$namespace1.$table1", "table"))) {
+        try {
+          doAs(admin, sql(createTableSql(namespace1, table1)))
+          doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (1, 'a'), (2, 'b')"))
+
+          var currentCatalogName =
+            doAs(admin, spark.sessionState.catalogManager.currentCatalog.name())
+
+          doAs(admin, sql(s"use $catalogV2"))
+          currentCatalogName = doAs(admin, spark.sessionState.catalogManager.currentCatalog.name())
+          assert(currentCatalogName.equals(catalogV2))
+
+          // Create Tag
+          val createTagSql = s"Call sys.create_tag(table =>" +
+            s"'$catalogV2.$namespace1.$table1', tag => 'test_tag', snapshot => 1)"
+          interceptEndsWith[AccessControlException] {
+            doAs(table1OnlyUserForNs, sql(createTagSql))
+          }(s"does not have [alter] privilege on [$namespace1/$table1]")
+          interceptEndsWith[AccessControlException] {
+            doAs(someone, sql(createTagSql))
+          }(s"does not have [alter] privilege on [$namespace1/$table1]")
+          doAs(admin, sql(createTagSql))
+
+          // Delete Tag
+          val deleteTagSql = s"Call sys.delete_tag(table =>" +
+            s"'$catalogV2.$namespace1.$table1', tag => 'test_tag')"
+          interceptEndsWith[AccessControlException] {
+            doAs(table1OnlyUserForNs, sql(deleteTagSql))
+          }(s"does not have [alter] privilege on [$namespace1/$table1]")
+          interceptEndsWith[AccessControlException] {
+            doAs(someone, sql(deleteTagSql))
+          }(s"does not have [alter] privilege on [$namespace1/$table1]")
+          doAs(admin, sql(deleteTagSql))
+
+          // Rollback
+          doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (3, 'a'), (4, 'b')"))
+          doAs(admin, sql(s"INSERT INTO $catalogV2.$namespace1.$table1 VALUES (5, 'a'), (6, 'b')"))
+          val rollbackTagSql = s"Call sys.rollback(table =>" +
+            s"'$catalogV2.$namespace1.$table1', version => '2')"
+          interceptEndsWith[AccessControlException] {
+            doAs(table1OnlyUserForNs, sql(rollbackTagSql))
+          }(s"does not have [alter] privilege on [$namespace1/$table1]")
+          interceptEndsWith[AccessControlException] {
+            doAs(someone, sql(rollbackTagSql))
+          }(s"does not have [alter] privilege on [$namespace1/$table1]")
+          doAs(admin, sql(rollbackTagSql))
+
+        } finally {
+          doAs(admin, sql(s"use spark_catalog"))
+        }
       }
     }
-
   }
 
   def createTableSql(namespace: String, table: String): String =
diff --git a/pom.xml b/pom.xml
index 49d04844a51..7d3a9dba9e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -183,7 +183,7 @@
         <netty.version>4.1.108.Final</netty.version>
         <openai.java.version>0.12.0</openai.java.version>
         <retrofit.version>2.9.0</retrofit.version>
-        <paimon.version>0.9.0</paimon.version>
+        <paimon.version>0.8.2</paimon.version>
         <paimon.artifact>paimon-spark-${spark.binary.version}</paimon.artifact>
         <phoenix.version>6.0.0</phoenix.version>
         <postgresql.version>42.7.2</postgresql.version>