diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 3389c8097b6c..570dd56cf660 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -47,12 +47,15 @@ jobs:
           _files="$(git diff --name-only --cached)"
           if [ -n "$_files" ]; then
-            echo "Some files are not correctly formatted. Please run 'pnpm autoformat' and commit the changes." >&2
+            printf "\e[31mThe following files are not correctly formatted:\e[0m\n" >&2
             while IFS= read -r _file || [ -n "$_file" ]; do
               echo "::error file=${_file},title=Incorrectly formatted file::Please run 'pnpm autoformat' and commit the changes."
+              echo " - ${_file}"
             done < <(printf '%s' "$_files")
+            printf "\e[31mPlease run 'pnpm autoformat' and commit the changes.\e[0m\n" >&2
+            exit 1
           fi
diff --git a/apps/landing/src/app/page.tsx b/apps/landing/src/app/page.tsx
index 55a2c9737d37..2192f0fe8668 100644
--- a/apps/landing/src/app/page.tsx
+++ b/apps/landing/src/app/page.tsx
@@ -64,10 +64,7 @@ export default async function Page() {
 					.join(' ')} />
-
+
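Note on the CI hunk above: GitHub Actions turns `::error` workflow commands printed to stdout into file-anchored annotations in the PR UI, which is what that step relies on. A minimal sketch of the command format, as a hypothetical helper (written in Rust only for consistency with the rest of this diff):

```rust
/// Emits a GitHub Actions `::error` workflow command; the runner parses it
/// from stdout and renders an annotation pinned to the offending file.
fn emit_format_error(file: &str) {
    println!(
        "::error file={file},title=Incorrectly formatted file::Please run 'pnpm autoformat' and commit the changes."
    );
}

fn main() {
    emit_format_error("apps/landing/src/app/page.tsx");
}
```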
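The file-path-helper hunk just below replaces a `starts_with`/`expect` dance with `strip_prefix(..).unwrap_or(..)`, which is also what lets the `clippy::missing_panics_doc` allows go away. The same pattern in isolation (a sketch, not the crate's exact code):

```rust
use std::path::{Path, PathBuf, MAIN_SEPARATOR_STR};

fn join_relative(location_path: &Path, relative_path: &Path) -> PathBuf {
    // `strip_prefix` errors when the prefix is absent, so `unwrap_or` simply
    // keeps the original path; no pre-check and no panic path to document.
    let relative = relative_path
        .strip_prefix(MAIN_SEPARATOR_STR)
        .unwrap_or(relative_path);
    location_path.join(relative)
}
```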
String {
-	match (is_dir, extension) {
-		(false, extension) if !extension.is_empty() => {
-			format!("{}{}.{}", &materialized_path[1..], name, extension)
-		}
-		(_, _) => format!("{}{}", &materialized_path[1..], name),
+	if !is_dir && !extension.is_empty() {
+		format!("{}{}.{}", &materialized_path[1..], name, extension)
+	} else {
+		format!("{}{}", &materialized_path[1..], name)
 	}
 }

-#[allow(clippy::missing_panics_doc)] // Don't actually panic as we check before `expect`
 pub fn join_location_relative_path(
 	location_path: impl AsRef<Path>,
 	relative_path: impl AsRef<Path>,
 ) -> PathBuf {
 	let relative_path = relative_path.as_ref();
+	let relative_path = relative_path
+		.strip_prefix(MAIN_SEPARATOR_STR)
+		.unwrap_or(relative_path);

-	location_path
-		.as_ref()
-		.join(if relative_path.starts_with(MAIN_SEPARATOR_STR) {
-			relative_path
-				.strip_prefix(MAIN_SEPARATOR_STR)
-				.expect("just checked")
-		} else {
-			relative_path
-		})
+	location_path.as_ref().join(relative_path)
 }

-#[allow(clippy::missing_panics_doc)] // Don't actually panic as we check before `expect`
 pub fn push_location_relative_path(
 	mut location_path: PathBuf,
 	relative_path: impl AsRef<Path>,
 ) -> PathBuf {
 	let relative_path = relative_path.as_ref();
-	location_path.push(if relative_path.starts_with(MAIN_SEPARATOR_STR) {
-		relative_path
-			.strip_prefix(MAIN_SEPARATOR_STR)
-			.expect("just checked")
-	} else {
-		relative_path
-	});
+	let relative_path = relative_path
+		.strip_prefix(MAIN_SEPARATOR_STR)
+		.unwrap_or(relative_path);
+	location_path.push(relative_path);

 	location_path
 }
diff --git a/core/crates/heavy-lifting/src/job_system/report.rs b/core/crates/heavy-lifting/src/job_system/report.rs
index ff2e1149642b..06cba2af9ba0 100644
--- a/core/crates/heavy-lifting/src/job_system/report.rs
+++ b/core/crates/heavy-lifting/src/job_system/report.rs
@@ -144,6 +144,7 @@ pub struct Report {
 	pub status: Status,
 	pub task_count: i32,
 	pub completed_task_count: i32,
+	pub info: String,
 	pub phase: String,
 	pub message: String,
@@ -177,6 +178,7 @@ impl TryFrom for Report {
 			metadata,
 			parent_id,
 			task_count,
+			info,
 			completed_task_count,
 			date_estimated_completion,
 			date_created,
@@ -208,6 +210,7 @@
 				.expect("corrupted database"),
 			task_count: task_count.unwrap_or(0),
 			completed_task_count: completed_task_count.unwrap_or(0),
+			info: info.unwrap_or_default(),
 			phase: String::new(),
 			message: String::new(),
 			estimated_completion: date_estimated_completion.map_or_else(Utc::now, DateTime::into),
@@ -232,6 +235,7 @@ impl Report {
 			metadata: vec![],
 			parent_id: None,
 			completed_task_count: 0,
+			info: String::new(),
 			phase: String::new(),
 			message: String::new(),
 			estimated_completion: Utc::now(),
@@ -278,6 +282,7 @@
 				job::status::set(Some(self.status as i32)),
 				job::date_started::set(self.started_at.map(Into::into)),
 				job::task_count::set(Some(0)),
+				job::info::set(Some(self.info.clone())),
 				job::completed_task_count::set(Some(0)),
 			],
 			[self
@@ -307,6 +312,7 @@
 			)?)),
 			job::metadata::set(Some(serde_json::to_vec(&self.metadata)?)),
 			job::task_count::set(Some(self.task_count)),
+			job::info::set(Some(self.info.clone())),
 			job::completed_task_count::set(Some(self.completed_task_count)),
 			job::date_started::set(self.started_at.map(Into::into)),
 			job::date_completed::set(self.completed_at.map(Into::into)),
@@ -388,6 +394,7 @@ impl ReportBuilder {
 			metadata: self.metadata,
 			parent_id: self.parent_id,
 			completed_task_count: 0,
+			info: String::new(),
 			phase: String::new(),
 			message: String::new(),
 			estimated_completion: Utc::now(),
diff --git a/core/crates/prisma-helpers/src/lib.rs b/core/crates/prisma-helpers/src/lib.rs
index 958bb770324a..2d5abddd9a02 100644
--- a/core/crates/prisma-helpers/src/lib.rs
+++ b/core/crates/prisma-helpers/src/lib.rs
@@ -235,6 +235,7 @@ job::select!(job_without_data {
 	date_started
 	date_completed
 	task_count
+	info
 	completed_task_count
 	date_estimated_completion
 });
diff --git a/core/prisma/migrations/20240710015044_add_info_colum_to_job/migration.sql b/core/prisma/migrations/20240710015044_add_info_colum_to_job/migration.sql
new file mode 100644
index 000000000000..2b5f53d6b7f6
--- /dev/null
+++ b/core/prisma/migrations/20240710015044_add_info_colum_to_job/migration.sql
@@ -0,0 +1,2 @@
+-- AlterTable
+ALTER TABLE "job" ADD COLUMN "info" TEXT;
diff --git a/core/prisma/schema.prisma b/core/prisma/schema.prisma
index dbba6634dce4..62da3559df28 100644
--- a/core/prisma/schema.prisma
+++ b/core/prisma/schema.prisma
@@ -605,6 +605,7 @@ model Job {
 	task_count                Int?
 	completed_task_count      Int?
+	info                      String?
 	date_estimated_completion DateTime? // Estimated timestamp that the job will be complete at
 	date_created              DateTime?
diff --git a/core/src/context.rs b/core/src/context.rs
index f47ff5aaeb74..217acd54b75b 100644
--- a/core/src/context.rs
+++ b/core/src/context.rs
@@ -209,6 +209,7 @@ impl sd_core_heavy_lifting::JobContext<
 			library_id: library.id,
 			task_count: report.task_count,
 			completed_task_count: report.completed_task_count,
+			info: report.info.clone(),
 			estimated_completion: report.estimated_completion,
 			phase: report.phase.clone(),
 			message: report.message.clone(),
diff --git a/core/src/location/manager/watcher/utils.rs b/core/src/location/manager/watcher/utils.rs
index c8a3cc2b2bd4..e6380dd5ee71 100644
--- a/core/src/location/manager/watcher/utils.rs
+++ b/core/src/location/manager/watcher/utils.rs
@@ -339,47 +339,43 @@ async fn inner_create_file(
 		.exec()
 		.await?;

-	let (
-		object_ids::Data {
-			id: object_id,
-			pub_id: object_pub_id,
-		},
-		is_new,
-	) = if let Some(object) = existing_object {
-		(object, false)
+	let is_new_file = existing_object.is_none();
+
+	let object_ids::Data {
+		id: object_id,
+		pub_id: object_pub_id,
+	} = if let Some(object) = existing_object {
+		object
 	} else {
 		let pub_id: ObjectPubId = ObjectPubId::new();
 		let date_created: DateTime<FixedOffset> =
 			DateTime::<Utc>::from(fs_metadata.created_or_now()).into();
 		let int_kind = kind as i32;

-		(
-			sync.write_ops(
-				db,
-				(
-					sync.shared_create(
-						prisma_sync::object::SyncId {
-							pub_id: pub_id.to_db(),
-						},
-						[
-							(object::date_created::NAME, msgpack!(date_created)),
-							(object::kind::NAME, msgpack!(int_kind)),
-						],
-					),
-					db.object()
-						.create(
-							pub_id.into(),
-							vec![
-								object::date_created::set(Some(date_created)),
-								object::kind::set(Some(int_kind)),
-							],
-						)
-						.select(object_ids::select()),
-				),
-			)
-			.await?,
-			true,
+		sync.write_ops(
+			db,
+			(
+				sync.shared_create(
+					prisma_sync::object::SyncId {
+						pub_id: pub_id.to_db(),
+					},
+					[
+						(object::date_created::NAME, msgpack!(date_created)),
+						(object::kind::NAME, msgpack!(int_kind)),
+					],
+				),
+				db.object()
+					.create(
+						pub_id.into(),
+						vec![
+							object::date_created::set(Some(date_created)),
+							object::kind::set(Some(int_kind)),
+						],
+					)
+					.select(object_ids::select()),
+			),
+		)
+		.await?
 	};

 	sync.write_op(
@@ -403,7 +399,7 @@ async fn inner_create_file(
 	.await?;

 	// If the file is a duplicate of an existing file, we don't need to generate thumbnails or extract media data
-	if is_new
+	if is_new_file
 		&& !extension.is_empty()
 		&& matches!(
 			kind,
diff --git a/core/src/object/fs/mod.rs b/core/src/object/fs/mod.rs
index ba2d68916361..46b6afa67961 100644
--- a/core/src/object/fs/mod.rs
+++ b/core/src/object/fs/mod.rs
@@ -8,6 +8,7 @@ use sd_utils::{
 	db::maybe_missing,
 	error::{FileIOError, NonUtf8PathError},
 };
+use tracing::trace;

 use std::{
 	ffi::OsStr,
@@ -43,12 +44,13 @@ pub enum ObjectType {
 	Directory,
 }

-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct FileData {
 	pub file_path: file_path_with_object::Data,
 	pub full_path: PathBuf,
 }

+/// Get the [`FileData`] related to every `file_path_id`
 pub async fn get_many_files_datas(
 	db: &PrismaClient,
 	location_path: impl AsRef<Path>,
@@ -81,7 +83,7 @@ pub async fn get_many_files_datas(
 				})
 			})
 		})
-		.collect::<Result<Vec<_>, _>>()
+		.collect()
 }

 pub async fn get_file_data_from_isolated_file_path(
@@ -207,6 +209,7 @@ pub async fn find_available_filename_for_duplicate(
 				continue;
 			}
 			Err(e) if e.kind() == io::ErrorKind::NotFound => {
+				trace!(old_name=?target_path, new_name=?new_file_full_path_candidate, "duplicated file name, file renamed");
 				return Ok(new_file_full_path_candidate);
 			}
 			Err(e) => return Err(FileIOError::from((new_file_full_path_candidate, e)).into()),
diff --git a/core/src/object/fs/old_copy.rs b/core/src/object/fs/old_copy.rs
index 41b1d1b1a4b0..8b760b920f74 100644
--- a/core/src/object/fs/old_copy.rs
+++ b/core/src/object/fs/old_copy.rs
@@ -2,8 +2,8 @@ use crate::{
 	invalidate_query,
 	library::Library,
 	old_job::{
-		CurrentStep, JobError, JobInitOutput, JobResult, JobRunErrors, JobStepOutput, StatefulJob,
-		WorkerContext,
+		CurrentStep, JobError, JobInitOutput, JobReportUpdate, JobResult, JobRunMetadata,
+		JobStepOutput, StatefulJob, WorkerContext,
 	},
 };

@@ -12,14 +12,20 @@ use sd_core_file_path_helper::{join_location_relative_path, IsolatedFilePathData
 use sd_prisma::prisma::{file_path, location};
 use sd_utils::{db::maybe_missing, error::FileIOError};

-use std::{hash::Hash, path::PathBuf};
+use std::{
+	collections::HashSet,
+	hash::Hash,
+	path::{Path, PathBuf},
+	sync::{Arc, Mutex},
+	time::Duration,
+};

-use futures_concurrency::future::TryJoin;
+use futures_concurrency::future::{Race, TryJoin};
 use serde::{Deserialize, Serialize};
 use serde_json::json;
 use specta::Type;
-use tokio::{fs, io};
-use tracing::{trace, warn};
+use tokio::fs;
+use tracing::debug;

 use super::{
 	construct_target_filename, error::FileSystemJobsError, fetch_source_and_target_location_paths,
@@ -30,6 +36,8 @@ use super::{
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct OldFileCopierJobData {
 	sources_location_path: PathBuf,
+	total_size: u64,
+	steps_len: usize,
 }

 #[derive(Serialize, Deserialize, Hash, Type, Debug)]
@@ -40,19 +48,197 @@ pub struct OldFileCopierJobInit {
 	pub target_location_relative_directory_path: PathBuf,
 }

+#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
+enum CopierStepKind {
+	CreateDirs(CreateDirs),
+	CopyFiles(CopyFiles),
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
+struct CopyFiles;
+
+impl CopyFiles {
+	async fn copy_files(
+		files: &[Copy],
+		jobmeta: Arc<Mutex<OldFileCopierJobMetadata>>,
+	) -> Result<(), JobError> {
+		// NOTE(matheus-consoli): if a step contains multiple files with the same name,
+		// for example `[file (1), file (2), file (3)]`, they all may get renamed to the same
+		// new file (`file (4)`), as the decision for the new file name and the creation of
+		// such file all happen concurrently.
+		// this HashSet introduces a point of synchronization so we guarantee that all files have
+		// unique names - this is not the ideal solution, but it's a quick one.
+		let renamed_files_in_this_step = Arc::new(Mutex::new(HashSet::new()));
+
+		files
+			.iter()
+			.map(
+				|Copy {
+					source,
+					source_size,
+					target_full_path,
+				}| {
+					let jobmeta = Arc::clone(&jobmeta);
+					let renamed_files_in_this_step = Arc::clone(&renamed_files_in_this_step);
+					async move {
+						let target = loop {
+							let target =
+								OldFileCopierJobStep::find_available_name(&target_full_path)
+									.await?;
+
+							let mut cache = renamed_files_in_this_step
+								.lock()
+								.expect("failed to get lock for internal cache");
+							if cache.contains(&target) {
+								// file name is taken, try again
+								continue;
+							} else {
+								cache.insert(target.clone());
+								break target;
+							}
+						};
+						fs::copy(&source.full_path, &target).await.map_err(|e| {
+							let source = source.full_path.clone();
+							FileIOError::from((source, e))
+						})?;
+
+						let mut meta = jobmeta
+							.lock()
+							.expect("failed to get the lock for the list of files to copy");
+						let accumulated_copied_size = meta.accumulated_copied_size + source_size;
+						let copied_files_count = meta.copied_files_count + 1;
+						meta.update(OldFileCopierJobMetadata {
+							accumulated_copied_size,
+							copied_files_count,
+						});
+
+						Ok::<_, JobError>(())
+					}
+				},
+			)
+			.collect::<Vec<_>>()
+			.try_join()
+			.await?;
+
+		Ok(())
+	}
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
+struct CreateDirs;
+
+impl CreateDirs {
+	/// Create the directories
+	async fn create_dir_structure(files: &[Copy]) -> Result<(), JobError> {
+		// TODO(matheus-consoli): when the directory name conflicts, what should we do?
+		// same as find_available...?
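Reviewer aside on the `copy_files` body above: the `loop` plus shared `HashSet` is a claim-before-copy scheme, so concurrent tasks within one step cannot settle on the same freshly invented name. Reduced to its core (a sketch; `propose_name` is a hypothetical stand-in for `find_available_name`):

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

fn claim_unique_name(
    claimed: &Arc<Mutex<HashSet<String>>>,
    mut propose_name: impl FnMut() -> String,
) -> String {
    loop {
        let candidate = propose_name();
        let mut cache = claimed.lock().expect("mutex poisoned");
        // `insert` returns false when a sibling task already claimed this
        // candidate, in which case we loop and propose another name.
        if cache.insert(candidate.clone()) {
            break candidate;
        }
    }
}
```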
+ files + .iter() + .map(|file| async move { + fs::create_dir_all(&file.target_full_path) + .await + .map_err(|e| FileIOError::from((file.target_full_path.clone(), e))) + }) + .collect::>() + .try_join() + .await?; + Ok(()) + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Copy { + source: FileData, + source_size: u64, + target_full_path: Box, +} + #[derive(Serialize, Deserialize, Debug)] pub struct OldFileCopierJobStep { - pub source_file_data: FileData, - pub target_full_path: PathBuf, + files: Box<[Copy]>, + target_location_path: Box, + copy_kind: CopierStepKind, +} + +impl OldFileCopierJobStep { + async fn find_available_name(path: impl AsRef) -> Result { + let path = path.as_ref(); + match fs::try_exists(&path).await { + Ok(true) => { + // file already exists, try finding a better name + find_available_filename_for_duplicate(&path) + .await + .map_err(Into::into) + } + Ok(false) => { + // nothing todo, file name is available + Ok(path.to_owned()) + } + Err(e) => Err(FileIOError::from((path, e)).into()), + } + } +} + +fn progress(ctx: &WorkerContext, msgs: impl IntoIterator) { + let updates = msgs.into_iter().map(Into::into).collect(); + + ctx.progress(updates); +} + +#[derive(Debug, Clone)] +enum CopierUpdate { + Start, + TotalSize(String), + FileCount(usize), + TotalProgress(u64), + ProgressPerFile(String), + FinishedWithPercetage(u64), +} + +impl From for JobReportUpdate { + fn from(value: CopierUpdate) -> Self { + match value { + CopierUpdate::Start => { + const HUNDRED_PERCENT: usize = 100; + JobReportUpdate::TaskCount(HUNDRED_PERCENT) + } + CopierUpdate::FinishedWithPercetage(task_progress) => JobReportUpdate::TaskCount( + task_progress + .try_into() + .expect("should be able to convert a `u64` to `usize`"), + ), + CopierUpdate::TotalSize(size) => JobReportUpdate::Message(size.to_owned()), + CopierUpdate::FileCount(count) => JobReportUpdate::Info(count.to_string()), + CopierUpdate::ProgressPerFile(per_file) => JobReportUpdate::Phase(per_file.to_owned()), + CopierUpdate::TotalProgress(progressed_tasks) => JobReportUpdate::CompletedTaskCount( + progressed_tasks + .try_into() + .expect("should be able to convert a `u64` to `usize`"), + ), + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct OldFileCopierJobMetadata { + accumulated_copied_size: u64, + copied_files_count: u64, +} + +impl JobRunMetadata for OldFileCopierJobMetadata { + fn update(&mut self, metadata: Self) { + *self = metadata; + } } #[async_trait::async_trait] impl StatefulJob for OldFileCopierJobInit { type Data = OldFileCopierJobData; type Step = OldFileCopierJobStep; - type RunMetadata = (); + type RunMetadata = OldFileCopierJobMetadata; const NAME: &'static str = "file_copier"; + const IS_BATCHED: bool = true; fn target_location(&self) -> location::id::Type { self.target_location_id @@ -74,8 +260,17 @@ impl StatefulJob for OldFileCopierJobInit { ) .await?; - let steps = get_many_files_datas(db, &sources_location_path, &init.sources_file_path_ids) - .await? 
+		let files =
+			get_many_files_datas(db, &sources_location_path, &init.sources_file_path_ids).await?;
+
+		if let Some(missing_field) = files
+			.iter()
+			.find_map(|file| maybe_missing(file.file_path.is_dir, "file.is_dir").err())
+		{
+			return Err(missing_field.into());
+		}
+
+		let archives = files
 			.into_iter()
 			.map(|file_data| async {
 				// add the currently viewed subdirectory to the location root
@@ -86,144 +281,236 @@
 				full_target_path.push(construct_target_filename(&file_data)?);

-				if file_data.full_path == full_target_path {
-					full_target_path =
-						find_available_filename_for_duplicate(full_target_path).await?;
-				}
-
-				Ok::<_, FileSystemJobsError>(OldFileCopierJobStep {
-					source_file_data: file_data,
-					target_full_path: full_target_path,
-				})
+				Ok::<_, FileSystemJobsError>((file_data, full_target_path))
 			})
 			.collect::<Vec<_>>()
 			.try_join()
 			.await?;

+		let (mut dirs, mut files): (Vec<_>, _) = archives.into_iter().partition(|file| {
+			file.0
+				.file_path
+				.is_dir
+				.expect("we tested that all file paths have the `is_dir` field")
+		});
+
+		let mut steps = Vec::with_capacity(dirs.len() + files.len());
+
+		// first step: create all directories using a single step
+		if !dirs.is_empty() {
+			let (more_dirs, more_files) = unfold_diretory(&dirs).await?;
+
+			let more_dirs = more_dirs
+				.into_iter()
+				.map(|dir| async {
+					let iso = IsolatedFilePathData::new(
+						init.source_location_id,
+						&sources_location_path,
+						dir.source,
+						true, // is dir
+					)
+					.map_err(FileSystemJobsError::from)?;
+					let file_data = get_file_data_from_isolated_file_path(
+						&ctx.library.db,
+						&sources_location_path,
+						&iso,
+					)
+					.await?;
+					Ok::<_, JobError>((file_data, dir.dest))
+				})
+				.collect::<Vec<_>>()
+				.try_join()
+				.await?;
+			dirs.extend(more_dirs);
+
+			let (dir_source_file_data, dir_target_full_path): (Vec<_>, Vec<_>) =
+				dirs.into_iter().unzip();
+
+			let step_files = dir_source_file_data
+				.into_iter()
+				.zip(dir_target_full_path.into_iter())
+				.map(|(source, target_full_path)| Copy {
+					source,
+					source_size: 0,
+					target_full_path: target_full_path.into_boxed_path(),
+				})
+				.collect();
+
+			let create_dirs_step = OldFileCopierJobStep {
+				files: step_files,
+				target_location_path: targets_location_path.clone().into_boxed_path(),
+				copy_kind: CopierStepKind::CreateDirs(CreateDirs),
+			};
+			steps.push(create_dirs_step);
+
+			let more_files = more_files
+				.into_iter()
+				.map(|file| async {
+					let iso = IsolatedFilePathData::new(
+						init.source_location_id,
+						&sources_location_path,
+						file.source,
+						false, // is dir
+					)
+					.map_err(FileSystemJobsError::from)?;
+					let file_data = get_file_data_from_isolated_file_path(
+						&ctx.library.db,
+						&sources_location_path,
+						&iso,
+					)
+					.await?;
+					Ok::<_, JobError>((file_data, file.dest))
+				})
+				.collect::<Vec<_>>()
+				.try_join()
+				.await;
+
+			if let Err(e) = more_files.as_ref() {
+				// the file is not indexed
+				tracing::error!(?e, "job init failed");
+			}
+
+			files.extend(more_files?);
+		};
+
+		// remaining steps: delegate to the copy strategist to decide how to organize
+		// the steps that copy the files
+		let steps_to_create_files = file_copy_strategist(files, &targets_location_path).await?;
+		steps.extend(steps_to_create_files);
+
+		let total_size = steps
+			.iter()
+			.filter(|step| matches!(step.copy_kind, CopierStepKind::CopyFiles(_)))
+			.map(|step| step.files.iter().map(|file| file.source_size).sum::<u64>())
+			.sum::<u64>();
+
+		let file_count = steps
+			.iter()
+			.filter(|step| matches!(step.copy_kind, CopierStepKind::CopyFiles(_)))
+			.map(|step| step.files.len())
+			.sum::<usize>();
+
+		let updates = [
+			CopierUpdate::Start,
+			CopierUpdate::FileCount(file_count),
+			CopierUpdate::TotalSize(total_size.to_string()),
+		];
+		progress(ctx, updates);
+
 		*data = Some(OldFileCopierJobData {
 			sources_location_path,
+			total_size,
+			steps_len: steps.len(),
 		});

 		Ok(steps.into())
 	}

+	#[tracing::instrument(
+		skip_all,
+		fields(
+			step.kind = ?step.step.copy_kind,
+			step.n = step.step_number,
+			progress = jobmeta.accumulated_copied_size
+		)
+	)]
 	async fn execute_step(
 		&self,
 		ctx: &WorkerContext,
-		CurrentStep {
-			step: OldFileCopierJobStep {
-				source_file_data,
-				target_full_path,
-			},
-			..
-		}: CurrentStep<'_, Self::Step>,
+		step: CurrentStep<'_, Self::Step>,
 		data: &Self::Data,
-		_: &Self::RunMetadata,
+		jobmeta: &Self::RunMetadata,
 	) -> Result<JobStepOutput<Self::Step, Self::RunMetadata>, JobError> {
-		let init = self;
-
-		if maybe_missing(source_file_data.file_path.is_dir, "file_path.is_dir")? {
-			let mut more_steps = Vec::new();
-
-			fs::create_dir_all(target_full_path)
-				.await
-				.map_err(|e| FileIOError::from((target_full_path, e)))?;
-
-			let mut read_dir = fs::read_dir(&source_file_data.full_path)
-				.await
-				.map_err(|e| FileIOError::from((&source_file_data.full_path, e)))?;
-
-			while let Some(children_entry) = read_dir
-				.next_entry()
-				.await
-				.map_err(|e| FileIOError::from((&source_file_data.full_path, e)))?
-			{
-				let children_path = children_entry.path();
-				let target_children_full_path = target_full_path.join(
-					children_path
-						.strip_prefix(&source_file_data.full_path)
-						.expect("We got the children path from the read_dir, so it should be a child of the source path"),
-				);
-
-				match get_file_data_from_isolated_file_path(
-					&ctx.library.db,
-					&data.sources_location_path,
-					&IsolatedFilePathData::new(
-						init.source_location_id,
-						&data.sources_location_path,
-						&children_path,
-						children_entry
-							.metadata()
-							.await
-							.map_err(|e| FileIOError::from((&children_path, e)))?
-							.is_dir(),
-					)
-					.map_err(FileSystemJobsError::from)?,
-				)
-				.await
-				{
-					Ok(source_file_data) => {
-						// Currently not supporting file_name suffixes children files in a directory being copied
-						more_steps.push(OldFileCopierJobStep {
-							target_full_path: target_children_full_path,
-							source_file_data,
-						});
+		let files = &step.step.files;
+		let acc_copied_size = jobmeta.accumulated_copied_size;
+		let total_size = data.total_size;
+		let jobmeta = Arc::new(Mutex::new(jobmeta.clone()));
+
+		let transfer = {
+			let jobmeta = Arc::clone(&jobmeta);
+			async move {
+				match step.step.copy_kind {
+					CopierStepKind::CreateDirs(CreateDirs) => {
+						CreateDirs::create_dir_structure(&step.step.files).await?;
 					}
-					Err(FileSystemJobsError::FilePathNotFound(path)) => {
-						// FilePath doesn't exist in the database, it possibly wasn't indexed, so we skip it
-						warn!(
-							path = %path.display(),
-							"Skipping duplicating as it wasn't indexed;",
-						);
+					CopierStepKind::CopyFiles(CopyFiles) => {
+						CopyFiles::copy_files(&step.step.files, jobmeta).await?;
 					}
-					Err(e) => return Err(e.into()),
-				}
			}
+				};
+				Ok::<_, JobError>(())
 			}
+		};
+
+		let report = async move {
+			let mut finished = vec![false; files.len()];
+			let mut step_copied = vec![0; files.len()];
+			let relative_paths: Vec<&Path> = files
+				.iter()
+				.map(|f| {
+					f.target_full_path
+						.strip_prefix(&step.step.target_location_path)
+						.unwrap_or(&f.target_full_path)
+				})
+				.collect();
+
+			loop {
+				for (((file, relative_path), copied), is_file_done) in files
+					.iter()
+					.zip(relative_paths.iter())
+					.zip(step_copied.iter_mut())
+					.zip(finished.iter_mut())
+					.filter(|(_, is_file_done)| !**is_file_done)
+				{
+					let Ok(transfering) = fs::metadata(&file.target_full_path).await else {
+						// file may not have been created yet
+						continue;
+					};

-			Ok(more_steps.into())
-		} else {
-			match fs::metadata(target_full_path).await {
-				Ok(_) => {
-					// Already exist a file with this name, so we need to find an available name
-					match find_available_filename_for_duplicate(target_full_path).await {
-						Ok(new_path) => {
-							fs::copy(&source_file_data.full_path, &new_path)
-								.await
-								// Using the ? here because we don't want to increase the completed task
-								// count in case of file system errors
-								.map_err(|e| FileIOError::from((new_path, e)))?;
-
-							Ok(().into())
-						}
-
-						Err(FileSystemJobsError::FailedToFindAvailableName(path)) => {
-							Ok(JobRunErrors(vec![
-								FileSystemJobsError::WouldOverwrite(path).to_string()
-							])
-							.into())
-						}
-
-						Err(e) => Err(e.into()),
+					let file_percentage =
+						(transfering.len() as f64 / file.source_size as f64) * 100.0;
+					let file_percentage = file_percentage.round();
+
+					let msg = format!("{file_percentage}% of {:?}", relative_path);
+					progress(ctx, [CopierUpdate::ProgressPerFile(msg)]);
+
+					*copied = transfering.len();
+					if transfering.len() == file.source_size {
+						*is_file_done = true;
 					}
 				}
-				Err(e) if e.kind() == io::ErrorKind::NotFound => {
-					trace!(
-						source = %source_file_data.full_path.display(),
-						target = %target_full_path.display(),
-						"Copying source -> target;",
-					);
-
-					fs::copy(&source_file_data.full_path, &target_full_path)
-						.await
-						// Using the ? here because we don't want to increase the completed task
-						// count in case of file system errors
-						.map_err(|e| FileIOError::from((target_full_path, e)))?;
-					Ok(().into())
-				}
-				Err(e) => Err(FileIOError::from((target_full_path, e)).into()),
+				let copied_in_step = step_copied.iter().sum::<u64>();
+				let total_percentage =
+					((copied_in_step + acc_copied_size) as f64 / total_size as f64) * 100.;
+				let per = total_percentage.round() as u64;
+				progress(ctx, [CopierUpdate::TotalProgress(per)]);
+
+				// wait for progress
+				tokio::time::sleep(Duration::from_millis(200)).await;
 			}
+		};
+
+		(transfer, report).race().await?;
+
+		if data.steps_len == step.step_number + 1 {
+			let jobmeta = jobmeta
+				.lock()
+				.expect("failed to get the lock for job metadata");
+			progress(
+				ctx,
+				[CopierUpdate::FinishedWithPercetage(
+					jobmeta.copied_files_count,
+				)],
+			);
 		}
+
+		let jobmeta = Arc::into_inner(jobmeta)
+			.expect("all the other copies should have been dropped by this point")
+			.into_inner()
+			.expect("the Mutex shouldn't be poisoned");
+
+		// we've calculated all steps ahead of time
+		Ok(jobmeta.into())
 	}

 	async fn finalize(
@@ -239,3 +526,144 @@ impl StatefulJob for OldFileCopierJobInit {
 		Ok(Some(json!({ "init": init })))
 	}
 }
+
+/// Gather information about the list of files and decide what is the best
+/// approach to organize them into steps.
+async fn file_copy_strategist(
+	files: Vec<(FileData, PathBuf)>,
+	location_path: &Path,
+) -> Result<Vec<OldFileCopierJobStep>, JobError> {
+	// maximum size in bytes per step (800 MiB)
+	const MAX_TOTAL_SIZE_PER_STEP: u64 = 1024 * 1024 * 800;
+	// max quantity of files per step
+	const MAX_FILES_PER_STEP: usize = 20;
+
+	debug!("generating steps to copy files");
+
+	let mut metadata = files
+		.into_iter()
+		.map(|(data, path)| async move {
+			let meta = tokio::fs::metadata(&data.full_path)
+				.await
+				.map_err(|e| FileIOError::from((data.full_path.clone(), e)))?;
+			Ok::<_, JobError>((meta.len(), data, path))
+		})
+		.collect::<Vec<_>>()
+		.try_join()
+		.await?;
+
+	// sort by size
+	metadata.sort_unstable_by_key(|m| m.0);
+
+	let mut metadata = metadata.into_iter().peekable();
+	let mut steps = Vec::new();
+
+	loop {
+		let mut sum = 0;
+		let mut files = Vec::with_capacity(MAX_FILES_PER_STEP);
+
+		while let Some((source_size, source, path)) = metadata.next_if(|(len, _, _)| {
+			files.len() < MAX_FILES_PER_STEP && len + sum <= MAX_TOTAL_SIZE_PER_STEP || sum == 0
+		}) {
+			sum += source_size;
+			files.push(Copy {
+				source,
+				source_size,
+				target_full_path: path.into_boxed_path(),
+			});
+		}
+
+		steps.push(OldFileCopierJobStep {
+			files: files.into_boxed_slice(),
+			target_location_path: location_path.into(),
+			copy_kind: CopierStepKind::CopyFiles(CopyFiles),
+		});
+
+		if metadata.peek().is_none() {
+			// nothing left to do, all files are grouped into steps
+			break;
+		}
+	}
+
+	Ok(steps)
+}
+
+async fn unfold_diretory(
+	dirs: &[(FileData, PathBuf)],
+) -> Result<(Vec<NewEntry>, Vec<NewEntry>), JobError> {
+	let mut unfolded_dirs = Vec::new();
+	let mut unfolded_files = Vec::new();
+
+	let mut dirs = Vec::from_iter(
+		dirs.iter()
+			.map(|(file_data, path)| (file_data.full_path.clone(), path.clone())),
+	);
+
+	loop {
+		if dirs.is_empty() {
+			break;
+		}
+		let unfolds = dirs
+			.iter()
+			.map(|(file_data, target_full_path)| async move {
+				let target_full_path = target_full_path.clone();
+
+				let mut to_look = Vec::new();
+				let mut more_dirs = Vec::new();
+				let mut more_files = Vec::new();
+				let mut read_dir = fs::read_dir(file_data)
+					.await
+					.map_err(|e| FileIOError::from((file_data.clone(), e)))?;
+
+				while let Some(children_entry) = read_dir
+					.next_entry()
+					.await
+					.map_err(|e| FileIOError::from((file_data.clone(), e)))?
+				{
+					let children_path = &children_entry.path();
+					let relative_path = children_path.strip_prefix(file_data)
+						.expect("We got the children path from the `read_dir`, so it should be a child of the source path");
+					let target_children_full_path = target_full_path.join(relative_path);
+					let metadata = fs::metadata(children_path)
+						.await
+						.map_err(|e| FileIOError::from((file_data.clone(), e)))?;
+					if metadata.is_dir() {
+						to_look.push((children_path.clone(), target_children_full_path.clone()));
+						let dir = NewEntry {
+							source: children_path.clone(),
+							dest: target_children_full_path,
+						};
+						more_dirs.push(dir);
+					} else {
+						let file = NewEntry {
+							source: children_path.clone(),
+							dest: target_children_full_path,
+						};
+						more_files.push(file);
+					}
+				}
+
+				Ok::<_, JobError>((to_look, more_dirs, more_files))
+			})
+			.collect::<Vec<_>>()
+			.try_join()
+			.await?;
+
+		dirs.clear();
+		unfolds
+			.into_iter()
+			.for_each(|(keep_looking, more_dirs, more_files)| {
+				dirs.extend(keep_looking);
+				unfolded_dirs.extend(more_dirs);
+				unfolded_files.extend(more_files);
+			});
+	}
+
+	Ok((unfolded_dirs, unfolded_files))
+}
+
+#[derive(Debug)]
+struct NewEntry {
+	source: PathBuf,
+	dest: PathBuf,
+}
diff --git a/core/src/old_job/manager.rs b/core/src/old_job/manager.rs
index 2770eb7e5626..f47164759e0e 100644
--- a/core/src/old_job/manager.rs
+++ b/core/src/old_job/manager.rs
@@ -357,13 +357,11 @@ impl OldJobs {
 	/// Check if the manager currently has some active workers.
 	pub async fn has_active_workers(&self, library_id: Uuid) -> bool {
-		for worker in self.running_workers.read().await.values() {
-			if worker.library_id == library_id && !worker.is_paused() {
-				return true;
-			}
-		}
-
-		false
+		self.running_workers
+			.read()
+			.await
+			.values()
+			.any(|worker| worker.library_id == library_id && !worker.is_paused())
 	}

 	pub async fn has_job_running(&self, predicate: impl Fn(JobIdentity) -> bool) -> bool {
diff --git a/core/src/old_job/mod.rs b/core/src/old_job/mod.rs
index 5bbc9be1c7f4..01edd1a47405 100644
--- a/core/src/old_job/mod.rs
+++ b/core/src/old_job/mod.rs
@@ -401,7 +401,7 @@ impl DynJob for OldJob {
 		::NAME
 	}

-	#[instrument(skip_all, fields(job_name = %self.name(), job_id = %self.id()), err)]
+	#[instrument(skip_all, fields(job_name = %self.name()), err)]
 	#[allow(clippy::blocks_in_conditions)] // Due to `err` on instrument above
 	async fn run(
 		&mut self,
@@ -447,6 +447,7 @@ impl DynJob for OldJob {
 		if let Ok(res) = res.as_ref() {
 			if !::IS_BATCHED {
+				// tell the reporter how much work there is
 				ctx.progress(vec![JobReportUpdate::TaskCount(res.steps.len())]);
 			}
 		}
@@ -507,7 +508,6 @@ impl DynJob for OldJob {
 		let init_time = Instant::now();

-		// JoinHandle, JobError>>
 		let step_task = {
 			// Need these bunch of Arcs to be able to move them into the async block of tokio::spawn
 			let ctx = Arc::clone(&ctx);
diff --git a/core/src/old_job/report.rs b/core/src/old_job/report.rs
index b85f58110bf3..60761ab170a6 100644
--- a/core/src/old_job/report.rs
+++ b/core/src/old_job/report.rs
@@ -29,6 +29,7 @@ pub enum JobReportUpdate {
 	TaskCount(usize),
 	CompletedTaskCount(usize),
 	Message(String),
+	Info(String),
 	Phase(String),
 }

@@ -53,6 +54,7 @@ pub struct OldJobReport {
 	pub status: JobStatus,
 	pub task_count: i32,
 	pub completed_task_count: i32,
+	pub info: String,
 	pub phase: String,
 	pub message: String,
@@ -75,6 +77,7 @@ impl From<OldJobReport> for sd_core_heavy_lifting::job_system::report::Report {
 			status,
 			task_count,
 			completed_task_count,
+			info,
 			phase,
 			message,
 			estimated_completion,
@@ -195,6 +198,7 @@ impl From<OldJobReport> for sd_core_heavy_lifting::job_system::report::Report {
 			status: status.into(),
 			task_count,
 			completed_task_count,
+			info,
 			phase,
 			message,
 			estimated_completion,
@@ -242,6 +246,7 @@ impl TryFrom for OldJobReport {
 				.expect("corrupted database"),
 			task_count: data.task_count.unwrap_or(0),
 			completed_task_count: data.completed_task_count.unwrap_or(0),
+			info: data.info.unwrap_or_default(),
 			phase: String::new(),
 			message: String::new(),
 			estimated_completion: data
@@ -283,7 +288,7 @@
 				.expect("corrupted database"),
 			task_count: data.task_count.unwrap_or(0),
 			completed_task_count: data.completed_task_count.unwrap_or(0),
-
+			info: data.info.unwrap_or_default(),
 			phase: String::new(),
 			message: String::new(),
 			estimated_completion: data
@@ -309,6 +314,7 @@ impl OldJobReport {
 			metadata: None,
 			parent_id: None,
 			completed_task_count: 0,
+			info: String::new(),
 			phase: String::new(),
 			message: String::new(),
 			estimated_completion: Utc::now(),
@@ -352,6 +358,7 @@ impl OldJobReport {
 				job::status::set(Some(self.status as i32)),
 				job::date_started::set(self.started_at.map(|d| d.into())),
 				job::task_count::set(Some(1)),
+				job::info::set(Some(self.info.clone())),
 				job::completed_task_count::set(Some(0)),
 			],
 			[self
@@ -382,6 +389,7 @@ impl OldJobReport {
 			job::data::set(self.data.clone()),
 			job::metadata::set(serde_json::to_vec(&self.metadata).ok()),
 			job::task_count::set(Some(self.task_count)),
+			job::info::set(Some(self.info.clone())),
 			job::completed_task_count::set(Some(self.completed_task_count)),
 			job::date_started::set(self.started_at.map(Into::into)),
 			job::date_completed::set(self.completed_at.map(Into::into)),
@@ -474,6 +482,7 @@ impl JobReportBuilder {
 			metadata: self.metadata,
 			parent_id: self.parent_id,
 			completed_task_count: 0,
+			info: String::new(),
 			phase: String::new(),
 			message: String::new(),
 			estimated_completion: Utc::now(),
diff --git a/core/src/old_job/worker.rs b/core/src/old_job/worker.rs
index 69815fca1bfb..5ede4d2d418d 100644
--- a/core/src/old_job/worker.rs
+++ b/core/src/old_job/worker.rs
@@ -43,10 +43,11 @@ pub struct JobProgressEvent {
 	pub completed_task_count: i32,
 	pub phase: String,
 	pub message: String,
+	pub info: String,
 	pub estimated_completion: DateTime<Utc>,
 }

-// used to update the worker state from inside the worker thread
+/// used to update the worker state from inside the worker thread
 #[derive(Debug)]
 pub enum WorkerEvent {
 	Progressed(Vec<JobReportUpdate>),
 	Completed,
 	Stop,
 }

@@ -54,7 +55,7 @@
-// used to send commands to the worker thread from the manager
+/// used to send commands to the worker thread from the manager
 #[derive(Debug)]
 pub enum WorkerCommand {
 	Pause(Instant),
@@ -298,6 +299,9 @@ impl Worker {
 							);
 							report.phase = phase;
 						}
+						JobReportUpdate::Info(info) => {
+							report.info = info;
+						}
 					}
 				}

@@ -336,6 +340,7 @@ impl Worker {
 			task_count: report.task_count,
 			completed_task_count: report.completed_task_count,
 			estimated_completion: report.estimated_completion,
+			info: report.info.clone(),
 			phase: report.phase.clone(),
 			message: report.message.clone(),
 		}));
diff --git a/crates/utils/src/db.rs b/crates/utils/src/db.rs
index edd406127d60..e27a000a63a9 100644
--- a/crates/utils/src/db.rs
+++ b/crates/utils/src/db.rs
@@ -154,6 +154,7 @@ impl<'a, T> OptionalField for &'a Option<T> {
 	}
 }

+/// If `data` is `Some(t)`, returns `Ok(t)`; otherwise returns a `MissingFieldError(field)`
 pub fn maybe_missing(
 	data: T,
 	field: &'static str,
diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs
index 07a0bd20a5d7..da92b7b7994c 100644
--- a/crates/utils/src/lib.rs
+++ b/crates/utils/src/lib.rs
@@ -46,9 +46,14 @@ pub fn chain_optional_iter(
 	.collect()
 }

+/// A split version of `u64`, divided into `(u32, u32)`
+///
+/// rspc/specta doesn't support `BigInt`, so we need this hack
+pub type U64Front = (u32, u32);
+
 #[inline]
 #[must_use]
-pub const fn u64_to_frontend(num: u64) -> (u32, u32) {
+pub const fn u64_to_frontend(num: u64) -> U64Front {
 	#[allow(clippy::cast_possible_truncation)]
 	{
 		// SAFETY: We're splitting into (high, low) parts, so we're not going to lose data on truncation
@@ -56,9 +61,14 @@
 	}
 }

+/// A split version of `i64`, divided into `(i32, u32)`
+///
+/// rspc/specta doesn't support `BigInt`, so we need this hack
+pub type I64Front = (i32, u32);
+
 #[inline]
 #[must_use]
-pub const fn i64_to_frontend(num: i64) -> (i32, u32) {
+pub const fn i64_to_frontend(num: i64) -> I64Front {
 	#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
 	{
 		// SAFETY: We're splitting into (high, low) parts, so we're not going to lose data on truncation
diff --git a/interface/app/$libraryId/Layout/Sidebar/JobManager/Job.tsx b/interface/app/$libraryId/Layout/Sidebar/JobManager/Job.tsx
index f667688f6a62..a1f4ce454432 100644
--- a/interface/app/$libraryId/Layout/Sidebar/JobManager/Job.tsx
+++ b/interface/app/$libraryId/Layout/Sidebar/JobManager/Job.tsx
@@ -5,11 +5,12 @@ import {
 	Icon,
 	Image,
 	Info,
+	Lightning,
 	Scissors,
 	Trash
 } from '@phosphor-icons/react';
 import { memo } from 'react';
-import { JobProgressEvent, Report, useJobInfo } from '@sd/client';
+import { JobName, JobProgressEvent, Report, useJobInfo } from '@sd/client';
 import { ProgressBar } from '@sd/ui';
 import { showAlertDialog } from '~/components';
 import { useLocale } from '~/hooks';
@@ -24,14 +25,22 @@ interface JobProps {
 	eta: number;
 }

-const JobIcon: Record = {
+export const JobIcon: Record<JobName, Icon> = {
 	Indexer: Folder,
 	MediaProcessor: Image,
 	FileIdentifier: Fingerprint,
-	FileCopier: Copy,
-	FileDeleter: Trash,
-	FileCutter: Scissors,
-	ObjectValidator: Fingerprint
+	Copy: Copy,
+	Delete: Trash,
+	Erase: Trash,
+	Move: Scissors,
+	FileValidator: Fingerprint
+};
+
+// Jobs like deleting and copying files do not have simplified job names
+// so we need to use the metadata to display an icon
+const MetaDataJobIcon = {
+	deleter: Trash,
+	copier: Copy
 };

 function Job({ job, className, isChild, progress, eta }: JobProps) {
@@ -67,11 +76,23 @@
 		]);
 	}

+	let jobIcon = Lightning;
+	if (job.name in JobIcon) {
+		jobIcon = JobIcon[job.name];
+	} else {
+		const meta = [...jobData.meta, ...jobData.output].find(
+			(meta) => meta.type in MetaDataJobIcon
+		);
+		if (meta) {
+			jobIcon = MetaDataJobIcon[meta.type as keyof typeof MetaDataJobIcon];
+		}
+	}
+
 	return (
((props, ref) =
 	const [currentETA, setCurrentETA] = useState(eta);

 	useEffect(() => {
-		if (currentETA !== undefined && currentETA > 0) {
+		if (currentETA != null && currentETA > 0) {
 			const interval = setInterval(() => {
 				setCurrentETA((prevETA) => {
 					if (prevETA === undefined || prevETA <= 1000) return 0;
diff --git a/interface/app/$libraryId/Layout/Sidebar/JobManager/index.tsx b/interface/app/$libraryId/Layout/Sidebar/JobManager/index.tsx
index 9bf6a85d02ba..837dd0228b03 100644
--- a/interface/app/$libraryId/Layout/Sidebar/JobManager/index.tsx
+++ b/interface/app/$libraryId/Layout/Sidebar/JobManager/index.tsx
@@ -121,7 +121,7 @@ export function JobManager() {
 			)}
-			{t('recent_jobs')}
+
{t('recent_jobs')}
{toggleConfirmation ? (
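Before the regenerated client types below, a note on `U64Front` from the `crates/utils` hunk above: since rspc/specta can't express a 64-bit integer, the core ships `(high, low)` `u32` halves and the client reassembles them (`uint32ArrayToBigInt` on the TypeScript side). Round-trip sketch; `frontend_to_u64` is a hypothetical Rust mirror of that client-side reassembly:

```rust
const fn u64_to_frontend(num: u64) -> (u32, u32) {
    ((num >> 32) as u32, num as u32) // (high, low)
}

fn frontend_to_u64((high, low): (u32, u32)) -> u64 {
    (u64::from(high) << 32) | u64::from(low)
}

fn main() {
    let n = u64::MAX - 42;
    assert_eq!(frontend_to_u64(u64_to_frontend(n)), n);
}
```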
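The new `info` field travels the whole pipeline in this PR: a step emits `JobReportUpdate::Info`, the worker copies it onto the report and into `JobProgressEvent`, and it surfaces as the `info: string` field in the generated types below. Producer-side sketch (the enum here is trimmed to the variants involved, not the full definition):

```rust
enum JobReportUpdate {
    TaskCount(usize),
    CompletedTaskCount(usize),
    Message(String),
    Info(String), // added by this PR
    Phase(String),
}

// The copier publishes its file count over the new channel; e.g.
// `CopierUpdate::FileCount(count)` maps to:
fn file_count_update(count: usize) -> JobReportUpdate {
    JobReportUpdate::Info(count.to_string())
}
```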
diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts
index d948acd2c7d1..2935c095135d 100644
--- a/packages/client/src/core.ts
+++ b/packages/client/src/core.ts
@@ -377,7 +377,7 @@ export type JobGroup = { id: string; running_job_id: string | null; action: stri
 export type JobName = "Indexer" | "FileIdentifier" | "MediaProcessor" | "Copy" | "Move" | "Delete" | "Erase" | "FileValidator"

-export type JobProgressEvent = { id: string; library_id: string; task_count: number; completed_task_count: number; phase: string; message: string; estimated_completion: string }
+export type JobProgressEvent = { id: string; library_id: string; task_count: number; completed_task_count: number; phase: string; message: string; info: string; estimated_completion: string }

 export type JsonValue = null | boolean | number | string | JsonValue[] | { [key in string]: JsonValue }
@@ -583,7 +583,7 @@ export type RenameMany = { from_pattern: FromPattern; to_pattern: string; from_f
 export type RenameOne = { from_file_path_id: number; to: string }

-export type Report = { id: string; name: JobName; action: string | null; metadata: ReportMetadata[]; critical_error: string | null; non_critical_errors: NonCriticalError[]; created_at: string | null; started_at: string | null; completed_at: string | null; parent_id: string | null; status: Status; task_count: number; completed_task_count: number; phase: string; message: string; estimated_completion: string }
+export type Report = { id: string; name: JobName; action: string | null; metadata: ReportMetadata[]; critical_error: string | null; non_critical_errors: NonCriticalError[]; created_at: string | null; started_at: string | null; completed_at: string | null; parent_id: string | null; status: Status; task_count: number; completed_task_count: number; info: string; phase: string; message: string; estimated_completion: string }

 export type ReportInputMetadata = { type: "location"; data: Location } | { type: "sub_path"; data: string }
diff --git a/packages/client/src/utils/jobs/useJobInfo.tsx b/packages/client/src/utils/jobs/useJobInfo.tsx
index 045447e22dff..3dcb7176e17a 100644
--- a/packages/client/src/utils/jobs/useJobInfo.tsx
+++ b/packages/client/src/utils/jobs/useJobInfo.tsx
@@ -1,6 +1,12 @@
 import { TextItems } from '.';
-import { formatNumber, uint32ArrayToBigInt } from '../..';
-import { JobProgressEvent, Report, ReportOutputMetadata } from '../../core';
+import { formatNumber, humanizeSize, uint32ArrayToBigInt } from '../..';
+import {
+	JobName,
+	JobProgressEvent,
+	Report,
+	ReportMetadata,
+	ReportOutputMetadata
+} from '../../core';

 interface JobNiceData {
 	name: string;
@@ -12,8 +18,8 @@
 	indexedPath?: any;
 	taskCount: number;
 	completedTaskCount: number;
-	meta: any;
-	output: any;
+	meta: ReportMetadata[];
+	output: ReportOutputMetadata[];
 }

 export function useJobInfo(job: Report, realtimeUpdate: JobProgressEvent | null): JobNiceData {
@@ -258,11 +264,16 @@ export function useJobInfo(job: Report, realtimeUpdate: JobProgressEvent | null)
 		case 'Copy':
 			return {
 				...data,
-				name: `${isQueued ? 'Copy' : isRunning ? 'Copying' : 'Copied'} ${
-					isRunning ? completedTaskCount + 1 : completedTaskCount
-				} ${isRunning ? `of ${job.task_count}` : ``} ${plural(job.task_count, 'file')}`,
-				textItems: [[{ text: job.status }]]
+				name: isQueued
+					? `Duplicate ${taskCount} ${plural(taskCount, 'file')}`
+					: isRunning
+						? `Duplicating ${completedTaskCount}% of ${realtimeUpdate?.info} ${plural(taskCount, 'file')} (${humanizeSize(parseInt(realtimeUpdate?.message || '0'))})`
+						: `Duplicated ${taskCount} ${plural(taskCount, 'file')}`,
+				textItems: realtimeUpdate
+					? [[{ text: realtimeUpdate?.phase }]]
+					: [[{ text: job.status }]]
 			};
+
 		case 'Delete':
 			return {
 				...data,
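A closing note on the progress-reporting shape in `execute_step`: the copy future is raced against an infinite polling loop, so the loop needs no exit condition; when the transfer resolves, `race` drops the reporter mid-sleep. Minimal sketch assuming the `futures-concurrency` and `tokio` crates, with the `fs::metadata` byte polling replaced by a fake ticker:

```rust
use std::time::Duration;

use futures_concurrency::future::Race;

async fn copy_with_progress() -> Result<(), std::io::Error> {
    let transfer = async {
        tokio::time::sleep(Duration::from_millis(500)).await; // stands in for fs::copy
        Ok::<_, std::io::Error>(())
    };
    // The reporter never returns on its own; when `transfer` completes,
    // `race` resolves and the half-finished sleep is simply dropped.
    let report = async {
        loop {
            println!("...still copying");
            tokio::time::sleep(Duration::from_millis(200)).await;
        }
    };
    (transfer, report).race().await
}
```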
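And the grouping rule in `file_copy_strategist`, isolated: sort by size, then pack greedily under a byte budget and a file-count cap, where `sum == 0` guarantees that even an oversized file still lands in a step of its own. Standalone sketch mirroring the PR's condition (names are illustrative):

```rust
fn group_into_steps(mut sizes: Vec<u64>, max_bytes: u64, max_files: usize) -> Vec<Vec<u64>> {
    sizes.sort_unstable();
    let mut it = sizes.into_iter().peekable();
    let mut steps = Vec::new();
    loop {
        let (mut sum, mut step) = (0u64, Vec::new());
        // `sum == 0` overrides both caps, so the first (possibly huge) file
        // of each step is always accepted and the outer loop always advances.
        while let Some(size) =
            it.next_if(|&s| step.len() < max_files && s + sum <= max_bytes || sum == 0)
        {
            sum += size;
            step.push(size);
        }
        steps.push(step);
        if it.peek().is_none() {
            break;
        }
    }
    steps
}
```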