Skip to content

Commit

Permalink
fix(ipoperator): fix ip service delete upon provider restart
Browse files Browse the repository at this point in the history
refs ovrclk/engineering#478

Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian committed Aug 25, 2022
1 parent 7ff75bd commit 61dece1
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 32 deletions.
6 changes: 4 additions & 2 deletions _run/kube/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ KIND_SETUP_PREREQUISITES ?= \

KUSTOMIZE_INSTALLS ?= \
akash-operator-hostname \
akash-operator-inventory
akash-operator-inventory \
akash-operator-ip

export AKASH_NODE=http://localhost:26657
export AP_NODE=http://localhost:26657
Expand All @@ -28,7 +29,8 @@ provider-run:
--cluster-node-port-quantity 100 \
--cluster-public-hostname "$(GATEWAY_HOSTNAME)" \
--bid-price-strategy "randomRange" \
--deployment-runtime-class "none"
--deployment-runtime-class "none" \
--ip-operator=true

.PHONY: provider-lease-ping
provider-lease-ping:
Expand Down
3 changes: 2 additions & 1 deletion bidengine/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,9 @@ loop:
s.updateOrderManagerGauge()
}

s.session.Log().Info("Waiting on provider attributes service")
s.session.Log().Debug("waiting on provider attributes service")
<-s.pass.lc.Done()
s.session.Log().Info("shutdown complete")
}

func queryExistingOrders(ctx context.Context, session session.Session) ([]mtypes.OrderID, error) {
Expand Down
6 changes: 4 additions & 2 deletions cluster/kube/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ const (
AkashManagedLabelName = "akash.network"
AkashManifestServiceLabelName = "akash.network/manifest-service"
AkashNetworkStorageClasses = "akash.network/storageclasses"
AkashServiceTarget = "akash.network/service-target"
AkashMetalLB = "metal-lb"

akashNetworkNamespace = "akash.network/namespace"

Expand All @@ -29,8 +31,8 @@ const (
AkashLeaseGSeqLabelName = "akash.network/lease.id.gseq"
AkashLeaseOSeqLabelName = "akash.network/lease.id.oseq"
AkashLeaseProviderLabelName = "akash.network/lease.id.provider"

akashDeploymentPolicyName = "akash-deployment-restrictions"
AkashLeaseManifestVersion = "akash.network/manifest.version"
akashDeploymentPolicyName = "akash-deployment-restrictions"
)

const runtimeClassNoneValue = "none"
Expand Down
10 changes: 8 additions & 2 deletions cluster/kube/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,17 @@ func cleanupStaleResources(ctx context.Context, kc kubernetes.Interface, lid mty
if err != nil {
return err
}
req2, err := labels.NewRequirement(builder.AkashManagedLabelName, selection.Equals, []string{"true"})
req2, err := labels.NewRequirement(builder.AkashServiceTarget, selection.Equals, []string{"true"})
if err != nil {
return err
}
selector := labels.NewSelector().Add(*req1).Add(*req2).String()

req3, err := labels.NewRequirement(builder.AkashManagedLabelName, selection.NotIn, []string{builder.AkashMetalLB})
if err != nil {
return err
}

selector := labels.NewSelector().Add(*req1).Add(*req2).Add(*req3).String()

// delete stale deployments
if err := kc.AppsV1().Deployments(ns).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{
Expand Down
8 changes: 3 additions & 5 deletions cluster/kube/metallb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ import (
)

const (
akashServiceTarget = "akash.network/service-target"
akashMetalLB = "metal-lb"
metalLbAllowSharedIP = "metallb.universe.tf/allow-shared-ip"
metalLbPoolAnnotation = "metallb.universe.tf/address-pool"

Expand Down Expand Up @@ -275,7 +273,7 @@ func (c *client) GetIPAddressStatusForLease(ctx context.Context, leaseID mtypes.
if err != nil {
return nil, err
}
_, err = fmt.Fprintf(labelSelector, ",%s=%s", akashServiceTarget, akashMetalLB)
_, err = fmt.Fprintf(labelSelector, ",%s=%s", builder.AkashServiceTarget, builder.AkashMetalLB)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -374,7 +372,7 @@ func (c *client) CreateIPPassthrough(ctx context.Context, directive ctypes.Clust
labels := make(map[string]string)
builder.AppendLeaseLabels(directive.LeaseID, labels)
labels[builder.AkashManagedLabelName] = "true"
labels[akashServiceTarget] = akashMetalLB
labels[builder.AkashServiceTarget] = builder.AkashMetalLB

selector := map[string]string{
builder.AkashManagedLabelName: "true",
Expand Down Expand Up @@ -442,7 +440,7 @@ func (c *client) GetIPPassthroughs(ctx context.Context) ([]v1beta2.IPPassthrough
if err != nil {
return nil, err
}
_, err = fmt.Fprintf(labelSelector, ",%s=%s", akashServiceTarget, akashMetalLB)
_, err = fmt.Fprintf(labelSelector, ",%s=%s", builder.AkashServiceTarget, builder.AkashMetalLB)
if err != nil {
return nil, err
}
Expand Down
19 changes: 8 additions & 11 deletions cluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"github.com/avast/retry-go"
)

type deploymentState string

const (
dsDeployActive deploymentState = "deploy-active"
dsDeployPending deploymentState = "deploy-pending"
Expand All @@ -39,6 +37,10 @@ const (
dsTeardownComplete deploymentState = "teardown-complete"
)

const uncleanShutdownGracePeriod = 30 * time.Second

type deploymentState string

var (
deploymentCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "provider_deployment",
Expand Down Expand Up @@ -98,10 +100,10 @@ func newDeploymentManager(s *service, lease mtypes.LeaseID, mgroup *manifest.Gro
currentHostnames: make(map[string]struct{}),
}

ctx, _ := TieContextToLifecycle(context.Background(), s.lc)
startCh := make(chan struct{}, 1)

go dm.run(ctx, startCh)
go dm.lc.WatchChannel(s.lc.ShuttingDown())
go dm.run(context.Background(), startCh)

// ensures lease withdraw monitor is started and subscribed to the bus prior
// sending LeaseAddFundsMonitor event
Expand Down Expand Up @@ -254,16 +256,15 @@ loop:
dm.log.Debug("waiting on dm.wg")
dm.wg.Wait()

if dm.state != dsTeardownComplete {
const uncleanShutdownGracePeriod = 30 * time.Second
if dm.state != dsDeployComplete {
dm.log.Info("shutting down unclean, running teardown now")
ctx, cancel := context.WithTimeout(context.Background(), uncleanShutdownGracePeriod)
defer cancel()
teardownErr = dm.doTeardown(ctx)
}

if teardownErr != nil {
dm.log.Error("lease teardwon failed", "err", teardownErr)
dm.log.Error("lease teardown failed", "err", teardownErr)
}

dm.log.Info("shutdown complete")
Expand Down Expand Up @@ -628,10 +629,6 @@ func (dm *deploymentManager) do(fn func() error) <-chan error {
return ch
}

func TieContextToLifecycle(parentCtx context.Context, lc lifecycle.Lifecycle) (context.Context, context.CancelFunc) {
return TieContextToChannel(parentCtx, lc.ShuttingDown())
}

func TieContextToChannel(parentCtx context.Context, donech <-chan struct{}) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(parentCtx)

Expand Down
2 changes: 1 addition & 1 deletion cluster/operatorclients/ip_operator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (ipoc *ipOperatorClient) GetIPAddressUsage(ctx context.Context) (ipoptypes.
if err != nil {
return ipoptypes.IPAddressUsage{}, err
}
ipoc.log.Info("usage result", "status", response.StatusCode)

if response.StatusCode != http.StatusOK {
return ipoptypes.IPAddressUsage{}, extractRemoteError(response)
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ loop:
}

<-s.inventory.done()

s.session.Log().Info("shutdown complete")
}

func (s *service) doCheckDeploymentExists(req checkDeploymentExistsRequest) {
Expand Down
1 change: 1 addition & 0 deletions manifest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ loop:
}
}

s.session.Log().Info("shutdown complete")
}

func (s *service) maybeRemoveWatchdog(deploymentID dtypes.DeploymentID) {
Expand Down
14 changes: 7 additions & 7 deletions operator/ipoperator/ip_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,8 @@ func (op *ipOperator) prepareUsage(pd operatorcommon.PreparedResult) error {

func (op *ipOperator) prepareState(pd operatorcommon.PreparedResult) error {
results := make(map[string][]interface{})
for _, mnagedIPEntry := range op.state {
leaseID := mnagedIPEntry.presentLease
for _, managedIPEntry := range op.state {
leaseID := managedIPEntry.presentLease

result := struct {
LastChangeTime string `json:"last-event-time,omitempty"`
Expand All @@ -405,11 +405,11 @@ func (op *ipOperator) prepareState(pd operatorcommon.PreparedResult) error {
}{
LeaseID: leaseID,
Namespace: clusterutil.LeaseIDToNamespace(leaseID),
Port: mnagedIPEntry.presentPort,
ExternalPort: mnagedIPEntry.presentExternalPort,
ServiceName: mnagedIPEntry.presentServiceName,
SharingKey: mnagedIPEntry.presentSharingKey,
LastChangeTime: mnagedIPEntry.lastChangedAt.UTC().String(),
Port: managedIPEntry.presentPort,
ExternalPort: managedIPEntry.presentExternalPort,
ServiceName: managedIPEntry.presentServiceName,
SharingKey: managedIPEntry.presentSharingKey,
LastChangeTime: managedIPEntry.lastChangedAt.UTC().String(),
}

entryList := results[leaseID.String()]
Expand Down
2 changes: 2 additions & 0 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,6 @@ func (s *service) run() {
<-s.bidengine.Done()
<-s.manifest.Done()
<-s.bc.lc.Done()

s.session.Log().Info("shutdown complete")
}

0 comments on commit 61dece1

Please sign in to comment.