diff --git a/massa-execution/src/controller.rs b/massa-execution/src/controller.rs index 4a6f744f74a..42c3dca0330 100644 --- a/massa-execution/src/controller.rs +++ b/massa-execution/src/controller.rs @@ -7,6 +7,7 @@ use massa_models::{Block, BlockHashMap}; use std::collections::VecDeque; use tokio::sync::mpsc; use tokio::task::JoinHandle; +use tracing::{error, info}; /// A sender of execution commands. #[derive(Clone)] @@ -35,16 +36,15 @@ pub struct ExecutionManager { } impl ExecutionManager { - pub async fn stop( - self, - execution_event_receiver: ExecutionEventReceiver, - ) -> Result<(), ExecutionError> { + pub async fn stop(self) -> Result<(), ExecutionError> { drop(self.manager_tx); - execution_event_receiver.drain().await; - match self.join_handle.await { - Err(_) => Err(ExecutionError::JoinError), - _ => Ok(()), - } + if let Err(err) = self.join_handle.await { + error!("execution worker crashed: {}", err); + return Err(ExecutionError::JoinError); + }; + + info!("execution worker finished cleanly"); + Ok(()) } } @@ -72,16 +72,13 @@ pub async fn start_controller( // Unbounded, as execution is limited per metering already. let (event_tx, event_rx) = mpsc::unbounded_channel::(); - - let mut worker = ExecutionWorker::new(cfg, thread_count, event_tx, command_rx, manager_rx)?; - + let worker = ExecutionWorker::new(cfg, thread_count, event_tx, command_rx, manager_rx)?; let join_handle = tokio::spawn(async move { match worker.run_loop().await { Err(err) => Err(err), Ok(v) => Ok(v), } }); - Ok(( ExecutionCommandSender(command_tx), ExecutionEventReceiver(event_rx), diff --git a/massa-execution/src/tests/scenarios_mandatories.rs b/massa-execution/src/tests/scenarios_mandatories.rs index 92ddb722e96..131413ee8fe 100644 --- a/massa-execution/src/tests/scenarios_mandatories.rs +++ b/massa-execution/src/tests/scenarios_mandatories.rs @@ -12,27 +12,21 @@ async fn test_execution_basic() { #[tokio::test] #[serial] async fn test_execution_shutdown() { - let (_command_sender, event_receiver, manager) = start_controller(ExecutionConfig {}, 2) + let (_command_sender, _event_receiver, manager) = start_controller(ExecutionConfig {}, 2) .await .expect("Failed to start execution."); - manager - .stop(event_receiver) - .await - .expect("Failed to stop execution."); + manager.stop().await.expect("Failed to stop execution."); } #[tokio::test] #[serial] async fn test_sending_command() { - let (mut command_sender, event_receiver, manager) = start_controller(ExecutionConfig {}, 2) + let (mut command_sender, _event_receiver, manager) = start_controller(ExecutionConfig {}, 2) .await .expect("Failed to start execution."); command_sender .update_blockclique(Default::default(), Default::default()) .await .expect("Failed to send command"); - manager - .stop(event_receiver) - .await - .expect("Failed to stop execution."); + manager.stop().await.expect("Failed to stop execution."); } diff --git a/massa-execution/src/types.rs b/massa-execution/src/types.rs index dc93901f3c9..d1b82ec11e1 100644 --- a/massa-execution/src/types.rs +++ b/massa-execution/src/types.rs @@ -107,6 +107,8 @@ pub(crate) enum ExecutionRequest { ResetToFinalState, /// Shutdown state, set by the worker to signal shutdown to the VM thread. Shutdown, + /// Starting state + Starting, } pub(crate) type ExecutionQueue = Arc<(Mutex>, Condvar)>; diff --git a/massa-execution/src/worker.rs b/massa-execution/src/worker.rs index cb26c2d96b7..8745a389196 100644 --- a/massa-execution/src/worker.rs +++ b/massa-execution/src/worker.rs @@ -6,6 +6,7 @@ use crate::vm::VM; use crate::{config::ExecutionConfig, types::ExecutionStep}; use massa_models::{Block, BlockHashMap, BlockId, Slot}; use tokio::sync::mpsc; +use tracing::debug; /// Commands sent to the `execution` component. #[derive(Debug)] @@ -48,22 +49,44 @@ pub struct ExecutionWorker { /// pending CSS final blocks ordered_pending_css_final_blocks: Vec<(BlockId, Block)>, /// VM - vm_thread: Option>, + vm_thread: JoinHandle<()>, /// VM execution requests queue - request_queue: ExecutionQueue, + execution_queue: ExecutionQueue, } impl ExecutionWorker { pub fn new( - _cfg: ExecutionConfig, + cfg: ExecutionConfig, thread_count: u8, event_sender: mpsc::UnboundedSender, controller_command_rx: mpsc::Receiver, controller_manager_rx: mpsc::Receiver, ) -> Result { + let execution_queue = ExecutionQueue::default(); + let execution_queue_clone = execution_queue.clone(); + let cfg_clone = cfg.clone(); + // Start vm thread + let vm_thread = thread::spawn(move || { + let mut vm = VM::new(cfg); + let (lock, condvar) = &*execution_queue_clone; + let mut requests = lock.lock().unwrap(); + requests.push_back(ExecutionRequest::Starting); + // Run until shutdown. + loop { + match &requests.pop_front() { + Some(ExecutionRequest::RunFinalStep(step)) => vm.run_final_step(step), + Some(ExecutionRequest::RunActiveStep(step)) => vm.run_active_step(step), + Some(ExecutionRequest::ResetToFinalState) => vm.reset_to_final(), + Some(ExecutionRequest::Shutdown) => return, + Some(ExecutionRequest::Starting) => {} + None => panic!("Unexpected request None"), + }; + requests = condvar.wait(requests).unwrap(); + } + }); // return execution worker Ok(ExecutionWorker { - _cfg, + _cfg: cfg_clone, thread_count, controller_command_rx, controller_manager_rx, @@ -73,121 +96,76 @@ impl ExecutionWorker { last_active_slot: Slot::new(0, 0), ordered_active_blocks: Default::default(), ordered_pending_css_final_blocks: Default::default(), - vm_thread: None, - request_queue: ExecutionQueue::default(), + vm_thread, + execution_queue, }) } - fn start_thread(&mut self) { - let reqs_clone = self.request_queue.clone(); - let cfg = self._cfg.clone(); - // Start vm thread - self.vm_thread = Some(thread::spawn(move || { - let mut vm = VM::new(cfg); - // Run until shutdown. - let condvar = &reqs_clone.1; - condvar.notify_one(); - loop { - let (lock, condvar) = &*reqs_clone; - let lock = lock.lock().unwrap(); - let mut requests = condvar.wait(lock).unwrap(); - if let Some(request) = &requests.pop_front() { - match request { - ExecutionRequest::RunFinalStep(step) => vm.run_final_step(step), - ExecutionRequest::RunActiveStep(step) => vm.run_active_step(step), - ExecutionRequest::ResetToFinalState => vm.reset_to_final(), - ExecutionRequest::Shutdown => return, - }; - } else { - panic!("Unexpected execution queue state.") - } - } - })); - // Wait for the VM thread to have started - let _started_flag = self - .request_queue - .1 - .wait(self.request_queue.0.lock().unwrap()) - .unwrap(); - } - // asks the VM to reset to its final pub fn reset_to_final(&mut self) { - let (queue_lock, condvar) = &*self.request_queue; + let (queue_lock, condvar) = &*self.execution_queue; let queue_guard = &mut queue_lock.lock().unwrap(); // cancel all non-final requests // Final execution requests are left to maintain final state consistency - queue_guard.retain(|req| match req { - ExecutionRequest::RunFinalStep(..) => true, - ExecutionRequest::RunActiveStep(..) => false, - ExecutionRequest::ResetToFinalState => false, - ExecutionRequest::Shutdown => true, + queue_guard.retain(|req| { + matches!( + req, + ExecutionRequest::RunFinalStep(..) | ExecutionRequest::Shutdown + ) }); // request reset to final state queue_guard.push_back(ExecutionRequest::ResetToFinalState); + // notify condvar.notify_one(); } - fn stop_thread(&mut self) -> anyhow::Result<()> { - self.push_request(ExecutionRequest::Shutdown); - if let Some(th) = self.vm_thread.take() { - match th.join() { - Err(_) => anyhow::bail!("Failed joining vm thread"), - _ => return Ok(()), - } - } - anyhow::bail!("Failed joining vm thread") - } - /// runs an SCE-active step (slot) /// /// # Arguments /// * slot: target slot /// * block: None if miss, Some(block_id, block) otherwise fn push_request(&mut self, request: ExecutionRequest) { - let (queue_lock, condvar) = &*self.request_queue; + let (queue_lock, condvar) = &*self.execution_queue; let queue_guard = &mut queue_lock.lock().unwrap(); queue_guard.push_back(request); condvar.notify_one(); } - pub async fn run_loop(&mut self) -> Result<(), ExecutionError> { - self.start_thread(); + pub async fn run_loop(mut self) -> Result<(), ExecutionError> { loop { tokio::select! { // Process management commands - cmd = self.controller_manager_rx.recv() => { - self.stop_thread().unwrap(); - match cmd { - None => break, - Some(_) => {} - } - }, + _ = self.controller_manager_rx.recv() => break, + // Process commands - Some(cmd) = self.controller_command_rx.recv() => self.process_command(cmd).await?, + Some(cmd) = self.controller_command_rx.recv() => self.process_command(cmd)?, } } + // Shutdown VM, cancel all pending execution requests + self.push_request(ExecutionRequest::Shutdown); + if self.vm_thread.join().is_err() { + debug!("Failed joining vm thread") + } Ok(()) } - /// Process a given command. + /// Proces a given command. /// /// # Argument /// * cmd: command to process - async fn process_command(&mut self, cmd: ExecutionCommand) -> Result<(), ExecutionError> { + fn process_command(&mut self, cmd: ExecutionCommand) -> Result<(), ExecutionError> { match cmd { ExecutionCommand::BlockCliqueChanged { blockclique, finalized_blocks, } => { - self.blockclique_changed(blockclique, finalized_blocks) - .await?; + self.blockclique_changed(blockclique, finalized_blocks)?; } } Ok(()) } - async fn blockclique_changed( + fn blockclique_changed( &mut self, blockclique: BlockHashMap, finalized_blocks: BlockHashMap, diff --git a/massa-node/src/main.rs b/massa-node/src/main.rs index 6e88daf4146..04ab339d579 100644 --- a/massa-node/src/main.rs +++ b/massa-node/src/main.rs @@ -234,14 +234,14 @@ async fn stop( api_private_handle.stop(); // stop consensus controller - let (protocol_event_receiver, execution_event_receiver) = consensus_manager + let (protocol_event_receiver, _execution_event_receiver) = consensus_manager .stop(consensus_event_receiver) .await .expect("consensus shutdown failed"); // Stop execution controller. execution_manager - .stop(execution_event_receiver) + .stop() .await .expect("Failed to shutdown execution.");