Skip to content

Simplify logs fetching #1540

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 87 additions & 76 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ use std::time::Instant;
use ethabi::ParamType;
use graph::components::ethereum::{EthereumAdapter as EthereumAdapterTrait, *};
use graph::prelude::{
debug, err_msg, error, ethabi, format_err, hex, retry, stream, tiny_keccak, trace, warn, web3,
ChainStore, Error, EthereumCallCache, Logger, TimeoutError,
debug, err_msg, error, ethabi, format_err,
futures03::{self, compat::Future01CompatExt, FutureExt, StreamExt, TryStreamExt},
hex, retry, stream, tiny_keccak, trace, warn, web3, ChainStore, CheapClone, DynTryFuture,
Error, EthereumCallCache, Logger, TimeoutError,
};
use web3::api::Web3;
use web3::transports::batch::Batch;
Expand All @@ -29,13 +31,6 @@ lazy_static! {
.parse::<u64>()
.expect("invalid trace stream step size");

/// Maximum number of chunks to request in parallel when streaming logs. The default is low
/// because this can have a quadratic effect on the number of parallel requests.
static ref LOG_STREAM_PARALLEL_CHUNKS: u64 = std::env::var("ETHEREUM_PARALLEL_BLOCK_RANGES")
.unwrap_or("10".into())
.parse::<u64>()
.expect("invalid number of parallel Ethereum block ranges to scan");

/// Maximum range size for `eth.getLogs` requests that don't filter on
/// contract address, only event signature, and are therefore expensive.
///
Expand Down Expand Up @@ -67,6 +62,15 @@ lazy_static! {
.expect("invalid GRAPH_ETHEREUM_REQUEST_RETRIES env var");
}

// An `EthereumAdapter` is cloned once per RPC request / stream step, so cloning
// must be cheap. Both fields delegate to their own `cheap_clone`, which (per the
// `CheapClone` contract) is a reference-count bump rather than a deep copy —
// presumably both are `Arc`-backed handles; confirm against the struct definition.
impl<T: web3::Transport> CheapClone for EthereumAdapter<T> {
fn cheap_clone(&self) -> Self {
Self {
web3: self.web3.cheap_clone(),
metrics: self.metrics.cheap_clone(),
}
}
}

impl<T> EthereumAdapter<T>
where
T: web3::BatchTransport + Send + Sync + 'static,
Expand Down Expand Up @@ -173,7 +177,7 @@ where
subgraph_metrics: Arc<SubgraphEthRpcMetrics>,
from: u64,
to: u64,
filter: EthGetLogsFilter,
filter: Arc<EthGetLogsFilter>,
too_many_logs_fingerprints: &'static [&'static str],
) -> impl Future<Item = Vec<Log>, Error = TimeoutError<web3::error::Error>> {
let eth_adapter = self.clone();
Expand Down Expand Up @@ -264,7 +268,7 @@ where
from: u64,
to: u64,
filter: EthGetLogsFilter,
) -> impl Future<Item = Vec<Log>, Error = Error> {
) -> DynTryFuture<'static, Vec<Log>, Error> {
// Codes returned by Ethereum node providers if an eth_getLogs request is too heavy.
// The first one is for Infura when it hits the log limit, the rest for Alchemy timeouts.
const TOO_MANY_LOGS_FINGERPRINTS: &[&str] = &[
Expand All @@ -281,75 +285,74 @@ where
}

// Collect all event sigs
let eth = self.clone();
let eth = self.cheap_clone();
let filter = Arc::new(filter);

let step = match filter.contracts.is_empty() {
// `to - from + 1` blocks will be scanned.
false => to - from,
true => (to - from).min(*MAX_EVENT_ONLY_RANGE - 1),
};

stream::unfold((from, step), move |(start, step)| {
if start > to {
return None;
}

// Make as many parallel requests of size `step` as necessary,
// respecting `LOG_STREAM_PARALLEL_CHUNKS`.
let mut chunk_futures = vec![];
let mut low = start;
for _ in 0..*LOG_STREAM_PARALLEL_CHUNKS {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reasoning behind dropping parallel chunks?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My rationale is on the second paragraph of the PR description.

if low == to + 1 {
break;
// Typically this will loop only once and fetch the entire range in one request. But if the
// node returns an error that signifies the request is too heavy to process, the range will
// be broken down into smaller steps.
futures03::stream::try_unfold((from, step), move |(start, step)| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked everything in here carefully for bugs and everything checks out. Nice.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! It's easy to make an off-by-one error here.

let logger = logger.cheap_clone();
let filter = filter.cheap_clone();
let eth = eth.cheap_clone();
let subgraph_metrics = subgraph_metrics.cheap_clone();

async move {
if start > to {
return Ok(None);
}
let high = (low + step).min(to);

let end = (start + step).min(to);
debug!(
logger,
"Requesting logs for blocks [{}, {}], {}", low, high, filter
"Requesting logs for blocks [{}, {}], {}", start, end, filter
);
chunk_futures.push(eth.logs_with_sigs(
&logger,
subgraph_metrics.clone(),
low,
high,
filter.clone(),
TOO_MANY_LOGS_FINGERPRINTS,
));
low = high + 1;
}
let logger = logger.clone();
Some(
stream::futures_ordered(chunk_futures)
.collect()
.map(|chunks| chunks.into_iter().flatten().collect::<Vec<Log>>())
.then(move |res| match res {
Err(e) => {
let string_err = e.to_string();

// If the step is already 0, we're hitting the log
// limit even for a single block. We hope this never
// happens, but if it does, make sure to error.
if TOO_MANY_LOGS_FINGERPRINTS
.iter()
.any(|f| string_err.contains(f))
&& step > 0
{
// The range size for a request is `step + 1`.
// So it's ok if the step goes down to 0, in
// that case we'll request one block at a time.
let new_step = step / 10;
debug!(logger, "Reducing block range size to scan for events";
let res = eth
.logs_with_sigs(
&logger,
subgraph_metrics.cheap_clone(),
start,
end,
filter.cheap_clone(),
TOO_MANY_LOGS_FINGERPRINTS,
)
.compat()
.await;

match res {
Err(e) => {
let string_err = e.to_string();

// If the step is already 0, the request is too heavy even for a single
// block. We hope this never happens, but if it does, make sure to error.
if TOO_MANY_LOGS_FINGERPRINTS
.iter()
.any(|f| string_err.contains(f))
&& step > 0
{
// The range size for a request is `step + 1`. So it's ok if the step
// goes down to 0, in that case we'll request one block at a time.
let new_step = step / 10;
debug!(logger, "Reducing block range size to scan for events";
"new_size" => new_step + 1);
Ok((vec![], (start, new_step)))
} else {
warn!(logger, "Unexpected RPC error"; "error" => &string_err);
Err(err_msg(string_err))
}
Ok(Some((vec![], (start, new_step))))
} else {
warn!(logger, "Unexpected RPC error"; "error" => &string_err);
Err(err_msg(string_err))
}
Ok(logs) => Ok((logs, (low, step))),
}),
)
}
Ok(logs) => Ok(Some((logs, (end + 1, step)))),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the nature of asking for too many logs that there are simply too many in that range, but that a future range may contain fewer logs and successfully complete with a larger step?

If so, I would consider increasing the step size on successful requests. Eg: Ok(Some((logs, (end + 1, max_step.min(step * 2))))). As the code is, the step size can (and will) only decrease, slowing down and never recovering its speed. The max_step variable would simply be the value set earlier at let step = match filter.contracts.is_empty() {... This value would be both max_step and step.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced that it's simply the number of logs in all cases; the Alchemy Rinkeby nodes were returning -32000 at any range over 500 blocks, even for the first blocks of the network. Hopefully this is something they can improve on their side.

For Infura it really is about the number of logs, but limit is now 10,000 logs which is well above our target triggers per range (currently 100 by default), so this should be a rare event anyways.

So I don't think it's worth having logic that tries to find the optimum range here, that's kinda what our adaptive block range already does, this is just a fail safe.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

}
}
})
.concat2()
.try_concat()
.boxed()
}

fn call(
Expand Down Expand Up @@ -1042,16 +1045,24 @@ where
from: u64,
to: u64,
log_filter: EthereumLogFilter,
) -> Box<dyn Future<Item = Vec<Log>, Error = Error> + Send> {
let eth = self.clone();
) -> DynTryFuture<'static, Vec<Log>, Error> {
let eth: Self = self.cheap_clone();
let logger = logger.clone();
Box::new(
stream::iter_ok(log_filter.eth_get_logs_filters().map(move |filter| {
eth.log_stream(logger.clone(), subgraph_metrics.clone(), from, to, filter)
}))
.buffered(*LOG_STREAM_PARALLEL_CHUNKS as usize)
.concat2(),
)

futures03::stream::iter(log_filter.eth_get_logs_filters().map(move |filter| {
eth.cheap_clone()
.log_stream(
logger.cheap_clone(),
subgraph_metrics.cheap_clone(),
from,
to,
filter,
)
.into_stream()
}))
.flatten()
.try_concat()
.boxed()
}

fn calls_in_block_range(
Expand Down
16 changes: 7 additions & 9 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ those.

- `ETHEREUM_POLLING_INTERVAL`: how often to poll Ethereum for new blocks (in ms,
defaults to 500ms)
- `ETHEREUM_RPC_MAX_PARALLEL_REQUESTS`: how many RPC connections to start in
parallel for block retrieval (defaults to 64)
- `ETHEREUM_RPC_MAX_PARALLEL_REQUESTS`: Maximum number of concurrent HTTP
requests to an Ethereum RPC endpoint (defaults to 64).
- `GRAPH_ETHEREUM_TARGET_TRIGGERS_PER_BLOCK_RANGE`: The ideal amount of triggers
to be processed in a batch. If this is too small it may cause too many requests
to the ethereum node, if it is too large it may cause unreasonably expensive
calls to the ethereum node and excessive memory usage (defaults to 100).
to be processed in a batch. If this is too small it may cause too many requests
to the ethereum node, if it is too large it may cause unreasonably expensive
calls to the ethereum node and excessive memory usage (defaults to 100).
- `ETHEREUM_TRACE_STREAM_STEP_SIZE`: `graph-node` queries traces for a given
block range when a subgraph defines call handlers or block handlers with a
call filter. The value of this variable controls the number of blocks to scan
Expand All @@ -27,9 +27,7 @@ calls to the ethereum node and excessive memory usage (defaults to 100).
- `ETHEREUM_BLOCK_BATCH_SIZE`: number of Ethereum blocks to request in parallel
(defaults to 50)
- `GRAPH_ETHEREUM_MAX_BLOCK_RANGE_SIZE`: Maximum number of blocks to scan for
triggers in each request (defaults to 100000).
- `ETHEREUM_PARALLEL_BLOCK_RANGES`: Maximum number of parallel `eth_getLogs`
calls to make when scanning logs for a subgraph. Defaults to 100.
triggers in each request (defaults to 100000).
- `GRAPH_ETHEREUM_MAX_EVENT_ONLY_RANGE`: Maximum range size for `eth.getLogs`
requests that don't filter on contract address, only event signature.
- `GRAPH_ETHEREUM_JSON_RPC_TIMEOUT`: Timeout for Ethereum JSON-RPC requests.
Expand Down Expand Up @@ -95,4 +93,4 @@ triggers in each request (defaults to 100000).
[here](https://docs.rs/env_logger/0.6.0/env_logger/)
- `THEGRAPH_STORE_POSTGRES_DIESEL_URL`: postgres instance used when running
tests. Set to `postgresql://<DBUSER>:<DBPASSWORD>@<DBHOST>:<DBPORT>/<DBNAME>`
- `GRAPH_KILL_IF_UNRESPONSIVE`: If set, the process will be killed if unresponsive.
- `GRAPH_KILL_IF_UNRESPONSIVE`: If set, the process will be killed if unresponsive.
14 changes: 14 additions & 0 deletions graph/src/cheap_clone.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use slog::Logger;
use std::rc::Rc;
use std::sync::Arc;

/// Things that, in the context of an application such as Graph Node, are fast to clone.
///
/// Implementors promise that `clone` is inexpensive (e.g. a reference-count
/// bump, not a deep copy), so callers may clone freely — per closure, per
/// request — without worrying about cost.
pub trait CheapClone: Clone {
    /// Alias for `clone` that marks the call site as an intentionally cheap copy.
    fn cheap_clone(&self) -> Self {
        self.clone()
    }
}

// Cloning an `Rc`/`Arc` only increments a reference count.
impl<T: ?Sized> CheapClone for Rc<T> {}
impl<T: ?Sized> CheapClone for Arc<T> {}
// NOTE(review): assumes slog's `Logger` clones cheaply (it is documented as
// internally reference-counted) — confirm against the slog crate docs.
impl CheapClone for Logger {}
6 changes: 4 additions & 2 deletions graph/src/components/ethereum/adapter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use ethabi::{Bytes, Error as ABIError, Function, ParamType, Token};
use failure::SyncFailure;
use futures::Future;
use futures03::future::TryFutureExt;
use mockall::predicate::*;
use mockall::*;
use petgraph::graphmap::GraphMap;
Expand Down Expand Up @@ -738,7 +739,7 @@ pub trait EthereumAdapter: Send + Sync + 'static {
from: u64,
to: u64,
log_filter: EthereumLogFilter,
) -> Box<dyn Future<Item = Vec<Log>, Error = Error> + Send>;
) -> DynTryFuture<'static, Vec<Log>, Error>;

fn calls_in_block_range(
&self,
Expand Down Expand Up @@ -894,7 +895,8 @@ pub fn blocks_with_triggers(
if !log_filter.is_empty() {
trigger_futs.push(Box::new(
eth.logs_in_block_range(&logger, subgraph_metrics.clone(), from, to, log_filter)
.map(|logs: Vec<Log>| logs.into_iter().map(EthereumTrigger::Log).collect()),
.map_ok(|logs: Vec<Log>| logs.into_iter().map(EthereumTrigger::Log).collect())
.compat(),
))
}

Expand Down
4 changes: 4 additions & 0 deletions graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub mod ext;
/// Logging utilities
pub mod log;

/// `CheapClone` trait.
pub mod cheap_clone;

/// Module with mocks for different parts of the system.
pub mod mock {
pub use crate::components::ethereum::MockEthereumAdapter;
Expand Down Expand Up @@ -99,6 +102,7 @@ pub mod prelude {
};
pub use crate::components::{EventConsumer, EventProducer};

pub use crate::cheap_clone::CheapClone;
pub use crate::data::graphql::{SerializableValue, TryFromValue, ValueMap};
pub use crate::data::query::{
Query, QueryError, QueryExecutionError, QueryResult, QueryVariables,
Expand Down