Skip to content

Commit

Permalink
Emit sorted dir-diff result as soon as preceding results get ready
Browse files Browse the repository at this point in the history
If the directory diff contains lots of changes across files, it's better to
emit earlier results incrementally.

The idea is borrowed from the following thread. The diff producer assigns
incremental index, and the consumer puts the results in BinaryHeap to reorder
them by index.
https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/3
  • Loading branch information
yuja committed Jan 1, 2024
1 parent 7c8cf98 commit f3bcd71
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 12 deletions.
84 changes: 72 additions & 12 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ use crate::parse::syntax;
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::fs::Permissions;
use std::path::Path;
use std::{env, thread};
use std::{env, iter, thread};

use humansize::{format_size, BINARY};
use owo_colors::OwoColorize;
Expand Down Expand Up @@ -261,15 +263,6 @@ fn main() {
.iter()
.any(|diff_result| diff_result.has_reportable_change());
display::json::print_directory(results);
} else if display_options.sort_paths {
let result: Vec<DiffResult> = diff_iter.collect();
for diff_result in result {
print_diff_result(&display_options, &diff_result);

if diff_result.has_reportable_change() {
encountered_changes = true;
}
}
} else {
// We want to diff files in the directory in
// parallel, but print the results serially
Expand All @@ -280,11 +273,18 @@ fn main() {

s.spawn(move || {
diff_iter
.enumerate()
.try_for_each_with(send, |s, diff_result| s.send(diff_result))
.expect("Receiver should be connected")
});

for diff_result in recv.into_iter() {
let serial_iter: Box<dyn Iterator<Item = _>> =
if display_options.sort_paths {
Box::new(reorder_by_index(recv.into_iter(), 0))
} else {
Box::new(recv.into_iter())
};
for (_, diff_result) in serial_iter {
print_diff_result(&display_options, &diff_result);

if diff_result.has_reportable_change() {
Expand Down Expand Up @@ -769,7 +769,7 @@ fn diff_directories<'a>(
display_options: &DisplayOptions,
diff_options: &DiffOptions,
overrides: &[(LanguageOverride, Vec<glob::Pattern>)],
) -> impl ParallelIterator<Item = DiffResult> + 'a {
) -> impl IndexedParallelIterator<Item = DiffResult> + 'a {
let diff_options = diff_options.clone();
let display_options = display_options.clone();
let overrides: Vec<_> = overrides.into();
Expand Down Expand Up @@ -927,6 +927,57 @@ fn print_diff_result(display_options: &DisplayOptions, summary: &DiffResult) {
}
}

/// Sort items in the `source` iterator by the 0th `usize` field, yield when all
/// preceding items are received.
///
/// The idea is borrowed from
/// <https://users.rust-lang.org/t/parallel-work-collected-sequentially/13504/3>.
fn reorder_by_index<T>(
source: impl Iterator<Item = (usize, T)>,
mut next_index: usize,
) -> impl Iterator<Item = (usize, T)> {
struct WorkItem<T> {
index: usize,
value: T,
}

impl<T> Eq for WorkItem<T> {}

impl<T> Ord for WorkItem<T> {
fn cmp(&self, other: &Self) -> Ordering {
self.index.cmp(&other.index).reverse() // min heap
}
}

impl<T> PartialEq for WorkItem<T> {
fn eq(&self, other: &Self) -> bool {
self.index == other.index
}
}

impl<T> PartialOrd for WorkItem<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

let mut source = source.fuse();
let mut queue: BinaryHeap<WorkItem<T>> = BinaryHeap::new();
iter::from_fn(move || {
// Consume the source iterator until the next_index item is found.
while queue.peek().map_or(true, |item| item.index > next_index) {
if let Some((index, value)) = source.next() {
queue.push(WorkItem { index, value });
} else {
break;
}
}
let item = queue.pop()?;
next_index = item.index + 1;
Some((item.index, item.value))
})
}

#[cfg(test)]
mod tests {
use std::ffi::OsStr;
Expand All @@ -951,4 +1002,13 @@ mod tests {
assert_eq!(res.lhs_positions, vec![]);
assert_eq!(res.rhs_positions, vec![]);
}

#[test]
fn test_reorder_by_index() {
let source = vec![(0, "a"), (4, "b"), (2, "c"), (1, "d"), (3, "e")];
let reordered: Vec<_> = reorder_by_index(source.iter().copied(), 0).collect();
let mut sorted = source.clone();
sorted.sort();
assert_eq!(reordered, sorted);
}
}
31 changes: 31 additions & 0 deletions tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,37 @@ fn directory_arguments() {
cmd.assert().stdout(predicate_fn);
}

#[test]
fn directory_arguments_sorted() {
let mut cmd = get_base_command();

cmd.arg("--sort-paths")
.arg("--display=inline")
.arg("sample_files/dir_before")
.arg("sample_files/dir_after");

let expected_files = [
"clojure.clj",
"foo.js",
"has_many_hunk.py",
"only_in_after_dir.rs",
"only_in_before.c",
];
let predicate_fn = predicate::function(|output: &str| {
let mut file_linenos = Vec::new();
for name in expected_files {
if let Some(lineno) = output.lines().position(|line| line.starts_with(name)) {
file_linenos.push(lineno);
} else {
return false; // All files should be emitted
}
}
// Output should be sorted
file_linenos.windows(2).all(|w| w[0] < w[1])
});
cmd.assert().stdout(predicate_fn);
}

#[test]
fn git_style_arguments_rename() {
let mut cmd = get_base_command();
Expand Down

0 comments on commit f3bcd71

Please sign in to comment.