diff --git a/.goreleaser-docker.yaml b/.goreleaser-docker.yaml index 793d986b2..ab1f8d17e 100644 --- a/.goreleaser-docker.yaml +++ b/.goreleaser-docker.yaml @@ -1,17 +1,40 @@ +--- version: 2 project_name: provider env: - GO111MODULE=on - DOCKER_CLI_EXPERIMENTAL="enabled" + - CGO_ENABLED=1 builds: - - id: provider-services-linux + - id: provider-services-linux-arm64 binary: provider-services main: ./cmd/provider-services goarch: - - amd64 - arm64 goos: - linux + env: + - CC=aarch64-linux-gnu-gcc + - CXX=aarch64-linux-gnu-g++ + flags: + - "-mod={{ .Env.MOD }}" + - "-tags={{ .Env.BUILD_TAGS }}" + - -trimpath + ldflags: + - "{{ .Env.BUILD_VARS }}" + - "{{ .Env.STRIP_FLAGS }}" + - "-linkmode={{ .Env.LINKMODE }}" + - -extldflags "-lc -lrt -lpthread --static" + - id: provider-services-linux-amd64 + binary: provider-services + main: ./cmd/provider-services + goarch: + - amd64 + goos: + - linux + env: + - CC=x86_64-linux-gnu-gcc + - CXX=x86_64-linux-gnu-g++ flags: - "-mod={{ .Env.MOD }}" - "-tags={{ .Env.BUILD_TAGS }}" @@ -19,6 +42,8 @@ builds: ldflags: - "{{ .Env.BUILD_VARS }}" - "{{ .Env.STRIP_FLAGS }}" + - "-linkmode={{ .Env.LINKMODE }}" + - -extldflags "-lc -lrt -lpthread --static" archives: - format: binary dockers: diff --git a/Dockerfile b/Dockerfile index 2a9cc4ed0..7c3d15800 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,16 +1,24 @@ -FROM debian:bullseye +FROM ubuntu:noble LABEL "org.opencontainers.image.source"="https://github.com/akash-network/provider" COPY provider-services /usr/bin/ +ENV DEBIAN_FRONTEND=noninteractive + RUN \ apt-get update \ && apt-get install -y --no-install-recommends \ tini \ + jq \ + bc \ + mawk \ + curl \ ca-certificates \ pci.ids \ && rm -rf /var/lib/apt/lists/* +ENV DEBIAN_FRONTEND="" + # default port for provider API EXPOSE 8443 diff --git a/Makefile b/Makefile index c544d790f..185e7d442 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,4 @@ GOBIN := $(shell go env GOPATH)/bin -KIND_APP_IP ?= $(shell make -sC _run/kube kind-k8s-ip) -KIND_APP_PORT ?= $(shell make -sC _run/kube app-http-port) -KIND_VARS ?= KUBE_INGRESS_IP="$(KIND_APP_IP)" KUBE_INGRESS_PORT="$(KIND_APP_PORT)" - LEDGER_ENABLED ?= true include make/init.mk diff --git a/_run/.envrc b/_run/.envrc index 8bcd4cf49..d3e7ae8ce 100644 --- a/_run/.envrc +++ b/_run/.envrc @@ -8,3 +8,5 @@ fi if ! has tqdm ; then echo -e "\033[31mtqdm is not installed. https://github.com/tqdm/tqdm"; exit 1 fi + +dotenv .env diff --git a/_run/.envrc_run b/_run/.envrc_run index bc2b8b91d..0fd0d9454 100644 --- a/_run/.envrc_run +++ b/_run/.envrc_run @@ -1,8 +1,8 @@ +source_up .envrc + AP_RUN_NAME=$(basename "$(pwd)") AP_RUN_DIR="${DEVCACHE_RUN}/${AP_RUN_NAME}" export AKASH_HOME="${AP_RUN_DIR}/.akash" export AP_RUN_NAME export AP_RUN_DIR - -dotenv .env diff --git a/_run/kube/.envrc b/_run/kube/.envrc deleted file mode 100644 index 0b6e79f73..000000000 --- a/_run/kube/.envrc +++ /dev/null @@ -1,4 +0,0 @@ -source_up .envrc -source_up .envrc_run - -export AKASH_HOME=$DEVCACHE_RUN/kube/.akash diff --git a/_run/kube/.envrc b/_run/kube/.envrc new file mode 120000 index 000000000..a4526206d --- /dev/null +++ b/_run/kube/.envrc @@ -0,0 +1 @@ +../.envrc_run \ No newline at end of file diff --git a/_run/kube/key.key b/_run/kube/key.key new file mode 100644 index 000000000..c2edec8e5 --- /dev/null +++ b/_run/kube/key.key @@ -0,0 +1,9 @@ +-----BEGIN TENDERMINT PRIVATE KEY----- +kdf: bcrypt +salt: EAE151E3A990D0509979F9F8D2387C20 +type: secp256k1 + +/LtGnQIsFnwa4QZzMlzQANGT43ayv/0P50Cfcoz0muXP3mvUsHu/ifdN9WV1vryO +/EdmbmBIkgq7G3L0YpU85Lu5/+O5nNN3erLtQk0= +=0Zx3 +-----END TENDERMINT PRIVATE KEY----- diff --git a/_run/kube/key2.key b/_run/kube/key2.key new file mode 100644 index 000000000..c0fa12e3f --- /dev/null +++ b/_run/kube/key2.key @@ -0,0 +1,9 @@ +-----BEGIN TENDERMINT PRIVATE KEY----- +kdf: bcrypt +salt: CF5040F935D19E50F32924074482B533 +type: secp256k1 + +M/HGSTz9+SQtYnftu9txVfX/qibznv4sEcVgt2PXdxZpWQNSqHiHIhQeQ1ZmCsWd +GhQ8l3AhaLgBk5qsHO8R9eA1aQ3TGep0WtDgKA4= +=XYTy +-----END TENDERMINT PRIVATE KEY----- diff --git a/balance_checker.go b/balance_checker.go index a539fbfc3..a589bd0f6 100644 --- a/balance_checker.go +++ b/balance_checker.go @@ -225,8 +225,8 @@ func (bc *balanceChecker) run(startCh chan<- error) { loop: for { select { - case <-bc.lc.ShutdownRequest(): - bc.log.Debug("shutting down") + case shutdownErr := <-bc.lc.ShutdownRequest(): + bc.log.Debug("received shutdown request", "err", shutdownErr) bc.lc.ShutdownInitiated(nil) cancel() break loop diff --git a/bidengine/service.go b/bidengine/service.go index 0281075d4..21d2234da 100644 --- a/bidengine/service.go +++ b/bidengine/service.go @@ -96,7 +96,11 @@ func NewService(pctx context.Context, aqc sclient.QueryClient, session session.S go s.lc.WatchContext(ctx) go s.run(pctx) group.Go(func() error { - return s.ordersFetcher(ctx, aqc) + err := s.ordersFetcher(ctx, aqc) + + <-ctx.Done() + + return err }) return s, nil @@ -276,7 +280,8 @@ func (s *service) run(ctx context.Context) { loop: for { select { - case <-s.lc.ShutdownRequest(): + case shutdownErr := <-s.lc.ShutdownRequest(): + s.session.Log().Debug("received shutdown request", "err", shutdownErr) s.lc.ShutdownInitiated(nil) s.cancel() break loop diff --git a/cluster/inventory.go b/cluster/inventory.go index ce191c454..f44143656 100644 --- a/cluster/inventory.go +++ b/cluster/inventory.go @@ -519,6 +519,7 @@ loop: for { select { case err := <-is.lc.ShutdownRequest(): + is.log.Debug("received shutdown request", "err", err) is.lc.ShutdownInitiated(err) break loop case ev := <-is.sub.Events(): diff --git a/cluster/kube/apply.go b/cluster/kube/apply.go index 84b67bbca..ef1dcb4e4 100644 --- a/cluster/kube/apply.go +++ b/cluster/kube/apply.go @@ -4,6 +4,7 @@ package kube import ( "context" + "encoding/json" "reflect" appsv1 "k8s.io/api/apps/v1" @@ -11,6 +12,8 @@ import ( netv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" metricsutils "github.com/akash-network/node/util/metrics" @@ -20,6 +23,12 @@ import ( crdapi "github.com/akash-network/provider/pkg/client/clientset/versioned" ) +type k8sPatch struct { + Op string `json:"op"` + Path string `json:"path"` + Value interface{} `json:"value"` +} + func applyNS(ctx context.Context, kc kubernetes.Interface, b builder.NS) (*corev1.Namespace, *corev1.Namespace, *corev1.Namespace, error) { oobj, err := kc.CoreV1().Namespaces().Get(ctx, b.Name(), metav1.GetOptions{}) metricsutils.IncCounterVecWithLabelValuesFiltered(kubeCallsCounter, "namespaces-get", err, errors.IsNotFound) @@ -144,12 +153,57 @@ func applyDeployment(ctx context.Context, kc kubernetes.Interface, b builder.Dep case err == nil: curr := oobj.DeepCopy() oobj, err = b.Update(oobj) - if err == nil && (b.IsObjectRevisionLatest(curr.Labels) || + if err != nil { + break + } + + if b.IsObjectRevisionLatest(curr.Labels) || !reflect.DeepEqual(&curr.Spec, &oobj.Spec) || - !reflect.DeepEqual(curr.Labels, oobj.Labels)) { + !reflect.DeepEqual(curr.Labels, oobj.Labels) { uobj, err = kc.AppsV1().Deployments(b.NS()).Update(ctx, oobj, metav1.UpdateOptions{}) metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "deployments-update", err) } + + var patches []k8sPatch + + if rev := curr.Spec.RevisionHistoryLimit; rev == nil || *rev != 10 { + patches = append(patches, k8sPatch{ + Op: "add", + Path: "/spec/revisionHistoryLimit", + Value: int32(10), + }) + } + + ustrategy := &oobj.Spec.Strategy + if uobj != nil { + ustrategy = &uobj.Spec.Strategy + } + + maxSurge := intstr.FromInt32(0) + maxUnavailable := intstr.FromInt32(1) + + strategy := appsv1.DeploymentStrategy{ + Type: appsv1.RollingUpdateDeploymentStrategyType, + RollingUpdate: &appsv1.RollingUpdateDeployment{ + MaxUnavailable: &maxUnavailable, + MaxSurge: &maxSurge, + }, + } + + if !reflect.DeepEqual(&strategy, &ustrategy) { + patches = append(patches, k8sPatch{ + Op: "replace", + Path: "/spec/strategy", + Value: strategy, + }) + } + + if len(patches) > 0 { + data, _ := json.Marshal(patches) + + oobj, err = kc.AppsV1().Deployments(b.NS()).Patch(ctx, oobj.Name, k8stypes.JSONPatchType, data, metav1.PatchOptions{}) + metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "deployments-patch", err) + } case errors.IsNotFound(err): oobj, err = b.Create() if err == nil { @@ -172,13 +226,57 @@ func applyStatefulSet(ctx context.Context, kc kubernetes.Interface, b builder.St case err == nil: curr := oobj.DeepCopy() oobj, err = b.Update(oobj) - if err == nil && (b.IsObjectRevisionLatest(curr.Labels) || + if err != nil { + break + } + + if b.IsObjectRevisionLatest(curr.Labels) || !reflect.DeepEqual(&curr.Spec, &oobj.Spec) || - !reflect.DeepEqual(curr.Labels, oobj.Labels)) { + !reflect.DeepEqual(curr.Labels, oobj.Labels) { uobj, err = kc.AppsV1().StatefulSets(b.NS()).Update(ctx, oobj, metav1.UpdateOptions{}) metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "statefulset-update", err) - } + + // var patches []k8sPatch + // + // if rev := curr.Spec.RevisionHistoryLimit; rev == nil || *rev != 10 { + // patches = append(patches, k8sPatch{ + // Op: "add", + // Path: "/spec/revisionHistoryLimit", + // Value: int32(10), + // }) + // } + // + // ustrategy := &oobj.Spec.UpdateStrategy + // if uobj != nil { + // ustrategy = &uobj.Spec.UpdateStrategy + // } + // + // partition := int32(0) + // maxUnavailable := intstr.FromInt32(1) + // + // strategy := appsv1.StatefulSetUpdateStrategy{ + // Type: appsv1.RollingUpdateStatefulSetStrategyType, + // RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{ + // Partition: &partition, + // MaxUnavailable: &maxUnavailable, + // }, + // } + // + // if !reflect.DeepEqual(&strategy, ustrategy) { + // patches = append(patches, k8sPatch{ + // Op: "replace", + // Path: "/spec/updateStrategy", + // Value: strategy, + // }) + // } + // + // if len(patches) > 0 { + // data, _ := json.Marshal(patches) + // + // oobj, err = kc.AppsV1().StatefulSets(b.NS()).Patch(ctx, oobj.Name, k8stypes.JSONPatchType, data, metav1.PatchOptions{}) + // metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "statefulset-patch", err) + // } case errors.IsNotFound(err): oobj, err = b.Create() if err == nil { @@ -242,21 +340,3 @@ func applyManifest(ctx context.Context, kc crdapi.Interface, b builder.Manifest) return nobj, uobj, oobj, err } - -// TODO: re-enable. see #946 -// func applyRestrictivePodSecPoliciesToNS(ctx context.Context, kc kubernetes.Interface, p builder.PspRestricted) error { -// obj, err := kc.PolicyV1beta1().PodSecurityPolicies().Get(ctx, p.Name(), metav1.GetOptions{}) -// switch { -// case err == nil: -// obj, err = p.Update(obj) -// if err == nil { -// _, err = kc.PolicyV1beta1().PodSecurityPolicies().Update(ctx, obj, metav1.UpdateOptions{}) -// } -// case errors.IsNotFound(err): -// obj, err = p.Create() -// if err == nil { -// _, err = kc.PolicyV1beta1().PodSecurityPolicies().Create(ctx, obj, metav1.CreateOptions{}) -// } -// } -// return err -// } diff --git a/cluster/kube/builder/deployment.go b/cluster/kube/builder/deployment.go index aeaf886fc..cb96472d4 100644 --- a/cluster/kube/builder/deployment.go +++ b/cluster/kube/builder/deployment.go @@ -4,6 +4,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" ) type Deployment interface { @@ -31,6 +32,11 @@ func NewDeployment(workload Workload) Deployment { func (b *deployment) Create() (*appsv1.Deployment, error) { // nolint:golint,unparam falseValue := false + revisionHistoryLimit := int32(10) + + maxSurge := intstr.FromInt32(0) + maxUnavailable := intstr.FromInt32(1) + kdeployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: b.Name(), @@ -40,7 +46,15 @@ func (b *deployment) Create() (*appsv1.Deployment, error) { // nolint:golint,unp Selector: &metav1.LabelSelector{ MatchLabels: b.selectorLabels(), }, - Replicas: b.replicas(), + Strategy: appsv1.DeploymentStrategy{ + Type: appsv1.RollingUpdateDeploymentStrategyType, + RollingUpdate: &appsv1.RollingUpdateDeployment{ + MaxUnavailable: &maxUnavailable, + MaxSurge: &maxSurge, + }, + }, + RevisionHistoryLimit: &revisionHistoryLimit, + Replicas: b.replicas(), Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: b.labels(), diff --git a/cluster/kube/builder/statefulset.go b/cluster/kube/builder/statefulset.go index f58c10e0c..909b0b533 100644 --- a/cluster/kube/builder/statefulset.go +++ b/cluster/kube/builder/statefulset.go @@ -4,6 +4,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" ) type StatefulSet interface { @@ -31,16 +32,29 @@ func BuildStatefulSet(workload Workload) StatefulSet { func (b *statefulSet) Create() (*appsv1.StatefulSet, error) { // nolint:golint,unparam falseValue := false + revisionHistoryLimit := int32(1) + + partition := int32(0) + maxUnavailable := intstr.FromInt32(1) + kdeployment := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: b.Name(), Labels: b.labels(), }, Spec: appsv1.StatefulSetSpec{ + UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ + Type: appsv1.RollingUpdateStatefulSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{ + Partition: &partition, + MaxUnavailable: &maxUnavailable, + }, + }, Selector: &metav1.LabelSelector{ MatchLabels: b.selectorLabels(), }, - Replicas: b.replicas(), + RevisionHistoryLimit: &revisionHistoryLimit, + Replicas: b.replicas(), Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: b.labels(), @@ -66,8 +80,8 @@ func (b *statefulSet) Create() (*appsv1.StatefulSet, error) { // nolint:golint,u func (b *statefulSet) Update(obj *appsv1.StatefulSet) (*appsv1.StatefulSet, error) { // nolint:golint,unparam obj.Labels = updateAkashLabels(obj.Labels, b.labels()) - obj.Spec.Selector.MatchLabels = b.selectorLabels() obj.Spec.Replicas = b.replicas() + obj.Spec.Selector.MatchLabels = b.selectorLabels() obj.Spec.Template.Labels = b.labels() obj.Spec.Template.Spec.Affinity = b.affinity() obj.Spec.Template.Spec.RuntimeClassName = b.runtimeClass() diff --git a/cluster/kube/operators/clients/inventory/client_test.go b/cluster/kube/operators/clients/inventory/client_test.go index aca54bf27..3d7914a56 100644 --- a/cluster/kube/operators/clients/inventory/client_test.go +++ b/cluster/kube/operators/clients/inventory/client_test.go @@ -330,17 +330,17 @@ func TestInventorySingleNodeNoPods(t *testing.T) { Name: "test", Resources: inventoryV1.NodeResources{ CPU: inventoryV1.CPU{ - Quantity: inventoryV1.NewResourcePair(expectedCPU, 0, "m"), + Quantity: inventoryV1.NewResourcePair(expectedCPU, expectedCPU, 0, "m"), }, Memory: inventoryV1.Memory{ - Quantity: inventoryV1.NewResourcePair(expectedMemory, 0, "M"), + Quantity: inventoryV1.NewResourcePair(expectedMemory, expectedMemory, 0, "M"), }, GPU: inventoryV1.GPU{ - Quantity: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, - EphemeralStorage: inventoryV1.NewResourcePair(expectedStorage, 0, "M"), - VolumesAttached: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), - VolumesMounted: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + EphemeralStorage: inventoryV1.NewResourcePair(expectedStorage, expectedStorage, 0, "M"), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, Capabilities: inventoryV1.NodeCapabilities{}, }, @@ -380,17 +380,17 @@ func TestInventorySingleNodeWithPods(t *testing.T) { Name: "test", Resources: inventoryV1.NodeResources{ CPU: inventoryV1.CPU{ - Quantity: inventoryV1.NewResourcePair(expectedCPU, cpuPerContainer*totalContainers, "m"), + Quantity: inventoryV1.NewResourcePair(expectedCPU, expectedCPU, cpuPerContainer*totalContainers, "m"), }, Memory: inventoryV1.Memory{ - Quantity: inventoryV1.NewResourcePair(expectedMemory, memoryPerContainer*totalContainers, "M"), + Quantity: inventoryV1.NewResourcePair(expectedMemory, expectedMemory, memoryPerContainer*totalContainers, "M"), }, GPU: inventoryV1.GPU{ - Quantity: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, - EphemeralStorage: inventoryV1.NewResourcePair(expectedStorage, storagePerContainer*totalContainers, "M"), - VolumesAttached: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), - VolumesMounted: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + EphemeralStorage: inventoryV1.NewResourcePair(expectedStorage, expectedStorage, storagePerContainer*totalContainers, "M"), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, Capabilities: inventoryV1.NodeCapabilities{}, }, @@ -617,30 +617,30 @@ func multipleReplicasGenNodes() inventoryV1.Nodes { Name: "node1", Resources: inventoryV1.NodeResources{ CPU: inventoryV1.CPU{ - Quantity: inventoryV1.NewResourcePairMilli(119800, 51020, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePairMilli(119800, 119800, 51020, resource.DecimalSI), }, Memory: inventoryV1.Memory{ - Quantity: inventoryV1.NewResourcePair(457317732352, 17495527424, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(457317732352, 457317732352, 17495527424, resource.DecimalSI), }, GPU: inventoryV1.GPU{ - Quantity: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, - EphemeralStorage: inventoryV1.NewResourcePair(7760751097705, 8589934592, resource.DecimalSI), - VolumesAttached: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), - VolumesMounted: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + EphemeralStorage: inventoryV1.NewResourcePair(7760751097705, 7760751097705, 8589934592, resource.DecimalSI), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, }, { Name: "node2", Resources: inventoryV1.NodeResources{ CPU: inventoryV1.CPU{ - Quantity: inventoryV1.NewResourcePairMilli(119800, 51000, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePairMilli(119800, 119800, 51000, resource.DecimalSI), }, Memory: inventoryV1.Memory{ - Quantity: inventoryV1.NewResourcePair(457317732352, 17495527424, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(457317732352, 457317732352, 17495527424, resource.DecimalSI), }, GPU: inventoryV1.GPU{ - Quantity: inventoryV1.NewResourcePair(2, 0, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(2, 2, 0, resource.DecimalSI), Info: inventoryV1.GPUInfoS{ { Vendor: "nvidia", @@ -660,43 +660,43 @@ func multipleReplicasGenNodes() inventoryV1.Nodes { }, }, }, - EphemeralStorage: inventoryV1.NewResourcePair(7760751097705, 8589934592, resource.DecimalSI), - VolumesAttached: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), - VolumesMounted: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + EphemeralStorage: inventoryV1.NewResourcePair(7760751097705, 7760751097705, 8589934592, resource.DecimalSI), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, }, { Name: "node3", Resources: inventoryV1.NodeResources{ CPU: inventoryV1.CPU{ - Quantity: inventoryV1.NewResourcePairMilli(119800, 275, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePairMilli(119800, 119800, 275, resource.DecimalSI), }, Memory: inventoryV1.Memory{ - Quantity: inventoryV1.NewResourcePair(457317732352, 17495527424, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(457317732352, 457317732352, 17495527424, resource.DecimalSI), }, GPU: inventoryV1.GPU{ - Quantity: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, - EphemeralStorage: inventoryV1.NewResourcePair(7760751097705, 0, resource.DecimalSI), - VolumesAttached: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), - VolumesMounted: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + EphemeralStorage: inventoryV1.NewResourcePair(7760751097705, 7760751097705, 0, resource.DecimalSI), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, }, { Name: "node4", Resources: inventoryV1.NodeResources{ CPU: inventoryV1.CPU{ - Quantity: inventoryV1.NewResourcePairMilli(119800, 305, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePairMilli(119800, 119800, 305, resource.DecimalSI), }, Memory: inventoryV1.Memory{ - Quantity: inventoryV1.NewResourcePair(457317732352, 17495527424, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(457317732352, 457317732352, 17495527424, resource.DecimalSI), }, GPU: inventoryV1.GPU{ - Quantity: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, - EphemeralStorage: inventoryV1.NewResourcePair(7760751097705, 0, resource.DecimalSI), - VolumesAttached: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), - VolumesMounted: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + EphemeralStorage: inventoryV1.NewResourcePair(7760751097705, 7760751097705, 0, resource.DecimalSI), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, }, } diff --git a/cluster/manager.go b/cluster/manager.go index 30f7e6f79..5d835fa75 100644 --- a/cluster/manager.go +++ b/cluster/manager.go @@ -164,8 +164,8 @@ loop: for { select { case shutdownErr = <-dm.lc.ShutdownRequest(): + dm.log.Debug("received shutdown request", "err", shutdownErr) break loop - case deployment := <-dm.updatech: dm.deployment = deployment newch := dm.handleUpdate(ctx) diff --git a/cluster/service.go b/cluster/service.go index 543e99309..7358bcf57 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -345,6 +345,7 @@ loop: for { select { case err := <-s.lc.ShutdownRequest(): + s.log.Debug("received shutdown request", "err", err) s.lc.ShutdownInitiated(err) break loop case ev := <-s.sub.Events(): diff --git a/cluster/types/v1beta3/clients/inventory/inventory.go b/cluster/types/v1beta3/clients/inventory/inventory.go index ca0fe7840..622d22d03 100644 --- a/cluster/types/v1beta3/clients/inventory/inventory.go +++ b/cluster/types/v1beta3/clients/inventory/inventory.go @@ -66,7 +66,7 @@ func NewNull(ctx context.Context, nodes ...string) NullClient { cluster := inventoryV1.Cluster{} cluster.Storage = append(cluster.Storage, inventoryV1.Storage{ - Quantity: inventoryV1.NewResourcePair(nullClientStorage, nullClientStorage-(10*unit.Gi), resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(nullClientStorage, nullClientStorage, nullClientStorage-(10*unit.Gi), resource.DecimalSI), Info: inventoryV1.StorageInfo{ Class: "beta2", }, @@ -77,17 +77,17 @@ func NewNull(ctx context.Context, nodes ...string) NullClient { Name: ndName, Resources: inventoryV1.NodeResources{ CPU: inventoryV1.CPU{ - Quantity: inventoryV1.NewResourcePairMilli(nullClientCPU, 100, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePairMilli(nullClientCPU, nullClientCPU, 100, resource.DecimalSI), }, Memory: inventoryV1.Memory{ - Quantity: inventoryV1.NewResourcePair(nullClientMemory, 1*unit.Gi, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(nullClientMemory, nullClientMemory, 1*unit.Gi, resource.DecimalSI), }, GPU: inventoryV1.GPU{ - Quantity: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, - EphemeralStorage: inventoryV1.NewResourcePair(nullClientStorage, 10*unit.Gi, resource.DecimalSI), - VolumesAttached: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), - VolumesMounted: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + EphemeralStorage: inventoryV1.NewResourcePair(nullClientStorage, nullClientStorage, 10*unit.Gi, resource.DecimalSI), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, Capabilities: inventoryV1.NodeCapabilities{}, } @@ -100,17 +100,17 @@ func NewNull(ctx context.Context, nodes ...string) NullClient { Name: "solo", Resources: inventoryV1.NodeResources{ CPU: inventoryV1.CPU{ - Quantity: inventoryV1.NewResourcePairMilli(nullClientCPU, 100, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePairMilli(nullClientCPU, nullClientCPU, 100, resource.DecimalSI), }, Memory: inventoryV1.Memory{ - Quantity: inventoryV1.NewResourcePair(nullClientMemory, 1*unit.Gi, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(nullClientMemory, nullClientMemory, 1*unit.Gi, resource.DecimalSI), }, GPU: inventoryV1.GPU{ - Quantity: inventoryV1.NewResourcePair(nullClientGPU, 1, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(nullClientGPU, nullClientGPU, 1, resource.DecimalSI), }, - EphemeralStorage: inventoryV1.NewResourcePair(nullClientStorage, 10*unit.Gi, resource.DecimalSI), - VolumesAttached: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), - VolumesMounted: inventoryV1.NewResourcePair(0, 0, resource.DecimalSI), + EphemeralStorage: inventoryV1.NewResourcePair(nullClientStorage, nullClientStorage, 10*unit.Gi, resource.DecimalSI), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, Capabilities: inventoryV1.NodeCapabilities{}, }) diff --git a/cmd/provider-services/cmd/run.go b/cmd/provider-services/cmd/run.go index 8223caf4c..9a1d31ca8 100644 --- a/cmd/provider-services/cmd/run.go +++ b/cmd/provider-services/cmd/run.go @@ -753,7 +753,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { return err } - ctx = context.WithValue(ctx, clfromctx.CtxKeyClientInventory, hostnameOperatorClient) + ctx = context.WithValue(ctx, clfromctx.CtxKeyClientHostname, hostnameOperatorClient) inventory, err := kubeinventory.NewClient(ctx) if err != nil { diff --git a/go.mod b/go.mod index 92e48e8b0..a7f06ac69 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/akash-network/provider go 1.23.5 require ( - github.com/akash-network/akash-api v0.0.73 - github.com/akash-network/node v0.34.1 + github.com/akash-network/akash-api v0.0.75 + github.com/akash-network/node v0.36.0 github.com/avast/retry-go/v4 v4.5.0 github.com/blang/semver/v4 v4.0.0 github.com/boz/go-lifecycle v0.1.1 @@ -52,8 +52,8 @@ require ( ) retract ( - v0.6.0 v0.6.5 + v0.6.0 ) replace ( diff --git a/go.sum b/go.sum index cfc522905..148736169 100644 --- a/go.sum +++ b/go.sum @@ -158,16 +158,16 @@ github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= -github.com/akash-network/akash-api v0.0.73 h1:Wm0NgVgHbuLTAAa9aIRxan1zGwSb+i9/1uWp65vJmXc= -github.com/akash-network/akash-api v0.0.73/go.mod h1:5JMjLPHvWOyyamLz8bJoy6QHqz4I46ANAQpEIIUY1bM= +github.com/akash-network/akash-api v0.0.75 h1:h9RZemWa7JqMGYb3nVRhRgP4xZnACIy0yN7de60JLyg= +github.com/akash-network/akash-api v0.0.75/go.mod h1:pvoHHEQbt63+U+HUSTjssZ1nUJ8sJuWtHCu6ztaXcqo= github.com/akash-network/cometbft v0.34.27-akash h1:V1dApDOr8Ee7BJzYyQ7Z9VBtrAul4+baMeA6C49dje0= github.com/akash-network/cometbft v0.34.27-akash/go.mod h1:BcCbhKv7ieM0KEddnYXvQZR+pZykTKReJJYf7YC7qhw= github.com/akash-network/ledger-go v0.14.3 h1:LCEFkTfgGA2xFMN2CtiKvXKE7dh0QSM77PJHCpSkaAo= github.com/akash-network/ledger-go v0.14.3/go.mod h1:NfsjfFvno9Kaq6mfpsKz4sqjnAVVEsVsnBJfKB4ueAs= github.com/akash-network/ledger-go/cosmos v0.14.4 h1:h3WiXmoKKs9wkj1LHcJ12cLjXXg6nG1fp+UQ5+wu/+o= github.com/akash-network/ledger-go/cosmos v0.14.4/go.mod h1:SjAfheQTE4rWk0ir+wjbOWxwj8nc8E4AZ08NdsvYG24= -github.com/akash-network/node v0.34.1 h1:5ky3Q1dgXgGkcZA0y0AjEshi3fL7bk76OqCeh5ecMTs= -github.com/akash-network/node v0.34.1/go.mod h1:lPxn9dDCAXXflq9o1bqRH4DsLOiQaXIwCnU0l/nOLUs= +github.com/akash-network/node v0.36.0 h1:eDBotSwxbtmMlDBzqW6CXnbaazgBdEaLDPfKLRO6FPg= +github.com/akash-network/node v0.36.0/go.mod h1:lPxn9dDCAXXflq9o1bqRH4DsLOiQaXIwCnU0l/nOLUs= github.com/alecthomas/participle/v2 v2.0.0-alpha7 h1:cK4vjj0VSgb3lN1nuKA5F7dw+1s1pWBe5bx7nNCnN+c= github.com/alecthomas/participle/v2 v2.0.0-alpha7/go.mod h1:NumScqsC42o9x+dGj8/YqsIfhrIQjFEOFovxotbBirA= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/make/test-integration.mk b/make/test-integration.mk index 9feba2cef..643ec4a34 100644 --- a/make/test-integration.mk +++ b/make/test-integration.mk @@ -4,6 +4,12 @@ BUILD_TAGS_E2E := e2e integration BUILD_TAGS_ALL := "$(BUILD_TAGS_K8S_INTEGRATION) $(BUILD_TAGS_E2E)" TEST_MODULES ?= $(shell $(GO) list ./... | grep -v '/mocks\|/kubernetes_mock\|/pkg/client') +KIND_NAME := kube + +include _run/common-kind.mk + +KIND_VARS ?= KUBE_INGRESS_IP="$(KIND_K8S_IP)" KUBE_INGRESS_PORT="$(KIND_HTTP_PORT)" + # This is statically specified in the vagrant configuration # todo @troian check it still necessary KUBE_NODE_IP ?= 172.18.8.101 @@ -19,7 +25,7 @@ test-e2e-integration: # ``` # KUSTOMIZE_INSTALLS=akash-operator-inventory make kube-cluster-setup-e2e # ``` - $(KIND_VARS) $(INTEGRATION_VARS) $(GO_TEST) -count=1 -p 4 -tags "e2e" -v ./integration/... -run TestIntegrationTestSuite -timeout 1500s + $(KIND_VARS) $(INTEGRATION_VARS) $(GO_TEST) -count=1 -p 4 -tags "e2e" -v ./integration/... -run TestIntegrationTestSuite -timeout 3000s .PHONY: test-e2e-integration-k8s test-e2e-integration-k8s: diff --git a/operator/inventory/ceph.go b/operator/inventory/ceph.go index 81d3f7fe5..167ce9702 100644 --- a/operator/inventory/ceph.go +++ b/operator/inventory/ceph.go @@ -362,6 +362,7 @@ func (c *ceph) run(startch chan<- struct{}) error { Quantity: inventory.ResourcePair{ Allocated: &allocated, Allocatable: resource.NewQuantity(int64(pool.Stats.MaxAvail), resource.DecimalSI), // nolint: gosec + Capacity: resource.NewQuantity(int64(pool.Stats.MaxAvail), resource.DecimalSI), // nolint: gosec }, Info: inventory.StorageInfo{ Class: class, diff --git a/operator/inventory/cmd.go b/operator/inventory/cmd.go index c99528562..1cfa78019 100644 --- a/operator/inventory/cmd.go +++ b/operator/inventory/cmd.go @@ -224,6 +224,11 @@ func Cmd() *cobra.Command { factory.Core().V1().Nodes().Informer(), topicKubeNodes) + InformKubeObjects(ctx, + bus, + factory.Core().V1().Pods().Informer(), + topicKubePods) + fromctx.MustStartupChFromCtx(ctx) <- struct{}{} err = group.Wait() diff --git a/operator/inventory/node-discovery.go b/operator/inventory/node-discovery.go index e7813cc86..9db33ec9b 100644 --- a/operator/inventory/node-discovery.go +++ b/operator/inventory/node-discovery.go @@ -191,7 +191,7 @@ func (dp *nodeDiscovery) apiConnector() error { { Name: "psutil", Image: dp.image, - Command: []string{ + Args: []string{ "provider-services", "tools", "psutil", @@ -345,6 +345,18 @@ initloop: } } +func isPodAllocated(status corev1.PodStatus) bool { + isScheduled := false + for _, condition := range status.Conditions { + isScheduled = (condition.Type == corev1.PodScheduled) && (condition.Status == corev1.ConditionTrue) + if isScheduled { + break + } + } + + return isScheduled && (status.Phase == corev1.PodRunning) +} + func (dp *nodeDiscovery) monitor() error { ctx := dp.ctx log := fromctx.LogrFromCtx(ctx).WithName("node.monitor") @@ -444,8 +456,13 @@ func (dp *nodeDiscovery) monitor() error { currPods = make(map[string]corev1.Pod) - for name := range pods.Items { - pod := pods.Items[name].DeepCopy() + for idx := range pods.Items { + pod := pods.Items[idx].DeepCopy() + + if !isPodAllocated(pod.Status) { + continue + } + addPodAllocatedResources(&node, pod) currPods[pod.Name] = *pod @@ -514,8 +531,9 @@ func (dp *nodeDiscovery) monitor() error { if obj.Name == dp.name { switch evt.Type { case watch.Modified: - if nodeAllocatableChanged(knode, obj, &node) { - podsWatch.Stop() + if nodeAllocatableChanged(knode, obj) { + // podsWatch.Stop() + updateNodeInfo(obj, &node) if err = restartPodsWatcher(); err != nil { return err } @@ -540,7 +558,9 @@ func (dp *nodeDiscovery) monitor() error { obj := res.Object.(*corev1.Pod) switch res.Type { case watch.Added: - if _, exists := currPods[obj.Name]; !exists { + fallthrough + case watch.Modified: + if _, exists := currPods[obj.Name]; !exists && isPodAllocated(obj.Status) { currPods[obj.Name] = *obj.DeepCopy() addPodAllocatedResources(&node, obj) } @@ -612,7 +632,7 @@ func (dp *nodeDiscovery) monitor() error { } } -func nodeAllocatableChanged(prev *corev1.Node, curr *corev1.Node, node *v1.Node) bool { +func nodeAllocatableChanged(prev *corev1.Node, curr *corev1.Node) bool { changed := len(prev.Status.Allocatable) != len(curr.Status.Allocatable) if !changed { @@ -625,10 +645,6 @@ func nodeAllocatableChanged(prev *corev1.Node, curr *corev1.Node, node *v1.Node) } } - if changed { - updateNodeInfo(curr, node) - } - return changed } @@ -640,20 +656,20 @@ func (dp *nodeDiscovery) initNodeInfo(gpusIDs RegistryGPUVendors, knode *corev1. Name: knode.Name, Resources: v1.NodeResources{ CPU: v1.CPU{ - Quantity: v1.NewResourcePairMilli(0, 0, resource.DecimalSI), + Quantity: v1.NewResourcePairMilli(0, 0, 0, resource.DecimalSI), Info: cpuInfo, }, GPU: v1.GPU{ - Quantity: v1.NewResourcePair(0, 0, resource.DecimalSI), + Quantity: v1.NewResourcePair(0, 0, 0, resource.DecimalSI), Info: gpuInfo, }, Memory: v1.Memory{ - Quantity: v1.NewResourcePair(0, 0, resource.DecimalSI), + Quantity: v1.NewResourcePair(0, 0, 0, resource.DecimalSI), Info: nil, }, - EphemeralStorage: v1.NewResourcePair(0, 0, resource.DecimalSI), - VolumesAttached: v1.NewResourcePair(0, 0, resource.DecimalSI), - VolumesMounted: v1.NewResourcePair(0, 0, resource.DecimalSI), + EphemeralStorage: v1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesAttached: v1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesMounted: v1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, } @@ -677,22 +693,22 @@ func updateNodeInfo(knode *corev1.Node, node *v1.Node) { node.Resources.GPU.Quantity.Allocatable.Set(r.Value()) } } -} -// // trimAllocated ensure allocated does not overrun allocatable -// // Deprecated to be replaced with function from akash-api after sdk-47 upgrade -// func trimAllocated(rp *v1.ResourcePair) { -// allocated := rp.Allocated.Value() -// allocatable := rp.Allocatable.Value() -// -// if allocated <= allocatable { -// return -// } -// -// allocated = allocatable -// -// rp.Allocated.Set(allocated) -// } + for name, r := range knode.Status.Capacity { + switch name { + case corev1.ResourceCPU: + node.Resources.CPU.Quantity.Capacity.SetMilli(r.MilliValue()) + case corev1.ResourceMemory: + node.Resources.Memory.Quantity.Capacity.Set(r.Value()) + case corev1.ResourceEphemeralStorage: + node.Resources.EphemeralStorage.Capacity.Set(r.Value()) + case builder.ResourceGPUNvidia: + fallthrough + case builder.ResourceGPUAMD: + node.Resources.GPU.Quantity.Capacity.Set(r.Value()) + } + } +} func nodeResetAllocated(node *v1.Node) { node.Resources.CPU.Quantity.Allocated = resource.NewMilliQuantity(0, resource.DecimalSI) diff --git a/operator/inventory/rancher.go b/operator/inventory/rancher.go index 7c5aeae7f..3d6f8f2a0 100644 --- a/operator/inventory/rancher.go +++ b/operator/inventory/rancher.go @@ -254,7 +254,7 @@ func (c *rancher) run(startch chan<- struct{}) error { for class, params := range scs { if params.isRancher && params.isAkashManaged { res = append(res, inventory.Storage{ - Quantity: inventory.NewResourcePair(allocatable, int64(params.allocated), resource.DecimalSI), // nolint: gosec + Quantity: inventory.NewResourcePair(allocatable, allocatable, int64(params.allocated), resource.DecimalSI), // nolint: gosec Info: inventory.StorageInfo{ Class: class, }, diff --git a/operator/inventory/types.go b/operator/inventory/types.go index 862b05602..b27572095 100644 --- a/operator/inventory/types.go +++ b/operator/inventory/types.go @@ -47,6 +47,7 @@ const ( topicKubeNodes = "kube-nodes" topicKubeCephClusters = "kube-ceph-clusters" topicKubePV = "kube-pv" + topicKubePods = "kube-pods" ) type dpReqType int diff --git a/operator/waiter/waiter_test.go b/operator/waiter/waiter_test.go index be2960b6f..2183fcdcf 100644 --- a/operator/waiter/waiter_test.go +++ b/operator/waiter/waiter_test.go @@ -2,6 +2,7 @@ package waiter import ( "context" + "fmt" "io" "testing" "time" @@ -10,6 +11,18 @@ import ( "github.com/stretchr/testify/require" ) +type fakeWaiter struct { + failure error +} + +func (fw fakeWaiter) Check(_ context.Context) error { + return fw.failure +} + +func (fw fakeWaiter) String() string { + return "fakeWaiter" +} + func TestWaiterNoInput(t *testing.T) { logger := testutil.Logger(t) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -24,24 +37,13 @@ func TestWaiterContextCancelled(t *testing.T) { logger := testutil.Logger(t) ctx, cancel := context.WithCancel(context.Background()) cancel() - // no objects passed - waiter := NewOperatorWaiter(ctx, logger) + waitable := fakeWaiter{failure: fmt.Errorf("test error")} + + waiter := NewOperatorWaiter(ctx, logger, &waitable) require.NotNil(t, waiter) require.ErrorIs(t, waiter.WaitForAll(ctx), context.Canceled) } -type fakeWaiter struct { - failure error -} - -func (fw fakeWaiter) Check(_ context.Context) error { - return fw.failure -} - -func (fw fakeWaiter) String() string { - return "fakeWaiter" -} - func TestWaiterInputReady(t *testing.T) { waitable := fakeWaiter{failure: nil} logger := testutil.Logger(t) diff --git a/service.go b/service.go index 0adc7b55f..a7a84e77b 100644 --- a/service.go +++ b/service.go @@ -9,14 +9,10 @@ import ( "github.com/pkg/errors" tpubsub "github.com/troian/pubsub" - "github.com/cosmos/cosmos-sdk/client" - sdktypes "github.com/cosmos/cosmos-sdk/types" - sdkquery "github.com/cosmos/cosmos-sdk/types/query" - - sclient "github.com/akash-network/akash-api/go/node/client/v1beta2" dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3" - mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4" provider "github.com/akash-network/akash-api/go/provider/v1" + "github.com/cosmos/cosmos-sdk/client" + sdktypes "github.com/cosmos/cosmos-sdk/types" "github.com/akash-network/node/pubsub" @@ -151,112 +147,6 @@ func NewService(ctx context.Context, return svc, nil } -func queryExistingLeases( - ctx context.Context, - aqc sclient.QueryClient, - accAddr sdktypes.AccAddress, -) ([]mtypes.Bid, error) { - var presp *sdkquery.PageResponse - var leases []mtypes.QueryLeaseResponse - - bidsMap := make(map[string]mtypes.Bid) - - limit := uint64(10) - errorCnt := 0 - - for { - preq := &sdkquery.PageRequest{ - Key: nil, - Limit: limit, - } - - if presp != nil { - preq.Key = presp.NextKey - } - - resp, err := aqc.Bids(ctx, &mtypes.QueryBidsRequest{ - Filters: mtypes.BidFilters{ - Provider: accAddr.String(), - State: mtypes.BidActive.String(), - }, - Pagination: preq, - }) - if err != nil { - if errorCnt > 1 { - return nil, err - } - - errorCnt++ - - continue - } - errorCnt = 0 - - for _, resp := range resp.Bids { - bidsMap[resp.Bid.BidID.DeploymentID().String()] = resp.Bid - } - - if uint64(len(resp.Bids)) < limit { - break - } - - presp = resp.Pagination - } - - presp = nil - for { - preq := &sdkquery.PageRequest{ - Key: nil, - Limit: limit, - } - - if presp != nil { - preq.Key = presp.NextKey - } - - resp, err := aqc.Leases(ctx, &mtypes.QueryLeasesRequest{ - Filters: mtypes.LeaseFilters{ - Provider: accAddr.String(), - State: mtypes.LeaseActive.String(), - }, - Pagination: preq, - }) - if err != nil { - if errorCnt > 1 { - return nil, err - } - - errorCnt++ - - continue - } - - errorCnt = 0 - - leases = append(leases, resp.Leases...) - if uint64(len(resp.Leases)) < limit { - break - } - - presp = resp.Pagination - } - - for _, resp := range leases { - did := resp.Lease.LeaseID.DeploymentID().String() - if _, exists := bidsMap[did]; !exists { - delete(bidsMap, did) - } - } - - res := make([]mtypes.Bid, 0, len(bidsMap)) - - for _, bid := range bidsMap { - res = append(res, bid) - } - - return res, nil -} - type service struct { config Config session session.Session @@ -379,12 +269,14 @@ func (s *service) run() { // Wait for any service to finish select { - case <-s.lc.ShutdownRequest(): + case shutdownErr := <-s.lc.ShutdownRequest(): + s.session.Log().Info("received shutdown request", "err", shutdownErr) case <-s.cluster.Done(): case <-s.bidengine.Done(): case <-s.manifest.Done(): } + s.session.Log().Info("shutting down services") // Shut down all services s.lc.ShutdownInitiated(nil) s.cancel()