Skip to content

Commit c4aac4e

Browse files
committed
node: Support subgraph datasource in gnd
1 parent b52a5d1 commit c4aac4e

File tree

3 files changed

+74
-15
lines changed

3 files changed

+74
-15
lines changed

node/src/bin/dev.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::HashMap, path::Path, sync::Arc};
1+
use std::{path::Path, sync::Arc};
22

33
use anyhow::{Context, Result};
44
use clap::Parser;
@@ -49,6 +49,14 @@ pub struct DevOpt {
4949
)]
5050
pub manifests: Vec<String>,
5151

52+
#[clap(
53+
long,
54+
value_name = "ALIAS:MANIFEST:[BUILD_DIR]",
55+
value_delimiter = ',',
56+
help = "The location of the source subgraph manifest files. This is used to resolve aliases in the manifest files for subgraph data sources. The format is ALIAS:MANIFEST:[BUILD_DIR], where ALIAS is the alias name, BUILD_DIR is the build directory relative to the manifest file, and MANIFEST is the manifest file location."
57+
)]
58+
pub sources: Vec<String>,
59+
5260
#[clap(
5361
long,
5462
help = "The location of the database directory.",
@@ -126,8 +134,9 @@ async fn main() -> Result<()> {
126134
let (tx, rx) = mpsc::channel(1);
127135
let opt = build_args(&dev_opt, &db.connection_uri())?;
128136

129-
let manifests_paths = parse_manifest_args(dev_opt.manifests, &logger)?;
130-
let file_link_resolver = Arc::new(FileLinkResolver::new(None, HashMap::new()));
137+
let (manifests_paths, source_subgraph_aliases) =
138+
parse_manifest_args(dev_opt.manifests, dev_opt.sources, &logger)?;
139+
let file_link_resolver = Arc::new(FileLinkResolver::new(None, source_subgraph_aliases.clone()));
131140

132141
let ctx = DevModeContext {
133142
watch: dev_opt.watch,
@@ -142,8 +151,14 @@ async fn main() -> Result<()> {
142151

143152
if dev_opt.watch {
144153
graph::spawn_blocking(async move {
145-
let result =
146-
watch_subgraphs(&logger, manifests_paths, vec!["pgtemp-*".to_string()], tx).await;
154+
let result = watch_subgraphs(
155+
&logger,
156+
manifests_paths,
157+
source_subgraph_aliases,
158+
vec!["pgtemp-*".to_string()],
159+
tx,
160+
)
161+
.await;
147162
if let Err(e) = result {
148163
error!(logger, "Error watching subgraphs"; "error" => e.to_string());
149164
std::process::exit(1);

node/src/dev/helpers.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ pub async fn watch_subgraph_updates(
126126
std::process::exit(1);
127127
}
128128

129+
/// Parse an alias string into a tuple of (alias_name, manifest, Option<build_dir>)
129130
pub fn parse_alias(alias: &str) -> anyhow::Result<(String, String, Option<String>)> {
130131
let mut split = alias.split(':');
131132
let alias_name = split.next();
@@ -139,12 +140,13 @@ pub fn parse_alias(alias: &str) -> anyhow::Result<(String, String, Option<String
139140
}
140141

141142
let alias_name = alias_name.unwrap().to_owned();
142-
let (build_dir, manifest) = parse_manifest_arg(alias_value.unwrap())
143+
let (manifest, build_dir) = parse_manifest_arg(alias_value.unwrap())
143144
.with_context(|| format!("While parsing alias '{}'", alias))?;
144145

145-
Ok((alias_name, build_dir, manifest))
146+
Ok((alias_name, manifest, build_dir))
146147
}
147148

149+
/// Parse a manifest string into a tuple of (manifest, Option<build_dir>)
148150
pub fn parse_manifest_arg(value: &str) -> anyhow::Result<(String, Option<String>)> {
149151
match value.split_once(':') {
150152
Some((manifest, build_dir)) if !manifest.is_empty() => {

node/src/dev/watcher.rs

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,33 @@ use graph::prelude::{DeploymentHash, SubgraphName};
44
use graph::slog::{self, error, info, Logger};
55
use graph::tokio::sync::mpsc::Sender;
66
use notify::{recommended_watcher, Event, RecursiveMode, Watcher};
7+
use std::collections::HashMap;
78
use std::path::{Path, PathBuf};
89
use std::sync::mpsc;
910
use std::time::Duration;
1011

11-
use super::helpers::parse_manifest_arg;
12+
use super::helpers::{parse_alias, parse_manifest_arg};
1213

1314
const WATCH_DELAY: Duration = Duration::from_secs(5);
1415
const DEFAULT_BUILD_DIR: &str = "build";
1516

1617
// Parses manifest arguments and returns a vector of paths to the manifest files
17-
pub fn parse_manifest_args(manifests: Vec<String>, logger: &Logger) -> Result<Vec<PathBuf>> {
18+
pub fn parse_manifest_args(
19+
manifests: Vec<String>,
20+
subgraph_sources: Vec<String>,
21+
logger: &Logger,
22+
) -> Result<(Vec<PathBuf>, HashMap<String, PathBuf>)> {
1823
let mut manifests_paths = Vec::new();
24+
let mut source_subgraph_aliases = HashMap::new();
25+
26+
for subgraph_source in subgraph_sources {
27+
let (alias_name, manifest_path_str, build_dir_opt) = parse_alias(&subgraph_source)?;
28+
let manifest_path =
29+
process_manifest(build_dir_opt, &manifest_path_str, Some(&alias_name), logger)?;
30+
31+
manifests_paths.push(manifest_path.clone());
32+
source_subgraph_aliases.insert(alias_name, manifest_path);
33+
}
1934

2035
for manifest_str in manifests {
2136
let (manifest_path_str, build_dir_opt) = parse_manifest_arg(&manifest_str)
@@ -27,7 +42,7 @@ pub fn parse_manifest_args(manifests: Vec<String>, logger: &Logger) -> Result<Ve
2742
manifests_paths.push(built_manifest_path);
2843
}
2944

30-
Ok(manifests_paths)
45+
Ok((manifests_paths, source_subgraph_aliases))
3146
}
3247

3348
/// Helper function to process a manifest
@@ -103,12 +118,20 @@ fn process_manifest(
103118
pub async fn watch_subgraphs(
104119
logger: &Logger,
105120
manifests_paths: Vec<PathBuf>,
121+
source_subgraph_aliases: HashMap<String, PathBuf>,
106122
exclusions: Vec<String>,
107123
sender: Sender<(DeploymentHash, SubgraphName)>,
108124
) -> Result<()> {
109125
let logger = logger.new(slog::o!("component" => "Watcher"));
110126

111-
watch_subgraph_dirs(&logger, manifests_paths, exclusions, sender).await?;
127+
watch_subgraph_dirs(
128+
&logger,
129+
manifests_paths,
130+
source_subgraph_aliases,
131+
exclusions,
132+
sender,
133+
)
134+
.await?;
112135
Ok(())
113136
}
114137

@@ -117,6 +140,7 @@ pub async fn watch_subgraphs(
117140
pub async fn watch_subgraph_dirs(
118141
logger: &Logger,
119142
manifests_paths: Vec<PathBuf>,
143+
source_subgraph_aliases: HashMap<String, PathBuf>,
120144
exclusions: Vec<String>,
121145
sender: Sender<(DeploymentHash, SubgraphName)>,
122146
) -> Result<()> {
@@ -159,7 +183,15 @@ pub async fn watch_subgraph_dirs(
159183
}
160184

161185
// Process file change events
162-
process_file_events(logger, rx, &exclusion_set, &manifests_paths, sender).await
186+
process_file_events(
187+
logger,
188+
rx,
189+
&exclusion_set,
190+
&manifests_paths,
191+
&source_subgraph_aliases,
192+
sender,
193+
)
194+
.await
163195
}
164196

165197
/// Processes file change events and triggers redeployments
@@ -168,6 +200,7 @@ async fn process_file_events(
168200
rx: mpsc::Receiver<Result<Event, notify::Error>>,
169201
exclusion_set: &GlobSet,
170202
manifests_paths: &Vec<PathBuf>,
203+
source_subgraph_aliases: &HashMap<String, PathBuf>,
171204
sender: Sender<(DeploymentHash, SubgraphName)>,
172205
) -> Result<()> {
173206
loop {
@@ -203,7 +236,7 @@ async fn process_file_events(
203236
}
204237

205238
// Redeploy all subgraphs
206-
redeploy_all_subgraphs(logger, manifests_paths, &sender).await?;
239+
redeploy_all_subgraphs(logger, manifests_paths, source_subgraph_aliases, &sender).await?;
207240
}
208241
}
209242

@@ -223,15 +256,24 @@ fn is_relevant_event(event: &Event, watched_dirs: Vec<PathBuf>, exclusion_set: &
223256
async fn redeploy_all_subgraphs(
224257
logger: &Logger,
225258
manifests_paths: &Vec<PathBuf>,
259+
source_subgraph_aliases: &HashMap<String, PathBuf>,
226260
sender: &Sender<(DeploymentHash, SubgraphName)>,
227261
) -> Result<()> {
228262
info!(logger, "File change detected, redeploying all subgraphs");
229263
let mut count = 0;
230264
for manifest_path in manifests_paths {
265+
let alias_name = source_subgraph_aliases
266+
.iter()
267+
.find(|(_, path)| path == &manifest_path)
268+
.map(|(name, _)| name);
269+
270+
let id = alias_name
271+
.map(|s| s.to_owned())
272+
.unwrap_or_else(|| manifest_path.display().to_string());
273+
231274
let _ = sender
232275
.send((
233-
DeploymentHash::new(manifest_path.display().to_string())
234-
.map_err(|_| anyhow!("Failed to create deployment hash"))?,
276+
DeploymentHash::new(id).map_err(|_| anyhow!("Failed to create deployment hash"))?,
235277
SubgraphName::new(format!("subgraph-{}", count))
236278
.map_err(|_| anyhow!("Failed to create subgraph name"))?,
237279
))

0 commit comments

Comments
 (0)