Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(queries)!: Query projections #5242

Merged
merged 9 commits into from
Nov 27, 2024
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ async-trait = "0.1.81"
strum = { version = "0.25.0", default-features = false }
getset = "0.1.2"
hex-literal = "0.4.1"
derive-where = "1.2.7"

rand = { version = "0.8.5", default-features = false, features = ["getrandom", "alloc"] }
axum = { version = "0.7.5", default-features = false }
Expand Down
15 changes: 6 additions & 9 deletions crates/iroha/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{collections::HashMap, fmt::Debug};

use eyre::{eyre, Context, Result};
use http::StatusCode;
use iroha_data_model::query::QueryOutputBatchBoxTuple;
use iroha_torii_const::uri as torii_uri;
use parity_scale_codec::{DecodeAll, Encode};
use url::Url;
Expand All @@ -16,9 +17,8 @@ use crate::{
query::{
builder::{QueryBuilder, QueryExecutor},
parameters::ForwardCursor,
predicate::HasPredicateBox,
Query, QueryOutput, QueryOutputBatchBox, QueryRequest, QueryResponse, QueryWithParams,
SingularQuery, SingularQueryBox, SingularQueryOutputBox,
Query, QueryOutput, QueryRequest, QueryResponse, QueryWithParams, SingularQuery,
SingularQueryBox, SingularQueryOutputBox,
},
ValidationFail,
},
Expand Down Expand Up @@ -158,7 +158,7 @@ impl QueryExecutor for Client {
fn start_query(
&self,
query: QueryWithParams,
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error> {
) -> Result<(QueryOutputBatchBoxTuple, u64, Option<Self::Cursor>), Self::Error> {
let request_head = self.get_query_request_head();

let request = QueryRequest::Start(query);
Expand All @@ -178,7 +178,7 @@ impl QueryExecutor for Client {

fn continue_query(
cursor: Self::Cursor,
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error> {
) -> Result<(QueryOutputBatchBoxTuple, u64, Option<Self::Cursor>), Self::Error> {
let QueryCursor {
request_head,
cursor,
Expand Down Expand Up @@ -235,10 +235,7 @@ impl Client {
}

/// Build an iterable query and return a builder object
pub fn query<Q>(
&self,
query: Q,
) -> QueryBuilder<Self, Q, <<Q as Query>::Item as HasPredicateBox>::PredicateBoxType>
pub fn query<Q>(&self, query: Q) -> QueryBuilder<Self, Q, Q::Item>
where
Q: Query,
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ async fn multiple_blocks_created() -> Result<()> {
client
.query(FindAssets::new())
.filter_with(|asset| {
asset.id.account.eq(account_id) & asset.id.definition_id.eq(definition)
asset.id.account.eq(account_id) & asset.id.definition.eq(definition)
})
.execute_all()
})
Expand Down
8 changes: 7 additions & 1 deletion crates/iroha/tests/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use iroha::{
client::Client,
data_model::{asset::AssetDefinition, prelude::*},
};
use iroha_data_model::query::dsl::SelectorTuple;
use iroha_test_network::*;
use nonzero_ext::nonzero;

Expand Down Expand Up @@ -60,7 +61,12 @@ fn fetch_size_should_work() -> Result<()> {
register_assets(&client)?;

let query = QueryWithParams::new(
QueryWithFilter::new(FindAssetsDefinitions::new(), CompoundPredicate::PASS).into(),
QueryWithFilter::new(
FindAssetsDefinitions::new(),
CompoundPredicate::PASS,
SelectorTuple::default(),
)
.into(),
QueryParams::new(
Pagination::new(Some(nonzero!(7_u64)), 1),
Sorting::default(),
Expand Down
11 changes: 4 additions & 7 deletions crates/iroha/tests/sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use eyre::{Result, WrapErr as _};
use iroha::{
client::QueryResult,
crypto::KeyPair,
data_model::{
account::Account, name::Name, prelude::*,
query::predicate::predicate_atoms::asset::AssetPredicateBox,
},
data_model::{account::Account, name::Name, prelude::*},
};
use iroha_test_network::*;
use iroha_test_samples::ALICE_ID;
Expand All @@ -24,7 +21,7 @@ fn correct_pagination_assets_after_creating_new_one() {
let missing_indices = vec![N_ASSETS / 2];
let pagination = Pagination::new(Some(nonzero!(N_ASSETS as u64 / 3)), N_ASSETS as u64 / 3);
let xor_filter =
AssetPredicateBox::build(|asset| asset.id.definition_id.name.starts_with("xor"));
CompoundPredicate::<Asset>::build(|asset| asset.id.definition.name.starts_with("xor"));

let sort_by_metadata_key = "sort".parse::<Name>().expect("Valid");
let sorting = Sorting::by_metadata_key(sort_by_metadata_key.clone());
Expand Down Expand Up @@ -201,7 +198,7 @@ fn correct_sorting_of_entities() {
let res = test_client
.query(FindAccounts::new())
.with_sorting(Sorting::by_metadata_key(sort_by_metadata_key.clone()))
.filter_with(|account| account.id.domain_id.eq(domain_id))
.filter_with(|account| account.id.domain.eq(domain_id))
.execute_all()
.expect("Valid");

Expand Down Expand Up @@ -339,7 +336,7 @@ fn sort_only_elements_which_have_sorting_key() -> Result<()> {
let res = test_client
.query(FindAccounts::new())
.with_sorting(Sorting::by_metadata_key(sort_by_metadata_key))
.filter_with(|account| account.id.domain_id.eq(domain_id))
.filter_with(|account| account.id.domain.eq(domain_id))
.execute_all()
.wrap_err("Failed to submit request")?;

Expand Down
51 changes: 34 additions & 17 deletions crates/iroha_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,7 @@ fn submit(
}

mod filter {
use iroha::data_model::query::predicate::{
predicate_atoms::{
account::AccountPredicateBox, asset::AssetPredicateBox, domain::DomainPredicateBox,
},
CompoundPredicate,
};
use iroha::data_model::query::dsl::CompoundPredicate;
use serde::Deserialize;

use super::*;
Expand All @@ -265,32 +260,32 @@ mod filter {
#[derive(Clone, Debug, clap::Parser)]
pub struct DomainFilter {
/// Predicate for filtering given as JSON5 string
#[clap(value_parser = parse_json5::<CompoundPredicate<DomainPredicateBox>>)]
pub predicate: CompoundPredicate<DomainPredicateBox>,
#[clap(value_parser = parse_json5::<CompoundPredicate<Domain>>)]
pub predicate: CompoundPredicate<Domain>,
}

/// Filter for account queries
#[derive(Clone, Debug, clap::Parser)]
pub struct AccountFilter {
/// Predicate for filtering given as JSON5 string
#[clap(value_parser = parse_json5::<CompoundPredicate<AccountPredicateBox>>)]
pub predicate: CompoundPredicate<AccountPredicateBox>,
#[clap(value_parser = parse_json5::<CompoundPredicate<Account>>)]
pub predicate: CompoundPredicate<Account>,
}

/// Filter for asset queries
#[derive(Clone, Debug, clap::Parser)]
pub struct AssetFilter {
/// Predicate for filtering given as JSON5 string
#[clap(value_parser = parse_json5::<CompoundPredicate<AssetPredicateBox>>)]
pub predicate: CompoundPredicate<AssetPredicateBox>,
#[clap(value_parser = parse_json5::<CompoundPredicate<Asset>>)]
pub predicate: CompoundPredicate<Asset>,
}

/// Filter for asset definition queries
#[derive(Clone, Debug, clap::Parser)]
pub struct AssetDefinitionFilter {
/// Predicate for filtering given as JSON5 string
#[clap(value_parser = parse_json5::<CompoundPredicate<AssetDefinitionPredicateBox>>)]
pub predicate: CompoundPredicate<AssetDefinitionPredicateBox>,
#[clap(value_parser = parse_json5::<CompoundPredicate<AssetDefinition>>)]
pub predicate: CompoundPredicate<AssetDefinition>,
}

fn parse_json5<T>(s: &str) -> Result<T, String>
Expand Down Expand Up @@ -1219,18 +1214,40 @@ mod json {
// we can't really do type-erased iterable queries in a nice way right now...
use iroha::data_model::query::builder::QueryExecutor;

let (mut first_batch, _remaining_items, mut continue_cursor) =
let (mut accumulated_batch, _remaining_items, mut continue_cursor) =
client.start_query(query)?;

while let Some(cursor) = continue_cursor {
let (next_batch, _remaining_items, next_continue_cursor) =
<Client as QueryExecutor>::continue_query(cursor)?;

first_batch.extend(next_batch);
accumulated_batch.extend(next_batch);
continue_cursor = next_continue_cursor;
}

context.print_data(&first_batch)?;
// for efficiency reasons iroha encodes query results in a columnar format,
// so we need to transpose the batch to get the format that is more natural for humans
let mut batches = vec![Vec::new(); accumulated_batch.len()];
for batch in accumulated_batch.into_iter() {
// downcast to json and extract the actual array
// dynamic typing is just easier to use here than introducing a bunch of new types only for iroha_cli
let batch = serde_json::to_value(batch)?;
let serde_json::Value::Object(batch) = batch else {
panic!("Expected the batch serialization to be a JSON object");
};
let (_ty, batch) = batch
.into_iter()
.next()
.expect("Expected the batch to have exactly one key");
let serde_json::Value::Array(batch_vec) = batch else {
panic!("Expected the batch payload to be a JSON array");
};
for (target, value) in batches.iter_mut().zip(batch_vec) {
target.push(value);
}
}

context.print_data(&batches)?;
}
}

Expand Down
69 changes: 57 additions & 12 deletions crates/iroha_core/src/query/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

use std::{fmt::Debug, num::NonZeroU64};

use iroha_data_model::query::QueryOutputBatchBox;
use iroha_data_model::{
prelude::SelectorTuple,
query::{
dsl::{EvaluateSelector, HasProjection, SelectorMarker},
QueryOutputBatchBox, QueryOutputBatchBoxTuple,
},
};
use parity_scale_codec::{Decode, Encode};
use serde::{Deserialize, Serialize};

Expand All @@ -25,29 +31,62 @@ pub enum Error {
Done,
}

fn evaluate_selector_tuple<T>(
batch: Vec<T>,
selector: &SelectorTuple<T>,
) -> QueryOutputBatchBoxTuple
where
T: HasProjection<SelectorMarker, AtomType = ()> + 'static,
T::Projection: EvaluateSelector<T>,
{
let mut batch_tuple = Vec::new();

let mut iter = selector.iter().peekable();

while let Some(item) = iter.next() {
if iter.peek().is_none() {
// do not clone the last item
batch_tuple.push(item.project(batch.into_iter()));
return QueryOutputBatchBoxTuple { tuple: batch_tuple };
}

batch_tuple.push(item.project_clone(batch.iter()));
}

// this should only happen for empty selectors
QueryOutputBatchBoxTuple { tuple: batch_tuple }
}

trait BatchedTrait {
fn next_batch(
&mut self,
cursor: u64,
) -> Result<(QueryOutputBatchBox, Option<NonZeroU64>), Error>;
) -> Result<(QueryOutputBatchBoxTuple, Option<NonZeroU64>), Error>;
fn remaining(&self) -> u64;
}

struct BatchedInner<I> {
struct BatchedInner<I>
where
I: ExactSizeIterator,
I::Item: HasProjection<SelectorMarker, AtomType = ()>,
{
iter: I,
selector: SelectorTuple<I::Item>,
batch_size: NonZeroU64,
cursor: Option<u64>,
}

impl<I> BatchedTrait for BatchedInner<I>
where
I: ExactSizeIterator,
I::Item: HasProjection<SelectorMarker, AtomType = ()> + 'static,
<I::Item as HasProjection<SelectorMarker>>::Projection: EvaluateSelector<I::Item>,
QueryOutputBatchBox: From<Vec<I::Item>>,
{
fn next_batch(
&mut self,
cursor: u64,
) -> Result<(QueryOutputBatchBox, Option<NonZeroU64>), Error> {
) -> Result<(QueryOutputBatchBoxTuple, Option<NonZeroU64>), Error> {
let Some(server_cursor) = self.cursor else {
// the server is done with the iterator
return Err(Error::Done);
Expand Down Expand Up @@ -76,7 +115,9 @@ where
.expect("`u32` should always fit into `usize`"),
)
.collect();
let batch = batch.into();

// evaluate the requested projections
let batch = evaluate_selector_tuple(batch, &self.selector);

// did we get enough elements to continue?
if current_batch_size >= expected_batch_size {
Expand All @@ -101,27 +142,31 @@ where
}
}

/// A query output iterator that combines batching and type erasure.
pub struct QueryBatchedErasedIterator {
/// A query output iterator that combines evaluating selectors, batching and type erasure.
pub struct ErasedQueryIterator {
inner: Box<dyn BatchedTrait + Send + Sync>,
}

impl Debug for QueryBatchedErasedIterator {
impl Debug for ErasedQueryIterator {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QueryBatchedErasedIterator").finish()
}
}

impl QueryBatchedErasedIterator {
/// Creates a new batched iterator. Boxes the inner iterator to erase its type.
pub fn new<I>(iter: I, batch_size: NonZeroU64) -> Self
impl ErasedQueryIterator {
/// Creates a new erased query iterator. Boxes the inner iterator to erase its type.
pub fn new<I>(iter: I, selector: SelectorTuple<I::Item>, batch_size: NonZeroU64) -> Self
where
I: ExactSizeIterator + Send + Sync + 'static,
I::Item: HasProjection<SelectorMarker, AtomType = ()> + 'static,
<I::Item as HasProjection<SelectorMarker>>::Projection:
EvaluateSelector<I::Item> + Send + Sync,
QueryOutputBatchBox: From<Vec<I::Item>>,
{
Self {
inner: Box::new(BatchedInner {
iter,
selector,
batch_size,
cursor: Some(0),
}),
Expand All @@ -141,7 +186,7 @@ impl QueryBatchedErasedIterator {
pub fn next_batch(
&mut self,
cursor: u64,
) -> Result<(QueryOutputBatchBox, Option<NonZeroU64>), Error> {
) -> Result<(QueryOutputBatchBoxTuple, Option<NonZeroU64>), Error> {
self.inner.next_batch(cursor)
}

Expand Down
Loading
Loading