Skip to content

Commit

Permalink
support more type in postgres-extended (#94)
Browse files Browse the repository at this point in the history
* * support INTERVAL && TIMESTAMPTZ type

Signed-off-by: Zejiong Dong <[email protected]>

* * add test
* support array type

Signed-off-by: Zejiong Dong <[email protected]>

Signed-off-by: Zejiong Dong <[email protected]>
  • Loading branch information
ZENOTME authored Oct 25, 2022
1 parent 143e5b2 commit 5fb0751
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 72 deletions.
1 change: 1 addition & 0 deletions sqllogictest-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ humantime = "2"
itertools = "0.10"
log = "0.4"
postgres-types = { version = "0.2.3", features = ["derive", "with-chrono-0_4"] }
pg_interval = "0.4"
quick-junit = { version = "0.2" }
rand = "0.8"
rust_decimal = { version = "1.7.0", features = ["tokio-pg"] }
Expand Down
251 changes: 180 additions & 71 deletions sqllogictest-bin/src/engines/postgres_extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::sync::Arc;

use anyhow::Context;
use async_trait::async_trait;
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
use pg_interval::Interval;
use postgres_types::Type;
use rust_decimal::Decimal;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -73,6 +74,60 @@ macro_rules! array_process {
}
}
};
($row:ident, $output:ident, $idx:ident, $t:ty, $convert:ident) => {
let value: Option<Vec<Option<$t>>> = $row.get($idx);
match value {
Some(value) => {
write!($output, "{{").unwrap();
for (i, v) in value.iter().enumerate() {
match v {
Some(v) => {
write!($output, "{}", $convert(v)).unwrap();
}
None => {
write!($output, "NULL").unwrap();
}
}
if i < value.len() - 1 {
write!($output, ",").unwrap();
}
}
write!($output, "}}").unwrap();
}
None => {
write!($output, "NULL").unwrap();
}
}
};
($self:ident, $row:ident, $output:ident, $idx:ident, $t:ty, $ty_name:expr) => {
let value: Option<Vec<Option<$t>>> = $row.get($idx);
match value {
Some(value) => {
write!($output, "{{").unwrap();
for (i, v) in value.iter().enumerate() {
match v {
Some(v) => {
let sql = format!("select ($1::{})::varchar", stringify!($ty_name));
let tmp_rows = $self.client.query(&sql, &[&v]).await.unwrap();
let value: &str = tmp_rows.get(0).unwrap().get(0);
assert!(value.len() > 0);
write!($output, "{}", value).unwrap();
}
None => {
write!($output, "NULL").unwrap();
}
}
if i < value.len() - 1 {
write!($output, ",").unwrap();
}
}
write!($output, "}}").unwrap();
}
None => {
write!($output, "NULL").unwrap();
}
}
};
}

macro_rules! single_process {
Expand All @@ -87,6 +142,72 @@ macro_rules! single_process {
}
}
};
($row:ident, $output:ident, $idx:ident, $t:ty, $convert:ident) => {
let value: Option<$t> = $row.get($idx);
match value {
Some(value) => {
write!($output, "{}", $convert(&value)).unwrap();
}
None => {
write!($output, "NULL").unwrap();
}
}
};
($self:ident, $row:ident, $output:ident, $idx:ident, $t:ty, $ty_name:expr) => {
let value: Option<$t> = $row.get($idx);
match value {
Some(value) => {
let sql = format!("select ($1::{})::varchar", stringify!($ty_name));
let tmp_rows = $self.client.query(&sql, &[&value]).await.unwrap();
let value: &str = tmp_rows.get(0).unwrap().get(0);
assert!(value.len() > 0);
write!($output, "{}", value).unwrap();
}
None => {
write!($output, "NULL").unwrap();
}
}
};
}

fn bool_to_str(value: &bool) -> &'static str {
if *value {
"t"
} else {
"f"
}
}

fn varchar_to_str(value: &str) -> String {
if value.is_empty() {
"(empty)".to_string()
} else {
value.to_string()
}
}

fn float4_to_str(value: &f32) -> String {
if value.is_nan() {
"NaN".to_string()
} else if *value == f32::INFINITY {
"Infinity".to_string()
} else if *value == f32::NEG_INFINITY {
"-Infinity".to_string()
} else {
value.to_string()
}
}

fn float8_to_str(value: &f64) -> String {
if value.is_nan() {
"NaN".to_string()
} else if *value == f64::INFINITY {
"Infinity".to_string()
} else if *value == f64::NEG_INFINITY {
"-Infinity".to_string()
} else {
value.to_string()
}
}

#[async_trait]
Expand All @@ -113,23 +234,7 @@ impl sqllogictest::AsyncDB for PostgresExtended {
if idx != 0 {
write!(output, " ").unwrap();
}

match column.type_().clone() {
Type::VARCHAR | Type::TEXT => {
let value: Option<&str> = row.get(idx);
match value {
Some(value) => {
if value.is_empty() {
write!(output, "(empty)").unwrap();
} else {
write!(output, "{}", value).unwrap();
}
}
None => {
write!(output, "NULL").unwrap();
}
}
}
Type::INT2 => {
single_process!(row, output, idx, i16);
}
Expand All @@ -139,67 +244,21 @@ impl sqllogictest::AsyncDB for PostgresExtended {
Type::INT8 => {
single_process!(row, output, idx, i64);
}
Type::BOOL => {
let value: Option<bool> = row.get(idx);
match value {
Some(value) => {
if value {
write!(output, "t").unwrap();
} else {
write!(output, "f").unwrap();
}
}
None => {
write!(output, "NULL").unwrap();
}
}
}
Type::FLOAT4 => {
let value: Option<f32> = row.get(idx);
match value {
Some(value) => {
if value == f32::INFINITY {
write!(output, "Infinity").unwrap();
} else if value == f32::NEG_INFINITY {
write!(output, "-Infinity").unwrap();
} else {
write!(output, "{}", value).unwrap();
}
}
None => {
write!(output, "NULL").unwrap();
}
}
}
Type::FLOAT8 => {
let value: Option<f64> = row.get(idx);
match value {
Some(value) => {
if value == f64::INFINITY {
write!(output, "Infinity").unwrap();
} else if value == f64::NEG_INFINITY {
write!(output, "-Infinity").unwrap();
} else {
write!(output, "{}", value).unwrap();
}
}
None => {
write!(output, "NULL").unwrap();
}
}
}
Type::NUMERIC => {
single_process!(row, output, idx, Decimal);
}
Type::TIMESTAMP => {
single_process!(row, output, idx, NaiveDateTime);
}
Type::DATE => {
single_process!(row, output, idx, NaiveDate);
}
Type::TIME => {
single_process!(row, output, idx, NaiveTime);
}
Type::TIMESTAMP => {
single_process!(row, output, idx, NaiveDateTime);
}
Type::BOOL => {
single_process!(row, output, idx, bool, bool_to_str);
}
Type::INT2_ARRAY => {
array_process!(row, output, idx, i16);
}
Expand All @@ -209,15 +268,65 @@ impl sqllogictest::AsyncDB for PostgresExtended {
Type::INT8_ARRAY => {
array_process!(row, output, idx, i64);
}
Type::BOOL_ARRAY => {
array_process!(row, output, idx, bool, bool_to_str);
}
Type::FLOAT4_ARRAY => {
array_process!(row, output, idx, f32);
array_process!(row, output, idx, f32, float4_to_str);
}
Type::FLOAT8_ARRAY => {
array_process!(row, output, idx, f64);
array_process!(row, output, idx, f64, float8_to_str);
}
Type::NUMERIC_ARRAY => {
array_process!(row, output, idx, Decimal);
}
Type::DATE_ARRAY => {
array_process!(row, output, idx, NaiveDate);
}
Type::TIME_ARRAY => {
array_process!(row, output, idx, NaiveTime);
}
Type::TIMESTAMP_ARRAY => {
array_process!(row, output, idx, NaiveDateTime);
}
Type::VARCHAR_ARRAY | Type::TEXT_ARRAY => {
array_process!(row, output, idx, String, varchar_to_str);
}
Type::VARCHAR | Type::TEXT => {
single_process!(row, output, idx, String, varchar_to_str);
}
Type::FLOAT4 => {
single_process!(row, output, idx, f32, float4_to_str);
}
Type::FLOAT8 => {
single_process!(row, output, idx, f64, float8_to_str);
}
Type::INTERVAL => {
single_process!(self, row, output, idx, Interval, INTERVAL);
}
Type::TIMESTAMPTZ => {
single_process!(
self,
row,
output,
idx,
DateTime<chrono::Utc>,
TIMESTAMPTZ
);
}
Type::INTERVAL_ARRAY => {
array_process!(self, row, output, idx, Interval, INTERVAL);
}
Type::TIMESTAMPTZ_ARRAY => {
array_process!(
self,
row,
output,
idx,
DateTime<chrono::Utc>,
TIMESTAMPTZ
);
}
_ => {
todo!("Don't support {} type now.", column.type_().name())
}
Expand Down
Loading

0 comments on commit 5fb0751

Please sign in to comment.