Skip to content

Commit fcacb68

Browse files
MazterQyouwaralexrom
authored andcommitted
feat: Support EXISTS subquery
1 parent 5dfb7a8 commit fcacb68

File tree

4 files changed

+73
-10
lines changed

4 files changed

+73
-10
lines changed

datafusion/core/src/logical_plan/plan.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::error::DataFusionError;
2626
use crate::logical_plan::dfschema::DFSchemaRef;
2727
use crate::sql::parser::FileType;
2828
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
29-
use datafusion_common::DFSchema;
29+
use datafusion_common::{DFField, DFSchema};
3030
use std::fmt::Formatter;
3131
use std::{
3232
collections::HashSet,
@@ -282,13 +282,16 @@ pub struct Subquery {
282282
pub enum SubqueryType {
283283
/// Scalar (SELECT, WHERE) evaluating to one value
284284
Scalar,
285-
// This will be extended with `Exists` and `AnyAll` types.
285+
/// EXISTS(...) evaluating to true if at least one row was produced
286+
Exists,
287+
// This will be extended with `AnyAll` type.
286288
}
287289

288290
impl Display for SubqueryType {
289291
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
290292
let subquery_type = match self {
291293
SubqueryType::Scalar => "Scalar",
294+
SubqueryType::Exists => "Exists",
292295
};
293296
write!(f, "{}", subquery_type)
294297
}
@@ -315,15 +318,31 @@ impl Subquery {
315318
pub fn transform_dfschema(schema: &DFSchema, typ: SubqueryType) -> DFSchema {
316319
match typ {
317320
SubqueryType::Scalar => schema.clone(),
318-
// Schema will be transformed for `Exists` and `AnyAll`
321+
SubqueryType::Exists => {
322+
let new_fields = schema
323+
.fields()
324+
.iter()
325+
.map(|field| {
326+
let new_field = Subquery::transform_field(field.field(), typ);
327+
if let Some(qualifier) = field.qualifier() {
328+
DFField::from_qualified(qualifier, new_field)
329+
} else {
330+
DFField::from(new_field)
331+
}
332+
})
333+
.collect();
334+
DFSchema::new_with_metadata(new_fields, schema.metadata().clone())
335+
.unwrap()
336+
} // Schema will be transformed for `AnyAll` as well
319337
}
320338
}
321339

322340
/// Transform Arrow field according to subquery type
323341
pub fn transform_field(field: &Field, typ: SubqueryType) -> Field {
324342
match typ {
325343
SubqueryType::Scalar => field.clone(),
326-
// Field will be transformed for `Exists` and `AnyAll`
344+
SubqueryType::Exists => Field::new(field.name(), DataType::Boolean, false),
345+
// Field will be transformed for `AnyAll` as well
327346
}
328347
}
329348
}

datafusion/core/src/physical_plan/subquery.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use std::task::{Context, Poll};
3030
use crate::error::{DataFusionError, Result};
3131
use crate::logical_plan::{Subquery, SubqueryType};
3232
use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
33-
use arrow::array::new_null_array;
33+
use arrow::array::{new_null_array, BooleanArray};
3434
use arrow::datatypes::{Schema, SchemaRef};
3535
use arrow::error::{ArrowError, Result as ArrowResult};
3636
use arrow::record_batch::RecordBatch;
@@ -215,12 +215,23 @@ impl ExecutionPlan for SubqueryExec {
215215
.to_string(),
216216
)),
217217
},
218+
SubqueryType::Exists => match subquery_batch
219+
.column(0)
220+
.len()
221+
{
222+
0 => subquery_arrays[subquery_i]
223+
.push(Arc::new(BooleanArray::from(vec![false]))),
224+
_ => subquery_arrays[subquery_i]
225+
.push(Arc::new(BooleanArray::from(vec![true]))),
226+
},
218227
};
219228
} else {
220229
match subquery_type {
221230
SubqueryType::Scalar => {
222231
subquery_arrays[subquery_i].push(null_array())
223232
}
233+
SubqueryType::Exists => subquery_arrays[subquery_i]
234+
.push(Arc::new(BooleanArray::from(vec![false]))),
224235
};
225236
}
226237
}

datafusion/core/src/sql/planner.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2300,11 +2300,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
23002300
})
23012301
}
23022302

2303-
// FIXME: Exists is unsupported but all the queries we need return false
2304-
SQLExpr::Exists(_) => {
2305-
warn!("EXISTS(...) is not supported yet. Replacing with scalar `false` value.");
2306-
Ok(Expr::Literal(ScalarValue::Boolean(Some(false))))
2307-
}
2303+
SQLExpr::Exists(q) => self.subquery_to_plan(q, SubqueryType::Exists, schema),
23082304

23092305
// FIXME: ArraySubquery is unsupported but all the queries we need return empty array
23102306
SQLExpr::ArraySubquery(_) => {
@@ -5074,6 +5070,22 @@ mod tests {
50745070
quick_test(sql, expected);
50755071
}
50765072

5073+
#[test]
5074+
fn subquery_exists() {
5075+
let sql = "select person.id, exists(select person.id) from person where exists(select 1 where false)";
5076+
let expected = "Projection: #person.id, #__subquery-1.person.id\
5077+
\n Subquery: types=[Exists]\
5078+
\n Filter: #__subquery-0.Int64(1)\
5079+
\n Subquery: types=[Exists]\
5080+
\n TableScan: person projection=None\
5081+
\n Projection: Int64(1), alias=__subquery-0\
5082+
\n Filter: Boolean(false)\
5083+
\n EmptyRelation\
5084+
\n Projection: ^#person.id, alias=__subquery-1\
5085+
\n EmptyRelation";
5086+
quick_test(sql, expected);
5087+
}
5088+
50775089
#[test]
50785090
fn join_on_disjunction_condition() {
50795091
let sql = "SELECT id, order_id \

datafusion/core/tests/sql/subquery.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,27 @@ async fn subquery_select_and_where_with_from() -> Result<()> {
148148
Ok(())
149149
}
150150

151+
#[tokio::test]
152+
async fn subquery_exists() -> Result<()> {
153+
let ctx = SessionContext::new();
154+
register_aggregate_simple_csv(&ctx).await?;
155+
156+
let sql = "SELECT DISTINCT c1 FROM aggregate_simple o WHERE EXISTS(SELECT 1 FROM aggregate_simple p WHERE o.c1 * 2 = p.c1) ORDER BY c1";
157+
let actual = execute_to_batches(&ctx, sql).await;
158+
159+
let expected = vec![
160+
"+---------+",
161+
"| c1 |",
162+
"+---------+",
163+
"| 0.00001 |",
164+
"| 0.00002 |",
165+
"+---------+",
166+
];
167+
assert_batches_eq!(expected, &actual);
168+
169+
Ok(())
170+
}
171+
151172
#[tokio::test]
152173
async fn subquery_projection_pushdown() -> Result<()> {
153174
let ctx = SessionContext::new();

0 commit comments

Comments
 (0)