rollup: filtering in subdirectories + work-stealing stack for parallel directory traversal #2609

Merged 3 commits on Sep 20, 2023
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,11 @@ Unreleased changes. Release notes have not yet been written.
`rg -B1 -A2`. That is, `-A` and `-B` no longer completely override `-C`.
Instead, they only partially override `-C`.

Performance improvements:

* [PERF #2591](https://github.com/BurntSushi/ripgrep/pull/2591):
Parallel directory traversal now uses work stealing for faster searches.

Feature enhancements:

* Added or improved file type filtering for Ada, DITA, Elixir, Fuchsia, Gentoo,
@@ -21,6 +26,8 @@ Feature enhancements:

Bug fixes:

* [BUG #1757](https://github.com/BurntSushi/ripgrep/issues/1757):
Fix bug where ignore rules were not applied correctly when searching a sub-directory.
* [BUG #1891](https://github.com/BurntSushi/ripgrep/issues/1891):
Fix bug when using `-w` with a regex that can match the empty string.
* [BUG #1911](https://github.com/BurntSushi/ripgrep/issues/1911):
68 changes: 54 additions & 14 deletions Cargo.lock

1 change: 1 addition & 0 deletions crates/ignore/Cargo.toml
@@ -19,6 +19,7 @@ name = "ignore"
bench = false

[dependencies]
crossbeam-deque = "0.8.3"
globset = { version = "0.4.10", path = "../globset" }
lazy_static = "1.1"
log = "0.4.5"
24 changes: 23 additions & 1 deletion crates/ignore/src/dir.rs
@@ -442,7 +442,29 @@ impl Ignore {
}
if self.0.opts.parents {
if let Some(abs_parent_path) = self.absolute_base() {
let path = abs_parent_path.join(path);
// What we want to do here is take the absolute base path of
// this directory and join it with the path we're searching.
// The main issue we want to avoid is accidentally duplicating
// directory components, so we try to strip any common prefix
// off of `path`. Overall, this seems a little ham-fisted, but
// it does fix a nasty bug. It should do fine until we overhaul
// this crate.
let dirpath = self.0.dir.as_path();
let path_prefix = match strip_prefix("./", dirpath) {
None => dirpath,
Some(stripped_dot_slash) => stripped_dot_slash,
};
let path = match strip_prefix(path_prefix, path) {
None => abs_parent_path.join(path),
Some(p) => {
let p = match strip_prefix("/", p) {
None => p,
Some(p) => p,
};
abs_parent_path.join(p)
}
};

for ig in
self.parents().skip_while(|ig| !ig.0.is_absolute_parent)
{
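To make the intent of the prefix-stripping logic above concrete, here is a minimal standalone sketch. It uses `std::path::Path::strip_prefix` rather than the crate's internal `strip_prefix` helper, and the directory layout is hypothetical:

```rust
use std::path::{Path, PathBuf};

// Illustrative sketch only: join a searched path onto an absolute base
// without duplicating the directory component the two already share.
fn join_without_duplication(abs_parent: &Path, dir: &Path, path: &Path) -> PathBuf {
    match path.strip_prefix(dir) {
        Ok(rest) => abs_parent.join(rest),
        Err(_) => abs_parent.join(path),
    }
}

fn main() {
    let abs_parent = Path::new("/home/user/project/subdir");
    let dir = Path::new("subdir");
    let path = Path::new("subdir/src/main.rs");

    // A naive join repeats the "subdir" component...
    assert_eq!(
        abs_parent.join(path),
        PathBuf::from("/home/user/project/subdir/subdir/src/main.rs")
    );
    // ...while stripping the shared prefix first does not.
    assert_eq!(
        join_without_duplication(abs_parent, dir, path),
        PathBuf::from("/home/user/project/subdir/src/main.rs")
    );
}
```

A duplicated join like the first one is the kind of path that made parent ignore rules match incorrectly, which is why the change above strips `self.0.dir` (minus any leading `./`) from `path` before joining it onto the absolute base.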
108 changes: 86 additions & 22 deletions crates/ignore/src/walk.rs
@@ -3,14 +3,15 @@ use std::ffi::OsStr;
use std::fmt;
use std::fs::{self, FileType, Metadata};
use std::io;
use std::iter::FusedIterator;
use std::iter::{self, FusedIterator};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::vec;

use crossbeam_deque::{Stealer, Worker as Deque};
use same_file::Handle;
use walkdir::{self, WalkDir};

@@ -1231,9 +1232,8 @@ impl WalkParallel {
/// can be merged together into a single data structure.
pub fn visit(mut self, builder: &mut dyn ParallelVisitorBuilder<'_>) {
let threads = self.threads();
let stack = Arc::new(Mutex::new(vec![]));
let mut stack = vec![];
{
let mut stack = stack.lock().unwrap();
let mut visitor = builder.build();
let mut paths = Vec::new().into_iter();
std::mem::swap(&mut paths, &mut self.paths);
@@ -1283,24 +1283,24 @@
}
// Create the workers and then wait for them to finish.
let quit_now = Arc::new(AtomicBool::new(false));
let num_pending =
Arc::new(AtomicUsize::new(stack.lock().unwrap().len()));
let num_pending = Arc::new(AtomicUsize::new(stack.len()));
let stacks = Stack::new_for_each_thread(threads, stack);
std::thread::scope(|s| {
let mut handles = vec![];
for _ in 0..threads {
let worker = Worker {
let handles: Vec<_> = stacks
.into_iter()
.map(|stack| Worker {
visitor: builder.build(),
stack: stack.clone(),
stack,
quit_now: quit_now.clone(),
num_pending: num_pending.clone(),
max_depth: self.max_depth,
max_filesize: self.max_filesize,
follow_links: self.follow_links,
skip: self.skip.clone(),
filter: self.filter.clone(),
};
handles.push(s.spawn(|| worker.run()));
}
})
.map(|worker| s.spawn(|| worker.run()))
.collect();
for handle in handles {
handle.join().unwrap();
}
@@ -1390,20 +1390,87 @@ impl Work {
}
}

/// A work-stealing stack.
#[derive(Debug)]
struct Stack {
/// This thread's index.
index: usize,
/// The thread-local stack.
deque: Deque<Message>,
/// The work stealers.
stealers: Arc<[Stealer<Message>]>,
}

impl Stack {
/// Create a work-stealing stack for each thread. The given messages
/// correspond to the initial paths to start the search at. They will
/// be distributed automatically to each stack in a round-robin fashion.
fn new_for_each_thread(threads: usize, init: Vec<Message>) -> Vec<Stack> {
// Using new_lifo() ensures each worker operates depth-first, not
// breadth-first. We do depth-first because a breadth first traversal
// on wide directories with a lot of gitignores is disastrous (for
// example, searching a directory tree containing all of crates.io).
let deques: Vec<Deque<Message>> =
iter::repeat_with(Deque::new_lifo).take(threads).collect();
let stealers = Arc::<[Stealer<Message>]>::from(
deques.iter().map(Deque::stealer).collect::<Vec<_>>(),
);
let stacks: Vec<Stack> = deques
.into_iter()
.enumerate()
.map(|(index, deque)| Stack {
index,
deque,
stealers: stealers.clone(),
})
.collect();
// Distribute the initial messages.
init.into_iter()
.zip(stacks.iter().cycle())
.for_each(|(m, s)| s.push(m));
stacks
}

/// Push a message.
fn push(&self, msg: Message) {
self.deque.push(msg);
}

/// Pop a message.
fn pop(&self) -> Option<Message> {
self.deque.pop().or_else(|| self.steal())
}

/// Steal a message from another queue.
fn steal(&self) -> Option<Message> {
// For fairness, try to steal from index - 1, then index - 2, ... 0,
// then wrap around to len - 1, len - 2, ... index + 1.
let (left, right) = self.stealers.split_at(self.index);
// Don't steal from ourselves
let right = &right[1..];

left.iter()
.rev()
.chain(right.iter().rev())
.map(|s| s.steal_batch_and_pop(&self.deque))
.find_map(|s| s.success())
}
}

/// A worker is responsible for descending into directories, updating the
/// ignore matchers, producing new work and invoking the caller's callback.
///
/// Note that a worker is *both* a producer and a consumer.
struct Worker<'s> {
/// The caller's callback.
visitor: Box<dyn ParallelVisitor + 's>,
/// A stack of work to do.
/// A work-stealing stack of work to do.
///
/// We use a stack instead of a channel because a stack lets us visit
/// directories in depth first order. This can substantially reduce peak
/// memory usage by keeping both the number of files path and gitignore
/// memory usage by keeping both the number of file paths and gitignore
/// matchers in memory lower.
stack: Arc<Mutex<Vec<Message>>>,
stack: Stack,
/// Whether all workers should terminate at the next opportunity. Note
/// that we need this because we don't want other `Work` to be done after
/// we quit. We wouldn't need this if we had a priority channel.
@@ -1668,20 +1735,17 @@ impl<'s> Worker<'s> {
/// Send work.
fn send(&self, work: Work) {
self.num_pending.fetch_add(1, Ordering::SeqCst);
let mut stack = self.stack.lock().unwrap();
stack.push(Message::Work(work));
self.stack.push(Message::Work(work));
}

/// Send a quit message.
fn send_quit(&self) {
let mut stack = self.stack.lock().unwrap();
stack.push(Message::Quit);
self.stack.push(Message::Quit);
}

/// Receive work.
fn recv(&self) -> Option<Message> {
let mut stack = self.stack.lock().unwrap();
stack.pop()
self.stack.pop()
}

/// Signal that work has been finished.
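To make the behavior of the new work-stealing `Stack` concrete, here is a minimal, self-contained sketch of the same pattern built directly on `crossbeam-deque`. The two-worker setup and all names are illustrative, not part of this diff:

```rust
use crossbeam_deque::{Steal, Stealer, Worker};

fn main() {
    // Two "threads": worker 0 holds all of the initial work, worker 1 is idle.
    // `new_lifo` keeps local pops depth-first, as in `Stack::new_for_each_thread`.
    let w0: Worker<u32> = Worker::new_lifo();
    let w1: Worker<u32> = Worker::new_lifo();
    // Worker 1 only keeps stealers for *other* deques, just as `Stack::steal`
    // skips its own index.
    let siblings: Vec<Stealer<u32>> = vec![w0.stealer()];

    for i in 0..8 {
        w0.push(i);
    }

    // Mirror `Stack::pop`: try the local deque first, then steal a batch from
    // a sibling and pop one item from it.
    let popped = w1.pop().or_else(|| {
        siblings
            .iter()
            .map(|s| s.steal_batch_and_pop(&w1))
            .find_map(Steal::success)
    });
    assert!(popped.is_some());

    // The batch steal typically moves several more items into worker 1's own
    // deque, so subsequent pops are cheap and local.
    while let Some(item) = w1.pop() {
        println!("worker 1 pops locally: {}", item);
    }
}
```

In the real `WalkParallel::visit`, the initial paths are spread round-robin across one such deque per thread, and `steal_batch_and_pop` amortizes the cost of touching a sibling's deque: one successful steal refills the local deque, so the common case stays local and lock-free rather than contending on a single shared, mutex-protected stack.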