Skip to content

Commit

Permalink
fix(ipoperator): incorporate fixes from akash repo
Browse files Browse the repository at this point in the history
refs ovrclk/engineering#379

Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
tidrolpolelsef authored and troian committed Jul 20, 2022
1 parent 442eb54 commit 8c5bb3e
Show file tree
Hide file tree
Showing 18 changed files with 207 additions and 39 deletions.
9 changes: 9 additions & 0 deletions _docs/kustomize/akash-ip-operator/cluster_role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,12 @@ rules:
- apiGroups: ["akash.network"]
resources: ["providerleasedips"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: akash-ip-op-watch-configmaps
rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get", "list", "watch"]
1 change: 1 addition & 0 deletions _docs/kustomize/akash-ip-operator/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ apiVersion: apps/v1
kind: Deployment
metadata:
name: akash-ip-operator
namespace: akash-services
labels:
akash.network/component: akash-ip-operator
spec:
Expand Down
1 change: 1 addition & 0 deletions _docs/kustomize/akash-ip-operator/ingress.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: akash-ip-operator
namespace: akash-services
annotations:
nginx.ingress.kubernetes.io/proxy-send-timeout: "60"
nginx.ingress.kubernetes.io/proxy-read-timeout: "60"
Expand Down
3 changes: 3 additions & 0 deletions _docs/kustomize/akash-ip-operator/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

resources:
- deployment.yaml
- service.yaml
Expand Down
15 changes: 15 additions & 0 deletions _docs/kustomize/akash-ip-operator/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ metadata:
subjects:
- kind: ServiceAccount
name: akash-ip-operator
namespace: akash-services
roleRef:
kind: ClusterRole
name: akash-ip-op-manage-service
Expand All @@ -17,7 +18,21 @@ metadata:
subjects:
- kind: ServiceAccount
name: akash-ip-operator
namespace: akash-services
roleRef:
kind: ClusterRole
name: akash-ip-op-watch-providerleasedip
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: akash-ip-operator-watch-configmaps
subjects:
- kind: ServiceAccount
name: akash-ip-operator
namespace: akash-services
roleRef:
kind: ClusterRole
name: akash-ip-op-watch-configmaps
apiGroup: rbac.authorization.k8s.io
1 change: 1 addition & 0 deletions _docs/kustomize/akash-ip-operator/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ apiVersion: v1
kind: Service
metadata:
name: akash-ip-operator
namespace: akash-services
spec:
selector:
akash.network/component: akash-ip-operator
Expand Down
1 change: 1 addition & 0 deletions _docs/kustomize/akash-ip-operator/service_account.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ apiVersion: v1
kind: ServiceAccount
metadata:
name: akash-ip-operator
namespace: akash-services
3 changes: 1 addition & 2 deletions _docs/provider/kube/metallb-service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ spec:
- from:
- namespaceSelector:
matchLabels:
name: akash-services
kubernetes.io/metadata.name: ingress-nginx
kubernetes.io/metadata.name: akash-services
podSelector:
matchLabels: {}
2 changes: 1 addition & 1 deletion cluster/kube/cluster_ip_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (c *client) PurgeDeclaredIP(ctx context.Context, leaseID mtypes.LeaseID, se

func (c *client) DeclareIP(ctx context.Context, lID mtypes.LeaseID, serviceName string, port uint32, externalPort uint32, proto manifest.ServiceProtocol, sharingKey string, overwrite bool) error {
// Note: This interface expects sharing key to contain a value that is unique per deployment owner, in this
// case it is the bech32 address
// case it is the bech32 address, or a derivative thereof
resourceName := strings.ToLower(fmt.Sprintf("%s-%s-%d", sharingKey, proto.ToString(), externalPort))

c.log.Debug("checking for resource", "resource-name", resourceName)
Expand Down
69 changes: 60 additions & 9 deletions cluster/kube/metallb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net"
"net/http"
"strings"
"sync"

manifest "github.com/ovrclk/akash/manifest/v2beta1"
mtypes "github.com/ovrclk/akash/x/market/types/v1beta2"
Expand Down Expand Up @@ -59,6 +60,7 @@ type Client interface {
CreateIPPassthrough(ctx context.Context, directive ctypes.ClusterIPPassthroughDirective) error
PurgeIPPassthrough(ctx context.Context, directive ctypes.ClusterIPPassthroughDirective) error
GetIPPassthroughs(ctx context.Context) ([]v1beta2.IPPassthrough, error)
DetectPoolChanges(ctx context.Context) (<-chan struct{}, error)

Stop()
}
Expand All @@ -70,6 +72,7 @@ type client struct {

sda clusterutil.ServiceDiscoveryAgent
client clusterutil.ServiceClient
l sync.Locker

poolName string
}
Expand Down Expand Up @@ -104,8 +107,8 @@ func NewClient(configPath string, logger log.Logger, poolName string, endpoint *
sda: sda,
kube: kc,
poolName: poolName,

log: logger.With("client", "metallb"),
l: &sync.Mutex{},
log: logger.With("client", "metallb"),
}, nil

}
Expand All @@ -114,6 +117,17 @@ func (c *client) Stop() {
c.sda.Stop()
}

func (c *client) setupClient(ctx context.Context) error {
c.l.Lock()
defer c.l.Unlock()
if c.client != nil {
return nil
}
var err error
c.client, err = c.sda.GetClient(ctx, false, false)
return err
}

/*
can get stuff like this to access metal lb metrics
75 nslookup -type=SRV _monitoring._tcp.
Expand All @@ -123,12 +137,9 @@ can get stuff like this to access metal lb metrics
*/

func (c *client) GetIPAddressUsage(ctx context.Context) (uint, uint, error) {
if c.client == nil {
var err error
c.client, err = c.sda.GetClient(ctx, false, false)
if err != nil {
return math.MaxUint32, math.MaxUint32, err
}
err := c.setupClient(ctx)
if err != nil {
return math.MaxUint32, math.MaxUint32, err
}

request, err := c.client.CreateRequest(ctx, http.MethodGet, metricsPath, nil)
Expand Down Expand Up @@ -164,6 +175,7 @@ func (c *client) GetIPAddressUsage(ctx context.Context) (uint, uint, error) {
setAvailable := false
inUse := uint(0)
setInUse := false
poolsFound := make(map[string]struct{})
for _, entry := range mf {
if setInUse && setAvailable {
break
Expand Down Expand Up @@ -194,6 +206,9 @@ func (c *client) GetIPAddressUsage(ctx context.Context) (uint, uint, error) {
continue
}

// Record all pool names found, for debugging purposes
poolsFound[labelEntry.GetValue()] = struct{}{}

if labelEntry.GetValue() != c.poolName {
continue
}
Expand All @@ -206,7 +221,11 @@ func (c *client) GetIPAddressUsage(ctx context.Context) (uint, uint, error) {
}

if !setInUse || !setAvailable {
return math.MaxUint32, math.MaxUint32, fmt.Errorf("%w: data not found in metrics response", errMetalLB)
if len(poolsFound) == 0 {
c.log.Debug("no pools configured on Metal LB")
} else {
c.log.Debug("pools configured on Metal LB, but none matching", "configured-pool-name", c.poolName, "quantity-configured", len(poolsFound))
}
}

return inUse, available, nil
Expand Down Expand Up @@ -489,6 +508,38 @@ func (c *client) GetIPPassthroughs(ctx context.Context) ([]v1beta2.IPPassthrough
return result, err
}

func (c *client) DetectPoolChanges(ctx context.Context) (<-chan struct{}, error) {
const metalLBNamespace = "metallb-system"
watcher, err := c.kube.CoreV1().ConfigMaps(metalLBNamespace).Watch(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}

output := make(chan struct{}, 1)
go func() {
defer close(output)
for {
select {
case <-ctx.Done():
err = ctx.Err()
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
c.log.Error("failed watching metal LB config map changes", "err", err)
}
return
case ev, ok := <-watcher.ResultChan():
if !ok { // Channel closed when an error happens
return
}
// Do not log the whole event, it is too verbose
c.log.Debug("metal LB config change event", "event-type", ev.Type)
output <- struct{}{}
}
}
}()

return output, nil
}

type ipPassthrough struct {
lID mtypes.LeaseID
serviceName string
Expand Down
23 changes: 23 additions & 0 deletions cluster/mocks/metallb_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions cluster/operatorclients/hostname_operator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"net"
"net/http"
"sync"

"github.com/tendermint/tendermint/libs/log"
"k8s.io/client-go/rest"
Expand All @@ -28,6 +29,7 @@ type hostnameOperatorClient struct {
sda clusterutil.ServiceDiscoveryAgent
client clusterutil.ServiceClient
log log.Logger
l sync.Locker
}

func NewHostnameOperatorClient(logger log.Logger, kubeConfig *rest.Config, endpoint *net.SRV) (HostnameOperatorClient, error) {
Expand All @@ -39,11 +41,15 @@ func NewHostnameOperatorClient(logger log.Logger, kubeConfig *rest.Config, endpo
return &hostnameOperatorClient{
log: logger.With("operator", "hostname"),
sda: sda,
l: &sync.Mutex{},
}, nil

}

func (hopc *hostnameOperatorClient) newRequest(ctx context.Context, method string, path string, body io.Reader) (*http.Request, error) {
hopc.l.Lock()
defer hopc.l.Unlock()

if nil == hopc.client {
var err error
hopc.client, err = hopc.sda.GetClient(ctx, false, false)
Expand Down
8 changes: 5 additions & 3 deletions cluster/util/ip_sharing_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ import (
"crypto/sha256"
"encoding/base32"
"fmt"
mtypes "github.com/ovrclk/akash/x/market/types/v1beta2"
"io"
"regexp"
"strings"

mtypes "github.com/ovrclk/akash/x/market/types/v1beta2"
)

var allowedIPEndpointNameRegex = regexp.MustCompile(`^[a-z0-9\-]+$`)

func MakeIPSharingKey(lID mtypes.LeaseID, endpointName string) string {
allowedRegex := regexp.MustCompile(`[a-z0-9\-]+`)
effectiveName := endpointName
if !allowedRegex.MatchString(endpointName) {
if !allowedIPEndpointNameRegex.MatchString(endpointName) {
h := sha256.New()
_, err := io.WriteString(h, endpointName)
if err != nil {
Expand Down
33 changes: 33 additions & 0 deletions cluster/util/ip_sharing_key_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package util_test

import (
"testing"

"github.com/ovrclk/akash/testutil"
"github.com/stretchr/testify/require"

"github.com/ovrclk/provider-services/cluster/util"
)

func TestPassesThroughNames(t *testing.T) {
leaseID := testutil.LeaseID(t)

sharingKey := util.MakeIPSharingKey(leaseID, "foobar")
require.Contains(t, sharingKey, "foobar")
}

func TestFiltersUnderscore(t *testing.T) {
leaseID := testutil.LeaseID(t)

sharingKey := util.MakeIPSharingKey(leaseID, "me_you")
require.NotContains(t, sharingKey, "me_you")
}

func TestFiltersUppercase(t *testing.T) {
leaseID := testutil.LeaseID(t)

sharingKey := util.MakeIPSharingKey(leaseID, "meYOU")
require.NotContains(t, sharingKey, "meYOU")

require.Equal(t, sharingKey, leaseID.GetOwner()+"-ip-ps9pn7rkocct7m9ivtovuktb")
}
Loading

0 comments on commit 8c5bb3e

Please sign in to comment.