
gnd: Integration tests #6035

Open · wants to merge 5 commits into base: krishna/gnd-base
53 changes: 48 additions & 5 deletions node/src/bin/dev.rs
@@ -13,6 +13,7 @@ use graph::{
};
use graph_core::polling_monitor::ipfs_service;
use graph_node::{
dev::watcher,
dev::watcher::{parse_manifest_args, watch_subgraphs},
launcher,
opt::Opt,
@@ -90,6 +91,35 @@ pub struct DevOpt {
default_value = "https://api.thegraph.com/ipfs"
)]
pub ipfs: Vec<String>,
#[clap(
long,
default_value = "8000",
value_name = "PORT",
help = "Port for the GraphQL HTTP server",
env = "GRAPH_GRAPHQL_HTTP_PORT"
)]
pub http_port: u16,
#[clap(
long,
default_value = "8030",
value_name = "PORT",
help = "Port for the index node server"
)]
pub index_node_port: u16,
#[clap(
long,
default_value = "8020",
value_name = "PORT",
help = "Port for the JSON-RPC admin server"
)]
pub admin_port: u16,
#[clap(
long,
default_value = "8040",
value_name = "PORT",
help = "Port for the Prometheus metrics server"
)]
pub metrics_port: u16,
}

/// Builds the Graph Node options from DevOpt
@@ -109,7 +139,12 @@ fn build_args(dev_opt: &DevOpt, db_url: &str) -> Result<Opt> {
args.push("--postgres-url".to_string());
args.push(db_url.to_string());

let opt = Opt::parse_from(args);
let mut opt = Opt::parse_from(args);

opt.http_port = dev_opt.http_port;
opt.admin_port = dev_opt.admin_port;
opt.metrics_port = dev_opt.metrics_port;
opt.index_node_port = dev_opt.index_node_port;

Ok(opt)
}
@@ -118,7 +153,7 @@ async fn run_graph_node(
logger: &Logger,
opt: Opt,
link_resolver: Arc<dyn LinkResolver>,
subgraph_updates_channel: Option<mpsc::Receiver<(DeploymentHash, SubgraphName)>>,
subgraph_updates_channel: mpsc::Receiver<(DeploymentHash, SubgraphName)>,
) -> Result<()> {
let env_vars = Arc::new(EnvVars::from_env().context("Failed to load environment variables")?);

@@ -139,7 +174,7 @@
env_vars,
ipfs_service,
link_resolver,
subgraph_updates_channel,
Some(subgraph_updates_channel),
)
.await;
Ok(())
@@ -201,14 +236,22 @@ async fn main() -> Result<()> {
parse_manifest_args(dev_opt.manifests, dev_opt.sources, &logger)?;
let file_link_resolver = Arc::new(FileLinkResolver::new(None, source_subgraph_aliases.clone()));

let (tx, rx) = dev_opt.watch.then(|| mpsc::channel(1)).unzip();
let (tx, rx) = mpsc::channel(1);

let logger_clone = logger.clone();
graph::spawn(async move {
let _ = run_graph_node(&logger_clone, opt, file_link_resolver, rx).await;
});

if let Some(tx) = tx {
if let Err(e) =
watcher::deploy_all_subgraphs(&logger, &manifests_paths, &source_subgraph_aliases, &tx)
.await
{
error!(logger, "Error deploying subgraphs"; "error" => e.to_string());
std::process::exit(1);
}

if dev_opt.watch {
graph::spawn_blocking(async move {
if let Err(e) = watch_subgraphs(
&logger,
2 changes: 1 addition & 1 deletion node/src/dev/watcher.rs
@@ -255,7 +255,7 @@ fn is_relevant_event(event: &Event, watched_dirs: Vec<PathBuf>, exclusion_set: &
}

/// Redeploys all subgraphs in the order they appear in manifests_paths
async fn deploy_all_subgraphs(
pub async fn deploy_all_subgraphs(
logger: &Logger,
manifests_paths: &Vec<PathBuf>,
source_subgraph_aliases: &HashMap<String, PathBuf>,
47 changes: 45 additions & 2 deletions tests/src/config.rs
@@ -1,3 +1,4 @@
use std::sync::OnceLock;
use std::time::{Duration, Instant};
use std::{fs, path::PathBuf};

@@ -13,6 +14,15 @@ use crate::status;

lazy_static! {
pub static ref CONFIG: Config = Config::default();
static ref DEV_MODE: OnceLock<bool> = OnceLock::new();
}

pub fn set_dev_mode(val: bool) {
DEV_MODE.set(val).expect("DEV_MODE already set");
}

pub fn dev_mode() -> bool {
*DEV_MODE.get().unwrap_or(&false)
}

#[derive(Clone, Debug)]
@@ -117,6 +127,26 @@ impl GraphNodeConfig {
}
}
}

pub fn from_env() -> Self {
if dev_mode() {
Self::gnd()
} else {
Self::default()
}
}

fn gnd() -> Self {
let bin = fs::canonicalize("../target/debug/gnd")
.expect("failed to infer `graph-node` program location. (Was it built already?)");

Self {
bin,
ports: GraphNodePorts::default(),
ipfs_uri: "http://localhost:3001".to_string(),
log_file: TestFile::new("integration-tests/graph-node.log"),
}
}
}

impl Default for GraphNodeConfig {
@@ -145,6 +175,13 @@ pub struct Config {

impl Config {
pub async fn spawn_graph_node(&self) -> anyhow::Result<Child> {
self.spawn_graph_node_with_args(&[]).await
}

pub async fn spawn_graph_node_with_args(
&self,
additional_args: &[&str],
) -> anyhow::Result<Child> {
let ports = &self.graph_node.ports;

let args = [
@@ -163,6 +200,12 @@
"--metrics-port",
&ports.metrics.to_string(),
];

let args = args
.iter()
.chain(additional_args.iter())
.cloned()
.collect::<Vec<_>>();
let stdout = self.graph_node.log_file.create();
let stderr = stdout.try_clone()?;
status!(
@@ -174,7 +217,7 @@
command
.stdout(stdout)
.stderr(stderr)
.args(args)
.args(args.clone())
.env("GRAPH_STORE_WRITE_BATCH_DURATION", "5")
.env("ETHEREUM_REORG_THRESHOLD", "0");

@@ -254,7 +297,7 @@ impl Default for Config {
port: 3021,
host: "localhost".to_string(),
},
graph_node: GraphNodeConfig::default(),
graph_node: GraphNodeConfig::from_env(),
graph_cli,
num_parallel_tests,
timeout: Duration::from_secs(600),
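
Note, as an illustrative aside rather than part of the diff: the dev-mode switch above only takes effect if set_dev_mode runs before CONFIG is first dereferenced, because Config::default() resolves the graph-node binary via GraphNodeConfig::from_env() at that moment. A minimal sketch of that ordering, assuming the same re-exports the new test file uses (graph_tests::CONFIG, graph_tests::config::set_dev_mode):

use graph_tests::config::set_dev_mode;
use graph_tests::CONFIG;

#[tokio::test]
async fn dev_mode_ordering() -> anyhow::Result<()> {
    // Must run before CONFIG is first touched; otherwise Config::default()
    // has already latched onto the regular graph-node binary.
    set_dev_mode(true);
    // CONFIG is lazily initialized here, with dev_mode() == true, so
    // GraphNodeConfig::from_env() selects the gnd binary.
    let _node = CONFIG.spawn_graph_node_with_args(&[]).await?;
    Ok(())
}
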
16 changes: 13 additions & 3 deletions tests/src/subgraph.rs
@@ -26,7 +26,7 @@ pub struct Subgraph {
}

impl Subgraph {
fn dir(name: &str) -> TestFile {
pub fn dir(name: &str) -> TestFile {
TestFile::new(&format!("integration-tests/{name}"))
}

@@ -47,8 +47,11 @@ impl Subgraph {
Ok(())
}

/// Deploy the subgraph by running the required `graph` commands
pub async fn deploy(name: &str, contracts: &[Contract]) -> anyhow::Result<String> {
/// Prepare the subgraph for deployment by patching contracts and checking for subgraph datasources
pub async fn prepare(
name: &str,
contracts: &[Contract],
) -> anyhow::Result<(TestFile, String, bool)> {
let dir = Self::dir(name);
let name = format!("test/{name}");

@@ -62,6 +65,13 @@
.and_then(|ds| ds.iter().find(|d| d["kind"].as_str() == Some("subgraph")))
.is_some();

Ok((dir, name, has_subgraph_datasource))
}

/// Deploy the subgraph by running the required `graph` commands
pub async fn deploy(name: &str, contracts: &[Contract]) -> anyhow::Result<String> {
let (dir, name, has_subgraph_datasource) = Self::prepare(name, contracts).await?;

// graph codegen subgraph.yaml
let mut prog = Command::new(&CONFIG.graph_cli);
let mut cmd = prog.arg("codegen").arg("subgraph.yaml.patched");
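
With dir now public and prepare split out of deploy, a dev-mode harness can patch a manifest and inspect it without shelling out to the graph CLI, leaving the actual deployment to gnd. An illustrative sketch under that assumption (the test name is a placeholder; TestFile's path field is used the same way as in the new test file below):

use graph_tests::contract::Contract;
use graph_tests::subgraph::Subgraph;

async fn prepare_only(contracts: &[Contract]) -> anyhow::Result<()> {
    // Patches the manifest for the deployed contracts and reports whether it
    // declares a subgraph data source, without running `graph deploy`.
    let (dir, name, has_subgraph_datasource) =
        Subgraph::prepare("block-handlers", contracts).await?;
    println!(
        "prepared {} in {} (subgraph datasource: {})",
        name,
        dir.path.display(),
        has_subgraph_datasource
    );
    Ok(())
}
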
148 changes: 148 additions & 0 deletions tests/tests/gnd_tests.rs
@@ -0,0 +1,148 @@
use anyhow::anyhow;
use graph::futures03::StreamExt;
use graph_tests::config::set_dev_mode;
use graph_tests::contract::Contract;
use graph_tests::subgraph::Subgraph;
use graph_tests::{error, status, CONFIG};

mod integration_tests;

use integration_tests::{
stop_graph_node, subgraph_data_sources, test_block_handlers,
test_multiple_subgraph_datasources, yarn_workspace, TestCase, TestResult,
};

/// The main test entrypoint.
#[tokio::test]
async fn gnd_tests() -> anyhow::Result<()> {
set_dev_mode(true);

let test_name_to_run = std::env::var("TEST_CASE").ok();

let cases = vec![
TestCase::new("block-handlers", test_block_handlers),
TestCase::new_with_source_subgraphs(
"subgraph-data-sources",
subgraph_data_sources,
vec!["QmWi3H11QFE2PiWx6WcQkZYZdA5UasaBptUJqGn54MFux5:source-subgraph"],
),
TestCase::new_with_source_subgraphs(
"multiple-subgraph-datasources",
test_multiple_subgraph_datasources,
vec![
"QmYHp1bPEf7EoYBpEtJUpZv1uQHYQfWE4AhvR6frjB1Huj:source-subgraph-a",
"QmYBEzastJi7bsa722ac78tnZa6xNnV9vvweerY4kVyJtq:source-subgraph-b",
],
),
];

// Filter the test cases if a specific test name is provided
let cases_to_run: Vec<_> = if let Some(test_name) = test_name_to_run {
cases
.into_iter()
.filter(|case| case.name == test_name)
.collect()
} else {
cases
};

let contracts = Contract::deploy_all().await?;

status!("setup", "Resetting database");
CONFIG.reset_database();

status!("setup", "Initializing yarn workspace");
yarn_workspace().await?;

for i in cases_to_run.iter() {
i.prepare(&contracts).await?;
}
status!("setup", "Prepared all cases");

let manifests = cases_to_run
.iter()
.map(|case| {
Subgraph::dir(&case.name)
.path
.join("subgraph.yaml")
.to_str()
.unwrap()
.to_string()
})
.collect::<Vec<_>>()
.join(",");

let aliases = cases_to_run
.iter()
.filter_map(|case| case.source_subgraph.as_ref())
.flatten()
.filter_map(|source_subgraph| {
source_subgraph.alias().map(|alias| {
let manifest_path = Subgraph::dir(source_subgraph.test_name())
.path
.join("subgraph.yaml")
.to_str()
.unwrap()
.to_string();
format!("{}:{}", alias, manifest_path)
})
})
.collect::<Vec<_>>();

let aliases_str = aliases.join(",");
let args = if aliases.is_empty() {
vec!["--manifests", &manifests]
} else {
vec!["--manifests", &manifests, "--sources", &aliases_str]
};

// Spawn graph-node.
status!("graph-node", "Starting graph-node");

let mut graph_node_child_command = CONFIG.spawn_graph_node_with_args(&args).await?;

let num_sources = aliases.len();

let stream = tokio_stream::iter(cases_to_run)
.enumerate()
.map(|(index, case)| {
let subgraph_name = format!("subgraph-{}", num_sources + index);
case.check_health_and_test(&contracts, subgraph_name)
})
.buffered(CONFIG.num_parallel_tests);

let mut results: Vec<TestResult> = stream.collect::<Vec<_>>().await;
results.sort_by_key(|result| result.name.clone());

// Stop graph-node and read its output.
let graph_node_res = stop_graph_node(&mut graph_node_child_command).await;

status!(
"graph-node",
"graph-node logs are in {}",
CONFIG.graph_node.log_file.path.display()
);

match graph_node_res {
Ok(_) => {
status!("graph-node", "Stopped graph-node");
}
Err(e) => {
error!("graph-node", "Failed to stop graph-node: {}", e);
}
}

println!("\n\n{:=<60}", "");
println!("Test results:");
println!("{:-<60}", "");
for result in &results {
result.print();
}
println!("\n");

if results.iter().any(|result| !result.success()) {
Err(anyhow!("Some tests failed"))
} else {
Ok(())
}
}