Skip to content

Commit

Permalink
Integration tests for Coordinator and CRDs (#445)
Browse files Browse the repository at this point in the history
* Integration tests for CRDs

* Integration tests for Coordinator
  • Loading branch information
jessejlt authored Nov 20, 2024
1 parent 8b0e62d commit d48a2f6
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 5 deletions.
102 changes: 102 additions & 0 deletions ingestor/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,22 @@ package cluster
import (
"context"
"testing"
"time"

"github.com/Azure/adx-mon/pkg/otlp"
"github.com/Azure/adx-mon/pkg/prompb"
"github.com/Azure/adx-mon/pkg/testutils"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/k3s"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
fakek8s "k8s.io/client-go/kubernetes/fake"
v12 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/clientcmd"
)

func TestCoordinator_NewPeer(t *testing.T) {
Expand Down Expand Up @@ -250,3 +259,96 @@ func (l *fakePodLister) List(selector labels.Selector) (ret []*v1.Pod, err error
func (l *fakePodLister) Pods(namespace string) v12.PodNamespaceLister {
return l
}

func TestCoordinatorInK8s(t *testing.T) {
// This is an integration test where a Coordinator is created using a k3s cluster
// configuration and a statefulset is then created that matches the Coordinator's
// peer predicate. What we're testing is that the Coordinator is correctly utilizing
// a k8s informer and its predicate is correctly identifying peer pods and its leader state.
otlpWriter := func(ctx context.Context, database, table string, logs *otlp.Logs) error {
t.Helper()
return nil
}

timeSeriesWriter := func(ctx context.Context, ts []*prompb.TimeSeries) error {
t.Helper()
return nil
}

ctx := context.Background()
k3sContainer, err := k3s.Run(ctx, "rancher/k3s:v1.31.2-k3s1")
testcontainers.CleanupContainer(t, k3sContainer)
require.NoError(t, err)

kubeconfig, err := testutils.WriteKubeConfig(ctx, k3sContainer, t.TempDir())
require.NoError(t, err)

config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
require.NoError(t, err)

client, err := kubernetes.NewForConfig(config)
require.NoError(t, err)

opts := &CoordinatorOpts{
WriteTimeSeriesFn: timeSeriesWriter,
WriteOTLPLogsFn: otlpWriter,
K8sCli: client,
Namespace: "test-namespace",
Hostname: "ingestor-0",
InsecureSkipVerify: true,
}
c, err := NewCoordinator(opts)
require.NoError(t, err)

require.NoError(t, c.Open(ctx))

ns := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: opts.Namespace,
},
}
_, err = client.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
require.NoError(t, err)

replicas := int32(1)
ss := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "ingestor",
Namespace: opts.Namespace,
},
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "ingestor",
},
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "ingestor",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "ingestor",
Image: "mcr.microsoft.com/cbl-mariner/base/nginx:1.22-cm2.0",
},
},
},
},
},
}
_, err = client.AppsV1().StatefulSets(opts.Namespace).Create(ctx, ss, metav1.CreateOptions{})
require.NoError(t, err)

require.NoError(t, c.WriteOTLPLogs(ctx, "test-database", "test-table", &otlp.Logs{}))
require.NoError(t, c.Write(ctx, &prompb.WriteRequest{}))

require.Eventually(t, func() bool {
return c.IsLeader()
}, time.Minute, 100*time.Millisecond)

require.NoError(t, c.Close())
}
17 changes: 12 additions & 5 deletions pkg/crd/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/Azure/adx-mon/pkg/logger"
"k8s.io/apimachinery/pkg/api/meta"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -15,9 +16,10 @@ type Store interface {
}

type Options struct {
CtrlCli client.Client
List client.ObjectList
Store Store
CtrlCli client.Client
List client.ObjectList
Store Store
PollFrequency time.Duration
}

type CRD struct {
Expand Down Expand Up @@ -53,12 +55,15 @@ func (c *CRD) Open(ctx context.Context) error {
return fmt.Errorf("failed to store objects: %w", err)
}

if c.opts.PollFrequency == 0 {
c.opts.PollFrequency = time.Minute
}
go func() {
for {
select {
case <-c.ctx.Done():
return
case <-time.After(time.Minute):
case <-time.After(c.opts.PollFrequency):
list, err := c.List(ctx)
if err != nil {
logger.Errorf("Failed to list objects: %s", err.Error())
Expand All @@ -80,10 +85,12 @@ func (c *CRD) Close() error {
c.cancel()
return nil
}

func (c *CRD) List(ctx context.Context) (client.ObjectList, error) {
list := c.opts.List.DeepCopyObject().(client.ObjectList)
if err := c.opts.CtrlCli.List(ctx, list); err != nil {
if errors.Is(err, &meta.NoKindMatchError{}) {
return list, nil
}
return nil, fmt.Errorf("failed to list CRDs: %w", err)
}

Expand Down
102 changes: 102 additions & 0 deletions pkg/crd/crd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package crd_test

import (
"context"
"os"
"path/filepath"
"sync/atomic"
"testing"
"time"

v1 "github.com/Azure/adx-mon/api/v1"
"github.com/Azure/adx-mon/pkg/crd"
"github.com/Azure/adx-mon/pkg/testutils"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/k3s"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

type TestStore struct {
t *testing.T

received int32
}

func (s *TestStore) Receive(ctx context.Context, list client.ObjectList) error {
s.t.Helper()
require.NotNil(s.t, list)

items, ok := list.(*v1.FunctionList)
require.True(s.t, ok)
if items == nil || len(items.Items) == 0 {
return nil
}

atomic.AddInt32(&s.received, int32(len(items.Items)))
return nil
}

func (s *TestStore) Count() int32 {
return atomic.LoadInt32(&s.received)
}

func TestCRD(t *testing.T) {
scheme := clientgoscheme.Scheme
require.NoError(t, clientgoscheme.AddToScheme(scheme))
require.NoError(t, v1.AddToScheme(scheme))

crdPath := filepath.Join(t.TempDir(), "crd.yaml")
require.NoError(t, testutils.CopyFile("../../kustomize/bases/functions_crd.yaml", crdPath))
fnCrdPath := filepath.Join(t.TempDir(), "fn-crd.yaml")
os.WriteFile(fnCrdPath, []byte(fnCrd), 0644)

ctx := context.Background()
k3sContainer, err := k3s.Run(ctx, "rancher/k3s:v1.31.2-k3s1")
testcontainers.CleanupContainer(t, k3sContainer)
require.NoError(t, err)

require.NoError(t, k3sContainer.CopyFileToContainer(ctx, crdPath, filepath.Join(testutils.K3sManifests, "crd.yaml"), 0644))
require.NoError(t, k3sContainer.CopyFileToContainer(ctx, fnCrdPath, filepath.Join(testutils.K3sManifests, "fn-crd.yaml"), 0644))

kubeconfig, err := testutils.WriteKubeConfig(ctx, k3sContainer, t.TempDir())
require.NoError(t, err)

config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
require.NoError(t, err)
ctrlCli, err := ctrlclient.New(config, ctrlclient.Options{
WarningHandler: ctrlclient.WarningHandlerOptions{
SuppressWarnings: true,
},
})
require.NoError(t, err)

ts := &TestStore{t: t}
opts := crd.Options{
CtrlCli: ctrlCli,
List: &v1.FunctionList{},
Store: ts,
PollFrequency: 100 * time.Millisecond,
}
c := crd.New(opts)
require.NoError(t, c.Open(ctx))

require.Eventually(t, func() bool {
return ts.Count() > 0
}, time.Minute, time.Second)

require.NoError(t, c.Close())
}

var fnCrd = `---
apiVersion: adx-mon.azure.com/v1
kind: Function
metadata:
name: some-crd
spec:
body: some-function-body
database: some-database
---`

0 comments on commit d48a2f6

Please sign in to comment.