Skip to content

Commit

Permalink
fix: use bindvars at replace partition
Browse files Browse the repository at this point in the history
This PR fixes syntax errors caused by incorrect handling of datetime
values in partitions during query execution and ensures they are parsed
correctly. It also improves security by using `bindvars` for part
values.
  • Loading branch information
rohithreddykota committed Dec 17, 2024
1 parent cc3ab06 commit bf4f06d
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
3 changes: 2 additions & 1 deletion runtime/drivers/clickhouse/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ func (c *connection) InsertTableAsSelect(ctx context.Context, name, sql string,
for _, part := range partitions {
// alter the main table to replace the partition
err = c.Exec(ctx, &drivers.Statement{
Query: fmt.Sprintf("ALTER TABLE %s %s REPLACE PARTITION %s FROM %s", safeSQLName(name), onClusterClause, part, safeSQLName(tempName)),
Query: fmt.Sprintf("ALTER TABLE %s %s REPLACE PARTITION ? FROM %s", safeSQLName(name), onClusterClause, safeSQLName(tempName)),
Args: []any{part},
Priority: 1,
})
if err != nil {
Expand Down
60 changes: 60 additions & 0 deletions runtime/drivers/clickhouse/olap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestClickhouseSingle(t *testing.T) {
t.Run("InsertTableAsSelect_WithAppend", func(t *testing.T) { testInsertTableAsSelect_WithAppend(t, olap) })
t.Run("InsertTableAsSelect_WithMerge", func(t *testing.T) { testInsertTableAsSelect_WithMerge(t, olap) })
t.Run("InsertTableAsSelect_WithPartitionOverwrite", func(t *testing.T) { testInsertTableAsSelect_WithPartitionOverwrite(t, olap) })
t.Run("InsertTableAsSelect_WithPartitionOverwrite_DatePartition", func(t *testing.T) { testInsertTableAsSelect_WithPartitionOverwrite_DatePartition(t, olap) })
t.Run("TestDictionary", func(t *testing.T) { testDictionary(t, olap) })
t.Run("TestIntervalType", func(t *testing.T) { testIntervalType(t, olap) })
}
Expand All @@ -57,6 +58,7 @@ func TestClickhouseCluster(t *testing.T) {
t.Run("InsertTableAsSelect_WithAppend", func(t *testing.T) { testInsertTableAsSelect_WithAppend(t, olap) })
t.Run("InsertTableAsSelect_WithMerge", func(t *testing.T) { testInsertTableAsSelect_WithMerge(t, olap) })
t.Run("InsertTableAsSelect_WithPartitionOverwrite", func(t *testing.T) { testInsertTableAsSelect_WithPartitionOverwrite(t, olap) })
t.Run("InsertTableAsSelect_WithPartitionOverwrite_DatePartition", func(t *testing.T) { testInsertTableAsSelect_WithPartitionOverwrite_DatePartition(t, olap) })
t.Run("TestDictionary", func(t *testing.T) { testDictionary(t, olap) })
}

Expand Down Expand Up @@ -289,6 +291,64 @@ func testInsertTableAsSelect_WithPartitionOverwrite(t *testing.T, olap drivers.O
}
}

func testInsertTableAsSelect_WithPartitionOverwrite_DatePartition(t *testing.T, olap drivers.OLAPStore) {
err := olap.CreateTableAsSelect(context.Background(), "replace_tbl", false, "SELECT date_add(hour, generate_series, toDate('2024-12-01')) AS dt, 'insert' AS value FROM generate_series(0, 4)", map[string]any{
"typs": "TABLE",
"engine": "MergeTree",
"table": "tbl",
"distributed.sharding_key": "rand()",
"incremental_strategy": drivers.IncrementalStrategyPartitionOverwrite,
"partition_by": "dt",
"order_by": "value",
"primary_key": "value",
})
require.NoError(t, err)

err = olap.InsertTableAsSelect(context.Background(), "replace_tbl", "SELECT date_add(hour, generate_series, toDate('2024-12-01')) AS dt, 'replace' AS value FROM generate_series(2, 5)", false, true, drivers.IncrementalStrategyPartitionOverwrite, nil)
require.NoError(t, err)

res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT dt, value FROM replace_tbl ORDER BY dt"})
require.NoError(t, err)

var result []struct {
DT string
Value string
}

for res.Next() {
var r struct {
DT string
Value string
}
require.NoError(t, res.Scan(&r.DT, &r.Value))
result = append(result, r)
}

expected := []struct {
DT string
Value string
}{
{"2024-12-01T00:00:00Z", "insert"},
{"2024-12-01T01:00:00Z", "insert"},
{"2024-12-01T02:00:00Z", "replace"},
{"2024-12-01T03:00:00Z", "replace"},
{"2024-12-01T04:00:00Z", "replace"},
}

// Convert the result set to a map to represent the set
resultSet := make(map[string]string)
for _, r := range result {
resultSet[r.DT] = r.Value
}

// Check if the expected values are present in the result set
for _, e := range expected {
value, exists := resultSet[e.DT]
require.True(t, exists, "Expected DateTime %s to be present in the result set", e.DT)
require.Equal(t, e.Value, value, "Expected value for DateTime %s to be %s, but got %s", e.DT, e.Value, value)
}
}

func testDictionary(t *testing.T, olap drivers.OLAPStore) {
err := olap.CreateTableAsSelect(context.Background(), "dict", false, "SELECT 1 AS id, 'Earth' AS planet", map[string]any{"table": "Dictionary", "primary_key": "id"})
require.NoError(t, err)
Expand Down

0 comments on commit bf4f06d

Please sign in to comment.