diff --git a/src/main/scala/akka/contrib/datareplication/HWWFlag.scala b/src/main/scala/akka/contrib/datareplication/HWWFlag.scala new file mode 100644 index 0000000..7eb0fa1 --- /dev/null +++ b/src/main/scala/akka/contrib/datareplication/HWWFlag.scala @@ -0,0 +1,24 @@ +package akka.contrib.datareplication + +object HWWFlag { + val initial = HWWFlag(0L) + + def apply(): HWWFlag = initial + + /** + * Java API + */ + def create(): HWWFlag = initial +} + +case class HWWFlag(state: Long) extends ReplicatedData { + type T = HWWFlag + + def value: Boolean = (state & 1) == 1 + + def setValue(b: Boolean): HWWFlag = + if (b ^ value) HWWFlag(state + 1) + else this + + override def merge(that: T): T = if (that.state > this.state) that else this +} diff --git a/src/main/scala/akka/contrib/datareplication/HWWMap.scala b/src/main/scala/akka/contrib/datareplication/HWWMap.scala new file mode 100644 index 0000000..aee0c89 --- /dev/null +++ b/src/main/scala/akka/contrib/datareplication/HWWMap.scala @@ -0,0 +1,38 @@ +package akka.contrib.datareplication + +object HWWMap { + + val empty = HWWMap(Map.empty[String, HWWRegister].withDefaultValue(HWWRegister.initial)) + + def apply(): HWWMap = empty +} + +case class HWWMap(private[akka] val state: Map[String, HWWRegister]) extends ReplicatedData { + override type T = HWWMap + + override def merge(that: T): T = { + val allKeys = this.state.keySet ++ that.state.keySet + var acc = Map.empty[String, HWWRegister] + + for (k <- allKeys) { + (this.state.get(k), that.state.get(k)) match { + case (Some(a), Some(b)) => acc += k -> a.merge(b) + case (None, Some(b)) => acc += k -> b + case (Some(a), None) => acc += k -> a + } + } + + HWWMap(acc) + } + + + + def get(key: String): Int = state.get(key).map(_.value).getOrElse(0) + + /** + * Adds an entry to the map + */ + def put(key: String, value: Int): HWWMap = + HWWMap(state + (key -> state(key).set(value))) + +} diff --git a/src/main/scala/akka/contrib/datareplication/HWWRegister.scala b/src/main/scala/akka/contrib/datareplication/HWWRegister.scala new file mode 100644 index 0000000..727239f --- /dev/null +++ b/src/main/scala/akka/contrib/datareplication/HWWRegister.scala @@ -0,0 +1,68 @@ +package akka.contrib.datareplication + +object HWWRegister { + val initial = HWWRegister(0, 0) + + def apply(): HWWRegister = initial + + + // FNV-1a hash used only as a simple and fast one-way function with good diffusion. Not cryptographically secure. + private[akka] def fnv32(x: Int): Int = { + var hash = 0x811C9DC5 // 2166136261 + @inline def round(x: Int): Unit = { + hash ^= x + hash *= 16777619 + } + + round(x & 0xFF) + round((x >>> 8) & 0xFF) + round((x >>> 16) & 0xFF) + round(x >>> 24) + hash + } + + // Two rounds Feistel block to create a "random" permutation. This is not cryptographically safe, the Feistel + // structure is only used to have an invertible function (permutation) Long => Long keyed by the epoch. + private[akka] def permute(x: Int, epoch: Int): Int = { + var right = x & 0xFFFF + var left = x >>> 16 + + @inline def feistelRound(): Unit = { + val oldright = right + right = left ^ (fnv32(oldright + epoch) & 0xFFFF) + left = oldright + } + + feistelRound() + feistelRound() + feistelRound() + feistelRound() + + (left << 16) | right + } + + // A "randomly" changing comparison function that is different in every epoch, but consistent in a certain epoch + // values that are not equal originally will not be reported as equal after the permutation + private def largerThan(x: Int, y: Int, epoch: Int): Boolean = permute(x, epoch) > permute(y, epoch) +} + +case class HWWRegister(private[akka] val epoch: Int, value: Int) extends ReplicatedData { + import HWWRegister._ + type T = HWWRegister + + private def earlierThan(that: HWWRegister): Boolean = { + if (that.epoch > this.epoch) true + else if (that.epoch < this.epoch) false + else largerThan(that.value, this.value, epoch) + } + + def set(newValue: Int): HWWRegister = { + if (newValue == value) this + else if (largerThan(newValue, value, epoch)) this.copy(value = newValue) + else this.copy(epoch = epoch + 1, value = newValue) + } + + def get(): Int = value + + override def merge(that: T): T = if (this earlierThan that) that else this +} diff --git a/src/main/scala/akka/contrib/datareplication/HWWSet.scala b/src/main/scala/akka/contrib/datareplication/HWWSet.scala new file mode 100644 index 0000000..b1e5d88 --- /dev/null +++ b/src/main/scala/akka/contrib/datareplication/HWWSet.scala @@ -0,0 +1,46 @@ +package akka.contrib.datareplication + +object HWWSet { + val empty = HWWSet(Map.empty[Any, HWWFlag].withDefaultValue(HWWFlag.initial)) + + def apply(): HWWSet = empty +} + +case class HWWSet(private[akka] val state: Map[Any, HWWFlag]) extends ReplicatedData { + type T = HWWSet + + /** + * Monotonic merge function. + */ + override def merge(that: T): T = { + val allKeys = this.state.keySet ++ that.state.keySet + var acc = Map.empty[Any, HWWFlag] + + for (k <- allKeys) { + (this.state.get(k), that.state.get(k)) match { + case (Some(a), Some(b)) => acc += k -> a.merge(b) + case (None, Some(b)) => acc += k -> b + case (Some(a), None) => acc += k -> a + } + } + + HWWSet(acc) + } + + def contains(x: Any): Boolean = state.get(x).exists(_.value) + + lazy val value: Set[Any] = state.filter(_._2.value).keySet + + /** + * Adds an element to the set + */ + def add(element: Any): HWWSet = + HWWSet(state + (element -> state(element).setValue(true))) + + /** + * Removes an element from the set + */ + def remove(element: Any): HWWSet = + HWWSet(state + (element -> state(element).setValue(false))) + +} diff --git a/src/test/scala/akka/contrib/datareplication/HWWMapSpec.scala b/src/test/scala/akka/contrib/datareplication/HWWMapSpec.scala new file mode 100644 index 0000000..946b8f4 --- /dev/null +++ b/src/test/scala/akka/contrib/datareplication/HWWMapSpec.scala @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.contrib.datareplication + +import org.scalatest.WordSpec +import org.scalatest.Matchers +import akka.actor.Address +import akka.cluster.UniqueAddress + +class HWWMapSpec extends WordSpec with Matchers { + + + "A HWWMap" must { + + "be able to add entries" in { + val m = HWWMap().put("a", 1).put("b", 2) + m.get("a") should be(1) + m.get("b") should be(2) + + val m2 = m.put("a", 3) + m2.get("a") should be(3) + } + + "be able to change entry" in { + val m = HWWMap().put("a", 1).put("b", 2).put("a", 0) + m.get("a") should be(0) + m.get("b") should be(2) + } + + "be able to change multiple times" in { + val m = HWWMap().put("a", 1).put("b", 2).put("a", 0).put("a", 2).put("b", 1) + m.get("a") should be(2) + m.get("b") should be(1) + } + + "be able to have its entries correctly merged with another HWWMap with other entries" in { + val m1 = HWWMap().put("a", 1).put("b", 2) + val m2 = HWWMap().put("c", 3) + + // merge both ways + val merged1 = m1 merge m2 + merged1.get("a") should be(1) + merged1.get("b") should be(2) + merged1.get("c") should be(3) + + val merged2 = m2 merge m1 + merged2.get("a") should be(1) + merged2.get("b") should be(2) + merged2.get("c") should be(3) + + } + + "be able to have its entries correctly merged with another HWWMap with overlapping entries" in { + + val m1 = HWWMap().put("a", 11).put("b", 12).put("a", 0).put("d", 14) + val m2 = HWWMap().put("c", 23).put("a", 21).put("b", 22).put("b", 0).put("d", 24) + + // a -> 0 or a -> 21 can both happen, but not a -> 11 + // b -> 12 or b -> 0 can happen, but not b -> 22 + // c -> 23 must be true + // d -> 24 or d -> 14 can happen, but not d -> 0 + val merged1 = m1 merge m2 + merged1.get("a") should (equal(0) or equal(21)) + merged1.get("b") should (equal(0) or equal(12)) + merged1.get("c") should be(23) + merged1.get("d") should (equal(14) or equal(24)) + + val merged2 = m2 merge m1 + merged1.get("a") should be(merged2.get("a")) + merged1.get("b") should be(merged2.get("b")) + merged1.get("c") should be(merged2.get("c")) + merged1.get("d") should be(merged2.get("d")) + } + + } +} diff --git a/src/test/scala/akka/contrib/datareplication/HWWRegisterSpec.scala b/src/test/scala/akka/contrib/datareplication/HWWRegisterSpec.scala new file mode 100644 index 0000000..091a5db --- /dev/null +++ b/src/test/scala/akka/contrib/datareplication/HWWRegisterSpec.scala @@ -0,0 +1,44 @@ +package akka.contrib.datareplication + +import org.scalatest.{Matchers, WordSpec} + +class HWWRegisterSpec extends WordSpec with Matchers { + + "HWWRegister unit test" must { + import HWWRegister._ + + "verify that FNV-1a is implemented properly" in { + // Refereence values from http://www.isthe.com/chongo/tech/comp/fnv/ + fnv32(0xC43124CC) should be(0) + fnv32(0xCB9F4DE0) should be(0) + } + + "verify that permute is invertible" in { + val sampling = 0 to Int.MaxValue by 65537 + + def depermute(x: Int, epoch: Int): Int = { + var right = x & 0xFFFF + var left = x >>> 16 + + @inline def feistelRound(): Unit = { + val oldleft = left + left = right ^ (fnv32(oldleft + epoch) & 0xFFFF) + right = oldleft + } + + feistelRound() + feistelRound() + feistelRound() + feistelRound() + + (left << 16) | right + } + + for (x <- sampling) { + val xx = HWWRegister.permute(x, epoch = 42) + depermute(xx, epoch = 42) should be(x) + } + } + } + +} diff --git a/src/test/scala/akka/contrib/datareplication/HWWSetSpec.scala b/src/test/scala/akka/contrib/datareplication/HWWSetSpec.scala new file mode 100644 index 0000000..9860c74 --- /dev/null +++ b/src/test/scala/akka/contrib/datareplication/HWWSetSpec.scala @@ -0,0 +1,236 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.contrib.datareplication + +import org.scalatest.WordSpec +import org.scalatest.Matchers +import scala.collection.immutable.TreeMap +import akka.actor.Address +import akka.cluster.UniqueAddress + +class HWWSetSpec extends WordSpec with Matchers { + + val user1 = """{"username":"john","password":"coltrane"}""" + val user2 = """{"username":"sonny","password":"rollins"}""" + val user3 = """{"username":"charlie","password":"parker"}""" + val user4 = """{"username":"charles","password":"mingus"}""" + + "A HWWSet" must { + + "be able to add user" in { + val c1 = HWWSet() + + val c2 = c1.add(user1) + val c3 = c2.add(user2) + + val c4 = c3.add(user4) + val c5 = c4.add(user3) + + c5.value should contain(user1) + c5.value should contain(user2) + c5.value should contain(user3) + c5.value should contain(user4) + } + + "be able to remove added user" in { + val c1 = HWWSet() + + val c2 = c1.add(user1) + val c3 = c2.add(user2) + + val c4 = c3.remove(user2) + val c5 = c4.remove(user1) + + c5.value should not contain (user1) + c5.value should not contain (user2) + } + + "be able to add removed" in { + val c1 = HWWSet() + val c2 = c1.remove(user1) + val c3 = c2.add(user1) + c3.value should contain(user1) + val c4 = c3.remove(user1) + c4.value should not contain (user1) + val c5 = c4.add(user1) + c5.value should contain(user1) + } + + "be able to remove and add several times" in { + val c1 = HWWSet() + + val c2 = c1.add(user1) + val c3 = c2.add(user2) + val c4 = c3.remove(user1) + c4.value should not contain (user1) + c4.value should contain(user2) + + val c5 = c4.add(user1) + val c6 = c5.add(user2) + c6.value should contain(user1) + c6.value should contain(user2) + + val c7 = c6.remove(user1) + val c8 = c7.add(user2) + val c9 = c8.remove(user1) + c9.value should not contain (user1) + c9.value should contain(user2) + } + + "be able to have its user set correctly merged with another HWWSet with unique user sets" in { + // set 1 + val c1 = HWWSet().add(user1).add(user2) + c1.value should contain(user1) + c1.value should contain(user2) + + // set 2 + val c2 = HWWSet().add(user3).add(user4).remove(user3) + + c2.value should not contain (user3) + c2.value should contain(user4) + + println(c1) + println(c2) + + // merge both ways + println("MERGING") + val merged1 = c1 merge c2 + merged1.value should contain(user1) + merged1.value should contain(user2) + merged1.value should not contain (user3) + merged1.value should contain(user4) + + val merged2 = c2 merge c1 + merged2.value should contain(user1) + merged2.value should contain(user2) + merged2.value should not contain (user3) + merged2.value should contain(user4) + } + + "be able to have its user set correctly merged with another HWWSet with overlapping user sets" in { + // set 1 + val c1 = HWWSet().add(user1).add(user2).add(user3).remove(user1).remove(user3) + + c1.value should not contain (user1) + c1.value should contain(user2) + c1.value should not contain (user3) + + // set 2 + val c2 = HWWSet().add(user1).add(user2).add(user3).add(user4).remove(user3) + + c2.value should contain(user1) + c2.value should contain(user2) + c2.value should not contain (user3) + c2.value should contain(user4) + + // merge both ways + val merged1 = c1 merge c2 + // Difference from ORSet, since c1 only added user1, while c2 added and already removed + // it, so c2 wins by having the longest write chain on the user1 entry + // Observe that this corresponds to the following possible serialization: + // .add(user1) .add(user2) .add(user3) .remove(user1).remove(user3) + // .add(user1) .add(user2) .add(user3).add(user4) .remove(user3) + merged1.value should not contain (user1) + merged1.value should contain(user2) + merged1.value should not contain (user3) + merged1.value should contain(user4) + + val merged2 = c2 merge c1 + // Difference from ORSet, since c1 only added user1, while c2 added and already removed + // it, so c2 wins by having the longest write chain on the user1 entry + // Observe that this corresponds to the following possible serialization: + // .add(user1) .add(user2) .add(user3) .remove(user1).remove(user3) + // .add(user1) .add(user2) .add(user3).add(user4) .remove(user3) + merged2.value should not contain(user1) + merged2.value should contain(user2) + merged2.value should not contain (user3) + merged2.value should contain(user4) + } + + "be able to have its user set correctly merged for concurrent updates" in { + val c1 = HWWSet().add(user1).add(user2).add(user3) + + c1.value should contain(user1) + c1.value should contain(user2) + c1.value should contain(user3) + + val c2 = c1.add(user1).remove(user2).remove(user3) + + c2.value should contain(user1) + c2.value should not contain (user2) + c2.value should not contain (user3) + + // merge both ways + val merged1 = c1 merge c2 + merged1.value should contain(user1) + merged1.value should not contain (user2) + merged1.value should not contain (user3) + + val merged2 = c2 merge c1 + merged2.value should contain(user1) + merged2.value should not contain (user2) + merged2.value should not contain (user3) + + val c3 = c1.add(user4).remove(user3).add(user2) + + // merge both ways + val merged3 = c2 merge c3 + merged3.value should contain(user1) + // Difference from ORSet, since c1.add(user2) is treated as NOP since it does not contribute + // to the length of the local write chain + // Observe that this corresponds to the following possible serialization (starting from state (user1, user2, user3)): + // .add(user1) .remove(user2).remove(user3) + // .add(user2).add(user4) .remove(user3) + // Notice that add(user2) was moved to the front for the second writer -- in HWWSet no ordering is defined between + // operations on different element, so an .op(x).op(y) might be "reordered" to an .op(y).op(x) + merged3.value should not contain (user2) + merged3.value should not contain (user3) + merged3.value should contain(user4) + + val merged4 = c3 merge c2 + merged4.value should contain(user1) + // Difference from ORSet, since c1.add(user2) is treated as NOP since it does not contribute + // to the length of the local write chain + // Observe that this corresponds to the following possible serialization (starting from state (user1, user2, user3)): + // .add(user1) .remove(user2).remove(user3) + // .add(user2).add(user4) .remove(user3) + // Notice that add(user2) was moved to the front for the second writer -- in HWWSet no ordering is defined between + // operations on different element, so an .op(x).op(y) might be "reordered" to an .op(y).op(x) + merged4.value should not contain (user2) + merged4.value should not contain (user3) + merged4.value should contain(user4) + } + + "be able to have its user set correctly merged after remove" in { + val c1 = HWWSet().add(user1).add(user2) + val c2 = c1.remove(user2) + + // merge both ways + val merged1 = c1 merge c2 + merged1.value should contain(user1) + merged1.value should not contain (user2) + + val merged2 = c2 merge c1 + merged2.value should contain(user1) + merged2.value should not contain (user2) + + val c3 = c1.add(user3) + + // merge both ways + val merged3 = c3 merge c2 + merged3.value should contain(user1) + merged3.value should not contain (user2) + merged3.value should contain(user3) + + val merged4 = c2 merge c3 + merged4.value should contain(user1) + merged4.value should not contain (user2) + merged4.value should contain(user3) + } + + } + + +}