Commit

Merge pull request #208 from osopardo1/v0.4.x

Revert #200 and new version

osopardo1 authored Aug 3, 2023
2 parents c1a5d65 + 96da313 commit 97c6ab7
Showing 25 changed files with 353 additions and 127 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -1,7 +1,7 @@
import Dependencies._
import xerial.sbt.Sonatype._

val mainVersion = "0.3.3"
val mainVersion = "0.4.0"

lazy val qbeastCore = (project in file("core"))
.settings(
16 changes: 16 additions & 0 deletions core/src/main/scala/io/qbeast/core/model/MetadataManager.scala
@@ -58,6 +58,22 @@ trait MetadataManager[DataSchema, FileDescriptor] {
*/
def updateTable(tableID: QTableID, tableChanges: TableChanges): Unit

+  /**
+   * This function checks if there's a conflict. A conflict happens if there
+   * are new cubes that have been optimized but were not announced.
+   *
+   * @param tableID the table ID
+   * @param revisionID the revision ID
+   * @param knownAnnounced the cubes known to be announced when the write operation started
+   * @param oldReplicatedSet the old replicated set
+   * @return true if there is a conflict, false otherwise
+   */
+  def hasConflicts(
+      tableID: QTableID,
+      revisionID: RevisionID,
+      knownAnnounced: Set[CubeId],
+      oldReplicatedSet: ReplicatedSet): Boolean
+
/**
* Checks if there's an existing log directory for the table
* @param tableID the table ID
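For context, a minimal sketch of how a Delta-backed implementation might answer hasConflicts, comparing the replicated set visible at commit time against what the writer knew when it started. The loadSnapshot accessor is an assumption, standing in for however the concrete MetadataManager reads the latest table state:

    // Sketch only. `loadSnapshot` is a hypothetical accessor; the rest uses
    // the model types from io.qbeast.core.model as declared above.
    def hasConflicts(
        tableID: QTableID,
        revisionID: RevisionID,
        knownAnnounced: Set[CubeId],
        oldReplicatedSet: ReplicatedSet): Boolean = {
      val currentReplicatedSet =
        loadSnapshot(tableID).loadIndexStatus(revisionID).replicatedSet
      // Cubes replicated since the write operation started...
      val newlyReplicated = currentReplicatedSet -- oldReplicatedSet
      // ...that the writer never saw announced constitute a conflict.
      (newlyReplicated -- knownAnnounced).nonEmpty
    }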
26 changes: 13 additions & 13 deletions core/src/main/scala/io/qbeast/core/model/QbeastBlock.scala
@@ -2,24 +2,24 @@ package io.qbeast.core.model

/**
* Container class for Qbeast file's metadata
 *
- * @param path the file path
- * @param cube the cube identifier
- * @param revision the revision identifier
- * @param minWeight the minimum weight of element
- * @param maxWeight the maximum weight of element
- * @param replicated the file is replicated
- * @param elementCount the number of elements
- * @param size the size in bytes
- * @param modificationTime the modification timestamp
+ * @param path
+ * @param cube
+ * @param revision
+ * @param minWeight
+ * @param maxWeight
+ * @param state
+ * @param elementCount
+ * @param size
+ * @param modificationTime
*/

case class QbeastBlock(
    path: String,
    cube: String,
    revision: Long,
    minWeight: Weight,
    maxWeight: Weight,
-    replicated: Boolean,
+    state: String,
    elementCount: Long,
    size: Long,
    modificationTime: Long)
@@ -30,7 +30,7 @@ case class QbeastBlock(
object QbeastBlock {

private val metadataKeys =
Set("minWeight", "maxWeight", "replicated", "revision", "elementCount", "cube")
Set("minWeight", "maxWeight", "state", "revision", "elementCount", "cube")

private def checkBlockMetadata(blockMetadata: Map[String, String]): Unit = {
metadataKeys.foreach(key =>
@@ -60,7 +60,7 @@ object QbeastBlock {
blockMetadata("revision").toLong,
Weight(blockMetadata("minWeight").toInt),
Weight(blockMetadata("maxWeight").toInt),
blockMetadata("replicated").toBoolean,
blockMetadata("state"),
blockMetadata("elementCount").toLong,
size,
modificationTime)
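A usage sketch of the tag-based factory shown above, with the overload exercised by QbeastBlockTest below; the metadata keys are the ones in metadataKeys, and the concrete values are illustrative:

    // Illustrative metadata map; keys must match QbeastBlock.metadataKeys.
    val blockMetadata = Map(
      "cube" -> "w4", // example cube identifier
      "minWeight" -> "0",
      "maxWeight" -> "2147483647",
      "state" -> "FLOODED", // replaces the old boolean "replicated" tag
      "revision" -> "1",
      "elementCount" -> "1000")

    // Overload used by the tests: (path, blockMetadata, size, modificationTime).
    val block = QbeastBlock("table/file.parquet", blockMetadata, 2048L, 0L)
    assert(block.state == "FLOODED" && block.elementCount == 1000L)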
(changed file, path not shown in this view)
@@ -1,11 +1,6 @@
package io.qbeast.core.model

-import io.qbeast.core.transform.{
-  HashTransformation,
-  LinearTransformation,
-  Transformation,
-  Transformer
-}
+import io.qbeast.core.transform.{HashTransformation, LinearTransformation, Transformation, Transformer}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

(changed file, path not shown in this view)
@@ -9,15 +9,15 @@ class QbeastBlockTest extends AnyFlatSpec with Matchers {
"minWeight" -> "19217",
"cube" -> "",
"maxWeight" -> "11111111",
"replicated" -> "true",
"state" -> "FlOODED",
"revision" -> "1",
"elementCount" -> "777")

val qbeastBlock = QbeastBlock("path", blockMetadata, 0L, 0L)
qbeastBlock.cube shouldBe ""
qbeastBlock.minWeight shouldBe Weight(19217)
qbeastBlock.maxWeight shouldBe Weight(11111111)
-    qbeastBlock.replicated shouldBe true
+    qbeastBlock.state shouldBe "FlOODED"
qbeastBlock.revision shouldBe 1
qbeastBlock.elementCount shouldBe 777
}
@@ -32,7 +32,7 @@ class QbeastBlockTest extends AnyFlatSpec with Matchers {
"minWeight" -> "19217",
"cube" -> "",
"maxWeight" -> "11111111",
"replicated" -> "false",
"state" -> "FlOODED",
"revision" -> "bad_type",
"elementCount" -> "777")

4 changes: 2 additions & 2 deletions src/main/scala/io/qbeast/spark/delta/CubeDataLoader.scala
@@ -4,7 +4,7 @@
package io.qbeast.spark.delta

import io.qbeast.core.model.{CubeId, QTableID, Revision}
-import io.qbeast.spark.utils.TagColumns
+import io.qbeast.spark.utils.{State, TagColumns}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -64,7 +64,7 @@ case class CubeDataLoader(tableID: QTableID) {
.where(
TagColumns.revision === lit(revision.revisionID.toString) &&
TagColumns.cube === lit(cube.string) &&
-        TagColumns.replicated === lit(false.toString()))
+        TagColumns.state =!= lit(State.ANNOUNCED))
.collect()

val fileNames = cubeBlocks.map(f => new Path(tableID.id, f.path).toString)
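Since the filter above moves from a boolean replicated tag to a string state tag, it is worth recalling that Spark Column inequality is written =!= (Scala's != compares the Column objects themselves and returns a plain Boolean). A self-contained sketch of the same kind of filter over illustrative data:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, lit}

    val spark = SparkSession.builder().master("local[*]").appName("state-filter").getOrCreate()
    import spark.implicits._

    // Toy stand-in for the block tags; paths and states are illustrative.
    val blocks = Seq(("a.parquet", "FLOODED"), ("b.parquet", "ANNOUNCED"))
      .toDF("path", "state")

    // Keep every block that has not been announced.
    blocks.where(col("state") =!= lit("ANNOUNCED")).show()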
31 changes: 30 additions & 1 deletion src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
@@ -44,6 +44,24 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
}
}

+  /**
+   * Constructs the replicated set for each revision
+   *
+   * @return a map of revision identifier and replicated set
+   */
+  private val replicatedSetsMap: Map[RevisionID, ReplicatedSet] = {
+    val listReplicatedSets = metadataMap.filterKeys(_.startsWith(MetadataConfig.replicatedSet))
+
+    listReplicatedSets.map { case (key: String, json: String) =>
+      val revisionID = key.split('.').last.toLong
+      val revision = getRevision(revisionID)
+      val replicatedSet = mapper
+        .readValue[Set[String]](json, classOf[Set[String]])
+        .map(revision.createCubeId)
+      (revisionID, replicatedSet)
+    }
+  }
+
/**
* Returns last available revision identifier
*
@@ -65,6 +83,16 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
throw AnalysisExceptionFactory.create(s"No space revision available with $revisionID"))
}

+  /**
+   * Returns the replicated set for a revision identifier if it exists
+   * @param revisionID the revision identifier
+   * @return the replicated set
+   */
+  private def getReplicatedSet(revisionID: RevisionID): ReplicatedSet = {
+    replicatedSetsMap
+      .getOrElse(revisionID, Set.empty)
+  }
+
/**
* Returns true if a revision with a specific revision identifier exists
*
@@ -92,7 +120,8 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
*/
override def loadIndexStatus(revisionID: RevisionID): IndexStatus = {
val revision = getRevision(revisionID)
-    new IndexStatusBuilder(this, revision).build()
+    val replicatedSet = getReplicatedSet(revisionID)
+    new IndexStatusBuilder(this, revision, replicatedSet).build()
}

/**
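The replicated sets are persisted as JSON arrays of cube-id strings under one configuration key per revision. A round-trip sketch using the same mapper calls as replicatedSetsMap above; the cube-id strings are illustrative:

    import io.qbeast.core.model.mapper

    val replicated: Set[String] = Set("w4", "w8") // example cube-id strings
    val json: String = mapper.writeValueAsString(replicated) // e.g. ["w4","w8"]

    // Reading it back, as replicatedSetsMap does before mapping to CubeId:
    val decoded = mapper.readValue[Set[String]](json, classOf[Set[String]])
    assert(decoded == replicated)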
51 changes: 13 additions & 38 deletions src/main/scala/io/qbeast/spark/delta/IndexStatusBuilder.scala
@@ -5,15 +5,11 @@ package io.qbeast.spark.delta

import io.qbeast.core.model._
import io.qbeast.spark.delta.QbeastMetadataSQL._
+import io.qbeast.spark.utils.State.FLOODED
import io.qbeast.spark.utils.TagColumns
-import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.actions.AddFile
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.functions.collect_list
-import org.apache.spark.sql.functions.lit
-import org.apache.spark.sql.functions.min
-import org.apache.spark.sql.functions.sum
+import org.apache.spark.sql.functions.{col, collect_list, lit, min, sum}
+import org.apache.spark.sql.{Dataset, SparkSession}

import scala.collection.immutable.SortedMap

@@ -24,7 +20,11 @@
* @param announcedSet the announced set available for the revision
* @param replicatedSet the replicated set available for the revision
*/
-private[delta] class IndexStatusBuilder(qbeastSnapshot: DeltaQbeastSnapshot, revision: Revision)
+private[delta] class IndexStatusBuilder(
+    qbeastSnapshot: DeltaQbeastSnapshot,
+    revision: Revision,
+    replicatedSet: ReplicatedSet,
+    announcedSet: Set[CubeId] = Set.empty)
extends Serializable
with StagingUtils {

@@ -37,19 +37,15 @@
qbeastSnapshot.loadRevisionBlocks(revision.revisionID)

def build(): IndexStatus = {
-    val cubeStatuses =
+    val cubeStatus =
if (isStaging(revision)) stagingCubeStatuses
else buildCubesStatuses

-    val (replicatedSet, announcedSet): (Set[CubeId], Set[CubeId]) =
-      if (isStaging(revision)) (Set.empty, Set.empty)
-      else buildReplicatedAndAnnouncedSets(cubeStatuses)
-
IndexStatus(
revision = revision,
replicatedSet = replicatedSet,
announcedSet = announcedSet,
-      cubesStatuses = cubeStatuses)
+      cubesStatuses = cubeStatus)
}

def stagingCubeStatuses: SortedMap[CubeId, CubeStatus] = {
@@ -64,7 +60,7 @@
revision.revisionID,
Weight.MinValue,
maxWeight,
-        false,
+        FLOODED,
0,
addFile.size,
addFile.modificationTime))
@@ -95,34 +91,13 @@
.select(
createCube(col("cube"), lit(ndims)).as("cubeId"),
col("maxWeight"),
normalizeWeight(col("maxWeight"), col("elementCount"), lit(rev.desiredCubeSize))
.as("normalizedWeight"),
normalizeWeight(col("maxWeight"), col("elementCount"), lit(rev.desiredCubeSize)).as(
"normalizedWeight"),
col("files"))
.as[CubeStatus]
.collect()
.foreach(row => builder += row.cubeId -> row)
builder.result()
}

-  def buildReplicatedAndAnnouncedSets(
-      cubeStatuses: Map[CubeId, CubeStatus]): (Set[CubeId], Set[CubeId]) = {
-    val replicatedSet = Set.newBuilder[CubeId]
-    val announcedSet = Set.newBuilder[CubeId]
-    cubeStatuses.foreach { case (id, status) =>
-      var hasReplicated = false
-      var hasUnreplicated = false
-      status.files.foreach(file =>
-        if (file.replicated) hasReplicated = true
-        else hasUnreplicated = true)
-      if (hasReplicated) {
-        if (hasUnreplicated) {
-          announcedSet += id
-        } else {
-          replicatedSet += id
-        }
-      }
-    }
-    (replicatedSet.result(), announcedSet.result())
-  }

}
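The net effect of this file's changes: the builder no longer derives the replicated and announced sets by scanning per-file flags (the deleted buildReplicatedAndAnnouncedSets), but receives them from the caller, which sources them from the Delta metadata configuration. A construction sketch mirroring DeltaQbeastSnapshot.loadIndexStatus above, assuming a snapshot, revision and replicated set are at hand:

    // announcedSet defaults to empty, as in the new constructor signature.
    val status: IndexStatus =
      new IndexStatusBuilder(qbeastSnapshot, revision, replicatedSet).build()

    // The sets are passed through unchanged; only the cube statuses are
    // computed from the revision's blocks.
    assert(status.replicatedSet == replicatedSet)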
37 changes: 35 additions & 2 deletions src/main/scala/io/qbeast/spark/delta/QbeastMetadataOperation.scala
@@ -3,7 +3,7 @@
*/
package io.qbeast.spark.delta

-import io.qbeast.core.model.{Revision, StagingUtils, TableChanges, mapper}
+import io.qbeast.core.model.{ReplicatedSet, Revision, StagingUtils, TableChanges, mapper}
import io.qbeast.spark.utils.MetadataConfig
import io.qbeast.spark.utils.MetadataConfig.{lastRevisionID, revision}
import org.apache.spark.sql.SparkSession
@@ -36,10 +36,38 @@
}
}

+  /**
+   * Updates the Delta metadata configuration with the new replicated set
+   * for the given revision
+   * @param baseConfiguration the Delta metadata configuration
+   * @param revision the revision to persist
+   * @param deltaReplicatedSet the new set of replicated cubes
+   */
+  private def updateQbeastReplicatedSet(
+      baseConfiguration: Configuration,
+      revision: Revision,
+      deltaReplicatedSet: ReplicatedSet): Configuration = {
+
+    val revisionID = revision.revisionID
+    assert(baseConfiguration.contains(s"${MetadataConfig.revision}.$revisionID"))
+
+    // Save the replicated set of cube ids in their string representation
+    val newReplicatedSet = deltaReplicatedSet.map(_.string)
+
+    baseConfiguration.updated(
+      s"${MetadataConfig.replicatedSet}.$revisionID",
+      mapper.writeValueAsString(newReplicatedSet))
+
+  }
+
private def overwriteQbeastConfiguration(baseConfiguration: Configuration): Configuration = {
val revisionKeys = baseConfiguration.keys.filter(_.startsWith(MetadataConfig.revision))
+    val replicatedSetKeys = {
+      baseConfiguration.keys.filter(_.startsWith(MetadataConfig.replicatedSet))
+    }
val other = baseConfiguration.keys.filter(_ == MetadataConfig.lastRevisionID)
-    val qbeastKeys = revisionKeys ++ other
+    val qbeastKeys = revisionKeys ++ replicatedSetKeys ++ other
baseConfiguration -- qbeastKeys
}

@@ -122,6 +150,11 @@
val configuration =
if (isNewRevision || isOverwriteMode) {
updateQbeastRevision(baseConfiguration, latestRevision)
+      } else if (isOptimizeOperation) {
+        updateQbeastReplicatedSet(
+          baseConfiguration,
+          latestRevision,
+          tableChanges.announcedOrReplicatedSet)
} else baseConfiguration

if (txn.readVersion == -1) {
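A sketch of the configuration entry that updateQbeastReplicatedSet produces for revision 1, following the key format and serialization in the code above; the cube-id strings are illustrative, and the key prefix is assumed to be "qbeast.replicatedSet" as defined in MetadataConfig:

    import io.qbeast.core.model.mapper
    import io.qbeast.spark.utils.MetadataConfig

    val revisionID = 1L
    val key = s"${MetadataConfig.replicatedSet}.$revisionID" // one entry per revision
    val value = mapper.writeValueAsString(Set("w4", "w8")) // example cube ids

    // overwriteQbeastConfiguration later removes such keys by prefix match.
    val configuration = Map.empty[String, String].updated(key, value)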
5 changes: 3 additions & 2 deletions src/main/scala/io/qbeast/spark/delta/QbeastMetadataSQL.scala
@@ -30,8 +30,9 @@ object QbeastMetadataSQL {
col("size"),
col("modificationTime"),
weight(TagColumns.minWeight).as("minWeight"),
weight(TagColumns.maxWeight).as("maxWeight"),
TagColumns.replicated.cast("boolean").as("replicated"),
weight(TagColumns.maxWeight)
.as("maxWeight"),
TagColumns.state,
TagColumns.revision.cast("bigint").as("revision"),
TagColumns.elementCount.cast("bigint").as("elementCount"))

5 changes: 2 additions & 3 deletions src/main/scala/io/qbeast/spark/delta/ReplicatedFile.scala
@@ -3,7 +3,7 @@
*/
package io.qbeast.spark.delta

-import io.qbeast.spark.utils.TagUtils
+import io.qbeast.spark.utils.{State, TagUtils}
import org.apache.spark.sql.delta.actions.AddFile

/**
@@ -12,8 +12,7 @@ import org.apache.spark.sql.delta.actions.AddFile
object ReplicatedFile {

def apply(addFile: AddFile): AddFile = {
-    val newTags = addFile.tags
-      .updated(TagUtils.replicated, true.toString())
+    val newTags = addFile.tags.updated(TagUtils.state, State.REPLICATED)
addFile.copy(tags = newTags, modificationTime = System.currentTimeMillis())
}

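A usage sketch for ReplicatedFile: it rewrites only the state tag and refreshes the modification time. The AddFile fields and tag values below are illustrative, and TagUtils.state is assumed to be the "state" tag key:

    import org.apache.spark.sql.delta.actions.AddFile

    val file = AddFile(
      path = "table/part-00000.parquet",
      partitionValues = Map.empty,
      size = 1024L,
      modificationTime = 0L,
      dataChange = false,
      tags = Map("state" -> "FLOODED", "cube" -> "w4"))

    val replicated = ReplicatedFile(file)
    assert(replicated.tags("state") == State.REPLICATED)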
(diff truncated: the remaining changed files are not shown)
