Skip to content

Commit

Permalink
refactor: client utility functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Oct 5, 2024
1 parent e5d6037 commit 3bff476
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 0 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ one can set the [`stateSyncInterval`](./cluster/config.go) value to a low value.
## Features
- Built-in [client](./cluster/client.go) to interact with the cluster via the following apis:
- `Put`: create key/value pair that is eventually distributed in the cluster of nodes. The `key` is a string and the `value` is a byte array. One can set an expiry to the key.
- `PutProto`: to create a key/value pair where the value is a protocol buffer message
- `PutString`: to create a key/value pair where the value is a string
- `PutAny`: to create a key/value pair with a given [`Codec`](./cluster/codec.go) to encode the value type.
- `Get`: retrieves the value of a given `key` from the cluster of nodes. This can return a false negative meaning that the key may exist but at the time of checking it is having yet to be replicated in the cluster.
- `GetProto`: retrieves a protocol buffer message for a given `key`. This requires `PutProto` or `Put` to be used to set the value.
- `GetString`: retrieves a string value for a given `key`. This requires `PutString` or `Put` to be used to set the value.
- `GetAny`: retrieves any value type for a given `key`. This requires `PutAny` to be used to set the value.
- `List`: retrieves the list of key/value pairs in the cluster at a point in time
- `Exists`: check the existence of a given `key` in the cluster. This can return a false negative meaning that the key may exist but at the time of checking it is having yet to be replicated in the cluster.
- `Delete`: delete a given `key` from the cluster. Node only deletes the key they own
Expand Down
60 changes: 60 additions & 0 deletions cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"connectrpc.com/connect"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"

"github.com/tochemey/gokv/internal/http"
"github.com/tochemey/gokv/internal/internalpb"
Expand Down Expand Up @@ -63,6 +64,65 @@ func (client *Client) Put(ctx context.Context, entry *Entry, expiration time.Dur
return err
}

// PutProto creates a key/value pair where the value is a proto message and distributes in the cluster
func (client *Client) PutProto(ctx context.Context, key string, value proto.Message, expiration time.Duration) error {
bytea, err := proto.Marshal(value)
if err != nil {
return err
}

entry := &Entry{Key: key, Value: bytea}
return client.Put(ctx, entry, expiration)
}

// PutString creates a key/value pair where the value is a string and distributes in the cluster
func (client *Client) PutString(ctx context.Context, key string, value string, expiration time.Duration) error {
entry := &Entry{Key: key, Value: []byte(value)}
return client.Put(ctx, entry, expiration)
}

// PutAny distributes the key/value pair in the cluster.
// A binary encoder is required to properly encode the value.
func (client *Client) PutAny(ctx context.Context, key string, value any, expiration time.Duration, codec Codec) error {
bytea, err := codec.Encode(value)
if err != nil {
return err
}
entry := &Entry{Key: key, Value: bytea}
return client.Put(ctx, entry, expiration)
}

// GetProto retrieves the value of the given from the cluster as protocol buffer message
// Prior to calling this method one must set a proto message as the value of the key
func (client *Client) GetProto(ctx context.Context, key string, dst proto.Message) error {
entry, err := client.Get(ctx, key)
if err != nil {
return err
}
return proto.Unmarshal(entry.Value, dst)
}

// GetString retrieves the value of the given from the cluster as a string
// Prior to calling this method one must set a string as the value of the key
func (client *Client) GetString(ctx context.Context, key string) (string, error) {
entry, err := client.Get(ctx, key)
if err != nil {
return "", err
}

return string(entry.Value), nil
}

// GetAny retrieves the value of the given from the cluster
// Prior to calling this method one must set a string as the value of the key
func (client *Client) GetAny(ctx context.Context, key string, codec Codec) (any, error) {
entry, err := client.Get(ctx, key)
if err != nil {
return nil, err
}
return codec.Decode(entry.Value)
}

// Get retrieves the value of the given key from the cluster
func (client *Client) Get(ctx context.Context, key string) (*Entry, error) {
if !client.connected.Load() {
Expand Down
140 changes: 140 additions & 0 deletions cluster/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package cluster

import (
"context"
"encoding/json"
"testing"
"time"

Expand All @@ -38,6 +39,134 @@ import (
)

func TestClient(t *testing.T) {
t.Run("With PutProto GetProto", func(t *testing.T) {
ctx := context.Background()
// start the NATS server
srv := startNatsServer(t)
// create a cluster node1
node1, sd1 := startNode(t, srv.Addr().String())
require.NotNil(t, node1)
// create a cluster node2
node2, sd2 := startNode(t, srv.Addr().String())
require.NotNil(t, node2)
key := "my-key"
value := new(testpb.Hello)
err := node2.Client().PutProto(ctx, key, value, NoExpiration)
require.NoError(t, err)
// wait for the key to be distributed in the cluster
lib.Pause(time.Second)
// let us retrieve the key from the other nodes
exists, err := node1.Client().Exists(ctx, key)
require.NoError(t, err)
require.True(t, exists)
actual := &testpb.Hello{}
err = node1.Client().GetProto(ctx, key, actual)
require.NoError(t, err)
assert.True(t, proto.Equal(value, actual))
lib.Pause(time.Second)

assert.NoError(t, node1.Stop(ctx))
assert.NoError(t, node2.Stop(ctx))
assert.NoError(t, sd1.Close())
assert.NoError(t, sd2.Close())
srv.Shutdown()
})
t.Run("With PutString GetString", func(t *testing.T) {
ctx := context.Background()
// start the NATS server
srv := startNatsServer(t)
// create a cluster node1
node1, sd1 := startNode(t, srv.Addr().String())
require.NotNil(t, node1)
// create a cluster node2
node2, sd2 := startNode(t, srv.Addr().String())
require.NotNil(t, node2)
key := "my-key"
value := "my-value"
err := node2.Client().PutString(ctx, key, value, NoExpiration)
require.NoError(t, err)
// wait for the key to be distributed in the cluster
lib.Pause(time.Second)
// let us retrieve the key from the other nodes
exists, err := node1.Client().Exists(ctx, key)
require.NoError(t, err)
require.True(t, exists)
actual, err := node1.Client().GetString(ctx, key)
require.NoError(t, err)
require.NotEmpty(t, actual)
require.Equal(t, value, actual)
lib.Pause(time.Second)

assert.NoError(t, node1.Stop(ctx))
assert.NoError(t, node2.Stop(ctx))
assert.NoError(t, sd1.Close())
assert.NoError(t, sd2.Close())
srv.Shutdown()
})
t.Run("With PutProto GetProto with expiration", func(t *testing.T) {
ctx := context.Background()
// start the NATS server
srv := startNatsServer(t)
// create a cluster node1
node1, sd1 := startNode(t, srv.Addr().String())
require.NotNil(t, node1)
// create a cluster node2
node2, sd2 := startNode(t, srv.Addr().String())
require.NotNil(t, node2)
expiration := 100 * time.Millisecond
key := "my-key"
value := &testpb.Hello{Name: key}
err := node2.Client().PutProto(ctx, key, value, expiration)
require.NoError(t, err)
// wait for the key to be distributed in the cluster
lib.Pause(time.Second)
// let us retrieve the key from the other nodes
exists, err := node1.Client().Exists(ctx, key)
require.NoError(t, err)
require.False(t, exists)
actual := &testpb.Hello{}
err = node1.Client().GetProto(ctx, key, actual)
require.Error(t, err)
assert.EqualError(t, err, ErrKeyNotFound.Error())
lib.Pause(time.Second)
assert.NoError(t, node1.Stop(ctx))
assert.NoError(t, node2.Stop(ctx))
assert.NoError(t, sd1.Close())
assert.NoError(t, sd2.Close())
srv.Shutdown()
})
t.Run("With PutString GetString with expiration", func(t *testing.T) {
ctx := context.Background()
// start the NATS server
srv := startNatsServer(t)
// create a cluster node1
node1, sd1 := startNode(t, srv.Addr().String())
require.NotNil(t, node1)
// create a cluster node2
node2, sd2 := startNode(t, srv.Addr().String())
require.NotNil(t, node2)
key := "my-key"
value := "my-value"
expiration := 100 * time.Millisecond
err := node2.Client().PutString(ctx, key, value, expiration)
require.NoError(t, err)
// wait for the key to be distributed in the cluster
lib.Pause(time.Second)
// let us retrieve the key from the other nodes
exists, err := node1.Client().Exists(ctx, key)
require.NoError(t, err)
require.False(t, exists)
actual, err := node1.Client().GetString(ctx, key)
require.Error(t, err)
require.Empty(t, actual)
assert.EqualError(t, err, ErrKeyNotFound.Error())
lib.Pause(time.Second)
assert.NoError(t, node1.Stop(ctx))
assert.NoError(t, node2.Stop(ctx))
assert.NoError(t, sd1.Close())
assert.NoError(t, sd2.Close())
srv.Shutdown()
})
t.Run("With Put Get with expiration", func(t *testing.T) {
ctx := context.Background()
// start the NATS server
Expand Down Expand Up @@ -86,3 +215,14 @@ func TestClient(t *testing.T) {
})
})
}

type testCodec struct{}

func (c *testCodec) Encode(t interface{}) ([]byte, error) {
return json.Marshal(t)
}
func (c *testCodec) Decode(bytea []byte) (interface{}, error) {
var t interface{}
err := json.Unmarshal(bytea, &t)
return t, err
}
33 changes: 33 additions & 0 deletions cluster/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* MIT License
*
* Copyright (c) 2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package cluster

// Codec will be implemented to encode and decode message
type Codec interface {
// Encode encodes the receiver into a binary form and returns the result.
Encode(any) ([]byte, error)
// Decode decodes a binary message
Decode([]byte) (any, error)
}

0 comments on commit 3bff476

Please sign in to comment.