[SUPPORT] Hudi Quickstart on EMR 6.15 with Hudi 1.0.1 not working #12963

Open
alberttwong opened this issue Mar 12, 2025 · 8 comments

@alberttwong (Contributor)

Describe the problem you faced

Following the quickstart at https://hudi.apache.org/docs/quick-start-guide/ on EMR 6.15, the sample write fails with "Failed to instantiate Metadata table".

To Reproduce

Steps to reproduce the behavior:

[hadoop@ip-10-0-125-28 ~]$ export SPARK_VERSION=3.4
[hadoop@ip-10-0-125-28 ~]$ rm hudi-spark3.4-bundle_2.12-1.0.1.jar
[hadoop@ip-10-0-125-28 ~]$ ls
[hadoop@ip-10-0-125-28 ~]$ export PYSPARK_PYTHON=$(which python3)
[hadoop@ip-10-0-125-28 ~]$ pyspark --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:1.0.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
Python 3.7.16 (default, Feb  8 2025, 00:19:05)
[GCC 7.3.1 20180712 (Red Hat 7.3.1-17)] on linux
Type "help", "copyright", "credits" or "license" for more information.
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/hadoop/.ivy2/cache
The jars for the packages stored in: /home/hadoop/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-daca4bcd-853b-4f9b-8233-340110eac111;1.0
	confs: [default]
	found org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.1 in central
	found org.apache.hive#hive-storage-api;2.8.1 in central
	found org.slf4j#slf4j-api;1.7.36 in central
downloading https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.4-bundle_2.12/1.0.1/hudi-spark3.4-bundle_2.12-1.0.1.jar ...
	[SUCCESSFUL ] org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.1!hudi-spark3.4-bundle_2.12.jar (1294ms)
downloading https://repo1.maven.org/maven2/org/apache/hive/hive-storage-api/2.8.1/hive-storage-api-2.8.1.jar ...
	[SUCCESSFUL ] org.apache.hive#hive-storage-api;2.8.1!hive-storage-api.jar (29ms)
downloading https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar ...
	[SUCCESSFUL ] org.slf4j#slf4j-api;1.7.36!slf4j-api.jar (21ms)
:: resolution report :: resolve 1080ms :: artifacts dl 1347ms
	:: modules in use:
	org.apache.hive#hive-storage-api;2.8.1 from central in [default]
	org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.1 from central in [default]
	org.slf4j#slf4j-api;1.7.36 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   3   |   3   |   0   ||   3   |   3   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-daca4bcd-853b-4f9b-8233-340110eac111
	confs: [default]
	3 artifacts copied, 0 already retrieved (108331kB/65ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/12 18:34:17 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
25/03/12 18:34:19 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
25/03/12 18:34:26 WARN Client: Same path resource file:///home/hadoop/.ivy2/jars/org.apache.hudi_hudi-spark3.4-bundle_2.12-1.0.1.jar added multiple times to distributed cache.
25/03/12 18:34:26 WARN Client: Same path resource file:///home/hadoop/.ivy2/jars/org.apache.hive_hive-storage-api-2.8.1.jar added multiple times to distributed cache.
25/03/12 18:34:26 WARN Client: Same path resource file:///home/hadoop/.ivy2/jars/org.slf4j_slf4j-api-1.7.36.jar added multiple times to distributed cache.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.4.1-amzn-2
      /_/

Using Python version 3.7.16 (default, Feb  8 2025 00:19:05)
Spark context Web UI available at http://ip-10-0-125-28.us-west-2.compute.internal:4040
Spark context available as 'sc' (master = yarn, app id = application_1741804316774_0001).
SparkSession available as 'spark'.
>>> from pyspark.sql.functions import lit, col
>>>
>>> tableName = "trips_table"
>>> basePath = "file:///tmp/trips_table"
>>> columns = ["ts","uuid","rider","driver","fare","city"]
>>> data =[(1695159649087,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
...        (1695091554788,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
...        (1695046462179,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
...        (1695516137016,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
...        (1695115999911,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai")]
>>> inserts = spark.createDataFrame(data).toDF(*columns)
>>>
>>> hudi_options = {
...     'hoodie.table.name': tableName,
...     'hoodie.datasource.write.partitionpath.field': 'city'
... }
>>>
>>> inserts.write.format("hudi"). \
...     options(**hudi_options). \
...     mode("overwrite"). \
...     save(basePath)
25/03/12 18:35:00 WARN HoodieSparkSqlWriterInternal: Choosing BULK_INSERT as the operation type since auto record key generation is applicable
25/03/12 18:35:00 INFO HiveConf: Found configuration file file:/etc/spark/conf.dist/hive-site.xml
25/03/12 18:35:00 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
25/03/12 18:35:00 INFO metastore: Trying to connect to metastore with URI thrift://ip-10-0-125-28.us-west-2.compute.internal:9083
25/03/12 18:35:00 INFO metastore: Opened a connection to metastore, current connections: 1
25/03/12 18:35:00 INFO metastore: Connected to metastore.
25/03/12 18:35:04 WARN HoodieTableFileSystemView: Partition: files is not available in store
25/03/12 18:35:04 WARN HoodieTableFileSystemView: Partition: files is not available in store
Traceback (most recent call last):
  File "<stdin>", line 4, in <module>
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 1398, in save
    self._jwrite.save(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1323, in __call__
  File "/usr/lib/spark/python/pyspark/errors/exceptions/captured.py", line 169, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o124.save.
: org.apache.hudi.exception.HoodieException: Failed to instantiate Metadata table
	at org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:309)
	at org.apache.hudi.client.SparkRDDWriteClient.initMetadataTable(SparkRDDWriteClient.java:271)
	at org.apache.hudi.client.BaseHoodieWriteClient.lambda$doInitTable$7(BaseHoodieWriteClient.java:1305)
	at org.apache.hudi.client.BaseHoodieWriteClient.executeUsingTxnManager(BaseHoodieWriteClient.java:1312)
	at org.apache.hudi.client.BaseHoodieWriteClient.doInitTable(BaseHoodieWriteClient.java:1302)
	at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1352)
	at org.apache.hudi.commit.BaseDatasetBulkInsertCommitActionExecutor.execute(BaseDatasetBulkInsertCommitActionExecutor.java:100)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.bulkInsertAsRow(HoodieSparkSqlWriter.scala:832)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:494)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.$anonfun$write$1(HoodieSparkSqlWriter.scala:192)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:123)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
	at org.apache.spark.sql.adapter.BaseSpark3Adapter.sqlExecutionWithNewExecutionId(BaseSpark3Adapter.scala:105)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:214)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:129)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:170)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:104)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:123)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:554)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:107)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:554)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:530)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:97)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: FileGroup count for MDT partition files should be > 0
	at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:42)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.prepRecords(HoodieBackedTableMetadataWriter.java:1442)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1349)
	at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.bulkCommit(SparkHoodieBackedTableMetadataWriter.java:149)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeFromFilesystem(HoodieBackedTableMetadataWriter.java:489)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeIfNeeded(HoodieBackedTableMetadataWriter.java:280)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.<init>(HoodieBackedTableMetadataWriter.java:189)
	at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.<init>(SparkHoodieBackedTableMetadataWriter.java:114)
	at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.create(SparkHoodieBackedTableMetadataWriter.java:91)
	at org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:303)
	... 71 more

Expected behavior

The quickstart write should succeed and create the Hudi table at file:///tmp/trips_table, as described in the documentation.
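
For reference, after a successful write the table should be readable back with the quickstart's read snippet (a minimal sketch reusing basePath from the script above):

# Read the table back after a successful write (quickstart-style check)
trips_df = spark.read.format("hudi").load(basePath)
trips_df.select("uuid", "rider", "driver", "fare", "city").show()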

Environment Description

  • Hudi version : 1.0.1

  • Spark version : 3.4

  • Hive version :

  • Hadoop version :

  • Storage (HDFS/S3/GCS..) :

  • Running on Docker? (yes/no) : no

@yihua (Contributor) commented Mar 12, 2025

@alberttwong Have you confirmed that /tmp/trips_table does not exist before running the script?
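
For instance, a quick way to confirm and clear any leftover local path before rerunning (hypothetical shell commands; adjust the path if your basePath differs):

ls -ld /tmp/trips_table 2>/dev/null || echo "path does not exist"   # check for leftovers from a previous run
rm -rf /tmp/trips_table                                             # remove them before retrying the write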

@yihua (Contributor) commented Mar 12, 2025

I verified that with OSS Spark 3.4.1 the script successfully creates a fresh Hudi table.

@alberttwong (Contributor, Author)

[ec2-user@ip-10-0-10-186 ~]$ ssh [email protected]
The authenticity of host 'ip-10-0-111-168.us-west-2.compute.internal (10.0.111.168)' can't be established.
ED25519 key fingerprint is SHA256:KSKj/vzbIeI5Je8YdxSLKfdSWnuxRuSvXfyh0G/Q6hc.
This key is not known by any other names
Are you sure you want to continue connecting (yes/no/[fingerprint])? yes
Warning: Permanently added 'ip-10-0-111-168.us-west-2.compute.internal' (ED25519) to the list of known hosts.
[hadoop@ip-10-0-111-168 ~]$ ls /tmp
d33409ee-a9f6-455a-9d8c-ce9190ccab8c_resources
hadoop14465626265819506123.tmp
hadoop2990841558949191269.tmp
hadoop-hdfs-namenode.pid
hadoop-unjar416709301599708946
hadoop-unjar775367731795166908
hadoop-yarn-yarn
hive
hsperfdata_hadoop
hsperfdata_hdfs
hsperfdata_hive
hsperfdata_kms
hsperfdata_livy
hsperfdata_mapred
hsperfdata_root
hsperfdata_spark
hsperfdata_tomcat
hsperfdata_trino
hsperfdata_yarn
hsperfdata_zookeeper
jetty-0_0_0_0-10002-hive-service-3_1_3-amzn-8_jar-_-any-4478184200613831529
jetty-10_0_111_168-8088-hadoop-yarn-common-3_3_6-amzn-1_jar-_-any-9187720009424936355
jetty-ip-10-0-111-168_us-west-2_compute_internal-19888-hadoop-yarn-common-3_3_6-amzn-1_jar-_-any-4187551277844480937
jetty-ip-10-0-111-168_us-west-2_compute_internal-20888-hadoop-yarn-common-3_3_6-amzn-1_jar-_-any-6642537296133348853
jetty-ip-10-0-111-168_us-west-2_compute_internal-8188-hadoop-yarn-common-3_3_6-amzn-1_jar-_-any-542379550446565138
jetty-ip-10-0-111-168_us-west-2_compute_internal-9870-hdfs-_-any-3613376253096943183
libleveldbjni-64-1-4334357513568310058.8
motd.FDkzr
motd.partpBPTq
puppet_bigtop_file_merge
puppet_inter_thread_comm6040914042171461435
systemd-private-f33283165519454fb0af11097df7cb3b-chronyd.service-VnbyxS
systemd-private-f33283165519454fb0af11097df7cb3b-httpd.service-N2bZxI
systemd-private-f33283165519454fb0af11097df7cb3b-mariadb.service-7bYeRo
systemd-private-f33283165519454fb0af11097df7cb3b-nginx.service-5pcRUa
zstd5629203585542328589.tmp
zstd8507920086534566345.tmp
[hadoop@ip-10-0-111-168 ~]$ export PYSPARK_PYTHON=$(which python3)
[hadoop@ip-10-0-111-168 ~]$ export SPARK_VERSION=3.4
[hadoop@ip-10-0-111-168 ~]$ pyspark --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:1.0.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
Python 3.7.16 (default, Feb  8 2025, 00:19:05)
[GCC 7.3.1 20180712 (Red Hat 7.3.1-17)] on linux
Type "help", "copyright", "credits" or "license" for more information.
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/hadoop/.ivy2/cache
The jars for the packages stored in: /home/hadoop/.ivy2/jars
org.apache.hudi#hudi-spark3.4-bundle_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-49fa9c8e-fa5b-42d6-a6ce-158854b6158f;1.0
	confs: [default]
	found org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.1 in central
	found org.apache.hive#hive-storage-api;2.8.1 in central
	found org.slf4j#slf4j-api;1.7.36 in central
downloading https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.4-bundle_2.12/1.0.1/hudi-spark3.4-bundle_2.12-1.0.1.jar ...
	[SUCCESSFUL ] org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.1!hudi-spark3.4-bundle_2.12.jar (1106ms)
downloading https://repo1.maven.org/maven2/org/apache/hive/hive-storage-api/2.8.1/hive-storage-api-2.8.1.jar ...
	[SUCCESSFUL ] org.apache.hive#hive-storage-api;2.8.1!hive-storage-api.jar (25ms)
downloading https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar ...
	[SUCCESSFUL ] org.slf4j#slf4j-api;1.7.36!slf4j-api.jar (31ms)
:: resolution report :: resolve 1145ms :: artifacts dl 1167ms
	:: modules in use:
	org.apache.hive#hive-storage-api;2.8.1 from central in [default]
	org.apache.hudi#hudi-spark3.4-bundle_2.12;1.0.1 from central in [default]
	org.slf4j#slf4j-api;1.7.36 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   3   |   3   |   0   ||   3   |   3   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-49fa9c8e-fa5b-42d6-a6ce-158854b6158f
	confs: [default]
	3 artifacts copied, 0 already retrieved (108331kB/66ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/12 21:31:31 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
25/03/12 21:31:33 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
25/03/12 21:31:39 WARN Client: Same path resource file:///home/hadoop/.ivy2/jars/org.apache.hudi_hudi-spark3.4-bundle_2.12-1.0.1.jar added multiple times to distributed cache.
25/03/12 21:31:39 WARN Client: Same path resource file:///home/hadoop/.ivy2/jars/org.apache.hive_hive-storage-api-2.8.1.jar added multiple times to distributed cache.
25/03/12 21:31:39 WARN Client: Same path resource file:///home/hadoop/.ivy2/jars/org.slf4j_slf4j-api-1.7.36.jar added multiple times to distributed cache.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.4.1-amzn-2
      /_/

Using Python version 3.7.16 (default, Feb  8 2025 00:19:05)
Spark context Web UI available at http://ip-10-0-111-168.us-west-2.compute.internal:4040
Spark context available as 'sc' (master = yarn, app id = application_1741814916834_0001).
SparkSession available as 'spark'.
>>> from pyspark.sql.functions import lit, col
>>>
>>> tableName = "trips_table"
>>> basePath = "file:///tmp/trips_table"
>>> columns = ["ts","uuid","rider","driver","fare","city"]
>>> data =[(1695159649087,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
...        (1695091554788,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
...        (1695046462179,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
...        (1695516137016,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
...        (1695115999911,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai")]
>>> inserts = spark.createDataFrame(data).toDF(*columns)

>>>
>>> hudi_options = {
...     'hoodie.table.name': tableName,
...     'hoodie.datasource.write.partitionpath.field': 'city'
... }
>>>
>>> inserts.write.format("hudi"). \
...     options(**hudi_options). \
...     mode("overwrite"). \
...     save(basePath)
25/03/12 21:32:09 WARN HoodieSparkSqlWriterInternal: Choosing BULK_INSERT as the operation type since auto record key generation is applicable
25/03/12 21:32:09 INFO HiveConf: Found configuration file file:/etc/spark/conf.dist/hive-site.xml
25/03/12 21:32:09 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
25/03/12 21:32:09 INFO metastore: Trying to connect to metastore with URI thrift://ip-10-0-111-168.us-west-2.compute.internal:9083
25/03/12 21:32:09 INFO metastore: Opened a connection to metastore, current connections: 1
25/03/12 21:32:09 INFO metastore: Connected to metastore.
25/03/12 21:32:13 WARN HoodieTableFileSystemView: Partition: files is not available in store
25/03/12 21:32:13 WARN HoodieTableFileSystemView: Partition: files is not available in store
Traceback (most recent call last):
  File "<stdin>", line 4, in <module>
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 1398, in save
    self._jwrite.save(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1323, in __call__
  File "/usr/lib/spark/python/pyspark/errors/exceptions/captured.py", line 169, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o124.save.
: org.apache.hudi.exception.HoodieException: Failed to instantiate Metadata table
	at org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:309)
	at org.apache.hudi.client.SparkRDDWriteClient.initMetadataTable(SparkRDDWriteClient.java:271)
	at org.apache.hudi.client.BaseHoodieWriteClient.lambda$doInitTable$7(BaseHoodieWriteClient.java:1305)
	at org.apache.hudi.client.BaseHoodieWriteClient.executeUsingTxnManager(BaseHoodieWriteClient.java:1312)
	at org.apache.hudi.client.BaseHoodieWriteClient.doInitTable(BaseHoodieWriteClient.java:1302)
	at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1352)
	at org.apache.hudi.commit.BaseDatasetBulkInsertCommitActionExecutor.execute(BaseDatasetBulkInsertCommitActionExecutor.java:100)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.bulkInsertAsRow(HoodieSparkSqlWriter.scala:832)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:494)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.$anonfun$write$1(HoodieSparkSqlWriter.scala:192)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:123)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
	at org.apache.spark.sql.adapter.BaseSpark3Adapter.sqlExecutionWithNewExecutionId(BaseSpark3Adapter.scala:105)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:214)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:129)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:170)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:104)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:123)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:554)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:107)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:554)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:530)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:97)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:84)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: FileGroup count for MDT partition files should be > 0
	at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:42)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.prepRecords(HoodieBackedTableMetadataWriter.java:1442)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1349)
	at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.bulkCommit(SparkHoodieBackedTableMetadataWriter.java:149)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeFromFilesystem(HoodieBackedTableMetadataWriter.java:489)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeIfNeeded(HoodieBackedTableMetadataWriter.java:280)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.<init>(HoodieBackedTableMetadataWriter.java:189)
	at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.<init>(SparkHoodieBackedTableMetadataWriter.java:114)
	at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.create(SparkHoodieBackedTableMetadataWriter.java:91)
	at org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:303)
	... 71 more

@yihua (Contributor) commented Mar 13, 2025

This is likely related to the EMR environment. I will check again.

@yihua (Contributor) commented Mar 13, 2025

cc @CTTY

@alberttwong (Contributor, Author)

This also happens with EMR 7.6: #12974

@rangareddy (Contributor)

Hi @alberttwong

When you run on EMR, the Spark application runs in YARN mode by default. In YARN mode, the table base path must point to a distributed file system, such as HDFS or S3, so that it is visible to all executors.

To resolve this issue, specify the base path as an S3 or HDFS path, or launch the Spark shell in local mode by passing --master "local[*]", as sketched below.
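
A minimal sketch of both workarounds (the S3 bucket name and the master URL are illustrative; everything else matches the quickstart commands above):

# Option 1: stay in YARN mode but put the table on a distributed file system
basePath = "s3://your-bucket/tmp/trips_table"   # placeholder bucket; "hdfs:///tmp/trips_table" also works
inserts.write.format("hudi"). \
    options(**hudi_options). \
    mode("overwrite"). \
    save(basePath)

# Option 2: launch the shell in local mode so file:///tmp/... is usable
# pyspark --master "local[*]" --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:1.0.1 \
#   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
#   --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
#   --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
#   --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'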

@alberttwong (Contributor, Author)

I can try it, but if that is the fix, we need to update the quickstart.
