From d49ae4bea04978eb5dfcde2cb7a8188e6e42a4b3 Mon Sep 17 00:00:00 2001
From: Julia Yakovlev
Date: Wed, 12 Feb 2025 12:22:56 +0200
Subject: [PATCH] test: introduce new tests for balanced query routing

The new tests verify that DefaultPolicy routes LWT and regular queries
as expected, for both tablets and vnodes. Different configurations will
be tested here. At the current stage, 3 tests were added for the
following configurations:

- tablets ON; token_aware policy; node preferences None; permit failover ON
- tablets ON; token_aware policy; node preferences None; permit failover OFF
- tablets ON; token_aware policy; node preferences DC; permit failover ON

More tests will be added later.
---
 .../tests/ccm_integration/default_policy.rs | 257 ++++++++++++++++++
 scylla/tests/ccm_integration/main.rs        |   2 +-
 2 files changed, 258 insertions(+), 1 deletion(-)
 create mode 100644 scylla/tests/ccm_integration/default_policy.rs

diff --git a/scylla/tests/ccm_integration/default_policy.rs b/scylla/tests/ccm_integration/default_policy.rs
new file mode 100644
index 000000000..2fd0b14c4
--- /dev/null
+++ b/scylla/tests/ccm_integration/default_policy.rs
@@ -0,0 +1,257 @@
+use crate::common::utils::{
+    create_new_session_builder, scylla_supports_tablets, setup_tracing, unique_keyspace_name,
+    PerformDDL,
+};
+use scylla::client::execution_profile::ExecutionProfile;
+use scylla::policies::load_balancing::{DefaultPolicy, LatencyAwarenessBuilder, LoadBalancingPolicy};
+use scylla::policies::retry::FallthroughRetryPolicy;
+use scylla::client::session::Session;
+use scylla::query::Query;
+use scylla::observability::history::{HistoryCollector, StructuredHistory};
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use crate::ccm::cluster::{Cluster, ClusterOptions};
+use crate::ccm::{run_ccm_test, CLUSTER_VERSION};
+
+/// Extracts the address of the node that served the last attempt recorded in `history`.
+fn get_queried_node(mut history: StructuredHistory) -> String {
+    let mut node_addr: String = "".to_string();
+    for query in &mut history.requests {
+        for fiber in std::iter::once(&mut query.non_speculative_fiber)
+            .chain(query.speculative_fibers.iter_mut())
+        {
+            for attempt in &mut fiber.attempts {
+                println!("Attempt node_addr: {}", attempt.node_addr);
+                node_addr = attempt.node_addr.to_string();
+            }
+        }
+    }
+
+    node_addr
+}
+
+fn cluster_3_node() -> ClusterOptions {
+    ClusterOptions {
+        name: "cluster_3_node".to_string(),
+        version: CLUSTER_VERSION.clone(),
+        nodes: vec![3],
+        ..ClusterOptions::default()
+    }
+}
+
+/// Tests that DefaultPolicy routes LWT and regular queries as expected,
+/// for tablets and vnodes.
+///
+/// * `policy`: DefaultPolicy object defined according to the test
+/// * `tablets_enabled`: true (tablets test) / false (vnodes test)
+/// * `running_nodes_amount`: cluster state - how many nodes are alive (maximum is 3)
+async fn test_query_routed_according_to_policy(
+    cluster: Arc<Mutex<Cluster>>,
+    policy: Arc<dyn LoadBalancingPolicy>,
+    tablets_enabled: bool,
+    _running_nodes_amount: i32,
+) {
+    setup_tracing();
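+
+    // Note: the execution profile built below pins the load balancing policy under
+    // test and uses FallthroughRetryPolicy, so failed attempts are not retried and
+    // each attempt recorded in the history reflects a single routing decision of
+    // the policy.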
+
+    // This mark is a leftover from the proxy-based original test: there it increased
+    // the likelihood that only the intended prepared statements (which contain this
+    // mark) were captured by the proxy.
+    const MAGIC_MARK: i32 = 123;
+
+    let cluster = cluster.lock().await;
+
+    let handle = ExecutionProfile::builder()
+        .load_balancing_policy(policy.clone())
+        .retry_policy(Arc::new(FallthroughRetryPolicy))
+        .build()
+        .into_handle();
+
+    let history_listener = Arc::new(HistoryCollector::new());
+
+    // DB preparation phase
+    let session: Session = cluster
+        .make_session_builder()
+        .await
+        .default_execution_profile_handle(handle)
+        .build()
+        .await
+        .unwrap();
+
+    // *** This is part of the original test "lwt_optimisation.rs", which uses a proxy.
+    // The check may need to be reimplemented with CCM.
+    // let (supported_frame, _shard) = supported_rx.recv().await.unwrap();
+    // let supported_options = types::read_string_multimap(&mut &*supported_frame.body).unwrap();
+    // let supported_features = ProtocolFeatures::parse_from_supported(&supported_options);
+
+    // This would branch the test into cases when the cluster supports the optimisation and when it does not.
+    // let supports_optimisation_mark = supported_features.lwt_optimization_meta_bit_mask.is_some();
+    // *** END - part of the original test "lwt_optimisation.rs" that uses a proxy.
+
+    // Create schema
+    let ks = unique_keyspace_name();
+
+    // Enable or disable tablets for the keyspace
+    let tablets_clause = if scylla_supports_tablets(&session).await {
+        if tablets_enabled {
+            " AND tablets = {'enabled': true}"
+        } else {
+            " AND tablets = {'enabled': false}"
+        }
+    } else {
+        ""
+    };
+
+    let create_ks = format!(
+        "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}{}",
+        ks, tablets_clause
+    );
+
+    session.ddl(create_ks).await.unwrap();
+    session.use_keyspace(ks, false).await.unwrap();
+
+    session
+        .ddl("CREATE TABLE t (a int primary key, b int)")
+        .await
+        .unwrap();
+
+    let mut non_lwt_query: Query = Query::new("INSERT INTO t (a, b) VALUES (?, 1)");
+    let mut lwt_query: Query = Query::new("UPDATE t SET b=3 WHERE a=? IF b=2");
+    non_lwt_query.set_history_listener(history_listener.clone());
+    lwt_query.set_history_listener(history_listener.clone());
+
+    // We will check which nodes were queried, for both LWT and non-LWT statements.
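+    // One possible direction for the "get expected node IP" TODOs below (a sketch only;
+    // the topology-inspection API named here is an assumption and may differ in this
+    // driver version): compute the token of MAGIC_MARK for table "t", look up its
+    // replica set in the cluster metadata, and assert against that instead of a
+    // hard-coded address, e.g.:
+    //     let state = session.get_cluster_state();
+    //     let replicas = state.get_token_endpoints(keyspace, "t", token);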
+    for _ in 0..15 {
+        let _ignore_non_lwt_error = session
+            .query_unpaged(non_lwt_query.clone(), (MAGIC_MARK,))
+            .await;
+        let non_lwt_query_history: StructuredHistory = history_listener.clone_structured_history();
+        // Query history looks like:
+        // === Request #0 ===
+        // | start_time: 2025-02-13 12:49:15.035087534 UTC
+        // | Non-speculative attempts:
+        // | - Attempt #0 sent to 127.0.2.3:9042
+        // |   request send time: 2025-02-13 12:49:15.035286454 UTC
+        // |   Success at 2025-02-13 12:49:15.036940277 UTC
+        // |
+        // | Request successful at 2025-02-13 12:49:15.036962012 UTC
+
+        println!("Non-LWT query history: {}", non_lwt_query_history);
+        let queried_node_port = get_queried_node(non_lwt_query_history);
+        let queried_node = queried_node_port.split(':').next().unwrap();
+        println!("non-LWT queried_node: {:?}", queried_node);
+        // TODO: get expected node IP
+        let expected_result = "127.0.0.1";
+        assert_eq!(expected_result, queried_node);
+    }
+
+    for _ in 0..15 {
+        let _ignore_lwt_error = session
+            .query_unpaged(lwt_query.clone(), (MAGIC_MARK,))
+            .await;
+        let lwt_query_history: StructuredHistory = history_listener.clone_structured_history();
+        println!("LWT query history: {}", lwt_query_history);
+        let queried_node_port = get_queried_node(lwt_query_history);
+        let queried_node = queried_node_port.split(':').next().unwrap();
+        println!("LWT queried_node: {:?}", queried_node);
+        // TODO: get expected node IP
+        let expected_result = "127.0.0.1";
+        assert_eq!(expected_result, queried_node);
+    }
+
+    // *** This is part of the original test "lwt_optimisation.rs", which uses a proxy.
+    // The checks may need to be reimplemented with CCM.
+    // if supports_optimisation_mark {
+    //     // We make sure that the driver properly marked prepared statements wrt being LWT.
+    //     assert!(!prepared_non_lwt.is_confirmed_lwt());
+    //     assert!(prepared_lwt.is_confirmed_lwt());
+    // }
+
+    // assert!(prepared_non_lwt.is_token_aware());
+    // assert!(prepared_lwt.is_token_aware());
+
+    // We execute non-LWT statements and ensure that multiple nodes were queried.
+    //
+    // Note that our DefaultPolicy no longer performs round robin, but instead randomly picks a replica.
+    // To see multiple replicas here, we cannot choose a fixed pick seed, so we must rely on randomness.
+    // It happened several times in CI that *not all* replicas were queried, but now we only
+    // assert that *more than one* replica is queried. Moreover, we increased iterations
+    // from 15 to 30 in hope this will suffice to prevent flakiness.
+    // Alternatively, we could give up this part of the test and only test the LWT part, but then
+    // we couldn't be sure that in the non-LWT case the driver truly chooses various replicas.
+    // for _ in 0..30 {
+    //     session.execute_unpaged(&prepared_non_lwt, (MAGIC_MARK,)).await.unwrap();
+    // }
+
+    // assert_multiple_replicas_queried(&mut prepared_rxs);
+
+    // We execute LWT statements, and...
+    // for _ in 0..15 {
+    //     session.execute_unpaged(&prepared_lwt, (MAGIC_MARK,)).await.unwrap();
+    // }
+
+    // if supports_optimisation_mark {
+    //     // ...if the cluster supports LWT, we assert that one replica was always queried first (and effectively only that one was queried).
+    //     assert_one_replica_queried(&mut prepared_rxs);
+    // } else {
+    //     // ...else we assert that replicas were shuffled as in the non-LWT case.
+    //     assert_multiple_replicas_queried(&mut prepared_rxs);
+    // }
+
+    // running_proxy
+    // *** END - part of the original test "lwt_optimisation.rs" that uses a proxy.
+}
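+
+// A minimal sketch of an `assert_multiple_replicas_queried` helper, as referenced by
+// the commented-out proxy-based code above, adapted to the history-based approach used
+// here. It is illustrative only and not yet called by the tests; the input is assumed
+// to be the list of node addresses extracted with `get_queried_node` across executions.
+#[allow(dead_code)]
+fn assert_multiple_replicas_queried(queried_nodes: &[String]) {
+    use std::collections::HashSet;
+
+    // With a token-aware policy that picks replicas randomly and RF=3, repeated
+    // executions are expected to reach more than one distinct replica.
+    let distinct: HashSet<&String> = queried_nodes.iter().collect();
+    assert!(
+        distinct.len() > 1,
+        "expected more than one distinct replica to be queried, got: {:?}",
+        distinct
+    );
+}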
+
+#[tokio::test]
+#[ntest::timeout(20000)]
+#[cfg_attr(not(ccm_tests), ignore)]
+async fn query_routed_optimally_with_tablets_and_policy_token_aware_on_preferences_none_permit_failover_on() {
+    async fn inner_test(cluster: Arc<Mutex<Cluster>>) {
+        let policy = DefaultPolicy::builder()
+            .token_aware(true)
+            .permit_dc_failover(true)
+            .build();
+        test_query_routed_according_to_policy(cluster, policy, true, 3).await;
+    }
+    run_ccm_test(cluster_3_node, inner_test).await;
+}
+
+#[tokio::test]
+#[ntest::timeout(20000)]
+#[cfg_attr(not(ccm_tests), ignore)]
+async fn query_routed_optimally_with_tablets_and_policy_token_aware_on_preferences_none_permit_failover_off() {
+    async fn inner_test(cluster: Arc<Mutex<Cluster>>) {
+        let policy = DefaultPolicy::builder()
+            .token_aware(true)
+            .permit_dc_failover(false)
+            .build();
+        test_query_routed_according_to_policy(cluster, policy, true, 3).await;
+    }
+    run_ccm_test(cluster_3_node, inner_test).await;
+}
+
+#[tokio::test]
+#[ntest::timeout(20000)]
+#[cfg_attr(not(ccm_tests), ignore)]
+async fn query_routed_optimally_with_tablets_and_policy_token_aware_on_preferences_dc_permit_failover_on() {
+    async fn inner_test(cluster: Arc<Mutex<Cluster>>) {
+        // TODO: take the datacenter name from the cluster instead of hard-coding it:
+        // let cluster_lock = cluster.lock().await;
+        // let node = cluster_lock.nodes().iter().next();
+        // let datacenter_id: String = "1".to_string();
+        let policy = DefaultPolicy::builder()
+            .prefer_datacenter("dc1".to_string())
+            .token_aware(true)
+            .permit_dc_failover(true)
+            .build();
+        test_query_routed_according_to_policy(cluster, policy, true, 3).await;
+    }
+    run_ccm_test(cluster_3_node, inner_test).await;
+}
+
+// This is a regression test for #696.
+#[tokio::test]
+#[ntest::timeout(2000)]
+async fn latency_aware_query_completes() {
+    setup_tracing();
+    let policy = DefaultPolicy::builder()
+        .latency_awareness(LatencyAwarenessBuilder::default())
+        .build();
+    let handle = ExecutionProfile::builder()
+        .load_balancing_policy(policy)
+        .build()
+        .into_handle();
+
+    let session = create_new_session_builder()
+        .default_execution_profile_handle(handle)
+        .build()
+        .await
+        .unwrap();
+
+    session.query_unpaged("whatever", ()).await.unwrap_err();
+}
\ No newline at end of file
diff --git a/scylla/tests/ccm_integration/main.rs b/scylla/tests/ccm_integration/main.rs
index 3f9373b47..c27c24b38 100644
--- a/scylla/tests/ccm_integration/main.rs
+++ b/scylla/tests/ccm_integration/main.rs
@@ -1,8 +1,8 @@
 #[path = "../common/mod.rs"]
 mod common;
-
 mod authenticate;
 pub(crate) mod ccm;
 mod test_example;
 #[cfg(feature = "ssl")]
 mod tls;
+mod default_policy;