Skip to content

Commit c8b469d

Browse files
authored
Schema mapping support (apache#18)
1 parent 49b806a commit c8b469d

File tree

3 files changed

+74
-27
lines changed

3 files changed

+74
-27
lines changed

src/ast/mod.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,9 +1134,22 @@ pub enum Password {
11341134
NullPassword,
11351135
}
11361136

1137+
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash, Copy)]
1138+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
1139+
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
1140+
pub enum MappingType {
1141+
Table,
1142+
Schema,
1143+
}
1144+
1145+
/// CREATE MIRROR mirror_name FROM
1146+
/// peer_1 TO peer_2
1147+
/// WITH TABLE MAPPING (sch1.tbl1:sch2.tbl2, sch1.tbl3:sch2.tbl3)
1148+
/// WITH OPTIONS (option1 = value1, option2 = value2, ...)
1149+
/// as well as
11371150
/// CREATE MIRROR mirror_name FROM
11381151
/// peer_1 TO peer_2
1139-
/// WITH TABLE MAPPING sch1.tbl1:sch2.tbl2, sch1.tbl3:sch2.tbl3
1152+
/// WITH SCHEMA MAPPING (sch1:sch2)
11401153
/// WITH OPTIONS (option1 = value1, option2 = value2, ...)
11411154
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
11421155
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
@@ -1149,9 +1162,11 @@ pub struct CreateMirrorForCDC {
11491162
// name of the target peer
11501163
pub target_peer: ObjectName,
11511164
// list of mappings from source to target tables.
1152-
pub table_mappings: Vec<TableMapping>,
1165+
pub mappings: Vec<Mapping>,
11531166
// Options for the mirror job.
11541167
pub with_options: Vec<SqlOption>,
1168+
// mapping type: currently either SCHEMA or TABLE
1169+
pub mapping_type: MappingType,
11551170
}
11561171

11571172
/// CREATE MIRROR mirror_name
@@ -3081,12 +3096,13 @@ impl fmt::Display for Statement {
30813096
CreateMirror::CDC(cdc) => {
30823097
write!(
30833098
f,
3084-
"CREATE MIRROR {not_exists_clause}{mirror_name} FROM {source} TO {target} WITH TABLE MAPPING ({formatted_table_mappings})",
3099+
"CREATE MIRROR {not_exists_clause}{mirror_name} FROM {source} TO {target} WITH {mapping_type} MAPPING ({formatted_table_mappings})",
30853100
not_exists_clause = if *if_not_exists { "IF NOT EXISTS " } else { "" },
30863101
mirror_name = cdc.mirror_name,
30873102
source = cdc.source_peer,
30883103
target = cdc.target_peer,
3089-
formatted_table_mappings = display_comma_separated(&cdc.table_mappings)
3104+
formatted_table_mappings = display_comma_separated(&cdc.mappings),
3105+
mapping_type = if cdc.mapping_type == MappingType::Table { "TABLE" } else { "SCHEMA" }
30903106
)?;
30913107
if !cdc.with_options.is_empty() {
30923108
write!(f, " WITH ({})", display_comma_separated(&cdc.with_options))?;
@@ -4596,19 +4612,19 @@ impl fmt::Display for SearchModifier {
45964612
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
45974613
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
45984614
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
4599-
pub struct TableMapping {
4615+
pub struct Mapping {
46004616
/// The value of the source table identifier without quotes.
46014617
pub source_table_identifier: ObjectName,
46024618
/// The value of the destination table identifier without quotes.
4603-
pub target_table_identifier: ObjectName,
4619+
pub target_identifier: ObjectName,
46044620
}
46054621

4606-
impl fmt::Display for TableMapping {
4622+
impl fmt::Display for Mapping {
46074623
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
46084624
write!(
46094625
f,
46104626
"{}:{}",
4611-
self.source_table_identifier, self.target_table_identifier
4627+
self.source_table_identifier, self.target_identifier
46124628
)
46134629
}
46144630
}

src/parser.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3814,9 +3814,9 @@ impl<'a> Parser<'a> {
38143814
}
38153815
}
38163816

3817-
fn parse_table_mappings(&mut self) -> Result<Vec<TableMapping>, ParserError> {
3817+
fn parse_mappings(&mut self) -> Result<Vec<Mapping>, ParserError> {
38183818
self.expect_token(&Token::LParen)?;
3819-
let table_mappings = self.parse_comma_separated(Parser::parse_table_mapping)?;
3819+
let table_mappings = self.parse_comma_separated(Parser::parse_mapping)?;
38203820
self.expect_token(&Token::RParen)?;
38213821
Ok(table_mappings)
38223822
}
@@ -7145,9 +7145,16 @@ impl<'a> Parser<'a> {
71457145
}
71467146
} else {
71477147
// mirror CDC
7148-
self.expect_keywords(&[Keyword::WITH, Keyword::TABLE, Keyword::MAPPING])?;
7148+
self.expect_keyword(Keyword::WITH)?;
7149+
let mapping_type =
7150+
match self.expect_one_of_keywords(&[Keyword::TABLE, Keyword::SCHEMA])? {
7151+
Keyword::TABLE => MappingType::Table,
7152+
Keyword::SCHEMA => MappingType::Schema,
7153+
_ => unreachable!(),
7154+
};
7155+
self.expect_keyword(Keyword::MAPPING)?;
71497156

7150-
let table_mappings = self.parse_table_mappings()?;
7157+
let mappings = self.parse_mappings()?;
71517158

71527159
let with_options = self.parse_options(Keyword::WITH)?;
71537160

@@ -7157,8 +7164,9 @@ impl<'a> Parser<'a> {
71577164
mirror_name,
71587165
source_peer,
71597166
target_peer,
7160-
table_mappings,
7167+
mappings,
71617168
with_options,
7169+
mapping_type,
71627170
}),
71637171
})
71647172
}
@@ -7242,13 +7250,13 @@ impl<'a> Parser<'a> {
72427250
})
72437251
}
72447252

7245-
fn parse_table_mapping(&mut self) -> Result<TableMapping, ParserError> {
7253+
fn parse_mapping(&mut self) -> Result<Mapping, ParserError> {
72467254
let source_table_identifier = self.parse_object_name()?;
72477255
self.expect_token(&Token::Colon)?;
72487256
let destination_table_identifier = self.parse_object_name()?;
7249-
Ok(TableMapping {
7257+
Ok(Mapping {
72507258
source_table_identifier,
7251-
target_table_identifier: destination_table_identifier,
7259+
target_identifier: destination_table_identifier,
72527260
})
72537261
}
72547262
}

tests/sqlparser_postgres.rs

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2972,13 +2972,13 @@ fn parse_create_single_mirror_no_options() {
29722972
assert_eq!(cdc.mirror_name, ObjectName(vec![Ident::new("test_mirror")]));
29732973
assert_eq!(cdc.source_peer, ObjectName(vec![Ident::new("p1")]));
29742974
assert_eq!(cdc.target_peer, ObjectName(vec![Ident::new("p2")]));
2975-
assert_eq!(cdc.table_mappings.len(), 1);
2975+
assert_eq!(cdc.mappings.len(), 1);
29762976
assert_eq!(
2977-
cdc.table_mappings[0].source_table_identifier,
2977+
cdc.mappings[0].source_table_identifier,
29782978
ObjectName(vec![Ident::new("s1"), Ident::new("t1")])
29792979
);
29802980
assert_eq!(
2981-
cdc.table_mappings[0].target_table_identifier,
2981+
cdc.mappings[0].target_identifier,
29822982
ObjectName(vec![Ident::new("s2"), Ident::new("t2")])
29832983
);
29842984
assert_eq!(cdc.with_options.len(), 0);
@@ -3019,9 +3019,9 @@ fn parse_create_single_mirror() {
30193019
assert_eq!(cdc.mirror_name, ObjectName(vec![Ident::new("test_mirror")]));
30203020
assert_eq!(cdc.source_peer, ObjectName(vec![Ident::new("p1")]));
30213021
assert_eq!(cdc.target_peer, ObjectName(vec![Ident::new("p2")]));
3022-
assert_eq!(cdc.table_mappings.len(), 1);
3023-
assert_eq!(cdc.table_mappings[0].source_table_identifier, ObjectName(vec![Ident::new("s1"), Ident::new("t1")]));
3024-
assert_eq!(cdc.table_mappings[0].target_table_identifier, ObjectName(vec![Ident::new("s2"), Ident::new("t2")]));
3022+
assert_eq!(cdc.mappings.len(), 1);
3023+
assert_eq!(cdc.mappings[0].source_table_identifier, ObjectName(vec![Ident::new("s1"), Ident::new("t1")]));
3024+
assert_eq!(cdc.mappings[0].target_identifier, ObjectName(vec![Ident::new("s2"), Ident::new("t2")]));
30253025
assert_eq!(cdc.with_options.len(), 1);
30263026
assert_eq!(cdc.with_options[0].name, Ident::new("key1"));
30273027
assert_eq!(cdc.with_options[0].value, Value::SingleQuotedString("value1".into()));
@@ -3038,16 +3038,39 @@ fn parse_create_multi_mirror() {
30383038
assert_eq!(cdc.mirror_name, ObjectName(vec![Ident::new("test_mirror")]));
30393039
assert_eq!(cdc.source_peer, ObjectName(vec![Ident::new("p1")]));
30403040
assert_eq!(cdc.target_peer, ObjectName(vec![Ident::new("p2")]));
3041-
assert_eq!(cdc.table_mappings.len(), 2);
3042-
assert_eq!(cdc.table_mappings[0].source_table_identifier, ObjectName(vec![Ident::new("s1"), Ident::new("t1")]));
3043-
assert_eq!(cdc.table_mappings[0].target_table_identifier, ObjectName(vec![Ident::new("s2"), Ident::new("t2")]));
3044-
assert_eq!(cdc.table_mappings[1].source_table_identifier, ObjectName(vec![Ident::new("s1"), Ident::new("t3")]));
3045-
assert_eq!(cdc.table_mappings[1].target_table_identifier, ObjectName(vec![Ident::new("s2"), Ident::new("t4")]));
3041+
assert_eq!(cdc.mappings.len(), 2);
3042+
assert_eq!(cdc.mappings[0].source_table_identifier, ObjectName(vec![Ident::new("s1"), Ident::new("t1")]));
3043+
assert_eq!(cdc.mappings[0].target_identifier, ObjectName(vec![Ident::new("s2"), Ident::new("t2")]));
3044+
assert_eq!(cdc.mappings[1].source_table_identifier, ObjectName(vec![Ident::new("s1"), Ident::new("t3")]));
3045+
assert_eq!(cdc.mappings[1].target_identifier, ObjectName(vec![Ident::new("s2"), Ident::new("t4")]));
30463046
assert_eq!(cdc.with_options.len(), 2);
30473047
assert_eq!(cdc.with_options[0].name, Ident::new("key1"));
30483048
assert_eq!(cdc.with_options[0].value, Value::SingleQuotedString("value1".into()));
30493049
assert_eq!(cdc.with_options[1].name, Ident::new("key2"));
30503050
assert_eq!(cdc.with_options[1].value, Value::SingleQuotedString("value2".into()));
3051+
assert_eq!(cdc.mapping_type, MappingType::Table);
3052+
},
3053+
_ => unreachable!(),
3054+
}
3055+
}
3056+
3057+
#[test]
3058+
fn parse_create_mirror_with_schema() {
3059+
match pg().verified_stmt("CREATE MIRROR test_mirror FROM p1 TO p2 WITH SCHEMA MAPPING (s1:s2) WITH (key1 = 'value1', key2 = 'value2')") {
3060+
Statement::CreateMirror { if_not_exists,create_mirror: CDC(cdc) } => {
3061+
assert!(!if_not_exists);
3062+
assert_eq!(cdc.mirror_name, ObjectName(vec![Ident::new("test_mirror")]));
3063+
assert_eq!(cdc.source_peer, ObjectName(vec![Ident::new("p1")]));
3064+
assert_eq!(cdc.target_peer, ObjectName(vec![Ident::new("p2")]));
3065+
assert_eq!(cdc.mappings.len(), 1);
3066+
assert_eq!(cdc.mappings[0].source_table_identifier, ObjectName(vec![Ident::new("s1")]));
3067+
assert_eq!(cdc.mappings[0].target_identifier, ObjectName(vec![Ident::new("s2")]));
3068+
assert_eq!(cdc.with_options.len(), 2);
3069+
assert_eq!(cdc.with_options[0].name, Ident::new("key1"));
3070+
assert_eq!(cdc.with_options[0].value, Value::SingleQuotedString("value1".into()));
3071+
assert_eq!(cdc.with_options[1].name, Ident::new("key2"));
3072+
assert_eq!(cdc.with_options[1].value, Value::SingleQuotedString("value2".into()));
3073+
assert_eq!(cdc.mapping_type, MappingType::Schema);
30513074
},
30523075
_ => unreachable!(),
30533076
}

0 commit comments

Comments
 (0)