Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add separate timeout flag for writer #4

Merged
merged 1 commit into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions avssync.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ type AvsSync struct {
fetchQuorumsDynamically bool
retrySyncNTimes int

rpcTimeoutDuration time.Duration
readerTimeoutDuration time.Duration
writerTimeoutDuration time.Duration
}

// NewAvsSync creates a new AvsSync object
Expand All @@ -36,7 +37,8 @@ func NewAvsSync(
avsReader avsregistry.AvsRegistryReader, avsWriter avsregistry.AvsRegistryWriter,
sleepBeforeFirstSyncDuration time.Duration, syncInterval time.Duration, operators []common.Address,
quorums []byte, fetchQuorumsDynamically bool, retrySyncNTimes int,
rpcTimeoutDuration time.Duration,
readerTimeoutDuration time.Duration,
writerTimeoutDuration time.Duration,
) *AvsSync {
return &AvsSync{
logger: logger,
Expand All @@ -48,13 +50,14 @@ func NewAvsSync(
quorums: quorums,
fetchQuorumsDynamically: fetchQuorumsDynamically,
retrySyncNTimes: retrySyncNTimes,
rpcTimeoutDuration: rpcTimeoutDuration,
readerTimeoutDuration: readerTimeoutDuration,
writerTimeoutDuration: writerTimeoutDuration,
}
}

func (a *AvsSync) Start() {
a.logger.Infof("Starting avs sync with sleepBeforeFirstSyncDuration=%s, syncInterval=%s, operators=%v, quorums=%v, fetchQuorumsDynamically=%v, rpcTimeoutDuration=%s",
a.sleepBeforeFirstSyncDuration, a.syncInterval, a.operators, a.quorums, a.fetchQuorumsDynamically, a.rpcTimeoutDuration)
a.logger.Infof("Starting avs sync with sleepBeforeFirstSyncDuration=%s, syncInterval=%s, operators=%v, quorums=%v, fetchQuorumsDynamically=%v, readerTimeoutDuration=%s, writerTimeoutDuration=%s",
a.sleepBeforeFirstSyncDuration, a.syncInterval, a.operators, a.quorums, a.fetchQuorumsDynamically, a.readerTimeoutDuration, a.writerTimeoutDuration)

// run something every syncInterval
ticker := time.NewTicker(a.syncInterval)
Expand Down Expand Up @@ -94,7 +97,7 @@ func (a *AvsSync) updateStakes() error {
return nil
} else {
a.logger.Infof("Updating stakes of operators: %v", a.operators)
timeoutCtx, cancel := context.WithTimeout(context.Background(), a.rpcTimeoutDuration)
timeoutCtx, cancel := context.WithTimeout(context.Background(), a.writerTimeoutDuration)
defer cancel()
// this one we update all quorums at once, since we're only updating a subset of operators (which should be a small number)
_, err := a.avsWriter.UpdateStakesOfOperatorSubsetForAllQuorums(timeoutCtx, a.operators)
Expand All @@ -110,7 +113,7 @@ func (a *AvsSync) maybeUpdateQuorumSet() {
return
}
a.logger.Info("Fetching quorum set dynamically")
timeoutCtx, cancel := context.WithTimeout(context.Background(), a.rpcTimeoutDuration)
timeoutCtx, cancel := context.WithTimeout(context.Background(), a.readerTimeoutDuration)
defer cancel()
quorumCount, err := a.avsReader.GetQuorumCount(&bind.CallOpts{Context: timeoutCtx})
if err != nil {
Expand All @@ -129,7 +132,7 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte,
for i := 0; i < retryNTimes; i++ {
a.logger.Debug("tryNTimesUpdateStakesOfEntireOperatorSetForQuorum", "quorum", quorum, "retryNTimes", retryNTimes, "try", i+1)

timeoutCtx, cancel := context.WithTimeout(context.Background(), a.rpcTimeoutDuration)
timeoutCtx, cancel := context.WithTimeout(context.Background(), a.readerTimeoutDuration)
defer cancel()
operatorAddrsPerQuorum, err := a.avsReader.GetOperatorAddrsInQuorumsAtCurrentBlock(&bind.CallOpts{Context: timeoutCtx}, []byte{quorum})
if err != nil {
Expand All @@ -142,7 +145,7 @@ func (a *AvsSync) tryNTimesUpdateStakesOfEntireOperatorSetForQuorum(quorum byte,
return operators[i].Big().Cmp(operators[j].Big()) < 0
})
a.logger.Infof("Updating stakes of operators in quorum %d: %v", quorum, operators)
timeoutCtx, cancel = context.WithTimeout(context.Background(), a.rpcTimeoutDuration)
timeoutCtx, cancel = context.WithTimeout(context.Background(), a.writerTimeoutDuration)
defer cancel()
_, err = a.avsWriter.UpdateStakesOfEntireOperatorSetForQuorums(timeoutCtx, [][]common.Address{operators}, []byte{quorum})
if err != nil {
Expand Down
17 changes: 12 additions & 5 deletions flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,17 @@ var (
Usage: "If set to true, will fetch the list of quorums registered in the contract and update all of them",
EnvVar: envVarPrefix + "FETCH_QUORUMS_DYNAMICALLY",
}
RpcTimeoutDurationFlag = cli.DurationFlag{
Name: "rpc-timeout-duration",
Usage: "Timeout duration for rpc calls in `SECONDS`",
ReaderTimeoutDurationFlag = cli.DurationFlag{
Name: "reader-timeout-duration",
Usage: "Timeout duration for rpc calls to read from chain in `SECONDS`",
Value: 5 * time.Second,
EnvVar: envVarPrefix + "RPC_TIMEOUT_DURATION",
EnvVar: envVarPrefix + "READER_TIMEOUT_DURATION",
}
WriterTimeoutDurationFlag = cli.DurationFlag{
Name: "writer-timeout-duration",
Usage: "Timeout duration for transactions to be confirmed in `SECONDS`",
Value: 90 * time.Second,
EnvVar: envVarPrefix + "WRITER_TIMEOUT_DURATION",
}
retrySyncNTimes = cli.IntFlag{
Name: "retry-sync-n-times",
Expand All @@ -91,7 +97,8 @@ var OptionalFlags = []cli.Flag{
OperatorListFlag,
QuorumListFlag,
FetchQuorumDynamicallyFlag,
RpcTimeoutDurationFlag,
ReaderTimeoutDurationFlag,
WriterTimeoutDurationFlag,
retrySyncNTimes,
}

Expand Down
1 change: 1 addition & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func NewTestAvsSync(anvilHttpEndpoint string, contractAddresses ContractAddresse
false,
1, // 1 retry
5*time.Second,
5*time.Second,
)
return avsSync
}
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func avsSyncMain(cliCtx *cli.Context) error {
quorums,
cliCtx.Bool(FetchQuorumDynamicallyFlag.Name),
cliCtx.Int(retrySyncNTimes.Name),
cliCtx.Duration(RpcTimeoutDurationFlag.Name),
cliCtx.Duration(ReaderTimeoutDurationFlag.Name),
cliCtx.Duration(WriterTimeoutDurationFlag.Name),
)
avsSync.Start()

Expand Down
Loading