From e5a1efba51d11d5d7c08387c93d1ddf387576356 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Fri, 11 Apr 2025 21:27:53 +0200 Subject: [PATCH 1/4] Use thread local dep graph encoding --- compiler/rustc_data_structures/src/sync.rs | 2 +- .../src/sync/parallel.rs | 10 + .../rustc_incremental/src/persist/save.rs | 13 +- .../rustc_query_system/src/dep_graph/graph.rs | 76 ++-- .../rustc_query_system/src/dep_graph/mod.rs | 3 +- .../src/dep_graph/serialized.rs | 367 +++++++++++------- 6 files changed, 288 insertions(+), 183 deletions(-) diff --git a/compiler/rustc_data_structures/src/sync.rs b/compiler/rustc_data_structures/src/sync.rs index 616a18a72ab7e..cba74a24949c4 100644 --- a/compiler/rustc_data_structures/src/sync.rs +++ b/compiler/rustc_data_structures/src/sync.rs @@ -43,7 +43,7 @@ pub use self::freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard}; pub use self::lock::{Lock, LockGuard, Mode}; pub use self::mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode}; pub use self::parallel::{ - join, par_for_each_in, par_map, parallel_guard, scope, try_par_for_each_in, + broadcast, join, par_for_each_in, par_map, parallel_guard, scope, try_par_for_each_in, }; pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec}; pub use self::worker_local::{Registry, WorkerLocal}; diff --git a/compiler/rustc_data_structures/src/sync/parallel.rs b/compiler/rustc_data_structures/src/sync/parallel.rs index ba3c85ef5b155..9b04137abf3af 100644 --- a/compiler/rustc_data_structures/src/sync/parallel.rs +++ b/compiler/rustc_data_structures/src/sync/parallel.rs @@ -226,3 +226,13 @@ pub fn par_map, R: DynSend, C: FromIterato } }) } + +pub fn broadcast(op: impl Fn(usize) -> R + DynSync) -> Vec { + if mode::is_dyn_thread_safe() { + let op = FromDyn::from(op); + let results = rayon_core::broadcast(|context| op.derive(op(context.index()))); + results.into_iter().map(|r| r.into_inner()).collect() + } else { + vec![op(0)] + } +} diff --git a/compiler/rustc_incremental/src/persist/save.rs b/compiler/rustc_incremental/src/persist/save.rs index 94ce6d9fa81f1..58fea3278a839 100644 --- a/compiler/rustc_incremental/src/persist/save.rs +++ b/compiler/rustc_incremental/src/persist/save.rs @@ -44,10 +44,6 @@ pub(crate) fn save_dep_graph(tcx: TyCtxt<'_>) { sess.time("assert_dep_graph", || assert_dep_graph(tcx)); sess.time("check_dirty_clean", || dirty_clean::check_dirty_clean_annotations(tcx)); - if sess.opts.unstable_opts.incremental_info { - tcx.dep_graph.print_incremental_info() - } - join( move || { sess.time("incr_comp_persist_dep_graph", || { @@ -172,12 +168,5 @@ pub(crate) fn build_dep_graph( // First encode the commandline arguments hash sess.opts.dep_tracking_hash(false).encode(&mut encoder); - Some(DepGraph::new( - sess, - prev_graph, - prev_work_products, - encoder, - sess.opts.unstable_opts.query_dep_graph, - sess.opts.unstable_opts.incremental_info, - )) + Some(DepGraph::new(sess, prev_graph, prev_work_products, encoder)) } diff --git a/compiler/rustc_query_system/src/dep_graph/graph.rs b/compiler/rustc_query_system/src/dep_graph/graph.rs index 0d56db160996d..180a2c9edf67c 100644 --- a/compiler/rustc_query_system/src/dep_graph/graph.rs +++ b/compiler/rustc_query_system/src/dep_graph/graph.rs @@ -11,7 +11,7 @@ use rustc_data_structures::outline; use rustc_data_structures::profiling::QueryInvocationId; use rustc_data_structures::sharded::{self, ShardedHashMap}; use rustc_data_structures::stable_hasher::{HashStable, StableHasher}; -use rustc_data_structures::sync::{AtomicU64, Lock}; +use 
rustc_data_structures::sync::{AtomicU64, Lock, is_dyn_thread_safe}; use rustc_data_structures::unord::UnordMap; use rustc_errors::DiagInner; use rustc_index::IndexVec; @@ -124,19 +124,11 @@ impl DepGraph { prev_graph: Arc, prev_work_products: WorkProductMap, encoder: FileEncoder, - record_graph: bool, - record_stats: bool, ) -> DepGraph { let prev_graph_node_count = prev_graph.node_count(); - let current = CurrentDepGraph::new( - session, - prev_graph_node_count, - encoder, - record_graph, - record_stats, - Arc::clone(&prev_graph), - ); + let current = + CurrentDepGraph::new(session, prev_graph_node_count, encoder, Arc::clone(&prev_graph)); let colors = DepNodeColorMap::new(prev_graph_node_count); @@ -1052,17 +1044,8 @@ impl DepGraph { } } - pub fn print_incremental_info(&self) { - if let Some(data) = &self.data { - data.current.encoder.print_incremental_info( - data.current.total_read_count.load(Ordering::Relaxed), - data.current.total_duplicate_read_count.load(Ordering::Relaxed), - ) - } - } - pub fn finish_encoding(&self) -> FileEncodeResult { - if let Some(data) = &self.data { data.current.encoder.finish() } else { Ok(0) } + if let Some(data) = &self.data { data.current.encoder.finish(&data.current) } else { Ok(0) } } pub(crate) fn next_virtual_depnode_index(&self) -> DepNodeIndex { @@ -1179,8 +1162,8 @@ pub(super) struct CurrentDepGraph { /// These are simple counters that are for profiling and /// debugging and only active with `debug_assertions`. - total_read_count: AtomicU64, - total_duplicate_read_count: AtomicU64, + pub(super) total_read_count: AtomicU64, + pub(super) total_duplicate_read_count: AtomicU64, } impl CurrentDepGraph { @@ -1188,8 +1171,6 @@ impl CurrentDepGraph { session: &Session, prev_graph_node_count: usize, encoder: FileEncoder, - record_graph: bool, - record_stats: bool, previous: Arc, ) -> Self { let mut stable_hasher = StableHasher::new(); @@ -1211,14 +1192,7 @@ impl CurrentDepGraph { session.opts.unstable_opts.incremental_verify_ich || cfg!(debug_assertions); CurrentDepGraph { - encoder: GraphEncoder::new( - encoder, - prev_graph_node_count, - record_graph, - record_stats, - &session.prof, - previous, - ), + encoder: GraphEncoder::new(session, encoder, prev_graph_node_count, previous), anon_node_to_index: ShardedHashMap::with_capacity( // FIXME: The count estimate is off as anon nodes are only a portion of the nodes. new_node_count_estimate / sharded::shards(), @@ -1345,6 +1319,7 @@ impl Default for TaskDeps { // array, using one u32 per entry. pub(super) struct DepNodeColorMap { values: IndexVec, + sync: bool, } const COMPRESSED_NONE: u32 = u32::MAX; @@ -1353,7 +1328,10 @@ const COMPRESSED_RED: u32 = u32::MAX - 1; impl DepNodeColorMap { fn new(size: usize) -> DepNodeColorMap { debug_assert!(COMPRESSED_RED > DepNodeIndex::MAX_AS_U32); - DepNodeColorMap { values: (0..size).map(|_| AtomicU32::new(COMPRESSED_NONE)).collect() } + DepNodeColorMap { + values: (0..size).map(|_| AtomicU32::new(COMPRESSED_NONE)).collect(), + sync: is_dyn_thread_safe(), + } } #[inline] @@ -1362,6 +1340,36 @@ impl DepNodeColorMap { if value <= DepNodeIndex::MAX_AS_U32 { Some(DepNodeIndex::from_u32(value)) } else { None } } + /// This tries to atomically mark a node green and assign `index` as the new + /// index. 
+ #[inline] + pub(super) fn try_mark_green( + &self, + prev_index: SerializedDepNodeIndex, + index: DepNodeIndex, + ) -> Result<(), DepNodeIndex> { + let value = &self.values[prev_index]; + if self.sync { + match value.compare_exchange( + COMPRESSED_NONE, + index.as_u32(), + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => Ok(()), + Err(v) => Err(DepNodeIndex::from_u32(v)), + } + } else { + let v = value.load(Ordering::Relaxed); + if v == COMPRESSED_NONE { + value.store(index.as_u32(), Ordering::Relaxed); + Ok(()) + } else { + Err(DepNodeIndex::from_u32(v)) + } + } + } + #[inline] pub(super) fn get(&self, index: SerializedDepNodeIndex) -> Option { match self.values[index].load(Ordering::Acquire) { diff --git a/compiler/rustc_query_system/src/dep_graph/mod.rs b/compiler/rustc_query_system/src/dep_graph/mod.rs index 3a80835afad57..89d1db878095f 100644 --- a/compiler/rustc_query_system/src/dep_graph/mod.rs +++ b/compiler/rustc_query_system/src/dep_graph/mod.rs @@ -12,6 +12,7 @@ pub(crate) use graph::DepGraphData; pub use graph::{DepGraph, DepNodeIndex, TaskDepsRef, WorkProduct, WorkProductMap, hash_result}; pub use query::DepGraphQuery; use rustc_data_structures::profiling::SelfProfilerRef; +use rustc_data_structures::sync::DynSync; use rustc_session::Session; pub use serialized::{SerializedDepGraph, SerializedDepNodeIndex}; use tracing::instrument; @@ -89,7 +90,7 @@ pub trait DepContext: Copy { } } -pub trait Deps { +pub trait Deps: DynSync { /// Execute the operation with provided dependencies. fn with_deps(deps: TaskDepsRef<'_>, op: OP) -> R where diff --git a/compiler/rustc_query_system/src/dep_graph/serialized.rs b/compiler/rustc_query_system/src/dep_graph/serialized.rs index d2bcde143835e..e88d95b6b074a 100644 --- a/compiler/rustc_query_system/src/dep_graph/serialized.rs +++ b/compiler/rustc_query_system/src/dep_graph/serialized.rs @@ -35,23 +35,27 @@ //! If the number of edges in this node does not fit in the bits available in the header, we //! store it directly after the header with leb128. -use std::iter; +use std::cell::RefCell; +use std::cmp::max; use std::marker::PhantomData; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::{iter, mem, u64}; use rustc_data_structures::fingerprint::{Fingerprint, PackedFingerprint}; use rustc_data_structures::fx::FxHashMap; use rustc_data_structures::outline; use rustc_data_structures::profiling::SelfProfilerRef; -use rustc_data_structures::sync::Lock; +use rustc_data_structures::sync::{Lock, WorkerLocal, broadcast}; use rustc_data_structures::unhash::UnhashMap; -use rustc_index::{Idx, IndexVec}; +use rustc_index::IndexVec; use rustc_serialize::opaque::mem_encoder::MemEncoder; use rustc_serialize::opaque::{FileEncodeResult, FileEncoder, IntEncodedWithFixedSize, MemDecoder}; use rustc_serialize::{Decodable, Decoder, Encodable, Encoder}; +use rustc_session::Session; use tracing::{debug, instrument}; -use super::graph::{DepNodeColor, DepNodeColorMap}; +use super::graph::{CurrentDepGraph, DepNodeColor, DepNodeColorMap}; use super::query::DepGraphQuery; use super::{DepKind, DepNode, DepNodeIndex, Deps}; use crate::dep_graph::edges::EdgesVec; @@ -76,6 +80,9 @@ const DEP_NODE_PAD: usize = DEP_NODE_SIZE - 1; const DEP_NODE_WIDTH_BITS: usize = DEP_NODE_SIZE / 2; /// Data for use when recompiling the **current crate**. +/// +/// There may be unused indices with DEP_KIND_NULL in this graph due to batch allocation of +/// indices to threads. 
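+///
+/// For illustration (the numbers here are assumed, not measured): with the
+/// batch size of 256 used by `EncoderState::next_index`, a worker thread that
+/// reserves a batch but only encodes 10 nodes before the graph is finished
+/// leaves the remaining 246 indices unwritten, and those decode as nodes of
+/// kind `DEP_KIND_NULL`.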
#[derive(Debug, Default)] pub struct SerializedDepGraph { /// The set of all DepNodes in the graph @@ -184,26 +191,30 @@ impl SerializedDepGraph { pub fn decode(d: &mut MemDecoder<'_>, deps: &D) -> Arc { // The last 16 bytes are the node count and edge count. debug!("position: {:?}", d.position()); - let (node_count, edge_count) = - d.with_position(d.len() - 2 * IntEncodedWithFixedSize::ENCODED_SIZE, |d| { + + // `node_max` is the number of indices including empty nodes while `node_count` + // is the number of actually encoded nodes. + let (node_max, node_count, edge_count) = + d.with_position(d.len() - 3 * IntEncodedWithFixedSize::ENCODED_SIZE, |d| { debug!("position: {:?}", d.position()); + let node_max = IntEncodedWithFixedSize::decode(d).0 as usize; let node_count = IntEncodedWithFixedSize::decode(d).0 as usize; let edge_count = IntEncodedWithFixedSize::decode(d).0 as usize; - (node_count, edge_count) + (node_max, node_count, edge_count) }); debug!("position: {:?}", d.position()); debug!(?node_count, ?edge_count); - let graph_bytes = d.len() - (2 * IntEncodedWithFixedSize::ENCODED_SIZE) - d.position(); + let graph_bytes = d.len() - (3 * IntEncodedWithFixedSize::ENCODED_SIZE) - d.position(); let mut nodes = IndexVec::from_elem_n( DepNode { kind: D::DEP_KIND_NULL, hash: PackedFingerprint::from(Fingerprint::ZERO) }, - node_count, + node_max, ); - let mut fingerprints = IndexVec::from_elem_n(Fingerprint::ZERO, node_count); + let mut fingerprints = IndexVec::from_elem_n(Fingerprint::ZERO, node_max); let mut edge_list_indices = - IndexVec::from_elem_n(EdgeHeader { repr: 0, num_edges: 0 }, node_count); + IndexVec::from_elem_n(EdgeHeader { repr: 0, num_edges: 0 }, node_max); // This estimation assumes that all of the encoded bytes are for the edge lists or for the // fixed-size node headers. But that's not necessarily true; if any edge list has a length @@ -217,7 +228,7 @@ impl SerializedDepGraph { let mut edge_list_data = Vec::with_capacity(graph_bytes - node_count * size_of::>()); - for _index in 0..node_count { + for _ in 0..node_count { // Decode the header for this edge; the header packs together as many of the fixed-size // fields as possible to limit the number of times we update decoder state. let node_header = @@ -263,8 +274,8 @@ impl SerializedDepGraph { for (idx, node) in nodes.iter_enumerated() { if index[node.kind.as_usize()].insert(node.hash, idx).is_some() { - // Side effect nodes can have duplicates - if node.kind != D::DEP_KIND_SIDE_EFFECT { + // Empty nodes and side effect nodes can have duplicates + if node.kind != D::DEP_KIND_NULL && node.kind != D::DEP_KIND_SIDE_EFFECT { let name = deps.name(node.kind); panic!( "Error: A dep graph node ({name}) does not have an unique index. \ @@ -508,17 +519,32 @@ struct Stat { edge_counter: u64, } -struct EncoderState { - previous: Arc, - encoder: FileEncoder, - total_node_count: usize, - total_edge_count: usize, - stats: Option>, +struct LocalEncoderState { + next_node_index: u32, + remaining_node_index: u32, + encoder: MemEncoder, + node_count: usize, + edge_count: usize, - mem_encoder: MemEncoder, + /// Stores the number of times we've encoded each dep kind. + kind_stats: Vec, +} + +struct LocalEncoderResult { + node_max: u32, + node_count: usize, + edge_count: usize, /// Stores the number of times we've encoded each dep kind. 
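/// Each worker thread reports its own counts; `EncoderState::finish` sums
/// them element-wise into the per-kind totals written at the end of the file.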
kind_stats: Vec, +} + +struct EncoderState { + next_node_index: AtomicU64, + previous: Arc, + file: Lock>, + local: WorkerLocal>, + stats: Option>>, marker: PhantomData, } @@ -526,34 +552,58 @@ impl EncoderState { fn new(encoder: FileEncoder, record_stats: bool, previous: Arc) -> Self { Self { previous, - encoder, - total_edge_count: 0, - total_node_count: 0, - stats: record_stats.then(FxHashMap::default), - mem_encoder: MemEncoder::new(), - kind_stats: iter::repeat(0).take(D::DEP_KIND_MAX as usize + 1).collect(), + next_node_index: AtomicU64::new(0), + stats: record_stats.then(|| Lock::new(FxHashMap::default())), + file: Lock::new(Some(encoder)), + local: WorkerLocal::new(|_| { + RefCell::new(LocalEncoderState { + next_node_index: 0, + remaining_node_index: 0, + edge_count: 0, + node_count: 0, + encoder: MemEncoder::new(), + kind_stats: iter::repeat(0).take(D::DEP_KIND_MAX as usize + 1).collect(), + }) + }), marker: PhantomData, } } #[inline] - fn alloc_index(&mut self) -> DepNodeIndex { - let index = DepNodeIndex::new(self.total_node_count); - self.total_node_count += 1; - index + fn next_index(&self, local: &mut LocalEncoderState) -> DepNodeIndex { + if local.remaining_node_index == 0 { + let count = 256; + + // We assume that there won't be enough active threads to overflow u64 from u32::MAX here. + assert!(self.next_node_index.load(Ordering::Relaxed) <= u32::MAX as u64); + local.next_node_index = + self.next_node_index.fetch_add(count, Ordering::Relaxed).try_into().unwrap(); + + local.remaining_node_index = count as u32; + } + + DepNodeIndex::from_u32(local.next_node_index) + } + + #[inline] + fn bump_index(&self, local: &mut LocalEncoderState) { + local.remaining_node_index -= 1; + local.next_node_index += 1; + local.node_count += 1; } #[inline] fn record( - &mut self, + &self, node: DepNode, index: DepNodeIndex, edge_count: usize, - edges: impl FnOnce(&mut Self) -> Vec, + edges: impl FnOnce(&Self) -> Vec, record_graph: &Option>, - ) -> DepNodeIndex { - self.kind_stats[node.kind.as_usize()] += 1; - self.total_edge_count += edge_count; + local: &mut LocalEncoderState, + ) { + local.kind_stats[node.kind.as_usize()] += 1; + local.edge_count += edge_count; if let Some(record_graph) = &record_graph { // Call `edges` before the outlined code to allow the closure to be optimized out. @@ -568,40 +618,47 @@ impl EncoderState { }); } - if let Some(stats) = &mut self.stats { + if let Some(stats) = &self.stats { let kind = node.kind; // Outline the stats code as it's typically disabled and cold. outline(move || { + let mut stats = stats.lock(); let stat = stats.entry(kind).or_insert(Stat { kind, node_counter: 0, edge_counter: 0 }); stat.node_counter += 1; stat.edge_counter += edge_count as u64; }); } - - index } #[inline] - fn flush_mem_encoder(&mut self) { - let data = &mut self.mem_encoder.data; + fn flush_mem_encoder(&self, local: &mut LocalEncoderState) { + let data = &mut local.encoder.data; if data.len() > 64 * 1024 { - self.encoder.emit_raw_bytes(&data[..]); + self.file.lock().as_mut().unwrap().emit_raw_bytes(&data[..]); data.clear(); } } /// Encodes a node to the current graph. 
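///
/// A sketch of the expected call sequence, as used by `send_new` below: the
/// caller reserves `index` via `next_index` and commits it via `bump_index`;
/// this function then writes the node into the thread-local `MemEncoder`,
/// flushing to the shared `FileEncoder` once the local buffer exceeds 64 KiB.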
fn encode_node( - &mut self, + &self, + index: DepNodeIndex, node: &NodeInfo, record_graph: &Option>, - ) -> DepNodeIndex { - let index = self.alloc_index(); - node.encode::(&mut self.mem_encoder, index); - self.flush_mem_encoder(); - self.record(node.node, index, node.edges.len(), |_| node.edges[..].to_vec(), record_graph) + local: &mut LocalEncoderState, + ) { + node.encode::(&mut local.encoder, index); + self.flush_mem_encoder(&mut *local); + self.record( + node.node, + index, + node.edges.len(), + |_| node.edges[..].to_vec(), + record_graph, + &mut *local, + ); } /// Encodes a node that was promoted from the previous graph. It reads the information directly from @@ -612,16 +669,17 @@ impl EncoderState { /// It expects all edges to already have a new dep node index assigned. #[inline] fn encode_promoted_node( - &mut self, + &self, + index: DepNodeIndex, prev_index: SerializedDepNodeIndex, record_graph: &Option>, colors: &DepNodeColorMap, - ) -> DepNodeIndex { - let index = self.alloc_index(); + local: &mut LocalEncoderState, + ) { let node = self.previous.index_to_node(prev_index); let fingerprint = self.previous.fingerprint_by_index(prev_index); let edge_count = NodeInfo::encode_promoted::( - &mut self.mem_encoder, + &mut local.encoder, node, index, fingerprint, @@ -629,7 +687,7 @@ impl EncoderState { colors, &self.previous, ); - self.flush_mem_encoder(); + self.flush_mem_encoder(&mut *local); self.record( node, index, @@ -641,38 +699,60 @@ impl EncoderState { .collect() }, record_graph, + &mut *local, ); - index } - fn finish(self, profiler: &SelfProfilerRef) -> FileEncodeResult { - let Self { - mut encoder, - mem_encoder, - total_node_count, - total_edge_count, - stats: _, - kind_stats, - marker: _, - previous, - } = self; + fn finish(&self, profiler: &SelfProfilerRef, current: &CurrentDepGraph) -> FileEncodeResult { + // Prevent more indices from being allocated. + self.next_node_index.store(u32::MAX as u64 + 1, Ordering::SeqCst); + + let results = broadcast(|_| { + let mut local = self.local.borrow_mut(); + + // Prevent more indices from being allocated on this thread. 
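+ // (`broadcast` is the helper added to `rustc_data_structures::sync` above:
+ // it runs this closure once on every worker thread, or just once on the
+ // current thread in single-threaded mode, and collects the returned
+ // `LocalEncoderResult`s so that every thread's local state gets drained.)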
+ local.remaining_node_index = 0; + + let data = mem::replace(&mut local.encoder.data, Vec::new()); + self.file.lock().as_mut().unwrap().emit_raw_bytes(&data); + + LocalEncoderResult { + kind_stats: local.kind_stats.clone(), + node_max: local.next_node_index, + node_count: local.node_count, + edge_count: local.edge_count, + } + }); + + let mut encoder = self.file.lock().take().unwrap(); + + let mut kind_stats: Vec = iter::repeat(0).take(D::DEP_KIND_MAX as usize + 1).collect(); - encoder.emit_raw_bytes(&mem_encoder.data); + let mut node_max = 0; + let mut node_count = 0; + let mut edge_count = 0; - let node_count = total_node_count.try_into().unwrap(); - let edge_count = total_edge_count.try_into().unwrap(); + for result in results { + node_max = max(node_max, result.node_max); + node_count += result.node_count; + edge_count += result.edge_count; + for (i, stat) in result.kind_stats.iter().enumerate() { + kind_stats[i] += stat; + } + } // Encode the number of each dep kind encountered for count in kind_stats.iter() { count.encode(&mut encoder); } - previous.session_count.checked_add(1).unwrap().encode(&mut encoder); + self.previous.session_count.checked_add(1).unwrap().encode(&mut encoder); - debug!(?node_count, ?edge_count); + debug!(?node_max, ?node_count, ?edge_count); debug!("position: {:?}", encoder.position()); - IntEncodedWithFixedSize(node_count).encode(&mut encoder); - IntEncodedWithFixedSize(edge_count).encode(&mut encoder); + IntEncodedWithFixedSize(node_max.try_into().unwrap()).encode(&mut encoder); + IntEncodedWithFixedSize(node_count.try_into().unwrap()).encode(&mut encoder); + IntEncodedWithFixedSize(edge_count.try_into().unwrap()).encode(&mut encoder); debug!("position: {:?}", encoder.position()); // Drop the encoder so that nothing is written after the counts. let result = encoder.finish(); @@ -681,44 +761,20 @@ impl EncoderState { // don't need a dependency on rustc_incremental just for that. 
profiler.artifact_size("dep_graph", "dep-graph.bin", position as u64); } - result - } -} - -pub(crate) struct GraphEncoder { - profiler: SelfProfilerRef, - status: Lock>>, - record_graph: Option>, -} -impl GraphEncoder { - pub(crate) fn new( - encoder: FileEncoder, - prev_node_count: usize, - record_graph: bool, - record_stats: bool, - profiler: &SelfProfilerRef, - previous: Arc, - ) -> Self { - let record_graph = record_graph.then(|| Lock::new(DepGraphQuery::new(prev_node_count))); - let status = Lock::new(Some(EncoderState::new(encoder, record_stats, previous))); - GraphEncoder { status, record_graph, profiler: profiler.clone() } - } + self.print_incremental_info(current, node_count, edge_count); - pub(crate) fn with_query(&self, f: impl Fn(&DepGraphQuery)) { - if let Some(record_graph) = &self.record_graph { - f(&record_graph.lock()) - } + result } - pub(crate) fn print_incremental_info( + fn print_incremental_info( &self, - total_read_count: u64, - total_duplicate_read_count: u64, + current: &CurrentDepGraph, + total_node_count: usize, + total_edge_count: usize, ) { - let mut status = self.status.lock(); - let status = status.as_mut().unwrap(); - if let Some(record_stats) = &status.stats { + if let Some(record_stats) = &self.stats { + let record_stats = record_stats.lock(); let mut stats: Vec<_> = record_stats.values().collect(); stats.sort_by_key(|s| -(s.node_counter as i64)); @@ -730,10 +786,13 @@ impl GraphEncoder { eprintln!("[incremental] DepGraph Statistics"); eprintln!("{SEPARATOR}"); eprintln!("[incremental]"); - eprintln!("[incremental] Total Node Count: {}", status.total_node_count); - eprintln!("[incremental] Total Edge Count: {}", status.total_edge_count); + eprintln!("[incremental] Total Node Count: {}", total_node_count); + eprintln!("[incremental] Total Edge Count: {}", total_edge_count); if cfg!(debug_assertions) { + let total_read_count = current.total_read_count.load(Ordering::Relaxed); + let total_duplicate_read_count = + current.total_duplicate_read_count.load(Ordering::Relaxed); eprintln!("[incremental] Total Edge Reads: {total_read_count}"); eprintln!("[incremental] Total Duplicate Edge Reads: {total_duplicate_read_count}"); } @@ -747,7 +806,7 @@ impl GraphEncoder { for stat in stats { let node_kind_ratio = - (100.0 * (stat.node_counter as f64)) / (status.total_node_count as f64); + (100.0 * (stat.node_counter as f64)) / (total_node_count as f64); let node_kind_avg_edges = (stat.edge_counter as f64) / (stat.node_counter as f64); eprintln!( @@ -763,6 +822,35 @@ impl GraphEncoder { eprintln!("[incremental]"); } } +} + +pub(crate) struct GraphEncoder { + profiler: SelfProfilerRef, + status: EncoderState, + record_graph: Option>, +} + +impl GraphEncoder { + pub(crate) fn new( + sess: &Session, + encoder: FileEncoder, + prev_node_count: usize, + previous: Arc, + ) -> Self { + let record_graph = sess + .opts + .unstable_opts + .query_dep_graph + .then(|| Lock::new(DepGraphQuery::new(prev_node_count))); + let status = EncoderState::new(encoder, sess.opts.unstable_opts.incremental_info, previous); + GraphEncoder { status, record_graph, profiler: sess.prof.clone() } + } + + pub(crate) fn with_query(&self, f: impl Fn(&DepGraphQuery)) { + if let Some(record_graph) = &self.record_graph { + f(&record_graph.lock()) + } + } /// Encodes a node that does not exist in the previous graph. 
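///
/// Unlike `send_and_color` and `send_promoted` below, no coloring is involved
/// here: a fresh index is reserved from the thread-local batch, committed, and
/// the node is encoded into this thread's buffer.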
pub(crate) fn send_new( @@ -773,7 +861,11 @@ impl GraphEncoder { ) -> DepNodeIndex { let _prof_timer = self.profiler.generic_activity("incr_comp_encode_dep_graph"); let node = NodeInfo { node, fingerprint, edges }; - self.status.lock().as_mut().unwrap().encode_node(&node, &self.record_graph) + let mut local = self.status.local.borrow_mut(); + let index = self.status.next_index(&mut *local); + self.status.bump_index(&mut *local); + self.status.encode_node(index, &node, &self.record_graph, &mut *local); + index } /// Encodes a node that exists in the previous graph, but was re-executed. @@ -791,23 +883,24 @@ impl GraphEncoder { let _prof_timer = self.profiler.generic_activity("incr_comp_encode_dep_graph"); let node = NodeInfo { node, fingerprint, edges }; - let mut status = self.status.lock(); - let status = status.as_mut().unwrap(); + let mut local = self.status.local.borrow_mut(); - // Check colors inside the lock to avoid racing when `send_promoted` is called concurrently - // on the same index. - match colors.get(prev_index) { - None => { - let dep_node_index = status.encode_node(&node, &self.record_graph); - colors.insert( - prev_index, - if is_green { DepNodeColor::Green(dep_node_index) } else { DepNodeColor::Red }, - ); - dep_node_index + let index = self.status.next_index(&mut *local); + + if is_green { + // Use `try_mark_green` to avoid racing when `send_promoted` is called concurrently + // on the same index. + match colors.try_mark_green(prev_index, index) { + Ok(()) => (), + Err(dep_node_index) => return dep_node_index, } - Some(DepNodeColor::Green(dep_node_index)) => dep_node_index, - Some(DepNodeColor::Red) => panic!(), + } else { + colors.insert(prev_index, DepNodeColor::Red); } + + self.status.bump_index(&mut *local); + self.status.encode_node(index, &node, &self.record_graph, &mut *local); + index } /// Encodes a node that was promoted from the previous graph. It reads the information directly from @@ -822,26 +915,30 @@ impl GraphEncoder { ) -> DepNodeIndex { let _prof_timer = self.profiler.generic_activity("incr_comp_encode_dep_graph"); - let mut status = self.status.lock(); - let status = status.as_mut().unwrap(); + let mut local = self.status.local.borrow_mut(); + let index = self.status.next_index(&mut *local); - // Check colors inside the lock to avoid racing when `send_promoted` or `send_and_color` + // Use `try_mark_green` to avoid racing when `send_promoted` or `send_and_color` // is called concurrently on the same index. 
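+ // If the exchange fails, another thread already encoded this node and its
+ // index is returned instead. The `index` reserved above was never passed to
+ // `bump_index`, so it is simply reused for the next node on this thread.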
- match colors.get(prev_index) { - None => { - let dep_node_index = - status.encode_promoted_node(prev_index, &self.record_graph, colors); - colors.insert(prev_index, DepNodeColor::Green(dep_node_index)); - dep_node_index + match colors.try_mark_green(prev_index, index) { + Ok(()) => { + self.status.bump_index(&mut *local); + self.status.encode_promoted_node( + index, + prev_index, + &self.record_graph, + colors, + &mut *local, + ); + index } - Some(DepNodeColor::Green(dep_node_index)) => dep_node_index, - Some(DepNodeColor::Red) => panic!(), + Err(dep_node_index) => dep_node_index, } } - pub(crate) fn finish(&self) -> FileEncodeResult { + pub(crate) fn finish(&self, current: &CurrentDepGraph) -> FileEncodeResult { let _prof_timer = self.profiler.generic_activity("incr_comp_encode_dep_graph_finish"); - self.status.lock().take().unwrap().finish(&self.profiler) + self.status.finish(&self.profiler, current) } } From 9bb8430cb31e90441cebebc7cb45078015db1a75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Tue, 22 Apr 2025 04:36:58 +0200 Subject: [PATCH 2/4] Add some comments --- compiler/rustc_query_system/src/dep_graph/graph.rs | 3 ++- compiler/rustc_query_system/src/dep_graph/serialized.rs | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/compiler/rustc_query_system/src/dep_graph/graph.rs b/compiler/rustc_query_system/src/dep_graph/graph.rs index 180a2c9edf67c..3ae56cef2c421 100644 --- a/compiler/rustc_query_system/src/dep_graph/graph.rs +++ b/compiler/rustc_query_system/src/dep_graph/graph.rs @@ -1341,7 +1341,8 @@ impl DepNodeColorMap { } /// This tries to atomically mark a node green and assign `index` as the new - /// index. + /// index. This returns `Ok` if `index` gets assigned, otherwise it returns + /// the already allocated index in `Err`. #[inline] pub(super) fn try_mark_green( &self, diff --git a/compiler/rustc_query_system/src/dep_graph/serialized.rs b/compiler/rustc_query_system/src/dep_graph/serialized.rs index e88d95b6b074a..648823edb1890 100644 --- a/compiler/rustc_query_system/src/dep_graph/serialized.rs +++ b/compiler/rustc_query_system/src/dep_graph/serialized.rs @@ -585,6 +585,7 @@ impl EncoderState { DepNodeIndex::from_u32(local.next_node_index) } + /// Marks the index previously returned by `next_index` as used. #[inline] fn bump_index(&self, local: &mut LocalEncoderState) { local.remaining_node_index -= 1; From d17b845d7c1b751a43afd1f89baa69bd0027c400 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Sat, 26 Apr 2025 02:31:51 +0200 Subject: [PATCH 3/4] Group dep nodes by chunks with sequential indices and only store the first index at the start of the chunk --- .../src/dep_graph/serialized.rs | 140 +++++++++++------- 1 file changed, 84 insertions(+), 56 deletions(-) diff --git a/compiler/rustc_query_system/src/dep_graph/serialized.rs b/compiler/rustc_query_system/src/dep_graph/serialized.rs index 648823edb1890..1c108d09d2c11 100644 --- a/compiler/rustc_query_system/src/dep_graph/serialized.rs +++ b/compiler/rustc_query_system/src/dep_graph/serialized.rs @@ -228,36 +228,48 @@ impl SerializedDepGraph { let mut edge_list_data = Vec::with_capacity(graph_bytes - node_count * size_of::>()); - for _ in 0..node_count { - // Decode the header for this edge; the header packs together as many of the fixed-size - // fields as possible to limit the number of times we update decoder state. 
- let node_header = - SerializedNodeHeader:: { bytes: d.read_array(), _marker: PhantomData }; + let mut decoded_nodes = 0; + while decoded_nodes < node_count { + let mut current_index = d.read_u32(); + + loop { + // Decode the header for this edge; the header packs together as many of the fixed-size + // fields as possible to limit the number of times we update decoder state. + let node_header = + SerializedNodeHeader:: { bytes: d.read_array(), _marker: PhantomData }; + + if node_header.node().kind == D::DEP_KIND_NULL { + break; + } + + decoded_nodes += 1; - let index = node_header.index(); + let index = SerializedDepNodeIndex::from_u32(current_index); + current_index += 1; - let node = &mut nodes[index]; - // Make sure there's no duplicate indices in the dep graph. - assert!(node_header.node().kind != D::DEP_KIND_NULL && node.kind == D::DEP_KIND_NULL); - *node = node_header.node(); + let node = &mut nodes[index]; + // Make sure there's no duplicate indices in the dep graph. + assert!(node.kind == D::DEP_KIND_NULL); + *node = node_header.node(); - fingerprints[index] = node_header.fingerprint(); + fingerprints[index] = node_header.fingerprint(); - // If the length of this node's edge list is small, the length is stored in the header. - // If it is not, we fall back to another decoder call. - let num_edges = node_header.len().unwrap_or_else(|| d.read_u32()); + // If the length of this node's edge list is small, the length is stored in the header. + // If it is not, we fall back to another decoder call. + let num_edges = node_header.len().unwrap_or_else(|| d.read_u32()); - // The edges index list uses the same varint strategy as rmeta tables; we select the - // number of byte elements per-array not per-element. This lets us read the whole edge - // list for a node with one decoder call and also use the on-disk format in memory. - let edges_len_bytes = node_header.bytes_per_index() * (num_edges as usize); - // The in-memory structure for the edges list stores the byte width of the edges on - // this node with the offset into the global edge data array. - let edges_header = node_header.edges_header(&edge_list_data, num_edges); + // The edges index list uses the same varint strategy as rmeta tables; we select the + // number of byte elements per-array not per-element. This lets us read the whole edge + // list for a node with one decoder call and also use the on-disk format in memory. + let edges_len_bytes = node_header.bytes_per_index() * (num_edges as usize); + // The in-memory structure for the edges list stores the byte width of the edges on + // this node with the offset into the global edge data array. 
+ let edges_header = node_header.edges_header(&edge_list_data, num_edges); - edge_list_data.extend(d.read_raw_bytes(edges_len_bytes)); + edge_list_data.extend(d.read_raw_bytes(edges_len_bytes)); - edge_list_indices[index] = edges_header; + edge_list_indices[index] = edges_header; + } } // When we access the edge list data, we do a fixed-size read from the edge list data then @@ -308,10 +320,9 @@ impl SerializedDepGraph { /// * In whatever bits remain, the length of the edge list for this node, if it fits struct SerializedNodeHeader { // 2 bytes for the DepNode - // 4 bytes for the index // 16 for Fingerprint in DepNode // 16 for Fingerprint in NodeInfo - bytes: [u8; 38], + bytes: [u8; 34], _marker: PhantomData, } @@ -321,7 +332,6 @@ struct Unpacked { len: Option, bytes_per_index: usize, kind: DepKind, - index: SerializedDepNodeIndex, hash: PackedFingerprint, fingerprint: Fingerprint, } @@ -343,7 +353,6 @@ impl SerializedNodeHeader { #[inline] fn new( node: DepNode, - index: DepNodeIndex, fingerprint: Fingerprint, edge_max_index: u32, edge_count: usize, @@ -365,11 +374,10 @@ impl SerializedNodeHeader { let hash: Fingerprint = node.hash.into(); // Using half-open ranges ensures an unconditional panic if we get the magic numbers wrong. - let mut bytes = [0u8; 38]; + let mut bytes = [0u8; 34]; bytes[..2].copy_from_slice(&head.to_le_bytes()); - bytes[2..6].copy_from_slice(&index.as_u32().to_le_bytes()); - bytes[6..22].copy_from_slice(&hash.to_le_bytes()); - bytes[22..].copy_from_slice(&fingerprint.to_le_bytes()); + bytes[2..18].copy_from_slice(&hash.to_le_bytes()); + bytes[18..].copy_from_slice(&fingerprint.to_le_bytes()); #[cfg(debug_assertions)] { @@ -386,9 +394,8 @@ impl SerializedNodeHeader { #[inline] fn unpack(&self) -> Unpacked { let head = u16::from_le_bytes(self.bytes[..2].try_into().unwrap()); - let index = u32::from_le_bytes(self.bytes[2..6].try_into().unwrap()); - let hash = self.bytes[6..22].try_into().unwrap(); - let fingerprint = self.bytes[22..].try_into().unwrap(); + let hash = self.bytes[2..18].try_into().unwrap(); + let fingerprint = self.bytes[18..].try_into().unwrap(); let kind = head & mask(Self::KIND_BITS) as u16; let bytes_per_index = (head >> Self::KIND_BITS) & mask(Self::WIDTH_BITS) as u16; @@ -398,7 +405,6 @@ impl SerializedNodeHeader { len: len.checked_sub(1), bytes_per_index: bytes_per_index as usize + 1, kind: DepKind::new(kind), - index: SerializedDepNodeIndex::from_u32(index), hash: Fingerprint::from_le_bytes(hash).into(), fingerprint: Fingerprint::from_le_bytes(fingerprint), } @@ -414,11 +420,6 @@ impl SerializedNodeHeader { self.unpack().bytes_per_index } - #[inline] - fn index(&self) -> SerializedDepNodeIndex { - self.unpack().index - } - #[inline] fn fingerprint(&self) -> Fingerprint { self.unpack().fingerprint @@ -447,15 +448,10 @@ struct NodeInfo { } impl NodeInfo { - fn encode(&self, e: &mut MemEncoder, index: DepNodeIndex) { + fn encode(&self, e: &mut MemEncoder) { let NodeInfo { node, fingerprint, ref edges } = *self; - let header = SerializedNodeHeader::::new( - node, - index, - fingerprint, - edges.max_index(), - edges.len(), - ); + let header = + SerializedNodeHeader::::new(node, fingerprint, edges.max_index(), edges.len()); e.write_array(header.bytes); if header.len().is_none() { @@ -479,7 +475,6 @@ impl NodeInfo { fn encode_promoted( e: &mut MemEncoder, node: DepNode, - index: DepNodeIndex, fingerprint: Fingerprint, prev_index: SerializedDepNodeIndex, colors: &DepNodeColorMap, @@ -492,7 +487,7 @@ impl NodeInfo { let edge_max = edges.clone().map(|i| 
colors.current(i).unwrap().as_u32()).max().unwrap_or(0); - let header = SerializedNodeHeader::::new(node, index, fingerprint, edge_max, edge_count); + let header = SerializedNodeHeader::::new(node, fingerprint, edge_max, edge_count); e.write_array(header.bytes); if header.len().is_none() { @@ -526,6 +521,8 @@ struct LocalEncoderState { node_count: usize, edge_count: usize, + in_chunk: bool, + /// Stores the number of times we've encoded each dep kind. kind_stats: Vec, } @@ -561,6 +558,7 @@ impl EncoderState { remaining_node_index: 0, edge_count: 0, node_count: 0, + in_chunk: false, encoder: MemEncoder::new(), kind_stats: iter::repeat(0).take(D::DEP_KIND_MAX as usize + 1).collect(), }) @@ -569,6 +567,30 @@ impl EncoderState { } } + #[inline] + fn end_chunk(&self, local: &mut LocalEncoderState) { + if !local.in_chunk { + return; + } + local.in_chunk = false; + + NodeInfo { + node: DepNode { kind: D::DEP_KIND_NULL, hash: Fingerprint::ZERO.into() }, + fingerprint: Fingerprint::ZERO, + edges: EdgesVec::new(), + } + .encode::(&mut local.encoder); + } + + #[inline] + fn start_chunk(&self, local: &mut LocalEncoderState, first_index: u32) { + if local.in_chunk { + self.end_chunk(local); + } + local.in_chunk = true; + local.encoder.emit_u32(first_index); + } + #[inline] fn next_index(&self, local: &mut LocalEncoderState) -> DepNodeIndex { if local.remaining_node_index == 0 { @@ -579,6 +601,8 @@ impl EncoderState { local.next_node_index = self.next_node_index.fetch_add(count, Ordering::Relaxed).try_into().unwrap(); + self.start_chunk(local, local.next_node_index); + local.remaining_node_index = count as u32; } @@ -635,10 +659,13 @@ impl EncoderState { #[inline] fn flush_mem_encoder(&self, local: &mut LocalEncoderState) { - let data = &mut local.encoder.data; - if data.len() > 64 * 1024 { - self.file.lock().as_mut().unwrap().emit_raw_bytes(&data[..]); - data.clear(); + if local.encoder.data.len() > 64 * 1024 { + self.end_chunk(local); + self.file.lock().as_mut().unwrap().emit_raw_bytes(&local.encoder.data[..]); + local.encoder.data.clear(); + if local.remaining_node_index > 0 { + self.start_chunk(local, local.next_node_index); + } } } @@ -650,7 +677,7 @@ impl EncoderState { record_graph: &Option>, local: &mut LocalEncoderState, ) { - node.encode::(&mut local.encoder, index); + node.encode::(&mut local.encoder); self.flush_mem_encoder(&mut *local); self.record( node.node, @@ -682,7 +709,6 @@ impl EncoderState { let edge_count = NodeInfo::encode_promoted::( &mut local.encoder, node, - index, fingerprint, prev_index, colors, @@ -711,6 +737,8 @@ impl EncoderState { let results = broadcast(|_| { let mut local = self.local.borrow_mut(); + self.end_chunk(&mut *local); + // Prevent more indices from being allocated on this thread. 
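+ // (The `end_chunk` call above has already written the `DEP_KIND_NULL`
+ // terminator record for this thread's current chunk, so the decoder knows
+ // where its run of sequential indices stops.)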
local.remaining_node_index = 0; let data = mem::replace(&mut local.encoder.data, Vec::new()); self.file.lock().as_mut().unwrap().emit_raw_bytes(&data); From 4207ff78db53651951b0636905f380ad6730a531 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Sat, 26 Apr 2025 13:59:21 +0200 Subject: [PATCH 4/4] Add consistency assertions and an end-of-data sentinel for testing --- .../rustc_query_system/src/dep_graph/serialized.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/compiler/rustc_query_system/src/dep_graph/serialized.rs b/compiler/rustc_query_system/src/dep_graph/serialized.rs index 1c108d09d2c11..1980c08dbeb83 100644 --- a/compiler/rustc_query_system/src/dep_graph/serialized.rs +++ b/compiler/rustc_query_system/src/dep_graph/serialized.rs @@ -272,6 +272,8 @@ impl SerializedDepGraph { } } + assert_eq!(decoded_nodes, node_count); + // When we access the edge list data, we do a fixed-size read from the edge list data then // mask off the bytes that aren't for that edge index, so the last read may dangle off the // end of the array. This padding ensure it doesn't. @@ -284,6 +286,8 @@ impl SerializedDepGraph { let session_count = d.read_u64(); + assert_eq!(d.read_u64(), 0x8375672356237834); + for (idx, node) in nodes.iter_enumerated() { if index[node.kind.as_usize()].insert(node.hash, idx).is_some() { // Empty nodes and side effect nodes can have duplicates @@ -677,6 +681,8 @@ impl EncoderState { record_graph: &Option>, local: &mut LocalEncoderState, ) { + assert!(local.in_chunk); + assert_ne!(node.node.kind, D::DEP_KIND_NULL); node.encode::(&mut local.encoder); self.flush_mem_encoder(&mut *local); self.record( @@ -705,6 +711,8 @@ impl EncoderState { local: &mut LocalEncoderState, ) { let node = self.previous.index_to_node(prev_index); + assert!(local.in_chunk); + assert_ne!(node.kind, D::DEP_KIND_NULL); let fingerprint = self.previous.fingerprint_by_index(prev_index); let edge_count = NodeInfo::encode_promoted::( &mut local.encoder, node, fingerprint, prev_index, colors, @@ -777,6 +785,8 @@ impl EncoderState { self.previous.session_count.checked_add(1).unwrap().encode(&mut encoder); + encoder.emit_u64(0x8375672356237834); + debug!(?node_max, ?node_count, ?edge_count); debug!("position: {:?}", encoder.position()); IntEncodedWithFixedSize(node_max.try_into().unwrap()).encode(&mut encoder);
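As a closing illustration of the format introduced in [PATCH 3/4], here is a
minimal standalone sketch of the chunk framing. The `Record` type, its field
widths, and the helper names are assumptions made for this sketch; the real
encoder emits variable-size `SerializedNodeHeader` records through a
`MemEncoder`. Each chunk opens with the `u32` index of its first node, the
records that follow take sequential indices implicitly, and a record with the
null kind terminates the chunk.

    // Illustrative model only; `NULL_KIND` stands in for `D::DEP_KIND_NULL`.
    const NULL_KIND: u16 = 0;

    struct Record {
        kind: u16,    // stand-in for the packed node header
        payload: u64, // stand-in for hash, fingerprint and edge data
    }

    fn encode_chunk(out: &mut Vec<u8>, first_index: u32, records: &[Record]) {
        // A chunk starts with the index of its first node; the nodes inside
        // take indices first_index, first_index + 1, ... implicitly, which is
        // what let the patch drop the 4-byte per-node index (the node header
        // shrinks from 38 to 34 bytes).
        out.extend_from_slice(&first_index.to_le_bytes());
        for r in records {
            out.extend_from_slice(&r.kind.to_le_bytes());
            out.extend_from_slice(&r.payload.to_le_bytes());
        }
        // A null-kind record ends the chunk; the decoder breaks out of its
        // inner loop here and reads the next chunk's first index.
        out.extend_from_slice(&NULL_KIND.to_le_bytes());
        out.extend_from_slice(&0u64.to_le_bytes());
    }

    fn main() {
        let mut out = Vec::new();
        // Two chunks, e.g. from two threads' index batches: 0.. and 256..
        encode_chunk(&mut out, 0, &[Record { kind: 1, payload: 7 }]);
        encode_chunk(&mut out, 256, &[Record { kind: 2, payload: 9 }]);
        // Each chunk is 4 bytes of first index plus two 10-byte records
        // (one real, one terminator).
        assert_eq!(out.len(), 2 * (4 + 2 * (2 + 8)));
    }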