Fix bug in object_by_id_cache #20450

Open
wants to merge 2 commits into
base: main
135 changes: 122 additions & 13 deletions crates/sui-core/src/execution_cache/cache_types.rs
@@ -1,10 +1,11 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

-use std::cmp::Ordering;
 use std::collections::VecDeque;
-use std::hash::Hash;
+use std::hash::{Hash, Hasher};
+use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
+use std::{cmp::Ordering, hash::DefaultHasher};

use moka::sync::Cache as MokaCache;
use parking_lot::Mutex;
@@ -149,8 +150,31 @@ pub trait IsNewer {

pub struct MonotonicCache<K, V> {
cache: MokaCache<K, Arc<Mutex<V>>>,
// When inserting a possibly stale value, we prove that it is not stale by
// ensuring that no fresh value was inserted since we began reading the value
// we are inserting. We do this by hashing the key to an element in this array,
// reading the current value, and then passing that value to insert(). If the
// value is out of date, then there may have been an intervening write, so we
// discard the insert attempt.
key_generation: Vec<AtomicU64>,
}

pub enum Ticket {
// Read tickets are used when caching the result of a read from the db.
// They are only valid if the generation number matches the current generation.
// Used to ensure that no write occurred while we were reading from the db.
Read(u64),
// Write tickets are always valid. Used when caching writes, which cannot be stale.
Write,
}

// key_generation should be big enough to make false positives unlikely. If, on
// average, there is one millisecond between acquiring the ticket and calling insert(),
// then even at 1 million inserts per second, there will be 1000 inserts between acquiring
// the ticket and calling insert(), so about 1/16th of the entries will be invalidated,
// so valid inserts will succeed with probability 15/16.
const KEY_GENERATION_SIZE: usize = 1024 * 16;

impl<K, V> MonotonicCache<K, V>
where
K: Hash + Eq + Send + Sync + Copy + 'static,
@@ -159,36 +183,121 @@ where
pub fn new(cache_size: u64) -> Self {
Self {
cache: MokaCache::builder().max_capacity(cache_size).build(),
key_generation: (0..KEY_GENERATION_SIZE)
.map(|_| AtomicU64::new(0))
.collect(),
}
}

pub fn get(&self, key: &K) -> Option<Arc<Mutex<V>>> {
self.cache.get(key)
}

fn generation(&self, key: &K) -> &AtomicU64 {
let mut state = DefaultHasher::new();
key.hash(&mut state);
let hash = state.finish();
&self.key_generation[(hash % KEY_GENERATION_SIZE as u64) as usize]
}

/// Get a ticket for caching the result of a read operation. The ticket will be
/// expired if a writer writes a new version of the value.
/// The caller must obtain the ticket BEFORE checking the dirty set and db. By
/// obeying this rule, the caller can be sure that if their ticket remains valid
/// at insert time, they either are inserting the most recent value, or a concurrent
/// writer will shortly overwrite their value.
pub fn get_ticket_for_read(&self, key: &K) -> Ticket {
let gen = self.generation(key);
Ticket::Read(gen.load(std::sync::atomic::Ordering::Acquire))
}

// Update the cache with guaranteed monotonicity. That is, if there are N
// calls to this function from N threads, the write with the newest value will
// win the race regardless of what ordering the writes occur in.
//
// Caller should log the insert with trace! and increment the appropriate metric.
-pub fn insert(&self, key: &K, value: V) {
+pub fn insert(&self, key: &K, value: V, ticket: Ticket) -> Result<(), ()> {
let gen = self.generation(key);

// invalidate other readers as early as possible. If a reader acquires a
// new ticket after this point, then it will read the new value from
// the dirty set (or db).
if matches!(ticket, Ticket::Write) {
gen.fetch_add(1, std::sync::atomic::Ordering::Release);
Contributor

Is this scenario possible?

Key 1 (Gen 0) is not in the cache.

1. Writer calls insert for Key 1, and fetch_add bumps the generation to Gen 1.
2. Reader obtains a ticket (Gen 1) for Key 1.
3. Reader misses the cache and reads the old value for Key 1 from the DB.
4. Writer writes the new value for Key 1 to the dirty cache.
5. The cache value for Key 1 gets evicted.
6. Reader caches the old value with a still-valid Gen 1 ticket.

If so, would moving the generation fetch_add to after the cache update fix it? Or maybe cache eviction also updates the generation?

Contributor Author

This is not possible, because the writer holds the lock on the dirty cache while it calls fetch_add, and the reader waits for that lock before checking the dirty cache. So the reader will either complete its read before the fetch_add (in which case it holds an old ticket), or wait until after the fetch_add, in which case it is guaranteed to read the newest value from the dirty set (or possibly from the db, if the reader waits so long that the value has already been flushed to the db).
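
The ordering argument in this thread can be replayed deterministically with a toy model. The sketch below is not the PR's code: `ToyCache`, `write`, `read_dirty`, `evict` and `get_cached` are hypothetical names, a single counter replaces the hashed `key_generation` array, and the assumption that the writer bumps the generation while holding the dirty lock is taken from the reply above.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

/// Simplified stand-in for the PR's `Ticket` (hypothetical, for illustration).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Ticket {
    Read(u64),
    Write,
}

/// Toy model: `dirty` stands in for the dirty set and `cached` for the
/// monotonic cache. Neither is the real Sui type.
pub struct ToyCache {
    dirty: Mutex<HashMap<u32, u64>>,
    cached: Mutex<HashMap<u32, u64>>,
    generation: AtomicU64, // one counter instead of a hashed array
}

impl ToyCache {
    pub fn new() -> Self {
        Self {
            dirty: Mutex::new(HashMap::new()),
            cached: Mutex::new(HashMap::new()),
            generation: AtomicU64::new(0),
        }
    }

    /// Must be called BEFORE looking at the dirty set or the db.
    pub fn ticket_for_read(&self) -> Ticket {
        Ticket::Read(self.generation.load(Ordering::Acquire))
    }

    /// Writer path (assumed ordering): take the dirty lock, bump the
    /// generation through a Write insert, then publish to the dirty set.
    pub fn write(&self, key: u32, version: u64) {
        let mut dirty = self.dirty.lock().unwrap();
        self.insert(key, version, Ticket::Write).unwrap();
        dirty.insert(key, version);
    }

    /// Reader path: the dirty lock orders this read against `write`.
    pub fn read_dirty(&self, key: u32) -> Option<u64> {
        self.dirty.lock().unwrap().get(&key).copied()
    }

    pub fn insert(&self, key: u32, version: u64, ticket: Ticket) -> Result<(), ()> {
        if ticket == Ticket::Write {
            self.generation.fetch_add(1, Ordering::Release);
        }
        let mut cached = self.cached.lock().unwrap();
        if let Ticket::Read(seen) = ticket {
            if seen != self.generation.load(Ordering::Acquire) {
                return Err(()); // a write happened after the ticket was issued
            }
        }
        cached.insert(key, version);
        Ok(())
    }

    pub fn evict(&self, key: u32) {
        self.cached.lock().unwrap().remove(&key);
    }

    pub fn get_cached(&self, key: u32) -> Option<u64> {
        self.cached.lock().unwrap().get(&key).copied()
    }
}
```

In this model, a reader whose ticket predates `write` has its insert rejected even after an eviction. A reader whose ticket postdates `write` can only observe the new value through `read_dirty`, so its insert is harmless.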

}

let check_ticket = || -> Result<(), ()> {
match ticket {
Ticket::Read(ticket) => {
if ticket != gen.load(std::sync::atomic::Ordering::Acquire) {
return Err(());
}
Ok(())
}
Ticket::Write => Ok(()),
}
};

// Warning: tricky code!
let entry = self
.cache
.entry(*key)
-// only one racing insert will call the closure
-.or_insert_with(|| Arc::new(Mutex::new(value.clone())));

// We may be racing with another thread that observed an older version of value.
// Suppose there is a reader (who has an old version) and a writer (who has
// the newest version by definition) both trying to insert when the cache has
// no entry. Here are the possible outcomes:
//
// 1. Race in `or_optionally_insert_with`:
//    1. Reader wins race, ticket is valid, and reader inserts old version.
//       Writer will overwrite the old version after the !is_fresh check.
//    2. Writer wins race. Reader will enter is_fresh check, lock entry, and
//       find that its ticket is expired.
//
// 2. No race on `or_optionally_insert_with`:
//    1. Reader inserts first (via `or_optionally_insert_with`), writer enters !is_fresh
//       check and overwrites entry.
//       1. There are two sub-cases here because the reader's entry could be evicted,
//          but in either case the writer obviously overwrites it.
//    2. Writer inserts first (via `or_optionally_insert_with`), invalidates ticket.
//       Then, two cases can follow:
//       1. Reader skips `or_optionally_insert_with` (because entry is present), enters
//          !is_fresh check, and does not insert because its ticket is expired.
//       2. The writer's cache entry is evicted already, so reader enters
//          `or_optionally_insert_with`. The ticket is expired so we do not insert.
//
// The other cases are where there is already an entry. In this case neither reader
// nor writer will enter `or_optionally_insert_with` callback. Instead they will both enter
// the !is_fresh check and lock the entry:
// 1. If the reader locks first, it will insert its old version. Then the writer
//    will lock and overwrite it with the newer version.
// 2. If the writer locks first, it will have already expired the ticket, and the
//    reader will not insert anything.
//
// There may also be more than one concurrent reader. However, the only way the two
// readers can have different versions is if there is concurrently a writer that wrote
// a new version. In this case all stale readers will fail the ticket check, and only
// up-to-date readers will remain. So we cannot have a bad insert caused by two readers
// racing to insert, both with valid tickets.
.or_optionally_insert_with(|| {
check_ticket().ok()?;
Some(Arc::new(Mutex::new(value.clone())))
})
// Note: Ticket::Write cannot expire, but an insert can still fail, in the case where
// a writer and reader are racing to call `or_optionally_insert_with`, the reader wins,
// but then fails to insert because its ticket is expired. Then no entry at all is inserted.
.ok_or(())?;

+// !is_fresh means we did not insert a new entry in or_optionally_insert_with above.
 if !entry.is_fresh() {
-// !is_fresh means we lost the race, and entry holds the value that was
-// inserted by the other thread. We need to check if we have a more recent value
-// than the other reader.
 let mut entry = entry.value().lock();
-if value.is_newer_than(&entry) {
-*entry = value;
-}
+check_ticket()?;
+
+// Ticket expiry makes this assert impossible.
+// TODO: relax to debug_assert?
+assert!(!entry.is_newer_than(&value), "entry is newer than value");
+*entry = value;
 }

Ok(())
}

pub fn invalidate(&self, key: &K) {
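
The sizing comment on `KEY_GENERATION_SIZE` in cache_types.rs can be checked with a short calculation. This is a rough sketch under stated assumptions: keys hash uniformly into the array, and every one of the 1000 operations in the window bumps a generation (in the diff only `Ticket::Write` inserts do, so this is pessimistic). The function names are hypothetical.

```rust
/// Same value as the constant in the diff, typed as u32 for the arithmetic.
const KEY_GENERATION_SIZE: u32 = 1024 * 16;

/// Probability that one particular generation slot is NOT bumped by any of
/// `bumps` writes, assuming each write hashes uniformly into `slots` counters.
pub fn ticket_survival_probability(slots: u32, bumps: u32) -> f64 {
    (1.0 - 1.0 / f64::from(slots)).powi(bumps as i32)
}

/// The scenario from the comment: 1 million writes per second and 1 ms
/// between taking the ticket and calling insert(), i.e. 1000 bumps.
/// Returns (exact survival probability, linear estimate 1 - bumps/slots).
pub fn comment_scenario() -> (f64, f64) {
    let bumps: u32 = 1_000;
    let exact = ticket_survival_probability(KEY_GENERATION_SIZE, bumps);
    let linear = 1.0 - f64::from(bumps) / f64::from(KEY_GENERATION_SIZE);
    (exact, linear)
}
```

Both figures come out close to the 15/16 quoted in the comment. The linear estimate is slightly lower than the exact one because it counts two bumps that land on the same slot twice.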
16 changes: 14 additions & 2 deletions crates/sui-core/src/execution_cache/metrics.rs
@@ -4,8 +4,8 @@
use tracing::trace;

use prometheus::{
-register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec,
-IntGauge, Registry,
+register_int_counter_vec_with_registry, register_int_counter_with_registry,
+register_int_gauge_with_registry, IntCounter, IntCounterVec, IntGauge, Registry,
};

pub struct ExecutionCacheMetrics {
@@ -15,6 +15,7 @@ pub struct ExecutionCacheMetrics {
pub(crate) cache_negative_hits: IntCounterVec,
pub(crate) cache_misses: IntCounterVec,
pub(crate) cache_writes: IntCounterVec,
pub(crate) expired_tickets: IntCounter,
}

impl ExecutionCacheMetrics {
@@ -65,6 +66,13 @@ impl ExecutionCacheMetrics {
registry,
)
.unwrap(),

expired_tickets: register_int_counter_with_registry!(
"execution_cache_expired_tickets",
"Failed inserts to monotonic caches because of expired tickets",
registry,
)
.unwrap(),
}
}

@@ -121,4 +129,8 @@ impl ExecutionCacheMetrics {
pub(crate) fn record_cache_write(&self, collection: &'static str) {
self.cache_writes.with_label_values(&[collection]).inc();
}

pub(crate) fn record_ticket_expiry(&self) {
self.expired_tickets.inc();
}
}
Original file line number Diff line number Diff line change
@@ -1217,6 +1217,7 @@ async fn test_concurrent_lockers_same_tx() {

#[tokio::test]
async fn latest_object_cache_race_test() {
telemetry_subscribers::init_for_testing();
let authority = TestAuthorityBuilder::new().build().await;

let store = authority.database_for_testing().clone();
@@ -1258,11 +1259,19 @@ async fn latest_object_cache_race_test() {
let start = Instant::now();
std::thread::spawn(move || {
while start.elapsed() < Duration::from_secs(2) {
let Some(latest_version) = cache
// If you move the get_ticket_for_read to after we get the latest version,
// the test will fail! (this is good, it means the test is doing something)
let ticket = cache
.cached
.object_by_id_cache
.get_ticket_for_read(&object_id);

// get the latest version, but then let it become stale
let Some(latest_version) = cache
.dirty
.objects
.get(&object_id)
-.and_then(|e| e.lock().version())
+.and_then(|e| e.value().get_highest().map(|v| v.0))
else {
continue;
};
@@ -1275,14 +1284,30 @@ async fn latest_object_cache_race_test() {
let object =
Object::with_id_owner_version_for_testing(object_id, latest_version, owner);

// because we obtained the ticket before reading the object, we will not write a stale
// version to the cache.
cache.cache_latest_object_by_id(
&object_id,
LatestObjectCacheEntry::Object(latest_version, object.into()),
ticket,
);
}
})
};

// a thread that just invalidates the cache as fast as it can
let invalidator = {
let cache = cache.clone();
let start = Instant::now();
std::thread::spawn(move || {
while start.elapsed() < Duration::from_secs(2) {
cache.cached.object_by_id_cache.invalidate(&object_id);
// sleep for 1 to 9µs (gen_range(1..10) excludes the upper bound)
std::thread::sleep(Duration::from_micros(rand::thread_rng().gen_range(1..10)));
}
})
};

// a thread that does nothing but watch to see if the cache goes back in time
let checker = {
let cache = cache.clone();
@@ -1300,7 +1325,7 @@ async fn latest_object_cache_race_test() {
continue;
};

-assert!(cur >= latest);
+assert!(cur >= latest, "{} >= {}", cur, latest);
latest = cur;
}
})
@@ -1309,4 +1334,5 @@ async fn latest_object_cache_race_test() {
writer.join().unwrap();
reader.join().unwrap();
checker.join().unwrap();
invalidator.join().unwrap();
}
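
The test comment notes that moving `get_ticket_for_read` to after the read of the latest version makes the test fail. A single-threaded replay of one such interleaving shows why. It is a minimal sketch with hypothetical names (`OneKeyCache`, `replay`) and a single key, not the real `MonotonicCache`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

/// Hypothetical single-key model of the cache (illustration only).
pub struct OneKeyCache {
    generation: AtomicU64,
    cached: Mutex<Option<u64>>,
}

impl OneKeyCache {
    pub fn new() -> Self {
        Self {
            generation: AtomicU64::new(0),
            cached: Mutex::new(None),
        }
    }

    pub fn read_ticket(&self) -> u64 {
        self.generation.load(Ordering::Acquire)
    }

    /// Writer: expire outstanding read tickets, then store the new version.
    pub fn write(&self, version: u64) {
        self.generation.fetch_add(1, Ordering::Release);
        *self.cached.lock().unwrap() = Some(version);
    }

    pub fn invalidate(&self) {
        *self.cached.lock().unwrap() = None;
    }

    /// Reader insert: accepted only if no write happened since `ticket`.
    pub fn insert_read(&self, version: u64, ticket: u64) -> bool {
        let mut cached = self.cached.lock().unwrap();
        if ticket != self.generation.load(Ordering::Acquire) {
            return false;
        }
        *cached = Some(version);
        true
    }

    pub fn get(&self) -> Option<u64> {
        *self.cached.lock().unwrap()
    }
}

/// Replays: reader sees version 1, writer publishes version 2, the
/// invalidator evicts the entry, and the reader then tries to cache its
/// stale version 1. Returns what the cache holds afterwards.
pub fn replay(ticket_before_read: bool) -> Option<u64> {
    let cache = OneKeyCache::new();
    cache.write(1);
    let early_ticket = cache.read_ticket(); // correct: ticket first
    let stale = 1; // reader observes version 1
    cache.write(2); // a checker could now observe version 2
    cache.invalidate();
    let late_ticket = cache.read_ticket(); // wrong: ticket after the read
    let ticket = if ticket_before_read { early_ticket } else { late_ticket };
    cache.insert_read(stale, ticket);
    cache.get()
}
```

With the ticket taken first, the stale insert is rejected and the cache stays empty. With the ticket taken afterwards, version 1 lands in the cache after version 2 was visible, which is the "goes back in time" condition the checker thread asserts against.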