Skip to content

Commit

Permalink
feat: enable GCM authentication encryption (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Oct 27, 2024
1 parent 3bff476 commit b4d955c
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 25 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ go get github.com/tochemey/gokv
Go-KV is designed to distribute key/value pair in a cluster of computers using push-pull anti-entropy method to replicate nodes' state across the cluster.
When a data entry is changed on a node the full state of that entry is replicated to other nodes.
This approach makes Go-KV eventually consistent. However, at some point in time the cluster will be in complete synchronised state. For frequent state synchronisation
one can set the [`stateSyncInterval`](./cluster/config.go) value to a low value. The downside of a low value is that it will increase network traffic.
one can set the [`syncInterval`](./cluster/config.go) value to a low value. The downside of a low value is that it will increase network traffic.

## Features
- Built-in [client](./cluster/client.go) to interact with the cluster via the following apis:
Expand All @@ -32,7 +32,7 @@ one can set the [`stateSyncInterval`](./cluster/config.go) value to a low value.
- `List`: retrieves the list of key/value pairs in the cluster at a point in time
- `Exists`: check the existence of a given `key` in the cluster. This can return a false negative meaning that the key may exist but at the time of checking it is having yet to be replicated in the cluster.
- `Delete`: delete a given `key` from the cluster. Node only deletes the key they own
- Built-in janitor to remove expired entries. One can set the janitor execution interval. Bearing in mind of the eventual consistency of the Go-KV, one need to set that interval taking into consideration the [`stateSyncInterval`](./cluster/config.go)
- Built-in janitor to remove expired entries. One can set the janitor execution interval. Bearing in mind of the eventual consistency of the Go-KV, one need to set that interval taking into consideration the [`syncInterval`](./cluster/config.go)
- Discovery API to implement custom nodes discovery provider. See: [discovery](./discovery/provider.go)
- Comes bundled with some discovery providers that can help you hit the ground running:
- [kubernetes](https://kubernetes.io/docs/home/) [api integration](./discovery/kubernetes) is fully functional
Expand Down
27 changes: 19 additions & 8 deletions cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,14 @@ type Config struct {
// quite expensive relative to standard gossiped messages.
// Setting this interval lower (more frequent) will increase convergence
// speeds across larger clusters at the expense of increased bandwidth usage.
stateSyncInterval time.Duration

syncInterval time.Duration
// specifies the interval at which deleted or dead keys will be completely removed from the system
cleanerJobInterval time.Duration
readTimeout time.Duration
// specifies the read timeout. This is how long to wait before timing out when reading
// a given key
readTimeout time.Duration
// security specifies whether to encrypt the data
security *Security
}

// enforce compilation error
Expand All @@ -77,7 +81,7 @@ func NewConfig() *Config {
maxJoinAttempts: 5,
joinRetryInterval: time.Second,
shutdownTimeout: 3 * time.Second,
stateSyncInterval: time.Minute,
syncInterval: time.Minute,
logger: log.New(log.ErrorLevel, os.Stderr),
readTimeout: time.Second,
}
Expand Down Expand Up @@ -131,9 +135,9 @@ func (config *Config) WithHost(host string) *Config {
return config
}

// WithStateSyncInterval sets the delegate sync interval
func (config *Config) WithStateSyncInterval(interval time.Duration) *Config {
config.stateSyncInterval = interval
// WithSyncInterval sets the delegate sync interval
func (config *Config) WithSyncInterval(interval time.Duration) *Config {
config.syncInterval = interval
return config
}

Expand All @@ -151,6 +155,12 @@ func (config *Config) WithCleanerJobInterval(interval time.Duration) *Config {
return config
}

// WithSecurity sets the security of the config
func (config *Config) WithSecurity(security *Security) *Config {
config.security = security
return config
}

// Validate implements validation.Validator.
func (config *Config) Validate() error {
return validation.
Expand All @@ -159,7 +169,8 @@ func (config *Config) Validate() error {
AddAssertion(config.joinRetryInterval > 0, "join retry interval is invalid").
AddAssertion(config.shutdownTimeout > 0, "shutdown timeout is invalid").
AddAssertion(config.maxJoinAttempts > 0, "max join attempts is invalid").
AddAssertion(config.stateSyncInterval > 0, "stateSync interval is invalid").
AddAssertion(config.syncInterval > 0, "stateSync interval is invalid").
AddValidator(validation.NewEmptyStringValidator("host", config.host)).
AddValidator(validation.NewConditionalValidator(config.security != nil, config.security)).
Validate()
}
22 changes: 11 additions & 11 deletions cluster/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestConfig(t *testing.T) {
WithDiscoveryPort(1235).
WithDiscoveryProvider(discovery).
WithHost("127.0.0.1").
WithStateSyncInterval(time.Second).
WithSyncInterval(time.Second).
WithLogger(log.DiscardLogger).
WithJoinRetryInterval(time.Second).
WithShutdownTimeout(time.Second).
Expand All @@ -55,7 +55,7 @@ func TestConfig(t *testing.T) {
WithDiscoveryPort(1235).
WithDiscoveryProvider(discovery).
WithHost(""). // host not provided will return an error
WithStateSyncInterval(time.Second).
WithSyncInterval(time.Second).
WithLogger(log.DiscardLogger).
WithJoinRetryInterval(time.Second).
WithShutdownTimeout(time.Second).
Expand All @@ -69,7 +69,7 @@ func TestConfig(t *testing.T) {
WithDiscoveryPort(1235).
WithDiscoveryProvider(discovery).
WithHost(""). // host not provided will return an error
WithStateSyncInterval(time.Second).
WithSyncInterval(time.Second).
WithLogger(log.DiscardLogger).
WithJoinRetryInterval(time.Second).
WithShutdownTimeout(time.Second).
Expand All @@ -83,7 +83,7 @@ func TestConfig(t *testing.T) {
WithDiscoveryPort(1235).
WithDiscoveryProvider(discovery).
WithHost(""). // host not provided will return an error
WithStateSyncInterval(time.Second).
WithSyncInterval(time.Second).
WithLogger(log.DiscardLogger).
WithJoinRetryInterval(time.Second).
WithShutdownTimeout(time.Second).
Expand All @@ -97,7 +97,7 @@ func TestConfig(t *testing.T) {
WithDiscoveryPort(1235).
WithDiscoveryProvider(discovery).
WithHost(""). // host not provided will return an error
WithStateSyncInterval(time.Second).
WithSyncInterval(time.Second).
WithLogger(log.DiscardLogger).
WithJoinRetryInterval(time.Second).
WithShutdownTimeout(time.Second).
Expand All @@ -111,7 +111,7 @@ func TestConfig(t *testing.T) {
WithDiscoveryPort(1235).
WithDiscoveryProvider(discovery).
WithHost("").
WithStateSyncInterval(time.Second).
WithSyncInterval(time.Second).
WithLogger(log.DiscardLogger).
WithJoinRetryInterval(time.Second).
WithShutdownTimeout(time.Second).
Expand All @@ -127,7 +127,7 @@ func TestConfig(t *testing.T) {
WithDiscoveryPort(1235).
WithDiscoveryProvider(nil).
WithHost("127.0.0.1").
WithStateSyncInterval(time.Second).
WithSyncInterval(time.Second).
WithLogger(log.DiscardLogger).
WithJoinRetryInterval(time.Second).
WithShutdownTimeout(time.Second).
Expand All @@ -144,7 +144,7 @@ func TestConfig(t *testing.T) {
WithDiscoveryProvider(discovery).
WithHost("127.0.0.1").
WithLogger(log.DiscardLogger).
WithStateSyncInterval(time.Second).
WithSyncInterval(time.Second).
WithJoinRetryInterval(-1).
WithShutdownTimeout(time.Second).
WithReadTimeout(time.Second)
Expand All @@ -160,7 +160,7 @@ func TestConfig(t *testing.T) {
WithDiscoveryProvider(discovery).
WithHost("127.0.0.1").
WithLogger(log.DiscardLogger).
WithStateSyncInterval(-1).
WithSyncInterval(-1).
WithJoinRetryInterval(time.Second).
WithShutdownTimeout(time.Second).
WithReadTimeout(time.Second)
Expand All @@ -176,7 +176,7 @@ func TestConfig(t *testing.T) {
WithDiscoveryProvider(discovery).
WithHost("127.0.0.1").
WithLogger(log.DiscardLogger).
WithStateSyncInterval(time.Second).
WithSyncInterval(time.Second).
WithJoinRetryInterval(time.Second).
WithShutdownTimeout(-1).
WithReadTimeout(time.Second)
Expand All @@ -192,7 +192,7 @@ func TestConfig(t *testing.T) {
WithDiscoveryProvider(discovery).
WithHost("127.0.0.1").
WithLogger(log.DiscardLogger).
WithStateSyncInterval(time.Second).
WithSyncInterval(time.Second).
WithJoinRetryInterval(time.Second).
WithShutdownTimeout(time.Second).
WithMaxJoinAttempts(-1).
Expand Down
9 changes: 8 additions & 1 deletion cluster/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,14 @@ func NewNode(config *Config) *Node {
mconfig.AdvertisePort = mconfig.BindPort
mconfig.LogOutput = newLogWriter(config.logger)
mconfig.Name = net.JoinHostPort(mconfig.BindAddr, strconv.Itoa(mconfig.BindPort))
mconfig.PushPullInterval = config.stateSyncInterval
mconfig.PushPullInterval = config.syncInterval

if config.security != nil {
mconfig.Label = config.security.cookie
mconfig.SecretKey = config.security.encryptionKey
mconfig.GossipVerifyIncoming = true
mconfig.GossipVerifyOutgoing = true
}

meta := &internalpb.NodeMeta{
Name: mconfig.Name,
Expand Down
7 changes: 5 additions & 2 deletions cluster/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func startNatsServer(t *testing.T) *natsserver.Server {

func startNode(t *testing.T, serverAddr string) (*Node, discovery.Provider) {
ctx := context.TODO()
logger := log.DiscardLogger
logger := log.DefaultLogger

// generate the ports for the single startNode
nodePorts := dynaport.Get(2)
Expand All @@ -227,6 +227,8 @@ func startNode(t *testing.T, serverAddr string) (*Node, discovery.Provider) {
host := "127.0.0.1"
// create the various config option
natsSubject := "some-subject"
cookie := "cookie"
secretKey := []byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}
// create the config
config := nats.Config{
Server: fmt.Sprintf("nats://%s", serverAddr),
Expand All @@ -245,9 +247,10 @@ func startNode(t *testing.T, serverAddr string) (*Node, discovery.Provider) {
shutdownTimeout: time.Second,
logger: logger,
host: host,
stateSyncInterval: 500 * time.Millisecond,
syncInterval: 500 * time.Millisecond,
joinRetryInterval: 500 * time.Millisecond,
maxJoinAttempts: 5,
security: NewSecurity(cookie, secretKey),
})

// start the node
Expand Down
72 changes: 72 additions & 0 deletions cluster/security.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* MIT License
*
* Copyright (c) 2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package cluster

import (
"errors"

"github.com/tochemey/gokv/internal/validation"
)

// Security defines measures needed to protect and guarantee
// the authenticity of data shared in the cluster
type Security struct {
// cookie is a set of bytes to use as authentication label
// This has to be the same within the cluster to ensure smooth GCM authenticated data
// reference: https://en.wikipedia.org/wiki/Galois/Counter_Mode
cookie string
// encryptionKey is used to encrypt messages and decrypt messages. Providing a
// value for this will enable message-level encryption and
// verification.
//
// The value should be either 16, 24, or 32 bytes to select AES-128,
// AES-192, or AES-256.
encryptionKey []byte
}

// NewSecurity creates an instance of Security by providing the cookie and the encryption key
//
// cookie is a set of bytes to use as authentication label
// encryptionKey is used to encrypt messages and decrypt messages. Providing a
// value for this will enable message-level encryption and
// verification. The cookie has to be the same within the cluster to ensure smooth GCM authenticated data
// reference: https://en.wikipedia.org/wiki/Galois/Counter_Mode
//
// The value should be either 16, 24, or 32 bytes to select AES-128,
// AES-192, or AES-256.
func NewSecurity(cookie string, privateKey []byte) *Security {
return &Security{cookie: cookie, encryptionKey: privateKey}
}

// enforce compilation error
var _ validation.Validator = (*Security)(nil)

// Validate validates the security object
func (sec *Security) Validate() error {
if l := len(sec.encryptionKey); l != 16 && l != 24 && l != 32 {
return errors.New("key size must be 16, 24 or 32 bytes")
}
return nil
}
2 changes: 1 addition & 1 deletion example/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func main() {
WithDiscoveryPort(discoveryPort).
WithDiscoveryProvider(discovery).
WithHost(host).
WithStateSyncInterval(time.Second).
WithSyncInterval(time.Second).
WithLogger(logger)

// create an instance of a node
Expand Down

0 comments on commit b4d955c

Please sign in to comment.