[Turbopack] Performance improvements for Persistent Caching #73265

Merged · 7 commits · Nov 29, 2024
26 changes: 14 additions & 12 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -1,5 +1,6 @@
 pub mod indexed;
 mod operation;
+mod persisted_storage_log;
 mod storage;
 
 use std::{
@@ -43,6 +44,7 @@ use crate::{
             AggregationUpdateQueue, CleanupOldEdgesOperation, ConnectChildOperation,
             ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge, TaskDirtyCause, TaskGuard,
         },
+        persisted_storage_log::PersistedStorageLog,
         storage::{get, get_many, get_mut, iter_many, remove, Storage},
     },
     backing_storage::BackingStorage,
@@ -133,7 +135,6 @@ impl Default for BackendOptions {
 pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);
 
 type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;
-type StorageLog = Sharded<ChunkedVec<CachedDataUpdate>>;
 
 struct TurboTasksBackendInner<B: BackingStorage> {
     options: BackendOptions,
@@ -148,8 +149,8 @@ struct TurboTasksBackendInner<B: BackingStorage> {
     task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
     transient_tasks: DashMap<TaskId, Arc<TransientTask>, BuildHasherDefault<FxHasher>>,
 
-    persisted_storage_data_log: Option<StorageLog>,
-    persisted_storage_meta_log: Option<StorageLog>,
+    persisted_storage_data_log: Option<PersistedStorageLog>,
+    persisted_storage_meta_log: Option<PersistedStorageLog>,
     storage: Storage<TaskId, CachedDataItem>,
 
     /// Number of executing operations + Highest bit is set when snapshot is
@@ -207,8 +208,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
             persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
             task_cache: BiMap::new(),
             transient_tasks: DashMap::default(),
-            persisted_storage_data_log: need_log.then(|| Sharded::new(shard_amount)),
-            persisted_storage_meta_log: need_log.then(|| Sharded::new(shard_amount)),
+            persisted_storage_data_log: need_log.then(|| PersistedStorageLog::new(shard_amount)),
+            persisted_storage_meta_log: need_log.then(|| PersistedStorageLog::new(shard_amount)),
            storage: Storage::new(),
             in_progress_operations: AtomicUsize::new(0),
             snapshot_request: Mutex::new(SnapshotRequest::new()),
@@ -312,10 +313,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         }
     }
 
-    fn persisted_storage_log(
-        &self,
-        category: TaskDataCategory,
-    ) -> Option<&Sharded<ChunkedVec<CachedDataUpdate>>> {
+    fn persisted_storage_log(&self, category: TaskDataCategory) -> Option<&PersistedStorageLog> {
         match category {
             TaskDataCategory::Data => &self.persisted_storage_data_log,
             TaskDataCategory::Meta => &self.persisted_storage_meta_log,
@@ -696,12 +694,16 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
             .map(|op| op.arc().clone())
             .collect::<Vec<_>>();
         drop(snapshot_request);
-        fn take_from_log<T: Default>(log: &Option<Sharded<T>>) -> Vec<T> {
+        fn take_from_log(log: &Option<PersistedStorageLog>) -> Vec<ChunkedVec<CachedDataUpdate>> {
             log.as_ref().map(|l| l.take()).unwrap_or_default()
         }
         let persisted_storage_meta_log = take_from_log(&self.persisted_storage_meta_log);
         let persisted_storage_data_log = take_from_log(&self.persisted_storage_data_log);
-        let persisted_task_cache_log = take_from_log(&self.persisted_task_cache_log);
+        let persisted_task_cache_log = self
+            .persisted_task_cache_log
+            .as_ref()
+            .map(|l| l.take(|i| i))
+            .unwrap_or_default();
         let mut snapshot_request = self.snapshot_request.lock();
         snapshot_request.snapshot_requested = false;
         self.in_progress_operations
@@ -741,7 +743,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
                 persisted_storage_meta_log,
                 persisted_storage_data_log,
             ) {
-                println!("Persisting failed: {}", err);
+                println!("Persisting failed: {:?}", err);
                 return None;
             }
         }
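Note: the call `l.take(|i| i)` above implies that `utils::sharded::Sharded::take` now accepts a closure applied to each drained shard; the definition of `Sharded` is not part of this diff. A minimal sketch of that assumed shape (all names and details inferred, not the crate's actual code):

```rust
// Hypothetical reconstruction of `Sharded::take`, inferred from the call
// sites in this diff (`l.take()` and `l.take(|i| i)`); the real
// definition in utils/sharded.rs is not shown here.
use std::sync::Mutex;

pub struct Sharded<T> {
    shards: Vec<Mutex<T>>,
}

impl<T: Default> Sharded<T> {
    pub fn new(shard_amount: usize) -> Self {
        Self {
            shards: (0..shard_amount).map(|_| Mutex::new(T::default())).collect(),
        }
    }

    /// Drains every shard and maps its contents in the same pass, so a
    /// caller can project a shard type down to one of its fields.
    pub fn take<R>(&self, map: impl Fn(T) -> R) -> Vec<R> {
        self.shards
            .iter()
            .map(|shard| map(std::mem::take(&mut *shard.lock().unwrap())))
            .collect()
    }
}

fn main() {
    let sharded: Sharded<Vec<u32>> = Sharded::new(4);
    *sharded.shards[1].lock().unwrap() = vec![1, 2, 3];
    // Identity closure, as in `persisted_task_cache_log.take(|i| i)`.
    let drained: Vec<Vec<u32>> = sharded.take(|shard| shard);
    assert_eq!(drained[1], vec![1, 2, 3]);
}
```

Under this shape, `PersistedStorageLog::take` can pass `|shard| shard.data` to project each `ShardData` down to its `ChunkedVec`, while the task-cache log passes the identity closure.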
108 changes: 32 additions & 76 deletions turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
@@ -21,10 +21,7 @@ use crate::{
         TurboTasksBackend, TurboTasksBackendInner,
     },
     backing_storage::BackingStorage,
-    data::{
-        CachedDataItem, CachedDataItemIndex, CachedDataItemKey, CachedDataItemValue,
-        CachedDataUpdate,
-    },
+    data::{CachedDataItem, CachedDataItemIndex, CachedDataItemKey, CachedDataItemValue},
 };
 
 pub trait Operation:
@@ -419,13 +416,7 @@ impl<B: BackingStorage> TaskGuard for TaskGuardImpl<'_, B> {
             self.backend
                 .persisted_storage_log(key.category())
                 .unwrap()
-                .lock(self.task_id)
-                .push(CachedDataUpdate {
-                    key,
-                    task: self.task_id,
-                    value: Some(value),
-                    old_value: None,
-                });
+                .push(self.task_id, key, None, Some(value));
             true
         } else {
             false
@@ -451,15 +442,13 @@ impl<B: BackingStorage> TaskGuard for TaskGuardImpl<'_, B> {
             self.backend
                 .persisted_storage_log(key.category())
                 .unwrap()
-                .lock(self.task_id)
-                .push(CachedDataUpdate {
+                .push(
+                    self.task_id,
                     key,
-                    task: self.task_id,
-                    value: Some(value),
-                    old_value: old
-                        .as_ref()
+                    old.as_ref()
                         .and_then(|old| old.is_persistent().then(|| old.clone())),
-                });
+                    Some(value),
+                );
             old
         } else {
             let item = CachedDataItem::from_key_and_value(key.clone(), value);
@@ -469,13 +458,7 @@ impl<B: BackingStorage> TaskGuard for TaskGuardImpl<'_, B> {
             self.backend
                 .persisted_storage_log(key.category())
                 .unwrap()
-                .lock(self.task_id)
-                .push(CachedDataUpdate {
-                    key,
-                    task: self.task_id,
-                    value: None,
-                    old_value: Some(old.clone()),
-                });
+                .push(self.task_id, key, Some(old.clone()), None);
             }
             Some(old)
         } else {
@@ -510,29 +493,21 @@ impl<B: BackingStorage> TaskGuard for TaskGuardImpl<'_, B> {
             (None, false) => {}
             (Some(old_value), false) => {
                 add_persisting_item = true;
-                backend
-                    .persisted_storage_log(key.category())
-                    .unwrap()
-                    .lock(*task_id)
-                    .push(CachedDataUpdate {
-                        key: key.clone(),
-                        task: *task_id,
-                        value: None,
-                        old_value: Some(old_value),
-                    });
+                backend.persisted_storage_log(key.category()).unwrap().push(
+                    *task_id,
+                    key.clone(),
+                    Some(old_value),
+                    None,
+                );
             }
             (old_value, true) => {
                 add_persisting_item = true;
-                backend
-                    .persisted_storage_log(key.category())
-                    .unwrap()
-                    .lock(*task_id)
-                    .push(CachedDataUpdate {
-                        key: key.clone(),
-                        task: *task_id,
-                        value: new.clone(),
-                        old_value,
-                    });
+                backend.persisted_storage_log(key.category()).unwrap().push(
+                    *task_id,
+                    key.clone(),
+                    old_value,
+                    new.clone(),
+                );
             }
         }
 
@@ -556,13 +531,12 @@ impl<B: BackingStorage> TaskGuard for TaskGuardImpl<'_, B> {
             self.backend
                 .persisted_storage_log(key.category())
                 .unwrap()
-                .lock(self.task_id)
-                .push(CachedDataUpdate {
+                .push(
+                    self.task_id,
                     key,
-                    task: self.task_id,
-                    value: None,
-                    old_value: value.is_persistent().then(|| value.clone()),
-                });
+                    value.is_persistent().then(|| value.clone()),
+                    None,
+                );
             }
             Some(value)
         } else {
@@ -615,13 +589,7 @@ impl<B: BackingStorage> TaskGuard for TaskGuardImpl<'_, B> {
                 self.backend
                     .persisted_storage_log(key.category())
                     .unwrap()
-                    .lock(self.task_id)
-                    .push(CachedDataUpdate {
-                        key,
-                        task: self.task_id,
-                        value: None,
-                        old_value: Some(value),
-                    });
+                    .push(self.task_id, key, Some(value), None);
             }
         }))
     }
@@ -640,13 +608,7 @@ impl<B: BackingStorage> TaskGuard for TaskGuardImpl<'_, B> {
                 self.backend
                     .persisted_storage_log(key.category())
                     .unwrap()
-                    .lock(self.task_id)
-                    .push(CachedDataUpdate {
-                        key,
-                        task: self.task_id,
-                        value: None,
-                        old_value: Some(value),
-                    });
+                    .push(self.task_id, key, Some(value), None);
             }
         }))
     }
@@ -661,24 +623,18 @@ impl<B: BackingStorage> TaskGuard for TaskGuardImpl<'_, B> {
             .filter_map(|(key, value)| match (key, value) {
                 (CachedDataItemKey::CellData { cell }, CachedDataItemValue::CellData { value }) => {
                     count += 1;
-                    Some(CachedDataUpdate {
-                        task: self.task_id,
-                        key: CachedDataItemKey::CellData { cell: *cell },
-                        value: Some(CachedDataItemValue::CellData {
-                            value: value.clone(),
-                        }),
-                        old_value: None,
+                    Some(CachedDataItem::CellData {
+                        cell: *cell,
+                        value: value.clone(),
                     })
                 }
                 _ => None,
             });
         {
-            let mut guard = self
-                .backend
+            self.backend
                 .persisted_storage_log(TaskDataCategory::Data)
                 .unwrap()
-                .lock(self.task_id);
-            guard.extend(cell_data);
+                .push_batch_insert(self.task_id, cell_data);
             self.task
                 .persistance_state_mut()
                 .add_persisting_items(count);
78 changes: 78 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/backend/persisted_storage_log.rs
@@ -0,0 +1,78 @@
+use turbo_tasks::{KeyValuePair, TaskId};
+
+use crate::{
+    data::{CachedDataItem, CachedDataItemKey, CachedDataItemValue, CachedDataUpdate},
+    utils::{chunked_vec::ChunkedVec, sharded::Sharded},
+};
+
+#[derive(Default)]
+struct ShardData {
+    last_task: Option<TaskId>,
+    data: ChunkedVec<CachedDataUpdate>,
+}
+
+impl ShardData {
+    fn set_task(&mut self, task: TaskId) {
+        if self.last_task != Some(task) {
+            self.data.push(CachedDataUpdate::Task { task });
+            self.last_task = Some(task);
+        }
+    }
+}
+
+pub struct PersistedStorageLog {
+    data: Sharded<ShardData>,
+}
+
+impl PersistedStorageLog {
+    pub fn new(shard_amount: usize) -> Self {
+        Self {
+            data: Sharded::new(shard_amount),
+        }
+    }
+
+    pub fn push(
+        &self,
+        task: TaskId,
+        key: CachedDataItemKey,
+        old_value: Option<CachedDataItemValue>,
+        new_value: Option<CachedDataItemValue>,
+    ) {
+        let mut guard = self.data.lock(task);
+        guard.set_task(task);
+        match (old_value, new_value) {
+            (None, None) => {}
+            (None, Some(new_value)) => guard.data.push(CachedDataUpdate::New {
+                item: CachedDataItem::from_key_and_value(key, new_value),
+            }),
+            (Some(old_value), None) => guard.data.push(CachedDataUpdate::Removed {
+                old_item: CachedDataItem::from_key_and_value(key, old_value),
+            }),
+            (Some(old_value), Some(new_value)) => {
+                guard.data.push(CachedDataUpdate::Replace1 {
+                    old_item: CachedDataItem::from_key_and_value(key, old_value),
+                });
+                guard
+                    .data
+                    .push(CachedDataUpdate::Replace2 { value: new_value });
+            }
+        }
+    }
+
+    pub fn push_batch_insert(
+        &self,
+        task: TaskId,
+        updates: impl IntoIterator<Item = CachedDataItem>,
+    ) {
+        let updates = updates
+            .into_iter()
+            .map(|item| CachedDataUpdate::New { item });
+        let mut guard = self.data.lock(task);
+        guard.set_task(task);
+        guard.data.extend(updates);
+    }
+
+    pub fn take(&self) -> Vec<ChunkedVec<CachedDataUpdate>> {
+        self.data.take(|shard| shard.data)
+    }
+}
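Taken together, the new log replaces one `CachedDataUpdate` struct per change (which repeated the task id and key for every entry) with a flat stream of enum entries: the task id appears once per run of same-task updates, and a replace stores its key only once. A self-contained toy model of the encoding, with stand-in types in place of the crate-internal `CachedDataItem` and `ChunkedVec` (all names here are hypothetical):

```rust
// Stand-in model (not the real crate types) showing the stream the new
// log produces: the task id appears once per run of updates, and a
// replace contributes two entries so the key is stored only once.
type TaskId = u32;
type Item = (&'static str, u32); // stand-in for CachedDataItem: (key, value)

#[derive(Debug)]
enum Update {
    Task { task: TaskId },
    New { item: Item },
    Replace1 { old_item: Item },
    Replace2 { value: u32 },
}

#[derive(Default)]
struct Log {
    last_task: Option<TaskId>,
    data: Vec<Update>,
}

impl Log {
    fn set_task(&mut self, task: TaskId) {
        // Same trick as ShardData::set_task: emit a marker only on change.
        if self.last_task != Some(task) {
            self.data.push(Update::Task { task });
            self.last_task = Some(task);
        }
    }

    fn push(&mut self, task: TaskId, key: &'static str, old: Option<u32>, new: Option<u32>) {
        self.set_task(task);
        match (old, new) {
            (None, Some(new)) => self.data.push(Update::New { item: (key, new) }),
            (Some(old), Some(new)) => {
                self.data.push(Update::Replace1 { old_item: (key, old) });
                self.data.push(Update::Replace2 { value: new });
            }
            _ => {} // removal and no-op elided in this sketch
        }
    }
}

fn main() {
    let mut log = Log::default();
    log.push(1, "a", None, Some(10));
    log.push(1, "b", Some(1), Some(2)); // same task: no second Task marker
    log.push(2, "c", None, Some(30));
    // Stream: Task{1}, New{a}, Replace1{b,1}, Replace2{2}, Task{2}, New{c}
    for update in &log.data {
        println!("{update:?}");
    }
}
```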
25 changes: 15 additions & 10 deletions turbopack/crates/turbo-tasks-backend/src/data.rs
@@ -475,10 +475,6 @@ impl CachedDataItem {
         }
     }
 
-    pub fn is_optional(&self) -> bool {
-        matches!(self, CachedDataItem::CellData { .. })
-    }
-
     pub fn new_scheduled(description: impl Fn() -> String + Sync + Send + 'static) -> Self {
         CachedDataItem::InProgress {
             value: InProgressState::Scheduled {
@@ -541,6 +537,10 @@ impl CachedDataItemKey {
         }
     }
 
+    pub fn is_optional(&self) -> bool {
+        matches!(self, CachedDataItemKey::CellData { .. })
+    }
+
     pub fn category(&self) -> TaskDataCategory {
         match self {
             CachedDataItemKey::Collectible { .. }
@@ -693,10 +693,15 @@ impl CachedDataItemValue {
 }
 
 #[derive(Debug)]
-pub struct CachedDataUpdate {
-    pub task: TaskId,
-    // TODO generate CachedDataItemUpdate to avoid repeating the variant field 3 times
-    pub key: CachedDataItemKey,
-    pub value: Option<CachedDataItemValue>,
-    pub old_value: Option<CachedDataItemValue>,
+pub enum CachedDataUpdate {
+    /// Sets the current task id.
+    Task { task: TaskId },
+    /// An item was added. There was no old value.
+    New { item: CachedDataItem },
+    /// An item was removed.
+    Removed { old_item: CachedDataItem },
+    /// An item was replaced. This is step 1 and tells about the key and the old value.
+    Replace1 { old_item: CachedDataItem },
+    /// An item was replaced. This is step 2 and tells about the new value.
+    Replace2 { value: CachedDataItemValue },
 }
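Because a replace is now split across two consecutive entries, any consumer has to treat `Replace1`/`Replace2` as a pair and track the current task set by `Task` markers. A hedged sketch of such a decoder, with stand-in types (the real reader lives in the backing-storage layer, which is not shown in this diff):

```rust
// Sketch of how a consumer of the log might decode the stream back into
// full (task, change) pairs. Types are simplified stand-ins for the
// crate-internal CachedDataItem / CachedDataItemValue.
type TaskId = u32;
type Item = (&'static str, u32); // stand-in for CachedDataItem
type Value = u32;                // stand-in for CachedDataItemValue

enum CachedDataUpdate {
    Task { task: TaskId },
    New { item: Item },
    Removed { old_item: Item },
    Replace1 { old_item: Item },
    Replace2 { value: Value },
}

fn decode(log: &[CachedDataUpdate]) {
    let mut task: Option<TaskId> = None;
    let mut iter = log.iter();
    while let Some(update) = iter.next() {
        match update {
            // A Task marker switches the task for all following entries.
            CachedDataUpdate::Task { task: t } => task = Some(*t),
            CachedDataUpdate::New { item } => println!("{task:?}: insert {item:?}"),
            CachedDataUpdate::Removed { old_item } => println!("{task:?}: remove {old_item:?}"),
            CachedDataUpdate::Replace1 { old_item } => {
                // Replace1 must be immediately followed by its Replace2,
                // which carries the new value for the same key.
                let Some(CachedDataUpdate::Replace2 { value }) = iter.next() else {
                    panic!("Replace1 without a following Replace2");
                };
                println!("{task:?}: replace {old_item:?} -> {value:?}");
            }
            CachedDataUpdate::Replace2 { .. } => panic!("Replace2 without Replace1"),
        }
    }
}

fn main() {
    decode(&[
        CachedDataUpdate::Task { task: 7 },
        CachedDataUpdate::New { item: ("a", 1) },
        CachedDataUpdate::Replace1 { old_item: ("b", 1) },
        CachedDataUpdate::Replace2 { value: 2 },
    ]);
}
```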