Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add massa event cache crate #4769

Merged
merged 40 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
8699ea0
Add massa event cache crate
sydhds Oct 29, 2024
dbc06e3
Add event cache controller into massa execution
sydhds Oct 30, 2024
07fdb9c
Cargo fmt
sydhds Oct 30, 2024
2285a3a
Add event cache config in masa-node
sydhds Oct 30, 2024
d294854
Minor fixes
sydhds Oct 31, 2024
c948f41
Cargo clippy fixes
sydhds Oct 31, 2024
9648d25
Cargo clippy fixes
sydhds Oct 31, 2024
acbc1b4
Add limits & security checks
sydhds Nov 4, 2024
23725f0
Add controller + manager
sydhds Nov 6, 2024
3086495
Cargo fmt pass
sydhds Nov 6, 2024
2bd4785
Fix check/clippy with --all-targets
sydhds Nov 6, 2024
e72423d
Better event cache clear at startup && better filtering
sydhds Nov 7, 2024
ba34210
Rename to config to max_call_stack_length
sydhds Nov 7, 2024
b2021d5
Improve event cache filtering
sydhds Nov 19, 2024
47819b1
Avoid lock contention in controller::get_filtered_sc_output_events
sydhds Nov 25, 2024
5dfe45c
Improve comment
sydhds Nov 25, 2024
46b1cf7
Add query limit
sydhds Nov 25, 2024
350fc7b
Add tick delay in event cache writer thread
sydhds Nov 26, 2024
9531a0c
Use per address / operation id / is_error counters
sydhds Nov 28, 2024
180743f
Cargo fmt
sydhds Nov 28, 2024
1989ee5
typos fixes
sydhds Nov 28, 2024
31111e3
Cargo clippy fixes for tests
sydhds Nov 28, 2024
082e86e
Cargo fmt
sydhds Nov 28, 2024
f4b4f81
Add mock expectations + impl
sydhds Nov 28, 2024
16bc18d
Cargo clippy for TU fixes
sydhds Nov 28, 2024
23bfa06
Use MAX_EVENT_PER_OPERATION constant
sydhds Nov 29, 2024
fe319c5
Unit test the filter optimisations
sydhds Nov 29, 2024
d192d2a
Add more doc
sydhds Nov 29, 2024
db4aa41
Cargo clippy fixes
sydhds Nov 29, 2024
21e3e62
Use ..Default::default in TU
sydhds Nov 29, 2024
dc817b7
Cargo clippy fix
sydhds Dec 10, 2024
a972ba9
Use scope
sydhds Dec 10, 2024
c812679
Use scope 2
sydhds Dec 10, 2024
20af46e
Remove tick_delay + directly mem::take struct
sydhds Dec 10, 2024
59f9300
Add tu for counter removal
sydhds Dec 10, 2024
f9d4cf9
Add KeyKind in KeyBuilder
sydhds Dec 11, 2024
e993258
Wait for condvar in wait_loop_event
sydhds Dec 12, 2024
815bbe3
Removed unused lib
sydhds Dec 12, 2024
955bb8b
Condvar wait fix
sydhds Dec 16, 2024
0aeb191
Truncate event message in case of error
sydhds Dec 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,038 changes: 652 additions & 386 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ members = [
"massa-versioning",
"massa-grpc",
"massa-xtask",
"massa-event-cache",
]
resolver = "2"

Expand Down Expand Up @@ -105,6 +106,7 @@ massa_test_framework = { path = "./massa-test-framework" }
massa_time = { path = "./massa-time" }
massa_versioning = { path = "./massa-versioning" }
massa_wallet = { path = "./massa-wallet" }
massa_event_cache = { path = "./massa-event-cache" }

# Massa projects dependencies
# massa-proto-rs = { git = "https://github.com/massalabs/massa-proto-rs", branch = "deferred_calls" }
Expand Down
3 changes: 2 additions & 1 deletion _typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ extend-ignore-re = [
# Secret key (S): 18 - 62 characters
# Public key (P): 18 - 62 characters
# NodeId (N)
"(AU|AS|N|S|P)\\d\\w{18,62}",
# OperationId (O)
"(AU|AS|N|S|P|O)\\d\\w{18,62}",
]

[default.extend-words]
Expand Down
6 changes: 3 additions & 3 deletions massa-db-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
//! * if we want to delete item a: 1000 ^ 0011 == 1011 (== item b)
//! * if we want to delete item b: 1000 ^ 1011 == 0011 (== item a)
//!
//! Note that this does not provides "Proof of present" nor "Proof of Absence"
//! Note that this does not provide "Proof of present" nor "Proof of Absence"
//! (operations avail with Merkle trees)
//!
//! For more details here: https://github.com/massalabs/massa/discussions/3852#discussioncomment-6188158
Expand All @@ -45,10 +45,10 @@
//! # Caches
//!
//! A cache of db changes is kept in memory allowing to easily stream it
//! (by streaming, we means: sending it to another massa node (aka bootstrap))
//! (by streaming, we mean: sending it to another massa node (aka bootstrap))
//! There is 2 separate caches: one for 'state' and one for 'versioning'
//!
//! These caches is stored as a key, value: slot -> insertion_data|deletion_data.
//! These caches are stored as a key, value: slot -> insertion_data|deletion_data.
//!
//! # Streaming steps
//!
Expand Down
33 changes: 33 additions & 0 deletions massa-event-cache/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "massa_event_cache"
version = "0.1.0"
edition = "2021"

[features]
test-exports = [
"massa_models/test-exports",
"mockall",
"mockall_wrap"
]


[dependencies]
nom = {workspace = true}
rocksdb = {workspace = true}
tracing = {workspace = true}
parking_lot = { workspace = true }
num_enum = { workspace = true }
massa_models = {workspace = true}
massa_serialization = {workspace = true}
massa_time = {workspace = true}
mockall = {workspace = true, optional = true}
mockall_wrap = {workspace = true, optional = true}

[dev-dependencies]
tempfile = {workspace = true}
serial_test = {workspace = true}
more-asserts = {workspace = true}
rand = {workspace = true}
mockall = {workspace = true}
mockall_wrap = {workspace = true}
massa_models = { workspace = true, features = ["test-exports"] }
22 changes: 22 additions & 0 deletions massa-event-cache/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::path::PathBuf;

pub struct EventCacheConfig {
/// Path to the hard drive cache storage
pub event_cache_path: PathBuf,
/// Maximum number of entries we want to keep in the event cache
pub max_event_cache_length: usize,
/// Amount of entries removed when `event_cache_size` is reached
pub snip_amount: usize,
/// Maximum length of an event data (aka event message)
pub max_event_data_length: u64,
/// Thread count
pub thread_count: u8,
/// Call stack max length
pub max_call_stack_length: u16,
/// Maximum number of events per operation
pub max_events_per_operation: u64,
damip marked this conversation as resolved.
Show resolved Hide resolved
/// Maximum number of operations per block
pub max_operations_per_block: u64,
damip marked this conversation as resolved.
Show resolved Hide resolved
/// Maximum events returned in a query
pub max_events_per_query: usize,
}
136 changes: 136 additions & 0 deletions massa-event-cache/src/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// std
use std::collections::{BTreeSet, VecDeque};
use std::sync::Arc;
// third-party
use parking_lot::{Condvar, Mutex, RwLock};
// internal
use crate::event_cache::EventCache;
use massa_models::execution::EventFilter;
use massa_models::output_event::SCOutputEvent;

/// structure used to communicate with controller
#[derive(Debug, Default)]
pub(crate) struct EventCacheWriterInputData {
/// set stop to true to stop the thread
pub stop: bool,
pub(crate) events: VecDeque<SCOutputEvent>,
}

impl EventCacheWriterInputData {
pub fn new() -> Self {
Self {
stop: Default::default(),
events: Default::default(),
}
}

/*
/// Takes the current input data into a clone that is returned,
/// and resets self.
pub fn take(&mut self) -> Self {
damip marked this conversation as resolved.
Show resolved Hide resolved
Self {
stop: std::mem::take(&mut self.stop),
events: std::mem::take(&mut self.events),
}
}
*/
}

/// interface that communicates with the worker thread
#[cfg_attr(feature = "test-exports", mockall_wrap::wrap, mockall::automock)]
pub trait EventCacheController: Send + Sync {
fn save_events(&self, events: VecDeque<SCOutputEvent>);

fn get_filtered_sc_output_events(&self, filter: &EventFilter) -> Vec<SCOutputEvent>;
}

#[derive(Clone)]
/// implementation of the event cache controller
pub struct EventCacheControllerImpl {
/// input data to process in the VM loop
/// with a wake-up condition variable that needs to be triggered when the data changes
pub(crate) input_data: Arc<(Condvar, Mutex<EventCacheWriterInputData>)>,
/// Event cache
pub(crate) cache: Arc<RwLock<EventCache>>,
}

impl EventCacheController for EventCacheControllerImpl {
fn save_events(&self, events: VecDeque<SCOutputEvent>) {
// lock input data
let mut input_data = self.input_data.1.lock();
input_data.events.extend(events);
// Wake up the condvar in EventCacheWriterThread waiting for events
self.input_data.0.notify_all();
}

fn get_filtered_sc_output_events(&self, filter: &EventFilter) -> Vec<SCOutputEvent> {
damip marked this conversation as resolved.
Show resolved Hide resolved
let mut res_0 = {
// Read from new events first
let lock_0 = self.input_data.1.lock();
#[allow(clippy::unnecessary_filter_map)]
let it = lock_0.events.iter().filter_map(|event| {
if let Some(start) = filter.start {
if event.context.slot < start {
return None;
}
}
if let Some(end) = filter.end {
if event.context.slot >= end {
return None;
}
}
if let Some(is_final) = filter.is_final {
if event.context.is_final != is_final {
return None;
}
}
if let Some(is_error) = filter.is_error {
if event.context.is_error != is_error {
return None;
}
}
match (
filter.original_caller_address,
event.context.call_stack.front(),
) {
(Some(addr1), Some(addr2)) if addr1 != *addr2 => return None,
(Some(_), None) => return None,
_ => (),
}
match (filter.emitter_address, event.context.call_stack.back()) {
(Some(addr1), Some(addr2)) if addr1 != *addr2 => return None,
(Some(_), None) => return None,
_ => (),
}
match (
filter.original_operation_id,
event.context.origin_operation_id,
) {
(Some(addr1), Some(addr2)) if addr1 != addr2 => return None,
(Some(_), None) => return None,
_ => (),
}
Some(event)
});

let res_0: BTreeSet<SCOutputEvent> = it.cloned().collect();
// Drop the lock on the queue as soon as possible to avoid deadlocks
drop(lock_0);
damip marked this conversation as resolved.
Show resolved Hide resolved
res_0
};

let res_1 = {
// Read from db (on disk) events
let lock = self.cache.read();
let (_, res_1) = lock.get_filtered_sc_output_events(filter);
// Drop the lock on the event cache db asap
drop(lock);
damip marked this conversation as resolved.
Show resolved Hide resolved
res_1
};

// Merge results
let res_1: BTreeSet<SCOutputEvent> = BTreeSet::from_iter(res_1);
res_0.extend(res_1);
Vec::from_iter(res_0)
}
}
Loading
Loading