diff --git a/turbopack/crates/turbo-tasks-backend/Cargo.toml b/turbopack/crates/turbo-tasks-backend/Cargo.toml index 4755e1de6cef5..49db8a52e791b 100644 --- a/turbopack/crates/turbo-tasks-backend/Cargo.toml +++ b/turbopack/crates/turbo-tasks-backend/Cargo.toml @@ -15,6 +15,7 @@ workspace = true [features] default = [] verify_serialization = [] +verify_aggregation_graph = [] trace_aggregation_update = [] trace_find_and_schedule = [] trace_task_completion = [] diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 33a2e75d4ac8d..fe70f468b5b5d 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -189,10 +189,15 @@ struct TurboTasksBackendInner { stopping_event: Event, idle_start_event: Event, idle_end_event: Event, + #[cfg(feature = "verify_aggregation_graph")] + is_idle: AtomicBool, task_statistics: TaskStatisticsApi, backing_storage: B, + + #[cfg(feature = "verify_aggregation_graph")] + root_tasks: Mutex>, } impl TurboTasksBackend { @@ -237,8 +242,12 @@ impl TurboTasksBackendInner { stopping_event: Event::new(|| "TurboTasksBackend::stopping_event".to_string()), idle_start_event: Event::new(|| "TurboTasksBackend::idle_start_event".to_string()), idle_end_event: Event::new(|| "TurboTasksBackend::idle_end_event".to_string()), + #[cfg(feature = "verify_aggregation_graph")] + is_idle: AtomicBool::new(false), task_statistics: TaskStatisticsApi::default(), backing_storage, + #[cfg(feature = "verify_aggregation_graph")] + root_tasks: Default::default(), } } @@ -980,17 +989,49 @@ impl TurboTasksBackendInner { self.stopping_event.notify(usize::MAX); } - fn stop(&self) { + #[allow(unused_variables)] + fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi>) { + #[cfg(feature = "verify_aggregation_graph")] + { + self.is_idle.store(false, Ordering::Release); + self.verify_aggregation_graph(turbo_tasks, false); + } if let Err(err) = self.backing_storage.shutdown() { println!("Shutting down failed: {err}"); } } - fn idle_start(&self) { + #[allow(unused_variables)] + fn idle_start(self: &Arc, turbo_tasks: &dyn TurboTasksBackendApi>) { self.idle_start_event.notify(usize::MAX); + + #[cfg(feature = "verify_aggregation_graph")] + { + use tokio::select; + + self.is_idle.store(true, Ordering::Release); + let this = self.clone(); + let turbo_tasks = turbo_tasks.pin(); + tokio::task::spawn(async move { + select! { + _ = tokio::time::sleep(Duration::from_secs(5)) => { + // do nothing + } + _ = this.idle_end_event.listen() => { + return; + } + } + if !this.is_idle.load(Ordering::Relaxed) { + return; + } + this.verify_aggregation_graph(&*turbo_tasks, true); + }); + } } fn idle_end(&self) { + #[cfg(feature = "verify_aggregation_graph")] + self.is_idle.store(false, Ordering::Release); self.idle_end_event.notify(usize::MAX); } @@ -2148,6 +2189,8 @@ impl TurboTasksBackendInner { RootType::OnceTask => "Once Task".to_string(), })); } + #[cfg(feature = "verify_aggregation_graph")] + self.root_tasks.lock().insert(task_id); task_id } @@ -2156,6 +2199,9 @@ impl TurboTasksBackendInner { task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi>, ) { + #[cfg(feature = "verify_aggregation_graph")] + self.root_tasks.lock().remove(&task_id); + let mut ctx = self.execute_context(turbo_tasks); let mut task = ctx.task(task_id, TaskDataCategory::All); let is_dirty = get!(task, Dirty).map_or(false, |dirty| dirty.get(self.session_id)); @@ -2176,6 +2222,179 @@ impl TurboTasksBackendInner { } } + #[cfg(feature = "verify_aggregation_graph")] + fn verify_aggregation_graph( + &self, + turbo_tasks: &dyn TurboTasksBackendApi>, + idle: bool, + ) { + if env::var("TURBO_ENGINE_VERIFY_GRAPH").ok().as_deref() == Some("0") { + return; + } + use std::{collections::VecDeque, env, io::stdout}; + + use crate::backend::operation::{get_uppers, is_aggregating_node}; + + let mut ctx = self.execute_context(turbo_tasks); + let root_tasks = self.root_tasks.lock().clone(); + let len = root_tasks.len(); + + for (i, task_id) in root_tasks.into_iter().enumerate() { + println!("Verifying graph from root {task_id} {i}/{len}..."); + let mut queue = VecDeque::new(); + let mut visited = FxHashSet::default(); + let mut aggregated_nodes = FxHashSet::default(); + let mut collectibles = FxHashMap::default(); + let root_task_id = task_id; + visited.insert(task_id); + aggregated_nodes.insert(task_id); + queue.push_back(task_id); + let mut counter = 0; + while let Some(task_id) = queue.pop_front() { + counter += 1; + if counter % 100000 == 0 { + println!( + "queue={}, visited={}, aggregated_nodes={}", + queue.len(), + visited.len(), + aggregated_nodes.len() + ); + } + let task = ctx.task(task_id, TaskDataCategory::All); + if idle && !self.is_idle.load(Ordering::Relaxed) { + return; + } + + let uppers = get_uppers(&task); + if task_id != root_task_id + && !uppers.iter().any(|upper| aggregated_nodes.contains(upper)) + { + println!( + "Task {} {} doesn't report to any root but is reachable from one (uppers: \ + {:?})", + task_id, + ctx.get_task_description(task_id), + uppers + ); + } + + let aggregated_collectibles: Vec<_> = get_many!(task, AggregatedCollectible { collectible } value if *value > 0 => {collectible}); + for collectible in aggregated_collectibles { + collectibles + .entry(collectible) + .or_insert_with(|| (false, Vec::new())) + .1 + .push(task_id); + } + + let own_collectibles: Vec<_> = get_many!(task, Collectible { collectible } value if *value > 0 => {collectible}); + for collectible in own_collectibles { + if let Some((flag, _)) = collectibles.get_mut(&collectible) { + *flag = true + } else { + println!( + "Task {} has a collectible {:?} that is not in any upper task", + task_id, collectible + ); + } + } + + let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id)); + let has_dirty_container = get!(task, AggregatedDirtyContainerCount) + .is_some_and(|count| count.get(self.session_id) > 0); + let should_be_in_upper = is_dirty || has_dirty_container; + + let aggregation_number = get_aggregation_number(&task); + if is_aggregating_node(aggregation_number) { + aggregated_nodes.insert(task_id); + } + // println!( + // "{task_id}: {} agg_num = {aggregation_number}, uppers = {:#?}", + // ctx.get_task_description(task_id), + // uppers + // ); + + for child_id in iter_many!(task, Child { task } => task) { + // println!("{task_id}: child -> {child_id}"); + if visited.insert(child_id) { + queue.push_back(child_id); + } + } + drop(task); + + if should_be_in_upper { + for upper_id in uppers { + let task = ctx.task(task_id, TaskDataCategory::All); + let in_upper = get!(task, AggregatedDirtyContainer { task: task_id }) + .is_some_and(|dirty| dirty.get(self.session_id) > 0); + if !in_upper { + println!( + "Task {} is dirty, but is not listed in the upper task {}", + task_id, upper_id + ); + } + } + } + } + + for (collectible, (flag, task_ids)) in collectibles { + if !flag { + use std::io::Write; + let mut stdout = stdout().lock(); + writeln!( + stdout, + "{:?} that is not emitted in any child task but in these aggregated \ + tasks: {:#?}", + collectible, + task_ids + .iter() + .map(|t| format!("{t} {}", ctx.get_task_description(*t))) + .collect::>() + ); + + let task_id = collectible.cell.task; + let mut queue = { + let task = ctx.task(task_id, TaskDataCategory::All); + get_uppers(&task) + }; + let mut visited = FxHashSet::default(); + for &upper_id in queue.iter() { + visited.insert(upper_id); + writeln!(stdout, "{task_id:?} -> {upper_id:?}"); + } + while let Some(task_id) = queue.pop() { + let desc = ctx.get_task_description(task_id); + let task = ctx.task(task_id, TaskDataCategory::All); + let aggregated_collectible = + get!(task, AggregatedCollectible { collectible }) + .copied() + .unwrap_or_default(); + let uppers = get_uppers(&task); + drop(task); + writeln!( + stdout, + "upper {task_id} {desc} collectible={aggregated_collectible}" + ); + if task_ids.contains(&task_id) { + writeln!( + stdout, + "Task has an upper connection to an aggregated task that doesn't \ + reference it. Upper connection is invalid!" + ); + } + for upper_id in uppers { + writeln!(stdout, "{task_id:?} -> {upper_id:?}"); + if !visited.contains(&upper_id) { + queue.push(upper_id); + } + } + } + } + } + println!("visited {task_id} {} tasks", visited.len()); + } + } + fn assert_not_persistent_calling_transient( &self, parent_id: TaskId, @@ -2254,12 +2473,12 @@ impl Backend for TurboTasksBackend { self.0.stopping(); } - fn stop(&self, _turbo_tasks: &dyn TurboTasksBackendApi) { - self.0.stop(); + fn stop(&self, turbo_tasks: &dyn TurboTasksBackendApi) { + self.0.stop(turbo_tasks); } - fn idle_start(&self, _turbo_tasks: &dyn TurboTasksBackendApi) { - self.0.idle_start(); + fn idle_start(&self, turbo_tasks: &dyn TurboTasksBackendApi) { + self.0.idle_start(turbo_tasks); } fn idle_end(&self, _turbo_tasks: &dyn TurboTasksBackendApi) {