storcon: sk heartbeat fixes (#10891)
This PR does the following:

* The initial heartbeat round blocks the storage controller from
coming back online. If all safekeepers are unresponsive, this can make
storage controller startup very slow. The original intent of #10583 was
that heartbeats should not affect normal functionality of the storage
controller, so add a short timeout to keep the initial round from
impeding storcon startup (a sketch follows below).

* Fix the URL of the utilization endpoint (drop the trailing slash).

* Don't send heartbeats to safekeepers that are decommissioned.

Part of #9011

context: https://neondb.slack.com/archives/C033RQ5SPDH/p1739966807592589
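For illustration, a minimal self-contained sketch of the startup-timeout idea, assuming only tokio. Here heartbeat_round() is a hypothetical stand-in for the controller's safekeeper heartbeat future, not the real API:

use std::time::Duration;

// Hypothetical stand-in for the heartbeater's initial round against all safekeepers.
async fn heartbeat_round() -> Result<Vec<(u64, bool)>, String> {
    // ... would send a utilization request to every safekeeper ...
    Ok(vec![(1, true)])
}

#[tokio::main]
async fn main() {
    // Bound the initial round so unresponsive safekeepers cannot stall startup.
    const SK_TIMEOUT: Duration = Duration::from_secs(5);
    match tokio::time::timeout(SK_TIMEOUT, heartbeat_round()).await {
        Ok(Ok(deltas)) => println!("initial safekeeper heartbeats: {deltas:?}"),
        Ok(Err(err)) => println!("heartbeat round failed: {err}"),
        Err(_elapsed) => println!("heartbeat round timed out; proceeding with startup"),
    }
}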
arpad-m authored Feb 19, 2025
1 parent 1f9511d commit 9ba2a87
Showing 5 changed files with 36 additions and 12 deletions.
2 changes: 1 addition & 1 deletion safekeeper/client/src/mgmt_api.rs
@@ -137,7 +137,7 @@ impl Client {
}

pub async fn utilization(&self) -> Result<SafekeeperUtilization> {
- let uri = format!("{}/v1/utilization/", self.mgmt_api_endpoint);
+ let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
let resp = self.get(&uri).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
8 changes: 7 additions & 1 deletion storage_controller/src/heartbeater.rs
@@ -10,7 +10,10 @@ use std::{
};
use tokio_util::sync::CancellationToken;

- use pageserver_api::{controller_api::NodeAvailability, models::PageserverUtilization};
+ use pageserver_api::{
+     controller_api::{NodeAvailability, SkSchedulingPolicy},
+     models::PageserverUtilization,
+ };

use thiserror::Error;
use utils::{id::NodeId, logging::SecretString};
@@ -311,6 +314,9 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe

let mut heartbeat_futs = FuturesUnordered::new();
for (node_id, sk) in &*safekeepers {
+ if sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned {
+     continue;
+ }
heartbeat_futs.push({
let jwt_token = self
.jwt_token
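The effect of the new check above is that decommissioned safekeepers are filtered out before any heartbeat future is built for them. A minimal sketch with placeholder types (the real loop constructs per-safekeeper client futures):

#[allow(dead_code)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum SkSchedulingPolicy {
    Active,
    Pause,
    Decomissioned, // spelling matches the existing enum variant
}

struct Safekeeper {
    id: u64,
    policy: SkSchedulingPolicy,
}

// Only non-decommissioned safekeepers are considered heartbeat targets.
fn heartbeat_targets(safekeepers: &[Safekeeper]) -> Vec<u64> {
    safekeepers
        .iter()
        .filter(|sk| sk.policy != SkSchedulingPolicy::Decomissioned)
        .map(|sk| sk.id)
        .collect()
}

fn main() {
    let sks = vec![
        Safekeeper { id: 1, policy: SkSchedulingPolicy::Active },
        Safekeeper { id: 2, policy: SkSchedulingPolicy::Decomissioned },
    ];
    assert_eq!(heartbeat_targets(&sks), vec![1]);
}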
16 changes: 12 additions & 4 deletions storage_controller/src/safekeeper.rs
@@ -18,19 +18,22 @@ pub struct Safekeeper {
cancel: CancellationToken,
listen_http_addr: String,
listen_http_port: u16,
+ scheduling_policy: SkSchedulingPolicy,
id: NodeId,
availability: SafekeeperState,
}

impl Safekeeper {
pub(crate) fn from_persistence(skp: SafekeeperPersistence, cancel: CancellationToken) -> Self {
+ let scheduling_policy = SkSchedulingPolicy::from_str(&skp.scheduling_policy).unwrap();
Self {
cancel,
listen_http_addr: skp.host.clone(),
listen_http_port: skp.http_port as u16,
id: NodeId(skp.id as u64),
skp,
availability: SafekeeperState::Offline,
+ scheduling_policy,
}
}
pub(crate) fn base_url(&self) -> String {
@@ -46,6 +49,13 @@ impl Safekeeper {
pub(crate) fn set_availability(&mut self, availability: SafekeeperState) {
self.availability = availability;
}
+ pub(crate) fn scheduling_policy(&self) -> SkSchedulingPolicy {
+     self.scheduling_policy
+ }
+ pub(crate) fn set_scheduling_policy(&mut self, scheduling_policy: SkSchedulingPolicy) {
+     self.scheduling_policy = scheduling_policy;
+     self.skp.scheduling_policy = String::from(scheduling_policy);
+ }
/// Perform an operation (which is given a [`SafekeeperClient`]) with retries
pub(crate) async fn with_client_retries<T, O, F>(
&self,
@@ -129,10 +139,8 @@ impl Safekeeper {
self.id.0
);
}
- self.skp = crate::persistence::SafekeeperPersistence::from_upsert(
-     record,
-     SkSchedulingPolicy::from_str(&self.skp.scheduling_policy).unwrap(),
- );
+ self.skp =
+     crate::persistence::SafekeeperPersistence::from_upsert(record, self.scheduling_policy);
self.listen_http_port = http_port as u16;
self.listen_http_addr = host;
}
8 changes: 5 additions & 3 deletions storage_controller/src/service.rs
@@ -819,7 +819,9 @@ impl Service {
.heartbeater_ps
.heartbeat(Arc::new(nodes_to_heartbeat))
.await;
- let res_sk = self.heartbeater_sk.heartbeat(all_sks).await;
+ // Put a small, but reasonable timeout to get the initial heartbeats of the safekeepers to avoid a storage controller downtime
+ const SK_TIMEOUT: Duration = Duration::from_secs(5);
+ let res_sk = tokio::time::timeout(SK_TIMEOUT, self.heartbeater_sk.heartbeat(all_sks)).await;

let mut online_nodes = HashMap::new();
if let Ok(deltas) = res_ps {
@@ -837,7 +839,7 @@
}

let mut online_sks = HashMap::new();
- if let Ok(deltas) = res_sk {
+ if let Ok(Ok(deltas)) = res_sk {
for (node_id, status) in deltas.0 {
match status {
SafekeeperState::Available {
@@ -7960,7 +7962,7 @@ impl Service {
let sk = safekeepers
.get_mut(&node_id)
.ok_or(DatabaseError::Logical("Not found".to_string()))?;
- sk.skp.scheduling_policy = String::from(scheduling_policy);
+ sk.set_scheduling_policy(scheduling_policy);

locked.safekeepers = Arc::new(safekeepers);
}
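Note on the `Ok(Ok(deltas))` pattern above: `tokio::time::timeout` wraps the inner future's output in its own `Result`, so a fallible heartbeat call now yields a nested `Result`. A small sketch with placeholder types (not the storage controller's actual ones):

use std::time::Duration;
use tokio::time::{error::Elapsed, timeout};

// Stand-in for the heartbeater call, which is itself fallible.
async fn fallible_heartbeat() -> Result<u32, String> {
    Ok(7)
}

#[tokio::main]
async fn main() {
    // Outer Result: did we finish before the deadline? Inner Result: did the call succeed?
    let res: Result<Result<u32, String>, Elapsed> =
        timeout(Duration::from_secs(5), fallible_heartbeat()).await;
    if let Ok(Ok(deltas)) = res {
        println!("got deltas: {deltas}");
    }
}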
14 changes: 11 additions & 3 deletions test_runner/regress/test_storage_controller.py
@@ -3238,12 +3238,17 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
newest_info = target.get_safekeeper(inserted["id"])
assert newest_info
assert newest_info["scheduling_policy"] == "Pause"
- target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
+ target.safekeeper_scheduling_policy(inserted["id"], "Active")
newest_info = target.get_safekeeper(inserted["id"])
assert newest_info
- assert newest_info["scheduling_policy"] == "Decomissioned"
+ assert newest_info["scheduling_policy"] == "Active"
# Ensure idempotency
- target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
+ target.safekeeper_scheduling_policy(inserted["id"], "Active")
+ newest_info = target.get_safekeeper(inserted["id"])
+ assert newest_info
+ assert newest_info["scheduling_policy"] == "Active"
+ # change back to paused again
+ target.safekeeper_scheduling_policy(inserted["id"], "Pause")

def storcon_heartbeat():
assert env.storage_controller.log_contains(
@@ -3252,6 +3257,9 @@ def storcon_heartbeat():

wait_until(storcon_heartbeat)

+ # Now decomission it
+ target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")


def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
compared = [dict(a), dict(b)]

1 comment on commit 9ba2a87

@github-actions
7697 tests run: 7314 passed, 0 failed, 383 skipped (full report)


Flaky tests (2)

  • Postgres 15
  • Postgres 14

Code coverage* (full report)

  • functions: 32.9% (8622 of 26200 functions)
  • lines: 48.8% (72727 of 148881 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
9ba2a87 at 2025-02-19T19:37:53.528Z :recycle:
