Skip to content

Commit 50b561e

Browse files
committed
node: Wire file watching in dev mode to redeploy subgraphs
1 parent fb84ce3 commit 50b561e

File tree

3 files changed

+67
-16
lines changed

3 files changed

+67
-16
lines changed

node/src/bin/dev.rs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,17 @@ use std::{path::Path, sync::Arc};
33
use anyhow::{Context, Result};
44
use clap::Parser;
55
use git_testament::{git_testament, render_testament};
6-
use graph::{components::link_resolver::FileLinkResolver, env::EnvVars, tokio};
7-
use graph_node::{launcher, opt::Opt};
6+
use graph::{
7+
components::link_resolver::FileLinkResolver,
8+
env::EnvVars,
9+
log::logger,
10+
tokio::{self, sync::mpsc},
11+
};
12+
use graph_node::{
13+
dev::{helpers::DevModeContext, watcher::watch_subgraph_dir},
14+
launcher,
15+
opt::Opt,
16+
};
817
use lazy_static::lazy_static;
918
use pgtemp::PgTempDBBuilder;
1019

@@ -101,15 +110,16 @@ fn get_build_dir(manifest_path_str: &str) -> Result<std::path::PathBuf> {
101110
.context("Failed to canonicalize build directory path")
102111
}
103112

104-
async fn run_graph_node(opt: Opt, file_link_resolver: Arc<FileLinkResolver>) -> Result<()> {
113+
async fn run_graph_node(opt: Opt, ctx: Option<DevModeContext>) -> Result<()> {
105114
let env_vars = Arc::new(EnvVars::from_env().context("Failed to load environment variables")?);
106115

107-
launcher::run(opt, env_vars, Some(file_link_resolver)).await;
116+
launcher::run(opt, env_vars, ctx).await;
108117
Ok(())
109118
}
110119

111120
#[tokio::main]
112121
async fn main() -> Result<()> {
122+
env_logger::init();
113123
let dev_opt = DevOpt::parse();
114124

115125
let build_dir = get_build_dir(&dev_opt.manifest)?;
@@ -121,11 +131,39 @@ async fn main() -> Result<()> {
121131
.start_async()
122132
.await;
123133

134+
let (tx, rx) = mpsc::channel(1);
124135
let opt = build_args(&dev_opt, &db.connection_uri(), &dev_opt.manifest)?;
125136
let file_link_resolver = Arc::new(FileLinkResolver::with_base_dir(&build_dir));
126137

138+
let ctx = DevModeContext {
139+
watch: dev_opt.watch,
140+
file_link_resolver,
141+
updates_rx: rx,
142+
};
143+
144+
let subgraph = opt.subgraph.clone().unwrap();
145+
146+
// Set up logger
147+
let logger = logger(opt.debug);
148+
127149
// Run graph node
128-
run_graph_node(opt, file_link_resolver).await?;
150+
graph::spawn(async move {
151+
let _ = run_graph_node(opt, Some(ctx)).await;
152+
});
153+
154+
if dev_opt.watch {
155+
graph::spawn_blocking(async move {
156+
watch_subgraph_dir(
157+
&logger,
158+
build_dir,
159+
subgraph,
160+
vec!["pgtemp-*".to_string()],
161+
tx,
162+
)
163+
.await;
164+
});
165+
}
129166

167+
graph::futures03::future::pending::<()>().await;
130168
Ok(())
131169
}

node/src/launcher.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ use graph::futures03::compat::Future01CompatExt;
66
use graph::futures03::future::TryFutureExt;
77

88
use crate::config::Config;
9+
use crate::dev::helpers::{watch_subgraph_updates, DevModeContext};
910
use crate::network_setup::Networks;
1011
use crate::opt::Opt;
1112
use crate::store_builder::StoreBuilder;
1213
use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap};
13-
use graph::components::link_resolver::{ArweaveClient, FileLinkResolver, FileSizeLimit};
14+
use graph::components::link_resolver::{ArweaveClient, FileSizeLimit};
1415
use graph::components::subgraph::Settings;
1516
use graph::data::graphql::load_manager::LoadManager;
1617
use graph::endpoint::EndpointMetrics;
@@ -347,12 +348,7 @@ fn build_graphql_server(
347348
graphql_server
348349
}
349350

350-
pub async fn run(
351-
opt: Opt,
352-
env_vars: Arc<EnvVars>,
353-
file_link_resolver: Option<Arc<FileLinkResolver>>,
354-
) {
355-
env_logger::init();
351+
pub async fn run(opt: Opt, env_vars: Arc<EnvVars>, dev_ctx: Option<DevModeContext>) {
356352
// Set up logger
357353
let logger = logger(opt.debug);
358354

@@ -440,9 +436,8 @@ pub async fn run(
440436

441437
// Convert the clients into a link resolver. Since we want to get past
442438
// possible temporary DNS failures, make the resolver retry
443-
let link_resolver: Arc<dyn LinkResolver> = if let Some(file_link_resolver) = file_link_resolver
444-
{
445-
file_link_resolver
439+
let link_resolver: Arc<dyn LinkResolver> = if let Some(dev_ctx) = &dev_ctx {
440+
dev_ctx.file_link_resolver.clone()
446441
} else {
447442
Arc::new(IpfsResolver::new(ipfs_client, env_vars.cheap_clone()))
448443
};
@@ -563,7 +558,7 @@ pub async fn run(
563558

564559
// Add the CLI subgraph with a REST request to the admin server.
565560
if let Some(subgraph) = subgraph {
566-
deploy_subgraph_from_flag(subgraph, &opt, subgraph_registrar.clone(), node_id);
561+
deploy_subgraph_from_flag(subgraph, &opt, subgraph_registrar.clone(), node_id.clone());
567562
}
568563

569564
// Serve GraphQL queries over HTTP
@@ -578,6 +573,23 @@ pub async fn run(
578573
.await
579574
.expect("Failed to start metrics server")
580575
});
576+
577+
// If we are in dev mode, watch for subgraph updates
578+
// And drop and recreate the subgraph when it changes
579+
if let Some(dev_ctx) = dev_ctx {
580+
if dev_ctx.watch {
581+
graph::spawn(async move {
582+
watch_subgraph_updates(
583+
&logger,
584+
network_store.subgraph_store(),
585+
subgraph_registrar.clone(),
586+
node_id.clone(),
587+
dev_ctx.updates_rx,
588+
)
589+
.await;
590+
});
591+
}
592+
}
581593
};
582594

583595
graph::spawn(launch_services(logger.clone(), env_vars.cheap_clone()));

node/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ fn main() {
2323
}
2424

2525
async fn main_inner() {
26+
env_logger::init();
2627
let env_vars = Arc::new(EnvVars::from_env().unwrap());
2728
let opt = opt::Opt::parse();
2829

0 commit comments

Comments
 (0)