Skip to content

Commit

Permalink
Merge branch 'mainnet_2_3' into fix_entry_count_massa_module_cache
Browse files Browse the repository at this point in the history
  • Loading branch information
Leo-Besancon authored Dec 19, 2024
2 parents 2f5a060 + 597f487 commit 55e001c
Show file tree
Hide file tree
Showing 29 changed files with 3,837 additions and 890 deletions.
1,040 changes: 653 additions & 387 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ members = [
"massa-versioning",
"massa-grpc",
"massa-xtask",
"massa-event-cache",
]
resolver = "2"

Expand Down Expand Up @@ -105,12 +106,13 @@ massa_test_framework = { path = "./massa-test-framework" }
massa_time = { path = "./massa-time" }
massa_versioning = { path = "./massa-versioning" }
massa_wallet = { path = "./massa-wallet" }
massa_event_cache = { path = "./massa-event-cache" }

# Massa projects dependencies
# massa-proto-rs = { git = "https://github.com/massalabs/massa-proto-rs", branch = "deferred_calls" }
# massa-sc-runtime = { git = "https://github.com/massalabs/massa-sc-runtime", "branch" = "deferred_calls" }
massa-proto-rs = { git = "https://github.com/massalabs/massa-proto-rs", "rev" = "b5267178eaf266ec724691d7de163e4c34343416" }
massa-sc-runtime = { git = "https://github.com/massalabs/massa-sc-runtime", "rev" = "82dd714d38cbcd863ae0bb215c06442917bb6404" }
massa-sc-runtime = { git = "https://github.com/massalabs/massa-sc-runtime", "rev" = "f5a584b9f8050f332c9ed332bd0a40f8e0372807" }


peernet = { git = "https://github.com/massalabs/PeerNet", "rev" = "04b05ddd320fbe76cc858115af7b5fc28bdb8310" }
Expand Down
3 changes: 2 additions & 1 deletion _typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ extend-ignore-re = [
# Secret key (S): 18 - 62 characters
# Public key (P): 18 - 62 characters
# NodeId (N)
"(AU|AS|N|S|P)\\d\\w{18,62}",
# OperationId (O)
"(AU|AS|N|S|P|O)\\d\\w{18,62}",
]

[default.extend-words]
Expand Down
6 changes: 3 additions & 3 deletions massa-db-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
//! * if we want to delete item a: 1000 ^ 0011 == 1011 (== item b)
//! * if we want to delete item b: 1000 ^ 1011 == 0011 (== item a)
//!
//! Note that this does not provides "Proof of present" nor "Proof of Absence"
//! Note that this does not provide "Proof of Presence" nor "Proof of Absence"
//! (operations avail with Merkle trees)
//!
//! For more details here: https://github.com/massalabs/massa/discussions/3852#discussioncomment-6188158
Expand All @@ -45,10 +45,10 @@
//! # Caches
//!
//! A cache of db changes is kept in memory allowing to easily stream it
//! (by streaming, we means: sending it to another massa node (aka bootstrap))
//! (by streaming, we mean: sending it to another massa node (aka bootstrap))
//! There are 2 separate caches: one for 'state' and one for 'versioning'
//!
//! These caches is stored as a key, value: slot -> insertion_data|deletion_data.
//! These caches are stored as a key, value: slot -> insertion_data|deletion_data.
//!
//! # Streaming steps
//!
Expand Down
8 changes: 7 additions & 1 deletion massa-db-worker/src/massa_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,8 +720,14 @@ impl MassaDBController for RawMassaDB<Slot, SlotSerializer, SlotDeserializer> {
}

/// Reset the database, and attach it to the given slot.
///
/// This function is used in the FinalStateController::reset method which is used in the Bootstrap
/// process when the bootstrap fails (Bootstrap slot too old). A bootstrap to another node will likely occur
/// after this reset.
fn reset(&mut self, slot: Slot) {
self.set_initial_change_id(slot);
// For dev: please take care to correctly reset the db to avoid any issue when the bootstrap
// process is restarted
self.set_initial_change_id(slot); // Note: this also resets the field: current_batch
self.change_history.clear();
self.change_history_versioning.clear();
}
Expand Down
33 changes: 33 additions & 0 deletions massa-event-cache/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "massa_event_cache"
version = "0.1.0"
edition = "2021"

[features]
test-exports = [
"massa_models/test-exports",
"mockall",
"mockall_wrap"
]


[dependencies]
nom = { workspace = true }
rocksdb = { workspace = true }
tracing = { workspace = true }
parking_lot = { workspace = true }
num_enum = { workspace = true }
massa_models = { workspace = true }
massa_serialization = { workspace = true }
massa_time = { workspace = true }
mockall = { workspace = true, optional = true }
mockall_wrap = { workspace = true, optional = true }

[dev-dependencies]
tempfile = {workspace = true}
serial_test = {workspace = true}
more-asserts = {workspace = true}
rand = {workspace = true}
mockall = {workspace = true}
mockall_wrap = {workspace = true}
massa_models = { workspace = true, features = ["test-exports"] }
22 changes: 22 additions & 0 deletions massa-event-cache/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::path::PathBuf;

/// Configuration of the smart-contract event cache.
///
/// Bundles the on-disk storage location, cache sizing/eviction parameters,
/// and the protocol constants needed to (de)serialize and bound events.
pub struct EventCacheConfig {
    /// Path to the hard drive cache storage
    pub event_cache_path: PathBuf,
    /// Maximum number of entries we want to keep in the event cache
    pub max_event_cache_length: usize,
    /// Amount of entries removed when `max_event_cache_length` is reached
    pub snip_amount: usize,
    /// Maximum length of an event data (aka event message)
    pub max_event_data_length: u64,
    /// Thread count
    pub thread_count: u8,
    /// Call stack max length
    pub max_call_stack_length: u16,
    /// Maximum number of events per operation
    pub max_events_per_operation: u64,
    /// Maximum number of operations per block
    pub max_operations_per_block: u64,
    /// Maximum events returned in a query
    pub max_events_per_query: usize,
}
136 changes: 136 additions & 0 deletions massa-event-cache/src/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// std
use std::collections::{BTreeSet, VecDeque};
use std::sync::Arc;
// third-party
use parking_lot::{Condvar, Mutex, RwLock};
// internal
use crate::event_cache::EventCache;
use massa_models::execution::EventFilter;
use massa_models::output_event::SCOutputEvent;

/// Structure used to communicate between the controller and the cache-writer
/// thread (shared behind an `Arc<(Condvar, Mutex<EventCacheWriterInputData>)>`).
#[derive(Debug, Default)]
pub(crate) struct EventCacheWriterInputData {
    /// set stop to true to stop the thread
    pub stop: bool,
    /// Events queued by the controller, waiting to be picked up by the writer thread
    pub(crate) events: VecDeque<SCOutputEvent>,
}

impl EventCacheWriterInputData {
    /// Create an empty input data: `stop == false` and no pending events.
    ///
    /// Equivalent to `Self::default()` (the struct derives `Default`); kept as an
    /// explicit constructor for call-site readability.
    pub fn new() -> Self {
        Self::default()
    }
}

/// Interface that communicates with the event-cache worker thread.
///
/// With the `test-exports` feature, a mock (`MockEventCacheController`) is
/// generated via `mockall::automock` and wrapped by `mockall_wrap`.
#[cfg_attr(feature = "test-exports", mockall_wrap::wrap, mockall::automock)]
pub trait EventCacheController: Send + Sync {
    /// Queue `events` so the worker thread can persist them asynchronously.
    fn save_events(&self, events: VecDeque<SCOutputEvent>);

    /// Return every known event matching `filter`, combining events not yet
    /// persisted by the worker thread with events already stored on disk.
    fn get_filtered_sc_output_events(&self, filter: &EventFilter) -> Vec<SCOutputEvent>;
}

/// Implementation of the event cache controller.
///
/// Cheap to clone: both fields are `Arc`s shared with the worker thread.
#[derive(Clone)]
pub struct EventCacheControllerImpl {
    /// input data to process in the VM loop
    /// with a wake-up condition variable that needs to be triggered when the data changes
    pub(crate) input_data: Arc<(Condvar, Mutex<EventCacheWriterInputData>)>,
    /// Event cache
    pub(crate) cache: Arc<RwLock<EventCache>>,
}

impl EventCacheController for EventCacheControllerImpl {
    /// Append `events` to the shared input queue and wake up the writer thread.
    fn save_events(&self, events: VecDeque<SCOutputEvent>) {
        // lock input data
        let mut input_data = self.input_data.1.lock();
        input_data.events.extend(events);
        // Wake up the condvar in EventCacheWriterThread waiting for events
        self.input_data.0.notify_all();
    }

    /// Return all events matching `filter`.
    ///
    /// Results come from two sources, merged (and deduplicated) via `BTreeSet`:
    /// 1. the in-memory queue of events not yet persisted by the writer thread;
    /// 2. the on-disk event cache (`self.cache`).
    ///
    /// The two locks are never held at the same time to avoid deadlocks with
    /// the writer thread.
    fn get_filtered_sc_output_events(&self, filter: &EventFilter) -> Vec<SCOutputEvent> {
        let mut res_0 = {
            // Read from new events first
            let lock_0 = self.input_data.1.lock();
            #[allow(clippy::unnecessary_filter_map)]
            let it = lock_0.events.iter().filter_map(|event| {
                // Slot window: keep events with slot in [start, end)
                if let Some(start) = filter.start {
                    if event.context.slot < start {
                        return None;
                    }
                }
                if let Some(end) = filter.end {
                    if event.context.slot >= end {
                        return None;
                    }
                }
                // Finality / error flags must match exactly when filtered on
                if let Some(is_final) = filter.is_final {
                    if event.context.is_final != is_final {
                        return None;
                    }
                }
                if let Some(is_error) = filter.is_error {
                    if event.context.is_error != is_error {
                        return None;
                    }
                }
                // Original caller: compared against the front of the call stack.
                // A filter on caller rejects events with an empty call stack.
                match (
                    filter.original_caller_address,
                    event.context.call_stack.front(),
                ) {
                    (Some(addr1), Some(addr2)) if addr1 != *addr2 => return None,
                    (Some(_), None) => return None,
                    _ => (),
                }
                // Emitter: compared against the back of the call stack
                match (filter.emitter_address, event.context.call_stack.back()) {
                    (Some(addr1), Some(addr2)) if addr1 != *addr2 => return None,
                    (Some(_), None) => return None,
                    _ => (),
                }
                // Origin operation id, when both the filter and the event have one
                match (
                    filter.original_operation_id,
                    event.context.origin_operation_id,
                ) {
                    (Some(addr1), Some(addr2)) if addr1 != addr2 => return None,
                    (Some(_), None) => return None,
                    _ => (),
                }
                Some(event)
            });

            let res_0: BTreeSet<SCOutputEvent> = it.cloned().collect();
            // Drop the lock on the queue as soon as possible to avoid deadlocks
            drop(lock_0);
            res_0
        };

        let res_1 = {
            // Read from db (on disk) events
            let lock = self.cache.read();
            let (_, res_1) = lock.get_filtered_sc_output_events(filter);
            // Drop the lock on the event cache db asap
            drop(lock);
            res_1
        };

        // Merge results
        let res_1: BTreeSet<SCOutputEvent> = BTreeSet::from_iter(res_1);
        res_0.extend(res_1);
        Vec::from_iter(res_0)
    }
}
Loading

0 comments on commit 55e001c

Please sign in to comment.