Skip to content

Commit

Permalink
Implement .sdvolume file read/write operations. Enhance storage bar r…
Browse files Browse the repository at this point in the history
…esponsiveness.
  • Loading branch information
jamiepine committed Nov 6, 2024
1 parent 167170d commit e1a0988
Show file tree
Hide file tree
Showing 17 changed files with 402 additions and 81 deletions.
1 change: 1 addition & 0 deletions .cspell/project_words.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ rspcws
rywalker
sanjay
sdvol
sdvolume
sharma
skippable
spacedrive
Expand Down
2 changes: 2 additions & 0 deletions core/src/location/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ pub enum LocationError {
InvalidScanStateValue(i32),
#[error(transparent)]
Sync(#[from] sd_core_sync::Error),
#[error("other error: {0}")]
Other(String),
}

impl From<LocationError> for rspc::Error {
Expand Down
32 changes: 32 additions & 0 deletions core/src/location/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ impl LocationCreateArgs {
let uuid = Uuid::now_v7();

let location = create_location(
node,
library,
uuid,
&self.path,
Expand Down Expand Up @@ -249,6 +250,7 @@ impl LocationCreateArgs {
let uuid = Uuid::now_v7();

let location = create_location(
node,
library,
uuid,
&self.path,
Expand Down Expand Up @@ -705,6 +707,7 @@ pub(crate) fn normalize_path(path: impl AsRef<Path>) -> io::Result<(String, Stri
}

async fn create_location(
_node: &Node,
library @ Library { db, sync, .. }: &Library,
location_pub_id: Uuid,
location_path: impl AsRef<Path>,
Expand Down Expand Up @@ -733,6 +736,35 @@ async fn create_location(
return Ok(None);
}

// let library_arc = Arc::new(*library);
// // Track the volume before creating the location
// // Get the volume fingerprint for the location path
// let system_volumes = node
// .volumes
// .list_system_volumes(library_arc)
// .await
// .map_err(|e| {
// warn!("Failed to list system volumes: {}", e);
// LocationError::Other(e.to_string())
// })?;

// for volume in system_volumes {
// if let Some(mount_point) = volume.mount_point.as_ref() {
// if location_path.starts_with(mount_point) {
// // Track this volume since we're creating a location on it
// if let Err(e) = node
// .volumes
// .track_volume(volume.fingerprint, library.clone())
// .await
// {
// warn!("Failed to track volume for new location: {}", e);
// // Continue with location creation even if volume tracking fails
// }
// break;
// }
// }
// }

let (sync_values, mut db_params) = [
sync_db_entry!(&name, location::name),
sync_db_entry!(path, location::path),
Expand Down
36 changes: 32 additions & 4 deletions core/src/volume/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,17 +450,45 @@ impl VolumeManagerActor {
let state = self.state.write().await;
let device_pub_id = self.ctx.device_id.clone();

// Find the volume in our current system volumes
let mut registry = state.registry.write().await;
let mut volume = match registry.get_volume_mut(&fingerprint) {
Some(v) => v.clone(),
None => return Err(VolumeError::InvalidFingerprint(fingerprint.clone())),
};

// Create in database with current device association
volume.create(&library.db, device_pub_id.into()).await?;
// Check for existing .sdvolume file
if let Some(volume_file) = volume.read_volume_file().await? {
// If pub_id exists in database, use that volume record
if let Some(existing_volume) = library
.db
.volume()
.find_unique(volume::pub_id::equals(volume_file.pub_id.clone()))
.exec()
.await?
.map(Volume::from)
{
// Update volume with existing data
volume = Volume::merge_with_db(&volume, &existing_volume);
registry.update_volume(volume.clone());
}
}

// Create or update in database with sync
if volume.pub_id.is_none() {
volume = volume
.sync_db_create(&library, device_pub_id.into())
.await?;
} else {
volume.sync_db_update(&library).await?;
}

// Write .sdvolume file
volume.write_volume_file().await?;

// Update registry with final state
registry.update_volume(volume.clone());

// Spawn a background task to perform the speed test
// Spawn speed test
let event_tx = self.event_tx.clone();
let mut volume = volume.clone();
tokio::spawn(async move {
Expand Down
19 changes: 19 additions & 0 deletions core/src/volume/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,25 @@ pub enum VolumeError {
/// Resource exhausted
#[error("Resource exhausted: {0}")]
ResourceExhausted(String),

/// Volume is not tracked
#[error("Volume is not tracked")]
NotTracked,

/// Volume fingerprint is missing
#[error("Volume fingerprint is missing")]
MissingFingerprint,

/// IO error
#[error("IO error: {0}")]
IoError(std::io::Error),

/// Serialization error
#[error("Serialization error: {0}")]
SerializationError(serde_json::Error),

#[error(transparent)]
Sync(#[from] sd_core_sync::Error),
}

/// Specific kinds of speed test errors
Expand Down
216 changes: 210 additions & 6 deletions core/src/volume/types.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use super::error::VolumeError;
use crate::volume::speed::SpeedTest;
use crate::library::Library;
use sd_core_sync::DevicePubId;
use sd_prisma::prisma::{
device,
volume::{self},
PrismaClient,
use sd_prisma::{
prisma::{
device,
volume::{self},
PrismaClient,
},
prisma_sync,
};
use sd_sync::*;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use specta::Type;
Expand All @@ -29,7 +33,7 @@ impl VolumeFingerprint {
hasher.update(&volume.total_bytes_capacity.to_be_bytes());
hasher.update(volume.file_system.to_string().as_bytes());
// These are all properties that are unique to a volume and unlikely to change
// If a .spacedrive file is found in the volume, and is fingerprint does not match,
// If a `.sdvolume` file is found in the volume, and is fingerprint does not match,
// but the `pub_id` is the same, we can update the values and regenerate the fingerprint
// preserving the tracked instance of the volume
Self(hasher.finalize().as_bytes().to_vec())
Expand Down Expand Up @@ -348,6 +352,199 @@ impl Volume {
.await?;
Ok(())
}

/// Writes the .sdvolume file to the volume's root
pub async fn write_volume_file(&self) -> Result<(), VolumeError> {
if !self.is_mounted || self.read_only {
return Ok(()); // Skip if volume isn't mounted or is read-only
}

let fingerprint = self
.fingerprint
.as_ref()
.ok_or(VolumeError::MissingFingerprint)?;
let pub_id = self.pub_id.as_ref().ok_or(VolumeError::NotTracked)?;

let volume_file = SdVolumeFile {
pub_id: pub_id.clone(),
fingerprint: fingerprint.to_string(),
last_seen: chrono::Utc::now(),
};

let path = self.mount_point.join(".sdvolume");
let file = tokio::fs::File::create(&path)
.await
.map_err(|e| VolumeError::IoError(e))?;

serde_json::to_writer(file.into_std().await, &volume_file)
.map_err(|e| VolumeError::SerializationError(e))?;

Ok(())
}

/// Reads the .sdvolume file from the volume's root if it exists
pub async fn read_volume_file(&self) -> Result<Option<SdVolumeFile>, VolumeError> {
if !self.is_mounted {
return Ok(None);
}

let path = self.mount_point.join(".sdvolume");
if !path.exists() {
return Ok(None);
}

let file = tokio::fs::File::open(&path)
.await
.map_err(|e| VolumeError::IoError(e))?;

let volume_file = serde_json::from_reader(file.into_std().await)
.map_err(|e| VolumeError::SerializationError(e))?;

Ok(Some(volume_file))
}

pub async fn sync_db_create(
&self,
library: &Library,
device_pub_id: Vec<u8>,
) -> Result<Volume, VolumeError> {
let Library { db, sync, .. } = library;
let pub_id = Uuid::now_v7().as_bytes().to_vec();

let device_id = db
.device()
.find_unique(device::pub_id::equals(device_pub_id.clone()))
.select(device::select!({ id }))
.exec()
.await?
.ok_or(VolumeError::DeviceNotFound(device_pub_id))?
.id;

let (sync_params, db_params) = [
sync_db_entry!(self.name.clone(), volume::name),
sync_db_entry!(
self.mount_point.to_str().unwrap_or_default().to_string(),
volume::mount_point
),
sync_db_entry!(self.mount_type.to_string(), volume::mount_type),
sync_db_entry!(
self.total_bytes_capacity.to_string(),
volume::total_bytes_capacity
),
sync_db_entry!(
self.total_bytes_available.to_string(),
volume::total_bytes_available
),
sync_db_entry!(self.disk_type.to_string(), volume::disk_type),
sync_db_entry!(self.file_system.to_string(), volume::file_system),
sync_db_entry!(self.is_mounted, volume::is_mounted),
sync_db_entry!(
self.read_speed_mbps.unwrap_or(0) as i64,
volume::read_speed_mbps
),
sync_db_entry!(
self.write_speed_mbps.unwrap_or(0) as i64,
volume::write_speed_mbps
),
sync_db_entry!(self.read_only, volume::read_only),
sync_db_entry!(
self.error_status.clone().unwrap_or_default(),
volume::error_status
),
]
.into_iter()
.unzip::<_, _, Vec<_>, Vec<_>>();

// Add device connection to db_params
let mut db_params = db_params;
db_params.push(volume::device::connect(device::id::equals(device_id)));

let volume = sync
.write_op(
db,
sync.shared_create(
prisma_sync::volume::SyncId {
pub_id: pub_id.clone(),
},
sync_params,
),
db.volume().create(pub_id, db_params),
)
.await?;

Ok(volume.into())
}

pub async fn sync_db_update(&self, library: &Library) -> Result<(), VolumeError> {
let Library { db, sync, .. } = library;
let pub_id = self.pub_id.as_ref().ok_or(VolumeError::NotTracked)?;

let (sync_params, db_params) = [
sync_db_entry!(self.name.clone(), volume::name),
sync_db_entry!(
self.mount_point.to_str().unwrap_or_default().to_string(),
volume::mount_point
),
sync_db_entry!(self.mount_type.to_string(), volume::mount_type),
sync_db_entry!(
self.total_bytes_capacity.to_string(),
volume::total_bytes_capacity
),
sync_db_entry!(
self.total_bytes_available.to_string(),
volume::total_bytes_available
),
sync_db_entry!(self.disk_type.to_string(), volume::disk_type),
sync_db_entry!(self.file_system.to_string(), volume::file_system),
sync_db_entry!(self.is_mounted, volume::is_mounted),
sync_db_entry!(
self.read_speed_mbps.unwrap_or(0) as i64,
volume::read_speed_mbps
),
sync_db_entry!(
self.write_speed_mbps.unwrap_or(0) as i64,
volume::write_speed_mbps
),
sync_db_entry!(self.read_only, volume::read_only),
sync_db_entry!(
self.error_status.clone().unwrap_or_default(),
volume::error_status
),
]
.into_iter()
.unzip::<_, _, Vec<_>, Vec<_>>();

sync.write_op(
db,
sync.shared_update(
prisma_sync::volume::SyncId {
pub_id: pub_id.clone(),
},
sync_params,
),
db.volume()
.update(volume::pub_id::equals(pub_id.clone()), db_params),
)
.await?;

Ok(())
}

pub async fn sync_db_delete(&self, library: &Library) -> Result<(), VolumeError> {
let Library { db, sync, .. } = library;
let pub_id = self.pub_id.as_ref().ok_or(VolumeError::NotTracked)?;

sync.write_op(
db,
sync.shared_delete(prisma_sync::volume::SyncId {
pub_id: pub_id.clone(),
}),
db.volume().delete(volume::pub_id::equals(pub_id.clone())),
)
.await?;

Ok(())
}
}

/// Represents the type of physical storage device
Expand Down Expand Up @@ -472,3 +669,10 @@ impl<'de> Deserialize<'de> for VolumeFingerprint {
.map_err(serde::de::Error::custom)
}
}

#[derive(Serialize, Deserialize, Debug)]
pub struct SdVolumeFile {
pub pub_id: Vec<u8>,
pub fingerprint: String, // Store as hex string
pub last_seen: chrono::DateTime<chrono::Utc>,
}
Loading

0 comments on commit e1a0988

Please sign in to comment.