Skip to content

Commit

Permalink
Implement volume initialization tailored for libraries, optimize volu…
Browse files Browse the repository at this point in the history
…me scanning on startup, and enhance state management
  • Loading branch information
jamiepine committed Nov 3, 2024
1 parent 8db67dc commit bd00623
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 57 deletions.
90 changes: 39 additions & 51 deletions core/src/volume/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ use super::{
watcher::{VolumeWatcher, WatcherState},
VolumeManagerContext, VolumeManagerState,
};
use crate::library::{Library, LibraryManagerEvent};
use crate::{
library::{Library, LibraryManagerEvent},
volume::MountType,
};
use async_channel as chan;
use sd_prisma::prisma::album::pub_id;
use sd_prisma::prisma::{album::pub_id, volume};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::{broadcast, oneshot, Mutex, RwLock};
use tokio::time::Instant;
Expand Down Expand Up @@ -184,6 +187,9 @@ impl VolumeManagerActor {
actor.ctx.library_event_tx.clone()
};

// Scan for volumes on startup
let _ = self_arc.lock().await.scan_volumes().await;

// This is a fire-and-forget subscription
tokio::spawn(async move {
if let Err(e) = rx
Expand All @@ -194,7 +200,8 @@ impl VolumeManagerActor {
LibraryManagerEvent::Load(library) => {
if let Err(e) = {
let mut actor = self_arc_inner.lock().await;
actor.initialize(library.clone()).await
// TODO: check if active library somehow, as we don't care to sync volumes for inactive libraries
actor.initialize_for_library(library.clone()).await
} {
error!(?e, "Failed to initialize volume manager for library");
}
Expand Down Expand Up @@ -231,68 +238,49 @@ impl VolumeManagerActor {
info!("Volume manager actor initialized");
}

pub async fn initialize(&mut self, library: Arc<Library>) -> Result<(), VolumeError> {
/// Syncs volume memory state with library database
pub async fn initialize_for_library(
&mut self,
library: Arc<Library>,
) -> Result<(), VolumeError> {
use sd_prisma::prisma::device;
// Use device_id from context instead of node
let device_pub_id = self.ctx.device_id.clone();
let current_volumes = self.get_volumes().await;

// Scan for system volumes first
{
let mut state = self.state.write().await;
state.scan_volumes(device_pub_id.clone()).await?;
}

// Get volumes from library database
let db_volumes = library
let db_device = library
.db
.volume()
.find_many(vec![])
.device()
.find_unique(device::pub_id::equals(device_pub_id.clone()))
.exec()
.await?
.into_iter()
.map(Volume::from);

// Get current volumes and clone what we need
let current_volumes = {
let state = self.state.read().await;
state.volumes.clone()
};
.ok_or(VolumeError::DeviceNotFound(device_pub_id.clone()))?;

let mut updates = Vec::new();
let db_system_volumes = library
.db
.volume()
.find_many(vec![
volume::device_id::equals(Some(db_device.id)),
volume::mount_type::equals(Some(MountType::System.to_string())),
])
.exec()
.await?;

// Prepare updates
for db_volume in db_volumes {
let fingerprint = db_volume.generate_fingerprint(device_pub_id.clone().into());
let db_system_volumes = db_system_volumes.into_iter().map(Volume::from);

if let Some(system_volume) = current_volumes
.values()
.find(|v| v.generate_fingerprint(device_pub_id.clone().into()) == fingerprint)
{
let merged = Volume::merge_with_db_volume(system_volume, &db_volume);
if let Some(pub_id) = &merged.pub_id {
updates.push((pub_id.clone(), merged.clone()));
let _ = self.event_tx.send(VolumeEvent::VolumeUpdated {
old: system_volume.clone(),
new: merged,
});
// Register system volumes in the db
if db_system_volumes.len() == 0 {
for v in current_volumes.iter() {
if v.mount_type == MountType::System {
// Create is will always treat the volume as a new volume
// Assigning a new pub_id in the process
v.create(&library.db, device_pub_id.clone()).await?;
}
} else if let Some(pub_id) = &db_volume.pub_id {
updates.push((pub_id.clone(), db_volume.clone()));
let _ = self.event_tx.send(VolumeEvent::VolumeAdded(db_volume));
}
}

// Apply updates and initialize watchers
{
let mut state = self.state.write().await;

// Update volumes
for (pub_id, volume) in updates {
state.volumes.insert(pub_id.clone(), volume);
}
}

info!(
"Volume manager initialized: {:?}",
"Volume manager initialized for library: {:?}",
self.state.read().await.volumes
);

Expand Down
1 change: 0 additions & 1 deletion core/src/volume/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ impl VolumeManagerState {
let detected_volumes = super::os::get_volumes().await?;
debug!("Found {} volumes during scan", detected_volumes.len());

let current_volumes = self.volumes.clone(); // Copy of current state
let mut new_state = HashMap::new(); // New state to build with detected volumes

// Process each detected volume
Expand Down
18 changes: 13 additions & 5 deletions core/src/volume/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use sd_prisma::prisma::{
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use specta::Type;
use std::path::Path;
use std::path::PathBuf;
use std::{path::Path, sync::Arc};
use strum_macros::Display;
use uuid::Uuid;

Expand Down Expand Up @@ -79,14 +79,22 @@ pub struct Volume {
pub total_bytes_available: u64,
}

// We can use this to see if a volume has changed
impl PartialEq for Volume {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.disk_type == other.disk_type
&& self.file_system == other.file_system
// Check if any mount points overlap
&& (self.mount_points.iter().any(|mp| other.mount_points.contains(mp))
|| other.mount_points.iter().any(|mp| self.mount_points.contains(mp)))
&& self.mount_type == other.mount_type
&& self.mount_point == other.mount_point
// Check if any mount points overlap
&& (self.mount_points.iter().any(|mp| other.mount_points.contains(mp))
|| other.mount_points.iter().any(|mp| self.mount_points.contains(mp)))
&& self.is_mounted == other.is_mounted
&& self.read_only == other.read_only
&& self.error_status == other.error_status
&& self.total_bytes_capacity == other.total_bytes_capacity
&& self.total_bytes_available == other.total_bytes_available
}
}

Expand Down Expand Up @@ -245,7 +253,7 @@ impl Volume {
/// Creates a new volume record in the database
pub async fn create(
&self,
db: &PrismaClient,
db: &Arc<PrismaClient>,
device_pub_id: Vec<u8>,
) -> Result<(), VolumeError> {
let pub_id = Uuid::now_v7().as_bytes().to_vec();
Expand Down

0 comments on commit bd00623

Please sign in to comment.