-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Highest Write Wins flag, register, map and set #40
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package akka.contrib.datareplication | ||
|
||
object HWWFlag { | ||
val initial = HWWFlag(0L) | ||
|
||
def apply(): HWWFlag = initial | ||
|
||
/** | ||
* Java API | ||
*/ | ||
def create(): HWWFlag = initial | ||
} | ||
|
||
case class HWWFlag(state: Long) extends ReplicatedData { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it should |
||
type T = HWWFlag | ||
|
||
def value: Boolean = (state & 1) == 1 | ||
|
||
def setValue(b: Boolean): HWWFlag = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was thinking about this. The problem is that a name that is passive does not convey the meaning that you should not just cache True/False values but always operate on the latest value by setting it. I don't know what name would be nice here. |
||
if (b ^ value) HWWFlag(state + 1) | ||
else this | ||
|
||
override def merge(that: T): T = if (that.state > this.state) that else this | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package akka.contrib.datareplication | ||
|
||
object HWWMap { | ||
|
||
val empty = HWWMap(Map.empty[String, HWWRegister].withDefaultValue(HWWRegister.initial)) | ||
|
||
def apply(): HWWMap = empty | ||
} | ||
|
||
case class HWWMap(private[akka] val state: Map[String, HWWRegister]) extends ReplicatedData { | ||
override type T = HWWMap | ||
|
||
override def merge(that: T): T = { | ||
val allKeys = this.state.keySet ++ that.state.keySet | ||
var acc = Map.empty[String, HWWRegister] | ||
|
||
for (k <- allKeys) { | ||
(this.state.get(k), that.state.get(k)) match { | ||
case (Some(a), Some(b)) => acc += k -> a.merge(b) | ||
case (None, Some(b)) => acc += k -> b | ||
case (Some(a), None) => acc += k -> a | ||
} | ||
} | ||
|
||
HWWMap(acc) | ||
} | ||
|
||
|
||
|
||
def get(key: String): Int = state.get(key).map(_.value).getOrElse(0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. perhaps slightly more efficient to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK |
||
|
||
/** | ||
* Adds an entry to the map | ||
*/ | ||
def put(key: String, value: Int): HWWMap = | ||
HWWMap(state + (key -> state(key).set(value))) | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
package akka.contrib.datareplication | ||
|
||
object HWWRegister { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Trying to understand the end user semantics. |
||
val initial = HWWRegister(0, 0) | ||
|
||
def apply(): HWWRegister = initial | ||
|
||
|
||
// FNV-1a hash used only as a simple and fast one-way function with good diffusion. Not cryptographically secure. | ||
private[akka] def fnv32(x: Int): Int = { | ||
var hash = 0x811C9DC5 // 2166136261 | ||
@inline def round(x: Int): Unit = { | ||
hash ^= x | ||
hash *= 16777619 | ||
} | ||
|
||
round(x & 0xFF) | ||
round((x >>> 8) & 0xFF) | ||
round((x >>> 16) & 0xFF) | ||
round(x >>> 24) | ||
hash | ||
} | ||
|
||
// Two rounds Feistel block to create a "random" permutation. This is not cryptographically safe, the Feistel | ||
// structure is only used to have an invertible function (permutation) Long => Long keyed by the epoch. | ||
private[akka] def permute(x: Int, epoch: Int): Int = { | ||
var right = x & 0xFFFF | ||
var left = x >>> 16 | ||
|
||
@inline def feistelRound(): Unit = { | ||
val oldright = right | ||
right = left ^ (fnv32(oldright + epoch) & 0xFFFF) | ||
left = oldright | ||
} | ||
|
||
feistelRound() | ||
feistelRound() | ||
feistelRound() | ||
feistelRound() | ||
|
||
(left << 16) | right | ||
} | ||
|
||
// A "randomly" changing comparison function that is different in every epoch, but consistent in a certain epoch | ||
// values that are not equal originally will not be reported as equal after the permutation | ||
private def largerThan(x: Int, y: Int, epoch: Int): Boolean = permute(x, epoch) > permute(y, epoch) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. interesting, I'm curious to know what role this permutation plays, something with bumping the epoch? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When epoch values are the same, then the register value itself decides. Unfortunately that would be unfair since concurrent writes of let's say 1 and 2 would always result in 2 winning. With this permutation I shuffle a bit the ordering so different values can win in different epochs. |
||
} | ||
|
||
case class HWWRegister(private[akka] val epoch: Int, value: Int) extends ReplicatedData { | ||
import HWWRegister._ | ||
type T = HWWRegister | ||
|
||
private def earlierThan(that: HWWRegister): Boolean = { | ||
if (that.epoch > this.epoch) true | ||
else if (that.epoch < this.epoch) false | ||
else largerThan(that.value, this.value, epoch) | ||
} | ||
|
||
def set(newValue: Int): HWWRegister = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
if (newValue == value) this | ||
else if (largerThan(newValue, value, epoch)) this.copy(value = newValue) | ||
else this.copy(epoch = epoch + 1, value = newValue) | ||
} | ||
|
||
def get(): Int = value | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
override def merge(that: T): T = if (this earlierThan that) that else this | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package akka.contrib.datareplication | ||
|
||
object HWWSet { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Trying to understand the end user semantics. |
||
val empty = HWWSet(Map.empty[Any, HWWFlag].withDefaultValue(HWWFlag.initial)) | ||
|
||
def apply(): HWWSet = empty | ||
} | ||
|
||
case class HWWSet(private[akka] val state: Map[Any, HWWFlag]) extends ReplicatedData { | ||
type T = HWWSet | ||
|
||
/** | ||
* Monotonic merge function. | ||
*/ | ||
override def merge(that: T): T = { | ||
val allKeys = this.state.keySet ++ that.state.keySet | ||
var acc = Map.empty[Any, HWWFlag] | ||
|
||
for (k <- allKeys) { | ||
(this.state.get(k), that.state.get(k)) match { | ||
case (Some(a), Some(b)) => acc += k -> a.merge(b) | ||
case (None, Some(b)) => acc += k -> b | ||
case (Some(a), None) => acc += k -> a | ||
} | ||
} | ||
|
||
HWWSet(acc) | ||
} | ||
|
||
def contains(x: Any): Boolean = state.get(x).exists(_.value) | ||
|
||
lazy val value: Set[Any] = state.filter(_._2.value).keySet | ||
|
||
/** | ||
* Adds an element to the set | ||
*/ | ||
def add(element: Any): HWWSet = | ||
HWWSet(state + (element -> state(element).setValue(true))) | ||
|
||
/** | ||
* Removes an element from the set | ||
*/ | ||
def remove(element: Any): HWWSet = | ||
HWWSet(state + (element -> state(element).setValue(false))) | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
/** | ||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> | ||
*/ | ||
|
||
package akka.contrib.datareplication | ||
|
||
import org.scalatest.WordSpec | ||
import org.scalatest.Matchers | ||
import akka.actor.Address | ||
import akka.cluster.UniqueAddress | ||
|
||
class HWWMapSpec extends WordSpec with Matchers { | ||
|
||
|
||
"A HWWMap" must { | ||
|
||
"be able to add entries" in { | ||
val m = HWWMap().put("a", 1).put("b", 2) | ||
m.get("a") should be(1) | ||
m.get("b") should be(2) | ||
|
||
val m2 = m.put("a", 3) | ||
m2.get("a") should be(3) | ||
} | ||
|
||
"be able to change entry" in { | ||
val m = HWWMap().put("a", 1).put("b", 2).put("a", 0) | ||
m.get("a") should be(0) | ||
m.get("b") should be(2) | ||
} | ||
|
||
"be able to change multiple times" in { | ||
val m = HWWMap().put("a", 1).put("b", 2).put("a", 0).put("a", 2).put("b", 1) | ||
m.get("a") should be(2) | ||
m.get("b") should be(1) | ||
} | ||
|
||
"be able to have its entries correctly merged with another HWWMap with other entries" in { | ||
val m1 = HWWMap().put("a", 1).put("b", 2) | ||
val m2 = HWWMap().put("c", 3) | ||
|
||
// merge both ways | ||
val merged1 = m1 merge m2 | ||
merged1.get("a") should be(1) | ||
merged1.get("b") should be(2) | ||
merged1.get("c") should be(3) | ||
|
||
val merged2 = m2 merge m1 | ||
merged2.get("a") should be(1) | ||
merged2.get("b") should be(2) | ||
merged2.get("c") should be(3) | ||
|
||
} | ||
|
||
"be able to have its entries correctly merged with another HWWMap with overlapping entries" in { | ||
|
||
val m1 = HWWMap().put("a", 11).put("b", 12).put("a", 0).put("d", 14) | ||
val m2 = HWWMap().put("c", 23).put("a", 21).put("b", 22).put("b", 0).put("d", 24) | ||
|
||
// a -> 0 or a -> 21 can both happen, but not a -> 11 | ||
// b -> 12 or b -> 0 can happen, but not b -> 22 | ||
// c -> 23 must be true | ||
// d -> 24 or d -> 14 can happen, but not d -> 0 | ||
val merged1 = m1 merge m2 | ||
merged1.get("a") should (equal(0) or equal(21)) | ||
merged1.get("b") should (equal(0) or equal(12)) | ||
merged1.get("c") should be(23) | ||
merged1.get("d") should (equal(14) or equal(24)) | ||
|
||
val merged2 = m2 merge m1 | ||
merged1.get("a") should be(merged2.get("a")) | ||
merged1.get("b") should be(merged2.get("b")) | ||
merged1.get("c") should be(merged2.get("c")) | ||
merged1.get("d") should be(merged2.get("d")) | ||
} | ||
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
package akka.contrib.datareplication | ||
|
||
import org.scalatest.{Matchers, WordSpec} | ||
|
||
class HWWRegisterSpec extends WordSpec with Matchers { | ||
|
||
"HWWRegister unit test" must { | ||
import HWWRegister._ | ||
|
||
"verify that FNV-1a is implemented properly" in { | ||
// Refereence values from http://www.isthe.com/chongo/tech/comp/fnv/ | ||
fnv32(0xC43124CC) should be(0) | ||
fnv32(0xCB9F4DE0) should be(0) | ||
} | ||
|
||
"verify that permute is invertible" in { | ||
val sampling = 0 to Int.MaxValue by 65537 | ||
|
||
def depermute(x: Int, epoch: Int): Int = { | ||
var right = x & 0xFFFF | ||
var left = x >>> 16 | ||
|
||
@inline def feistelRound(): Unit = { | ||
val oldleft = left | ||
left = right ^ (fnv32(oldleft + epoch) & 0xFFFF) | ||
right = oldleft | ||
} | ||
|
||
feistelRound() | ||
feistelRound() | ||
feistelRound() | ||
feistelRound() | ||
|
||
(left << 16) | right | ||
} | ||
|
||
for (x <- sampling) { | ||
val xx = HWWRegister.permute(x, epoch = 42) | ||
depermute(xx, epoch = 42) should be(x) | ||
} | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying to understand the end user semantics.
In the case of concurrent updates during a partition the most active replica (or side of the partition) will win. By "most active" I mean changing the flag back and forth most number of times.
Is that correct understanding?