Skip to content

Commit

Permalink
pageserver: hide validation behind a flag
Browse files Browse the repository at this point in the history
  • Loading branch information
VladLazar committed Feb 20, 2025
1 parent b42fac6 commit c612bd3
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 23 deletions.
2 changes: 2 additions & 0 deletions libs/pageserver_api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ pub struct ConfigToml {
pub page_service_pipelining: PageServicePipeliningConfig,
pub get_vectored_concurrent_io: GetVectoredConcurrentIo,
pub enable_read_path_debugging: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub validate_wal_contiguity: Option<bool>,
}

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
Expand Down
1 change: 1 addition & 0 deletions pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ fn main() -> anyhow::Result<()> {
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
info!(?conf.validate_wal_contiguity, "starting with WAL contiguity validation");
info!(?conf.page_service_pipelining, "starting with page service pipelining config");
info!(?conf.get_vectored_concurrent_io, "starting with get_vectored IO concurrency config");

Expand Down
6 changes: 6 additions & 0 deletions pageserver/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ pub struct PageServerConf {
/// Enable read path debugging. If enabled, read key errors will print a backtrace of the layer
/// files read.
pub enable_read_path_debugging: bool,

/// Interpreted protocol feature: if enabled, validate that the logical WAL received from
/// safekeepers does not have gaps.
pub validate_wal_contiguity: bool,
}

/// Token for authentication to safekeepers
Expand Down Expand Up @@ -360,6 +364,7 @@ impl PageServerConf {
page_service_pipelining,
get_vectored_concurrent_io,
enable_read_path_debugging,
validate_wal_contiguity,
} = config_toml;

let mut conf = PageServerConf {
Expand Down Expand Up @@ -446,6 +451,7 @@ impl PageServerConf {
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
no_sync: no_sync.unwrap_or(false),
enable_read_path_debugging: enable_read_path_debugging.unwrap_or(false),
validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
};

// ------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2861,6 +2861,7 @@ impl Timeline {
auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
availability_zone: self.conf.availability_zone.clone(),
ingest_batch_size: self.conf.ingest_batch_size,
validate_wal_contiguity: self.conf.validate_wal_contiguity,
},
broker_client,
ctx,
Expand Down
1 change: 1 addition & 0 deletions pageserver/src/tenant/timeline/walreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub struct WalReceiverConf {
pub auth_token: Option<Arc<String>>,
pub availability_zone: Option<String>,
pub ingest_batch_size: u64,
pub validate_wal_contiguity: bool,
}

pub struct WalReceiver {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ impl ConnectionManagerState {
ctx,
node_id,
ingest_batch_size,
self.conf.validate_wal_contiguity,
)
.await;

Expand Down Expand Up @@ -1563,6 +1564,7 @@ mod tests {
auth_token: None,
availability_zone: None,
ingest_batch_size: 1,
validate_wal_contiguity: false,
},
wal_connection: None,
wal_stream_candidates: HashMap::new(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ pub(super) async fn handle_walreceiver_connection(
ctx: RequestContext,
safekeeper_node: NodeId,
ingest_batch_size: u64,
validate_wal_contiguity: bool,
) -> Result<(), WalReceiverError> {
debug_assert_current_span_has_tenant_and_timeline_id();

Expand Down Expand Up @@ -345,32 +346,34 @@ pub(super) async fn handle_walreceiver_connection(
// from which the interpreted records were extracted, doesn't match
// the end of the previous batch (or the starting point for the first batch),
// then kill this WAL receiver connection and start a new one.
if let Some(raw_wal_start_lsn) = batch.raw_wal_start_lsn {
match raw_wal_start_lsn.cmp(&expected_wal_start) {
std::cmp::Ordering::Greater => {
let msg = format!(
"Gap in streamed WAL: [{}, {})",
expected_wal_start, raw_wal_start_lsn
);
critical!("{msg}");
return Err(WalReceiverError::Other(anyhow!(msg)));
}
std::cmp::Ordering::Less => {
// Other shards are reading WAL behind us.
// This is valid, but check that we received records
// that we haven't seen before.
if let Some(first_rec) = batch.records.first() {
if first_rec.next_record_lsn < last_rec_lsn {
let msg = format!(
"Received record with next_record_lsn multiple times ({} < {})",
first_rec.next_record_lsn, expected_wal_start
);
critical!("{msg}");
return Err(WalReceiverError::Other(anyhow!(msg)));
if validate_wal_contiguity {
if let Some(raw_wal_start_lsn) = batch.raw_wal_start_lsn {
match raw_wal_start_lsn.cmp(&expected_wal_start) {
std::cmp::Ordering::Greater => {
let msg = format!(
"Gap in streamed WAL: [{}, {})",
expected_wal_start, raw_wal_start_lsn
);
critical!("{msg}");
return Err(WalReceiverError::Other(anyhow!(msg)));
}
std::cmp::Ordering::Less => {
// Other shards are reading WAL behind us.
// This is valid, but check that we received records
// that we haven't seen before.
if let Some(first_rec) = batch.records.first() {
if first_rec.next_record_lsn < last_rec_lsn {
let msg = format!(
"Received record with next_record_lsn multiple times ({} < {})",
first_rec.next_record_lsn, expected_wal_start
);
critical!("{msg}");
return Err(WalReceiverError::Other(anyhow!(msg)));
}
}
}
std::cmp::Ordering::Equal => {}
}
std::cmp::Ordering::Equal => {}
}
}

Expand Down
2 changes: 2 additions & 0 deletions test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,8 @@ def __init__(self, config: NeonEnvBuilder):
# Disable pageserver disk syncs in tests: when running tests concurrently, this avoids
# the pageserver taking a long time to start up due to syncfs flushing other tests' data
"no_sync": True,
# Look for gaps in WAL received from safekeepers
"validate_wal_contiguity": True,
}

# Batching (https://github.com/neondatabase/neon/issues/9377):
Expand Down

0 comments on commit c612bd3

Please sign in to comment.