Skip to content

Commit

Permalink
Let broadcaster don't return empty frames
Browse files Browse the repository at this point in the history
  • Loading branch information
DogLooksGood committed Nov 29, 2024
1 parent 11da584 commit 4aa15c0
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 28 deletions.
61 changes: 34 additions & 27 deletions transactor/src/component/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct Broadcaster {
}

impl Broadcaster {
pub fn init(id: String, game_id: usize,) -> (Self, BroadcasterContext) {
pub fn init(id: String, game_id: usize) -> (Self, BroadcasterContext) {
let event_backup_groups = Arc::new(Mutex::new(LinkedList::new()));
let (broadcast_tx, broadcast_rx) = broadcast::channel(10);
drop(broadcast_rx);
Expand Down Expand Up @@ -89,44 +89,26 @@ impl Broadcaster {
}

warn!("Missing the checkpoint for settle_version = {}, the client won't be able to join this game.", settle_version);
let available_settle_versions: Vec<u64> = event_backup_groups.iter().map(|g| g.settle_version).collect();
let available_settle_versions: Vec<u64> = event_backup_groups
.iter()
.map(|g| g.settle_version)
.collect();
warn!("Available versions are: {:?}", available_settle_versions);
None
}


/// Retrieve a list of event histories with a given
/// `settle_version`. All events happened after the
/// `settle_version` will be returned. If a zero `settle_version`
/// is provided, just return the events after the latest
/// checkpoint.
pub async fn retrieve_histories(&self, settle_version: u64) -> Vec<BroadcastFrame> {

let mut frames: Vec<BroadcastFrame> = Vec::new();
let event_backup_groups = self.event_backup_groups.lock().await;

if settle_version == 0 {
if let Some(group) = event_backup_groups.iter().last() {
let mut histories: Vec<EventHistory> = Vec::new();
frames.push(BroadcastFrame::Sync {
sync: group.sync.clone()
});
for event in group.events.iter() {
histories.push(EventHistory {
event: event.event.clone(),
timestamp: event.timestamp,
state_sha: event.state_sha.clone(),
});
}
frames.push(BroadcastFrame::EventHistories {
game_addr: self.id.clone(),
checkpoint_off_chain: group.checkpoint_off_chain.clone(),
histories,
state_sha: group.state_sha.clone(),
settle_version: group.settle_version,
})
}
} else {
// By default, returns the histories with settle_version
// greater than the given one
if settle_version > 0 {
for group in event_backup_groups.iter() {
if group.settle_version >= settle_version {
let mut histories: Vec<EventHistory> = Vec::new();
Expand All @@ -152,6 +134,31 @@ impl Broadcaster {
}
}

// Return the latest one if the frames are still empty
if frames.is_empty() {
if let Some(group) = event_backup_groups.iter().last() {
let mut histories: Vec<EventHistory> = Vec::new();
frames.push(BroadcastFrame::Sync {
sync: group.sync.clone(),
});
for event in group.events.iter() {
histories.push(EventHistory {
event: event.event.clone(),
timestamp: event.timestamp,
state_sha: event.state_sha.clone(),
});
}
frames.push(BroadcastFrame::EventHistories {
game_addr: self.id.clone(),
checkpoint_off_chain: group.checkpoint_off_chain.clone(),
histories,
state_sha: group.state_sha.clone(),
settle_version: group.settle_version,
})
}
} else {
}

frames
}
}
Expand Down Expand Up @@ -390,7 +397,7 @@ mod tests {
access_version: 10,
verify_key: "alice".into(),
}
.into()],
.into()],
access_version: 10,
};
let event_frame = EventFrame::TxState {
Expand Down
1 change: 0 additions & 1 deletion transport/src/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1349,7 +1349,6 @@ impl SolanaTransport {
}

/// Get the state of an on-chain game account by its public key
/// It queries the chain according to different modes
/// Not for public API usage
async fn internal_get_game_state(
&self,
Expand Down

0 comments on commit 4aa15c0

Please sign in to comment.