pageserver: guard against WAL gaps in the interpreted protocol #10858

Merged: 12 commits, Feb 20, 2025
3 changes: 3 additions & 0 deletions libs/pageserver_api/src/config.rs
@@ -122,6 +122,8 @@ pub struct ConfigToml {
pub page_service_pipelining: PageServicePipeliningConfig,
pub get_vectored_concurrent_io: GetVectoredConcurrentIo,
pub enable_read_path_debugging: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub validate_wal_contiguity: Option<bool>,
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -521,6 +523,7 @@ impl Default for ConfigToml {
} else {
None
},
validate_wal_contiguity: None,
}
}
}
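As an aside, the `skip_serializing_if` attribute keeps an unset flag out of the serialized config. Below is a minimal sketch of that behavior using a stand-in struct (not the real `ConfigToml`), assuming the `serde` and `toml` crates:

```rust
use serde::Serialize;

#[derive(Serialize, Default)]
struct ExampleToml {
    // Mirrors the new field: omitted from serialized output when None.
    #[serde(skip_serializing_if = "Option::is_none")]
    validate_wal_contiguity: Option<bool>,
}

fn main() {
    // Unset flag: the key does not appear in the rendered TOML at all.
    let unset = toml::to_string(&ExampleToml::default()).unwrap();
    assert!(unset.is_empty());

    // Explicitly enabled: serialized as `validate_wal_contiguity = true`.
    let set = toml::to_string(&ExampleToml { validate_wal_contiguity: Some(true) }).unwrap();
    println!("{set}");
}
```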
1 change: 1 addition & 0 deletions libs/wal_decoder/proto/interpreted_wal.proto
@@ -5,6 +5,7 @@ package interpreted_wal;
message InterpretedWalRecords {
repeated InterpretedWalRecord records = 1;
optional uint64 next_record_lsn = 2;
optional uint64 raw_wal_start_lsn = 3;
}

message InterpretedWalRecord {
6 changes: 5 additions & 1 deletion libs/wal_decoder/src/models.rs
@@ -60,7 +60,11 @@ pub struct InterpretedWalRecords {
pub records: Vec<InterpretedWalRecord>,
// Start LSN of the next record after the batch.
// Note that said record may not belong to the current shard.
pub next_record_lsn: Option<Lsn>,
pub next_record_lsn: Lsn,
// Inclusive start LSN of the PG WAL from which the interpreted
// WAL records were extracted. Note that this is not necessarily the
// start LSN of the first interpreted record in the batch.
pub raw_wal_start_lsn: Option<Lsn>,
}

/// An interpreted Postgres WAL record, ready to be handled by the pageserver
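To make the two batch fields concrete, here is a hedged sketch constructing a batch with illustrative LSNs (the crate paths `utils::lsn::Lsn` and `wal_decoder::models` are assumed from the workspace layout):

```rust
use utils::lsn::Lsn;
use wal_decoder::models::InterpretedWalRecords;

fn example_batch() -> InterpretedWalRecords {
    InterpretedWalRecords {
        // Interpreted records destined for this shard (elided here).
        records: Vec::new(),
        // Start LSN of the first record *after* the batch; that record may
        // belong to another shard, so nothing in `records` has to reach it.
        next_record_lsn: Lsn(0x2000),
        // Inclusive start of the raw PG WAL chunk the batch was decoded from,
        // which can precede the first record this shard actually receives.
        raw_wal_start_lsn: Some(Lsn(0x1000)),
    }
}
```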
9 changes: 7 additions & 2 deletions libs/wal_decoder/src/wire_format.rs
@@ -167,7 +167,8 @@ impl TryFrom<InterpretedWalRecords> for proto::InterpretedWalRecords {
.collect::<Result<Vec<_>, _>>()?;
Ok(proto::InterpretedWalRecords {
records,
next_record_lsn: value.next_record_lsn.map(|l| l.0),
next_record_lsn: Some(value.next_record_lsn.0),
raw_wal_start_lsn: value.raw_wal_start_lsn.map(|l| l.0),
})
}
}
@@ -254,7 +255,11 @@ impl TryFrom<proto::InterpretedWalRecords> for InterpretedWalRecords {

Ok(InterpretedWalRecords {
records,
next_record_lsn: value.next_record_lsn.map(Lsn::from),
next_record_lsn: value
.next_record_lsn
.map(Lsn::from)
.expect("Always provided"),
raw_wal_start_lsn: value.raw_wal_start_lsn.map(Lsn::from),
})
}
}
1 change: 1 addition & 0 deletions pageserver/src/bin/pageserver.rs
@@ -134,6 +134,7 @@ fn main() -> anyhow::Result<()> {
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
info!(?conf.validate_wal_contiguity, "starting with WAL contiguity validation");
info!(?conf.page_service_pipelining, "starting with page service pipelining config");
info!(?conf.get_vectored_concurrent_io, "starting with get_vectored IO concurrency config");

6 changes: 6 additions & 0 deletions pageserver/src/config.rs
@@ -197,6 +197,10 @@ pub struct PageServerConf {
/// Enable read path debugging. If enabled, read key errors will print a backtrace of the layer
/// files read.
pub enable_read_path_debugging: bool,

/// Interpreted protocol feature: if enabled, validate that the logical WAL received from
/// safekeepers does not have gaps.
pub validate_wal_contiguity: bool,
}

/// Token for authentication to safekeepers
@@ -360,6 +364,7 @@ impl PageServerConf {
page_service_pipelining,
get_vectored_concurrent_io,
enable_read_path_debugging,
validate_wal_contiguity,
} = config_toml;

let mut conf = PageServerConf {
@@ -446,6 +451,7 @@ impl PageServerConf {
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
no_sync: no_sync.unwrap_or(false),
enable_read_path_debugging: enable_read_path_debugging.unwrap_or(false),
validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
};

// ------------------------------------------------------------
1 change: 1 addition & 0 deletions pageserver/src/tenant/timeline.rs
@@ -2873,6 +2873,7 @@ impl Timeline {
auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
availability_zone: self.conf.availability_zone.clone(),
ingest_batch_size: self.conf.ingest_batch_size,
validate_wal_contiguity: self.conf.validate_wal_contiguity,
},
broker_client,
ctx,
1 change: 1 addition & 0 deletions pageserver/src/tenant/timeline/walreceiver.rs
@@ -56,6 +56,7 @@ pub struct WalReceiverConf {
pub auth_token: Option<Arc<String>>,
pub availability_zone: Option<String>,
pub ingest_batch_size: u64,
pub validate_wal_contiguity: bool,
}

pub struct WalReceiver {
@@ -537,6 +537,7 @@ impl ConnectionManagerState {
let connect_timeout = self.conf.wal_connect_timeout;
let ingest_batch_size = self.conf.ingest_batch_size;
let protocol = self.conf.protocol;
let validate_wal_contiguity = self.conf.validate_wal_contiguity;
let timeline = Arc::clone(&self.timeline);
let ctx = ctx.detached_child(
TaskKind::WalReceiverConnectionHandler,
@@ -558,6 +559,7 @@ impl ConnectionManagerState {
ctx,
node_id,
ingest_batch_size,
validate_wal_contiguity,
)
.await;

@@ -1563,6 +1565,7 @@ mod tests {
auth_token: None,
availability_zone: None,
ingest_batch_size: 1,
validate_wal_contiguity: false,
},
wal_connection: None,
wal_stream_candidates: HashMap::new(),
@@ -120,6 +120,7 @@ pub(super) async fn handle_walreceiver_connection(
ctx: RequestContext,
safekeeper_node: NodeId,
ingest_batch_size: u64,
validate_wal_contiguity: bool,
) -> Result<(), WalReceiverError> {
debug_assert_current_span_has_tenant_and_timeline_id();

@@ -274,6 +275,7 @@ pub(super) async fn handle_walreceiver_connection(
} => Some((format, compression)),
};

let mut expected_wal_start = startpoint;
while let Some(replication_message) = {
select! {
_ = cancellation.cancelled() => {
@@ -340,13 +342,49 @@
)
})?;

// Guard against WAL gaps. If the start LSN of the PG WAL section
// from which the interpreted records were extracted doesn't match
// the end of the previous batch (or the starting point for the first batch),
// then kill this WAL receiver connection and start a new one.
if validate_wal_contiguity {
if let Some(raw_wal_start_lsn) = batch.raw_wal_start_lsn {
match raw_wal_start_lsn.cmp(&expected_wal_start) {
std::cmp::Ordering::Greater => {
let msg = format!(
"Gap in streamed WAL: [{}, {})",
expected_wal_start, raw_wal_start_lsn
);
critical!("{msg}");
return Err(WalReceiverError::Other(anyhow!(msg)));
}
std::cmp::Ordering::Less => {
// Other shards are reading WAL behind us.
// This is valid, but check that we received records
// that we haven't seen before.
if let Some(first_rec) = batch.records.first() {
if first_rec.next_record_lsn < last_rec_lsn {
let msg = format!(
"Received record with next_record_lsn multiple times ({} < {})",
first_rec.next_record_lsn, last_rec_lsn
);
critical!("{msg}");
return Err(WalReceiverError::Other(anyhow!(msg)));
}
}
}
std::cmp::Ordering::Equal => {}
}
}
}

let InterpretedWalRecords {
records,
next_record_lsn,
raw_wal_start_lsn: _,
} = batch;

tracing::debug!(
"Received WAL up to {} with next_record_lsn={:?}",
"Received WAL up to {} with next_record_lsn={}",
streaming_lsn,
next_record_lsn
);
@@ -423,12 +461,11 @@ pub(super) async fn handle_walreceiver_connection(
// need to advance last record LSN on all shards. If we've not ingested the latest
// record, then set the LSN of the modification past it. This way all shards
// advance their last record LSN at the same time.
let needs_last_record_lsn_advance = match next_record_lsn {
Some(lsn) if lsn > modification.get_lsn() => {
modification.set_lsn(lsn).unwrap();
true
}
_ => false,
let needs_last_record_lsn_advance = if next_record_lsn > modification.get_lsn() {
modification.set_lsn(next_record_lsn).unwrap();
true
} else {
false
};

if uncommitted_records > 0 || needs_last_record_lsn_advance {
@@ -446,9 +483,8 @@
timeline.get_last_record_lsn()
);

if let Some(lsn) = next_record_lsn {
last_rec_lsn = lsn;
}
last_rec_lsn = next_record_lsn;
expected_wal_start = streaming_lsn;

Some(streaming_lsn)
}
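The gap guard added above boils down to a per-batch decision. The following is a standalone sketch of the same logic; the enum and function names are illustrative, not helpers introduced by this PR:

```rust
use utils::lsn::Lsn;

/// Outcome of validating one interpreted-protocol batch against the expected
/// raw WAL start (the streaming end of the previous batch, or the starting
/// point for the first one).
enum ContiguityCheck {
    Ok,
    /// The safekeeper skipped ahead: WAL in [expected, received) was never streamed.
    Gap { expected: Lsn, received: Lsn },
    /// We were handed records this shard has already ingested.
    Replayed { first_next_record_lsn: Lsn, last_rec_lsn: Lsn },
}

fn check_contiguity(
    expected_wal_start: Lsn,
    raw_wal_start_lsn: Lsn,
    first_record_next_lsn: Option<Lsn>,
    last_rec_lsn: Lsn,
) -> ContiguityCheck {
    use std::cmp::Ordering;
    match raw_wal_start_lsn.cmp(&expected_wal_start) {
        // The batch starts past where the previous one ended: a gap.
        Ordering::Greater => ContiguityCheck::Gap {
            expected: expected_wal_start,
            received: raw_wal_start_lsn,
        },
        // The batch starts earlier: valid when other shards lag behind us,
        // but the records themselves must still be new to this shard.
        Ordering::Less => match first_record_next_lsn {
            Some(lsn) if lsn < last_rec_lsn => ContiguityCheck::Replayed {
                first_next_record_lsn: lsn,
                last_rec_lsn,
            },
            _ => ContiguityCheck::Ok,
        },
        Ordering::Equal => ContiguityCheck::Ok,
    }
}
```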
51 changes: 40 additions & 11 deletions safekeeper/src/send_interpreted_wal.rs
@@ -295,14 +295,18 @@ impl InterpretedWalReader {

let mut wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);

// Tracks the start LSN of the PG WAL from which the current batch of
// interpreted records originated.
let mut current_batch_wal_start_lsn: Option<Lsn> = None;

loop {
tokio::select! {
// Main branch for reading WAL and forwarding it
wal_or_reset = self.wal_stream.next() => {
let wal = wal_or_reset.map(|wor| wor.get_wal().expect("reset handled in select branch below"));
let WalBytes {
wal,
wal_start_lsn: _,
wal_start_lsn,
wal_end_lsn,
available_wal_end_lsn,
} = match wal {
@@ -315,6 +319,12 @@
}
};

// We will already have a value if the previous chunks of WAL
// did not decode into anything useful.
if current_batch_wal_start_lsn.is_none() {
current_batch_wal_start_lsn = Some(wal_start_lsn);
}

wal_decoder.feed_bytes(&wal);

// Deserialize and interpret WAL records from this batch of WAL.
@@ -363,7 +373,9 @@

let max_next_record_lsn = match max_next_record_lsn {
Some(lsn) => lsn,
None => { continue; }
None => {
continue;
}
};

// Update the current position such that new receivers can decide
@@ -377,21 +389,38 @@
}
}

let batch_wal_start_lsn = current_batch_wal_start_lsn.take().unwrap();

// Send interpreted records downstream. Anything that has already been seen
// by a shard is filtered out.
let mut shard_senders_to_remove = Vec::new();
for (shard, states) in &mut self.shard_senders {
for state in states {
if max_next_record_lsn <= state.next_record_lsn {
continue;
}

let shard_sender_id = ShardSenderId::new(*shard, state.sender_id);
let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default();

let batch = InterpretedWalRecords {
records,
next_record_lsn: Some(max_next_record_lsn),
let batch = if max_next_record_lsn > state.next_record_lsn {
// This batch contains at least one record that this shard has not
// seen yet.
let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default();

InterpretedWalRecords {
records,
next_record_lsn: max_next_record_lsn,
raw_wal_start_lsn: Some(batch_wal_start_lsn),
}
} else if wal_end_lsn > state.next_record_lsn {
// All the records in this batch were seen by the shard.
// However, the batch maps to a chunk of WAL that the
// shard has not yet seen. Notify it of the start LSN
// of the PG WAL chunk so that it doesn't look like a gap.
InterpretedWalRecords {
records: Vec::default(),
next_record_lsn: state.next_record_lsn,
raw_wal_start_lsn: Some(batch_wal_start_lsn),
}
} else {
// The shard has seen this chunk of WAL before. Skip it.
continue;
};

let res = state.tx.send(Batch {
@@ -403,7 +432,7 @@
if res.is_err() {
shard_senders_to_remove.push(shard_sender_id);
} else {
state.next_record_lsn = max_next_record_lsn;
state.next_record_lsn = std::cmp::max(state.next_record_lsn, max_next_record_lsn);
}
}
}
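The per-shard branching above covers three cases: new records, an already-seen batch over a not-yet-seen chunk of WAL, and an already-seen chunk of WAL. A hedged sketch of the same decision as a pure function (a hypothetical helper, with crate paths assumed as before):

```rust
use utils::lsn::Lsn;
use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};

/// Decide what, if anything, to send to one shard sender for the current WAL chunk.
/// `shard_next_record_lsn` is the position this shard has already been advanced to.
fn batch_for_shard(
    records: Vec<InterpretedWalRecord>,
    max_next_record_lsn: Lsn,
    wal_end_lsn: Lsn,
    batch_wal_start_lsn: Lsn,
    shard_next_record_lsn: Lsn,
) -> Option<InterpretedWalRecords> {
    if max_next_record_lsn > shard_next_record_lsn {
        // At least one record the shard has not seen yet: ship the records.
        Some(InterpretedWalRecords {
            records,
            next_record_lsn: max_next_record_lsn,
            raw_wal_start_lsn: Some(batch_wal_start_lsn),
        })
    } else if wal_end_lsn > shard_next_record_lsn {
        // Nothing new to apply, but the raw WAL chunk is new to the shard:
        // send an empty batch so the pageserver does not mistake it for a gap.
        Some(InterpretedWalRecords {
            records: Vec::new(),
            next_record_lsn: shard_next_record_lsn,
            raw_wal_start_lsn: Some(batch_wal_start_lsn),
        })
    } else {
        // The shard has already seen this chunk of WAL entirely: skip it.
        None
    }
}
```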
2 changes: 2 additions & 0 deletions test_runner/fixtures/neon_fixtures.py
@@ -1157,6 +1157,8 @@ def __init__(self, config: NeonEnvBuilder):
# Disable pageserver disk syncs in tests: when running tests concurrently, this avoids
# the pageserver taking a long time to start up due to syncfs flushing other tests' data
"no_sync": True,
# Look for gaps in WAL received from safekeepers
"validate_wal_contiguity": True,
}

# Batching (https://github.com/neondatabase/neon/issues/9377):