Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Highest Write Wins flag, register, map and set #40

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/main/scala/akka/contrib/datareplication/HWWFlag.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package akka.contrib.datareplication

object HWWFlag {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to understand the end user semantics.
In the case of concurrent updates during a partition the most active replica (or side of the partition) will win. By "most active" I mean changing the flag back and forth most number of times.
Is that correct understanding?

val initial = HWWFlag(0L)

def apply(): HWWFlag = initial

/**
* Java API
*/
def create(): HWWFlag = initial
}

case class HWWFlag(state: Long) extends ReplicatedData {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state should perhaps be private?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should

type T = HWWFlag

def value: Boolean = (state & 1) == 1

def setValue(b: Boolean): HWWFlag =
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setValue is a bit strange for an immutable data type, perhaps withValue?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about this. The problem is that a name that is passive does not convey the meaning that you should not just cache True/False values but always operate on the latest value by setting it. I don't know what name would be nice here.

if (b ^ value) HWWFlag(state + 1)
else this

override def merge(that: T): T = if (that.state > this.state) that else this
}
38 changes: 38 additions & 0 deletions src/main/scala/akka/contrib/datareplication/HWWMap.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package akka.contrib.datareplication

object HWWMap {

val empty = HWWMap(Map.empty[String, HWWRegister].withDefaultValue(HWWRegister.initial))

def apply(): HWWMap = empty
}

case class HWWMap(private[akka] val state: Map[String, HWWRegister]) extends ReplicatedData {
override type T = HWWMap

override def merge(that: T): T = {
val allKeys = this.state.keySet ++ that.state.keySet
var acc = Map.empty[String, HWWRegister]

for (k <- allKeys) {
(this.state.get(k), that.state.get(k)) match {
case (Some(a), Some(b)) => acc += k -> a.merge(b)
case (None, Some(b)) => acc += k -> b
case (Some(a), None) => acc += k -> a
}
}

HWWMap(acc)
}



def get(key: String): Int = state.get(key).map(_.value).getOrElse(0)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps slightly more efficient to use match?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK


/**
* Adds an entry to the map
*/
def put(key: String, value: Int): HWWMap =
HWWMap(state + (key -> state(key).set(value)))

}
68 changes: 68 additions & 0 deletions src/main/scala/akka/contrib/datareplication/HWWRegister.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package akka.contrib.datareplication

object HWWRegister {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to understand the end user semantics.
In the case of concurrent updates during a partition a "random" replica (or side of the partition) will win.
Is that correct understanding?
Is it be more likely that the side that perform most number of changes will win?

val initial = HWWRegister(0, 0)

def apply(): HWWRegister = initial


// FNV-1a hash used only as a simple and fast one-way function with good diffusion. Not cryptographically secure.
private[akka] def fnv32(x: Int): Int = {
var hash = 0x811C9DC5 // 2166136261
@inline def round(x: Int): Unit = {
hash ^= x
hash *= 16777619
}

round(x & 0xFF)
round((x >>> 8) & 0xFF)
round((x >>> 16) & 0xFF)
round(x >>> 24)
hash
}

// Two rounds Feistel block to create a "random" permutation. This is not cryptographically safe, the Feistel
// structure is only used to have an invertible function (permutation) Long => Long keyed by the epoch.
private[akka] def permute(x: Int, epoch: Int): Int = {
var right = x & 0xFFFF
var left = x >>> 16

@inline def feistelRound(): Unit = {
val oldright = right
right = left ^ (fnv32(oldright + epoch) & 0xFFFF)
left = oldright
}

feistelRound()
feistelRound()
feistelRound()
feistelRound()

(left << 16) | right
}

// A "randomly" changing comparison function that is different in every epoch, but consistent in a certain epoch
// values that are not equal originally will not be reported as equal after the permutation
private def largerThan(x: Int, y: Int, epoch: Int): Boolean = permute(x, epoch) > permute(y, epoch)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting, I'm curious to know what role this permutation plays, something with bumping the epoch?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When epoch values are the same, then the register value itself decides. Unfortunately that would be unfair since concurrent writes of let's say 1 and 2 would always result in 2 winning. With this permutation I shuffle a bit the ordering so different values can win in different epochs.

}

case class HWWRegister(private[akka] val epoch: Int, value: Int) extends ReplicatedData {
import HWWRegister._
type T = HWWRegister

private def earlierThan(that: HWWRegister): Boolean = {
if (that.epoch > this.epoch) true
else if (that.epoch < this.epoch) false
else largerThan(that.value, this.value, epoch)
}

def set(newValue: Int): HWWRegister = {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withValue?

if (newValue == value) this
else if (largerThan(newValue, value, epoch)) this.copy(value = newValue)
else this.copy(epoch = epoch + 1, value = newValue)
}

def get(): Int = value
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getValue if this is Java API?


override def merge(that: T): T = if (this earlierThan that) that else this
}
46 changes: 46 additions & 0 deletions src/main/scala/akka/contrib/datareplication/HWWSet.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package akka.contrib.datareplication

object HWWSet {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to understand the end user semantics.
Same as for HWWFlag, but for each individual element in the Set.
Is that correct understanding?

val empty = HWWSet(Map.empty[Any, HWWFlag].withDefaultValue(HWWFlag.initial))

def apply(): HWWSet = empty
}

case class HWWSet(private[akka] val state: Map[Any, HWWFlag]) extends ReplicatedData {
type T = HWWSet

/**
* Monotonic merge function.
*/
override def merge(that: T): T = {
val allKeys = this.state.keySet ++ that.state.keySet
var acc = Map.empty[Any, HWWFlag]

for (k <- allKeys) {
(this.state.get(k), that.state.get(k)) match {
case (Some(a), Some(b)) => acc += k -> a.merge(b)
case (None, Some(b)) => acc += k -> b
case (Some(a), None) => acc += k -> a
}
}

HWWSet(acc)
}

def contains(x: Any): Boolean = state.get(x).exists(_.value)

lazy val value: Set[Any] = state.filter(_._2.value).keySet

/**
* Adds an element to the set
*/
def add(element: Any): HWWSet =
HWWSet(state + (element -> state(element).setValue(true)))

/**
* Removes an element from the set
*/
def remove(element: Any): HWWSet =
HWWSet(state + (element -> state(element).setValue(false)))

}
78 changes: 78 additions & 0 deletions src/test/scala/akka/contrib/datareplication/HWWMapSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.contrib.datareplication

import org.scalatest.WordSpec
import org.scalatest.Matchers
import akka.actor.Address
import akka.cluster.UniqueAddress

class HWWMapSpec extends WordSpec with Matchers {


"A HWWMap" must {

"be able to add entries" in {
val m = HWWMap().put("a", 1).put("b", 2)
m.get("a") should be(1)
m.get("b") should be(2)

val m2 = m.put("a", 3)
m2.get("a") should be(3)
}

"be able to change entry" in {
val m = HWWMap().put("a", 1).put("b", 2).put("a", 0)
m.get("a") should be(0)
m.get("b") should be(2)
}

"be able to change multiple times" in {
val m = HWWMap().put("a", 1).put("b", 2).put("a", 0).put("a", 2).put("b", 1)
m.get("a") should be(2)
m.get("b") should be(1)
}

"be able to have its entries correctly merged with another HWWMap with other entries" in {
val m1 = HWWMap().put("a", 1).put("b", 2)
val m2 = HWWMap().put("c", 3)

// merge both ways
val merged1 = m1 merge m2
merged1.get("a") should be(1)
merged1.get("b") should be(2)
merged1.get("c") should be(3)

val merged2 = m2 merge m1
merged2.get("a") should be(1)
merged2.get("b") should be(2)
merged2.get("c") should be(3)

}

"be able to have its entries correctly merged with another HWWMap with overlapping entries" in {

val m1 = HWWMap().put("a", 11).put("b", 12).put("a", 0).put("d", 14)
val m2 = HWWMap().put("c", 23).put("a", 21).put("b", 22).put("b", 0).put("d", 24)

// a -> 0 or a -> 21 can both happen, but not a -> 11
// b -> 12 or b -> 0 can happen, but not b -> 22
// c -> 23 must be true
// d -> 24 or d -> 14 can happen, but not d -> 0
val merged1 = m1 merge m2
merged1.get("a") should (equal(0) or equal(21))
merged1.get("b") should (equal(0) or equal(12))
merged1.get("c") should be(23)
merged1.get("d") should (equal(14) or equal(24))

val merged2 = m2 merge m1
merged1.get("a") should be(merged2.get("a"))
merged1.get("b") should be(merged2.get("b"))
merged1.get("c") should be(merged2.get("c"))
merged1.get("d") should be(merged2.get("d"))
}

}
}
44 changes: 44 additions & 0 deletions src/test/scala/akka/contrib/datareplication/HWWRegisterSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package akka.contrib.datareplication

import org.scalatest.{Matchers, WordSpec}

class HWWRegisterSpec extends WordSpec with Matchers {

"HWWRegister unit test" must {
import HWWRegister._

"verify that FNV-1a is implemented properly" in {
// Refereence values from http://www.isthe.com/chongo/tech/comp/fnv/
fnv32(0xC43124CC) should be(0)
fnv32(0xCB9F4DE0) should be(0)
}

"verify that permute is invertible" in {
val sampling = 0 to Int.MaxValue by 65537

def depermute(x: Int, epoch: Int): Int = {
var right = x & 0xFFFF
var left = x >>> 16

@inline def feistelRound(): Unit = {
val oldleft = left
left = right ^ (fnv32(oldleft + epoch) & 0xFFFF)
right = oldleft
}

feistelRound()
feistelRound()
feistelRound()
feistelRound()

(left << 16) | right
}

for (x <- sampling) {
val xx = HWWRegister.permute(x, epoch = 42)
depermute(xx, epoch = 42) should be(x)
}
}
}

}
Loading