Skip to content

Commit

Permalink
BigQuery update sql has default where clause and select sql has defau…
Browse files Browse the repository at this point in the history
…lt from clause (#3418)
  • Loading branch information
agarwali authored Feb 26, 2025
1 parent cc14f79 commit 7c8763c
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,18 @@ public VisitorResult visit(PhysicalPlanNode prev, Update current, VisitorContext
}
else
{
if (current.whereCondition().isPresent())
{
logicalPlanNodeList.add(current.whereCondition().get());
}
visitWhereCondition(current, logicalPlanNodeList);
logicalPlanNodeList.addAll(current.keyValuePairs());
}

return new VisitorResult(updateStatement, logicalPlanNodeList);
}

public void visitWhereCondition(Update current, List<LogicalPlanNode> logicalPlanNodeList)
{
if (current.whereCondition().isPresent())
{
logicalPlanNodeList.add(current.whereCondition().get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,34 +44,7 @@ public VisitorResult visit(PhysicalPlanNode prev, Selection current, VisitorCont
List<Condition> whereConditions = new ArrayList<>();
current.condition().ifPresent(whereConditions::add);

if (current.source().isPresent())
{
Dataset dataset = current.source().get();
/* Optimize Scenarios where using Derived Dataset:
Convert unnecessary inner queries like this
select id from (select * from table where condition)
to
select id from table where condition
*/
if (dataset instanceof DerivedDataset)
{
DerivedDataset derivedDataset = (DerivedDataset) dataset;
Condition filterCondition = LogicalPlanUtils.getDatasetFilterCondition(derivedDataset);
whereConditions.add(filterCondition);
logicalPlanNodeList.add(derivedDataset.datasetReference());
}
else if (dataset instanceof FilteredDataset)
{
FilteredDataset filteredDataset = (FilteredDataset) dataset;
Condition filterCondition = filteredDataset.filter();
whereConditions.add(filterCondition);
logicalPlanNodeList.add(filteredDataset.datasetReference());
}
else
{
logicalPlanNodeList.add(dataset);
}
}
visitSource(current, logicalPlanNodeList, whereConditions);

if (current.fields().isEmpty())
{
Expand Down Expand Up @@ -101,4 +74,36 @@ else if (dataset instanceof FilteredDataset)

return new VisitorResult(selectStatement, logicalPlanNodeList);
}

public void visitSource(Selection current, List<LogicalPlanNode> logicalPlanNodeList, List<Condition> whereConditions)
{
if (current.source().isPresent())
{
Dataset dataset = current.source().get();
/* Optimize Scenarios where using Derived Dataset:
Convert unnecessary inner queries like this
select id from (select * from table where condition)
to
select id from table where condition
*/
if (dataset instanceof DerivedDataset)
{
DerivedDataset derivedDataset = (DerivedDataset) dataset;
Condition filterCondition = LogicalPlanUtils.getDatasetFilterCondition(derivedDataset);
whereConditions.add(filterCondition);
logicalPlanNodeList.add(derivedDataset.datasetReference());
}
else if (dataset instanceof FilteredDataset)
{
FilteredDataset filteredDataset = (FilteredDataset) dataset;
Condition filterCondition = filteredDataset.filter();
whereConditions.add(filterCondition);
logicalPlanNodeList.add(filteredDataset.datasetReference());
}
else
{
logicalPlanNodeList.add(dataset);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@
<packaging>jar</packaging>
<name>Legend Engine - XT - Persistence - Component - Relational Bigquery</name>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencyManagement>
<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.datasets.FieldType;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.PartitionKey;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.SchemaDefinition;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesDatasetReference;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.StagedFilesSelection;
Expand All @@ -33,6 +34,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.operations.Delete;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Drop;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Truncate;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Update;
import org.finos.legend.engine.persistence.components.logicalplan.values.BatchEndTimestamp;
import org.finos.legend.engine.persistence.components.logicalplan.values.BatchStartTimestamp;
import org.finos.legend.engine.persistence.components.logicalplan.values.DatetimeValue;
Expand Down Expand Up @@ -62,8 +64,10 @@
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.ExternalDatasetVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.FieldVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.PartitionKeyVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.SelectionVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.SQLCreateVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.SQLDropVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.SQLUpdateVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.SchemaDefinitionVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.StagedFilesDatasetReferenceVisitor;
import org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor.StagedFilesDatasetVisitor;
Expand Down Expand Up @@ -119,6 +123,8 @@ public class BigQuerySink extends AnsiSqlSink
logicalPlanVisitorByClass.put(ClusterKey.class, new ClusterKeyVisitor());
logicalPlanVisitorByClass.put(PartitionKey.class, new PartitionKeyVisitor());
logicalPlanVisitorByClass.put(Alter.class, new AlterVisitor());
logicalPlanVisitorByClass.put(Selection.class, new SelectionVisitor());
logicalPlanVisitorByClass.put(Update.class, new SQLUpdateVisitor());
logicalPlanVisitorByClass.put(Delete.class, new DeleteVisitor());
logicalPlanVisitorByClass.put(Field.class, new FieldVisitor());
logicalPlanVisitorByClass.put(Truncate.class, new TruncateVisitor());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2025 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor;

import java.util.List;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanNode;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Equals;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Update;
import org.finos.legend.engine.persistence.components.logicalplan.values.ObjectValue;

public class SQLUpdateVisitor extends org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.SQLUpdateVisitor
{

/*
UPDATE always needs A WHERE CLAUSE IN BigQuery
If the condition is not provided, default condition used: 1 = 1
*/
@Override
public void visitWhereCondition(Update current, List<LogicalPlanNode> logicalPlanNodeList)
{
if (current.whereCondition().isPresent())
{
super.visitWhereCondition(current, logicalPlanNodeList);
}
else
{
Condition condition = Equals.of(ObjectValue.of(1), ObjectValue.of(1));
logicalPlanNodeList.add(condition);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2025 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.bigquery.sql.visitor;

import java.util.List;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlanNode;
import org.finos.legend.engine.persistence.components.logicalplan.conditions.Condition;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Dataset;
import org.finos.legend.engine.persistence.components.logicalplan.datasets.Selection;
import org.finos.legend.engine.persistence.components.logicalplan.values.NumericalValue;

public class SelectionVisitor extends org.finos.legend.engine.persistence.components.relational.ansi.sql.visitors.SelectionVisitor
{

private static final String SELECT_1_ALIAS = "legend_persistence_X";

/*
SELECTION with WHERE always needs A FROM CLAUSE IN BigQuery
If the source is not provided, default source is SELECT 1 as X
*/
@Override
public void visitSource(Selection current, List<LogicalPlanNode> logicalPlanNodeList, List<Condition> whereConditions)
{
if (current.source().isPresent() || whereConditions.isEmpty())
{
super.visitSource(current, logicalPlanNodeList, whereConditions);
}
else
{
Dataset dataset = Selection.builder().addFields(NumericalValue.of(1L)).alias(SELECT_1_ALIAS).build();
logicalPlanNodeList.add(dataset);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2025 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.util;

import java.time.Clock;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import org.finos.legend.engine.persistence.components.logicalplan.LogicalPlan;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Insert;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Operation;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Update;
import org.finos.legend.engine.persistence.components.logicalplan.values.BatchStartTimestamp;
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.bigquery.BigQuerySink;
import org.finos.legend.engine.persistence.components.relational.transformer.RelationalTransformer;
import org.finos.legend.engine.persistence.components.transformer.TransformOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class LockInfoUtilsTest
{

private final ZonedDateTime executionZonedDateTime = ZonedDateTime.of(2000, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
private final TransformOptions transformOptions = TransformOptions.builder().executionTimestampClock(Clock.fixed(executionZonedDateTime.toInstant(), ZoneOffset.UTC)).build();

private LockInfoDataset lockInfoDataset = LockInfoDataset.builder().name("main_table_lock").build();


@Test
public void testInitializeLockInfo()
{
LockInfoUtils store = new LockInfoUtils(lockInfoDataset);
Insert operation = store.initializeLockInfo(BatchStartTimestamp.INSTANCE);
RelationalTransformer transformer = new RelationalTransformer(BigQuerySink.get(), transformOptions);
LogicalPlan logicalPlan = LogicalPlan.builder().addOps(operation).build();
SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan);
List<String> list = physicalPlan.getSqlList();
String expectedSql = "INSERT INTO main_table_lock (`insert_ts_utc`) " +
"(SELECT PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000') FROM (SELECT 1) as legend_persistence_X WHERE NOT (EXISTS (SELECT * FROM main_table_lock as main_table_lock)))";
Assertions.assertEquals(expectedSql, list.get(0));
}

@Test
public void testInitializeLockInfoForMultiIngest()
{
LockInfoUtils store = new LockInfoUtils(lockInfoDataset);
List<Operation> operation = store.initializeLockInfoForMultiIngest(Optional.empty(), BatchStartTimestamp.INSTANCE);
RelationalTransformer transformer = new RelationalTransformer(BigQuerySink.get(), transformOptions);
LogicalPlan logicalPlan = LogicalPlan.builder().addAllOps(operation).build();
SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan);
List<String> list = physicalPlan.getSqlList();
String expectedInsertSql = "INSERT INTO main_table_lock (`insert_ts_utc`, `batch_id`) " +
"(SELECT PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000'),0 FROM (SELECT 1) as legend_persistence_X WHERE NOT (EXISTS (SELECT * FROM main_table_lock as main_table_lock)))";
Assertions.assertEquals(expectedInsertSql, list.get(0));
}

@Test
public void testInitializeLockInfoForMultiIngestWithBatchIdValue()
{
LockInfoUtils store = new LockInfoUtils(lockInfoDataset);
List<Operation> operation = store.initializeLockInfoForMultiIngest(Optional.of(50L), BatchStartTimestamp.INSTANCE);
RelationalTransformer transformer = new RelationalTransformer(BigQuerySink.get(), transformOptions);
LogicalPlan logicalPlan = LogicalPlan.builder().addAllOps(operation).build();
SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan);
List<String> list = physicalPlan.getSqlList();
String expectedUpdateSql = "UPDATE main_table_lock as main_table_lock SET main_table_lock.`batch_id` = 50 WHERE main_table_lock.`batch_id` IS NULL";
String expectedInsertSql = "INSERT INTO main_table_lock (`insert_ts_utc`, `batch_id`) " +
"(SELECT PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000'),50 FROM (SELECT 1) as legend_persistence_X WHERE NOT (EXISTS (SELECT * FROM main_table_lock as main_table_lock)))";
Assertions.assertEquals(expectedInsertSql, list.get(0));
Assertions.assertEquals(expectedUpdateSql, list.get(1));
}

@Test
public void testUpdateMetaStore()
{
LockInfoUtils store = new LockInfoUtils(lockInfoDataset);
Update operation = store.updateLockInfo(BatchStartTimestamp.INSTANCE);
RelationalTransformer transformer = new RelationalTransformer(BigQuerySink.get(), transformOptions);
LogicalPlan logicalPlan = LogicalPlan.builder().addOps(operation).build();
SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan);
List<String> list = physicalPlan.getSqlList();
String expectedSql = "UPDATE main_table_lock as main_table_lock SET main_table_lock.`last_used_ts_utc` = PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S','2000-01-01 00:00:00.000000') WHERE 1 = 1";
Assertions.assertEquals(expectedSql, list.get(0));
}

@Test
public void testUpdateBatchId()
{
LockInfoUtils store = new LockInfoUtils(lockInfoDataset);
LogicalPlan logicalPlan = store.updateBatchId(10);
RelationalTransformer transformer = new RelationalTransformer(BigQuerySink.get(), transformOptions);
SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan);
List<String> list = physicalPlan.getSqlList();
String expectedSql = "UPDATE main_table_lock as main_table_lock SET main_table_lock.`batch_id` = 10 WHERE 1 = 1";
Assertions.assertEquals(expectedSql, list.get(0));
}

@Test
public void testGetLogicalPlanForNextBatchIdValue()
{
LockInfoUtils store = new LockInfoUtils(lockInfoDataset);
LogicalPlan logicalPlan = store.getLogicalPlanForNextBatchIdValue();
RelationalTransformer transformer = new RelationalTransformer(BigQuerySink.get(), transformOptions);
SqlPlan physicalPlan = transformer.generatePhysicalPlan(logicalPlan);
List<String> list = physicalPlan.getSqlList();
String expectedSql = "SELECT MAX(main_table_lock.`batch_id`)+1 FROM main_table_lock as main_table_lock";
Assertions.assertEquals(expectedSql, list.get(0));
}
}

0 comments on commit 7c8763c

Please sign in to comment.