From e38ca66c71294d36df6401a1796779a21823f133 Mon Sep 17 00:00:00 2001 From: Yuya Nishihara Date: Thu, 30 Nov 2023 09:31:49 +0900 Subject: [PATCH] Emit sorted dir-diff result as soon as preceding results get ready If the directory diff contains lots of changes across files, it's better to emit earlier results incrementally. The idea is borrowed from the following thread. The diff producer assigns incremental index, and the consumer puts the results in BinaryHeap to reorder them by index. https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/3 --- src/main.rs | 84 ++++++++++++++++++++++++++++++++++++++++++++-------- tests/cli.rs | 31 +++++++++++++++++++ 2 files changed, 103 insertions(+), 12 deletions(-) diff --git a/src/main.rs b/src/main.rs index 096f762807..1bd32ba0d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -71,8 +71,10 @@ use crate::parse::syntax; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; +use std::cmp::Ordering; +use std::collections::BinaryHeap; use std::path::Path; -use std::{env, thread}; +use std::{env, iter, thread}; use humansize::{format_size, BINARY}; use owo_colors::OwoColorize; @@ -260,15 +262,6 @@ fn main() { .iter() .any(|diff_result| diff_result.has_reportable_change()); display::json::print_directory(results, display_options.print_unchanged); - } else if display_options.sort_paths { - let result: Vec = diff_iter.collect(); - for diff_result in result { - print_diff_result(&display_options, &diff_result); - - if diff_result.has_reportable_change() { - encountered_changes = true; - } - } } else { // We want to diff files in the directory in // parallel, but print the results serially @@ -279,11 +272,18 @@ fn main() { s.spawn(move || { diff_iter + .enumerate() .try_for_each_with(send, |s, diff_result| s.send(diff_result)) .expect("Receiver should be connected") }); - for diff_result in recv.into_iter() { + let serial_iter: Box> = + if display_options.sort_paths { + Box::new(reorder_by_index(recv.into_iter(), 0)) + } else { + Box::new(recv.into_iter()) + }; + for (_, diff_result) in serial_iter { print_diff_result(&display_options, &diff_result); if diff_result.has_reportable_change() { @@ -723,7 +723,7 @@ fn diff_directories<'a>( display_options: &DisplayOptions, diff_options: &DiffOptions, overrides: &[(LanguageOverride, Vec)], -) -> impl ParallelIterator + 'a { +) -> impl IndexedParallelIterator + 'a { let diff_options = diff_options.clone(); let display_options = display_options.clone(); let overrides: Vec<_> = overrides.into(); @@ -883,6 +883,57 @@ fn print_diff_result(display_options: &DisplayOptions, summary: &DiffResult) { } } +/// Sort items in the `source` iterator by the 0th `usize` field, yield when all +/// preceding items are received. +/// +/// The idea is borrowed from +/// . +fn reorder_by_index( + source: impl Iterator, + mut next_index: usize, +) -> impl Iterator { + struct WorkItem { + index: usize, + value: T, + } + + impl Eq for WorkItem {} + + impl Ord for WorkItem { + fn cmp(&self, other: &Self) -> Ordering { + self.index.cmp(&other.index).reverse() // min heap + } + } + + impl PartialEq for WorkItem { + fn eq(&self, other: &Self) -> bool { + self.index == other.index + } + } + + impl PartialOrd for WorkItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + let mut source = source.fuse(); + let mut queue: BinaryHeap> = BinaryHeap::new(); + iter::from_fn(move || { + // Consume the source iterator until the next_index item is found. + while queue.peek().map_or(true, |item| item.index > next_index) { + if let Some((index, value)) = source.next() { + queue.push(WorkItem { index, value }); + } else { + break; + } + } + let item = queue.pop()?; + next_index = item.index + 1; + Some((item.index, item.value)) + }) +} + #[cfg(test)] mod tests { use std::ffi::OsStr; @@ -907,4 +958,13 @@ mod tests { assert_eq!(res.lhs_positions, vec![]); assert_eq!(res.rhs_positions, vec![]); } + + #[test] + fn test_reorder_by_index() { + let source = vec![(0, "a"), (4, "b"), (2, "c"), (1, "d"), (3, "e")]; + let reordered: Vec<_> = reorder_by_index(source.iter().copied(), 0).collect(); + let mut sorted = source.clone(); + sorted.sort(); + assert_eq!(reordered, sorted); + } } diff --git a/tests/cli.rs b/tests/cli.rs index 467b5b64ab..5add849d25 100644 --- a/tests/cli.rs +++ b/tests/cli.rs @@ -185,6 +185,37 @@ fn directory_arguments() { cmd.assert().stdout(predicate_fn); } +#[test] +fn directory_arguments_sorted() { + let mut cmd = get_base_command(); + + cmd.arg("--sort-paths") + .arg("--display=inline") + .arg("sample_files/dir_before") + .arg("sample_files/dir_after"); + + let expected_files = [ + "clojure.clj", + "foo.js", + "has_many_hunk.py", + "only_in_after_dir.rs", + "only_in_before.c", + ]; + let predicate_fn = predicate::function(|output: &str| { + let mut file_linenos = Vec::new(); + for name in expected_files { + if let Some(lineno) = output.lines().position(|line| line.starts_with(name)) { + file_linenos.push(lineno); + } else { + return false; // All files should be emitted + } + } + // Output should be sorted + file_linenos.windows(2).all(|w| w[0] < w[1]) + }); + cmd.assert().stdout(predicate_fn); +} + #[test] fn git_style_arguments_rename() { let mut cmd = get_base_command();