Skip to content

Commit

Permalink
refactor(balance-checker): move lease withdraw into balance checker
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian committed Sep 22, 2022
1 parent 4c40e30 commit a52371b
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 436 deletions.
64 changes: 45 additions & 19 deletions balance_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,32 @@ func (bc *balanceChecker) doEscrowCheck(ctx context.Context, lid mtypes.LeaseID,
}

func (bc *balanceChecker) startWithdraw(lid mtypes.LeaseID) error {
return bc.bus.Publish(event.LeaseWithdraw{LeaseID: lid})
msg := &mtypes.MsgWithdrawLease{
LeaseID: lid,
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

errch := make(chan error, 1)

go func(ch chan<- error) {
ch <- bc.session.Client().Tx().Broadcast(ctx, msg)
}(errch)

select {
case <-bc.lc.Done():
// give request extra 30s to finish before force canceling
select {
case <-time.After(30 * time.Second):
cancel()
return bc.lc.Error()
case err := <-errch:
return err
}
case err := <-errch:
return err
}
}

func (bc *balanceChecker) run(startCh chan<- error) {
Expand All @@ -203,15 +228,16 @@ func (bc *balanceChecker) run(startCh chan<- error) {
}()

leaseCheckCh := make(chan leaseCheckResponse, 1)
resultCh := make(chan runner.Result, 1)
var resultch chan runner.Result

subscriber, err := bc.bus.Subscribe()

startCh <- err
if err != nil {
return
}

resultch = make(chan runner.Result, 1)

loop:
for {
select {
Expand All @@ -235,7 +261,7 @@ loop:

bc.leases[ev.LeaseID] = lState

// if there was provider restart with bunch of active leases
// if there was provider restart with a bunch of active leases
// spread their requests across 1min interval
// to reduce pressure on the RPC
if !ev.IsNewLease {
Expand Down Expand Up @@ -266,31 +292,31 @@ loop:
withdraw := false

switch res.state {
case respStateOutOfFunds:
bc.log.Debug("lease is out of funds", "lease", res.lid)
// reschedule funds check. if lease not being topped up then network will close it
fallthrough
case respStateScheduledWithdraw:
withdraw = true
bc.log.Debug("sending withdraw", "lease", res.lid)
// reschedule periodic withdraw if configured
if bc.cfg.WithdrawalPeriod > 0 {
lState.scheduledWithdrawAt = time.Now().Add(bc.cfg.WithdrawalPeriod)
}
fallthrough
case respStateOutOfFunds:
bc.log.Debug("lease is out of fund. sending withdraw", "lease", res.lid)
withdraw = true

fallthrough
case respStateNextCheck:
timerPeriod := res.checkAfter
scheduledWithdraw := false

if res.err != nil {
bc.log.Info("couldn't check lease balance. retrying in 1m", "leaseId", res.lid, "error", res.err.Error())
timerPeriod = time.Minute
} else {
if !lState.scheduledWithdrawAt.IsZero() {
withdrawIn := time.Until(lState.scheduledWithdrawAt)
if timerPeriod >= withdrawIn {
timerPeriod = withdrawIn
scheduledWithdraw = true
}
} else {
bc.log.Debug("lease is out of fund. sending withdraw event", "lease", res.lid)
} else if !withdraw && !lState.scheduledWithdrawAt.IsZero() {
withdrawIn := time.Until(lState.scheduledWithdrawAt)
if timerPeriod >= withdrawIn {
timerPeriod = withdrawIn
scheduledWithdraw = true
}
}

Expand All @@ -301,11 +327,11 @@ loop:
go func() {
select {
case <-bc.ctx.Done():
case resultCh <- runner.NewResult(res.lid, bc.startWithdraw(res.lid)):
case resultch <- runner.NewResult(res.lid, bc.startWithdraw(res.lid)):
}
}()
}
case res := <-resultCh:
case res := <-resultch:
if err := res.Error(); err != nil {
bc.log.Error("failed to do lease withdrawal", "err", err, "LeaseID", res.Value().(mtypes.LeaseID))
}
Expand Down
144 changes: 0 additions & 144 deletions balance_checker_test.go

This file was deleted.

Loading

0 comments on commit a52371b

Please sign in to comment.