EOA execution #5

base: main

Conversation
Walkthrough

This change introduces a robust multilane queue system.

Changes
Sequence Diagram(s)

Multilane Job Batch Pop and Processing

```mermaid
sequenceDiagram
    participant Client
    participant MultilaneQueue
    participant Redis
    participant Worker
    participant Handler
    Client->>MultilaneQueue: push_to_lane(job, lane_id)
    MultilaneQueue->>Redis: Store job data and update lane sets
    Redis-->>MultilaneQueue: OK
    Worker->>MultilaneQueue: pop_batch_jobs(batch_size)
    MultilaneQueue->>Redis: Lua script to atomically pop jobs from lanes
    Redis-->>MultilaneQueue: List of jobs (by lane)
    MultilaneQueue-->>Worker: BorrowedJob(s)
    loop For each BorrowedJob
        Worker->>Handler: process(job)
        Handler-->>Worker: JobResult (Success/Nack/Fail)
        Worker->>MultilaneQueue: complete_job(BorrowedJob, JobResult)
        MultilaneQueue->>Redis: Atomic update (WATCH/MULTI/EXEC)
        Redis-->>MultilaneQueue: OK
    end
```
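The pop/complete flow above can be modeled without Redis. The sketch below is a purely illustrative, in-memory stand-in (not the actual `twmq` implementation, which is Redis- and Lua-backed): lanes are scanned round-robin, and lanes found empty during a batch are tracked in a set, so one deep lane cannot starve the others.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

// Illustrative in-memory stand-in for the Redis-backed multilane queue.
struct ToyMultilaneQueue {
    lanes: HashMap<String, VecDeque<String>>, // lane_id -> pending jobs
    order: Vec<String>,                       // round-robin scan order
    cursor: usize,
}

impl ToyMultilaneQueue {
    fn new() -> Self {
        ToyMultilaneQueue { lanes: HashMap::new(), order: Vec::new(), cursor: 0 }
    }

    fn push_to_lane(&mut self, lane_id: &str, job: &str) {
        if !self.lanes.contains_key(lane_id) {
            self.order.push(lane_id.to_string());
        }
        self.lanes
            .entry(lane_id.to_string())
            .or_default()
            .push_back(job.to_string());
    }

    // Pop up to `batch_size` jobs, visiting lanes round-robin and stopping
    // once every lane has been seen empty in this batch.
    fn pop_batch_jobs(&mut self, batch_size: usize) -> Vec<(String, String)> {
        let mut popped = Vec::new();
        let mut empty: HashSet<String> = HashSet::new();
        let total = self.order.len();
        while popped.len() < batch_size && empty.len() < total {
            let lane_id = self.order[self.cursor].clone();
            self.cursor = (self.cursor + 1) % total;
            if empty.contains(&lane_id) {
                continue; // already found empty during this batch
            }
            match self.lanes.get_mut(&lane_id).and_then(|q| q.pop_front()) {
                Some(job) => popped.push((lane_id, job)),
                None => {
                    empty.insert(lane_id);
                }
            }
        }
        popped
    }
}
```

One deliberate choice in this sketch: lanes found empty are recorded in a set rather than by clearing array slots, which sidesteps the sparse-array pitfalls that Lua tables are prone to.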
EOA Executor Worker Transaction Lifecycle

```mermaid
sequenceDiagram
    participant EoaExecutorWorker
    participant EoaExecutorStore
    participant ChainService
    participant EoaSigner
    participant Blockchain
    EoaExecutorWorker->>EoaExecutorStore: Acquire lock for EOA
    EoaExecutorWorker->>ChainService: Get chain for chain_id
    EoaExecutorWorker->>EoaExecutorStore: Crash recovery (rebroadcast borrowed txs)
    EoaExecutorWorker->>ChainService: Send transaction (if needed)
    ChainService->>Blockchain: Broadcast transaction
    Blockchain-->>ChainService: Tx hash / error
    ChainService-->>EoaExecutorWorker: Result
    EoaExecutorWorker->>EoaExecutorStore: Confirm on-chain txs
    EoaExecutorWorker->>ChainService: Get on-chain nonce
    ChainService-->>EoaExecutorWorker: Nonce
    EoaExecutorWorker->>EoaExecutorStore: Update state, mark confirmed/failed
    EoaExecutorWorker->>EoaExecutorStore: Send new or recycled txs
    EoaExecutorWorker->>EoaSigner: Sign transaction
    EoaSigner-->>EoaExecutorWorker: Signature
    EoaExecutorWorker->>ChainService: Send signed transaction
    ChainService-->>EoaExecutorWorker: Result
    EoaExecutorWorker->>EoaExecutorStore: Update transaction state
```
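One way to read the lifecycle above is as a per-transaction state machine. The sketch below is illustrative only — the state and event names are assumptions for exposition, not the executor's actual types:

```rust
// Illustrative transaction states for an EOA executor (names are assumptions,
// not the actual executor's types).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TxState {
    Pending,   // queued, no nonce assigned yet
    Borrowed,  // nonce assigned, signing/broadcast in flight (crash-recovery window)
    Submitted, // broadcast accepted by the RPC, awaiting confirmation
    Confirmed, // nonce passed on-chain and receipt found
    Failed,    // terminal failure (nonce recycled for reuse)
}

// Returns the next state for an event, or None if the transition is illegal.
fn step(state: TxState, event: &str) -> Option<TxState> {
    use TxState::*;
    match (state, event) {
        (Pending, "assign_nonce") => Some(Borrowed),
        (Borrowed, "broadcast_ok") => Some(Submitted),
        (Borrowed, "rebroadcast") => Some(Submitted), // crash-recovery path
        (Borrowed, "fatal_error") => Some(Failed),
        (Submitted, "receipt_success") => Some(Confirmed),
        (Submitted, "receipt_revert") => Some(Failed),
        (Submitted, "nonce_too_low_no_receipt") => Some(Borrowed), // resync & retry
        _ => None,
    }
}
```

Modeling transitions this way makes the crash-recovery step in the diagram explicit: anything still in the borrowed window at startup is either rebroadcast or failed, never silently dropped.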
Actionable comments posted: 11
🔭 Outside diff range comments (1)

aa-core/src/userop/builder.rs (1)

316-318: Potential panic via `unwrap_or_default()`

`unwrap_or_default()` silently masks a missing `paymaster_post_op_gas_limit` by inserting `0`. If that value is required by the protocol, defaulting to zero could cause on-chain reverts. Consider propagating an error instead:

```diff
-            bundler_response
-                .paymaster_post_op_gas_limit
-                .unwrap_or_default(),
+            bundler_response
+                .paymaster_post_op_gas_limit
+                .ok_or(EngineError::MissingBundlerGasLimit)?,
```

Replace with an appropriate domain error.
♻️ Duplicate comments (1)

core/Cargo.toml (1)

13-14: Same branch-pinning concern as `aa-core`

See the previous comment—use an immutable `rev` for both `vault-*` dependencies to avoid surprise breakages.
🧹 Nitpick comments (6)

aa-core/src/userop/builder.rs (2)

238-246: Missing error context on gas-price fetch failure

If `paymaster_client().get_user_op_gas_fees()` fails, the propagated error loses context about which entrypoint version (v0.7) failed. Consider adding contextual info:

```diff
-            .map_err(|e| e.to_engine_error(self.chain))?;
+            .map_err(|e| {
+                tracing::error!("v0.7 gas-fee fetch failed: {e}");
+                e.to_engine_error(self.chain)
+            })?;
```

This small addition greatly eases troubleshooting in prod.
260-266: Unnecessary `Some()` wrapping

`paymaster` and `paymaster_data` are already `Option`s—`pm_response` returns concrete values. Wrapping again with `Some()` is redundant when the fields are mutable:

```diff
-        self.userop.paymaster = Some(pm_response.paymaster);
-        self.userop.paymaster_data = Some(pm_response.paymaster_data);
+        self.userop.paymaster = pm_response.paymaster;
+        self.userop.paymaster_data = pm_response.paymaster_data;
```

Assuming the struct definitions use `Option<Bytes>`, this simplifies the code path.

executors/src/eoa/error_classifier.rs (1)
265-314: Add more comprehensive test coverage.

The current tests cover basic scenarios but miss edge cases and recovery strategy variations.

Consider adding tests for:

- Empty error messages
- Error messages with special characters
- Case sensitivity edge cases
- All error variants
- Recovery strategy validation for each error type

Example additional test:

```rust
#[test]
fn test_edge_cases() {
    // Empty message
    let error = EoaErrorMapper::map_ethereum_error(-32000, "");
    match error {
        EoaExecutionError::RpcError { .. } => {}
        _ => panic!("Expected RpcError for empty message"),
    }

    // Mixed case
    let error = EoaErrorMapper::map_ethereum_error(-32000, "NoNcE tOo LoW");
    match error {
        EoaExecutionError::NonceTooLow { .. } => {}
        _ => panic!("Expected NonceTooLow for mixed case"),
    }
}

#[test]
fn test_all_recovery_strategies() {
    // Test that each error variant has an appropriate strategy
    let test_cases = vec![
        (EoaExecutionError::NonceTooLow { message: "test".into() }, true, false),
        (EoaExecutionError::InsufficientFunds { message: "test".into() }, false, true),
        // ... other variants
    ];

    for (error, expected_queue_confirmation, expected_recycle) in test_cases {
        let strategy = EoaErrorMapper::get_recovery_strategy(&error);
        assert_eq!(strategy.queue_confirmation, expected_queue_confirmation);
        assert_eq!(strategy.recycle_nonce, expected_recycle);
    }
}
```

executors/src/eoa/nonce_manager.rs (1)
412-436: Simplify the return type of `get_nonce_assignments`.

This method returns a `Vec` but only ever contains zero or one element, which is misleading. Change the return type to `Option<NonceAssignment>`:

```diff
-pub async fn get_nonce_assignments(
+pub async fn get_nonce_assignment(
     &self,
     eoa: Address,
     chain_id: u64,
     nonce: U256,
-) -> Result<Vec<NonceAssignment>, NonceManagerError> {
+) -> Result<Option<NonceAssignment>, NonceManagerError> {
     let mut conn = self.redis.clone();
     let assignments_key = self.nonce_assignments_key(eoa, chain_id);
     let assignment_json: Option<String> =
         conn.hget(&assignments_key, nonce.to_string()).await?;

     match assignment_json {
         Some(json) => {
             let assignment: NonceAssignment =
                 serde_json::from_str(&json).map_err(|e| {
                     NonceManagerError::NonceAssignmentFailed {
                         reason: format!("Failed to deserialize assignment: {}", e),
                     }
                 })?;
-            Ok(vec![assignment])
+            Ok(Some(assignment))
         }
-        None => Ok(vec![]),
+        None => Ok(None),
     }
 }
```

executors/src/eoa/send.rs (1)
341-341: Remove the unused `pre_computed_hash` variable.

The pre-computed hash is calculated but never used.

```diff
-    let pre_computed_hash = signed_tx.hash();
-
     // 9. Send transaction
     match chain.provider().send_tx_envelope(signed_tx.into()).await {
```

If you need to validate that the hash matches, compare it with the returned hash:

```rust
let pre_computed_hash = signed_tx.hash();
// ... send transaction ...
assert_eq!(pre_computed_hash, tx_hash, "Hash mismatch");
```

twmq/src/multilane.rs (1)
1198-1235: Add retry limits and backoff to the WATCH loop.

The infinite retry loop could cause issues under high contention. Add retry limits and exponential backoff:

```diff
 // Execute with lease protection (same pattern as single-lane queue)
 let lease_key = self.lease_key_name(&job.job.id, &job.lease_token);

+let mut retry_count = 0;
+const MAX_RETRIES: u32 = 10;
+
 loop {
+    if retry_count >= MAX_RETRIES {
+        return Err(TwmqError::Runtime {
+            message: format!("Failed to complete job after {} retries", MAX_RETRIES),
+        });
+    }
+
     let mut conn = self.redis.clone();
     redis::cmd("WATCH")
         .arg(&lease_key)
         .query_async::<()>(&mut conn)
         .await?;

     // ... existing logic ...

     Err(_) => {
         tracing::debug!(job_id = %job.job.id, "WATCH failed during completion, retrying");
+        retry_count += 1;
+        if retry_count > 1 {
+            // Exponential backoff: 10ms, 20ms, 40ms, etc.
+            tokio::time::sleep(Duration::from_millis(10 * (1 << (retry_count - 1)))).await;
+        }
         continue;
     }
```
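The retry/backoff schedule proposed in the diff above can be isolated into a pure function for testing. This is a sketch matching that diff's arithmetic (no sleep before the first retry, then 10ms × 2^(retry−1), giving up after `MAX_RETRIES`); the constant and cutoffs are the reviewer's suggestion, not existing `twmq` code:

```rust
// Backoff schedule mirroring the suggested diff: None means "give up",
// Some(ms) is how long to sleep before the next attempt.
fn backoff_delay_ms(retry_count: u32) -> Option<u64> {
    const MAX_RETRIES: u32 = 10;
    if retry_count >= MAX_RETRIES {
        None
    } else if retry_count <= 1 {
        Some(0) // the diff only sleeps when retry_count > 1
    } else {
        Some(10u64 * (1u64 << (retry_count - 1)))
    }
}
```

Factoring the schedule out like this lets the cap and growth rate be unit-tested without a Redis connection or a live WATCH conflict.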
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)

- `Cargo.lock` is excluded by `!**/*.lock`

📒 Files selected for processing (19)

- `.github/workflows/ci-twmq.yaml` (1 hunks)
- `.github/workflows/coverage-twmq.yaml` (1 hunks)
- `Cargo.toml` (1 hunks)
- `aa-core/Cargo.toml` (1 hunks)
- `aa-core/src/userop/builder.rs` (1 hunks)
- `core/Cargo.toml` (1 hunks)
- `core/src/signer.rs` (6 hunks)
- `executors/Cargo.toml` (1 hunks)
- `executors/src/eoa/eoa_confirmation_worker.rs` (1 hunks)
- `executors/src/eoa/error_classifier.rs` (1 hunks)
- `executors/src/eoa/mod.rs` (1 hunks)
- `executors/src/eoa/nonce_manager.rs` (1 hunks)
- `executors/src/eoa/send.rs` (1 hunks)
- `executors/src/eoa/transaction_store.rs` (1 hunks)
- `executors/src/lib.rs` (1 hunks)
- `twmq/src/lib.rs` (3 hunks)
- `twmq/src/multilane.rs` (1 hunks)
- `twmq/src/shutdown.rs` (4 hunks)
- `twmq/tests/multilane_batch_pop.rs` (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)

twmq/src/lib.rs (1)
- twmq/src/multilane.rs (1): `work` (443-540)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: test
- GitHub Check: coverage
🔇 Additional comments (19)
.github/workflows/coverage-twmq.yaml (1)

41-43: Confirmed: install-action@9185c192a96ba09167ad8663015b3fbbf007ec79 matches v2.56.2

Ran `git ls-remote https://github.com/taiki-e/install-action.git refs/tags/v2.56.2` and the SHA aligns with the pinned hash.

.github/workflows/ci-twmq.yaml (1)

41-43: Consistency win—same action hash as the coverage workflow

The CI workflow now uses the identical commit hash as the coverage job, eliminating drift. No further issues spotted.

Cargo.toml (1)

2-10: Array re-formatting looks good

Purely stylistic change; no functional impact. TOML 1.0 allows the trailing comma, so this remains valid.
executors/Cargo.toml (1)

22-22: Chrono version 0.4.41 is valid on crates.io — no action needed

The crates.io API shows `chrono`'s `max_version` as `0.4.41`, so the dependency declaration is correct and can be merged as-is.

executors/src/lib.rs (1)

1-4: LGTM! Clean module organization.

The addition of the `eoa` module and reordering of modules improves code organization by grouping related functionality.
4-4
: Module addition looks good.The new
multilane
module integrates well with the existing structure.
18-18
: Clean re-export of multilane types.Re-exporting
MultilaneQueue
andMultilanePushableJob
from the crate root follows good API design practices.
452-452
: Return type update aligns with the generalized shutdown handling.The change from
WorkerHandle<H>
toWorkerHandle<Queue<H>>
correctly reflects the new genericWorkerHandle<Q>
design that supports multiple queue types.core/src/signer.rs (3)
2-2
: Import addition is appropriate.The
TypedTransaction
import is required for the newsign_transaction
method signature.
181-187
: Well-designed trait method addition.The new
sign_transaction
method maintains consistency with existing trait methods and properly handles async operations.
309-344
: Solid implementation of transaction signing.The implementation correctly handles both Vault and IAW credential types with appropriate error handling and logging, maintaining consistency with other signing methods.
executors/src/eoa/mod.rs (1)
1-16
: Well-structured module organization for EOA functionality.The module cleanly organizes EOA-related functionality into logical submodules and provides a convenient public API through re-exports. The separation of concerns (confirmation, error classification, nonce management, sending, and storage) follows good architectural principles.
twmq/src/shutdown.rs (3)
5-9
: Good generalization of WorkerHandle.Making
WorkerHandle
generic overQ
instead of being tied toDurableExecution
improves flexibility and enables support for different queue types.
11-26
: Clean abstraction with the QueueIdentifier trait.The trait provides a minimal interface for queue identification, and the implementations correctly map to the appropriate methods (
name()
for Queue,queue_id()
for MultilaneQueue).
88-91
: Flexible generic signatures for worker management methods.The updated method signatures properly support any queue type
Q
, providing good flexibility without unnecessary trait bounds at the method level.Also applies to: 152-156, 159-167, 169-172
executors/src/eoa/transaction_store.rs (1)
46-51
: LGTM!The
queue_job_id
method provides a clean way to generate unique job IDs for transaction attempts.executors/src/eoa/eoa_confirmation_worker.rs (3)
94-127
: LGTM!Clean implementation with proper dependency injection and trait implementations.
137-292
: Well-structured confirmation monitoring logic.The process method implements a comprehensive flow for monitoring transaction confirmations with proper error handling and retry logic. The nonce progression detection and active transaction monitoring approach is sound.
403-403
: Verifyreceipt.status()
behavior in the alloy libraryIt looks like
receipt.status()
is coming from the external alloy crate (no local implementation in our codebase). Please confirm whether this method returns a plainbool
(guaranteed) or anOption<bool>
(which could beNone
for pending receipts). If it can beNone
, consider guarding against it:• File:
executors/src/eoa/eoa_confirmation_worker.rs
Line: ~403- if receipt.status() { + if receipt.status().unwrap_or(false) {This change ensures that any
None
(e.g., pending) status is treated asfalse
, avoiding a potential panic or unintended behavior.
aa-core/Cargo.toml (outdated)

```toml
vault-types = { version = "0.1.0", git = "ssh://[email protected]/thirdweb-dev/vault.git", branch = "pb/update-alloy" }
vault-sdk = { version = "0.1.0", git = "ssh://[email protected]/thirdweb-dev/vault.git", branch = "pb/update-alloy" }
```
🛠️ Refactor suggestion

Pin a commit hash instead of using a moving branch

Depending on the `pb/update-alloy` branch for the `vault-*` crates introduces non-determinism—new commits to that branch may break reproducible builds. Prefer a specific commit or tag:

```diff
-vault-types = { version = "0.1.0", git = "...vault.git", branch = "pb/update-alloy" }
+vault-types = { git = "...vault.git", rev = "<commit-sha>" }
```

Same for `vault-sdk`.
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```diff
-vault-types = { version = "0.1.0", git = "ssh://[email protected]/thirdweb-dev/vault.git", branch = "pb/update-alloy" }
-vault-sdk = { version = "0.1.0", git = "ssh://[email protected]/thirdweb-dev/vault.git", branch = "pb/update-alloy" }
+vault-types = { git = "ssh://[email protected]/thirdweb-dev/vault.git", rev = "<commit-sha>" }
+vault-sdk = { git = "ssh://[email protected]/thirdweb-dev/vault.git", rev = "<commit-sha>" }
```
```rust
EoaExecutionError::AlreadyKnown { .. } => RecoveryStrategy {
    queue_confirmation: true,
    recycle_nonce: false,
    needs_resync: false,
    retryable: false,
```
Improve error handling for JSON operations.
The current implementation silently ignores JSON deserialization errors and could panic on serialization errors.
Apply this diff to properly handle JSON errors:

```diff
-match health_json {
-    Some(json) => Ok(serde_json::from_str(&json).unwrap_or_default()),
-    None => Ok(EoaHealth::default()),
-}
+match health_json {
+    Some(json) => serde_json::from_str(&json)
+        .map_err(|e| NonceManagerError::NonceAssignmentFailed {
+            reason: format!("Failed to deserialize health data: {}", e),
+        }),
+    None => Ok(EoaHealth::default()),
+}
```
And for serialization:

```diff
-let health_json = serde_json::to_string(health).unwrap();
+let health_json = serde_json::to_string(health)
+    .map_err(|e| NonceManagerError::NonceAssignmentFailed {
+        reason: format!("Failed to serialize health data: {}", e),
+    })?;
```
Also applies to: 177-177
```rust
match code {
    -32000 => {
        // Only handle the specific ethereum errors we care about
        if msg_lower.contains("nonce too low") {
            EoaExecutionError::NonceTooLow {
                message: message.to_string(),
            }
        } else if msg_lower.contains("nonce too high") {
            EoaExecutionError::NonceTooHigh {
                message: message.to_string(),
            }
        } else if msg_lower.contains("already known") || msg_lower.contains("duplicate") {
            EoaExecutionError::AlreadyKnown {
                message: message.to_string(),
            }
        } else if msg_lower.contains("replacement") && msg_lower.contains("underpriced") {
            EoaExecutionError::ReplacementUnderpriced {
                message: message.to_string(),
            }
        } else if msg_lower.contains("insufficient funds") {
            EoaExecutionError::InsufficientFunds {
                message: message.to_string(),
            }
        } else if msg_lower.contains("gas") {
            EoaExecutionError::GasError {
                message: message.to_string(),
            }
        } else if msg_lower.contains("txpool") || msg_lower.contains("pool limit") {
            EoaExecutionError::PoolLimitExceeded {
                message: message.to_string(),
            }
        } else if msg_lower.contains("account") {
            EoaExecutionError::AccountError {
                message: message.to_string(),
            }
        } else {
```
🛠️ Refactor suggestion
Consider more robust error message matching patterns.
The current string matching approach has potential issues:
- Pattern "gas" (line 106) is too generic and could match unrelated errors
- Pattern "account" (line 114) could also be too broad
- No documentation about which RPC implementations these patterns target
Consider using more specific patterns and documenting the expected error formats:
```diff
 match code {
     -32000 => {
         // Only handle the specific ethereum errors we care about
+        // Error patterns based on go-ethereum, erigon, and common RPC implementations
         if msg_lower.contains("nonce too low") {
             EoaExecutionError::NonceTooLow {
                 message: message.to_string(),
             }
         // ... other conditions ...
-        } else if msg_lower.contains("gas") {
+        } else if msg_lower.contains("gas limit") || msg_lower.contains("gas price")
+            || msg_lower.contains("intrinsic gas") || msg_lower.contains("out of gas") {
             EoaExecutionError::GasError {
                 message: message.to_string(),
             }
         // ... other conditions ...
-        } else if msg_lower.contains("account") {
+        } else if msg_lower.contains("account not found") || msg_lower.contains("unknown account")
+            || msg_lower.contains("invalid account") {
             EoaExecutionError::AccountError {
                 message: message.to_string(),
             }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
match code {
    -32000 => {
        // Only handle the specific ethereum errors we care about
        // Error patterns based on go-ethereum, erigon, and common RPC implementations
        if msg_lower.contains("nonce too low") {
            EoaExecutionError::NonceTooLow {
                message: message.to_string(),
            }
        } else if msg_lower.contains("nonce too high") {
            EoaExecutionError::NonceTooHigh {
                message: message.to_string(),
            }
        } else if msg_lower.contains("already known") || msg_lower.contains("duplicate") {
            EoaExecutionError::AlreadyKnown {
                message: message.to_string(),
            }
        } else if msg_lower.contains("replacement") && msg_lower.contains("underpriced") {
            EoaExecutionError::ReplacementUnderpriced {
                message: message.to_string(),
            }
        } else if msg_lower.contains("insufficient funds") {
            EoaExecutionError::InsufficientFunds {
                message: message.to_string(),
            }
        } else if msg_lower.contains("gas limit") || msg_lower.contains("gas price")
            || msg_lower.contains("intrinsic gas") || msg_lower.contains("out of gas") {
            EoaExecutionError::GasError {
                message: message.to_string(),
            }
        } else if msg_lower.contains("txpool") || msg_lower.contains("pool limit") {
            EoaExecutionError::PoolLimitExceeded {
                message: message.to_string(),
            }
        } else if msg_lower.contains("account not found") || msg_lower.contains("unknown account")
            || msg_lower.contains("invalid account") {
            EoaExecutionError::AccountError {
                message: message.to_string(),
            }
        } else {
            // ...
        }
    }
}
```
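The effect of the stricter patterns can be demonstrated standalone. In the sketch below the classifier is reduced to string labels (the real code returns `EoaExecutionError` variants); the point is that broad substrings like "account" or "gas" no longer catch unrelated messages:

```rust
// Simplified stand-in for the error classifier: returns a label instead of
// the real EoaExecutionError variants, using the stricter patterns suggested above.
fn classify(code: i64, message: &str) -> &'static str {
    let msg = message.to_lowercase();
    if code != -32000 {
        return "rpc_error";
    }
    if msg.contains("nonce too low") {
        "nonce_too_low"
    } else if msg.contains("nonce too high") {
        "nonce_too_high"
    } else if msg.contains("already known") || msg.contains("duplicate") {
        "already_known"
    } else if msg.contains("replacement") && msg.contains("underpriced") {
        "replacement_underpriced"
    } else if msg.contains("insufficient funds") {
        "insufficient_funds"
    } else if msg.contains("gas limit") || msg.contains("gas price")
        || msg.contains("intrinsic gas") || msg.contains("out of gas")
    {
        "gas_error"
    } else if msg.contains("txpool") || msg.contains("pool limit") {
        "pool_limit"
    } else if msg.contains("account not found") || msg.contains("unknown account")
        || msg.contains("invalid account")
    {
        "account_error"
    } else {
        "rpc_error"
    }
}
```

Note how a message merely containing the substring "account" (e.g., "error accounting for state") now falls through to the generic bucket instead of being misclassified.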
```rust
impl Drop for MultilaneTestHarness {
    fn drop(&mut self) {
        // Cleanup in background since we can't await in drop
        let queue_id = self.queue_id.clone();
        let redis = self.queue.clone().redis.clone();

        tokio::spawn(async move {
            let mut conn = redis;
            let keys_pattern = format!("twmq_multilane:{}:*", queue_id);
            let keys: Vec<String> = redis::cmd("KEYS")
                .arg(&keys_pattern)
                .query_async(&mut conn)
                .await
                .unwrap_or_default();

            if !keys.is_empty() {
                redis::cmd("DEL")
                    .arg(keys)
                    .query_async::<()>(&mut conn)
                    .await
                    .unwrap_or_default();
            }
        });
    }
}
```
🛠️ Refactor suggestion
Improve cleanup reliability in Drop implementation.
Spawning async cleanup in Drop has issues:
- Tasks might outlive the test runtime
- No guarantee cleanup completes before next test
- Could cause test interference
Consider explicit cleanup in tests or using a test fixture that ensures cleanup:
```diff
 impl Drop for MultilaneTestHarness {
     fn drop(&mut self) {
-        // Cleanup in background since we can't await in drop
-        let queue_id = self.queue_id.clone();
-        let redis = self.queue.clone().redis.clone();
-
-        tokio::spawn(async move {
-            let mut conn = redis;
-            let keys_pattern = format!("twmq_multilane:{}:*", queue_id);
-            let keys: Vec<String> = redis::cmd("KEYS")
-                .arg(&keys_pattern)
-                .query_async(&mut conn)
-                .await
-                .unwrap_or_default();
-
-            if !keys.is_empty() {
-                redis::cmd("DEL")
-                    .arg(keys)
-                    .query_async::<()>(&mut conn)
-                    .await
-                    .unwrap_or_default();
-            }
-        });
+        // Log warning if cleanup wasn't called
+        if !self.cleaned_up {
+            eprintln!("Warning: MultilaneTestHarness dropped without cleanup for queue: {}", self.queue_id);
+        }
     }
 }
```
Then ensure each test calls cleanup:

```rust
#[tokio::test]
async fn test_example() {
    let harness = MultilaneTestHarness::new().await;
    // ... test logic ...
    harness.cleanup().await;
}
```
```lua
while jobs_popped < batch_size and empty_lanes_count < total_lanes do
    local lane_id = lanes_with_scores[lane_index * 2 - 1]

    -- Skip if we've already marked this lane as empty
    if lane_id == nil then
        lane_index = lane_index + 1
        if lane_index > total_lanes then
            lane_index = 1
        end
    else
        local last_score = tonumber(lanes_with_scores[lane_index * 2])

        -- Only cleanup if not visited this batch (score != now)
        if last_score ~= now then
            cleanup_lane_leases(lane_id)
            process_delayed_jobs(lane_id)
            redis.call('ZADD', lanes_zset_name, now, lane_id)
            lanes_with_scores[lane_index * 2] = tostring(now)
        end

        -- Try to pop a job from this lane
        local job_result = pop_job_from_lane(lane_id)

        if job_result then
            table.insert(result_jobs, {lane_id, job_result[1], job_result[2], job_result[3], job_result[4], job_result[5], job_result[6]})
            jobs_popped = jobs_popped + 1
        else
            -- Lane is empty, mark it and count it
            lanes_with_scores[lane_index * 2 - 1] = nil
            lanes_with_scores[lane_index * 2] = nil
            empty_lanes_count = empty_lanes_count + 1

            -- Check if lane should be removed from Redis
            local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending'
            local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed'
            local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active'

            local pending_count = redis.call('LLEN', lane_pending_list)
            local delayed_count = redis.call('ZCARD', lane_delayed_zset)
            local active_count = redis.call('HLEN', lane_active_hash)

            if pending_count == 0 and delayed_count == 0 and active_count == 0 then
                redis.call('ZREM', lanes_zset_name, lane_id)
            end
        end

        -- Move to next lane
        lane_index = lane_index + 1
        if lane_index > total_lanes then
            lane_index = 1
        end
    end
end
```
🛠️ Refactor suggestion
Improve lane tracking to avoid sparse arrays in Lua script.
Setting array elements to `nil` creates sparse arrays, which can cause iteration issues in Lua.

Track empty lanes separately instead of modifying the array:

```lua
local empty_lanes = {}  -- Track empty lanes

-- In the loop, instead of:
--   lanes_with_scores[lane_index * 2 - 1] = nil
--   lanes_with_scores[lane_index * 2] = nil
-- Do:
empty_lanes[lane_id] = true

-- And check:
if empty_lanes[lane_id] then
    -- Skip this lane
else
    -- Process lane
end
```
This avoids sparse array issues and makes the logic clearer.
```rust
pub async fn store_transaction_data(
    &self,
    transaction_data: &TransactionData,
) -> Result<(), TransactionStoreError> {
    let mut conn = self.redis.clone();
    let data_key = self.transaction_data_key(&transaction_data.transaction_id);
    let active_key =
        self.eoa_active_transactions_key(transaction_data.eoa, transaction_data.chain_id);

    let data_json = serde_json::to_string(transaction_data)?;

    // Store transaction data
    let _: () = conn.set(&data_key, data_json).await?;

    // Add to active transactions set
    let _: () = conn
        .sadd(&active_key, &transaction_data.transaction_id)
        .await?;

    Ok(())
}
```
🛠️ Refactor suggestion
Consider using Redis transactions for atomic operations.
The current implementation performs multiple Redis commands sequentially, which could lead to race conditions if multiple workers operate on the same transaction simultaneously.
Consider using Redis transactions (MULTI/EXEC) or Lua scripts for atomicity:
```diff
 pub async fn store_transaction_data(
     &self,
     transaction_data: &TransactionData,
 ) -> Result<(), TransactionStoreError> {
     let mut conn = self.redis.clone();
     let data_key = self.transaction_data_key(&transaction_data.transaction_id);
     let active_key =
         self.eoa_active_transactions_key(transaction_data.eoa, transaction_data.chain_id);

     let data_json = serde_json::to_string(transaction_data)?;

-    // Store transaction data
-    let _: () = conn.set(&data_key, data_json).await?;
-
-    // Add to active transactions set
-    let _: () = conn
-        .sadd(&active_key, &transaction_data.transaction_id)
-        .await?;
+    // Use pipeline for atomic operations
+    let mut pipe = twmq::redis::pipe();
+    pipe.atomic()
+        .set(&data_key, data_json)
+        .sadd(&active_key, &transaction_data.transaction_id);
+    let _: () = pipe.query_async(&mut conn).await?;

     Ok(())
 }
```
Alternatively, expose only the pipeline methods and make the async methods private to enforce atomic usage.
Also applies to: 256-293
🤖 Prompt for AI Agents
In executors/src/eoa/transaction_store.rs around lines 116 to 136, the current
method performs multiple Redis commands sequentially, risking race conditions.
Refactor the code to use Redis transactions by wrapping the set and sadd
commands within a MULTI/EXEC block or use a Lua script to ensure atomicity. This
change will guarantee that both operations succeed or fail together, preventing
inconsistent state. Also, consider making async methods private and exposing
only pipeline methods to enforce atomic usage. Apply similar changes to lines
256-293 as well.
let active_key =
    self.eoa_active_transactions_key(transaction_data.eoa, transaction_data.chain_id);

let data_json = serde_json::to_string(transaction_data).unwrap();
Replace unwrap() with proper error handling in pipeline methods.
Using unwrap() on serialization can cause panics if serialization fails. While unlikely with these simple structs, it's better to handle errors gracefully in production code.
Consider propagating the error by making these methods return Result:
- pub fn add_store_transaction_command(
+ pub fn add_store_transaction_command(
&self,
pipeline: &mut Pipeline,
transaction_data: &TransactionData,
- ) {
+ ) -> Result<(), TransactionStoreError> {
let data_key = self.transaction_data_key(&transaction_data.transaction_id);
let active_key =
self.eoa_active_transactions_key(transaction_data.eoa, transaction_data.chain_id);
- let data_json = serde_json::to_string(transaction_data).unwrap();
+ let data_json = serde_json::to_string(transaction_data)?;
pipeline.set(&data_key, data_json);
pipeline.sadd(&active_key, &transaction_data.transaction_id);
+ Ok(())
}
Apply similar changes to the other pipeline methods.
Also applies to: 385-385, 421-421
🤖 Prompt for AI Agents
In executors/src/eoa/transaction_store.rs at line 368, replace the use of
unwrap() on serde_json::to_string(transaction_data) with proper error handling
by changing the method to return a Result type and propagating the serialization
error using the ? operator. Apply the same pattern to lines 385 and 421,
modifying those methods to return Result and handle errors gracefully instead of
panicking.
/// Mark transaction as failed and clean up
pub async fn mark_transaction_failed(
    &self,
    transaction_id: &str,
    error_message: &str,
) -> Result<(), TransactionStoreError> {
    let mut conn = self.redis.clone();

    // Get transaction data to determine EOA and chain_id
    let tx_data = self
        .get_transaction_data(transaction_id)
        .await?
        .ok_or_else(|| TransactionStoreError::TransactionNotFound {
            transaction_id: transaction_id.to_string(),
        })?;

    let active_key = self.eoa_active_transactions_key(tx_data.eoa, tx_data.chain_id);
    let attempt_key = self.active_attempt_key(transaction_id);

    // Get current attempt to clean up nonce index
    if let Some(attempt) = self.get_active_attempt(transaction_id).await? {
        let nonce_key =
            self.nonce_to_transactions_key(tx_data.eoa, tx_data.chain_id, attempt.nonce);
        let _: () = conn.srem(&nonce_key, transaction_id).await?;
    }

    // Remove from active set
    let _: () = conn.srem(&active_key, transaction_id).await?;

    // Remove active attempt
    let _: () = conn.del(&attempt_key).await?;

    Ok(())
}
Store the error message when marking transaction as failed.
The error_message parameter is currently unused. Consider storing it for debugging and audit purposes.
You could store the error in a dedicated key or as part of a failure record:
pub async fn mark_transaction_failed(
&self,
transaction_id: &str,
error_message: &str,
) -> Result<(), TransactionStoreError> {
let mut conn = self.redis.clone();
+ let failure_key = match &self.namespace {
+ Some(ns) => format!("{}:eoa_failure:{}", ns, transaction_id),
+ None => format!("eoa_failure:{}", transaction_id),
+ };
// Get transaction data to determine EOA and chain_id
let tx_data = self
.get_transaction_data(transaction_id)
.await?
.ok_or_else(|| TransactionStoreError::TransactionNotFound {
transaction_id: transaction_id.to_string(),
})?;
let active_key = self.eoa_active_transactions_key(tx_data.eoa, tx_data.chain_id);
let attempt_key = self.active_attempt_key(transaction_id);
// Get current attempt to clean up nonce index
if let Some(attempt) = self.get_active_attempt(transaction_id).await? {
let nonce_key =
self.nonce_to_transactions_key(tx_data.eoa, tx_data.chain_id, attempt.nonce);
let _: () = conn.srem(&nonce_key, transaction_id).await?;
}
+ // Store failure reason with timestamp
+ let failure_data = serde_json::json!({
+ "error": error_message,
+ "failed_at": std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap()
+ .as_secs(),
+ });
+ let _: () = conn.set(&failure_key, failure_data.to_string()).await?;
// Remove from active set
let _: () = conn.srem(&active_key, transaction_id).await?;
// Remove active attempt
let _: () = conn.del(&attempt_key).await?;
Ok(())
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
/// Mark transaction as failed and clean up
pub async fn mark_transaction_failed(
    &self,
    transaction_id: &str,
    error_message: &str,
) -> Result<(), TransactionStoreError> {
    let mut conn = self.redis.clone();
    let failure_key = match &self.namespace {
        Some(ns) => format!("{}:eoa_failure:{}", ns, transaction_id),
        None => format!("eoa_failure:{}", transaction_id),
    };

    // Get transaction data to determine EOA and chain_id
    let tx_data = self
        .get_transaction_data(transaction_id)
        .await?
        .ok_or_else(|| TransactionStoreError::TransactionNotFound {
            transaction_id: transaction_id.to_string(),
        })?;

    let active_key = self.eoa_active_transactions_key(tx_data.eoa, tx_data.chain_id);
    let attempt_key = self.active_attempt_key(transaction_id);

    // Get current attempt to clean up nonce index
    if let Some(attempt) = self.get_active_attempt(transaction_id).await? {
        let nonce_key =
            self.nonce_to_transactions_key(tx_data.eoa, tx_data.chain_id, attempt.nonce);
        let _: () = conn.srem(&nonce_key, transaction_id).await?;
    }

    // Store failure reason with timestamp
    let failure_data = serde_json::json!({
        "error": error_message,
        "failed_at": std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
    });
    let _: () = conn.set(&failure_key, failure_data.to_string()).await?;

    // Remove from active set
    let _: () = conn.srem(&active_key, transaction_id).await?;

    // Remove active attempt
    let _: () = conn.del(&attempt_key).await?;

    Ok(())
}
🤖 Prompt for AI Agents
In executors/src/eoa/transaction_store.rs around lines 295 to 328, the
error_message parameter in mark_transaction_failed is unused. To fix this, store
the error_message in Redis or your storage system as part of the transaction
failure record, for example by setting a dedicated key like
"transaction:{transaction_id}:error" with the error_message value. This will
help with debugging and auditing failed transactions.
let requeue_job = self
    .send_queue
    .clone()
    .job(EoaSendJobData {
        transaction_id: tx_data.transaction_id.clone(),
        chain_id: tx_data.chain_id,
        from: tx_data.eoa,
        to: tx_data.to,
        value: tx_data.value,
        data: tx_data.data.into(),
        webhook_options: None, // TODO: Get from original job if needed
        assigned_nonce: None,  // Will get new nonce
        gas_limit: tx_data.gas_limit,
        signing_credential: Default::default(), // TODO: Get from original job
        rpc_credentials: job_data.rpc_credentials.clone(),
    })
    .with_id(&format!("{}_{}", transaction_id, attempt_number));

// Queue the job (this would normally be done in a pipeline in the actual hook)
tracing::info!(
    transaction_id = %transaction_id,
    queue_job_id = %format!("{}_{}", transaction_id, attempt_number),
    "Requeuing transaction after race loss"
);

Ok(())
Critical: Complete the requeue implementation with proper credentials and queueing.
The current implementation has several critical issues that will prevent requeued transactions from working:
- Missing signing credentials (line 560)
- Missing webhook options (line 557)
- Job is created but never actually queued (line 565 comment)
- Direct Redis access breaks encapsulation (lines 542-544)
The signing credentials are essential for transaction execution. You need to either:
- Store credentials with the original transaction data
- Pass them through the confirmation worker job data
- Retrieve them from a secure credential store
Here's a suggested approach:
+ // Store signing credentials reference with transaction data
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ pub struct TransactionData {
+ pub transaction_id: String,
+ pub eoa: Address,
+ pub chain_id: u64,
+ pub to: Option<Address>,
+ pub value: U256,
+ pub data: Vec<u8>,
+ pub gas_limit: Option<u64>,
+ pub created_at: u64,
+ pub signing_credential_id: String, // Reference to credential
+ pub webhook_options: Option<WebhookOptions>,
+ }
async fn requeue_transaction(
&self,
job_data: &EoaConfirmationWorkerJobData,
transaction_id: &str,
) -> Result<(), EoaConfirmationWorkerError> {
// ... existing code ...
- // Get current attempt number for new queue job ID
- let mut conn = self.transaction_store.redis.clone();
- let counter_key = self.transaction_store.attempt_counter_key(transaction_id);
- let attempt_number: u32 = conn.get(&counter_key).await.unwrap_or(0);
+ // Use transaction store method to get attempt counter
+ let attempt_number = self.transaction_store
+ .get_attempt_counter(transaction_id)
+ .await
+ .unwrap_or(0);
// Create new send job with incremented attempt
let requeue_job = self
.send_queue
.clone()
.job(EoaSendJobData {
transaction_id: tx_data.transaction_id.clone(),
chain_id: tx_data.chain_id,
from: tx_data.eoa,
to: tx_data.to,
value: tx_data.value,
data: tx_data.data.into(),
- webhook_options: None, // TODO: Get from original job if needed
+ webhook_options: tx_data.webhook_options,
assigned_nonce: None, // Will get new nonce
gas_limit: tx_data.gas_limit,
- signing_credential: Default::default(), // TODO: Get from original job
+ signing_credential: CredentialReference::from_id(tx_data.signing_credential_id),
rpc_credentials: job_data.rpc_credentials.clone(),
})
.with_id(&format!("{}_{}", transaction_id, attempt_number));
- // Queue the job (this would normally be done in a pipeline in the actual hook)
+ // Actually queue the job
+ requeue_job.push().await?;
tracing::info!(
transaction_id = %transaction_id,
queue_job_id = %format!("{}_{}", transaction_id, attempt_number),
"Requeuing transaction after race loss"
);
Ok(())
}
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In executors/src/eoa/eoa_confirmation_worker.rs around lines 547 to 572, the
requeue job creation is incomplete: signing credentials and webhook options are
missing, and the job is never actually queued. Fix this by retrieving and
including the original job's signing credentials and webhook options when
constructing EoaSendJobData, then enqueue the job properly using the
send_queue's enqueue or similar method instead of direct Redis access to
maintain encapsulation.
Actionable comments posted: 3
🧹 Nitpick comments (3)
executors/src/eoa/worker.rs (2)
806-819: Missing implementation for no-op transaction sending.
The send_noop_transaction method is not implemented, which could block the transaction queue when recycled nonces need to be consumed but no pending transactions exist.
Would you like me to implement the no-op transaction logic or create an issue to track this TODO? A no-op transaction typically sends 0 ETH to the sender's own address with minimal gas.
511-514: Hard-coded delays should be configurable.
The 50ms delays between consecutive transactions are hard-coded. Different RPC providers have different rate limits, so this should be configurable.
Consider making the delay configurable:
 pub struct EoaExecutorWorker<CS>
 where
     CS: ChainService + Send + Sync + 'static,
 {
     pub chain_service: Arc<CS>,
     pub store: Arc<EoaExecutorStore>,
     pub eoa_signer: Arc<EoaSigner>,
     pub max_inflight: u64,
     pub max_recycled_nonces: u64,
+    pub nonce_send_delay_ms: u64, // Default: 50
 }
Then use it in the delay:
-sleep(Duration::from_millis(50)).await;
+sleep(Duration::from_millis(self.nonce_send_delay_ms)).await;

Also applies to: 628-630
executors/src/eoa/store.rs (1)
449-453: Unnecessary cloning of keys for closures.
The code clones multiple keys for use in the closures, which is inefficient and reduces readability.
Since the helper method already handles the closure execution, you can capture the keys by reference:
-let recyled_key_for_validation = recycled_key.clone();
-let pending_key_for_validation = pending_key.clone();
-let transaction_id_for_validation = transaction_id.clone();
-let borrowed_key_for_validation = borrowed_key.clone();
 self.execute_with_watch_and_retry(
     eoa,
     chain_id,
     worker_id,
     &[recycled_key.clone(), pending_key.clone()],
     "pending->borrowed with recycled nonce",
-    async move |conn: &mut ConnectionManager| {
+    async |conn: &mut ConnectionManager| {
         // Validation phase - check preconditions
-        let nonce_score: Option<f64> = conn.zscore(recycled_key.clone(), nonce).await?;
+        let nonce_score: Option<f64> = conn.zscore(&recycled_key, nonce).await?;
         // ... rest of validation
     },
-    |pipeline: &mut Pipeline| {
-        pipeline.zrem(recyled_key_for_validation, nonce);
+    move |pipeline: &mut Pipeline| {
+        pipeline.zrem(&recycled_key, nonce);
         // ... rest of pipeline
     },
 )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
- executors/Cargo.toml (1 hunks)
- executors/src/eoa/mod.rs (1 hunks)
- executors/src/eoa/store.rs (1 hunks)
- executors/src/eoa/worker.rs (1 hunks)
- twmq/src/job.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- executors/Cargo.toml
- executors/src/eoa/mod.rs
🧰 Additional context used
🧬 Code Graph Analysis (1)
executors/src/eoa/worker.rs (3)
executors/src/eoa/store.rs (5)
- eoa (1766-1768)
- chain_id (1771-1773)
- build (1470-1495)
- nonce (231-236)
- new (95-97)
twmq/src/job.rs (3)
- data (168-170)
- new (159-161)
- new (200-206)
executors/src/eoa/error_classifier.rs (1)
- message (224-236)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: test
- GitHub Check: coverage
🔇 Additional comments (4)
twmq/src/job.rs (1)
59-74: Good trait generalization for error type flexibility!
The addition of the ErrorType: Into<E> bound allows the trait to work with any error type that can be converted into the target error type E. This is a backward-compatible change that improves the trait's flexibility without breaking existing usage.
executors/src/eoa/worker.rs (1)
922-950: Well-structured gas estimation with proper error handling.
The gas estimation logic correctly:
- Adds a 10% buffer to prevent out-of-gas errors
- Distinguishes between transaction reverts (which should fail the transaction) and RPC errors (which should retry the worker)
- Provides detailed error messages including revert data
executors/src/eoa/store.rs (2)
363-364: Well-implemented exponential backoff for retry logic.
The retry mechanism with exponential backoff (capped at 640ms) and 10 max retries provides a good balance between responsiveness and avoiding overwhelming Redis during high contention.
Also applies to: 1441-1442
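For reference, the capped backoff described above can be sketched as a pure function. The 640ms cap and 10-retry limit come from the review; the 10ms base delay is an assumption for illustration:

```rust
// Sketch of capped exponential backoff: delays double per retry,
// capped at 640ms, giving up after 10 attempts. BASE_MS is assumed.
fn backoff_delay_ms(attempt: u32) -> Option<u64> {
    const MAX_RETRIES: u32 = 10;
    const CAP_MS: u64 = 640;
    const BASE_MS: u64 = 10; // assumed base delay

    if attempt >= MAX_RETRIES {
        return None; // exhausted retries
    }
    // 10, 20, 40, ... capped at 640
    let delay = BASE_MS.saturating_mul(1u64 << attempt.min(63));
    Some(delay.min(CAP_MS))
}

fn main() {
    assert_eq!(backoff_delay_ms(0), Some(10));
    assert_eq!(backoff_delay_ms(3), Some(80));
    assert_eq!(backoff_delay_ms(6), Some(640)); // hits the cap
    assert_eq!(backoff_delay_ms(9), Some(640)); // stays capped
    assert_eq!(backoff_delay_ms(10), None);     // gives up
}
```

Keeping the delay computation separate from the retry loop also makes the policy unit-testable without a live Redis.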
1458-1495: Excellent scoped store pattern for API ergonomics and safety.
The ScopedEoaExecutorStore wrapper:
- Validates lock ownership on construction, preventing unauthorized operations
- Significantly reduces boilerplate by binding common parameters
- Provides a cleaner, more ergonomic API
This is a great pattern for ensuring safety while improving developer experience.
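The validate-on-construction idea can be modeled in a few lines. Store, ScopedStore, and lock_owners below are hypothetical in-memory stand-ins for the Redis-backed types, shown only to illustrate the pattern:

```rust
// Illustrative model of the scoped-store idea: validate lock ownership once
// at construction so every later call is implicitly authorized.
use std::collections::HashMap;

struct Store {
    lock_owners: HashMap<(String, u64), String>, // (eoa, chain_id) -> worker_id
}

struct ScopedStore<'a> {
    store: &'a Store,
    eoa: String,
    chain_id: u64,
    worker_id: String,
}

impl<'a> ScopedStore<'a> {
    // Fails fast if the caller does not hold the lock, so scoped methods
    // never need to re-pass eoa/chain_id/worker_id.
    fn build(store: &'a Store, eoa: &str, chain_id: u64, worker_id: &str) -> Result<Self, String> {
        match store.lock_owners.get(&(eoa.to_string(), chain_id)) {
            Some(owner) if owner == worker_id => Ok(Self {
                store,
                eoa: eoa.to_string(),
                chain_id,
                worker_id: worker_id.to_string(),
            }),
            _ => Err(format!("worker {worker_id} does not hold the lock")),
        }
    }
}

fn main() {
    let mut lock_owners = HashMap::new();
    lock_owners.insert(("0xabc".to_string(), 1u64), "worker-1".to_string());
    let store = Store { lock_owners };

    assert!(ScopedStore::build(&store, "0xabc", 1, "worker-1").is_ok());
    assert!(ScopedStore::build(&store, "0xabc", 1, "worker-2").is_err());
}
```

Because an unauthorized worker can never obtain a ScopedStore value, the type system enforces the lock check that each raw method would otherwise have to repeat.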
executors/src/eoa/worker.rs
Outdated
fn classify_send_error(error: &RpcError<TransportErrorKind>) -> SendResult {
    let error_str = error.to_string().to_lowercase();

    // Transaction possibly made it to mempool
    if error_str.contains("nonce too low")
        || error_str.contains("already known")
        || error_str.contains("replacement transaction underpriced")
    {
        return SendResult::PossiblySent;
    }

    // Clear failures that didn't consume nonce
    if error_str.contains("invalid signature")
        || error_str.contains("malformed")
        || error_str.contains("insufficient funds")
        || error_str.contains("gas limit")
        || error_str.contains("intrinsic gas too low")
    {
        return SendResult::DeterministicFailure;
    }

    // Default: assume possibly sent for safety
    SendResult::PossiblySent
}
Error classification relies on fragile string matching.
The classify_send_error function uses string matching on error messages, which is unreliable because:
- Error messages can vary between RPC providers
- Message formats may change over time
- Localized error messages won't match
This could lead to misclassification and incorrect transaction handling (e.g., treating a sent transaction as failed or vice versa).
Consider using structured error types or error codes from the RPC response instead:
-fn classify_send_error(error: &RpcError<TransportErrorKind>) -> SendResult {
- let error_str = error.to_string().to_lowercase();
-
- // Transaction possibly made it to mempool
- if error_str.contains("nonce too low")
- || error_str.contains("already known")
- || error_str.contains("replacement transaction underpriced")
- {
- return SendResult::PossiblySent;
- }
- // ... rest of the function
+fn classify_send_error(error: &RpcError<TransportErrorKind>) -> SendResult {
+ // Use the error classifier from error_classifier.rs which handles structured errors
+ use crate::eoa::error_classifier::EoaErrorMapper;
+
+ match EoaErrorMapper::classify_error(error) {
+ Some(EoaExecutionError::NonceTooLow { .. })
+ | Some(EoaExecutionError::AlreadyKnown { .. })
+ | Some(EoaExecutionError::ReplacementUnderpriced { .. }) => SendResult::PossiblySent,
+
+ Some(EoaExecutionError::InsufficientFunds { .. })
+ | Some(EoaExecutionError::GasError { .. }) => SendResult::DeterministicFailure,
+
+ _ => SendResult::PossiblySent // Default to safe assumption
+ }
+}
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In executors/src/eoa/worker.rs between lines 102 and 125, the
classify_send_error function currently relies on fragile string matching of
error messages, which is unreliable due to variations in RPC provider messages
and localization. To fix this, refactor the function to parse and use structured
error types or standardized error codes from the RPC response instead of string
matching. This may involve updating the RpcError handling to extract error codes
or enums and using those for classification to ensure more robust and accurate
error handling.
executors/src/eoa/worker.rs
Outdated
self.store
    .acquire_eoa_lock_aggressively(data.eoa_address, data.chain_id, &data.worker_id)
    .await
    .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?;
💡 Verification agent
🧩 Analysis chain
Potential race condition with aggressive lock acquisition.
The acquire_eoa_lock_aggressively method forcefully takes over locks without coordination. If multiple workers simultaneously attempt to take over the same EOA lock, it could lead to race conditions where workers overwrite each other's lock ownership.
Consider implementing a more coordinated lock acquisition strategy:
- Add a grace period before forceful takeover
- Use atomic compare-and-swap operations
- Implement lock heartbeats to detect truly stalled workers
🏁 Script executed:
#!/bin/bash
# Check if there are any lock coordination mechanisms in the store implementation
ast-grep --pattern 'fn acquire_eoa_lock_aggressively($$$) {
$$$
}'
Length of output: 70
🏁 Script executed:
#!/bin/bash
# Locate both uses and the definition of acquire_eoa_lock_aggressively
rg --color=never -n -C 5 'acquire_eoa_lock_aggressively' || echo "No matches found"
Length of output: 1544
🏁 Script executed:
#!/bin/bash
# Display the implementation of acquire_eoa_lock_aggressively to check for coordination logic
sed -n '260,360p' executors/src/eoa/store.rs
Length of output: 3713
Safer EOA Lock Acquisition Needed
We’ve verified that in executors/src/eoa/store.rs (lines 269–287) the acquire_eoa_lock_aggressively method does:
- SETNX to grab the lock
- Fallback to plain SET (no expiry) to force takeover
This leaves no TTL or coordination, so concurrent callers can override each other and stale locks never expire.
Consider strengthening the takeover logic by:
- Adding an expiry on the initial lock (e.g. Redis SET key value NX PX <ttl>)
- Renewing the TTL via heartbeats instead of unconditional force-set
- Employing an atomic compare-and-swap or Redis WATCH/MULTI pipeline (or Redlock)
- Introducing a grace/backoff period before force takeover
Pinpoint locations:
- executors/src/eoa/store.rs:269–287 (acquire_eoa_lock_aggressively)
- executors/src/eoa/worker.rs:172–175 (usage site)
🤖 Prompt for AI Agents
In executors/src/eoa/worker.rs around lines 172 to 175, the current usage of
acquire_eoa_lock_aggressively calls a method that uses SETNX and a fallback
plain SET without expiry, risking stale locks and unsafe concurrent overrides.
To fix this, update the acquire_eoa_lock_aggressively method in
executors/src/eoa/store.rs (lines 269–287) to set the lock with an expiry (e.g.,
using Redis SET with NX and PX options), implement a heartbeat mechanism to
renew the TTL instead of force-setting the lock, and use atomic operations like
compare-and-swap or Redis WATCH/MULTI to safely coordinate lock takeover,
possibly adding a grace or backoff period before forcing a takeover. Adjust the
worker.rs usage if needed to handle the updated locking behavior properly.
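The TTL-plus-heartbeat flow suggested above can be modeled in memory. LockTable and try_acquire are hypothetical stand-ins for the Redis SET NX PX calls, shown only to illustrate why takeover should wait for expiry:

```rust
// In-memory model of the safer acquisition flow: "SET key value NX PX <ttl>"
// semantics plus heartbeat renewal, so a takeover only succeeds once the
// previous holder's TTL has lapsed.
use std::collections::HashMap;
use std::time::{Duration, Instant};

struct LockTable {
    locks: HashMap<String, (String, Instant)>, // key -> (owner, expiry)
}

impl LockTable {
    fn new() -> Self {
        Self { locks: HashMap::new() }
    }

    /// Succeeds if the key is free or expired (NX + PX), or if the caller
    /// already owns it (heartbeat renewal). Never force-overwrites a live lock.
    fn try_acquire(&mut self, key: &str, worker: &str, ttl: Duration, now: Instant) -> bool {
        match self.locks.get(key) {
            Some((owner, expiry)) if *expiry > now && owner != worker => false,
            _ => {
                self.locks.insert(key.to_string(), (worker.to_string(), now + ttl));
                true
            }
        }
    }
}

fn main() {
    let mut table = LockTable::new();
    let now = Instant::now();
    let ttl = Duration::from_millis(100);

    assert!(table.try_acquire("eoa_lock:1:0xabc", "w1", ttl, now)); // fresh acquire
    assert!(!table.try_acquire("eoa_lock:1:0xabc", "w2", ttl, now)); // live lock blocks takeover
    assert!(table.try_acquire("eoa_lock:1:0xabc", "w1", ttl, now)); // heartbeat renewal
    // After the TTL lapses, a stalled worker's lock can be taken over.
    assert!(table.try_acquire("eoa_lock:1:0xabc", "w2", ttl, now + Duration::from_millis(200)));
}
```

In Redis terms, the fresh acquire and renewal map to `SET key worker NX PX ttl` plus a `PEXPIRE` heartbeat, and expiry-based takeover replaces the unconditional force-set.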
have you considered that every redis state mutation runs with a watch on this key to ensure that we have the lock, and we exit if we lose the lock?
🧩 Analysis chain
You're absolutely right! Let me examine the store implementation to see this coordination mechanism.
@d4mr Thank you for that clarification! You're absolutely correct. Looking at the store implementation, I can see that there's a robust lock coordination mechanism in place:
- WATCH-based transactions: Every Redis state mutation uses execute_with_watch_and_retry, which employs Redis WATCH/MULTI/EXEC operations
- Lock validation: The with_lock_check method validates lock ownership before every operation
- Automatic retry: If the lock is lost during a transaction (detected via WATCH failure), the operation is retried with exponential backoff
- Graceful exit: If lock ownership can't be validated, the worker exits cleanly
This design ensures that even if multiple workers attempt aggressive lock acquisition, only one can successfully perform state mutations at a time. The WATCH mechanism provides the coordination I was concerned about.
🏁 Script executed:
#!/bin/bash
# Search for WATCH operations in the store implementation
rg -A 10 -B 2 "WATCH" executors/src/eoa/store.rs
# Also look for lock validation patterns
rg -A 5 -B 2 "with_lock_check\|execute_with_watch_and_retry" executors/src/eoa/store.rs
Length of output: 11719
@d4mr You're absolutely correct! The store implementation shows extensive WATCH-based coordination:
Lock Protection Mechanisms Found:
- Universal WATCH operations: Every state mutation watches the lock key:
  let _: () = twmq::redis::cmd("WATCH")
      .arg(&lock_key)
      .query_async(&mut conn)
      .await?;
- Lock ownership validation: Before every operation:
  let current_owner: Option<String> = conn.get(&lock_key).await?;
  if current_owner.as_deref() != Some(worker_id) {
      return Err(TransactionStoreError::LockLost { ... });
  }
- WATCH failure handling: When transactions fail due to WATCH, the code distinguishes between lock loss vs. state changes:
  let still_own_lock: Option<String> = conn.get(&lock_key).await?;
  if still_own_lock.as_deref() != Some(worker_id) {
      return Err(TransactionStoreError::LockLost { ... });
  }
  // Otherwise retry
- Multiple coordination helpers:
  - execute_with_watch_and_retry - centralized WATCH/retry logic
  - with_atomic_operation - generic atomic operations
  - with_lock_check - lock validation wrapper
This design ensures that even with aggressive lock acquisition, only the lock holder can successfully mutate state. The WATCH mechanism provides perfect coordination - my initial concern about race conditions was unfounded given this robust protection layer.
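The optimistic-concurrency behavior being relied on here can be modeled with a version counter standing in for Redis's keyspace watch. WatchedKey and its methods are illustrative names, not the store's real API:

```rust
// Minimal model of WATCH/MULTI/EXEC coordination: every write bumps a
// version; EXEC applies only if nothing changed since WATCH, mirroring
// Redis aborting the transaction when a watched key is touched.
struct WatchedKey {
    value: String,
    version: u64,
}

impl WatchedKey {
    fn new(v: &str) -> Self {
        Self { value: v.to_string(), version: 0 }
    }

    // WATCH: remember the version we observed.
    fn watch(&self) -> u64 {
        self.version
    }

    // EXEC: apply only if the key is untouched since WATCH; a false return
    // models Redis's nil EXEC reply (caller retries, or exits if the lock is gone).
    fn exec_if_unchanged(&mut self, watched: u64, new_value: &str) -> bool {
        if self.version != watched {
            return false;
        }
        self.value = new_value.to_string();
        self.version += 1;
        true
    }
}

fn main() {
    let mut lock_key = WatchedKey::new("worker-1");

    // worker-1 WATCHes the lock key before mutating state.
    let w1_snapshot = lock_key.watch();

    // worker-2 force-takes the lock between worker-1's WATCH and EXEC.
    let w2_snapshot = lock_key.watch();
    assert!(lock_key.exec_if_unchanged(w2_snapshot, "worker-2"));

    // worker-1's EXEC now aborts, so it detects the lost lock and exits cleanly.
    assert!(!lock_key.exec_if_unchanged(w1_snapshot, "worker-1-update"));
    assert_eq!(lock_key.value, "worker-2");
}
```

This is exactly why the forceful takeover is tolerable: a stale holder's next mutation fails at EXEC time rather than silently corrupting state.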
✏️ Learnings added
Learnt from: d4mr
PR: thirdweb-dev/engine-core#5
File: executors/src/eoa/worker.rs:173-176
Timestamp: 2025-07-06T15:44:13.681Z
Learning: In the EOA executor system, aggressive lock acquisition is safe because every Redis state mutation uses WATCH operations on the lock key. If the lock is lost during a transaction, the WATCH causes the transaction to fail and the worker exits gracefully. This provides coordination between workers even when using forceful lock takeover.
Learnt from: d4mr
PR: thirdweb-dev/engine-core#5
File: executors/src/eoa/worker.rs:173-176
Timestamp: 2025-07-06T15:44:13.681Z
Learning: The EOA executor store uses comprehensive WATCH-based coordination where every Redis state mutation watches the lock key and validates ownership before proceeding. If the lock is lost during any operation, the transaction fails with LockLost error. This makes aggressive lock acquisition safe because only the actual lock holder can successfully perform state mutations, regardless of who claims the lock.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
Actionable comments posted: 0
♻️ Duplicate comments (2)
executors/src/eoa/worker.rs (2)
103-126: Replace fragile string matching with structured error classification.
The current implementation still uses string matching on error messages, which remains fragile as noted in previous reviews. Error messages can vary between RPC providers, change over time, and may be localized.
Use the error classifier from the eoa module instead:
+use crate::eoa::error_classifier::EoaErrorMapper;

 fn classify_send_error(error: &RpcError<TransportErrorKind>) -> SendResult {
-    let error_str = error.to_string().to_lowercase();
-
-    // Transaction possibly made it to mempool
-    if error_str.contains("nonce too low")
-        || error_str.contains("already known")
-        || error_str.contains("replacement transaction underpriced")
-    {
-        return SendResult::PossiblySent;
-    }
-
-    // Clear failures that didn't consume nonce
-    if error_str.contains("invalid signature")
-        || error_str.contains("malformed")
-        || error_str.contains("insufficient funds")
-        || error_str.contains("gas limit")
-        || error_str.contains("intrinsic gas too low")
-    {
-        return SendResult::DeterministicFailure;
-    }
-
-    // Default: assume possibly sent for safety
-    SendResult::PossiblySent
+    match EoaErrorMapper::classify_error(error) {
+        Some(err) if err.is_possibly_sent() => SendResult::PossiblySent,
+        Some(err) if err.is_deterministic_failure() => SendResult::DeterministicFailure,
+        _ => SendResult::PossiblySent, // Default to safe assumption
+    }
 }
172-176: Implement safer lock acquisition with coordination.
The aggressive lock acquisition remains problematic as noted in previous reviews. Multiple workers can simultaneously force takeover, leading to race conditions and lost work.
Consider implementing these improvements in the store's acquire_eoa_lock_aggressively method:
- Add TTL to locks (e.g., SET key value NX PX <ttl>)
- Implement heartbeat mechanism for lock renewal
- Use atomic compare-and-swap or WATCH/MULTI for safe takeover
- Add grace period before forcing takeover
 // 1. ACQUIRE LOCK AGGRESSIVELY
 tracing::info!("Acquiring EOA lock aggressively");
-self.store
-    .acquire_eoa_lock_aggressively(data.eoa_address, data.chain_id, &data.worker_id)
-    .await
-    .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?;
+match self.store
+    .try_acquire_eoa_lock_with_timeout(
+        data.eoa_address,
+        data.chain_id,
+        &data.worker_id,
+        Duration::from_secs(300), // 5 minute TTL
+    )
+    .await
+{
+    Ok(()) => {}
+    Err(TransactionStoreError::LockContended { .. }) => {
+        // Wait and retry with backoff
+        return Err(EoaExecutorWorkerError::WorkRemaining {
+            message: "Lock contended, will retry".to_string(),
+        })
+        .map_err_nack(Some(Duration::from_secs(30)), RequeuePosition::Last);
+    }
+    Err(e) => return Err(e).map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last),
+}
🧹 Nitpick comments (4)
executors/src/eoa/store.rs (2)
`539-632`: **Consider extracting lock validation logic to improve readability.**

The helper method is well-designed and effectively centralizes atomic operation logic. However, it's quite complex. Consider extracting the lock validation logic into a separate method to improve readability.
```diff
+    async fn validate_lock_ownership(
+        &self,
+        conn: &mut ConnectionManager,
+        lock_key: &str,
+        worker_id: &str,
+        eoa: Address,
+        chain_id: u64,
+    ) -> Result<(), TransactionStoreError> {
+        let current_owner: Option<String> = conn.get(lock_key).await?;
+        if current_owner.as_deref() != Some(worker_id) {
+            return Err(TransactionStoreError::LockLost {
+                eoa,
+                chain_id,
+                worker_id: worker_id.to_string(),
+            });
+        }
+        Ok(())
+    }

     async fn execute_with_watch_and_retry(
         &self,
         eoa: Address,
         chain_id: u64,
         worker_id: &str,
         safe_tx: &impl SafeRedisTransaction,
     ) -> Result<(), TransactionStoreError> {
         // ... existing code ...
-        // Check lock ownership
-        let current_owner: Option<String> = conn.get(&lock_key).await?;
-        if current_owner.as_deref() != Some(worker_id) {
-            let _: () = twmq::redis::cmd("UNWATCH").query_async(&mut conn).await?;
-            return Err(TransactionStoreError::LockLost {
-                eoa,
-                chain_id,
-                worker_id: worker_id.to_string(),
-            });
-        }
+        // Check lock ownership
+        if let Err(e) = self
+            .validate_lock_ownership(&mut conn, &lock_key, worker_id, eoa, chain_id)
+            .await
+        {
+            let _: () = twmq::redis::cmd("UNWATCH").query_async(&mut conn).await?;
+            return Err(e);
+        }
```
`1002-1102`: **Consider extracting hash validation logic for better maintainability.**

This method is necessarily complex due to the need to pass hash data between validation and pipeline phases. While the current implementation is correct, consider extracting the hash validation logic to improve readability.
```diff
+    async fn find_transaction_hashes(
+        &self,
+        conn: &mut ConnectionManager,
+        submitted_key: &str,
+        transaction_id: &str,
+    ) -> Result<Vec<String>, TransactionStoreError> {
+        let all_hashes: Vec<String> = conn.zrange(submitted_key, 0, -1).await?;
+        let mut transaction_hashes = Vec::new();
+
+        for hash in all_hashes {
+            if let Some(tx_id) = self.get_transaction_id_for_hash(&hash).await? {
+                if tx_id == transaction_id {
+                    transaction_hashes.push(hash);
+                }
+            }
+        }
+
+        if transaction_hashes.is_empty() {
+            return Err(TransactionStoreError::TransactionNotInSubmittedState {
+                transaction_id: transaction_id.to_string(),
+            });
+        }
+
+        Ok(transaction_hashes)
+    }
```

executors/src/eoa/worker.rs (2)
`591-645`: **Good parallel-to-sequential pattern with comprehensive error handling.**

The implementation correctly handles the complex flow of building transactions in parallel and then moving them to borrowed state sequentially to maintain nonce ordering. The error handling for nonce conflicts and missing transactions is thorough.
Consider extracting the error handling logic into a helper method to reduce nesting:

```diff
+    fn handle_move_to_borrowed_error(
+        &self,
+        error: TransactionStoreError,
+        transaction_id: &str,
+    ) -> Result<bool, ()> {
+        match error {
+            TransactionStoreError::OptimisticNonceChanged { .. } => {
+                tracing::debug!("Nonce changed for transaction {}, skipping", transaction_id);
+                Ok(false) // Stop processing
+            }
+            TransactionStoreError::TransactionNotInPendingQueue { .. } => {
+                tracing::debug!("Transaction {} already processed, skipping", transaction_id);
+                Err(()) // Continue with next
+            }
+            e => {
+                tracing::error!("Failed to move transaction {} to borrowed: {}", transaction_id, e);
+                Err(()) // Continue with next
+            }
+        }
+    }
```
`828-842`: **Complete the no-op transaction implementation for recycled nonces.**

The missing no-op transaction implementation could cause recycled nonces to remain unconsumed, potentially leading to nonce gaps or inefficient recycling.
This missing functionality affects the recycling system. The implementation needs to:
- Create a minimal transaction (e.g., self-transfer of 0 value)
- Handle signing credentials appropriately
- Implement atomic nonce consumption from recycled set
Would you like me to generate a complete implementation for this method?
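For illustration, the shape of such a no-op transaction can be sketched with plain types. Everything here is hypothetical (the real implementation would go through alloy's `TransactionRequest` builder and the worker's signing credential); the point is the invariants a nonce-consuming no-op must satisfy:

```rust
/// Hypothetical stand-in for the real transaction request type.
#[derive(Debug, PartialEq)]
pub struct NoopTxRequest {
    pub from: String,
    pub to: String,
    pub value: u128,
    pub nonce: u64,
    pub gas_limit: u64,
}

/// Build the minimal nonce-consuming transaction: a 0-value self-transfer.
pub fn build_noop_request(eoa: &str, nonce: u64) -> NoopTxRequest {
    NoopTxRequest {
        from: eoa.to_string(),
        to: eoa.to_string(), // send to self
        value: 0,            // no value moved
        nonce,               // the recycled nonce to consume
        gas_limit: 21_000,   // base cost of a plain transfer
    }
}
```

The remaining design work is wiring a signing credential into this path, since the no-op is not tied to any user-submitted transaction that would normally carry one.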
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `executors/src/eoa/store.rs` (1 hunks)
- `executors/src/eoa/worker.rs` (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: coverage
- GitHub Check: test
🔇 Additional comments (3)
executors/src/eoa/store.rs (2)
`14-22`: **Well-designed trait for atomic Redis operations.**

The `SafeRedisTransaction` trait provides an excellent abstraction for atomic Redis operations, properly separating validation, operation building, and watch key management. The async trait syntax is correct for stable Rust.
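A minimal synchronous caricature of that separation of concerns might look like the following. The names and signatures are illustrative only (the real trait is async and operates on Redis pipelines), but they show the three concerns as distinct trait methods:

```rust
/// Illustrative simplification of the SafeRedisTransaction idea.
pub trait SafeTransaction {
    /// Keys whose modification by another worker must abort this transaction.
    fn watch_keys(&self) -> Vec<String>;
    /// Precondition check before queuing any mutation.
    fn validate(&self, state: &str) -> Result<(), String>;
    /// The mutations to apply atomically if validation passed.
    fn operations(&self) -> Vec<(String, String)>;
}

/// Hypothetical implementor: move a transaction from pending to borrowed.
pub struct MoveToBorrowed {
    pub lock_key: String,
    pub tx_id: String,
}

impl SafeTransaction for MoveToBorrowed {
    fn watch_keys(&self) -> Vec<String> {
        vec![self.lock_key.clone()]
    }
    fn validate(&self, state: &str) -> Result<(), String> {
        if state == "pending" {
            Ok(())
        } else {
            Err(format!("{} not in pending state", self.tx_id))
        }
    }
    fn operations(&self) -> Vec<(String, String)> {
        vec![(self.tx_id.clone(), "borrowed".to_string())]
    }
}
```

Splitting the concerns this way lets one generic executor (the WATCH/MULTI retry loop) drive every state transition.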
`1565-1602`: **Excellent ergonomic wrapper with proper validation.**

The `ScopedEoaExecutorStore` is a well-designed wrapper that eliminates repetitive parameter passing while ensuring lock ownership validation. The builder pattern with validation is a good approach.

executors/src/eoa/worker.rs (1)
`286-373`: **Well-implemented crash recovery with proper stale transaction handling.**

The crash recovery logic is comprehensive and handles edge cases well:
- Proper sorting by nonce for ordering
- Detection and logging of stale transactions
- Appropriate error classification for recovery decisions
- Good error handling and logging
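The ordering-and-staleness step from the list above can be sketched as a pure function (hypothetical types; the real recovery reads borrowed transactions from Redis and compares against the chain's confirmed nonce):

```rust
/// Minimal stand-in for a borrowed transaction record.
#[derive(Debug, PartialEq)]
pub struct BorrowedTx {
    pub id: String,
    pub nonce: u64,
}

/// Sort borrowed transactions by nonce, then split off stale ones whose
/// nonce is already below the chain's confirmed nonce (they can never mine).
pub fn recovery_order(
    mut borrowed: Vec<BorrowedTx>,
    chain_nonce: u64,
) -> (Vec<BorrowedTx>, Vec<BorrowedTx>) {
    borrowed.sort_by_key(|t| t.nonce);
    let (stale, live): (Vec<_>, Vec<_>) =
        borrowed.into_iter().partition(|t| t.nonce < chain_nonce);
    (live, stale)
}
```

Live transactions are rebroadcast in nonce order; stale ones are logged and moved out of the borrowed state rather than retried.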
Actionable comments posted: 2
🔭 Outside diff range comments (1)
thirdweb-core/Cargo.toml (1)
`7-13`: **Add missing alloy features to the root workspace declaration.**

The root `Cargo.toml` currently has:

```toml
[workspace.dependencies]
alloy = { version = "1.0.8" }
```

but does not enable the required features from `thirdweb-core/Cargo.toml`. Without them, `json-abi`, `consensus`, `dyn-abi`, `eips`, and `eip712` won't be activated, leading to potential compile-time errors.

• File: `Cargo.toml` (workspace root)
Under `[workspace.dependencies]`, update the `alloy` entry to include the same feature set:

```diff
 [workspace.dependencies]
-alloy = { version = "1.0.8" }
+alloy = {
+    version = "1.0.8",
+    features = [
+        "json-abi",
+        "consensus",
+        "dyn-abi",
+        "eips",
+        "eip712",
+    ],
+}
```
♻️ Duplicate comments (1)
executors/src/eoa/worker.rs (1)
`148-192`: **Error classification relies on fragile string matching.**

The `classify_send_error` function uses string matching on error messages, which is unreliable: error messages can vary between RPC providers, may change over time, and localized messages won't match. This could lead to misclassification and incorrect transaction handling. Consider using the structured error classifier from the `error_classifier.rs` module mentioned in the AI summary instead of string matching.
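A hedged sketch of what enum-based classification could look like (the variant names below are invented for illustration and are not the actual `error_classifier.rs` API):

```rust
/// Structured error kinds, assumed to be produced upstream by parsing
/// JSON-RPC error codes rather than matching on message strings.
#[derive(Debug, PartialEq)]
pub enum SendErrorKind {
    NonceTooLow,
    AlreadyKnown,
    InsufficientFunds,
    InvalidSignature,
    Unknown,
}

#[derive(Debug, PartialEq)]
pub enum SendOutcome {
    PossiblySent,
    DeterministicFailure,
}

pub fn classify(kind: SendErrorKind) -> SendOutcome {
    match kind {
        // These mean a transaction with this nonce may already be in the mempool.
        SendErrorKind::NonceTooLow | SendErrorKind::AlreadyKnown => SendOutcome::PossiblySent,
        // These definitely did not consume the nonce.
        SendErrorKind::InsufficientFunds | SendErrorKind::InvalidSignature => {
            SendOutcome::DeterministicFailure
        }
        // Default to the safe assumption.
        SendErrorKind::Unknown => SendOutcome::PossiblySent,
    }
}
```

The key property is that the default arm preserves the "possibly sent" safety assumption, so an unrecognized error can never be misread as a guaranteed failure.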
🧹 Nitpick comments (3)
thirdweb-core/Cargo.toml (1)
`7-13`: **Optional: move feature flags to the workspace root.**

Since all crates depend on the same `alloy` version, consider consolidating the feature list into the workspace-level declaration. This avoids repetitive feature arrays in every crate and guarantees a single, unambiguous feature set.

executors/src/eoa/worker.rs (2)
`1785-1821`: **Consider using checked arithmetic for gas bumping calculations.**

The gas bumping calculations use unchecked arithmetic operations, which could theoretically overflow for very large gas values. Consider using checked arithmetic operations to ensure safety:

```diff
-tx.max_fee_per_gas = tx.max_fee_per_gas * bump_multiplier as u128 / 100;
+tx.max_fee_per_gas = tx.max_fee_per_gas
+    .checked_mul(bump_multiplier as u128)
+    .and_then(|v| v.checked_div(100))
+    .unwrap_or(tx.max_fee_per_gas);
```
`789-1016`: **Consider breaking down long functions for better maintainability.**

The `process_recycled_nonces` function is 227 lines long and handles multiple responsibilities:
- Collecting nonce-transaction pairs
- Building and signing transactions
- Moving to borrowed state
- Sending transactions
- Processing results
Consider extracting these steps into separate helper methods for better readability and testability.
For example:

- `collect_nonce_tx_pairs`
- `build_and_sign_transactions`
- `send_prepared_transactions`
- `process_send_results`
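Under the assumption of those four helpers, the decomposition might be sketched with stub types like this (all names hypothetical; the real helpers would be async and talk to the store, signer, and RPC):

```rust
/// Stub for a signed, nonce-assigned transaction ready to broadcast.
pub struct Prepared {
    pub nonce: u64,
    pub tx_id: String,
}

/// Aggregate outcome of one recycled-nonce processing pass.
pub struct SendReport {
    pub sent: usize,
    pub failed: usize,
}

fn collect_nonce_tx_pairs(recycled: &[u64], pending: &[String]) -> Vec<(u64, String)> {
    // Pair each recycled nonce with the next pending transaction; leftovers wait.
    recycled.iter().copied().zip(pending.iter().cloned()).collect()
}

fn build_and_sign_transactions(pairs: Vec<(u64, String)>) -> Vec<Prepared> {
    pairs.into_iter().map(|(nonce, tx_id)| Prepared { nonce, tx_id }).collect()
}

fn send_prepared_transactions(prepared: &[Prepared]) -> Vec<Result<(), String>> {
    prepared.iter().map(|_| Ok(())).collect() // stub: real code broadcasts via RPC
}

fn process_send_results(results: Vec<Result<(), String>>) -> SendReport {
    let sent = results.iter().filter(|r| r.is_ok()).count();
    SendReport { failed: results.len() - sent, sent }
}

/// The long function reduced to a readable, independently testable pipeline.
pub fn process_recycled_nonces(recycled: &[u64], pending: &[String]) -> SendReport {
    let pairs = collect_nonce_tx_pairs(recycled, pending);
    let prepared = build_and_sign_transactions(pairs);
    let results = send_prepared_transactions(&prepared);
    process_send_results(results)
}
```

Each stage can then get its own unit tests, which is the main maintainability win the review is after.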
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (8)
- `Cargo.toml` (1 hunks)
- `aa-core/Cargo.toml` (1 hunks)
- `core/Cargo.toml` (1 hunks)
- `executors/src/eoa/store.rs` (1 hunks)
- `executors/src/eoa/worker.rs` (1 hunks)
- `server/Cargo.toml` (1 hunks)
- `thirdweb-core/Cargo.toml` (1 hunks)
- `twmq/src/lib.rs` (20 hunks)
✅ Files skipped from review due to trivial changes (1)
- server/Cargo.toml
🚧 Files skipped from review as they are similar to previous changes (5)
- core/Cargo.toml
- Cargo.toml
- aa-core/Cargo.toml
- twmq/src/lib.rs
- executors/src/eoa/store.rs
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: d4mr
PR: thirdweb-dev/engine-core#5
File: executors/src/eoa/worker.rs:173-176
Timestamp: 2025-07-06T15:44:13.681Z
Learning: The EOA executor store uses comprehensive WATCH-based coordination where every Redis state mutation watches the lock key and validates ownership before proceeding. If the lock is lost during any operation, the transaction fails with LockLost error. This makes aggressive lock acquisition safe because only the actual lock holder can successfully perform state mutations, regardless of who claims the lock.
Learnt from: d4mr
PR: thirdweb-dev/engine-core#5
File: executors/src/eoa/worker.rs:173-176
Timestamp: 2025-07-06T15:44:13.681Z
Learning: In the EOA executor system, aggressive lock acquisition is safe because every Redis state mutation uses WATCH operations on the lock key. If the lock is lost during a transaction, the WATCH causes the transaction to fail and the worker exits gracefully. This provides coordination between workers even when using forceful lock takeover.
executors/src/eoa/worker.rs (2)
Learnt from: d4mr
PR: thirdweb-dev/engine-core#5
File: executors/src/eoa/worker.rs:173-176
Timestamp: 2025-07-06T15:44:13.681Z
Learning: In the EOA executor system, aggressive lock acquisition is safe because every Redis state mutation uses WATCH operations on the lock key. If the lock is lost during a transaction, the WATCH causes the transaction to fail and the worker exits gracefully. This provides coordination between workers even when using forceful lock takeover.
Learnt from: d4mr
PR: thirdweb-dev/engine-core#5
File: executors/src/eoa/worker.rs:173-176
Timestamp: 2025-07-06T15:44:13.681Z
Learning: The EOA executor store uses comprehensive WATCH-based coordination where every Redis state mutation watches the lock key and validates ownership before proceeding. If the lock is lost during any operation, the transaction fails with LockLost error. This makes aggressive lock acquisition safe because only the actual lock holder can successfully perform state mutations, regardless of who claims the lock.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: test
- GitHub Check: coverage
🔇 Additional comments (3)
executors/src/eoa/worker.rs (3)
`597-615`: **Good implementation of stalled nonce detection and gas bumping.**

The logic properly detects when a nonce has been stalled for more than 5 minutes and attempts a gas bump. The error handling ensures that gas bump failures don't crash the worker, which is appropriate.
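The stall-detection rule can be sketched as follows (the 5-minute threshold matches the comment above; the bump helper, its percentage encoding, and the checked-math fallback are assumptions for illustration, not the exact `worker.rs` constants):

```rust
/// Assumed stall threshold: no nonce movement for 5 minutes triggers a bump.
const STALL_THRESHOLD_MS: u64 = 5 * 60 * 1000;

/// True when the nonce has not moved for longer than the threshold.
pub fn should_bump(now_ms: u64, last_movement_ms: u64) -> bool {
    now_ms.saturating_sub(last_movement_ms) > STALL_THRESHOLD_MS
}

/// Bump fees by `percent` (e.g. 120 = +20%), using checked math so an
/// overflow falls back to the original fee instead of panicking.
pub fn bump_fee(max_fee_per_gas: u128, percent: u128) -> u128 {
    max_fee_per_gas
        .checked_mul(percent)
        .and_then(|v| v.checked_div(100))
        .unwrap_or(max_fee_per_gas)
}
```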
`314-321`: **Excellent lock management implementation.**

The worker properly acquires locks aggressively via `ScopedEoaExecutorStore::build` and ensures they are released in all exit paths (success, nack, fail). As confirmed by the learnings, this is safe due to the comprehensive WATCH-based coordination in the store implementation. The `release_eoa_lock` method also includes proper error logging without failing the workflow, which is a good practice.

Also applies to: 332-375, 443-454
`261-286`: **Well-architected EOA executor implementation.**

The worker demonstrates excellent design with:
- Clear separation of concerns between crash recovery, confirmation, and send flows
- Robust error handling with proper classification and recovery strategies
- Efficient use of parallel operations for RPC calls while maintaining sequential Redis operations
- Comprehensive transaction lifecycle management with atomic state transitions
- Proper integration with the distributed queue system
The aggressive lock acquisition pattern combined with WATCH-based coordination provides a solid foundation for distributed execution.
Also applies to: 378-441
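The WATCH-based coordination underpinning this safety claim can be caricatured in memory as optimistic check-and-set: a mutation only commits if the watched lock key is unchanged since it was read. In the sketch below, version numbers stand in for Redis key-modification tracking; this is not the real store code:

```rust
/// In-memory stand-in for a Redis key under WATCH.
pub struct WatchedKey {
    pub value: String,
    pub version: u64,
}

pub enum ExecResult {
    Committed,
    Aborted, // watched key changed: caller re-reads and retries
}

/// Commit `new_value` only if the key is still at the version observed
/// when the caller issued its WATCH-equivalent read.
pub fn exec_if_unchanged(
    key: &mut WatchedKey,
    watched_version: u64,
    new_value: &str,
) -> ExecResult {
    if key.version != watched_version {
        return ExecResult::Aborted;
    }
    key.value = new_value.to_string();
    key.version += 1;
    ExecResult::Committed
}
```

This is why aggressive takeover is tolerable: a worker that lost the lock is holding a stale version, so all of its subsequent mutations abort instead of corrupting state.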
```rust
async fn send_noop_transaction(
    &self,
    scoped: &ScopedEoaExecutorStore<'_>,
    chain: &impl Chain,
    nonce: u64,
) -> Result<bool, EoaExecutorWorkerError> {
    // Create a minimal transaction to consume the recycled nonce
    // Send 0 ETH to self with minimal gas
    let eoa = scoped.eoa();

    // Build no-op transaction (send 0 to self)
    let mut tx_request = AlloyTransactionRequest::default()
        .with_from(eoa)
        .with_to(eoa) // Send to self
        .with_value(U256::ZERO) // Send 0 value
        .with_input(Bytes::new()) // No data
        .with_chain_id(scoped.chain_id())
        .with_nonce(nonce)
        .with_gas_limit(21000); // Minimal gas for basic transfer

    // Estimate gas to ensure the transaction is valid
    match chain.provider().estimate_gas(tx_request.clone()).await {
        Ok(gas_limit) => {
            tx_request = tx_request.with_gas_limit(gas_limit);
        }
        Err(e) => {
            tracing::warn!(
                nonce = nonce,
                error = %e,
                "Failed to estimate gas for no-op transaction"
            );
            return Ok(false);
        }
    }

    // Build typed transaction
    let typed_tx = match tx_request.build_typed_tx() {
        Ok(tx) => tx,
        Err(e) => {
            tracing::warn!(
                nonce = nonce,
                error = ?e,
                "Failed to build typed transaction for no-op"
            );
            return Ok(false);
        }
    };

    // Get signing credential from health or use default approach
    // For no-op transactions, we need to find a valid signing credential
    // This is a limitation of the current design - no-op transactions
    // need access to signing credentials which are transaction-specific
    tracing::warn!(
        nonce = nonce,
        "No-op transaction requires signing credential access - recycled nonce will remain unconsumed"
    );
    Ok(false)
}
```
**Incomplete no-op transaction implementation.**

The `send_noop_transaction` method is incomplete and always returns `false`, logging that it needs signing credential access. This means recycled nonces cannot be consumed when there are no pending transactions, potentially causing nonce management issues.
The comment at lines 1311-1318 acknowledges this limitation but doesn't provide a solution.
Would you like me to suggest an implementation that handles signing credentials for no-op transactions, perhaps by storing a default signing credential in the worker configuration?
🤖 Prompt for AI Agents
In executors/src/eoa/worker.rs between lines 1263 and 1320, the
send_noop_transaction function is incomplete and always returns false because it
lacks signing credential handling, preventing consumption of recycled nonces. To
fix this, implement logic to retrieve or store a default signing credential in
the worker configuration and use it to sign the no-op transaction before sending
it. This will allow the transaction to be properly signed and submitted,
enabling nonce consumption even when no other transactions are pending.
```rust
// no nonce progress
if current_chain_nonce == cached_nonce {
    let current_health = self.get_eoa_health(scoped, chain).await?;
    let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
```
💡 Verification agent
🧩 Analysis chain
**Potential timestamp calculation issues.**

The pattern `chrono::Utc::now().timestamp_millis().max(0) as u64` appears multiple times. Using `.max(0)` could mask negative timestamp values, which might indicate system clock issues. Consider a more robust timestamp handling approach that explicitly handles edge cases rather than silently converting negative values to 0:
```diff
-let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
+let now = chrono::Utc::now().timestamp_millis();
+if now < 0 {
+    tracing::error!("System clock returned negative timestamp: {}", now);
+    return Err(EoaExecutorWorkerError::InternalError {
+        message: "Invalid system timestamp".to_string(),
+    });
+}
+let now = now as u64;
```
Also applies to: 736-736, 849-849, 1057-1057, 1421-1421, 1446-1447
🏁 Script executed:
```shell
#!/bin/bash
# Search for all occurrences of `.timestamp_millis().max(0)` in the worker file
rg -n "timestamp_millis().max(0)" executors/src/eoa/worker.rs
```
Length of output: 63
🏁 Script executed:
#!/bin/bash
# Locate worker.rs within the repository
fd -t f worker.rs
# Search for all occurrences of `timestamp_millis` in the executors directory
rg -n "timestamp_millis" executors
Length of output: 1167
Address timestamp calculation errors in worker.rs and store.rs
The pattern `chrono::Utc::now().timestamp_millis().max(0) as u64` appears at these locations:

- executors/src/eoa/worker.rs:593, 736, 849, 1057, 1421
- executors/src/eoa/store.rs:1448, 1481, 1563, 1635, 1675

Using `.max(0)` silently converts any negative timestamp (e.g. if the system clock is before the UNIX epoch) to zero, masking underlying clock issues. Instead, explicitly handle errors by switching to `std::time::SystemTime`. Suggested replacement at each site:
```diff
-let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
+use std::time::{SystemTime, UNIX_EPOCH};
+let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
+    Ok(dur) => dur.as_millis() as u64,
+    Err(err) => {
+        tracing::error!("System clock before UNIX_EPOCH: {:?}", err);
+        return Err(EoaExecutorWorkerError::InternalError {
+            message: "Invalid system timestamp".into(),
+        });
+    }
+};
```
– Apply this change to all the listed occurrences.
🤖 Prompt for AI Agents
In executors/src/eoa/worker.rs at lines 593, 736, 849, 1057, and 1421, and in
executors/src/eoa/store.rs at lines 1448, 1481, 1563, 1635, and 1675, replace
the usage of chrono::Utc::now().timestamp_millis().max(0) as u64 with an
explicit error-handling approach using std::time::SystemTime. Remove the .max(0)
call and instead obtain the current time by calling SystemTime::now(),
converting it to a duration since UNIX_EPOCH, and handling any potential errors
explicitly rather than masking negative timestamps. Apply this change
consistently at all listed locations to ensure proper timestamp calculation and
error handling.