From 28124cc2bdd5fd14d319789caf82ddaccefea154 Mon Sep 17 00:00:00 2001
From: Tobias Koppers
Date: Thu, 8 May 2025 16:19:29 +0200
Subject: [PATCH 1/3] add feature flag to verify aggregation graph

---
 .../crates/turbo-tasks-backend/Cargo.toml     |   1 +
 .../turbo-tasks-backend/src/backend/mod.rs    | 162 +++++++++++++++++-
 2 files changed, 160 insertions(+), 3 deletions(-)

diff --git a/turbopack/crates/turbo-tasks-backend/Cargo.toml b/turbopack/crates/turbo-tasks-backend/Cargo.toml
index 4755e1de6cef5..49db8a52e791b 100644
--- a/turbopack/crates/turbo-tasks-backend/Cargo.toml
+++ b/turbopack/crates/turbo-tasks-backend/Cargo.toml
@@ -15,6 +15,7 @@ workspace = true
 [features]
 default = []
 verify_serialization = []
+verify_aggregation_graph = []
 trace_aggregation_update = []
 trace_find_and_schedule = []
 trace_task_completion = []
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
index 33a2e75d4ac8d..1184c291882dd 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -189,10 +189,15 @@ struct TurboTasksBackendInner<B: BackingStorage> {
     stopping_event: Event,
     idle_start_event: Event,
     idle_end_event: Event,
+    #[cfg(feature = "verify_aggregation_graph")]
+    is_idle: AtomicBool,
 
     task_statistics: TaskStatisticsApi,
 
     backing_storage: B,
+
+    #[cfg(feature = "verify_aggregation_graph")]
+    root_tasks: Mutex<FxHashSet<TaskId>>,
 }
 
 impl<B: BackingStorage> TurboTasksBackend<B> {
@@ -237,8 +242,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
             stopping_event: Event::new(|| "TurboTasksBackend::stopping_event".to_string()),
             idle_start_event: Event::new(|| "TurboTasksBackend::idle_start_event".to_string()),
             idle_end_event: Event::new(|| "TurboTasksBackend::idle_end_event".to_string()),
+            #[cfg(feature = "verify_aggregation_graph")]
+            is_idle: AtomicBool::new(false),
             task_statistics: TaskStatisticsApi::default(),
             backing_storage,
+            #[cfg(feature = "verify_aggregation_graph")]
+            root_tasks: Default::default(),
         }
     }
 
@@ -986,11 +995,37 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         }
     }
 
-    fn idle_start(&self) {
+    #[allow(unused_variables)]
+    fn idle_start(self: &Arc<Self>, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
         self.idle_start_event.notify(usize::MAX);
+
+        #[cfg(feature = "verify_aggregation_graph")]
+        {
+            use tokio::select;
+
+            self.is_idle.store(true, Ordering::Release);
+            let this = self.clone();
+            let turbo_tasks = turbo_tasks.pin();
+            tokio::task::spawn(async move {
+                select! {
+                    _ = tokio::time::sleep(Duration::from_secs(5)) => {
+                        // do nothing
+                    }
+                    _ = this.idle_end_event.listen() => {
+                        return;
+                    }
+                }
+                if !this.is_idle.load(Ordering::Relaxed) {
+                    return;
+                }
+                this.verify_aggregation_graph(&*turbo_tasks);
+            });
+        }
     }
 
     fn idle_end(&self) {
+        #[cfg(feature = "verify_aggregation_graph")]
+        self.is_idle.store(false, Ordering::Release);
         self.idle_end_event.notify(usize::MAX);
     }
 
@@ -2148,6 +2183,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
                 RootType::OnceTask => "Once Task".to_string(),
             }));
         }
+        #[cfg(feature = "verify_aggregation_graph")]
+        self.root_tasks.lock().insert(task_id);
         task_id
     }
 
@@ -2156,6 +2193,9 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         task_id: TaskId,
         turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
     ) {
+        #[cfg(feature = "verify_aggregation_graph")]
+        self.root_tasks.lock().remove(&task_id);
+
         let mut ctx = self.execute_context(turbo_tasks);
         let mut task = ctx.task(task_id, TaskDataCategory::All);
         let is_dirty = get!(task, Dirty).map_or(false, |dirty| dirty.get(self.session_id));
@@ -2176,6 +2216,122 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         }
     }
 
+    #[cfg(feature = "verify_aggregation_graph")]
+    fn verify_aggregation_graph(
+        &self,
+        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
+    ) {
+        use std::collections::VecDeque;
+
+        use crate::backend::operation::{get_uppers, is_aggregating_node};
+
+        let mut ctx = self.execute_context(turbo_tasks);
+        let root_tasks = self.root_tasks.lock().clone();
+
+        for task_id in root_tasks {
+            let mut queue = VecDeque::new();
+            let mut visited = FxHashSet::default();
+            let mut aggregated_nodes = FxHashSet::default();
+            let mut collectibles = FxHashMap::default();
+            let root_task_id = task_id;
+            visited.insert(task_id);
+            aggregated_nodes.insert(task_id);
+            queue.push_back(task_id);
+            while let Some(task_id) = queue.pop_front() {
+                let task = ctx.task(task_id, TaskDataCategory::All);
+                if !self.is_idle.load(Ordering::Relaxed) {
+                    return;
+                }
+
+                let uppers = get_uppers(&task);
+                if task_id != root_task_id
+                    && !uppers.iter().any(|upper| aggregated_nodes.contains(upper))
+                {
+                    println!(
+                        "Task {} {} doesn't report to any root but is reachable from one (uppers: \
+                         {:?})",
+                        task_id,
+                        ctx.get_task_description(task_id),
+                        uppers
+                    );
+                }
+
+                let aggregated_collectibles: Vec<_> = get_many!(task, AggregatedCollectible { collectible } value if *value > 0 => {collectible});
+                for collectible in aggregated_collectibles {
+                    collectibles
+                        .entry(collectible)
+                        .or_insert_with(|| (false, Vec::new()))
+                        .1
+                        .push(task_id);
+                }
+
+                let own_collectibles: Vec<_> = get_many!(task, Collectible { collectible } value if *value > 0 => {collectible});
+                for collectible in own_collectibles {
+                    if let Some((flag, _)) = collectibles.get_mut(&collectible) {
+                        *flag = true
+                    } else {
+                        println!(
+                            "Task {} has a collectible {:?} that is not in any upper task",
+                            task_id, collectible
+                        );
+                    }
+                }
+
+                let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
+                let has_dirty_container = get!(task, AggregatedDirtyContainerCount)
+                    .is_some_and(|count| count.get(self.session_id) > 0);
+                let should_be_in_upper = is_dirty || has_dirty_container;
+
+                let aggregation_number = get_aggregation_number(&task);
+                if is_aggregating_node(aggregation_number) {
+                    aggregated_nodes.insert(task_id);
+                }
+                // println!(
+                //     "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}",
+                //     ctx.get_task_description(task_id),
+                //     uppers
+                // );
+
+                for child_id in iter_many!(task, Child { task } => task) {
+                    // println!("{task_id}: child -> {child_id}");
+                    if visited.insert(child_id) {
+                        queue.push_back(child_id);
+                    }
+                }
+                drop(task);
+
+                if should_be_in_upper {
+                    for upper_id in uppers {
+                        let task = ctx.task(upper_id, TaskDataCategory::All);
+                        let in_upper = get!(task, AggregatedDirtyContainer { task: task_id })
+                            .is_some_and(|dirty| dirty.get(self.session_id) > 0);
+                        if !in_upper {
+                            println!(
+                                "Task {} is dirty, but is not listed in the upper task {}",
+                                task_id, upper_id
+                            );
+                        }
+                    }
+                }
+            }
+
+            for (collectible, (flag, task_ids)) in collectibles {
+                if !flag {
+                    println!(
+                        "{:?} that is not emitted in any child task but in these aggregated \
+                         tasks: {:#?}",
+                        collectible,
+                        task_ids
+                            .iter()
+                            .map(|t| format!("{t} {}", ctx.get_task_description(*t)))
+                            .collect::<Vec<_>>()
+                    );
+                }
+            }
+            println!("visited {task_id} {} tasks", visited.len());
+        }
+    }
+
     fn assert_not_persistent_calling_transient(
         &self,
         parent_id: TaskId,
@@ -2258,8 +2414,8 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
         self.0.stop();
     }
 
-    fn idle_start(&self, _turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
-        self.0.idle_start();
+    fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
+        self.0.idle_start(turbo_tasks);
     }
 
     fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
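The idle trigger above debounces for five seconds and aborts if idleness ends in the meantime. Below is a minimal, self-contained sketch of that pattern, using tokio's Notify in place of the backend's Event type; all names here are illustrative stand-ins, not the backend's real API:

    use std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        time::Duration,
    };

    use tokio::sync::Notify;

    struct IdleState {
        is_idle: AtomicBool,
        idle_end: Notify,
    }

    fn idle_start(state: &Arc<IdleState>) {
        state.is_idle.store(true, Ordering::Release);
        let state = state.clone();
        tokio::spawn(async move {
            tokio::select! {
                // Debounce: only verify once the system has been idle for 5s.
                _ = tokio::time::sleep(Duration::from_secs(5)) => {}
                // Idle ended first: skip this round entirely.
                _ = state.idle_end.notified() => return,
            }
            // Re-check the flag, since it can flip between wakeup and here.
            if state.is_idle.load(Ordering::Relaxed) {
                println!("would verify the aggregation graph now");
            }
        });
    }

    fn idle_end(state: &IdleState) {
        state.is_idle.store(false, Ordering::Release);
        state.idle_end.notify_waiters();
    }

    #[tokio::main]
    async fn main() {
        let state = Arc::new(IdleState {
            is_idle: AtomicBool::new(false),
            idle_end: Notify::new(),
        });
        idle_start(&state);
        tokio::time::sleep(Duration::from_secs(6)).await;
        idle_end(&state);
    }

The double check (select! racing against idle_end_event, then a load of the flag) matters because the event can fire after the sleep has already completed; the flag close the remaining window.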
From 6ac4f0014ac1b7f7addc42ffe0b7cddd36025c3a Mon Sep 17 00:00:00 2001
From: Tobias Koppers
Date: Wed, 14 May 2025 15:02:28 +0200
Subject: [PATCH 2/3] verify graph on stop

---
 turbopack/crates/turbo-tasks-backend/src/backend/mod.rs | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
index 1184c291882dd..8b058f8613c0f 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -989,7 +989,9 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         self.stopping_event.notify(usize::MAX);
     }
 
-    fn stop(&self) {
+    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
+        #[cfg(feature = "verify_aggregation_graph")]
+        self.verify_aggregation_graph(turbo_tasks);
         if let Err(err) = self.backing_storage.shutdown() {
             println!("Shutting down failed: {err}");
         }
@@ -2410,8 +2412,8 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
         self.0.stopping();
     }
 
-    fn stop(&self, _turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
-        self.0.stop();
+    fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
+        self.0.stop(turbo_tasks);
     }
 
     fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>) {
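At its core, verify_aggregation_graph (introduced in patch 1) does a breadth-first walk from each root and checks that every reachable task reports upward, through its uppers, to some aggregating node already seen from that root; a task that fails this check could never propagate invalidations back to the root. A self-contained sketch of that invariant check follows, with illustrative stand-in types rather than the backend's real storage API:

    use std::collections::{HashMap, HashSet, VecDeque};

    // Illustrative stand-in for a task: `uppers` point toward aggregating
    // ancestors, `children` are owned subtasks.
    struct Node {
        uppers: Vec<u32>,
        children: Vec<u32>,
        is_aggregating: bool,
    }

    // Walk the graph from `root` and flag nodes that are reachable from the
    // root but do not report to any aggregated node seen so far. This mirrors
    // the "doesn't report to any root" check in the patch.
    fn verify(root: u32, graph: &HashMap<u32, Node>) {
        let mut visited = HashSet::from([root]);
        let mut aggregated = HashSet::from([root]);
        let mut queue = VecDeque::from([root]);
        while let Some(id) = queue.pop_front() {
            let node = &graph[&id];
            if id != root && !node.uppers.iter().any(|u| aggregated.contains(u)) {
                println!("node {id} doesn't report upward to any aggregated node");
            }
            if node.is_aggregating {
                aggregated.insert(id);
            }
            for &child in &node.children {
                // `insert` is true only on first sight: each node is queued once.
                if visited.insert(child) {
                    queue.push_back(child);
                }
            }
        }
        println!("visited {} nodes", visited.len());
    }

    fn main() {
        let graph = HashMap::from([
            (0, Node { uppers: vec![], children: vec![1, 2], is_aggregating: true }),
            (1, Node { uppers: vec![0], children: vec![], is_aggregating: false }),
            // Node 2 is missing its upper edge: the check above reports it.
            (2, Node { uppers: vec![], children: vec![], is_aggregating: false }),
        ]);
        verify(0, &graph);
    }

The real implementation layers two more checks on the same walk: every aggregated collectible must be emitted by some task in the subtree, and every dirty task must be listed as an AggregatedDirtyContainer in its uppers.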
From 22996be03611df351b6c8b5d7b9cadf08294f35f Mon Sep 17 00:00:00 2001
From: Tobias Koppers
Date: Thu, 15 May 2025 15:53:00 +0200
Subject: [PATCH 3/3] verify progress

---
 .../turbo-tasks-backend/src/backend/mod.rs    | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
index 8b058f8613c0f..2a7536330d769 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -2223,14 +2223,19 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         &self,
         turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
     ) {
-        use std::collections::VecDeque;
+        if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") {
+            return;
+        }
+        use std::{collections::VecDeque, env};
 
         use crate::backend::operation::{get_uppers, is_aggregating_node};
 
         let mut ctx = self.execute_context(turbo_tasks);
         let root_tasks = self.root_tasks.lock().clone();
+        let len = root_tasks.len();
 
-        for task_id in root_tasks {
+        for (i, task_id) in root_tasks.into_iter().enumerate() {
+            println!("Verifying graph from root {task_id} {i}/{len}...");
             let mut queue = VecDeque::new();
             let mut visited = FxHashSet::default();
             let mut aggregated_nodes = FxHashSet::default();
@@ -2237,9 +2242,19 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
             let mut collectibles = FxHashMap::default();
             let root_task_id = task_id;
             visited.insert(task_id);
             aggregated_nodes.insert(task_id);
             queue.push_back(task_id);
+            let mut counter = 0;
             while let Some(task_id) = queue.pop_front() {
+                counter += 1;
+                if counter % 100000 == 0 {
+                    println!(
+                        "queue={}, visited={}, aggregated_nodes={}",
+                        queue.len(),
+                        visited.len(),
+                        aggregated_nodes.len()
+                    );
+                }
                 let task = ctx.task(task_id, TaskDataCategory::All);
                 if !self.is_idle.load(Ordering::Relaxed) {
                     return;
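Taken together: the verifier is compiled in only with the verify_aggregation_graph cargo feature (for example via cargo build -p turbo-tasks-backend --features verify_aggregation_graph, adjusted to the workspace setup), runs after the backend has been idle for five seconds as well as on stop(), and can be skipped at runtime by setting TURBO_ENGINE_VERIFY_GRAPH=0 in the environment. The progress output added in patch 3 is there because a full traversal of a large graph can take a while, and a periodic queue/visited count distinguishes a slow walk from a hung one.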