Skip to content

Commit a28e7a9

Browse files
authored
Handle conn pool initialization gracefully (#4778)
- Duplicated labels no longer allowed on config - Endpoints get tested/connected to per provider label instead of per endpoint
1 parent 0f9fadc commit a28e7a9

File tree

3 files changed

+81
-13
lines changed

3 files changed

+81
-13
lines changed

graph/src/firehose/endpoints.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::{
44
blockchain::BlockPtr,
55
cheap_clone::CheapClone,
66
components::store::BlockNumber,
7+
data::value::Word,
78
endpoint::{ConnectionType, EndpointMetrics, Provider, RequestLabels},
89
firehose::decode_firehose_block,
910
prelude::{anyhow, debug, info},
@@ -16,7 +17,13 @@ use futures03::StreamExt;
1617
use http::uri::{Scheme, Uri};
1718
use itertools::Itertools;
1819
use slog::Logger;
19-
use std::{collections::BTreeMap, fmt::Display, ops::ControlFlow, sync::Arc, time::Duration};
20+
use std::{
21+
collections::{BTreeMap, HashMap},
22+
fmt::Display,
23+
ops::ControlFlow,
24+
sync::Arc,
25+
time::Duration,
26+
};
2027
use tonic::codegen::InterceptedService;
2128
use tonic::{
2229
codegen::CompressionEncoding,
@@ -494,18 +501,21 @@ impl FirehoseNetworks {
494501
}
495502
}
496503

497-
/// Returns a `Vec` of tuples where the first element of the tuple is
498-
/// the chain's id and the second one is an endpoint for this chain.
499-
/// There can be mulitple tuple with the same chain id but with different
500-
/// endpoint where multiple providers exist for a single chain id.
501-
pub fn flatten(&self) -> Vec<(String, Arc<FirehoseEndpoint>)> {
504+
/// Returns a `HashMap` where the key is the chain's id and the key is an endpoint for this chain.
505+
/// There can be mulitple keys with the same chain id but with different
506+
/// endpoint where multiple providers exist for a single chain id. Providers with the same
507+
/// label do not need to be tested individually, if one is working, every other endpoint in the
508+
/// pool should also work.
509+
pub fn flatten(&self) -> HashMap<(String, Word), Arc<FirehoseEndpoint>> {
502510
self.networks
503511
.iter()
504512
.flat_map(|(chain_id, firehose_endpoints)| {
505-
firehose_endpoints
506-
.0
507-
.iter()
508-
.map(move |endpoint| (chain_id.clone(), endpoint.clone()))
513+
firehose_endpoints.0.iter().map(move |endpoint| {
514+
(
515+
(chain_id.clone(), endpoint.provider.clone()),
516+
endpoint.clone(),
517+
)
518+
})
509519
})
510520
.collect()
511521
}

node/src/chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ where
324324
.flatten()
325325
.into_iter()
326326
.map(|(chain_id, endpoint)| (chain_id, endpoint, logger.clone()))
327-
.map(|(chain_id, endpoint, logger)| async move {
327+
.map(|((chain_id, _), endpoint, logger)| async move {
328328
let logger = logger.new(o!("provider" => endpoint.provider.to_string()));
329329
info!(
330330
logger, "Connecting to Firehose to get chain identifier";

node/src/config.rs

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use graph::{
22
anyhow::Error,
33
blockchain::BlockchainKind,
44
firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN},
5+
itertools::Itertools,
56
prelude::{
67
anyhow::{anyhow, bail, Context, Result},
78
info,
@@ -518,8 +519,14 @@ fn default_blockchain_kind() -> BlockchainKind {
518519

519520
impl Chain {
520521
fn validate(&mut self) -> Result<()> {
521-
// `Config` validates that `self.shard` references a configured shard
522+
let mut labels = self.providers.iter().map(|p| &p.label).collect_vec();
523+
labels.sort();
524+
labels.dedup();
525+
if labels.len() != self.providers.len() {
526+
return Err(anyhow!("Provider labels must be unique"));
527+
}
522528

529+
// `Config` validates that `self.shard` references a configured shard
523530
for provider in self.providers.iter_mut() {
524531
provider.validate()?
525532
}
@@ -1147,7 +1154,7 @@ where
11471154
#[cfg(test)]
11481155
mod tests {
11491156

1150-
use crate::config::Web3Rule;
1157+
use crate::config::{ChainSection, Web3Rule};
11511158

11521159
use super::{
11531160
Chain, Config, FirehoseProvider, Provider, ProviderDetails, Transport, Web3Provider,
@@ -1702,4 +1709,55 @@ mod tests {
17021709
assert!(SubgraphLimit::Unlimited > SubgraphLimit::Limit(10));
17031710
assert!(SubgraphLimit::Limit(10) > SubgraphLimit::Disabled);
17041711
}
1712+
1713+
#[test]
1714+
fn duplicated_labels_are_not_allowed_within_chain() {
1715+
let mut actual = toml::from_str::<ChainSection>(
1716+
r#"
1717+
ingestor = "block_ingestor_node"
1718+
[mainnet]
1719+
shard = "vip"
1720+
provider = [
1721+
{ label = "mainnet1", url = "http://127.0.0.1", features = [], headers = { Authorization = "Bearer foo" } },
1722+
{ label = "mainnet1", url = "http://127.0.0.1", features = [ "archive", "traces" ] }
1723+
]
1724+
"#,
1725+
)
1726+
.unwrap();
1727+
1728+
let err = actual.validate();
1729+
assert_eq!(true, err.is_err());
1730+
let err = err.unwrap_err();
1731+
assert_eq!(
1732+
true,
1733+
err.to_string().contains("unique"),
1734+
"result: {:?}",
1735+
err
1736+
);
1737+
}
1738+
1739+
#[test]
1740+
fn duplicated_labels_are_allowed_on_different_chain() {
1741+
let mut actual = toml::from_str::<ChainSection>(
1742+
r#"
1743+
ingestor = "block_ingestor_node"
1744+
[mainnet]
1745+
shard = "vip"
1746+
provider = [
1747+
{ label = "mainnet1", url = "http://127.0.0.1", features = [], headers = { Authorization = "Bearer foo" } },
1748+
{ label = "mainnet2", url = "http://127.0.0.1", features = [ "archive", "traces" ] }
1749+
]
1750+
[mainnet2]
1751+
shard = "vip"
1752+
provider = [
1753+
{ label = "mainnet1", url = "http://127.0.0.1", features = [], headers = { Authorization = "Bearer foo" } },
1754+
{ label = "mainnet2", url = "http://127.0.0.1", features = [ "archive", "traces" ] }
1755+
]
1756+
"#,
1757+
)
1758+
.unwrap();
1759+
1760+
let result = actual.validate();
1761+
assert_eq!(true, result.is_ok(), "error: {:?}", result.unwrap_err());
1762+
}
17051763
}

0 commit comments

Comments
 (0)