From f2971ca31efe92a058b6e33cd04fac83d7402334 Mon Sep 17 00:00:00 2001 From: DogLooksGood Date: Wed, 20 Nov 2024 21:59:38 +0800 Subject: [PATCH] Bugfixes for subgame --- Cargo.lock | 1 + api/src/event.rs | 16 ++++ core/Cargo.toml | 15 ++-- core/src/checkpoint.rs | 4 +- core/src/context.rs | 23 ++---- core/src/engine.rs | 6 -- core/src/types/accounts/game_account.rs | 19 +++++ env/src/config.rs | 1 + facade/src/main.rs | 2 +- js/sdk-core/src/app-client.ts | 2 +- js/sdk-core/src/events.ts | 17 +++++ js/sdk-core/src/game-context.ts | 26 +++++-- transactor/src/component.rs | 2 +- transactor/src/component/broadcaster.rs | 2 +- transactor/src/component/common.rs | 22 ++++-- transactor/src/component/event_bridge.rs | 73 ++++++++++++------- transactor/src/component/event_loop.rs | 55 +++++++------- .../src/component/event_loop/event_handler.rs | 56 +++++++------- transactor/src/context.rs | 11 +-- transactor/src/frame.rs | 18 ++--- transactor/src/game_manager.rs | 15 +--- transactor/src/handle.rs | 14 +--- transactor/src/handle/subgame.rs | 7 +- transport/src/solana.rs | 9 ++- 24 files changed, 238 insertions(+), 178 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2c009342..0cfb6d76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3124,6 +3124,7 @@ dependencies = [ "serde_json", "sha256", "thiserror", + "tokio", ] [[package]] diff --git a/api/src/event.rs b/api/src/event.rs index 8f9b926f..40a61751 100644 --- a/api/src/event.rs +++ b/api/src/event.rs @@ -255,3 +255,19 @@ pub trait BridgeEvent: Sized + BorshSerialize + BorshDeserialize { Self::try_from_slice(slice).or(Err(HandleError::MalformedBridgeEvent)) } } + +#[cfg(test)] +mod tests { + + use crate::effect::Effect; + + use super::*; + + #[test] + fn a() { + let v = vec![0,0,0,0,0,66,21,114,73,147,1,0,0,1,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,3,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]; + let e = Effect::try_from_slice(&v); + + println!("{:?}", e); + } +} diff --git a/core/Cargo.toml b/core/Cargo.toml index 3bc411e6..b01fb265 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -14,15 +14,16 @@ readme.workspace = true publish = true [dependencies] -sha256 = { workspace = true } -race-api = { workspace = true } -async-trait = { workspace = true } -thiserror = { workspace = true } +sha256.workspace = true +race-api.workspace = true +async-trait.workspace = true +thiserror.workspace = true serde = { workspace = true, features = ["derive"], optional = true } serde_json = { workspace = true, optional = true } -borsh = { workspace = true } -futures = { workspace = true } -rs_merkle = { workspace = true } +borsh.workspace = true +futures.workspace = true +rs_merkle.workspace = true +tokio = { workspace = true, features = ["sync"] } [dev-dependencies] anyhow.workspace = true diff --git a/core/src/checkpoint.rs b/core/src/checkpoint.rs index 8a3496a2..c44b706e 100644 --- a/core/src/checkpoint.rs +++ b/core/src/checkpoint.rs @@ -141,7 +141,7 @@ impl Checkpoint { } } - pub fn init_data(&mut self, id: usize, game_spec: GameSpec, data: Vec) -> Result<(), Error> { + pub fn init_data(&mut self, id: usize, game_spec: GameSpec, versions: Versions, data: Vec) -> Result<(), Error> { match self.data.entry(id) { std::collections::hash_map::Entry::Occupied(_) => { return Err(Error::CheckpointAlreadyExists); @@ -154,7 +154,7 @@ impl Checkpoint { data, sha: sha.into(), game_spec, - ..Default::default() + versions, }; v.insert(versioned_data); self.update_root_and_proofs(); diff --git a/core/src/context.rs b/core/src/context.rs index 843089dc..6d5ed226 100644 --- a/core/src/context.rs +++ b/core/src/context.rs @@ -126,7 +126,7 @@ impl ContextPlayer { #[derive(Debug, Clone)] pub enum SubGameInitSource { FromCheckpoint(VersionedData), - FromInitAccount(InitAccount), + FromInitAccount(InitAccount, Versions), } #[derive(Debug, Clone)] @@ -214,6 +214,8 @@ impl GameContext { pub fn try_new_with_sub_game_spec(init: SubGameInit) -> Result { let SubGameInit { spec, nodes, source } = init; + println!("NODES: {:?}", nodes); + let (handler_state, versions, init_data, checkpoint) = match source { SubGameInitSource::FromCheckpoint(versioned_data) => { let mut checkpoint = Checkpoint::default(); @@ -222,11 +224,13 @@ impl GameContext { checkpoint.init_versioned_data(versioned_data)?; (data, versions, vec![], checkpoint) } - SubGameInitSource::FromInitAccount(init_account) => { - (vec![], Default::default(), init_account.data, Default::default()) + SubGameInitSource::FromInitAccount(init_account, versions) => { + (vec![], versions, init_account.data, Default::default()) } }; + println!("Versions: {}", versions); + Ok(Self { spec, nodes: nodes.clone(), @@ -581,17 +585,6 @@ impl GameContext { self.status = status; } - /// Set player status by address. - /// Using it in custom event handler is not allowed. - pub fn set_node_status(&mut self, addr: &str, status: NodeStatus) -> Result<()> { - if let Some(n) = self.nodes.iter_mut().find(|n| n.addr.eq(&addr)) { - n.status = status; - } else { - return Err(Error::InvalidPlayerAddress); - } - Ok(()) - } - pub fn add_node(&mut self, node_addr: String, access_version: u64, mode: ClientMode) { self.nodes.retain(|n| n.addr.ne(&node_addr)); self.nodes @@ -892,7 +885,7 @@ impl GameContext { self.set_game_status(GameStatus::Idle); } else if is_init { self.bump_settle_version()?; - self.checkpoint.init_data(self.spec.game_id, self.spec.clone(), state)?; + self.checkpoint.init_data(self.spec.game_id, self.spec.clone(), self.versions, state)?; self.checkpoint.set_access_version(self.versions.access_version); self.set_game_status(GameStatus::Idle); } diff --git a/core/src/engine.rs b/core/src/engine.rs index ea3a9447..3c55773c 100644 --- a/core/src/engine.rs +++ b/core/src/engine.rs @@ -129,12 +129,6 @@ pub fn general_handle_event( Ok(()) } - Event::Bridge { - .. - } => { - Ok(()) - } - _ => Ok(()), } } diff --git a/core/src/types/accounts/game_account.rs b/core/src/types/accounts/game_account.rs index ed979162..3550f876 100644 --- a/core/src/types/accounts/game_account.rs +++ b/core/src/types/accounts/game_account.rs @@ -49,6 +49,12 @@ pub struct PlayerJoin { pub verify_key: String, } +impl std::fmt::Display for PlayerJoin { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Player#{}@{}", self.addr, self.access_version) + } +} + impl PlayerJoin { pub fn new>( addr: S, @@ -91,6 +97,12 @@ impl ServerJoin { } } +impl std::fmt::Display for ServerJoin { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Server#{}@{}", self.addr, self.access_version) + } +} + /// Represent a player call the deposit instruction in contract. #[derive(Debug, PartialEq, Eq, Clone, BorshSerialize, BorshDeserialize)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] @@ -101,6 +113,13 @@ pub struct PlayerDeposit { pub settle_version: u64, } +impl std::fmt::Display for PlayerDeposit { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Deposit#{}-{}", self.addr, self.amount) + } +} + + impl PlayerDeposit { pub fn new>(addr: S, balance: u64, settle_version: u64) -> Self { Self { diff --git a/env/src/config.rs b/env/src/config.rs index 1ab870f3..e7af8de9 100644 --- a/env/src/config.rs +++ b/env/src/config.rs @@ -34,6 +34,7 @@ pub struct TransactorConfig { pub disable_blacklist: Option, pub debug_mode: Option, pub log_dir: Option, + pub bundle_dir: Option, } #[derive(Deserialize)] diff --git a/facade/src/main.rs b/facade/src/main.rs index 6c27d5bd..a7bcdcf7 100644 --- a/facade/src/main.rs +++ b/facade/src/main.rs @@ -728,7 +728,7 @@ async fn run_server(context: Context) -> anyhow::Result { let middleware = ServiceBuilder::new().layer(cors); let http_server = ServerBuilder::default() - .max_response_body_size(64_000_000) + .max_response_body_size(1_000_000_000) .set_host_filtering(AllowHosts::Any) .set_middleware(middleware) .build(HTTP_HOST.parse::()?) diff --git a/js/sdk-core/src/app-client.ts b/js/sdk-core/src/app-client.ts index 3b0b1862..0a9844b0 100644 --- a/js/sdk-core/src/app-client.ts +++ b/js/sdk-core/src/app-client.ts @@ -257,7 +257,7 @@ export class AppClient extends BaseClient { if (checkpointOffChain !== undefined && checkpointOnChain !== undefined) { this.__gameContext.checkpoint = Checkpoint.fromParts(checkpointOffChain, checkpointOnChain) } else { - this.__gameContext.checkpoint = Checkpoint.default() + throw new Error('Game is not served. This is unexpected, mostly a bug.') } const gameContext = this.__gameContext.subContext(subGame, this.__gameContext.checkpoint); diff --git a/js/sdk-core/src/events.ts b/js/sdk-core/src/events.ts index 6e652014..5ad78851 100644 --- a/js/sdk-core/src/events.ts +++ b/js/sdk-core/src/events.ts @@ -23,6 +23,7 @@ export type EventKind = | 'SecretsReady' | 'Shutdown' | 'Bridge' + | 'SubGameReady' // Client-only events | 'Init' | 'Checkpoint' @@ -424,6 +425,22 @@ export class Bridge extends GameEvent implements IEventKind { } } +@variant(20) +export class SubGameReady extends GameEvent implements IEventKind { + @field('usize') + gameId!: number + + constructor(fields: Fields) { + super() + Object.assign(this, fields) + Object.setPrototypeOf(this, SubGameReady.prototype) + } + + kind(): EventKind { + return 'SubGameReady' + } +} + // Client-only events, they can't be serialized and deserialized. export class Init extends GameEvent implements IEventKind { diff --git a/js/sdk-core/src/game-context.ts b/js/sdk-core/src/game-context.ts index a4901b63..9c3e9907 100644 --- a/js/sdk-core/src/game-context.ts +++ b/js/sdk-core/src/game-context.ts @@ -148,6 +148,19 @@ export class GameContext { settleVersion: gameAccount.settleVersion, }) + let subGames = [] + + for (const versionedData of checkpoint.data.values()) { + subGames.push(new SubGame({ + gameId: versionedData.id, + bundleAddr: versionedData.spec.bundleAddr, + initAccount: new InitAccount({ + maxPlayers: versionedData.spec.maxPlayers, + data: Uint8Array.of(), + }) + })) + } + this.spec = spec this.versions = versions this.status = 'idle' @@ -158,7 +171,7 @@ export class GameContext { this.decisionStates = [] this.handlerState = handlerState this.checkpoint = checkpoint - this.subGames = [] + this.subGames = subGames this.initData = gameAccount.data this.players = players this.entryType = gameAccount.entryType @@ -171,6 +184,7 @@ export class GameContext { // Use init_account or checkpoint let versions: Versions let initData: Uint8Array + let handlerState: Uint8Array let spec = new GameSpec({ gameAddr: this.spec.gameAddr, gameId: subGame.gameId, @@ -180,13 +194,15 @@ export class GameContext { if (source instanceof InitAccount) { versions = Versions.default() initData = source.data + handlerState = Uint8Array.of() } else { - const v = source.getVersionedData(spec.gameId)?.versions - if (v === undefined){ + const vd = source.getVersionedData(spec.gameId) + if (vd === undefined){ throw new Error('Missing checkpoint') } - versions = v + versions = vd.versions initData = Uint8Array.of() + handlerState = vd.data } c.versions = versions c.nodes = this.nodes @@ -195,7 +211,7 @@ export class GameContext { c.timestamp = 0n c.randomStates = [] c.decisionStates = [] - c.handlerState = Uint8Array.of() + c.handlerState = handlerState; c.checkpoint = this.checkpoint.clone() c.subGames = [] c.initData = initData diff --git a/transactor/src/component.rs b/transactor/src/component.rs index 28276134..add65035 100644 --- a/transactor/src/component.rs +++ b/transactor/src/component.rs @@ -29,4 +29,4 @@ pub use wrapped_client::WrappedClient; pub use wrapped_handler::WrappedHandler; pub use wrapped_transport::WrappedTransport; pub use wrapped_storage::WrappedStorage; -pub use event_bridge::{EventBridgeChild, EventBridgeParent}; +pub use event_bridge::{EventBridgeChild, EventBridgeParent, BridgeToParent}; diff --git a/transactor/src/component/broadcaster.rs b/transactor/src/component/broadcaster.rs index 8ccadde9..56010b5e 100644 --- a/transactor/src/component/broadcaster.rs +++ b/transactor/src/component/broadcaster.rs @@ -278,7 +278,7 @@ impl Component for Broadcaster { let mut event_backup_groups = ctx.event_backup_groups.lock().await; if let Some(current) = event_backup_groups.back_mut() { - info!("{} Merge sync: {:?}", env.log_prefix, sync); + // info!("{} Merge sync: {:?}", env.log_prefix, sync); current.sync.merge(&sync); } else { error!("{} Sync dropped", env.log_prefix); diff --git a/transactor/src/component/common.rs b/transactor/src/component/common.rs index 9c46c94e..e1b987fe 100644 --- a/transactor/src/component/common.rs +++ b/transactor/src/component/common.rs @@ -7,7 +7,7 @@ use tracing::{info, warn}; use crate::{frame::EventFrame, utils::addr_shorthand}; -use super::{event_bus::CloseReason}; +use super::event_bus::CloseReason; /// An interface for a component that can be attached to the event bus. pub trait Attachable { @@ -37,7 +37,11 @@ pub struct PortsHandle { } impl PortsHandle { - fn from_inner>(id: S, value: PortsHandleInner, join_handle: JoinHandle) -> Self { + fn from_inner>( + id: S, + value: PortsHandleInner, + join_handle: JoinHandle, + ) -> Self { Self { id: id.into(), input_tx: value.input_tx, @@ -49,7 +53,13 @@ impl PortsHandle { impl PortsHandle { pub async fn wait(self) -> CloseReason { - self.join_handle.await.unwrap() + match self.join_handle.await { + Ok(close_reason) => close_reason, + Err(e) => CloseReason::Fault(race_core::error::Error::InternalError(format!( + "Error in waiting close reason: {:?}", + e + ))), + } } #[allow(dead_code)] @@ -182,7 +192,7 @@ impl PipelinePorts { pub fn clone_as_producer(&self) -> ProducerPorts { ProducerPorts { - tx: self.tx.clone() + tx: self.tx.clone(), } } @@ -223,7 +233,9 @@ pub struct ComponentEnv { impl ComponentEnv { pub fn new(addr: &str, component_name: &str) -> Self { let addr_short = addr_shorthand(addr); - Self { log_prefix: format!("[{}|{}]", addr_short, component_name)} + Self { + log_prefix: format!("[{}|{}]", addr_short, component_name), + } } } diff --git a/transactor/src/component/event_bridge.rs b/transactor/src/component/event_bridge.rs index 3590a018..8ed84c51 100644 --- a/transactor/src/component/event_bridge.rs +++ b/transactor/src/component/event_bridge.rs @@ -5,16 +5,26 @@ use tracing::{info, log::error}; use super::{common::PipelinePorts, CloseReason, Component, ComponentEnv}; +#[derive(Debug)] +pub struct BridgeToParent { + tx_to_parent: mpsc::Sender, + rx_from_parent: broadcast::Receiver, +} + #[allow(dead_code)] pub struct EventBridgeParentContext { + /// The sender to send to sub games. tx: broadcast::Sender, + /// The receiver to receive from sub games. rx: mpsc::Receiver, + /// The sender used to be cloned when launching sub games. + sub_tx: mpsc::Sender, signal_tx: mpsc::Sender, } #[derive(Clone, Debug)] pub struct EventBridgeParent { - tx: mpsc::Sender, + #[allow(unused)] bc: broadcast::Sender, } @@ -34,29 +44,16 @@ impl EventBridgeParent { let (bc_tx, _bc_rx) = broadcast::channel(10); ( Self { - tx: mpsc_tx, bc: bc_tx.clone(), }, EventBridgeParentContext { tx: bc_tx, rx: mpsc_rx, + sub_tx: mpsc_tx.clone(), signal_tx, }, ) } - - pub fn derive_child(&self, game_id: usize) -> (EventBridgeChild, EventBridgeChildContext) { - ( - EventBridgeChild { - game_id: game_id.clone(), - }, - EventBridgeChildContext { - game_id, - tx: self.tx.clone(), - rx: self.bc.subscribe(), - }, - ) - } } impl EventBridgeParent { @@ -99,7 +96,7 @@ impl Component for EventBridgeParent { ) -> CloseReason { while let Some((from_bridge, event_frame)) = Self::read_event(&mut ports, &mut ctx.rx).await { - if from_bridge { + if from_bridge { // Bridge parent receives event from bridge child match event_frame { EventFrame::SendBridgeEvent { from, @@ -122,27 +119,32 @@ impl Component for EventBridgeParent { .await; } - EventFrame::SubGameReady { .. } => { + EventFrame::SubGameReady { game_id, .. } => { + info!("{} Receives subgame ready: {}", env.log_prefix, game_id); ports.send(event_frame).await; } _ => (), } - } else { + } else { // Bridge parent receives event from event bus match event_frame { EventFrame::LaunchSubGame { sub_game_init } => { - let f = SignalFrame::LaunchSubGame { sub_game_init: *sub_game_init }; + let f = SignalFrame::LaunchSubGame { + sub_game_init: *sub_game_init, + bridge_to_parent: BridgeToParent { + rx_from_parent: ctx.tx.subscribe(), + tx_to_parent: ctx.sub_tx.clone(), + }, + }; if let Err(e) = ctx.signal_tx.send(f).await { error!("{} Failed to send: {}", env.log_prefix, e); } } EventFrame::Shutdown => { - info!("{} Stopped", env.log_prefix); - if !ctx.tx.is_empty() { - info!("{} Sends Shutdown", env.log_prefix); - if let Err(e) = ctx.tx.send(event_frame) { - error!("{} Failed to send: {}", env.log_prefix, e); - } + info!("{} Sends Shutdown", env.log_prefix); + if let Err(e) = ctx.tx.send(event_frame) { + error!("{} Failed to send: {}", env.log_prefix, e); } + info!("{} Stopped", env.log_prefix); break; } EventFrame::SendBridgeEvent { dest, .. } if dest != 0 => { @@ -177,6 +179,20 @@ impl Component for EventBridgeParent { } impl EventBridgeChild { + + pub fn init(game_id: usize, bridge_to_parent: BridgeToParent) -> (EventBridgeChild, EventBridgeChildContext) { + ( + EventBridgeChild { + game_id: game_id.clone(), + }, + EventBridgeChildContext { + game_id, + tx: bridge_to_parent.tx_to_parent, + rx: bridge_to_parent.rx_from_parent, + }, + ) + } + /// Read event from both the local event bus and the bridge. /// Return (true, event) when the event is from the bridge. /// Return None when bridge is closed. @@ -189,7 +205,7 @@ impl EventBridgeChild { if let Ok(e) = e { Some((true, e)) } else { - None + None } }, e = ports.recv() => { @@ -216,7 +232,7 @@ impl Component for EventBridgeChild { ) -> CloseReason { while let Some((from_bridge, event_frame)) = Self::read_event(&mut ports, &mut ctx.rx).await { - if from_bridge { + if from_bridge { // Bridge child receives event from event parent match event_frame { EventFrame::Shutdown => { info!("{} Stopped", env.log_prefix); @@ -253,11 +269,12 @@ impl Component for EventBridgeChild { } _ => {} } - } else { + } else { // Bridge child receives event from event bus match event_frame { EventFrame::Shutdown => break, EventFrame::SubGameReady { .. } => { + info!("{} Send SubGameReady to parent", env.log_prefix); if let Err(e) = ctx.tx.send(event_frame).await { error!("{} Failed to send: {}", env.log_prefix, e); } diff --git a/transactor/src/component/event_loop.rs b/transactor/src/component/event_loop.rs index 756a4d84..6ac9e789 100644 --- a/transactor/src/component/event_loop.rs +++ b/transactor/src/component/event_loop.rs @@ -61,14 +61,12 @@ impl Component for EventLoop { ctx.game_mode, &env, ).await { - ports.send(EventFrame::Shutdown).await; - return close_reason; + return close_reason } } else { if let Some(close_reason) = event_handler::resume_from_checkpoint( &mut game_context, &ports, ctx.client_mode, ctx.game_mode, &env ).await { - ports.send(EventFrame::Shutdown).await; return close_reason; } } @@ -87,12 +85,10 @@ impl Component for EventLoop { ctx.game_mode, timestamp, &env, - ) - .await - { - ports.send(EventFrame::Shutdown).await; + ).await { return close_reason; } + } } @@ -176,7 +172,6 @@ impl Component for EventLoop { timestamp, &env, ).await { - ports.send(EventFrame::Shutdown).await; return close_reason; } } @@ -195,7 +190,6 @@ impl Component for EventLoop { timestamp, &env, ).await { - ports.send(EventFrame::Shutdown).await; return close_reason; } } @@ -214,10 +208,7 @@ impl Component for EventLoop { ctx.game_mode, timestamp, &env, - ) - .await - { - ports.send(EventFrame::Shutdown).await; + ).await { return close_reason; } } else { @@ -232,10 +223,24 @@ impl Component for EventLoop { if ctx.game_mode == GameMode::Main { info!("Update checkpoint for sub game: {}", game_id); if let Err(e) = game_context.checkpoint_mut().init_versioned_data(checkpoint_state) { - error!("{} Failed to set checkpoint data: {:?}", env.log_prefix, e); + error!("{} Failed to init checkpoint data: {:?}", env.log_prefix, e); ports.send(EventFrame::Shutdown).await; } } + let timestamp = current_timestamp(); + let event = Event::SubGameReady { game_id }; + if let Some(close_reason) = event_handler::handle_event( + &mut handler, + &mut game_context, + event, + &ports, + ctx.client_mode, + ctx.game_mode, + timestamp, + &env, + ).await { + return close_reason; + } } EventFrame::RecvBridgeEvent { @@ -268,10 +273,7 @@ impl Component for EventLoop { ctx.game_mode, timestamp, &env, - ) - .await - { - ports.send(EventFrame::Shutdown).await; + ).await { return close_reason; } } @@ -285,19 +287,17 @@ impl Component for EventLoop { ctx.game_mode, timestamp, &env, - ) - .await - { - ports.send(EventFrame::Shutdown).await; + ).await { return close_reason; } } EventFrame::SendServerEvent { event, timestamp } => { // Handle the shutdown event from game logic if matches!(event, Event::Shutdown) { - ports.send(EventFrame::Shutdown).await; return CloseReason::Complete; - } else if let Some(close_reason) = event_handler::handle_event( + } + + if let Some(close_reason) = event_handler::handle_event( &mut handler, &mut game_context, event, @@ -306,10 +306,7 @@ impl Component for EventLoop { ctx.game_mode, timestamp, &env, - ) - .await - { - ports.send(EventFrame::Shutdown).await; + ).await { return close_reason; } } @@ -322,10 +319,12 @@ impl Component for EventLoop { } return CloseReason::Complete; + } } impl EventLoop { + pub fn init( handler: WrappedHandler, game_context: GameContext, diff --git a/transactor/src/component/event_loop/event_handler.rs b/transactor/src/component/event_loop/event_handler.rs index ad9682b2..ea563f04 100644 --- a/transactor/src/component/event_loop/event_handler.rs +++ b/transactor/src/component/event_loop/event_handler.rs @@ -5,7 +5,6 @@ use race_api::{ types::{EntryLock, Settle, Transfer}, }; use race_core::{ - checkpoint::Checkpoint, context::{EventEffects, GameContext, SubGameInit, SubGameInitSource, Versions}, error::Error, types::{ClientMode, GameMode, GameSpec}, @@ -84,7 +83,6 @@ async fn send_bridge_event( } async fn send_settlement( - checkpoint: Checkpoint, transfers: Vec, settles: Vec, entry_lock: Option, @@ -93,6 +91,7 @@ async fn send_settlement( ports: &PipelinePorts, env: &ComponentEnv, ) { + let checkpoint = game_context.checkpoint().clone(); let checkpoint_size = checkpoint.get_data(game_context.game_id()).map(|d| d.len()); info!( "{} Create checkpoint, settle_version: {}, size: {:?}", @@ -129,16 +128,18 @@ async fn launch_sub_game( init_account, } = sub_game; // Use the existing checkpoint when possible. + let spec = GameSpec { + game_addr: game_context.game_addr().to_owned(), + game_id: id, + bundle_addr, + max_players: init_account.max_players, + }; + let versions = game_context.versions(); let ef = EventFrame::LaunchSubGame { sub_game_init: Box::new(SubGameInit { - spec: GameSpec { - game_addr: game_context.game_addr().to_owned(), - game_id: id, - bundle_addr, - max_players: init_account.max_players, - }, + spec, nodes: game_context.get_nodes().into(), - source: SubGameInitSource::FromInitAccount(init_account), + source: SubGameInitSource::FromInitAccount(init_account, versions), }), }; ports.send(ef).await; @@ -174,7 +175,6 @@ pub async fn init_state( let EventEffects { checkpoint, - launch_sub_games, .. } = effects; @@ -187,11 +187,11 @@ pub async fn init_state( ); let Some(checkpoint) = checkpoint else { + ports.send(EventFrame::Shutdown).await; return Some(CloseReason::Fault(Error::CheckpointNotFoundAfterInit)); }; send_settlement( - checkpoint.clone(), vec![], vec![], None, @@ -203,16 +203,10 @@ pub async fn init_state( .await; // Dispatch the initial Ready event if running in Transactor mode. - // The whole game lifetime has only one Ready event. if client_mode == ClientMode::Transactor { game_context.dispatch_safe(Event::Ready, 0); } - // XXX: launch sub games based on checkpoint, not effects - if game_mode == GameMode::Main { - launch_sub_game(launch_sub_games, game_context, ports, env).await; - } - // Tell master game the subgame is successfully created. if game_mode == GameMode::Sub { let game_id = game_context.game_id(); @@ -225,6 +219,7 @@ pub async fn init_state( }) .await; } else { + ports.send(EventFrame::Shutdown).await; return Some(CloseReason::Fault(Error::CheckpointNotFoundAfterInit)); } } @@ -235,18 +230,19 @@ pub async fn init_state( pub async fn resume_from_checkpoint( game_context: &mut GameContext, ports: &PipelinePorts, - _client_mode: ClientMode, + client_mode: ClientMode, game_mode: GameMode, - _env: &ComponentEnv, + env: &ComponentEnv, ) -> Option { if game_mode == GameMode::Main { let versioned_data_list = game_context.checkpoint().list_versioned_data(); + info!("{} Launch {} subgames", env.log_prefix, versioned_data_list.len() - 1); // except the master game + for versioned_data in versioned_data_list { if versioned_data.id == 0 { continue; } - println!("{:?}", versioned_data); let ef = EventFrame::LaunchSubGame { sub_game_init: Box::new(SubGameInit { spec: GameSpec { @@ -263,6 +259,10 @@ pub async fn resume_from_checkpoint( } } + if client_mode == ClientMode::Transactor { + game_context.dispatch_safe(Event::Ready, 0); + } + None } @@ -309,11 +309,13 @@ pub async fn handle_event( send_start_game(&game_context, ports).await; } - // Send the settlement when there's one - // This event will be sent no matter what client mode we are running at - if let Some(checkpoint) = checkpoint { + // Launch sub games + if game_mode == GameMode::Main { + launch_sub_game(launch_sub_games, game_context, ports, env).await; + } + + if checkpoint.is_some() { send_settlement( - checkpoint, transfers, settles, entry_lock, @@ -325,11 +327,6 @@ pub async fn handle_event( .await; } - // Launch sub games - if game_mode == GameMode::Main { - launch_sub_game(launch_sub_games, game_context, ports, env).await; - } - // Emit bridge events if client_mode == ClientMode::Transactor { send_bridge_event(bridge_events, &game_context, ports, env).await; @@ -340,6 +337,7 @@ pub async fn handle_event( log_execution_context(game_context, &event); match e { Error::WasmExecutionError(_) | Error::WasmMemoryOverflow => { + ports.send(EventFrame::Shutdown).await; return Some(CloseReason::Fault(e)) } _ => (), diff --git a/transactor/src/context.rs b/transactor/src/context.rs index bdd2700f..571a7bf9 100644 --- a/transactor/src/context.rs +++ b/transactor/src/context.rs @@ -120,18 +120,11 @@ impl ApplicationContext { join_handles.push(join_handle); } } - SignalFrame::LaunchSubGame { sub_game_init } => { - let bridge_parent = game_manager_1 - .get_event_parent(&sub_game_init.spec.game_addr) - .await - .expect( - format!("Bridge parent not found: {}", sub_game_init.spec.game_addr).as_str(), - ); - + SignalFrame::LaunchSubGame { sub_game_init, bridge_to_parent } => { if let Some(join_handle) = game_manager_1 .launch_sub_game( sub_game_init, - bridge_parent, + bridge_to_parent, &account_1, transport_1.clone(), encryptor_1.clone(), diff --git a/transactor/src/frame.rs b/transactor/src/frame.rs index d05e8d36..fdcc5bea 100644 --- a/transactor/src/frame.rs +++ b/transactor/src/frame.rs @@ -5,10 +5,15 @@ use race_core::{ checkpoint::{Checkpoint, VersionedData}, context::{GameContext, SubGameInit}, types::{PlayerDeposit, PlayerJoin, ServerJoin, Transfer, TxState, VoteType} }; -#[derive(Debug, Clone)] +use crate::component::BridgeToParent; + +#[derive(Debug)] pub enum SignalFrame { StartGame { game_addr: String }, - LaunchSubGame { sub_game_init: SubGameInit }, + LaunchSubGame { + sub_game_init: SubGameInit, + bridge_to_parent: BridgeToParent, + }, Shutdown, } @@ -101,12 +106,6 @@ pub enum EventFrame { sub_game_init: Box, }, - /// Resume a subgame from its checkpoint. - #[allow(unused)] - ResumeSubGame { - checkpoint: Checkpoint, - }, - /// Sync frame for subgames broadcasted from master game. SubSync { new_players: Vec, @@ -175,9 +174,6 @@ impl std::fmt::Display for EventFrame { EventFrame::SubSync { new_players, new_servers, .. } => { write!(f, "SyncNodes: new_players: {}, new_servers: {}", new_players.len(), new_servers.len()) }, - EventFrame::ResumeSubGame { .. } => { - write!(f, "ResumeSubGame") - }, EventFrame::SubGameReady { game_id, .. } => { write!(f, "SubGameReady, game_id: {}", game_id) } diff --git a/transactor/src/game_manager.rs b/transactor/src/game_manager.rs index 61ac2995..a7e9943b 100644 --- a/transactor/src/game_manager.rs +++ b/transactor/src/game_manager.rs @@ -7,12 +7,12 @@ use race_encryptor::Encryptor; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::{broadcast, Mutex, mpsc}; +use tokio::sync::{broadcast, mpsc, Mutex}; use tokio::task::JoinHandle; use tracing::{error, info, warn}; use crate::blacklist::Blacklist; -use crate::component::{CloseReason, EventBridgeParent, WrappedStorage, WrappedTransport}; +use crate::component::{BridgeToParent, CloseReason, WrappedStorage, WrappedTransport}; use crate::frame::{EventFrame, SignalFrame}; use crate::handle::Handle; use crate::utils::current_timestamp; @@ -34,7 +34,7 @@ impl GameManager { pub async fn launch_sub_game( &self, sub_game_init: SubGameInit, - bridge_parent: EventBridgeParent, + bridge_to_parent: BridgeToParent, server_account: &ServerAccount, transport: Arc, encryptor: Arc, @@ -42,7 +42,7 @@ impl GameManager { ) -> Option> { let game_addr = sub_game_init.spec.game_addr.clone(); let game_id = sub_game_init.spec.game_id; - match Handle::try_new_sub_game_handle(sub_game_init, bridge_parent, server_account, encryptor, transport, debug_mode).await { + match Handle::try_new_sub_game_handle(sub_game_init, bridge_to_parent, server_account, encryptor, transport, debug_mode).await { Ok(mut handle) => { let mut games = self.games.lock().await; let addr = format!("{}:{}", game_addr, game_id); @@ -169,13 +169,6 @@ impl GameManager { Ok((receiver, histories)) } - pub async fn get_event_parent(&self, game_addr: &str) -> Result { - let games = self.games.lock().await; - let handle = games.get(game_addr).ok_or(Error::GameNotLoaded)?; - let bridge_parent = handle.event_parent_owned()?; - Ok(bridge_parent) - } - /// Shutdown all games, and drop their handles. pub async fn shutdown(&self) { let mut games = self.games.lock().await; diff --git a/transactor/src/handle.rs b/transactor/src/handle.rs index bc68c994..6f8d6df4 100644 --- a/transactor/src/handle.rs +++ b/transactor/src/handle.rs @@ -5,7 +5,7 @@ mod validator; use std::sync::Arc; use crate::component::{ - Broadcaster, CloseReason, EventBridgeParent, EventBus, WrappedStorage, WrappedTransport, + BridgeToParent, Broadcaster, CloseReason, EventBus, WrappedStorage, WrappedTransport }; use crate::frame::SignalFrame; use race_core::context::SubGameInit; @@ -105,7 +105,7 @@ impl Handle { pub async fn try_new_sub_game_handle( sub_game_init: SubGameInit, - bridge_parent: EventBridgeParent, + bridge_to_parent: BridgeToParent, server_account: &ServerAccount, encryptor: Arc, transport: Arc, @@ -113,7 +113,7 @@ impl Handle { ) -> Result { let handle = SubGameHandle::try_new( sub_game_init, - bridge_parent, + bridge_to_parent, server_account, encryptor, transport, @@ -131,14 +131,6 @@ impl Handle { } } - pub fn event_parent_owned(&self) -> Result { - match self { - Handle::Transactor(h) => Ok(h.bridge_parent.to_owned()), - Handle::Validator(h) => Ok(h.bridge_parent.to_owned()), - Handle::SubGame(_) => Err(Error::NotSupportedInSubGameMode), - } - } - pub fn event_bus(&self) -> &EventBus { match self { Handle::Transactor(h) => &h.event_bus, diff --git a/transactor/src/handle/subgame.rs b/transactor/src/handle/subgame.rs index f549f698..07b76e28 100644 --- a/transactor/src/handle/subgame.rs +++ b/transactor/src/handle/subgame.rs @@ -1,8 +1,7 @@ use std::sync::Arc; use crate::component::{ - Broadcaster, Component, EventBridgeChild, EventBridgeParent, EventBus, EventLoop, - LocalConnection, PortsHandle, WrappedClient, WrappedHandler, + BridgeToParent, Broadcaster, Component, EventBridgeChild, EventBus, EventLoop, LocalConnection, PortsHandle, WrappedClient, WrappedHandler }; use crate::frame::EventFrame; use race_core::error::{Error, Result}; @@ -23,7 +22,7 @@ pub struct SubGameHandle { impl SubGameHandle { pub async fn try_new( sub_game_init: SubGameInit, - bridge_parent: EventBridgeParent, + bridge_to_parent: BridgeToParent, server_account: &ServerAccount, encryptor: Arc, transport: Arc, @@ -50,7 +49,7 @@ impl SubGameHandle { let (broadcaster, broadcaster_ctx) = Broadcaster::init(addr.clone(), game_id); let mut broadcaster_handle = broadcaster.start(&addr, broadcaster_ctx); - let (bridge, bridge_ctx) = bridge_parent.derive_child(game_id.clone()); + let (bridge, bridge_ctx) = EventBridgeChild::init(game_id, bridge_to_parent); let mut bridge_handle = bridge.start(&addr, bridge_ctx); let (event_loop, event_loop_ctx) = diff --git a/transport/src/solana.rs b/transport/src/solana.rs index 6635d974..afe1c28f 100755 --- a/transport/src/solana.rs +++ b/transport/src/solana.rs @@ -916,9 +916,12 @@ impl TransportT for SolanaTransport { Ok(Box::pin(stream! { - let Ok(client) = PubsubClient::new(&ws_rpc).await else { - error!("Failed to create PubsubClient"); - return; + let client = match PubsubClient::new(&ws_rpc).await { + Ok(client) => client, + Err(e) => { + error!("Failed to create PubsubClient due to {:?}", e); + return; + } }; let Ok((mut stream, unsub)) = client