Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: respect context in new signature call #502

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions services/bls_aggregation/blsagg.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,28 @@ func (a *BlsAggregatorService) ProcessNewSignature(
blsSignature *bls.Signature,
operatorId types.OperatorId,
) error {
respC := make(chan error)
go func() {
respC <- a.processNewSignature(taskIndex, taskResponse, blsSignature, operatorId)
}()

// NOTE: we do this to let the goroutine consume the result of the operation
// but allow the operation to end early if the context is cancelled
select {
case err := <-respC:
return err
case <-ctx.Done():
return ctx.Err()
}
}

func (a *BlsAggregatorService) processNewSignature(
taskIndex types.TaskIndex,
taskResponse types.TaskResponse,
blsSignature *bls.Signature,
operatorId types.OperatorId,
) error {
// TODO: move this to a gorouting to avoid sharing state
a.taskChansMutex.Lock()
taskC, taskInitialized := a.signedTaskRespsCs[taskIndex]
a.taskChansMutex.Unlock()
Expand All @@ -297,21 +319,13 @@ func (a *BlsAggregatorService) ProcessNewSignature(
// send the task to the goroutine processing this task
// and return the error (if any) returned by the signature verification routine

select {
// we need to send this as part of select because if the goroutine is processing another SignedTaskResponseDigest
// and cannot receive this one, we want the context to be able to cancel the request
case taskC <- types.SignedTaskResponseDigest{
taskC <- types.SignedTaskResponseDigest{
TaskResponse: taskResponse,
BlsSignature: blsSignature,
OperatorId: operatorId,
SignatureVerificationErrorC: signatureVerificationErrorC,
}:
// note that we need to wait synchronously here for this response because we want to
// send back an informative error message to the operator who sent his signature to the aggregator
return <-signatureVerificationErrorC
case <-ctx.Done():
return ctx.Err()
}
return <-signatureVerificationErrorC
}

func (a *BlsAggregatorService) singleTaskAggregatorGoroutineFunc(
Expand Down