diff --git a/node/src/bin/dev.rs b/node/src/bin/dev.rs index 27020ee3eb3..6b7a7011cc8 100644 --- a/node/src/bin/dev.rs +++ b/node/src/bin/dev.rs @@ -13,6 +13,7 @@ use graph::{ }; use graph_core::polling_monitor::ipfs_service; use graph_node::{ + dev::watcher, dev::watcher::{parse_manifest_args, watch_subgraphs}, launcher, opt::Opt, @@ -90,6 +91,35 @@ pub struct DevOpt { default_value = "https://api.thegraph.com/ipfs" )] pub ipfs: Vec, + #[clap( + long, + default_value = "8000", + value_name = "PORT", + help = "Port for the GraphQL HTTP server", + env = "GRAPH_GRAPHQL_HTTP_PORT" + )] + pub http_port: u16, + #[clap( + long, + default_value = "8030", + value_name = "PORT", + help = "Port for the index node server" + )] + pub index_node_port: u16, + #[clap( + long, + default_value = "8020", + value_name = "PORT", + help = "Port for the JSON-RPC admin server" + )] + pub admin_port: u16, + #[clap( + long, + default_value = "8040", + value_name = "PORT", + help = "Port for the Prometheus metrics server" + )] + pub metrics_port: u16, } /// Builds the Graph Node options from DevOpt @@ -109,7 +139,12 @@ fn build_args(dev_opt: &DevOpt, db_url: &str) -> Result { args.push("--postgres-url".to_string()); args.push(db_url.to_string()); - let opt = Opt::parse_from(args); + let mut opt = Opt::parse_from(args); + + opt.http_port = dev_opt.http_port; + opt.admin_port = dev_opt.admin_port; + opt.metrics_port = dev_opt.metrics_port; + opt.index_node_port = dev_opt.index_node_port; Ok(opt) } @@ -118,7 +153,7 @@ async fn run_graph_node( logger: &Logger, opt: Opt, link_resolver: Arc, - subgraph_updates_channel: Option>, + subgraph_updates_channel: mpsc::Receiver<(DeploymentHash, SubgraphName)>, ) -> Result<()> { let env_vars = Arc::new(EnvVars::from_env().context("Failed to load environment variables")?); @@ -139,7 +174,7 @@ async fn run_graph_node( env_vars, ipfs_service, link_resolver, - subgraph_updates_channel, + Some(subgraph_updates_channel), ) .await; Ok(()) @@ -201,14 +236,22 @@ async fn main() -> Result<()> { parse_manifest_args(dev_opt.manifests, dev_opt.sources, &logger)?; let file_link_resolver = Arc::new(FileLinkResolver::new(None, source_subgraph_aliases.clone())); - let (tx, rx) = dev_opt.watch.then(|| mpsc::channel(1)).unzip(); + let (tx, rx) = mpsc::channel(1); let logger_clone = logger.clone(); graph::spawn(async move { let _ = run_graph_node(&logger_clone, opt, file_link_resolver, rx).await; }); - if let Some(tx) = tx { + if let Err(e) = + watcher::deploy_all_subgraphs(&logger, &manifests_paths, &source_subgraph_aliases, &tx) + .await + { + error!(logger, "Error deploying subgraphs"; "error" => e.to_string()); + std::process::exit(1); + } + + if dev_opt.watch { graph::spawn_blocking(async move { if let Err(e) = watch_subgraphs( &logger, diff --git a/node/src/dev/watcher.rs b/node/src/dev/watcher.rs index 9436db9ac2c..a5e5caa6145 100644 --- a/node/src/dev/watcher.rs +++ b/node/src/dev/watcher.rs @@ -255,7 +255,7 @@ fn is_relevant_event(event: &Event, watched_dirs: Vec, exclusion_set: & } /// Redeploys all subgraphs in the order it appears in the manifests_paths -async fn deploy_all_subgraphs( +pub async fn deploy_all_subgraphs( logger: &Logger, manifests_paths: &Vec, source_subgraph_aliases: &HashMap, diff --git a/tests/src/config.rs b/tests/src/config.rs index 6cdd97a216f..46f22b141e7 100644 --- a/tests/src/config.rs +++ b/tests/src/config.rs @@ -1,3 +1,4 @@ +use std::sync::OnceLock; use std::time::{Duration, Instant}; use std::{fs, path::PathBuf}; @@ -13,6 +14,15 @@ use crate::status; lazy_static! { pub static ref CONFIG: Config = Config::default(); + static ref DEV_MODE: OnceLock = OnceLock::new(); +} + +pub fn set_dev_mode(val: bool) { + DEV_MODE.set(val).expect("DEV_MODE already set"); +} + +pub fn dev_mode() -> bool { + *DEV_MODE.get().unwrap_or(&false) } #[derive(Clone, Debug)] @@ -117,6 +127,26 @@ impl GraphNodeConfig { } } } + + pub fn from_env() -> Self { + if dev_mode() { + Self::gnd() + } else { + Self::default() + } + } + + fn gnd() -> Self { + let bin = fs::canonicalize("../target/debug/gnd") + .expect("failed to infer `graph-node` program location. (Was it built already?)"); + + Self { + bin, + ports: GraphNodePorts::default(), + ipfs_uri: "http://localhost:3001".to_string(), + log_file: TestFile::new("integration-tests/graph-node.log"), + } + } } impl Default for GraphNodeConfig { @@ -145,6 +175,13 @@ pub struct Config { impl Config { pub async fn spawn_graph_node(&self) -> anyhow::Result { + self.spawn_graph_node_with_args(&[]).await + } + + pub async fn spawn_graph_node_with_args( + &self, + additional_args: &[&str], + ) -> anyhow::Result { let ports = &self.graph_node.ports; let args = [ @@ -163,6 +200,12 @@ impl Config { "--metrics-port", &ports.metrics.to_string(), ]; + + let args = args + .iter() + .chain(additional_args.iter()) + .cloned() + .collect::>(); let stdout = self.graph_node.log_file.create(); let stderr = stdout.try_clone()?; status!( @@ -174,7 +217,7 @@ impl Config { command .stdout(stdout) .stderr(stderr) - .args(args) + .args(args.clone()) .env("GRAPH_STORE_WRITE_BATCH_DURATION", "5") .env("ETHEREUM_REORG_THRESHOLD", "0"); @@ -254,7 +297,7 @@ impl Default for Config { port: 3021, host: "localhost".to_string(), }, - graph_node: GraphNodeConfig::default(), + graph_node: GraphNodeConfig::from_env(), graph_cli, num_parallel_tests, timeout: Duration::from_secs(600), diff --git a/tests/src/subgraph.rs b/tests/src/subgraph.rs index 92e42836b68..4bd4a17f9ad 100644 --- a/tests/src/subgraph.rs +++ b/tests/src/subgraph.rs @@ -26,7 +26,7 @@ pub struct Subgraph { } impl Subgraph { - fn dir(name: &str) -> TestFile { + pub fn dir(name: &str) -> TestFile { TestFile::new(&format!("integration-tests/{name}")) } @@ -47,8 +47,11 @@ impl Subgraph { Ok(()) } - /// Deploy the subgraph by running the required `graph` commands - pub async fn deploy(name: &str, contracts: &[Contract]) -> anyhow::Result { + /// Prepare the subgraph for deployment by patching contracts and checking for subgraph datasources + pub async fn prepare( + name: &str, + contracts: &[Contract], + ) -> anyhow::Result<(TestFile, String, bool)> { let dir = Self::dir(name); let name = format!("test/{name}"); @@ -62,6 +65,13 @@ impl Subgraph { .and_then(|ds| ds.iter().find(|d| d["kind"].as_str() == Some("subgraph"))) .is_some(); + Ok((dir, name, has_subgraph_datasource)) + } + + /// Deploy the subgraph by running the required `graph` commands + pub async fn deploy(name: &str, contracts: &[Contract]) -> anyhow::Result { + let (dir, name, has_subgraph_datasource) = Self::prepare(name, contracts).await?; + // graph codegen subgraph.yaml let mut prog = Command::new(&CONFIG.graph_cli); let mut cmd = prog.arg("codegen").arg("subgraph.yaml.patched"); diff --git a/tests/tests/gnd_tests.rs b/tests/tests/gnd_tests.rs new file mode 100644 index 00000000000..315521be357 --- /dev/null +++ b/tests/tests/gnd_tests.rs @@ -0,0 +1,148 @@ +use anyhow::anyhow; +use graph::futures03::StreamExt; +use graph_tests::config::set_dev_mode; +use graph_tests::contract::Contract; +use graph_tests::subgraph::Subgraph; +use graph_tests::{error, status, CONFIG}; + +mod integration_tests; + +use integration_tests::{ + stop_graph_node, subgraph_data_sources, test_block_handlers, + test_multiple_subgraph_datasources, yarn_workspace, TestCase, TestResult, +}; + +/// The main test entrypoint. +#[tokio::test] +async fn gnd_tests() -> anyhow::Result<()> { + set_dev_mode(true); + + let test_name_to_run = std::env::var("TEST_CASE").ok(); + + let cases = vec![ + TestCase::new("block-handlers", test_block_handlers), + TestCase::new_with_source_subgraphs( + "subgraph-data-sources", + subgraph_data_sources, + vec!["QmWi3H11QFE2PiWx6WcQkZYZdA5UasaBptUJqGn54MFux5:source-subgraph"], + ), + TestCase::new_with_source_subgraphs( + "multiple-subgraph-datasources", + test_multiple_subgraph_datasources, + vec![ + "QmYHp1bPEf7EoYBpEtJUpZv1uQHYQfWE4AhvR6frjB1Huj:source-subgraph-a", + "QmYBEzastJi7bsa722ac78tnZa6xNnV9vvweerY4kVyJtq:source-subgraph-b", + ], + ), + ]; + + // Filter the test cases if a specific test name is provided + let cases_to_run: Vec<_> = if let Some(test_name) = test_name_to_run { + cases + .into_iter() + .filter(|case| case.name == test_name) + .collect() + } else { + cases + }; + + let contracts = Contract::deploy_all().await?; + + status!("setup", "Resetting database"); + CONFIG.reset_database(); + + status!("setup", "Initializing yarn workspace"); + yarn_workspace().await?; + + for i in cases_to_run.iter() { + i.prepare(&contracts).await?; + } + status!("setup", "Prepared all cases"); + + let manifests = cases_to_run + .iter() + .map(|case| { + Subgraph::dir(&case.name) + .path + .join("subgraph.yaml") + .to_str() + .unwrap() + .to_string() + }) + .collect::>() + .join(","); + + let aliases = cases_to_run + .iter() + .filter_map(|case| case.source_subgraph.as_ref()) + .flatten() + .filter_map(|source_subgraph| { + source_subgraph.alias().map(|alias| { + let manifest_path = Subgraph::dir(source_subgraph.test_name()) + .path + .join("subgraph.yaml") + .to_str() + .unwrap() + .to_string(); + format!("{}:{}", alias, manifest_path) + }) + }) + .collect::>(); + + let aliases_str = aliases.join(","); + let args = if aliases.is_empty() { + vec!["--manifests", &manifests] + } else { + vec!["--manifests", &manifests, "--sources", &aliases_str] + }; + + // Spawn graph-node. + status!("graph-node", "Starting graph-node"); + + let mut graph_node_child_command = CONFIG.spawn_graph_node_with_args(&args).await?; + + let num_sources = aliases.len(); + + let stream = tokio_stream::iter(cases_to_run) + .enumerate() + .map(|(index, case)| { + let subgraph_name = format!("subgraph-{}", num_sources + index); + case.check_health_and_test(&contracts, subgraph_name) + }) + .buffered(CONFIG.num_parallel_tests); + + let mut results: Vec = stream.collect::>().await; + results.sort_by_key(|result| result.name.clone()); + + // Stop graph-node and read its output. + let graph_node_res = stop_graph_node(&mut graph_node_child_command).await; + + status!( + "graph-node", + "graph-node logs are in {}", + CONFIG.graph_node.log_file.path.display() + ); + + match graph_node_res { + Ok(_) => { + status!("graph-node", "Stopped graph-node"); + } + Err(e) => { + error!("graph-node", "Failed to stop graph-node: {}", e); + } + } + + println!("\n\n{:=<60}", ""); + println!("Test results:"); + println!("{:-<60}", ""); + for result in &results { + result.print(); + } + println!("\n"); + + if results.iter().any(|result| !result.success()) { + Err(anyhow!("Some tests failed")) + } else { + Ok(()) + } +} diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 9df36f7145a..be1465c0513 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -33,25 +33,25 @@ type TestFn = Box< + Send, >; -struct TestContext { - subgraph: Subgraph, - contracts: Vec, +pub struct TestContext { + pub subgraph: Subgraph, + pub contracts: Vec, } -enum TestStatus { +pub enum TestStatus { Ok, Err(anyhow::Error), Panic(JoinError), } -struct TestResult { - name: String, - subgraph: Option, - status: TestStatus, +pub struct TestResult { + pub name: String, + pub subgraph: Option, + pub status: TestStatus, } impl TestResult { - fn success(&self) -> bool { + pub fn success(&self) -> bool { match self.status { TestStatus::Ok => true, _ => false, @@ -64,7 +64,7 @@ impl TestResult { } } - fn print(&self) { + pub fn print(&self) { // ANSI escape sequences; see the comment in macros.rs about better colorization const GREEN: &str = "\x1b[1;32m"; const RED: &str = "\x1b[1;31m"; @@ -94,14 +94,44 @@ impl TestResult { } } -struct TestCase { - name: String, - test: TestFn, - source_subgraph: Option, +#[derive(Debug, Clone)] +pub enum SourceSubgraph { + Subgraph(String), + WithAlias((String, String)), // (alias, test_name) +} + +impl SourceSubgraph { + pub fn from_str(s: &str) -> Self { + if let Some((alias, subgraph)) = s.split_once(':') { + Self::WithAlias((alias.to_string(), subgraph.to_string())) + } else { + Self::Subgraph(s.to_string()) + } + } + + pub fn test_name(&self) -> &str { + match self { + Self::Subgraph(name) => name, + Self::WithAlias((_, name)) => name, + } + } + + pub fn alias(&self) -> Option<&str> { + match self { + Self::Subgraph(_) => None, + Self::WithAlias((alias, _)) => Some(alias), + } + } +} + +pub struct TestCase { + pub name: String, + pub test: TestFn, + pub source_subgraph: Option>, } impl TestCase { - fn new(name: &str, test: fn(TestContext) -> T) -> Self + pub fn new(name: &str, test: fn(TestContext) -> T) -> Self where T: Future> + Send + 'static, { @@ -117,24 +147,11 @@ impl TestCase { T: Future> + Send + 'static, { let mut test_case = Self::new(name, test); - test_case.source_subgraph = Some(base_subgraph.to_string()); - test_case - } - - fn new_with_source_subgraph( - name: &str, - test: fn(TestContext) -> T, - source_subgraph: &str, - ) -> Self - where - T: Future> + Send + 'static, - { - let mut test_case = Self::new(name, test); - test_case.source_subgraph = Some(source_subgraph.to_string()); + test_case.source_subgraph = Some(vec![SourceSubgraph::from_str(base_subgraph)]); test_case } - fn new_with_multiple_source_subgraphs( + pub fn new_with_source_subgraphs( name: &str, test: fn(TestContext) -> T, source_subgraphs: Vec<&str>, @@ -143,7 +160,12 @@ impl TestCase { T: Future> + Send + 'static, { let mut test_case = Self::new(name, test); - test_case.source_subgraph = Some(source_subgraphs.join(",")); + test_case.source_subgraph = Some( + source_subgraphs + .into_iter() + .map(SourceSubgraph::from_str) + .collect(), + ); test_case } @@ -179,32 +201,37 @@ impl TestCase { Ok(subgraph) } - async fn run(self, contracts: &[Contract]) -> TestResult { - // If a subgraph has subgraph datasources, deploy them first + pub async fn prepare(&self, contracts: &[Contract]) -> anyhow::Result { + // If a subgraph has subgraph datasources, prepare them first if let Some(_subgraphs) = &self.source_subgraph { - if let Err(e) = self.deploy_multiple_sources(contracts).await { - error!(&self.name, "source subgraph deployment failed"); - return TestResult { - name: self.name.clone(), - subgraph: None, - status: TestStatus::Err(e), - }; + if let Err(e) = self.prepare_multiple_sources(contracts).await { + error!(&self.name, "source subgraph deployment failed: {:?}", e); + return Err(e); } } - status!(&self.name, "Deploying subgraph"); - let subgraph_name = match Subgraph::deploy(&self.name, contracts).await { + status!(&self.name, "Preparing subgraph"); + let (_, subgraph_name, _) = match Subgraph::prepare(&self.name, contracts).await { Ok(name) => name, Err(e) => { - error!(&self.name, "Deploy failed"); - return TestResult { - name: self.name.clone(), - subgraph: None, - status: TestStatus::Err(e.context("Deploy failed")), - }; + error!(&self.name, "Prepare failed: {:?}", e); + return Err(e); } }; - status!(&self.name, "Waiting for subgraph to become ready"); + + Ok(subgraph_name) + } + + pub async fn check_health_and_test( + self, + contracts: &[Contract], + subgraph_name: String, + ) -> TestResult { + status!( + &self.name, + "Waiting for subgraph ({}) to become ready", + subgraph_name + ); let subgraph = match Subgraph::wait_ready(&subgraph_name).await { Ok(subgraph) => subgraph, Err(e) => { @@ -216,6 +243,7 @@ impl TestCase { }; } }; + if subgraph.healthy { status!(&self.name, "Subgraph ({}) is synced", subgraph.deployment); } else { @@ -251,12 +279,50 @@ impl TestCase { } } + async fn run(self, contracts: &[Contract]) -> TestResult { + // If a subgraph has subgraph datasources, deploy them first + if let Some(_subgraphs) = &self.source_subgraph { + if let Err(e) = self.deploy_multiple_sources(contracts).await { + error!(&self.name, "source subgraph deployment failed"); + return TestResult { + name: self.name.clone(), + subgraph: None, + status: TestStatus::Err(e), + }; + } + } + + status!(&self.name, "Deploying subgraph"); + let subgraph_name = match Subgraph::deploy(&self.name, contracts).await { + Ok(name) => name, + Err(e) => { + error!(&self.name, "Deploy failed"); + return TestResult { + name: self.name.clone(), + subgraph: None, + status: TestStatus::Err(e.context("Deploy failed")), + }; + } + }; + + self.check_health_and_test(contracts, subgraph_name).await + } + + async fn prepare_multiple_sources(&self, contracts: &[Contract]) -> Result<()> { + if let Some(sources) = &self.source_subgraph { + for source in sources { + let _ = Subgraph::prepare(source.test_name(), contracts).await?; + } + } + Ok(()) + } + async fn deploy_multiple_sources(&self, contracts: &[Contract]) -> Result<()> { if let Some(sources) = &self.source_subgraph { - for source in sources.split(",") { - let subgraph = self.deploy_and_wait(source, contracts).await?; + for source in sources { + let subgraph = self.deploy_and_wait(source.test_name(), contracts).await?; status!( - source, + source.test_name(), "Source subgraph deployed with hash {}", subgraph.deployment ); @@ -339,7 +405,7 @@ async fn test_int8(ctx: TestContext) -> anyhow::Result<()> { * the `cases` variable in `integration_tests`. */ -async fn test_timestamp(ctx: TestContext) -> anyhow::Result<()> { +pub async fn test_timestamp(ctx: TestContext) -> anyhow::Result<()> { let subgraph = ctx.subgraph; assert!(subgraph.healthy); @@ -367,7 +433,7 @@ async fn test_timestamp(ctx: TestContext) -> anyhow::Result<()> { Ok(()) } -async fn test_block_handlers(ctx: TestContext) -> anyhow::Result<()> { +pub async fn test_block_handlers(ctx: TestContext) -> anyhow::Result<()> { let subgraph = ctx.subgraph; assert!(subgraph.healthy); @@ -528,7 +594,7 @@ async fn test_eth_api(ctx: TestContext) -> anyhow::Result<()> { Ok(()) } -async fn subgraph_data_sources(ctx: TestContext) -> anyhow::Result<()> { +pub async fn subgraph_data_sources(ctx: TestContext) -> anyhow::Result<()> { let subgraph = ctx.subgraph; assert!(subgraph.healthy); let expected_response = json!({ @@ -973,10 +1039,12 @@ async fn test_missing(_sg: Subgraph) -> anyhow::Result<()> { Err(anyhow!("This test is missing")) } -async fn test_multiple_subgraph_datasources(ctx: TestContext) -> anyhow::Result<()> { +pub async fn test_multiple_subgraph_datasources(ctx: TestContext) -> anyhow::Result<()> { let subgraph = ctx.subgraph; assert!(subgraph.healthy); + println!("subgraph: {:?}", subgraph); + // Test querying data aggregated from multiple sources let exp = json!({ "aggregatedDatas": [ @@ -1041,12 +1109,12 @@ async fn integration_tests() -> anyhow::Result<()> { TestCase::new("ethereum-api-tests", test_eth_api), TestCase::new("topic-filter", test_topic_filters), TestCase::new_with_grafting("grafted", test_subgraph_grafting, "base"), - TestCase::new_with_source_subgraph( + TestCase::new_with_source_subgraphs( "subgraph-data-sources", subgraph_data_sources, - "source-subgraph", + vec!["source-subgraph"], ), - TestCase::new_with_multiple_source_subgraphs( + TestCase::new_with_source_subgraphs( "multiple-subgraph-datasources", test_multiple_subgraph_datasources, vec!["source-subgraph-a", "source-subgraph-b"], @@ -1120,13 +1188,13 @@ async fn integration_tests() -> anyhow::Result<()> { } } -async fn stop_graph_node(child: &mut Child) -> anyhow::Result<()> { +pub async fn stop_graph_node(child: &mut Child) -> anyhow::Result<()> { child.kill().await.context("Failed to kill graph-node")?; Ok(()) } -async fn yarn_workspace() -> anyhow::Result<()> { +pub async fn yarn_workspace() -> anyhow::Result<()> { // We shouldn't really have to do this since we use the bundled version // of graph-cli, but that gets very unhappy if the workspace isn't // initialized