Skip to content

Commit

Permalink
Add save/load?
Browse files Browse the repository at this point in the history
  • Loading branch information
joeoneil committed Aug 24, 2023
1 parent 314d0d9 commit 96e0457
Show file tree
Hide file tree
Showing 9 changed files with 307 additions and 35 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,7 @@ HACKING/__pycache__/**
.env
onboard/.env
HACKING/.env

# test locations for save data
onboard/backend/.save
onboard/backend/src/.save
21 changes: 21 additions & 0 deletions onboard/backend/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,24 @@ use devcade_onboard_types::{
schema::{DevcadeGame, MinimalGame, Tag, User},
Map, Player, Value,
};
use lazy_static::lazy_static;
use log::{log, Level};

use std::ffi::OsStr;

use std::cell::Cell;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::Stdio;
use std::sync::Mutex;
use std::time::Duration;
use tokio::process::Command;

lazy_static! {
static ref CURRENT_GAME: Mutex<Cell<DevcadeGame>> =
Mutex::new(Cell::new(DevcadeGame::default()));
}

/**
* Internal module for network requests and JSON serialization
*/
Expand Down Expand Up @@ -403,6 +411,15 @@ pub async fn launch_game(game_id: String) -> Result<(), Error> {
download_game(game_id.clone()).await?;
}

let game = game_from_path(
path.parent()
.unwrap()
.join("game.json")
.to_str()
.unwrap_or(""),
)?;
CURRENT_GAME.lock().unwrap().set(game);

// Infer executable name from *.runtimeconfig.json
let mut executable = String::new();

Expand Down Expand Up @@ -573,3 +590,7 @@ async fn game_from_minimal(game: MinimalGame) -> Result<DevcadeGame, Error> {
)
.await
}

pub fn current_game() -> DevcadeGame {
CURRENT_GAME.lock().unwrap().get_mut().clone()
}
17 changes: 16 additions & 1 deletion onboard/backend/src/command.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::api::nfc_user;
use crate::api::{self, nfc_user};

use crate::api::{
download_banner, download_game, download_icon, game_list, game_list_from_fs, launch_game,
nfc_tags, tag_games, tag_list, user,
};
use crate::servers;
use devcade_onboard_types::{RequestBody, ResponseBody};

/**
Expand Down Expand Up @@ -74,5 +75,19 @@ pub async fn handle(req: RequestBody) -> ResponseBody {
Ok(user) => ResponseBody::NfcUser(user),
Err(err) => err.into(),
},
RequestBody::Save(group, key, value) => {
let group = format!("{}/{}", api::current_game().id, group);
match servers::persistence::save(group.as_str(), key.as_str(), value.as_str()).await {
Ok(()) => ResponseBody::Ok,
Err(err) => err.into(),
}
}
RequestBody::Load(group, key) => {
let group = format!("{}/{}", api::current_game().id, group);
match servers::persistence::load(group.as_str(), key.as_str()).await {
Ok(s) => ResponseBody::Object(s),
Err(err) => err.into(),
}
}
}
}
8 changes: 4 additions & 4 deletions onboard/backend/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use backend::env::devcade_path;
use backend::servers::path::onboard_pipe;
use backend::servers::path::{onboard_pipe, persistence_pipe};
use backend::servers::ThreadHandles;
use log::{log, Level};
use tokio::fs;
Expand Down Expand Up @@ -27,7 +27,7 @@ async fn main() -> ! {

handles.restart_onboard(onboard_pipe());

// TODO Game Save / Load
handles.restart_persistence(persistence_pipe());

// TODO Gatekeeper / Authentication

Expand All @@ -39,9 +39,9 @@ async fn main() -> ! {
log!(Level::Error, "Onboard thread has panicked: {}", err);
handles.restart_onboard(onboard_pipe());
}
if let Some(err) = handles._game_error() {
if let Some(err) = handles.game_error() {
log!(Level::Error, "Game thread has panicked: {}", err);
// TODO Restart game thread (once implemented)
handles.restart_persistence(persistence_pipe());
}
if let Some(err) = handles._gatekeeper_error() {
log!(Level::Error, "Gatekeeper thread has panicked: {}", err);
Expand Down
30 changes: 27 additions & 3 deletions onboard/backend/src/servers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,27 @@ pub mod path {
pub fn onboard_pipe() -> String {
format!("{}/onboard.sock", devcade_path())
}

/**
* Get the path to the pipe that the persistence thread will write to
* */
#[must_use]
pub fn persistence_pipe() -> String {
format!("{}/persistence.sock", devcade_path())
}
}

/**
* The onboard server is responsible for communicating with the frontend
*/
pub mod onboard;

/**
* The persistence server is responsible for communicating with the running game and saving /
* loading persistent data.
* */
pub mod persistence;

/**
* A struct to hold the handles to the threads spawned by the backend.
*/
Expand Down Expand Up @@ -63,7 +77,7 @@ impl ThreadHandles {
}

/**
* Restart the onboard server thread with the given pipes
* Restart the onboard server thread with the given pipe
*/
pub fn restart_onboard(&mut self, command_pipe: String) {
log!(Level::Info, "Starting onboard thread ...");
Expand All @@ -72,6 +86,16 @@ impl ThreadHandles {
}));
}

/**
* Restart the save / load server thread with the given pipe
* */
pub fn restart_persistence(&mut self, command_pipe: String) {
log!(Level::Info, "Starting persistence thread ...");
self.game_sl = Some(tokio::spawn(async move {
persistence::main(command_pipe.as_str()).await;
}));
}

/**
* Check if the onboard server thread has errored and return the error if it has
*/
Expand All @@ -91,7 +115,7 @@ impl ThreadHandles {
/**
* Check if the game thread has errored and return the error if it has
*/
pub fn _game_error(&mut self) -> Option<JoinError> {
pub fn game_error(&mut self) -> Option<JoinError> {
let handle = self.game_sl.take();
if let Some(handle) = handle {
return if handle.is_finished() {
Expand Down Expand Up @@ -167,7 +191,7 @@ fn bind_listener(path: &str) -> Result<UnixListener, anyhow::Error> {
.stdout(Stdio::null())
.status()?;
if lsof.success() {
// lsof returns success if any process if using this file
// lsof returns success if any process is using this file
Err(anyhow!("Failed to bind listener to path {}: {}", path, e))
} else {
log::debug!("Socket was not closed correctly in last shutdown. Removing");
Expand Down
188 changes: 188 additions & 0 deletions onboard/backend/src/servers/persistence.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
use crate::command::handle;
use crate::servers::open_server;
use anyhow::anyhow;
use devcade_onboard_types::{Request, RequestBody, Response, ResponseBody};
use futures_util::future;
use lazy_static::lazy_static;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;

lazy_static! {
// basically just checks if a user 'devcade' exists. If so, assumes that this is running on the
// machine, and saves to the homedir. Otherwise, saves to the cwd.
static ref ON_MACHINE: bool = Path::new("/home/devcade").exists();
static ref DB: Mutex<HashMap<String, HashMap<String, String>>> = Mutex::new(HashMap::new());
static ref DB_MODIFIED: Mutex<HashSet<String>> = Mutex::new(HashSet::new());
}

pub async fn main(command_pipe: &str) -> ! {
log::info!("Starting save/load process");

open_server(command_pipe, async move |mut lines, writer| {
let writer = Arc::new(Mutex::new(writer));
let mut handles = vec![];
while let Some(line) = lines.next_line().await? {
let command: Request = serde_json::from_str(&line)?;

let writer = writer.clone();

handles.push(async move {
let body: ResponseBody = match &command.body {
RequestBody::Save(_, _, _) | RequestBody::Load(_, _) | RequestBody::Ping => {
handle(command.body).await
}
// Don't allow game save/load to (for example) download a game, launch a game,
// etc. If games could launch other games, it would update the 'current game' in
// crate::api and allow games to corrupt other games' save data (possibly
// maliciously!)
_ => anyhow!("Invalid command: {}", command).into(),
};
let response = Response {
request_id: command.request_id,
body,
};
log::debug!("Sending: {response}");
let mut response = serde_json::to_vec(&response)?;
response.push(b'\n');

let mut writer = writer.lock().await;
writer.write_all(&response).await?;
Ok(()) as Result<(), anyhow::Error>
});
}

future::join_all(handles).await;
Ok(())
})
.await
}

// currently saves to the devcade machine (or local machine if running locally) in the future,
// should ideally use a remote database / something else.
pub async fn save(group: &str, key: &str, value: &str) -> Result<(), anyhow::Error> {
log::trace!("saving data to {}/{} ({})", group, key, value);
let (path, group) = from_group(group);
let full_key = format!("{}/{}", path, group);

let mut data = DB.lock().await;
let mut mod_list = DB_MODIFIED.lock().await;

let inner = get_submap_or_load(&mut data, full_key.clone()).await?;

inner.insert(key.to_string(), value.to_string());
mod_list.insert(full_key);

Ok(())
}

/**
* Load a value from using a group and key
* group will start with a game_id, but can be further subdivided by the game to
* */
pub async fn load(group: &str, key: &str) -> Result<String, anyhow::Error> {
log::trace!("loading data from {}/{}", group, key);
let (path, file_name) = from_group(group);
let full_key = format!("{}/{}", path, file_name);

let mut data = DB.lock().await;

let inner = get_submap_or_load(&mut data, full_key.clone()).await?;

inner
.get(&key.to_string())
.ok_or_else(|| anyhow!("Could not find key {} in group {}", key, full_key))
.cloned()
}

/**
* Flush all pending writes to the filesystem.
* */
pub async fn flush() -> Result<(), anyhow::Error> {
let mut data = DB.lock().await;
let mut mod_list = DB_MODIFIED.lock().await;

log::debug!(
"Flushing data in db to file ({} modified groups)",
mod_list.len()
);

for key in mod_list.iter() {
let inner = get_submap_or_load(&mut data, key.clone()).await?;
let file_name = format!("{}.save", key);
log::debug!("Flushing to {}", file_name);
let path = Path::new(&file_name);
let dir = path.parent().expect("path failed to have parents");
if !dir.exists() {
fs::create_dir_all(dir).await?;
}
fs::write(path, serde_json::to_string(inner)?.as_bytes()).await?;
}

mod_list.clear();

Ok(())
}

/**
* Flushes all DB changes, and clears the in-memory cache. This shouldn't need to be done often but
* can be done if some games are storing too much data and we need to save memory. I don't see this
* actually needing use unless someone is maliciously (or stupidly) trying to store GBs of data at
* a time.
* */
pub async fn clear_db() -> Result<(), anyhow::Error> {
log::info!("Flushing and clearing DB cache");
flush().await?;

let mut data = DB.lock().await;
data.clear();

Ok(())
}

fn from_group(group: &str) -> (String, String) {
let save_path = Path::new(if *ON_MACHINE {
"/home/devcade/.save"
} else {
"./.save"
});

let mut parts: Vec<String> = group.split("/").map(|a| a.to_string()).collect();
let group = parts.pop().unwrap_or(String::new());
let save_path = save_path.join(parts.join("/"));
(save_path.to_str().unwrap_or("").to_string(), group)
}

/**
* Gets the sub-map at a specified path, and returns the cached version, the version on the
* filesystem, or a new empty HashMap, in order of preference.
* */
async fn get_submap_or_load(
db: &mut HashMap<String, HashMap<String, String>>,
group: String,
) -> Result<&mut HashMap<String, String>, anyhow::Error> {
let file_name = format!("{}.save", group);
if !db.contains_key(&group) {
if Path::new(&file_name).exists() {
let map = serde_json::from_str::<HashMap<String, String>>(
fs::read_to_string(file_name).await?.as_str(),
)?;
db.insert(group.clone(), map);
} else {
db.insert(group.clone(), HashMap::new());
}
}
Ok(db.get_mut(&group).unwrap())
}

/**
* Gets the total number of K, V pairs across the entire cache, as a rough proxy for how large the
* current cache is.
* */
pub async fn db_cache_size() -> usize {
let data = DB.lock().await;
data.values().map(|hm| hm.len()).sum()
}
Loading

0 comments on commit 96e0457

Please sign in to comment.