Skip to content

Commit

Permalink
fix: make rattler_index::index concurrency safe
Browse files Browse the repository at this point in the history
  • Loading branch information
tl-hbk committed Nov 27, 2024
1 parent 104a521 commit edfbf3e
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 16 deletions.
1 change: 1 addition & 0 deletions crates/rattler_index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ rattler_package_streaming = { path="../rattler_package_streaming", version = "0.
serde_json = { workspace = true }
tracing = { workspace = true }
walkdir = { workspace = true }
fslock = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions crates/rattler_index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ pub fn index(
};
}
let out_file = output_folder.join(platform).join("repodata.json");
let lock_file_path = out_file.with_extension("lock");
let mut lock = fslock::LockFile::open(&lock_file_path)?;
lock.lock_with_pid()?;
File::create(&out_file)?.write_all(serde_json::to_string_pretty(&repodata)?.as_bytes())?;
}

Expand Down
6 changes: 5 additions & 1 deletion crates/rattler_repodata_gateway/src/gateway/remote_subdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ impl RemoteSubdirClient {
e => GatewayError::FetchRepoDataError(e),
})?;

let repo_data_json_path = repodata.repo_data_json_path.clone();
// repodata holds onto a file lock for json file that sparse will need
drop(repodata);

// Create a new sparse repodata client that can be used to read records from the repodata.
let sparse = LocalSubdirClient::from_channel_subdir(
&repodata.repo_data_json_path,
&repo_data_json_path,
channel.clone(),
platform.as_str(),
)
Expand Down
56 changes: 41 additions & 15 deletions crates/rattler_repodata_gateway/src/sparse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

use std::{
collections::{HashSet, VecDeque},
fmt, io,
fmt,
fs::OpenOptions,
io,
marker::PhantomData,
path::Path,
};
Expand All @@ -26,6 +28,8 @@ use serde_json::value::RawValue;
use superslice::Ext;
use thiserror::Error;

use crate::utils::LockedFile;

/// A struct to enable loading records from a `repodata.json` file on demand.
/// Since most of the time you don't need all the records from the
/// `repodata.json` this can help provide some significant speedups.
Expand All @@ -43,6 +47,9 @@ pub struct SparseRepoData {
/// A function that can be used to patch the package record after it has
/// been parsed. This is mainly used to add `pip` to `python` if desired
patch_record_fn: Option<fn(&mut PackageRecord)>,

/// memmap2 blocks file from being modified so wrap the repodata file with a lock
_lock: Option<LockedFile>,
}

enum SparseRepoDataInner {
Expand Down Expand Up @@ -104,20 +111,38 @@ impl SparseRepoData {
path: impl AsRef<Path>,
patch_function: Option<fn(&mut PackageRecord)>,
) -> Result<Self, io::Error> {
let file = fs::File::open(path.as_ref().to_owned())?;
let memory_map = unsafe { memmap2::Mmap::map(&file) }?;
Ok(SparseRepoData {
inner: SparseRepoDataInner::Memmapped(
MemmappedSparseRepoDataInnerTryBuilder {
memory_map,
repo_data_builder: |memory_map| serde_json::from_slice(memory_map.as_ref()),
}
.try_build()?,
),
subdir: subdir.into(),
channel,
patch_record_fn: patch_function,
})
if !path.as_ref().exists() {
Err(io::Error::new(
io::ErrorKind::NotFound,
format!("file not found: {:?}", path.as_ref()),
))
} else {
let lock_file_path = path.as_ref().with_extension("lock");
if !lock_file_path.exists() {
OpenOptions::new()
.read(true)
.write(true)
.create(true)

Check failure on line 125 in crates/rattler_repodata_gateway/src/sparse/mod.rs

View workflow job for this annotation

GitHub Actions / Format and Lint

file opened with `create`, but `truncate` behavior not defined
.open(&lock_file_path)?;
}
let lock_file = LockedFile::open_ro(lock_file_path, "repodata cache")
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let file = fs::File::open(path.as_ref().to_owned())?;
let memory_map = unsafe { memmap2::Mmap::map(&file) }?;
Ok(SparseRepoData {
inner: SparseRepoDataInner::Memmapped(
MemmappedSparseRepoDataInnerTryBuilder {
memory_map,
repo_data_builder: |memory_map| serde_json::from_slice(memory_map.as_ref()),
}
.try_build()?,
),
subdir: subdir.into(),
channel,
patch_record_fn: patch_function,
_lock: Some(lock_file),
})
}
}

/// Construct an instance of self from a bytes and a [`Channel`].
Expand All @@ -141,6 +166,7 @@ impl SparseRepoData {
channel,
subdir: subdir.into(),
patch_record_fn: patch_function,
_lock: None,
})
}

Expand Down

0 comments on commit edfbf3e

Please sign in to comment.