Skip to content

Commit

Permalink
refactor: refactor the remote state merging code (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Sep 16, 2024
1 parent caf02dd commit a8da6df
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 111 deletions.
14 changes: 12 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
[![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/Tochemey/gokv/build.yml)]((https://github.com/Tochemey/gokv/actions/workflows/build.yml))

Simple Distributed in-memory key/value store. GoKV provides high availability and fault tolerance which makes it suitable large-scale applications system without sacrificing performance and reliability.
With GoKV, you can instantly create a fast, scalable, distributed system across a cluster of computers. Go-KV uses the push-pull anti-entropy method to replicate nodes' state across the cluster. This
approach makes Go-KV eventually consistent.
With GoKV, you can instantly create a fast, scalable, distributed system across a cluster of computers.

## Installation

Expand All @@ -14,6 +13,8 @@ go get github.com/tochemey/gokv

## Features

- Go-KV uses the push-pull anti-entropy method to replicate nodes' state across the cluster. This approach makes Go-KV eventually consistent.
One can set the [`stateSyncInterval`](./cluster/config.go) value to low for frequent state synchronisation at a network cost.
- Discovery API to implement custom nodes discovery provider. See: [Discovery API](./discovery/provider.go)
- Comes bundled with some discovery providers that can help you hit the ground running:
- [kubernetes](https://kubernetes.io/docs/home/) [api integration](./discovery/kubernetes) is fully functional
Expand All @@ -24,6 +25,15 @@ go get github.com/tochemey/gokv
- `Put`: create key/value pair that is eventually distributed in the cluster of nodes. The `key` is a string and the `value` is a byte array.
- `Get`: retrieves the value of a given `key` from the cluster of nodes.
- `Exists`: check the existence of a given `key` in the cluster.
- `Delete`: delete a given `key` from the cluster. At the moment the `key` is marked to be `archived`.

## Use Cases

- Distributed cache

## Example

There is an example on how to use it with NATs [here](./example/example.go)

## Builtin Discovery

Expand Down
47 changes: 37 additions & 10 deletions cluster/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
package cluster

import (
"maps"
"sync"
"time"

"github.com/hashicorp/memberlist"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/tochemey/gokv/internal/internalpb"
"github.com/tochemey/gokv/internal/lib"
Expand Down Expand Up @@ -118,22 +119,46 @@ func (d *Delegate) MergeRemoteState(buf []byte, join bool) {
// iterate all the entries coming from the remote node
// 1. if there is corresponding node ID in the node local state, combine the local state entries for that nodeID with the remote node entries
// 2. if there is no corresponding node ID in the node local state, set the entries with the remote entries
for _, nodeState := range remoteFSM.GetNodeStates() {
localEntries, ok := entries[nodeState.GetNodeId()]
for _, remoteNodeState := range remoteFSM.GetNodeStates() {
localEntries, ok := entries[remoteNodeState.GetNodeId()]
if !ok {
entries[nodeState.GetNodeId()] = nodeState.GetEntries()
entries[remoteNodeState.GetNodeId()] = remoteNodeState.GetEntries()
continue
}

// create entries when no entries are defined
if len(localEntries) == 0 {
localEntries = make(map[string]*internalpb.Entry)
}

maps.Copy(localEntries, nodeState.GetEntries())
entries[nodeState.GetNodeId()] = localEntries
/*******************************************************************************
small algorithm to merge incoming remote state entries with the local state entries
********************************************************************************/
// 1. iterate the incoming state entries
for key, remoteEntry := range remoteNodeState.GetEntries() {
// 2. check whether an incoming key already exists
localEntry, ok := localEntries[key]
// 3. if the key does not exist then add it as part of the existing entries
if !ok {
localEntries[key] = remoteEntry
continue
}

// 4. if the key entry exists then check its timestamp against the incoming entry
// 5. if the existing key entry is newer compared to the incoming entry ignore the incoming entry
if localEntry.GetTimestamp().AsTime().Unix() > remoteEntry.GetTimestamp().AsTime().Unix() {
continue
}

// 6. if the existing key entry is older compared to the incoming entry, t
// hen add the incoming entry as part of the existing entries
localEntries[key] = remoteEntry
}

entries[remoteNodeState.GetNodeId()] = localEntries
}

// iterate the entries and build the new nodeState list
// iterate the entries and build the new remoteNodeState list
nodeStates := make([]*internalpb.NodeState, 0, len(entries))
for k, v := range entries {
nodeStates = append(nodeStates, &internalpb.NodeState{
Expand All @@ -142,7 +167,7 @@ func (d *Delegate) MergeRemoteState(buf []byte, join bool) {
})
}

// set the local node state with the new nodeState list
// set the local node state with the new remoteNodeState list
d.fsm.NodeStates = nodeStates
}

Expand All @@ -159,8 +184,9 @@ func (d *Delegate) Put(key string, value []byte) {
for k := range nodeState.GetEntries() {
if k == key {
nodeState.Entries[k] = &internalpb.Entry{
Value: value,
Archived: nil,
Value: value,
Archived: nil,
Timestamp: timestamppb.New(time.Now().UTC()),
}
keyExists = true
break
Expand Down Expand Up @@ -222,6 +248,7 @@ func (d *Delegate) Delete(key string) {
for k := range nodeState.GetEntries() {
if k == key && nodeState.GetNodeId() == d.me {
nodeState.Entries[key].Archived = lib.Ptr(true)
nodeState.Entries[key].Timestamp = timestamppb.New(time.Now().UTC())
d.fsm.NodeStates[index] = nodeState
return
}
Expand Down
14 changes: 7 additions & 7 deletions cluster/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ func TestNodes(t *testing.T) {
require.NotEmpty(t, actual)
require.Equal(t, value, actual)

// // let us remove the key
// require.NoError(t, node2.Client().Delete(ctx, key))
// let us remove the key
require.NoError(t, node2.Client().Delete(ctx, key))

// // wait a bit for consistency
// lib.Pause(2 * time.Second)
// wait a bit for consistency
lib.Pause(time.Second)

// exists, err = node3.Client().Exists(ctx, key)
// require.NoError(t, err)
// require.False(t, exists)
exists, err = node3.Client().Exists(ctx, key)
require.NoError(t, err)
require.False(t, exists)

lib.Pause(time.Second)

Expand Down
Loading

0 comments on commit a8da6df

Please sign in to comment.