Skip to content

De-duplicate OnDiskCorpus #2802

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
033f94a
add support to share new objectives in CentralizedEventManager
dhanvithnayak Dec 7, 2024
96ee382
handle received Objectives
dhanvithnayak Dec 7, 2024
a88aea5
remove duplicate event fires in centralized event manager
dhanvithnayak Dec 10, 2024
d369d3d
share input on share_objectives feature (broken)
dhanvithnayak Dec 11, 2024
971d4ee
split impl LlmpEventManager based on share_objectives
dhanvithnayak Dec 12, 2024
91e49a6
reduce code duplication in impl LlmpEventManager (broken)
dhanvithnayak Dec 12, 2024
e17d84e
fix traits error (temp)
dhanvithnayak Dec 13, 2024
4de182a
fix mismatched types
dhanvithnayak Dec 15, 2024
2e30191
resolve merge conflicts
dhanvithnayak Dec 16, 2024
10ba9e4
fix cargo format issue
dhanvithnayak Dec 16, 2024
176d8e9
Merge branch 'main' into main
dhanvithnayak Dec 19, 2024
f12fa78
fix merge conflicts
dhanvithnayak Dec 21, 2024
eb4b884
merge duplicated functions into single impl in llmp/mod.rs
dhanvithnayak Dec 23, 2024
11480dd
merge duplicate impl blocks in stages/sync.rs
dhanvithnayak Dec 29, 2024
1016513
fix clippy warnings
dhanvithnayak Dec 30, 2024
23a8c76
implement generate_name()
dhanvithnayak Dec 31, 2024
b092636
implement file lock for save_testcase()
dhanvithnayak Jan 2, 2025
da3028a
Fix empty multipart (#2789)
riesentoaster Dec 23, 2024
6ed8ad4
Add macros to libafl_bolts tuples for mapping and merging types (#2788)
riesentoaster Dec 23, 2024
fafdb9c
libafl_cc: Automatically find llvm_ar path (#2790)
s1341 Dec 24, 2024
55fd4c6
imemory_ondisk: Don't fail write under any circumstances if locking i…
s1341 Dec 24, 2024
4d4f3c4
frida: Deduplicate with IfElseRuntime (#2792)
s1341 Dec 25, 2024
b53a7a1
Add bloom filter for duplicate execution of the same inputs (#2771)
riesentoaster Dec 28, 2024
1f09cda
bolts limit ashmem concept to Linux/Android only. (#2795)
devnexen Dec 30, 2024
7d91c49
Optimize event serialization with pre-allocated buffer (#2794)
mzfr Dec 30, 2024
675ddc7
Added expect error message to TimeFeedback where there used to be an …
AshrafIbrahim03 Dec 30, 2024
759f6dc
New year new clippy (#2797)
domenukk Jan 1, 2025
ad9b320
implement generate_name()
dhanvithnayak Dec 31, 2024
65076e6
implement file lock for save_testcase()
dhanvithnayak Jan 2, 2025
d4ca687
make locking optional
dhanvithnayak Jan 3, 2025
c735807
autofix
dhanvithnayak Jan 5, 2025
de1f0f8
implement logic for removing testcase
dhanvithnayak Jan 5, 2025
6317780
remove the god-awful bfr
dhanvithnayak Jan 8, 2025
3be79dd
cleanup comment notes
dhanvithnayak Jan 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ uuid = { version = "1.10.0", features = ["serde", "v4"] }
which = "6.0.3"
windows = "0.58.0"
z3 = "0.12.1"
fs2 = "0.4.3"


[workspace.lints.rust]
Expand Down
5 changes: 5 additions & 0 deletions libafl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ std = [
"libafl_bolts/std",
"typed-builder",
"fastbloom",
"fs2"
]

## Tracks the Feedbacks and the Objectives that were interesting for a Testcase
Expand Down Expand Up @@ -143,6 +144,9 @@ unicode = ["libafl_bolts/alloc", "ahash/std", "serde/rc", "bitvec"]
## Enable multi-part input formats and mutators
multipart_inputs = ["arrayvec", "rand_trait"]

## Share objectives across nodes
share_objectives = []

#! ## LibAFL-Bolts Features

## Provide the `#[derive(SerdeAny)]` macro.
Expand Down Expand Up @@ -293,6 +297,7 @@ clap = { workspace = true, optional = true }
num_enum = { workspace = true, optional = true }
libipt = { workspace = true, optional = true }
fastbloom = { version = "0.8.0", optional = true }
fs2 = { workspace = true, optional = true }

[lints]
workspace = true
Expand Down
77 changes: 49 additions & 28 deletions libafl/src/corpus/inmemory_ondisk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ use std::{
io,
io::Write,
path::{Path, PathBuf},
string::ToString,
};

use fs2::FileExt;
#[cfg(feature = "gzip")]
use libafl_bolts::compress::GzipCompressor;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -87,7 +89,7 @@ where
fn add(&mut self, testcase: Testcase<I>) -> Result<CorpusId, Error> {
let id = self.inner.add(testcase)?;
let testcase = &mut self.get(id).unwrap().borrow_mut();
self.save_testcase(testcase, id)?;
self.save_testcase(testcase)?;
*testcase.input_mut() = None;
Ok(id)
}
Expand All @@ -97,7 +99,7 @@ where
fn add_disabled(&mut self, testcase: Testcase<I>) -> Result<CorpusId, Error> {
let id = self.inner.add_disabled(testcase)?;
let testcase = &mut self.get_from_all(id).unwrap().borrow_mut();
self.save_testcase(testcase, id)?;
self.save_testcase(testcase)?;
*testcase.input_mut() = None;
Ok(id)
}
Expand All @@ -108,7 +110,7 @@ where
let entry = self.inner.replace(id, testcase)?;
self.remove_testcase(&entry)?;
let testcase = &mut self.get(id).unwrap().borrow_mut();
self.save_testcase(testcase, id)?;
self.save_testcase(testcase)?;
*testcase.input_mut() = None;
Ok(entry)
}
Expand Down Expand Up @@ -375,42 +377,50 @@ impl<I> InMemoryOnDiskCorpus<I> {
}
}

fn save_testcase(&self, testcase: &mut Testcase<I>, id: CorpusId) -> Result<(), Error>
fn save_testcase(&self, testcase: &mut Testcase<I>) -> Result<(), Error>
where
I: Input,
{
let file_name_orig = testcase.filename_mut().take().unwrap_or_else(|| {
let file_name = testcase.filename_mut().take().unwrap_or_else(|| {
// TODO walk entry metadata to ask for pieces of filename (e.g. :havoc in AFL)
testcase.input().as_ref().unwrap().generate_name(Some(id))
testcase.input().as_ref().unwrap().generate_name()
});

// New testcase, we need to save it.
let mut file_name = file_name_orig.clone();
let mut ctr = String::new();
if self.locking {
let lockfile_name = format!(".{file_name}");
let lockfile_path = self.dir_path.join(lockfile_name);
let lockfile = try_create_new(&lockfile_path)?.unwrap_or(File::create(&lockfile_path)?);

let mut ctr = 2;
let file_name = if self.locking {
loop {
let lockfile_name = format!(".{file_name}.lafl_lock");
let lockfile_path = self.dir_path.join(lockfile_name);
lockfile.lock_exclusive()?;

if try_create_new(lockfile_path)?.is_some() {
break file_name;
}

file_name = format!("{file_name_orig}-{ctr}");
ctr += 1;
// replace String herre too and try to reduce variables
ctr = fs::read_to_string(&lockfile_path)?;
if ctr.is_empty() {
ctr = String::from("1");
} else {
ctr = (ctr.parse::<u32>()? + 1).to_string();
}
} else {
file_name
};

fs::write(lockfile_path, &ctr)?;
}

if testcase.file_path().is_none() {
*testcase.file_path_mut() = Some(self.dir_path.join(&file_name));
}
*testcase.filename_mut() = Some(file_name);

if self.meta_format.is_some() {
let metafile_name = format!(".{}.metadata", testcase.filename().as_ref().unwrap());
let metafile_name;
if self.locking {
metafile_name = format!(
".{}_{}.metadata",
testcase.filename().as_ref().unwrap(),
ctr
);
} else {
metafile_name = format!(".{}.metadata", testcase.filename().as_ref().unwrap());
}
let metafile_path = self.dir_path.join(&metafile_name);
let mut tmpfile_path = metafile_path.clone();
tmpfile_path.set_file_name(format!(".{metafile_name}.tmp",));
Expand Down Expand Up @@ -453,15 +463,26 @@ impl<I> InMemoryOnDiskCorpus<I> {

fn remove_testcase(&self, testcase: &Testcase<I>) -> Result<(), Error> {
if let Some(filename) = testcase.filename() {
if self.locking {
let lockfile_path = self.dir_path.join(format!(".{filename}"));
let lockfile = File::open(&lockfile_path)?;

lockfile.lock_exclusive()?;
let ctr = fs::read_to_string(&lockfile_path)?;

if ctr == "1" {
lockfile.unlock()?;
drop(fs::remove_file(lockfile_path));
} else {
fs::write(lockfile_path, (ctr.parse::<u32>()? - 1).to_string())?;
return Ok(());
}
}

fs::remove_file(self.dir_path.join(filename))?;
if self.meta_format.is_some() {
fs::remove_file(self.dir_path.join(format!(".{filename}.metadata")))?;
}
// also try to remove the corresponding `.lafl_lock` file if it still exists
// (even though it shouldn't exist anymore, at this point in time)
drop(fs::remove_file(
self.dir_path.join(format!(".{filename}.lafl_lock")),
));
}
Ok(())
}
Expand Down
15 changes: 12 additions & 3 deletions libafl/src/events/centralized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,15 +285,18 @@ where
if !self.is_main {
// secondary node
let mut is_tc = false;
// Forward to main only if new tc or heartbeat
// Forward to main only if new tc, heartbeat, or optionally, a new objective
let should_be_forwarded = match &mut event {
Event::NewTestcase { forward_id, .. } => {
*forward_id = Some(ClientId(self.inner.mgr_id().0 as u32));
is_tc = true;
true
}
Event::UpdateExecStats { .. } => true, // send it but this guy won't be handled. the only purpose is to keep this client alive else the broker thinks it is dead and will dc it
Event::Stop => true,
Event::UpdateExecStats { .. } | Event::Stop => true, // send UpdateExecStats but this guy won't be handled. the only purpose is to keep this client alive else the broker thinks it is dead and will dc it

#[cfg(feature = "share_objectives")]
Event::Objective { .. } => true,

_ => false,
};

Expand Down Expand Up @@ -677,6 +680,12 @@ where
log::debug!("[{}] {} was discarded...)", process::id(), event_name);
}
}

#[cfg(feature = "share_objectives")]
Event::Objective { .. } => {
log::debug!("Received new Objective");
}

Event::Stop => {
state.request_stop();
}
Expand Down
6 changes: 6 additions & 0 deletions libafl/src/events/llmp/mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,12 @@ where
}
}
}

#[cfg(feature = "share_objectives")]
Event::Objective { .. } => {
log::debug!("Received new Objective");
}

Event::CustomBuf { tag, buf } => {
for handler in &mut self.custom_buf_handlers {
if handler(state, &tag, &buf)? == CustomBufEventResult::Handled {
Expand Down
137 changes: 137 additions & 0 deletions libafl/src/events/llmp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ use crate::{
state::{HasCorpus, HasExecutions, NopState, State, Stoppable, UsesState},
Error, HasMetadata,
};
#[cfg(feature = "share_objectives")]
use crate::{
corpus::Testcase,
state::{HasCurrentTestcase, HasSolutions},
};

/// The llmp event manager
pub mod mgr;
Expand Down Expand Up @@ -284,6 +289,7 @@ where
}

// Handle arriving events in the client
#[cfg(not(feature = "share_objectives"))]
fn handle_in_client<E, EM, Z>(
&mut self,
fuzzer: &mut Z,
Expand Down Expand Up @@ -340,7 +346,136 @@ where
}
}

#[cfg(feature = "share_objectives")]
fn handle_in_client<E, EM, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
manager: &mut EM,
client_id: ClientId,
event: Event<DI>,
) -> Result<(), Error>
where
E: Executor<EM, Z, State = S> + HasObservers,
EM: UsesState<State = S> + EventFirer,
S: HasSolutions + HasCurrentTestcase,
S::Corpus: Corpus<Input = S::Input>,
S::Solutions: Corpus<Input = S::Input>,
for<'a> E::Observers: Deserialize<'a>,
Z: ExecutionProcessor<EM, <S::Corpus as Corpus>::Input, E::Observers, S>
+ EvaluatorObservers<E, EM, <S::Corpus as Corpus>::Input, S>,
{
match event {
Event::NewTestcase {
input, forward_id, ..
} => {
log::debug!("Received new Testcase to convert from {client_id:?} (forward {forward_id:?}, forward {forward_id:?})");

let Some(converter) = self.converter_back.as_mut() else {
return Ok(());
};

let res = fuzzer.evaluate_input_with_observers(
state,
executor,
manager,
converter.convert(input)?,
false,
)?;

if let Some(item) = res.1 {
log::info!("Added received Testcase as item #{item}");
}
Ok(())
}
Event::Objective { input, .. } => {
log::debug!("Received new Objective");

let Some(converter) = self.converter_back.as_mut() else {
return Ok(());
};

let converted_input = converter.convert(input)?;
let mut testcase = Testcase::from(converted_input);
testcase.set_parent_id_optional(*state.corpus().current());

if let Ok(mut tc) = state.current_testcase_mut() {
tc.found_objective();
}

state.solutions_mut().add(testcase)?;
log::info!("Added received Objective to Corpus");

Ok(())
}
Event::CustomBuf { tag, buf } => {
for handler in &mut self.custom_buf_handlers {
if handler(state, &tag, &buf)? == CustomBufEventResult::Handled {
break;
}
}
Ok(())
}
Event::Stop => Ok(()),
_ => Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.",
event.name()
))),
}
}

/// Handle arriving events in the client
#[cfg(not(feature = "share_objectives"))]
pub fn process<E, EM, Z>(
&mut self,
fuzzer: &mut Z,
state: &mut S,
executor: &mut E,
manager: &mut EM,
) -> Result<usize, Error>
where
E: Executor<EM, Z, State = S> + HasObservers,
EM: UsesState<State = S> + EventFirer,
S::Corpus: Corpus<Input = S::Input>,
for<'a> E::Observers: Deserialize<'a>,
Z: ExecutionProcessor<EM, <S::Corpus as Corpus>::Input, E::Observers, S>
+ EvaluatorObservers<E, EM, <S::Corpus as Corpus>::Input, S>,
{
// TODO: Get around local event copy by moving handle_in_client
let self_id = self.llmp.sender().id();
let mut count = 0;
while let Some((client_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? {
assert!(
tag != _LLMP_TAG_EVENT_TO_BROKER,
"EVENT_TO_BROKER parcel should not have arrived in the client!"
);

if client_id == self_id {
continue;
}
#[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg;
#[cfg(feature = "llmp_compression")]
let compressed;
#[cfg(feature = "llmp_compression")]
let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = self.compressor.decompress(msg)?;
&compressed
} else {
msg
};

let event: Event<DI> = postcard::from_bytes(event_bytes)?;
log::debug!("Processor received message {}", event.name_detailed());
self.handle_in_client(fuzzer, executor, state, manager, client_id, event)?;
count += 1;
}
Ok(count)
}

/// Handle arriving events in the client
#[cfg(feature = "share_objectives")]
pub fn process<E, EM, Z>(
&mut self,
fuzzer: &mut Z,
Expand All @@ -351,7 +486,9 @@ where
where
E: Executor<EM, Z, State = S> + HasObservers,
EM: UsesState<State = S> + EventFirer,
S: HasSolutions + HasCurrentTestcase,
S::Corpus: Corpus<Input = S::Input>,
S::Solutions: Corpus<Input = S::Input>,
for<'a> E::Observers: Deserialize<'a>,
Z: ExecutionProcessor<EM, <S::Corpus as Corpus>::Input, E::Observers, S>
+ EvaluatorObservers<E, EM, <S::Corpus as Corpus>::Input, S>,
Expand Down
Loading