Skip to content

Commit

Permalink
feat: add kubernetes discovery provider (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Sep 14, 2024
1 parent 5f2702e commit 5234ba0
Show file tree
Hide file tree
Showing 7 changed files with 696 additions and 30 deletions.
34 changes: 20 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
Simple Distributed in-memory key/value store. GoKV provides high availability and fault tolerance which makes it suitable large-scale applications system without sacrificing performance and reliability.
With GoKV, you can instantly create a fast, scalable, distributed system across a cluster of computers.


## Installation

```bash
Expand All @@ -17,21 +16,19 @@ go get github.com/tochemey/gokv
- Robust APIs to manipulate key/value pairs. See: [APIs](#apis)
- Discovery API to implement custom nodes discovery provider. See: [Discovery API](./discovery/provider.go)
- Comes bundled with some discovery providers that can help you hit the ground running:
- Kubernetes provider [Soon]
- [kubernetes](https://kubernetes.io/docs/home/) [api integration](./discovery/kubernetes) is fully functional
- [NATS](https://nats.io/) [integration](./discovery/nats) is fully functional
- [Static](./discovery/static) is fully functional and for demo purpose
- [DNS](./discovery/dnssd) is fully functional
- Built-in [client](./cluster/client.go) to interact with the distributed store via the following apis:
- `Put`: create key/value pair that is eventually distributed in the cluster of nodes. The `key` is a string and the `value` is a byte array.
- `Get`: retrieves the value of a given `key` from the cluster of nodes.
- `Delete`: delete a given `key` from the cluster. At the moment the `key` is marked to be `deleted`.
- `Exists`: check the existence of a given `key` in the cluster.

## APIs

- `Put`: create key/value pair that is eventually distributed in the cluster of nodes. The `key` is a string and the `value` is a byte array.
- `Get`: retrieves the value of a given `key` from the cluster of nodes.
- `Delete`: delete a given `key` from the cluster. At the moment the `key` is marked to be `deleted`.
- `Exists`: check the existence of a given `key` in the cluster.

## Discovery Providers
## Builtin Discovery Providers

### NATS Discovery Provider Setup
### NATS

To use the NATS discovery provider one needs to provide the following:

Expand All @@ -43,7 +40,7 @@ To use the NATS discovery provider one needs to provide the following:
- `DiscoveryPort`: the discovery port of the running node
- `Host`: the host address of the running node

#### DNS Provider Setup
### DNS

This provider performs nodes discovery based upon the domain name provided. This is very useful when doing local development
using docker.
Expand All @@ -53,7 +50,16 @@ To use the DNS discovery provider one needs to provide the following:
- `DomainName`: the domain name
- `IPv6`: it states whether to lookup for IPv6 addresses.

#### Static Provider Setup
### Static

This provider performs nodes discovery based upon the list of static hosts addresses.
The address of each host is the form of `host:port` where `port` is the discovery port.
The address of each host is the form of `host:port` where `port` is the discovery port.

### Kubernetes

To get the kubernetes discovery working as expected, the following need to be set in the manifest files:

- `Namespace`: the kubernetes namespace
- `DiscoveryPortName`: the discovery port name
- `PortName`: the client port name. This port is used by the built-in cluster client for the various operations on the key/value pair distributed store
- `PodLabels`: the POD labels
49 changes: 49 additions & 0 deletions discovery/kubernetes/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* MIT License
*
* Copyright (c) 2022-2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package kubernetes

import "github.com/tochemey/gokv/internal/validation"

// Config defines the kubernetes discovery configuration
type Config struct {
// Namespace specifies the kubernetes namespace
Namespace string
// DiscoveryPortName specifies the discovery port name
DiscoveryPortName string
// PortName specifies the client port name
PortName string
// PodLabels specifies the pod labels
PodLabels map[string]string
}

// Validate checks whether the given discovery configuration is valid
func (x Config) Validate() error {
return validation.New(validation.FailFast()).
AddValidator(validation.NewEmptyStringValidator("Namespace", x.Namespace)).
AddValidator(validation.NewEmptyStringValidator("DiscoveryPortName", x.DiscoveryPortName)).
AddValidator(validation.NewEmptyStringValidator("PortName", x.PortName)).
AddAssertion(len(x.PodLabels) > 0, "PodLabels are required").
Validate()
}
52 changes: 52 additions & 0 deletions discovery/kubernetes/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* MIT License
*
* Copyright (c) 2022-2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package kubernetes

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestConfig(t *testing.T) {
t.Run("With valid configuration", func(t *testing.T) {
config := &Config{
Namespace: "namespace",
DiscoveryPortName: "discoveryPortName",
PortName: "portName",
PodLabels: map[string]string{
"label1": "value1",
},
}
assert.NoError(t, config.Validate())
})
t.Run("With invalid configuration", func(t *testing.T) {
config := &Config{
Namespace: "namespace",
DiscoveryPortName: "",
}
assert.Error(t, config.Validate())
})
}
182 changes: 182 additions & 0 deletions discovery/kubernetes/discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* MIT License
*
* Copyright (c) 2022-2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package kubernetes

import (
"context"
"fmt"
"net"
"strconv"
"sync"

goset "github.com/deckarep/golang-set/v2"
"go.uber.org/atomic"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/utils/strings/slices"

"github.com/tochemey/gokv/discovery"
)

// Discovery represents the kubernetes discovery
type Discovery struct {
config *Config
client kubernetes.Interface
mu sync.Mutex

stopChan chan struct{}
// states whether the actor system has started or not
initialized *atomic.Bool
}

// enforce compilation error
var _ discovery.Provider = &Discovery{}

// NewDiscovery returns an instance of the kubernetes discovery provider
func NewDiscovery(config *Config) *Discovery {
// create an instance of
discovery := &Discovery{
mu: sync.Mutex{},
stopChan: make(chan struct{}, 1),
initialized: atomic.NewBool(false),
config: config,
}

return discovery
}

// ID returns the discovery provider id
func (d *Discovery) ID() string {
return "kubernetes"
}

// Initialize initializes the plugin: registers some internal data structures, clients etc.
func (d *Discovery) Initialize() error {
d.mu.Lock()
defer d.mu.Unlock()

if d.initialized.Load() {
return discovery.ErrAlreadyInitialized
}

return d.config.Validate()
}

// Register registers this node to a service discovery directory.
func (d *Discovery) Register() error {
d.mu.Lock()
defer d.mu.Unlock()

if d.initialized.Load() {
return discovery.ErrAlreadyRegistered
}

config, err := rest.InClusterConfig()
if err != nil {
return fmt.Errorf("failed to get the in-cluster config of the kubernetes provider: %w", err)
}

client, err := kubernetes.NewForConfig(config)
if err != nil {
return fmt.Errorf("failed to create the kubernetes client api: %w", err)
}

d.client = client
d.initialized = atomic.NewBool(true)
return nil
}

// Deregister removes this node from a service discovery directory.
func (d *Discovery) Deregister() error {
d.mu.Lock()
defer d.mu.Unlock()

if !d.initialized.Load() {
return discovery.ErrNotInitialized
}
d.initialized = atomic.NewBool(false)
close(d.stopChan)
return nil
}

// DiscoverPeers returns a list of known nodes.
func (d *Discovery) DiscoverPeers() ([]string, error) {
if !d.initialized.Load() {
return nil, discovery.ErrNotInitialized
}

ctx := context.Background()

pods, err := d.client.CoreV1().Pods(d.config.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(d.config.PodLabels).String(),
})

if err != nil {
return nil, err
}

validPortNames := []string{d.config.DiscoveryPortName, d.config.PortName}

// define the addresses list
addresses := goset.NewSet[string]()

MainLoop:
for _, pod := range pods.Items {
pod := pod

if pod.Status.Phase != corev1.PodRunning {
continue MainLoop
}
// If there is a Ready condition available, we need that to be true.
// If no ready condition is set, then we accept this pod regardless.
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodReady && condition.Status != corev1.ConditionTrue {
continue MainLoop
}
}

// iterate the pod containers and find the named port
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if !slices.Contains(validPortNames, port.Name) {
continue
}

if port.Name == d.config.DiscoveryPortName {
addresses.Add(net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(int(port.ContainerPort))))
}
}
}
}
return addresses.ToSlice(), nil
}

// Close closes the provider
func (d *Discovery) Close() error {
return nil
}
Loading

0 comments on commit 5234ba0

Please sign in to comment.