Skip to content

Commit

Permalink
Add tests for query API for errors/no results
Browse files Browse the repository at this point in the history
  • Loading branch information
sylwiaszunejko committed Feb 10, 2025
1 parent 60a8aca commit 0dfb164
Showing 1 changed file with 113 additions and 35 deletions.
148 changes: 113 additions & 35 deletions scylla/tests/ccm_integration/query_result.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
use scylla::{batch::{Batch, BatchType}, client::session::Session, query::Query};
use scylla_cql::frame::request::query::{PagingState, PagingStateResponse};
use futures::TryStreamExt;
use scylla::errors::PagerExecutionError;
use scylla::{
batch::{Batch, BatchType},
client::session::Session,
query::Query,
};
use scylla_cql::frame::request::query::{PagingState, PagingStateResponse};

use crate::common::utils::{create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL};
use crate::common::utils::{
create_new_session_builder, setup_tracing, unique_keyspace_name, PerformDDL,
};

const PAGE_SIZE: i32 = 100;
const ROWS_PER_PARTITION: i32 = 1000;
Expand Down Expand Up @@ -38,7 +45,11 @@ async fn initialize_cluster() -> (Session, String) {
batch_part1.append_statement(prepared.clone());
batch_values1.push((PARTITION_KEY1, i, i));
batch_part2.append_statement(prepared.clone());
batch_values2.push((PARTITION_KEY2, i + ROWS_PER_PARTITION, i + ROWS_PER_PARTITION));
batch_values2.push((
PARTITION_KEY2,
i + ROWS_PER_PARTITION,
i + ROWS_PER_PARTITION,
));
}

session.batch(&batch_part1, &batch_values1).await.unwrap();
Expand Down Expand Up @@ -69,10 +80,10 @@ async fn query_should_only_iterate_over_rows_in_current_page() {

assert_eq!(page_results.len(), PAGE_SIZE as usize);
match paging_state_response {
PagingStateResponse::HasMorePages { state: _ } => {},
PagingStateResponse::HasMorePages { state: _ } => {}
PagingStateResponse::NoMorePages => {
panic!("Expected more pages");
},
}
}
}

Expand All @@ -83,13 +94,8 @@ async fn query_should_iterate_over_all_pages_asynchronously_single_partition() {
let mut query = Query::new(format!("SELECT * FROM {}.t where k0 = ?", ks));
query.set_page_size(PAGE_SIZE);

let query_result = session
.query_iter(query, (PARTITION_KEY1,))
.await
.unwrap();
let mut iter = query_result
.rows_stream::<(String, i32, i32)>()
.unwrap();
let query_result = session.query_iter(query, (PARTITION_KEY1,)).await.unwrap();
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();

let mut i = 0;

Expand All @@ -109,13 +115,8 @@ async fn query_should_iterate_over_all_pages_asynchronously_cross_partition() {
let mut query = Query::new(format!("SELECT * FROM {}.t", ks));
query.set_page_size(PAGE_SIZE);

let query_result = session
.query_iter(query, ())
.await
.unwrap();
let mut iter = query_result
.rows_stream::<(String, i32, i32)>()
.unwrap();
let query_result = session.query_iter(query, ()).await.unwrap();
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();

let mut i = 0;
while let Ok(Some((a, b, c))) = iter.try_next().await {
Expand All @@ -131,11 +132,54 @@ async fn query_should_iterate_over_all_pages_asynchronously_cross_partition() {
assert_eq!(i, 2 * ROWS_PER_PARTITION);
}

#[tokio::test]
async fn query_iter_no_results() {
let (session, ks) = initialize_cluster().await;

let query = Query::new(format!("SELECT * FROM {}.t where k0 = ?", ks));

let query_result = session.query_iter(query, ("part3",)).await.unwrap();
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();

assert_eq!(iter.try_next().await.unwrap(), None);
}

#[tokio::test]
async fn query_iter_prepare_error() {
let (session, ks) = initialize_cluster().await;

let query = Query::new(format!("SELECT * FROM {}.test where k0 = ?", ks));

let query_result = session.query_iter(query, (PARTITION_KEY1,)).await;

assert!(matches!(
query_result,
Err(PagerExecutionError::PrepareError(_))
));
}

#[tokio::test]
async fn query_iter_serialization_error() {
let (session, ks) = initialize_cluster().await;

let query = Query::new(format!("SELECT * FROM {}.t where k0 = ?", ks));

let query_result = session.query_iter(query, (1,)).await;

assert!(matches!(
query_result,
Err(PagerExecutionError::SerializationError(_))
));
}

#[tokio::test]
async fn execute_should_only_iterate_over_rows_in_current_page() {
let (session, ks) = initialize_cluster().await;

let mut prepared_query = session.prepare(format!("SELECT * FROM {}.t where k0 = ?", ks)).await.unwrap();
let mut prepared_query = session
.prepare(format!("SELECT * FROM {}.t where k0 = ?", ks))
.await
.unwrap();
prepared_query.set_page_size(PAGE_SIZE);

let paging_state = PagingState::start();
Expand All @@ -153,27 +197,28 @@ async fn execute_should_only_iterate_over_rows_in_current_page() {

assert_eq!(page_results.len(), PAGE_SIZE as usize);
match paging_state_response {
PagingStateResponse::HasMorePages { state: _ } => {},
PagingStateResponse::HasMorePages { state: _ } => {}
PagingStateResponse::NoMorePages => {
panic!("Expected more pages");
},
}
}
}

#[tokio::test]
async fn execute_should_iterate_over_all_pages_asynchronously_single_partition() {
let (session, ks) = initialize_cluster().await;

let mut prepared_query = session.prepare(format!("SELECT * FROM {}.t where k0 = ?", ks)).await.unwrap();
let mut prepared_query = session
.prepare(format!("SELECT * FROM {}.t where k0 = ?", ks))
.await
.unwrap();
prepared_query.set_page_size(PAGE_SIZE);

let query_result = session
.execute_iter(prepared_query, (PARTITION_KEY1,))
.await
.unwrap();
let mut iter = query_result
.rows_stream::<(String, i32, i32)>()
.unwrap();
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();

let mut i = 0;

Expand All @@ -190,16 +235,14 @@ async fn execute_should_iterate_over_all_pages_asynchronously_single_partition()
async fn execute_should_iterate_over_all_pages_asynchronously_cross_partition() {
let (session, ks) = initialize_cluster().await;

let mut prepared_query = session.prepare(format!("SELECT * FROM {}.t", ks)).await.unwrap();
prepared_query.set_page_size(PAGE_SIZE);

let query_result = session
.execute_iter(prepared_query, ())
let mut prepared_query = session
.prepare(format!("SELECT * FROM {}.t", ks))
.await
.unwrap();
let mut iter = query_result
.rows_stream::<(String, i32, i32)>()
.unwrap();
prepared_query.set_page_size(PAGE_SIZE);

let query_result = session.execute_iter(prepared_query, ()).await.unwrap();
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();

let mut i = 0;
while let Ok(Some((a, b, c))) = iter.try_next().await {
Expand All @@ -214,3 +257,38 @@ async fn execute_should_iterate_over_all_pages_asynchronously_cross_partition()
}
assert_eq!(i, 2 * ROWS_PER_PARTITION);
}

#[tokio::test]
async fn execute_iter_no_results() {
let (session, ks) = initialize_cluster().await;

let prepared_query = session
.prepare(format!("SELECT * FROM {}.t where k0 = ?", ks))
.await
.unwrap();

let query_result = session
.execute_iter(prepared_query, ("part3",))
.await
.unwrap();
let mut iter = query_result.rows_stream::<(String, i32, i32)>().unwrap();

assert_eq!(iter.try_next().await.unwrap(), None);
}

#[tokio::test]
async fn execute_iter_serialization_error() {
let (session, ks) = initialize_cluster().await;

let prepared_query = session
.prepare(format!("SELECT * FROM {}.t where k0 = ?", ks))
.await
.unwrap();

let query_result = session.execute_iter(prepared_query, (1,)).await;

assert!(matches!(
query_result,
Err(PagerExecutionError::SerializationError(_))
));
}

0 comments on commit 0dfb164

Please sign in to comment.