Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subscriptions): sub_blocks, watch_contract_event #5

Merged
merged 14 commits into from
Mar 25, 2024
Merged
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ license = "MIT OR Apache-2.0"
homepage = "https://github.com/alloy-rs/examples"
repository = "https://github.com/alloy-rs/examples"
publish = false
exclude = ["examples/"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be removed as publishing is disabled


[workspace.dependencies]
alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "e7dfb4f", default-features = false }
Expand Down
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,9 @@ cargo run --example mnemonic_signer
- [ ] Paginated logs
- [ ] UniswapV2 pair
- [ ] Transactions
- [ ] Subscriptions
- [ ] Watch blocks
- [ ] Subscribe events by type
- [ ] Subscribe logs
- [x] Subscriptions
- [x] [Subscribe and watch blocks](./examples/subscriptions/examples/subscribe_blocks.rs)
- [x] [Subscribe contract events and watch logs](./examples/subscriptions/examples/watch_contract_event.rs)
- [ ] Transactions
- [ ] Call override
- [ ] Create raw transaction
Expand Down
26 changes: 26 additions & 0 deletions examples/subscriptions/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "examples-subscriptions"

publish.workspace = true
version.workspace = true
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true

[dev-dependencies]
alloy-contract.workspace = true
alloy-network.workspace = true
alloy-node-bindings.workspace = true
alloy-provider = { workspace = true, features = ["pubsub", "ws"] }
alloy-pubsub.workspace = true
alloy-rpc-client.workspace = true
alloy-rpc-types.workspace = true
alloy-sol-types = { workspace = true }

eyre.workspace = true
futures-util = "0.3"
reqwest.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
39 changes: 39 additions & 0 deletions examples/subscriptions/examples/subscribe_blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//! Example of subscribing to blocks and watching blocks.

use alloy_network::Ethereum;
use alloy_node_bindings::{Anvil, AnvilInstance};
use alloy_provider::{Provider, RootProvider};
use alloy_pubsub::PubSubFrontend;
use alloy_rpc_client::RpcClient;
use eyre::Result;
use futures_util::{stream, StreamExt};

#[tokio::main]
async fn main() -> Result<()> {
let (provider, _anvil) = init().await;

let sub = provider.subscribe_blocks().await?;
let mut stream = sub.into_stream().take(2);
Comment on lines +15 to +16
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we always call into_stream()? Should we return that in the subscribe_blocks return type instead @DaniPopes by impl Stream on the type?


while let Some(block) = stream.next().await {
println!("Subscribed Block: {:?}", block.header.number);
}

let poller = provider.watch_blocks().await?;
let mut stream = poller.into_stream().flat_map(stream::iter).take(2);

while let Some(block_hash) = stream.next().await {
println!("Watched Block: {:?}", block_hash);
}

Ok(())
}

async fn init() -> (RootProvider<Ethereum, PubSubFrontend>, AnvilInstance) {
let anvil = Anvil::new().block_time(1).spawn();
let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint());
let client = RpcClient::connect_pubsub(ws).await.unwrap();
let provider = RootProvider::<Ethereum, _>::new(client);

(provider, anvil)
}
94 changes: 94 additions & 0 deletions examples/subscriptions/examples/watch_contract_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//! Example of subscribing to contract events and watching logs.

use alloy_network::Ethereum;
use alloy_node_bindings::{Anvil, AnvilInstance};
use alloy_provider::{Provider, RootProvider};
use alloy_pubsub::PubSubFrontend;
use alloy_rpc_client::RpcClient;
use alloy_rpc_types::Filter;
use alloy_sol_types::{sol, SolEvent};
use eyre::Result;
use futures_util::{stream, StreamExt};

sol!(
#[sol(rpc, bytecode = "0x60806040526000805534801561001457600080fd5b50610260806100246000396000f3fe608060405234801561001057600080fd5b50600436106100415760003560e01c80632baeceb71461004657806361bc221a14610050578063d09de08a1461006e575b600080fd5b61004e610078565b005b6100586100d9565b6040516100659190610159565b60405180910390f35b6100766100df565b005b600160008082825461008a91906101a3565b925050819055506000543373ffffffffffffffffffffffffffffffffffffffff167fdc69c403b972fc566a14058b3b18e1513da476de6ac475716e489fae0cbe4a2660405160405180910390a3565b60005481565b60016000808282546100f191906101e6565b925050819055506000543373ffffffffffffffffffffffffffffffffffffffff167ff6d1d8d205b41f9fb9549900a8dba5d669d68117a3a2b88c1ebc61163e8117ba60405160405180910390a3565b6000819050919050565b61015381610140565b82525050565b600060208201905061016e600083018461014a565b92915050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601160045260246000fd5b60006101ae82610140565b91506101b983610140565b92508282039050818112600084121682821360008512151617156101e0576101df610174565b5b92915050565b60006101f182610140565b91506101fc83610140565b92508282019050828112156000831216838212600084121516171561022457610223610174565b5b9291505056fea26469706673582212208d0d34c26bfd2938ff07dd54c3fcc2bc4509e4ae654edff58101e5e7ab8cf18164736f6c63430008180033")]
contract EventExample {
Comment on lines +10 to +11
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when we ship compiler it'll be interesting to see if we can bundle it with the sol macro

int256 public counter = 0;

event Increment(address indexed by, int256 indexed value);
event Decrement(address indexed by, int256 indexed value);

function increment() public {
counter += 1;
emit Increment(msg.sender, counter);
}

function decrement() public {
counter -= 1;
emit Decrement(msg.sender, counter);
}
}
);

#[tokio::main]
async fn main() -> Result<()> {
let (provider, _anvil) = init().await;

let deployed_contract = EventExample::deploy(provider.clone()).await?;

println!("Deployed contract at: {:?}", deployed_contract.address());

let increment_filter = Filter::new()
.address(deployed_contract.address().to_owned())
.event_signature(EventExample::Increment::SIGNATURE_HASH);

let increment_poller = provider.watch_logs(&increment_filter).await?;

let decrement_filter = Filter::new()
.address(deployed_contract.address().to_owned())
.event_signature(EventExample::Decrement::SIGNATURE_HASH);

let decrement_poller = provider.watch_logs(&decrement_filter).await?;
yash-atreya marked this conversation as resolved.
Show resolved Hide resolved

println!("Watching for events...");
println!("every {:?}", increment_poller.poll_interval()); // Default 250ms for local connection else 7s

let mut increment_stream = increment_poller.into_stream().flat_map(stream::iter).take(2);

let mut decrement_stream = decrement_poller.into_stream().flat_map(stream::iter).take(2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still don't love the flat_map(stream::iter) syntax, but i understand it's because we also return the stream data by block, so i wonder if we should just have .flatten() as a helper method that just does flat_map(stream::iter) @DaniPopes ?


// Build a call to increment the counter
let increment_call = deployed_contract.increment();

// Send the increment call 2 times
for _ in 0..2 {
let _ = increment_call.send().await?;
}

// Build a call to decrement the counter
let decrement_call = deployed_contract.decrement();

// Send the decrement call 2 times
for _ in 0..2 {
let _ = decrement_call.send().await?;
}

while let Some(log) = increment_stream.next().await {
println!("Received Increment: {:?}", log);
}

while let Some(log) = decrement_stream.next().await {
println!("Received Decrement: {:?}", log);
}
yash-atreya marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}

async fn init() -> (RootProvider<Ethereum, PubSubFrontend>, AnvilInstance) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DaniPopes what's a good way to type erase here? impl Provider? wonder if we can make functions that return providers easy to use basically so that they don't need to manually write the network and the transport type

let anvil = Anvil::new().block_time(1).spawn();
let ws = alloy_rpc_client::WsConnect::new(anvil.ws_endpoint());
let client = RpcClient::connect_pubsub(ws).await.unwrap();
let provider = RootProvider::<Ethereum, _>::new(client);

(provider, anvil)
}
Loading