Skip to content

Commit

Permalink
tests: move batch tests from session.rs to batch.rs module
Browse files Browse the repository at this point in the history
  • Loading branch information
dimakr committed Feb 14, 2025
1 parent 7d6dc0b commit fc9def4
Show file tree
Hide file tree
Showing 2 changed files with 283 additions and 348 deletions.
308 changes: 282 additions & 26 deletions scylla/tests/integration/batch.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use crate::utils::setup_tracing;
use crate::utils::unique_keyspace_name;
use crate::utils::PerformDDL;
use crate::utils::{create_new_session_builder, scylla_supports_tablets};
use scylla::batch::Batch;
use scylla::batch::BatchType;
use scylla::client::session::Session;
Expand All @@ -9,13 +13,10 @@ use scylla::query::Query;
use scylla::value::{Counter, CqlValue, MaybeUnset};
use std::collections::HashMap;
use std::string::String;

use crate::utils::setup_tracing;
use crate::utils::unique_keyspace_name;
use crate::utils::PerformDDL;
use crate::utils::{create_new_session_builder, scylla_supports_tablets};
use std::sync::Arc;

use assert_matches::assert_matches;
use scylla::response::query_result::{QueryResult, QueryRowsResult};

const BATCH_COUNT: usize = 100;

Expand All @@ -25,6 +26,8 @@ async fn create_test_session(table_name: &str, supports_tablets: bool) -> Sessio

let mut create_ks = format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks);

// Need to disable tablets in this test because they don't support counters yet.
// (https://github.com/scylladb/scylladb/commit/c70f321c6f581357afdf3fd8b4fe8e5c5bb9736e).
if !supports_tablets && scylla_supports_tablets(&session).await {
create_ks += " AND TABLETS = {'enabled': false}"
}
Expand Down Expand Up @@ -142,6 +145,145 @@ async fn batch_statements_and_values_mismatch_detected() {
}
}

#[tokio::test]
async fn test_batch() {
setup_tracing();
let session = Arc::new(create_new_session_builder().build().await.unwrap());
let ks = unique_keyspace_name();

session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
session
.ddl(format!(
"CREATE TABLE IF NOT EXISTS {}.t_batch (a int, b int, c text, primary key (a, b))",
ks
))
.await
.unwrap();

let prepared_statement = session
.prepare(format!(
"INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)",
ks
))
.await
.unwrap();

// TODO: Add API that supports binding values to statements in batch creation process,
// to avoid problem of statements/values count mismatch
use scylla::batch::Batch;
let mut batch: Batch = Default::default();
batch.append_statement(&format!("INSERT INTO {}.t_batch (a, b, c) VALUES (?, ?, ?)", ks)[..]);
batch.append_statement(&format!("INSERT INTO {}.t_batch (a, b, c) VALUES (7, 11, '')", ks)[..]);
batch.append_statement(prepared_statement.clone());

let four_value: i32 = 4;
let hello_value: String = String::from("hello");
let session_clone = session.clone();
// We're spawning to a separate task here to test that it works even in that case, because in some scenarios
// (specifically if the `BatchValuesIter` associated type is not dropped before await boundaries)
// the implicit auto trait propagation on batch will be such that the returned future is not Send (depending on
// some lifetime for some unknown reason), so can't be spawned on tokio.
// See https://github.com/scylladb/scylla-rust-driver/issues/599 for more details
tokio::spawn(async move {
let values = (
(1_i32, 2_i32, "abc"),
(),
(1_i32, &four_value, hello_value.as_str()),
);
session_clone.batch(&batch, values).await.unwrap();
})
.await
.unwrap();

let mut results: Vec<(i32, i32, String)> = session
.query_unpaged(format!("SELECT a, b, c FROM {}.t_batch", ks), &[])
.await
.unwrap()
.into_rows_result()
.unwrap()
.rows::<(i32, i32, String)>()
.unwrap()
.collect::<Result<_, _>>()
.unwrap();

results.sort();
assert_eq!(
results,
vec![
(1, 2, String::from("abc")),
(1, 4, String::from("hello")),
(7, 11, String::from(""))
]
);

// Test repreparing statement inside a batch
let mut batch: Batch = Default::default();
batch.append_statement(prepared_statement);
let values = ((4_i32, 20_i32, "foobar"),);

// This statement flushes the prepared statement cache
session
.ddl(format!(
"ALTER TABLE {}.t_batch WITH gc_grace_seconds = 42",
ks
))
.await
.unwrap();
session.batch(&batch, values).await.unwrap();

let results: Vec<(i32, i32, String)> = session
.query_unpaged(
format!("SELECT a, b, c FROM {}.t_batch WHERE a = 4", ks),
&[],
)
.await
.unwrap()
.into_rows_result()
.unwrap()
.rows::<(i32, i32, String)>()
.unwrap()
.collect::<Result<_, _>>()
.unwrap();

assert_eq!(results, vec![(4, 20, String::from("foobar"))]);
}

// This is a regression test for #1134.
#[tokio::test]
async fn test_batch_to_multiple_tables() {
setup_tracing();
let session = create_new_session_builder().build().await.unwrap();
let ks = unique_keyspace_name();

session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
session.use_keyspace(&ks, true).await.unwrap();
session
.ddl("CREATE TABLE IF NOT EXISTS t_batch1 (a int, b int, c text, primary key (a, b))")
.await
.unwrap();
session
.ddl("CREATE TABLE IF NOT EXISTS t_batch2 (a int, b int, c text, primary key (a, b))")
.await
.unwrap();

let prepared_statement = session
.prepare(
"
BEGIN BATCH
INSERT INTO t_batch1 (a, b, c) VALUES (?, ?, ?);
INSERT INTO t_batch2 (a, b, c) VALUES (?, ?, ?);
APPLY BATCH;
",
)
.await
.unwrap();

session
.execute_unpaged(&prepared_statement, (1, 2, "ala", 4, 5, "ma"))
.await
.unwrap();
}

#[tokio::test]
async fn test_batch_of_simple_statements() {
setup_tracing();
Expand Down Expand Up @@ -324,34 +466,148 @@ async fn test_batch_of_mixed_prepared_and_simple_statements() {
verify_batch_insert(&session, &test_name, BATCH_COUNT).await;
}

// Batches containing LWT queries (IF col = som) return rows with information whether the queries were applied.
#[tokio::test]
async fn test_cas_batch() {
setup_tracing();
let test_name = String::from("test_cas_batch");
async fn test_batch_lwts() {
let test_name = String::from("test_batch_lwts");
let session = create_test_session(&test_name, false).await;

let query_str = format!(
"INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?) IF NOT EXISTS",
&test_name
);
let prepared = session.prepare(Query::new(query_str)).await.unwrap();
session
.query_unpaged(
format!("INSERT INTO {} (k0, k1, v) VALUES (?, ?, ?)", &test_name),
(&test_name, 0, 0),
)
.await
.unwrap();

let mut batch = Batch::new(BatchType::Unlogged);
let mut batch_values = Vec::with_capacity(BATCH_COUNT);
for i in 0..BATCH_COUNT as i32 {
batch.append_statement(prepared.clone());
batch_values.push((&test_name, i, i + 1));
batch.append_statement(
format!(
"UPDATE {} SET v = 1 WHERE k0 = '{}' AND k1 = 0 IF v = 0",
&test_name, &test_name
)
.as_str(),
);
batch.append_statement(
format!(
"INSERT INTO {} (k0, k1, v) VALUES ('{}', 123, 321)",
&test_name, &test_name
)
.as_str(),
);
batch.append_statement(
format!(
"UPDATE {} SET v = 1 WHERE k0 = '{}' AND k1 = 0 IF v = 0",
&test_name, &test_name
)
.as_str(),
);

let batch_res: QueryResult = session.batch(&batch, ((), (), ())).await.unwrap();
let batch_deserializer = batch_res.into_rows_result().unwrap();

// Scylla always returns a result set with additional 'applied' column, which indicates whether the statement was applied.
// In contrast, Cassandra returns only 'applied' column for applied transactions; and 'applied' column
// with result set for rejected ones.
// This is described in https://opensource.docs.scylladb.com/stable/features/lwt.html and
// https://opensource.docs.scylladb.com/stable/kb/lwt-differences.html).
let is_scylla: bool = batch_deserializer.column_specs().len() == 4;

if is_scylla {
test_batch_lwts_for_scylla(&session, &batch, &batch_deserializer, &test_name).await;
} else {
test_batch_lwts_for_cassandra(&session, &batch, &batch_deserializer, &test_name).await;
}
let result = session.batch(&batch, batch_values.clone()).await.unwrap();
let row: (bool, Option<String>, Option<i32>, Option<i32>) =
result.into_rows_result().unwrap().first_row().unwrap();
assert!(row.0, "First CAS batch should be applied");
}

verify_batch_insert(&session, &test_name, BATCH_COUNT).await;
async fn test_batch_lwts_for_scylla(
session: &Session,
batch: &Batch,
query_rows_result: &QueryRowsResult,
k0_value: &str,
) {
// Alias required by clippy
type IntOrNull = Option<i32>;
type StrOrNull = Option<String>;

// Returned columns are:
// [applied], k0, k1, v
let batch_res_rows: Vec<(bool, StrOrNull, IntOrNull, IntOrNull)> = query_rows_result
.rows()
.unwrap()
.collect::<Result<_, _>>()
.unwrap();

let k0_value = k0_value.to_string();
let expected_batch_res_rows = vec![
(true, Some(k0_value.clone()), Some(0), Some(0)),
(true, None, None, None),
(true, Some(k0_value.clone()), Some(0), Some(0)),
];

assert_eq!(batch_res_rows, expected_batch_res_rows);

let prepared_batch: Batch = session.prepare_batch(batch).await.unwrap();
let prepared_batch_res: QueryResult =
session.batch(&prepared_batch, ((), (), ())).await.unwrap();

let prepared_batch_res_rows: Vec<(bool, StrOrNull, IntOrNull, IntOrNull)> = prepared_batch_res
.into_rows_result()
.unwrap()
.rows()
.unwrap()
.map(|r| r.unwrap())
.collect();

let expected_prepared_batch_res_rows = vec![
(false, Some(k0_value.clone()), Some(0), Some(1)),
(false, None, None, None),
(false, Some(k0_value.clone()), Some(0), Some(1)),
];

assert_eq!(prepared_batch_res_rows, expected_prepared_batch_res_rows);
}

async fn test_batch_lwts_for_cassandra(
session: &Session,
batch: &Batch,
query_rows_result: &QueryRowsResult,
k0_value: &str,
) {
// Alias required by clippy
type IntOrNull = Option<i32>;
type StrOrNull = Option<String>;

// Returned columns are:
// [applied]
let batch_res_rows: Vec<(bool,)> = query_rows_result
.rows()
.unwrap()
.map(|r| r.unwrap())
.collect();

let expected_batch_res_rows = vec![(true,)];

assert_eq!(batch_res_rows, expected_batch_res_rows);

let prepared_batch: Batch = session.prepare_batch(batch).await.unwrap();
let prepared_batch_res: QueryResult =
session.batch(&prepared_batch, ((), (), ())).await.unwrap();

// Returned columns are:
// [applied], k0, k1, v
let prepared_batch_res_rows: Vec<(bool, StrOrNull, IntOrNull, IntOrNull)> = prepared_batch_res
.into_rows_result()
.unwrap()
.rows()
.unwrap()
.map(|r| r.unwrap())
.collect();

let expected_prepared_batch_res_rows =
vec![(false, Some(k0_value.to_string()), Some(0), Some(1))];

let result2 = session.batch(&batch, batch_values).await.unwrap();
let row: (bool, Option<String>, Option<i32>, Option<i32>) =
result2.into_rows_result().unwrap().first_row().unwrap();
assert!(!row.0, "Second CAS batch should not be applied");
assert_eq!(prepared_batch_res_rows, expected_prepared_batch_res_rows);
}

#[tokio::test]
Expand Down
Loading

0 comments on commit fc9def4

Please sign in to comment.