Update loadIndexFiles in the HudiQbeastSnapshot
JosepSampe committed Jan 13, 2025
1 parent 57155a9 commit 246ad03
Showing 1 changed file with 33 additions and 8 deletions.
hudi/src/main/scala/io/qbeast/spark/hudi/HudiQbeastSnapshot.scala
@@ -12,6 +12,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.TableSchemaResolver
+import org.apache.hudi.exception.HoodieIOException
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.AvroConversionUtils

@@ -114,6 +115,7 @@ case class HudiQbeastSnapshot(tableID: QTableID) extends QbeastSnapshot with Sta
   override def loadIndexFiles(revisionID: RevisionID): Dataset[IndexFile] = {
     val dimensionCount = loadRevision(revisionID).transformations.size
     val indexFilesBuffer = ListBuffer[IndexFile]()
+    val processedCommitTimes = scala.collection.mutable.Set[String]()

     val inputFiles = loadFileIndex().inputFiles

@@ -132,14 +134,26 @@
         timeline.filterCompletedInstants.getInstants.asScala
           .filter(instant => commitTimes.contains(instant.requestedTime()))
           .foreach { instant =>
-            val commitMetadataBytes = timeline.getInstantDetails(instant).get()
-            val metadata = TimelineMetadataUtils.deserializeCommitMetadata(commitMetadataBytes)
-            val commitMetadata =
-              HoodieCommitMetadata.fromJsonString(metadata.toString, classOf[HoodieCommitMetadata])
-            val indexFiles = HudiQbeastFileUtils
-              .fromCommitFile(dimensionCount)(commitMetadata)
-              .filter(file => file.revisionId == revisionID && fileNames.contains(file.path))
-            indexFilesBuffer ++= indexFiles
+            try {
+              val commitMetadataBytes = timeline.getInstantDetails(instant).get()
+              val metadata = TimelineMetadataUtils.deserializeCommitMetadata(commitMetadataBytes)
+              val commitMetadata =
+                HoodieCommitMetadata.fromJsonString(
+                  metadata.toString,
+                  classOf[HoodieCommitMetadata])
+              val indexFiles = HudiQbeastFileUtils
+                .fromCommitFile(dimensionCount)(commitMetadata)
+                .filter(file => file.revisionId == revisionID && fileNames.contains(file.path))
+              indexFilesBuffer ++= indexFiles
+              processedCommitTimes += instant.requestedTime()
+            } catch {
+              // This may occur if a commit from the active timeline is archived while
+              // this function is running. In such cases, it can safely be ignored, as
+              // the commit will be handled by the subsequent
+              // processTimeline(archivedTimeline) call below.
+              case _: HoodieIOException => // Do nothing
+            }
+
           }
         }

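The try/catch above guards against a race with Hudi's timeline archiver: an instant listed from the active timeline can be moved to the archived timeline before its details are read, at which point getInstantDetails fails with a HoodieIOException. Retrying against the active timeline would not help, since a commit only leaves it by being archived; it is the second processTimeline pass, over the archived timeline, that recovers it. Below is a minimal sketch of that two-pass pattern, assuming a Hudi 1.x HoodieTableMetaClient already built for the table; collectCommitDetails and its parameters are hypothetical names for illustration, not part of the Qbeast codebase, and only timeline calls that appear in the diff are used.

    import scala.collection.JavaConverters._
    import scala.collection.mutable

    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.hudi.common.table.timeline.HoodieTimeline
    import org.apache.hudi.exception.HoodieIOException

    // Reads the raw commit metadata bytes for the given commit times,
    // tolerating instants that are archived mid-read.
    def collectCommitDetails(
        metaClient: HoodieTableMetaClient,
        commitTimes: Seq[String]): Map[String, Array[Byte]] = {
      val collected = mutable.Map[String, Array[Byte]]()

      def sweep(timeline: HoodieTimeline): Unit =
        timeline.filterCompletedInstants.getInstants.asScala
          .filter(i => commitTimes.contains(i.requestedTime()))
          .filterNot(i => collected.contains(i.requestedTime()))
          .foreach { instant =>
            try {
              collected +=
                instant.requestedTime() -> timeline.getInstantDetails(instant).get()
            } catch {
              // Archived between listing and reading; the archived sweep covers it.
              case _: HoodieIOException => ()
            }
          }

      sweep(metaClient.getActiveTimeline) // first pass: active timeline
      if (commitTimes.nonEmpty) {
        sweep(metaClient.getArchivedTimeline(commitTimes.min)) // second pass
      }
      collected.toMap
    }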

@@ -149,6 +163,17 @@ case class HudiQbeastSnapshot(tableID: QTableID) extends QbeastSnapshot with Sta
     val archivedTimeline = metaClient.getArchivedTimeline(commitTimes.min)
     processTimeline(archivedTimeline)

+    // At this point, we verify that all Qbeast metadata is correctly loaded
+    // from all the commit files associated with the Parquet files in this snapshot.
+    // If there are unprocessed commit times, it indicates an inconsistency,
+    // and an exception is thrown to highlight the issue.
+    val unprocessedCommitTimes = commitTimes.diff(processedCommitTimes.toSeq)
+    if (unprocessedCommitTimes.nonEmpty) {
+      val errorMessage =
+        s"Some commit times were not processed: ${unprocessedCommitTimes.mkString(", ")}"
+      throw new IllegalStateException(errorMessage)
+    }
+
     spark.createDataset(indexFilesBuffer.toList)
   }
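
The final check relies on Seq.diff, which performs a multiset difference: each element of processedCommitTimes.toSeq cancels at most one matching occurrence in commitTimes, so any commit time that was never marked as processed survives into unprocessedCommitTimes. A toy illustration with made-up commit timestamps:

    // Hypothetical commit times, for illustration only.
    val commitTimes = Seq("20250110101010000", "20250111111111000")
    val processedCommitTimes = scala.collection.mutable.Set("20250110101010000")

    // Seq.diff removes one occurrence per element of its argument;
    // the unprocessed commit time remains.
    val unprocessedCommitTimes = commitTimes.diff(processedCommitTimes.toSeq)
    assert(unprocessedCommitTimes == Seq("20250111111111000"))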
