Merge pull request #455 from piodul/check-connection-on-session-create
session: fail creating session when control connection cannot be established

In #440, a mechanism was added which, when a session is created and the metadata fetch fails, substitutes dummy metadata for the real thing. It was meant to protect against situations where faulty metadata-parsing logic causes the whole session to fail; it is better to continue with incorrect metadata than to lose availability.

However, the metadata fetch was also serving as an implicit check of whether the driver can connect to the cluster at all, as it requires a live control connection. Unfortunately, the fallback added in #440 does not distinguish this case. When a user provides wrong contact points, TLS configuration, password, etc., the session is created successfully, and the user only learns that it does not work after trying to send requests.

This PR improves the fallback logic. Now session creation fails if the control connection cannot be established, but it still proceeds with dummy metadata if the first metadata fetch fails. Moreover, the error returned from session creation describes why establishing the control connection failed.
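To illustrate, the improved flow is roughly the sketch below (simplified, not the exact driver code; establish_control_connection and fetch_metadata_once are hypothetical helpers standing in for the driver's internals):

async fn read_metadata(&mut self, initial: bool) -> Result<Metadata, QueryError> {
    // Failing to establish the control connection is always a hard error,
    // so session creation fails and the cause is reported to the user.
    let conn = self.establish_control_connection().await?;

    match self.fetch_metadata_once(&conn).await {
        Ok(metadata) => Ok(metadata),
        // Only the very first fetch falls back to dummy metadata;
        // later refreshes still propagate the error.
        Err(err) if initial => {
            warn!(error = ?err, "Initial metadata read failed, proceeding with dummy metadata");
            Ok(Metadata::new_dummy(&self.known_peers))
        }
        Err(err) => Err(err),
    }
}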

Fixes: #443
psarna authored Jun 8, 2022
2 parents 4470161 + fb9b5de commit 18bf4ca
Showing 4 changed files with 92 additions and 44 deletions.
scylla/src/transport/cluster.rs (2 additions, 14 deletions)
@@ -96,19 +96,7 @@ impl Cluster {
fetch_schema_metadata,
);

let metadata = match metadata_reader.read_metadata().await {
Ok(data) => data,
Err(err) => {
warn!(
error = ?err,
"Initial metadata read failed, proceeding with metadata \
consisting only of the initial peer list and dummy tokens. \
This might result in suboptimal performance and schema \
information not being available."
);
Metadata::new_dummy(initial_peers)
}
};
let metadata = metadata_reader.read_metadata(true).await?;
let cluster_data = ClusterData::new(metadata, &pool_config, &HashMap::new(), &None);
cluster_data.wait_until_all_pools_are_initialized().await;
let cluster_data: Arc<ArcSwap<ClusterData>> =
@@ -465,7 +453,7 @@ impl ClusterWorker {

async fn perform_refresh(&mut self) -> Result<(), QueryError> {
// Read latest Metadata
let metadata = self.metadata_reader.read_metadata().await?;
let metadata = self.metadata_reader.read_metadata(false).await?;
let cluster_data: Arc<ClusterData> = self.cluster_data.load_full();

let new_cluster_data = Arc::new(ClusterData::new(
scylla/src/transport/connection_pool.rs (39 additions, 18 deletions)
@@ -66,8 +66,9 @@ enum MaybePoolConnections {
Initializing,

// The pool is empty because either initial filling failed or all connections
// became broken; will be asynchronously refilled
Broken,
// became broken; will be asynchronously refilled. Contains an error
// from the last connection attempt.
Broken(QueryError),

// The pool has some connections which are usable (or will be removed soon)
Ready(PoolConnections),
@@ -77,7 +78,7 @@ impl std::fmt::Debug for MaybePoolConnections {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MaybePoolConnections::Initializing => write!(f, "Initializing"),
MaybePoolConnections::Broken => write!(f, "Broken"),
MaybePoolConnections::Broken(err) => write!(f, "Broken({:?})", err),
MaybePoolConnections::Ready(conns) => write!(f, "{:?}", conns),
}
}
@@ -328,10 +329,21 @@ impl NodeConnectionPool {
let conns = self.conns.load_full();
match &*conns {
MaybePoolConnections::Ready(pool_connections) => Ok(f(pool_connections)),
_ => Err(QueryError::IoError(Arc::new(std::io::Error::new(
ErrorKind::Other,
format!("No connections in the pool. Pool status: {:?}", *conns),
)))),
MaybePoolConnections::Broken(err) => {
Err(QueryError::IoError(Arc::new(std::io::Error::new(
ErrorKind::Other,
format!(
"No connections in the pool; last connection failed with: {}",
err
),
))))
}
MaybePoolConnections::Initializing => {
Err(QueryError::IoError(Arc::new(std::io::Error::new(
ErrorKind::Other,
"No connections in the pool, pool is still being initialized",
))))
}
}
}
}
@@ -352,8 +364,12 @@ impl Keepaliver {
Ready(NotSharded(conns)) => conns.clone(),
Ready(Sharded { connections, .. }) => connections.iter().flatten().cloned().collect(),
Initializing => vec![],
Broken => {
debug!("Cannot send connection keepalives for node {} as there are no alive connections in the pool", self.node_address);
Broken(err) => {
debug!(
"Cannot send connection keepalives for node {} as there are \
no alive connections in the pool; last error: {}",
self.node_address, err
);
vec![]
}
}
@@ -579,7 +595,7 @@ impl PoolRefiller {
evt = self.connection_errors.select_next_some(), if !self.connection_errors.is_empty() => {
if let Some(conn) = evt.connection.upgrade() {
debug!("[{}] Got error for connection {:p}: {:?}", self.address, Arc::as_ptr(&conn), evt.error);
self.remove_connection(conn);
self.remove_connection(conn, evt.error);
}
}

@@ -602,9 +618,6 @@ impl PoolRefiller {

// Schedule refilling here
if !refill_scheduled && self.need_filling() {
// Update shared_conns here even if there are no connections.
// This will signal the waiters in `wait_until_initialized`.
self.update_shared_conns();
if self.had_error_since_last_refill {
self.refill_delay_strategy.on_fill_error();
} else {
@@ -746,6 +759,12 @@ impl PoolRefiller {
"[{}] Failed to open connection to the non-shard-aware port: {:?}",
self.address, err,
);

// If all connection attempts in this fill attempt failed
// and the pool is empty, report this error.
if !self.is_filling() && self.is_empty() {
self.update_shared_conns(Some(err));
}
}
}
Ok((connection, error_receiver)) => {
@@ -809,7 +828,7 @@ impl PoolRefiller {
.push(wait_for_error(Arc::downgrade(&conn), error_receiver).boxed());
self.conns[shard_id].push(conn);

self.update_shared_conns();
self.update_shared_conns(None);
} else if evt.requested_shard.is_some() {
// This indicates that some shard-aware connections
// missed the target shard (probably due to NAT).
@@ -922,9 +941,11 @@ impl PoolRefiller {
}

// Updates `shared_conns` based on `conns`.
fn update_shared_conns(&mut self) {
// `last_error` must not be `None` if there is a possibility of the pool
// being empty.
fn update_shared_conns(&mut self, last_error: Option<QueryError>) {
let new_conns = if !self.has_connections() {
Arc::new(MaybePoolConnections::Broken)
Arc::new(MaybePoolConnections::Broken(last_error.unwrap()))
} else {
let new_conns = if let Some(sharder) = self.sharder.as_ref() {
debug_assert_eq!(self.conns.len(), sharder.nr_shards.get() as usize);
@@ -948,7 +969,7 @@

// Removes given connection from the pool. It looks both into active
// connections and excess connections.
fn remove_connection(&mut self, connection: Arc<Connection>) {
fn remove_connection(&mut self, connection: Arc<Connection>, last_error: QueryError) {
let ptr = Arc::as_ptr(&connection);

let maybe_remove_in_vec = |v: &mut Vec<Arc<Connection>>| -> bool {
Expand Down Expand Up @@ -981,7 +1002,7 @@ impl PoolRefiller {
self.conns[shard_id].len(),
self.active_connection_count(),
);
self.update_shared_conns();
self.update_shared_conns(Some(last_error));
return;
}

scylla/src/transport/session_test.rs (26 additions, 1 deletion)
@@ -14,11 +14,12 @@ use crate::transport::topology::{CollectionType, ColumnKind, CqlType, NativeType
use crate::QueryResult;
use crate::{IntoTypedRows, Session, SessionBuilder};
use bytes::Bytes;
use futures::StreamExt;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::net::TcpListener;
use uuid::Uuid;

static UNIQUE_COUNTER: AtomicUsize = AtomicUsize::new(0);
@@ -36,6 +37,30 @@ pub(crate) fn unique_name() -> String {
name
}

#[tokio::test]
async fn test_connection_failure() {
// Make sure that Session::create fails when the control connection
// fails to connect.

// Create a dummy server which immediately closes the connection.
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

let (fut, _handle) = async move {
loop {
let _ = listener.accept().await;
}
}
.remote_handle();
tokio::spawn(fut);

let res = SessionBuilder::new().known_node_addr(addr).build().await;
match res {
Ok(_) => panic!("Unexpected success"),
Err(err) => println!("Connection error (it was expected): {:?}", err),
}
}

#[tokio::test]
async fn test_unprepared_statement() {
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
scylla/src/transport/topology.rs (25 additions, 11 deletions)
@@ -226,8 +226,8 @@ impl MetadataReader {
}

/// Fetches current metadata from the cluster
pub async fn read_metadata(&mut self) -> Result<Metadata, QueryError> {
let mut result = self.fetch_metadata().await;
pub async fn read_metadata(&mut self, initial: bool) -> Result<Metadata, QueryError> {
let mut result = self.fetch_metadata(initial).await;
if let Ok(metadata) = result {
self.update_known_peers(&metadata);
return Ok(metadata);
@@ -275,7 +275,7 @@ impl MetadataReader {
"Retrying to establish the control connection on {}",
self.control_connection_address
);
result = self.fetch_metadata().await;
result = self.fetch_metadata(initial).await;
}

match &result {
@@ -292,15 +292,32 @@ impl MetadataReader {
result
}

async fn fetch_metadata(&self) -> Result<Metadata, QueryError> {
async fn fetch_metadata(&self, initial: bool) -> Result<Metadata, QueryError> {
// TODO: Timeouts?
self.control_connection.wait_until_initialized().await;
let conn = &*self.control_connection.random_connection()?;

query_metadata(
&self.control_connection,
let res = query_metadata(
conn,
self.control_connection_address.port(),
self.fetch_schema,
)
.await
.await;

if initial {
if let Err(err) = res {
warn!(
error = ?err,
"Initial metadata read failed, proceeding with metadata \
consisting only of the initial peer list and dummy tokens. \
This might result in suboptimal performance and schema \
information not being available."
);
return Ok(Metadata::new_dummy(&self.known_peers));
}
}

res
}

fn update_known_peers(&mut self, metadata: &Metadata) {
@@ -329,13 +346,10 @@ impl MetadataReader {
}

async fn query_metadata(
pool: &NodeConnectionPool,
conn: &Connection,
connect_port: u16,
fetch_schema: bool,
) -> Result<Metadata, QueryError> {
pool.wait_until_initialized().await;
let conn: &Connection = &*pool.random_connection()?;

let peers_query = query_peers(conn, connect_port);
let keyspaces_query = query_keyspaces(conn, fetch_schema);

