Pure-op based CRDT, ORSet: 'Remove' operations are being pruned before being applied #5

Open
ekkaiasmith opened this issue Jan 31, 2024 · 0 comments

Comments

@ekkaiasmith
Contributor

The following test (which belongs in Crdt.Tests/commutative/pure/ORSetTests.fs) fails because the ORSet.remove 1 a operation is pruned from the PO-Log (state.Unstable) before it is applied. As a result, the value 1 is never removed from the state.

test "What happens when a new op obsoletes a remove op before it is applied" {
    use sys = System.create "sys" <| Configuration.parse "akka.loglevel = DEBUG"

    let a = spawn sys "A" <| props (ORSet.props "A")
    let b = spawn sys "B" <| props (ORSet.props "B")
    a <! Connect("B", b)
    b <! Connect("A", a)

    let state = ORSet.add 1 a |> wait
    let state = ORSet.add 2 b |> wait

    Thread.Sleep 500

    let state = ORSet.remove 1 a |> wait
    Expect.equal state (Set.singleton 2) "A returns set without removed value"
    let state = ORSet.add 3 b |> wait

    Thread.Sleep 500

    let s1 = ORSet.query a |> wait
    let s2 = ORSet.query b |> wait

    Expect.equal s1 s2 "After replication both sides should have the same state"
    Expect.equal s1 (Set.ofList [2;3]) "On concurrent conflicting update, add wins"
}

I added some logs to make it clearly visible when the operation is removed:

[Two screenshots of the debug log output, captured 2024-01-31 at 15:26:26 and 15:27:48]

The problem seems to come from the Obsoletes implementation of the OR-Set (Crdt/commutative/pure/ORSet.fs):

member this.Obsoletes(o, n) =
    match n.Value, o.Value with
    | Add v2, Add v1 when Version.compare n.Version o.Version = Ord.Lt -> v1 = v2    // add v2 is obsolete when it's lower than another add
    | Add v2, Remove v1 when Version.compare n.Version o.Version = Ord.Lt -> v1 = v2 // add v2 is obsolete when it's lower than another remove
    | Remove _, _ -> true // since o can only be lower or concurrent, it's always losing (add-wins)
    | _ -> false
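
To make the effect of the `Remove _, _ -> true` case easier to see in isolation, here is a minimal standalone sketch of the same match. It is not the library's code: `compareVersions`, the map-based `Version`, and the `Event` record are hypothetical stand-ins for `Version.compare` and the real event type, and the version numbers are only assumed to resemble the ones produced by the test above.

```fsharp
// Hypothetical stand-ins for the library's types (assumptions, not the real API).
type Ord = Lt | Eq | Gt | Cc

// A vector clock modelled as replica id -> counter.
type Version = Map<string, int>

// Partial-order comparison of two vector clocks; missing entries count as 0.
let compareVersions (a: Version) (b: Version) : Ord =
    let keys =
        Seq.append (Map.toSeq a) (Map.toSeq b) |> Seq.map fst |> Set.ofSeq
    let get k (v: Version) = Map.tryFind k v |> Option.defaultValue 0
    let le = keys |> Set.forall (fun k -> get k a <= get k b)
    let ge = keys |> Set.forall (fun k -> get k a >= get k b)
    match le, ge with
    | true, true -> Ord.Eq
    | true, false -> Ord.Lt
    | false, true -> Ord.Gt
    | false, false -> Ord.Cc

type Op =
    | Add of int
    | Remove of int

type Event = { Version: Version; Value: Op }

// Same match as in the issue: "does o make n obsolete?"
let obsoletes (o: Event) (n: Event) : bool =
    match n.Value, o.Value with
    | Add v2, Add v1 when compareVersions n.Version o.Version = Ord.Lt -> v1 = v2
    | Add v2, Remove v1 when compareVersions n.Version o.Version = Ord.Lt -> v1 = v2
    | Remove _, _ -> true
    | _ -> false

// Assumed versions: A removed 1 after seeing B's first add; B added 3 concurrently.
let add2 = { Version = Map.ofList [ "B", 1 ]; Value = Add 2 }
let remove1 = { Version = Map.ofList [ "A", 2; "B", 1 ]; Value = Remove 1 }
let add3 = { Version = Map.ofList [ "A", 1; "B", 2 ]; Value = Add 3 }

// The remove is reported obsolete by an unrelated concurrent add ...
printfn "add3 obsoletes remove1: %b" (obsoletes add3 remove1)
// ... and even by a causally earlier add of a different value.
printfn "add2 obsoletes remove1: %b" (obsoletes add2 remove1)
// The reverse direction does not hold.
printfn "remove1 obsoletes add3: %b" (obsoletes remove1 add3)
```

Under these assumptions the first two checks are true and the last one is false: the catch-all case looks at neither the value nor the version of `o`, so any operation at all is enough to obsolete a remove.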

In addition, events are pruned from the PO-Log before the operations are applied (Crdt/commutative/pure/Replication.fs):

for op in actual do
  let observed = state.Observed |> MTime.update nodeId op.Version
  // check if new incoming event is not obsolete
  let obsolete = state.Unstable |> Set.exists(fun o -> crdt.Obsoletes(o, op))

  // HERE --> prune unstable operations, which have been obsoleted by incoming event
  let pruned = state.Unstable |> Set.filter (fun o -> not (crdt.Obsoletes(op, o)))

  state <- { state with
               Observed = observed
               Unstable = if obsolete then pruned else Set.add op pruned
               LatestVersion = Version.merge op.Version state.LatestVersion }

// check if some of unstable events have stabilized now
let stableOps, unstableOps, stableVersion = stabilize state
let state =
  if not (Set.isEmpty stableOps) then
    logDebugf ctx "stabilized state %A over %O (stable: %O, unstable: %O)" state.Stable stableVersion stableOps unstableOps
    // HERE --> operations are applied after pruning
    let stable = crdt.Apply(state.Stable, stableOps)
    { state with Stable = stable; Unstable = unstableOps; StableVersion = stableVersion }
  else
    state
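
The interaction between the two snippets can be reproduced without actors. The sketch below is a reduced model, not the real Replication.fs: `receive` keeps only the `obsolete`/`pruned` logic of the loop body, versions are dropped entirely, and `obsoletes` is a hypothetical stand-in that models just the `Remove _, _ -> true` rule (the version-guarded Add cases are omitted because no two operations in this scenario share a value). It also assumes that `Add 1` had already been stabilized when the remove was issued.

```fsharp
// Simplified model: operations only, no versions (an assumption for brevity).
type Op =
    | Add of int
    | Remove of int

// Stand-in for crdt.Obsoletes(o, n): "does o make n obsolete?".
// Only the Remove catch-all from the issue is modelled here.
let obsoletes (o: Op) (n: Op) : bool =
    match n with
    | Remove _ -> true
    | Add _ -> false

// Same shape as the loop body quoted above, reduced to the Unstable set.
let receive (unstable: Set<Op>) (op: Op) : Set<Op> =
    // is the incoming operation obsoleted by something already in the log?
    let obsolete = unstable |> Set.exists (fun o -> obsoletes o op)
    // drop logged operations that the incoming operation obsoletes
    let pruned = unstable |> Set.filter (fun o -> not (obsoletes op o))
    if obsolete then pruned else Set.add op pruned

// Replica A: Remove 1 is still unstable when Add 3 arrives from B.
let unstableAtA = Set.ofList [ Remove 1 ]
let afterAdd3AtA = receive unstableAtA (Add 3)
printfn "A: %A" afterAdd3AtA

// Replica B: Add 3 is unstable when Remove 1 arrives from A.
let unstableAtB = Set.ofList [ Add 3 ]
let afterRemoveAtB = receive unstableAtB (Remove 1)
printfn "B: %A" afterRemoveAtB
```

In this model both replicas end up with only `Add 3` in the unstable set. On A the remove is pruned by the incoming add, and on B it is never logged, so in neither case can it later stabilize and reach `crdt.Apply`.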