Skip to content

Commit

Permalink
feat: add additional utility methods to client code (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Sep 16, 2024
1 parent 0a7ab69 commit d74d770
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 19 deletions.
38 changes: 22 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,45 +1,51 @@
# Go-KV
## Go-KV

[![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/Tochemey/gokv/build.yml)]((https://github.com/Tochemey/gokv/actions/workflows/build.yml))

Simple Distributed in-memory key/value store. GoKV provides high availability and fault tolerance which makes it suitable large-scale applications system without sacrificing performance and reliability.
With GoKV, you can instantly create a fast, scalable, distributed system across a cluster of computers.

## Installation
### Installation

```bash
go get github.com/tochemey/gokv
```

## Features
### Features

- Go-KV uses the push-pull anti-entropy method to replicate nodes' state across the cluster. This approach makes Go-KV eventually consistent.
One can set the [`stateSyncInterval`](./cluster/config.go) value to low for frequent state synchronisation at a network cost.
- Discovery API to implement custom nodes discovery provider. See: [Discovery API](./discovery/provider.go)
- Comes bundled with some discovery providers that can help you hit the ground running:
- [kubernetes](https://kubernetes.io/docs/home/) [api integration](./discovery/kubernetes) is fully functional
- [NATS](https://nats.io/) [integration](./discovery/nats) is fully functional
- [Static](./discovery/static) is fully functional and for demo purpose
- [DNS](./discovery/dnssd) is fully functional
- Built-in [Client](./cluster/client.go) to interact with the distributed store via the following apis:
- [nats](https://nats.io/) [integration](./discovery/nats) is fully functional
- [static](./discovery/static) is fully functional and for demo purpose
- [dns](./discovery/dnssd) is fully functional
- Built-in [client](./cluster/client.go) to interact with the distributed store via the following apis:
- `Put`: create key/value pair that is eventually distributed in the cluster of nodes. The `key` is a string and the `value` is a byte array.
- `PutProto`: to create a key/value pair where the value is a protocol buffer message
- `PutString`: to create a key/value pair where the value is a string
- `PutAny`: to create a key/value pair with a given [`Codec`](./cluster/codec.go) to encode the value type.
- `Get`: retrieves the value of a given `key` from the cluster of nodes.
- `GetProto`: retrieves a protocol buffer message for a given `key`. This requires `PutProto` or `Put` to be used to set the value.
- `GetString`: retrieves a string value for a given `key`. This requires `PutString` or `Put` to be used to set the value.
- `GetAny`: retrieves any value type for a given `key`. This requires `PutAny` to be used to set the value.
- `Exists`: check the existence of a given `key` in the cluster.
- `Delete`: delete a given `key` from the cluster. At the moment the `key` is marked to be `archived`.

## Use Cases
### Use Cases

- Distributed cache

## Example
### Example

There is an example on how to use it with NATs [here](./example/example.go)

## Builtin Discovery
### Builtin Discovery

### NATS
#### nats

To use the NATS discovery provider one needs to provide the following:
To use the [nats](https://nats.io/) discovery provider one needs to provide the following:

- `Server`: the NATS Server address
- `Subject`: the NATS subject to use
Expand All @@ -49,7 +55,7 @@ To use the NATS discovery provider one needs to provide the following:
- `DiscoveryPort`: the discovery port of the running node
- `Host`: the host address of the running node

### DNS
#### dns

This provider performs nodes discovery based upon the domain name provided. This is very useful when doing local development
using docker.
Expand All @@ -59,14 +65,14 @@ To use the DNS discovery provider one needs to provide the following:
- `DomainName`: the domain name
- `IPv6`: it states whether to lookup for IPv6 addresses.

### Static
#### static

This provider performs nodes discovery based upon the list of static hosts addresses.
The address of each host is the form of `host:port` where `port` is the discovery port.

### Kubernetes
#### kubernetes

To get the kubernetes discovery working as expected, the following need to be set in the manifest files:
To get the [kubernetes](https://kubernetes.io/docs/home/) discovery working as expected, the following need to be set in the manifest files:

- `Namespace`: the kubernetes namespace
- `DiscoveryPortName`: the discovery port name
Expand Down
60 changes: 58 additions & 2 deletions cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"connectrpc.com/connect"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"

"github.com/tochemey/gokv/internal/http"
"github.com/tochemey/gokv/internal/internalpb"
Expand Down Expand Up @@ -61,6 +62,30 @@ func (client *Client) Put(ctx context.Context, key string, value []byte, expirat
return err
}

// PutProto creates a key/value pair where the value is a proto message and distributes in the cluster
func (client *Client) PutProto(ctx context.Context, key string, value proto.Message, expiration time.Duration) error {
bytea, err := proto.Marshal(value)
if err != nil {
return err
}
return client.Put(ctx, key, bytea, expiration)
}

// PutString creates a key/value pair where the value is a string and distributes in the cluster
func (client *Client) PutString(ctx context.Context, key string, value string, expiration time.Duration) error {
return client.Put(ctx, key, []byte(value), expiration)
}

// PutAny distributes the key/value pair in the cluster.
// A binary encoder is required to properly encode the value.
func (client *Client) PutAny(ctx context.Context, key string, value any, expiration time.Duration, codec Codec) error {
bytea, err := codec.Encode(value)
if err != nil {
return err
}
return client.Put(ctx, key, bytea, expiration)
}

// Get retrieves the value of the given key from the cluster
func (client *Client) Get(ctx context.Context, key string) ([]byte, error) {
if !client.connected.Load() {
Expand All @@ -82,6 +107,37 @@ func (client *Client) Get(ctx context.Context, key string) ([]byte, error) {
return response.Msg.GetValue(), nil
}

// GetProto retrieves the value of the given from the cluster as protocol buffer message
// Prior to calling this method one must set a proto message as the value of the key
func (client *Client) GetProto(ctx context.Context, key string, dst proto.Message) error {
bytea, err := client.Get(ctx, key)
if err != nil {
return err
}
return proto.Unmarshal(bytea, dst)
}

// GetString retrieves the value of the given from the cluster as a string
// Prior to calling this method one must set a string as the value of the key
func (client *Client) GetString(ctx context.Context, key string, dst string) error {
bytea, err := client.Get(ctx, key)
if err != nil {
return err
}
dst = string(bytea)
return nil
}

// GetAny retrieves the value of the given from the cluster
// Prior to calling this method one must set a string as the value of the key
func (client *Client) GetAny(ctx context.Context, key string, codec Codec) (any, error) {
bytea, err := client.Get(ctx, key)
if err != nil {
return nil, err
}
return codec.Decode(bytea)
}

// Delete deletes a given key from the cluster
// nolint
func (client *Client) Delete(ctx context.Context, key string) error {
Expand Down Expand Up @@ -114,8 +170,8 @@ func (client *Client) Exists(ctx context.Context, key string) (bool, error) {
return response.Msg.GetExists(), nil
}

// Close closes the client connection to the cluster
func (client *Client) Close() error {
// close closes the client connection to the cluster
func (client *Client) close() error {
// no-op when the client is not connected
if !client.connected.Load() {
return nil
Expand Down
33 changes: 33 additions & 0 deletions cluster/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* MIT License
*
* Copyright (c) 2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package cluster

// Codec will be implemented to encode and decode message
type Codec interface {
// Encode encodes the receiver into a binary form and returns the result.
Encode(any) ([]byte, error)
// Decode decodes a binary message
Decode([]byte) (any, error)
}
2 changes: 1 addition & 1 deletion cluster/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (node *Node) Stop(ctx context.Context) error {

if err := errorschain.
New(errorschain.ReturnFirst()).
AddError(node.clusterClient.Close()).
AddError(node.clusterClient.close()).
AddError(node.memberlist.Leave(node.config.shutdownTimeout)).
AddError(node.config.provider.Deregister()).
AddError(node.config.provider.Close()).
Expand Down

0 comments on commit d74d770

Please sign in to comment.