pageserver: guard against WAL gaps in the interpreted protocol (#10858)
## Problem

The interpreted SK <-> PS protocol does not guard against gaps in the streamed
WAL (neither does the vanilla protocol, but that's beside the point).

## Summary of changes

Extend the protocol to include the start LSN of the PG WAL section from
which the records were interpreted.
Validation is enabled via a config flag on the pageserver and works as
follows (a condensed sketch of the check is shown after the case list):

**Case 1**: `raw_wal_start_lsn` is smaller than the requested LSN.
There can't be a gap here, but we check that the shard only received records
it hasn't already seen.

**Case 2**: `raw_wal_start_lsn` is equal to the requested LSN.
This is the happy case: no gap and nothing to check.

**Case 3**: `raw_wal_start_lsn` is greater than the requested LSN.
This is a gap.
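
The sketch below condenses the pageserver-side check added in
`walreceiver_connection.rs` further down in this diff; the variable names
follow that code, but the `Lsn` newtype and the error handling here are
simplified stand-ins.

```rust
use std::cmp::Ordering;

/// Simplified stand-in for `utils::lsn::Lsn`.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Lsn(u64);

/// Contiguity check run on each received batch when `validate_wal_contiguity`
/// is enabled. `expected_wal_start` is the end of the previous batch (or the
/// start point for the first batch); `last_rec_lsn` is the last record LSN
/// this shard has ingested.
fn validate_batch(
    raw_wal_start_lsn: Option<Lsn>,
    expected_wal_start: Lsn,
    first_record_next_lsn: Option<Lsn>,
    last_rec_lsn: Lsn,
) -> Result<(), String> {
    // If the sender doesn't provide the field, there is nothing to validate.
    let Some(raw_wal_start_lsn) = raw_wal_start_lsn else {
        return Ok(());
    };
    match raw_wal_start_lsn.cmp(&expected_wal_start) {
        // Case 3: the WAL this batch was decoded from starts past the point
        // we expected, i.e. some WAL was never streamed to this shard.
        Ordering::Greater => Err(format!(
            "Gap in streamed WAL: [{:?}, {:?})",
            expected_wal_start, raw_wal_start_lsn
        )),
        // Case 1: the reader started behind our position (another shard is
        // lagging); only make sure we aren't re-sent records we already have.
        Ordering::Less => match first_record_next_lsn {
            Some(lsn) if lsn < last_rec_lsn => {
                Err(format!("Received already-ingested record at {:?}", lsn))
            }
            _ => Ok(()),
        },
        // Case 2: the happy path, nothing to check.
        Ordering::Equal => Ok(()),
    }
}
```

On the real code path a failed check is logged via `critical!` and the
safekeeper connection is torn down so that a new one can be established.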

To make Case 3 work I had to bend the protocol a bit.
We read chunks of WAL which are not record-aligned and feed them to the
decoder. The picture below shows a shard which subscribes at a position
somewhere within Record 2. We already have a WAL reader which is below that
position, so we wait for it to catch up. Read 1 covers all of Record 1 and
part of Record 2. The new shard doesn't need Record 1 (it has already
processed it according to its starting position), but the read extends past
the shard's starting position. When we do Read 2, we decode Record 2 and ship
it off to the shard, but the start position of Read 2 is greater than the
start position the shard requested. This looks like a gap.


![image](https://github.com/user-attachments/assets/8aed292e-5d62-46a3-9b01-fbf9dc25efe0)

To make this work, we extend the protocol to send an empty
`InterpretedWalRecords` to a shard when the WAL chunk the records originated
from ends past that shard's requested start position. On the pageserver this
just updates the tracking LSNs in memory (effectively a no-op), which gives
us a workaround for the fake gap. The sender-side decision is sketched below.
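
A trimmed-down sketch of that per-shard decision, mirroring the logic added
to `send_interpreted_wal.rs` below; the types here are illustrative stand-ins
for the real record and batch structs.

```rust
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Lsn(u64);

/// Placeholder for the real interpreted-record batch type.
struct Batch {
    records: Vec<String>,
    next_record_lsn: Lsn,
    raw_wal_start_lsn: Option<Lsn>,
}

/// Decide what (if anything) to send to a shard for one decoded WAL chunk.
fn batch_for_shard(
    shard_next_record_lsn: Lsn, // highest record LSN the shard has been sent
    max_next_record_lsn: Lsn,   // highest record LSN decoded from this chunk
    wal_end_lsn: Lsn,           // end LSN of the raw WAL chunk
    batch_wal_start_lsn: Lsn,   // start LSN of the raw WAL chunk
    records_for_shard: Vec<String>,
) -> Option<Batch> {
    if max_next_record_lsn > shard_next_record_lsn {
        // The chunk contains at least one record the shard hasn't seen yet.
        Some(Batch {
            records: records_for_shard,
            next_record_lsn: max_next_record_lsn,
            raw_wal_start_lsn: Some(batch_wal_start_lsn),
        })
    } else if wal_end_lsn > shard_next_record_lsn {
        // Every record was already seen, but the raw WAL chunk overlaps the
        // shard's position: send an empty batch so the receiver can advance
        // its expected WAL start and not mistake the next batch for a gap.
        Some(Batch {
            records: Vec::new(),
            next_record_lsn: shard_next_record_lsn,
            raw_wal_start_lsn: Some(batch_wal_start_lsn),
        })
    } else {
        // The shard is already past this chunk entirely; skip it.
        None
    }
}
```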

As a drive-by, make `InterpretedWalRecords::next_record_lsn` mandatory in the
application-level definition; it is always included anyway.

Related: neondatabase/cloud#23935
VladLazar authored Feb 20, 2025
1 parent d571553 commit 3499641
Showing 12 changed files with 120 additions and 30 deletions.
3 changes: 3 additions & 0 deletions libs/pageserver_api/src/config.rs
@@ -122,6 +122,8 @@ pub struct ConfigToml {
pub page_service_pipelining: PageServicePipeliningConfig,
pub get_vectored_concurrent_io: GetVectoredConcurrentIo,
pub enable_read_path_debugging: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub validate_wal_contiguity: Option<bool>,
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -521,6 +523,7 @@ impl Default for ConfigToml {
} else {
None
},
validate_wal_contiguity: None,
}
}
}
1 change: 1 addition & 0 deletions libs/wal_decoder/proto/interpreted_wal.proto
@@ -5,6 +5,7 @@ package interpreted_wal;
message InterpretedWalRecords {
repeated InterpretedWalRecord records = 1;
optional uint64 next_record_lsn = 2;
optional uint64 raw_wal_start_lsn = 3;
}

message InterpretedWalRecord {
6 changes: 5 additions & 1 deletion libs/wal_decoder/src/models.rs
@@ -60,7 +60,11 @@ pub struct InterpretedWalRecords {
pub records: Vec<InterpretedWalRecord>,
// Start LSN of the next record after the batch.
// Note that said record may not belong to the current shard.
pub next_record_lsn: Option<Lsn>,
pub next_record_lsn: Lsn,
// Inclusive start LSN of the PG WAL from which the interpreted
// WAL records were extracted. Note that this is not necessarily the
// start LSN of the first interpreted record in the batch.
pub raw_wal_start_lsn: Option<Lsn>,
}

/// An interpreted Postgres WAL record, ready to be handled by the pageserver
9 changes: 7 additions & 2 deletions libs/wal_decoder/src/wire_format.rs
@@ -167,7 +167,8 @@ impl TryFrom<InterpretedWalRecords> for proto::InterpretedWalRecords {
.collect::<Result<Vec<_>, _>>()?;
Ok(proto::InterpretedWalRecords {
records,
next_record_lsn: value.next_record_lsn.map(|l| l.0),
next_record_lsn: Some(value.next_record_lsn.0),
raw_wal_start_lsn: value.raw_wal_start_lsn.map(|l| l.0),
})
}
}
@@ -254,7 +255,11 @@ impl TryFrom<proto::InterpretedWalRecords> for InterpretedWalRecords {

Ok(InterpretedWalRecords {
records,
next_record_lsn: value.next_record_lsn.map(Lsn::from),
next_record_lsn: value
.next_record_lsn
.map(Lsn::from)
.expect("Always provided"),
raw_wal_start_lsn: value.raw_wal_start_lsn.map(Lsn::from),
})
}
}
1 change: 1 addition & 0 deletions pageserver/src/bin/pageserver.rs
@@ -134,6 +134,7 @@ fn main() -> anyhow::Result<()> {
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
info!(?conf.validate_wal_contiguity, "starting with WAL contiguity validation");
info!(?conf.page_service_pipelining, "starting with page service pipelining config");
info!(?conf.get_vectored_concurrent_io, "starting with get_vectored IO concurrency config");

6 changes: 6 additions & 0 deletions pageserver/src/config.rs
@@ -197,6 +197,10 @@ pub struct PageServerConf {
/// Enable read path debugging. If enabled, read key errors will print a backtrace of the layer
/// files read.
pub enable_read_path_debugging: bool,

/// Interpreted protocol feature: if enabled, validate that the logical WAL received from
/// safekeepers does not have gaps.
pub validate_wal_contiguity: bool,
}

/// Token for authentication to safekeepers
@@ -360,6 +364,7 @@ impl PageServerConf {
page_service_pipelining,
get_vectored_concurrent_io,
enable_read_path_debugging,
validate_wal_contiguity,
} = config_toml;

let mut conf = PageServerConf {
@@ -446,6 +451,7 @@ impl PageServerConf {
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
no_sync: no_sync.unwrap_or(false),
enable_read_path_debugging: enable_read_path_debugging.unwrap_or(false),
validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
};

// ------------------------------------------------------------
1 change: 1 addition & 0 deletions pageserver/src/tenant/timeline.rs
@@ -2874,6 +2874,7 @@ impl Timeline {
auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
availability_zone: self.conf.availability_zone.clone(),
ingest_batch_size: self.conf.ingest_batch_size,
validate_wal_contiguity: self.conf.validate_wal_contiguity,
},
broker_client,
ctx,
1 change: 1 addition & 0 deletions pageserver/src/tenant/timeline/walreceiver.rs
@@ -56,6 +56,7 @@ pub struct WalReceiverConf {
pub auth_token: Option<Arc<String>>,
pub availability_zone: Option<String>,
pub ingest_batch_size: u64,
pub validate_wal_contiguity: bool,
}

pub struct WalReceiver {
@@ -537,6 +537,7 @@ impl ConnectionManagerState {
let connect_timeout = self.conf.wal_connect_timeout;
let ingest_batch_size = self.conf.ingest_batch_size;
let protocol = self.conf.protocol;
let validate_wal_contiguity = self.conf.validate_wal_contiguity;
let timeline = Arc::clone(&self.timeline);
let ctx = ctx.detached_child(
TaskKind::WalReceiverConnectionHandler,
@@ -558,6 +559,7 @@
ctx,
node_id,
ingest_batch_size,
validate_wal_contiguity,
)
.await;

@@ -1563,6 +1565,7 @@ mod tests {
auth_token: None,
availability_zone: None,
ingest_batch_size: 1,
validate_wal_contiguity: false,
},
wal_connection: None,
wal_stream_candidates: HashMap::new(),
@@ -120,6 +120,7 @@ pub(super) async fn handle_walreceiver_connection(
ctx: RequestContext,
safekeeper_node: NodeId,
ingest_batch_size: u64,
validate_wal_contiguity: bool,
) -> Result<(), WalReceiverError> {
debug_assert_current_span_has_tenant_and_timeline_id();

@@ -274,6 +275,7 @@ pub(super) async fn handle_walreceiver_connection(
} => Some((format, compression)),
};

let mut expected_wal_start = startpoint;
while let Some(replication_message) = {
select! {
_ = cancellation.cancelled() => {
@@ -340,13 +342,49 @@
)
})?;

// Guard against WAL gaps. If the start LSN of the PG WAL section
// from which the interpreted records were extracted, doesn't match
// the end of the previous batch (or the starting point for the first batch),
// then kill this WAL receiver connection and start a new one.
if validate_wal_contiguity {
if let Some(raw_wal_start_lsn) = batch.raw_wal_start_lsn {
match raw_wal_start_lsn.cmp(&expected_wal_start) {
std::cmp::Ordering::Greater => {
let msg = format!(
"Gap in streamed WAL: [{}, {})",
expected_wal_start, raw_wal_start_lsn
);
critical!("{msg}");
return Err(WalReceiverError::Other(anyhow!(msg)));
}
std::cmp::Ordering::Less => {
// Other shards are reading WAL behind us.
// This is valid, but check that we received records
// that we haven't seen before.
if let Some(first_rec) = batch.records.first() {
if first_rec.next_record_lsn < last_rec_lsn {
let msg = format!(
"Received record with next_record_lsn multiple times ({} < {})",
first_rec.next_record_lsn, expected_wal_start
);
critical!("{msg}");
return Err(WalReceiverError::Other(anyhow!(msg)));
}
}
}
std::cmp::Ordering::Equal => {}
}
}
}

let InterpretedWalRecords {
records,
next_record_lsn,
raw_wal_start_lsn: _,
} = batch;

tracing::debug!(
"Received WAL up to {} with next_record_lsn={:?}",
"Received WAL up to {} with next_record_lsn={}",
streaming_lsn,
next_record_lsn
);
@@ -423,12 +461,11 @@ pub(super) async fn handle_walreceiver_connection(
// need to advance last record LSN on all shards. If we've not ingested the latest
// record, then set the LSN of the modification past it. This way all shards
// advance their last record LSN at the same time.
let needs_last_record_lsn_advance = match next_record_lsn {
Some(lsn) if lsn > modification.get_lsn() => {
modification.set_lsn(lsn).unwrap();
true
}
_ => false,
let needs_last_record_lsn_advance = if next_record_lsn > modification.get_lsn() {
modification.set_lsn(next_record_lsn).unwrap();
true
} else {
false
};

if uncommitted_records > 0 || needs_last_record_lsn_advance {
@@ -446,9 +483,8 @@
timeline.get_last_record_lsn()
);

if let Some(lsn) = next_record_lsn {
last_rec_lsn = lsn;
}
last_rec_lsn = next_record_lsn;
expected_wal_start = streaming_lsn;

Some(streaming_lsn)
}
51 changes: 40 additions & 11 deletions safekeeper/src/send_interpreted_wal.rs
@@ -295,14 +295,18 @@ impl InterpretedWalReader {

let mut wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);

// Tracks the start of the PG WAL LSN from which the current batch of
// interpreted records originated.
let mut current_batch_wal_start_lsn: Option<Lsn> = None;

loop {
tokio::select! {
// Main branch for reading WAL and forwarding it
wal_or_reset = self.wal_stream.next() => {
let wal = wal_or_reset.map(|wor| wor.get_wal().expect("reset handled in select branch below"));
let WalBytes {
wal,
wal_start_lsn: _,
wal_start_lsn,
wal_end_lsn,
available_wal_end_lsn,
} = match wal {
@@ -315,6 +319,12 @@ impl InterpretedWalReader {
}
};

// We will already have a value if the previous chunks of WAL
// did not decode into anything useful.
if current_batch_wal_start_lsn.is_none() {
current_batch_wal_start_lsn = Some(wal_start_lsn);
}

wal_decoder.feed_bytes(&wal);

// Deserialize and interpret WAL records from this batch of WAL.
@@ -363,7 +373,9 @@

let max_next_record_lsn = match max_next_record_lsn {
Some(lsn) => lsn,
None => { continue; }
None => {
continue;
}
};

// Update the current position such that new receivers can decide
@@ -377,21 +389,38 @@
}
}

let batch_wal_start_lsn = current_batch_wal_start_lsn.take().unwrap();

// Send interpreted records downstream. Anything that has already been seen
// by a shard is filtered out.
let mut shard_senders_to_remove = Vec::new();
for (shard, states) in &mut self.shard_senders {
for state in states {
if max_next_record_lsn <= state.next_record_lsn {
continue;
}

let shard_sender_id = ShardSenderId::new(*shard, state.sender_id);
let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default();

let batch = InterpretedWalRecords {
records,
next_record_lsn: Some(max_next_record_lsn),
let batch = if max_next_record_lsn > state.next_record_lsn {
// This batch contains at least one record that this shard has not
// seen yet.
let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default();

InterpretedWalRecords {
records,
next_record_lsn: max_next_record_lsn,
raw_wal_start_lsn: Some(batch_wal_start_lsn),
}
} else if wal_end_lsn > state.next_record_lsn {
// All the records in this batch were seen by the shard
// However, the batch maps to a chunk of WAL that the
// shard has not yet seen. Notify it of the start LSN
// of the PG WAL chunk such that it doesn't look like a gap.
InterpretedWalRecords {
records: Vec::default(),
next_record_lsn: state.next_record_lsn,
raw_wal_start_lsn: Some(batch_wal_start_lsn),
}
} else {
// The shard has seen this chunk of WAL before. Skip it.
continue;
};

let res = state.tx.send(Batch {
@@ -403,7 +432,7 @@
if res.is_err() {
shard_senders_to_remove.push(shard_sender_id);
} else {
state.next_record_lsn = max_next_record_lsn;
state.next_record_lsn = std::cmp::max(state.next_record_lsn, max_next_record_lsn);
}
}
}
12 changes: 6 additions & 6 deletions test_runner/fixtures/neon_fixtures.py
@@ -1167,15 +1167,15 @@ def __init__(self, config: NeonEnvBuilder):
"max_batch_size": 32,
}

# Concurrent IO (https://github.com/neondatabase/neon/issues/9378):
# enable concurrent IO by default in tests and benchmarks.
# Compat tests are exempt because old versions fail to parse the new config.
get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io
if config.test_may_use_compatibility_snapshot_binaries:
log.info(
"Forcing use of binary-built-in default to avoid forward-compatibility related test failures"
"Skipping WAL contiguity validation to avoid forward-compatibility related test failures"
)
get_vectored_concurrent_io = None
else:
# Look for gaps in WAL received from safekeepers
ps_cfg["validate_wal_contiguity"] = True

get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io
if get_vectored_concurrent_io is not None:
ps_cfg["get_vectored_concurrent_io"] = {
"mode": self.pageserver_get_vectored_concurrent_io,

1 comment on commit 3499641

@github-actions

7730 tests run: 7345 passed, 1 failed, 384 skipped (full report)


Failures on Postgres 16

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_branch_creation_many[release-pg16-github-actions-selfhosted-random-1024]"

Code coverage* (full report)

  • functions: 32.9% (8618 of 26226 functions)
  • lines: 48.8% (72765 of 149069 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
3499641 at 2025-02-20T20:29:24.245Z :recycle:
