Skip to content

Commit

Permalink
refactor: simplify api
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Sep 17, 2024
1 parent 2e4f72a commit 0f05cb9
Show file tree
Hide file tree
Showing 13 changed files with 402 additions and 300 deletions.
6 changes: 0 additions & 6 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ jobs:
- name: Run tests
run: |
go-acc ./... -o coverage.out --ignore internal/internalpb,test/data/testpb -- -mod=vendor -race -v
- uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: ./coverage.out
fail_ci_if_error: false
verbose: true
lint:
runs-on: ubuntu-latest
timeout-minutes: 10
Expand Down
6 changes: 0 additions & 6 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ jobs:
- name: Run tests
run: |
go-acc ./... -o coverage.out --ignore internal/internalpb,test/data/testpb -- -mod=vendor -race -v
- uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: ./coverage.out
fail_ci_if_error: false
verbose: true
lint:
runs-on: ubuntu-latest
timeout-minutes: 10
Expand Down
6 changes: 1 addition & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
## Go-KV

[![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/Tochemey/gokv/build.yml)]((https://github.com/Tochemey/gokv/actions/workflows/build.yml))
[![codecov](https://codecov.io/gh/Tochemey/gokv/graph/badge.svg?token=L9KEvLkm09)](https://codecov.io/gh/Tochemey/gokv)

Simple Distributed in-memory key/value store. GoKV provides high availability and fault tolerance which makes it suitable large-scale applications system without sacrificing performance and reliability.
With GoKV, you can instantly create a fast, scalable, distributed system across a cluster of computers.
Expand All @@ -24,11 +23,8 @@ go get github.com/tochemey/gokv
- [dns](./discovery/dnssd) is fully functional
- Built-in [client](./cluster/client.go) to interact with the distributed store via the following apis:
- `Put`: create key/value pair that is eventually distributed in the cluster of nodes. The `key` is a string and the `value` is a byte array.
- `PutProto`: to create a key/value pair where the value is a protocol buffer message
- `PutString`: to create a key/value pair where the value is a string
- `Get`: retrieves the value of a given `key` from the cluster of nodes.
- `GetProto`: retrieves a protocol buffer message for a given `key`. This requires `PutProto` or `Put` to be used to set the value.
- `GetString`: retrieves a string value for a given `key`. This requires `PutString` or `Put` to be used to set the value.
- `List`: retrieves the list of key/value pairs in the cluster at a point in time
- `Exists`: check the existence of a given `key` in the cluster.
- `Delete`: delete a given `key` from the cluster. At the moment the `key` is marked to be `archived`.

Expand Down
50 changes: 16 additions & 34 deletions cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

"connectrpc.com/connect"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"

"github.com/tochemey/gokv/internal/http"
"github.com/tochemey/gokv/internal/internalpb"
Expand All @@ -50,36 +49,22 @@ type Client struct {
}

// Put distributes the key/value pair in the cluster
func (client *Client) Put(ctx context.Context, key string, value []byte, expiration time.Duration) error {
func (client *Client) Put(ctx context.Context, entry *Entry, expiration time.Duration) error {
if !client.connected.Load() {
return ErrClientNotConnected
}

_, err := client.kvService.Put(ctx, connect.NewRequest(
&internalpb.PutRequest{
Key: key,
Value: value,
Key: entry.Key,
Value: entry.Value,
Expiry: setExpiry(expiration),
}))
return err
}

// PutProto creates a key/value pair where the value is a proto message and distributes in the cluster
func (client *Client) PutProto(ctx context.Context, key string, value proto.Message, expiration time.Duration) error {
bytea, err := proto.Marshal(value)
if err != nil {
return err
}
return client.Put(ctx, key, bytea, expiration)
}

// PutString creates a key/value pair where the value is a string and distributes in the cluster
func (client *Client) PutString(ctx context.Context, key string, value string, expiration time.Duration) error {
return client.Put(ctx, key, []byte(value), expiration)
}

// Get retrieves the value of the given key from the cluster
func (client *Client) Get(ctx context.Context, key string) ([]byte, error) {
func (client *Client) Get(ctx context.Context, key string) (*Entry, error) {
if !client.connected.Load() {
return nil, ErrClientNotConnected
}
Expand All @@ -96,28 +81,25 @@ func (client *Client) Get(ctx context.Context, key string) ([]byte, error) {
return nil, err
}

return response.Msg.GetValue(), nil
return fromNode(response.Msg.GetEntry()), nil
}

// GetProto retrieves the value of the given from the cluster as protocol buffer message
// Prior to calling this method one must set a proto message as the value of the key
func (client *Client) GetProto(ctx context.Context, key string, dst proto.Message) error {
bytea, err := client.Get(ctx, key)
if err != nil {
return err
// List returns the list of entries at a point in time
func (client *Client) List(ctx context.Context) ([]*Entry, error) {
if !client.connected.Load() {
return nil, ErrClientNotConnected
}
return proto.Unmarshal(bytea, dst)
}

// GetString retrieves the value of the given from the cluster as a string
// Prior to calling this method one must set a string as the value of the key
func (client *Client) GetString(ctx context.Context, key string) (string, error) {
bytea, err := client.Get(ctx, key)
response, err := client.kvService.List(ctx, connect.NewRequest(&internalpb.ListRequest{}))
if err != nil {
return "", err
return nil, err
}

return string(bytea), nil
entries := make([]*Entry, 0, len(response.Msg.GetEntries()))
for _, entry := range response.Msg.GetEntries() {
entries = append(entries, fromNode(entry))
}
return entries, nil
}

// Delete deletes a given key from the cluster
Expand Down
143 changes: 9 additions & 134 deletions cluster/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package cluster

import (
"context"
"encoding/json"
"testing"
"time"

Expand All @@ -39,86 +38,7 @@ import (
)

func TestClient(t *testing.T) {
t.Run("With PutProto GetProto", func(t *testing.T) {
ctx := context.Background()
// start the NATS server
srv := startNatsServer(t)
// create a cluster node1
node1, sd1 := startNode(t, srv.Addr().String())
require.NotNil(t, node1)

// create a cluster node2
node2, sd2 := startNode(t, srv.Addr().String())
require.NotNil(t, node2)

key := "my-key"
value := new(testpb.Hello)
err := node2.Client().PutProto(ctx, key, value, NoExpiration)
require.NoError(t, err)

// wait for the key to be distributed in the cluster
lib.Pause(time.Second)

// let us retrieve the key from the other nodes
exists, err := node1.Client().Exists(ctx, key)
require.NoError(t, err)
require.True(t, exists)

actual := &testpb.Hello{}
err = node1.Client().GetProto(ctx, key, actual)
require.NoError(t, err)

assert.True(t, proto.Equal(value, actual))

lib.Pause(time.Second)
t.Cleanup(func() {
assert.NoError(t, node1.Stop(ctx))
assert.NoError(t, node2.Stop(ctx))
assert.NoError(t, sd1.Close())
assert.NoError(t, sd2.Close())
srv.Shutdown()
})
})
t.Run("With PutString GetString", func(t *testing.T) {
ctx := context.Background()
// start the NATS server
srv := startNatsServer(t)
// create a cluster node1
node1, sd1 := startNode(t, srv.Addr().String())
require.NotNil(t, node1)

// create a cluster node2
node2, sd2 := startNode(t, srv.Addr().String())
require.NotNil(t, node2)

key := "my-key"
value := "my-value"
err := node2.Client().PutString(ctx, key, value, NoExpiration)
require.NoError(t, err)

// wait for the key to be distributed in the cluster
lib.Pause(time.Second)

// let us retrieve the key from the other nodes
exists, err := node1.Client().Exists(ctx, key)
require.NoError(t, err)
require.True(t, exists)

actual, err := node1.Client().GetString(ctx, key)
require.NoError(t, err)
require.NotEmpty(t, actual)
require.Equal(t, value, actual)

lib.Pause(time.Second)
t.Cleanup(func() {
assert.NoError(t, node1.Stop(ctx))
assert.NoError(t, node2.Stop(ctx))
assert.NoError(t, sd1.Close())
assert.NoError(t, sd2.Close())
srv.Shutdown()
})
})
t.Run("With PutProto GetProto with expiration", func(t *testing.T) {
t.Run("With Put Get with expiration", func(t *testing.T) {
ctx := context.Background()
// start the NATS server
srv := startNatsServer(t)
Expand All @@ -133,47 +53,14 @@ func TestClient(t *testing.T) {
expiration := 100 * time.Millisecond
key := "my-key"
value := &testpb.Hello{Name: key}
err := node2.Client().PutProto(ctx, key, value, expiration)
bytea, err := proto.Marshal(value)
require.NoError(t, err)
entry := &Entry{
Key: key,
Value: bytea,
}

// wait for the key to be distributed in the cluster
lib.Pause(time.Second)

// let us retrieve the key from the other nodes
exists, err := node1.Client().Exists(ctx, key)
require.NoError(t, err)
require.False(t, exists)

actual := &testpb.Hello{}
err = node1.Client().GetProto(ctx, key, actual)
require.Error(t, err)
assert.EqualError(t, err, ErrKeyNotFound.Error())

lib.Pause(time.Second)
t.Cleanup(func() {
assert.NoError(t, node1.Stop(ctx))
assert.NoError(t, node2.Stop(ctx))
assert.NoError(t, sd1.Close())
assert.NoError(t, sd2.Close())
srv.Shutdown()
})
})
t.Run("With PutString GetString with expiration", func(t *testing.T) {
ctx := context.Background()
// start the NATS server
srv := startNatsServer(t)
// create a cluster node1
node1, sd1 := startNode(t, srv.Addr().String())
require.NotNil(t, node1)

// create a cluster node2
node2, sd2 := startNode(t, srv.Addr().String())
require.NotNil(t, node2)

key := "my-key"
value := "my-value"
expiration := 100 * time.Millisecond
err := node2.Client().PutString(ctx, key, value, expiration)
err = node2.Client().Put(ctx, entry, expiration)
require.NoError(t, err)

// wait for the key to be distributed in the cluster
Expand All @@ -184,9 +71,9 @@ func TestClient(t *testing.T) {
require.NoError(t, err)
require.False(t, exists)

actual, err := node1.Client().GetString(ctx, key)
actual, err := node1.Client().Get(ctx, key)
require.Error(t, err)
require.Empty(t, actual)
require.Nil(t, actual)
assert.EqualError(t, err, ErrKeyNotFound.Error())

lib.Pause(time.Second)
Expand All @@ -199,15 +86,3 @@ func TestClient(t *testing.T) {
})
})
}

type testCodec struct{}

func (c *testCodec) Encode(t interface{}) ([]byte, error) {
return json.Marshal(t)
}

func (c *testCodec) Decode(bytea []byte) (interface{}, error) {
var t interface{}
err := json.Unmarshal(bytea, &t)
return t, err
}
Loading

0 comments on commit 0f05cb9

Please sign in to comment.