Skip to content

Commit 2d8ac90

Browse files
authored
tests: fix integration test bank (#1136)
close #1121
1 parent ced8224 commit 2d8ac90

File tree

4 files changed

+86
-54
lines changed

4 files changed

+86
-54
lines changed

logservice/schemastore/schema_store.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error {
189189
zap.Uint64("storeFinishedDDLTS", s.finishedDDLTs))
190190
continue
191191
}
192-
log.Info("handle ddl job",
192+
log.Info("handle a ddl job",
193193
zap.Int64("schemaID", event.Job.SchemaID),
194194
zap.String("schemaName", event.Job.SchemaName),
195195
zap.Int64("tableID", event.Job.TableID),

tests/integration_tests/bank/case.go

+78-51
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,25 @@
1414
package main
1515

1616
import (
17+
"bufio"
1718
"context"
1819
"database/sql"
19-
"encoding/json"
2020
"fmt"
21-
"io"
2221
"math/rand"
23-
"net/http"
22+
"os"
23+
"path/filepath"
24+
"regexp"
25+
"strconv"
2426
"strings"
2527
"sync/atomic"
2628
"time"
2729

2830
_ "github.com/go-sql-driver/mysql" // MySQL driver
2931
"github.com/pingcap/errors"
3032
"github.com/pingcap/log"
31-
cerror "github.com/pingcap/ticdc/pkg/errors"
32-
"github.com/pingcap/ticdc/pkg/retry"
33+
cerror "github.com/pingcap/tiflow/pkg/errors"
34+
"github.com/pingcap/tiflow/pkg/retry"
35+
"github.com/tikv/client-go/v2/oracle"
3336
"go.uber.org/zap"
3437
"golang.org/x/sync/errgroup"
3538
)
@@ -260,11 +263,15 @@ func (s *bankTest) prepare(ctx context.Context, db *sql.DB, accounts, tableID, c
260263
}
261264

262265
func (*bankTest) verify(ctx context.Context, db *sql.DB, accounts, tableID int, tag string, endTs uint64) error {
266+
retryInterval := 1000
263267
return retry.Do(ctx,
264268
func() (err error) {
265269
defer func() {
266270
if err != nil {
267-
log.Error("bank test verify failed", zap.Error(err))
271+
physical := oracle.GetTimeFromTS(endTs)
272+
physical = physical.Add(time.Duration(retryInterval) * time.Millisecond)
273+
endTs = oracle.GoTimeToTS(physical)
274+
log.Error("bank test verify failed", zap.Error(err), zap.Time("physical", physical), zap.Uint64("nextRetrySnapshotTs", endTs))
268275
}
269276
}()
270277
// use a single connection to keep the same database session.
@@ -281,11 +288,16 @@ func (*bankTest) verify(ctx context.Context, db *sql.DB, accounts, tableID int,
281288
var obtained, expect int
282289

283290
query := fmt.Sprintf("SELECT SUM(balance) as total FROM accounts%d", tableID)
284-
if err := conn.QueryRowContext(ctx, query).Scan(&obtained); err != nil {
285-
log.Warn("query failed", zap.String("query", query), zap.Error(err), zap.String("tag", tag))
291+
var nullableResult sql.NullInt64
292+
if err := conn.QueryRowContext(ctx, query).Scan(&nullableResult); err != nil {
293+
log.Error("query failed", zap.String("query", query), zap.Error(err), zap.String("tag", tag))
286294
return errors.Trace(err)
287295
}
288296

297+
if nullableResult.Valid {
298+
obtained = int(nullableResult.Int64)
299+
}
300+
289301
expect = accounts * initBalance
290302
if obtained != expect {
291303
return errors.Errorf("verify balance failed, accounts%d expect %d, but got %d", tableID, expect, obtained)
@@ -307,7 +319,7 @@ func (*bankTest) verify(ctx context.Context, db *sql.DB, accounts, tableID int,
307319
}
308320

309321
return nil
310-
}, retry.WithBackoffMaxDelay(500), retry.WithBackoffMaxDelay(120*1000), retry.WithMaxTries(10), retry.WithIsRetryableErr(cerror.IsRetryableError))
322+
}, retry.WithBackoffBaseDelay(int64(retryInterval)), retry.WithBackoffMaxDelay(120*1000), retry.WithMaxTries(20), retry.WithIsRetryableErr(cerror.IsRetryableError))
311323
}
312324

313325
// tryDropDB will drop table if data incorrect and panic error likes bad connect.
@@ -486,8 +498,9 @@ func run(
486498
}
487499
}
488500

489-
// DDL is a strong sync point in TiCDC. Once finishmark table is replicated to downstream
490-
// all previous DDL and DML are replicated too.
501+
// After we implement concurrent DDL of different tables, DDL is no longer a strong sync point in TiCDC.
502+
// We need to use changefeed's checkpoint > ddl.FinishedTS to ensure all previous DDL and DML before
503+
// the DDL are synced.
491504
mustExec(ctx, upstreamDB, `CREATE TABLE IF NOT EXISTS finishmark (foo BIGINT PRIMARY KEY)`)
492505
waitCtx, waitCancel := context.WithTimeout(ctx, 15*time.Minute)
493506
endTs, err := getDownStreamSyncedEndTs(waitCtx, downstreamDB, downstreamAPIEndpoint, "finishmark")
@@ -623,55 +636,69 @@ func getDownStreamSyncedEndTs(ctx context.Context, db *sql.DB, tidbAPIEndpoint,
623636
log.Error("get downstream sync end ts failed due to timeout", zap.String("table", tableName), zap.Error(ctx.Err()))
624637
return 0, ctx.Err()
625638
case <-time.After(2 * time.Second):
626-
result, ok := tryGetEndTs(db, tidbAPIEndpoint, tableName)
639+
result, ok := tryGetEndTsFromLog(db, tableName)
627640
if ok {
628641
return result, nil
629642
}
630643
}
631644
}
632645
}
633646

634-
func tryGetEndTs(db *sql.DB, tidbAPIEndpoint, tableName string) (result uint64, ok bool) {
635-
// Note: We should not use `END_TS` in the table, because it is encoded in
636-
// the format `2023-03-16 18:12:51`, it's not precise enough.
637-
query := "SELECT JOB_ID FROM information_schema.ddl_jobs WHERE table_name = ?"
638-
log.Info("try get end ts", zap.String("query", query), zap.String("tableName", tableName))
639-
var jobID uint64
640-
row := db.QueryRow(query, tableName)
641-
if err := row.Scan(&jobID); err != nil {
642-
if err != sql.ErrNoRows {
643-
log.Info("rows scan failed", zap.Error(err))
647+
func tryGetEndTsFromLog(db *sql.DB, tableName string) (result uint64, ok bool) {
648+
log.Info("try parse finishedTs from ticdc log", zap.String("tableName", tableName))
649+
650+
logFilePath := "/tmp/tidb_cdc_test/bank"
651+
cdcLogFiles := make([]string, 0)
652+
// walk all file with cdc prefix
653+
err := filepath.WalkDir(logFilePath, func(path string, d os.DirEntry, err error) error {
654+
if err != nil {
655+
return err
644656
}
645-
return 0, false
646-
}
647-
ddlJobURL := fmt.Sprintf(
648-
"http://%s/ddl/history?start_job_id=%d&limit=1", tidbAPIEndpoint, jobID)
649-
ddlJobResp, err := http.Get(ddlJobURL)
650-
if err != nil {
651-
log.Warn("fail to get DDL history",
652-
zap.String("URL", ddlJobURL), zap.Error(err))
653-
return 0, false
654-
}
655-
defer ddlJobResp.Body.Close()
656-
ddlJobJSON, err := io.ReadAll(ddlJobResp.Body)
657+
if !d.IsDir() {
658+
if strings.Contains(d.Name(), "down") && strings.Contains(d.Name(), "cdc") && strings.Contains(d.Name(), "log") {
659+
cdcLogFiles = append(cdcLogFiles, path)
660+
fmt.Println(path)
661+
}
662+
}
663+
return nil
664+
})
657665
if err != nil {
658-
log.Warn("fail to read DDL history",
659-
zap.String("URL", ddlJobURL), zap.Error(err))
660-
return 0, false
666+
log.Error("Failed to walk dir: %v", zap.Error(err))
661667
}
662-
ddlJob := []struct {
663-
Binlog struct {
664-
FinishedTS uint64 `json:"FinishedTS"`
665-
} `json:"binlog"`
666-
}{{}}
667-
err = json.Unmarshal(ddlJobJSON, &ddlJob)
668-
if err != nil {
669-
log.Warn("fail to unmarshal DDL history",
670-
zap.String("URL", ddlJobURL), zap.String("resp", string(ddlJobJSON)), zap.Error(err))
671-
return 0, false
668+
log.Info("total files", zap.Any("file", cdcLogFiles))
669+
670+
logRegex := regexp.MustCompile(`handle a ddl job`)
671+
tableNameRegex := regexp.MustCompile(tableName + "`")
672+
timeStampRegex := regexp.MustCompile(`jobFinishTs=([0-9]+)`)
673+
for _, f := range cdcLogFiles {
674+
file, err := os.Open(f)
675+
if err != nil {
676+
log.Error("Failed to open file: %v", zap.Error(err))
677+
}
678+
defer file.Close()
679+
680+
scanner := bufio.NewScanner(file)
681+
for scanner.Scan() {
682+
line := scanner.Text()
683+
if !logRegex.MatchString(line) || !tableNameRegex.MatchString(line) {
684+
continue
685+
}
686+
687+
matches := timeStampRegex.FindStringSubmatch(line)
688+
if len(matches) > 1 {
689+
fmt.Println("found first match line: ", matches[1], ": ", line)
690+
// convert to uint64
691+
result, err := strconv.ParseUint(matches[1], 10, 64)
692+
if err != nil {
693+
log.Error("Failed to parse uint64: %v", zap.Error(err))
694+
}
695+
return result, true
696+
}
697+
}
698+
699+
if err := scanner.Err(); err != nil {
700+
log.Error("Error scanning file: %v", zap.Error(err))
701+
}
672702
}
673-
log.Info("get end ts",
674-
zap.String("tableName", tableName),
675-
zap.Uint64("ts", ddlJob[0].Binlog.FinishedTS))
676-
return ddlJob[0].Binlog.FinishedTS, true
703+
return 0, false
677704
}

tests/integration_tests/bank/run.sh

+3-1
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ function prepare() {
2020
run_sql "CREATE DATABASE bank" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
2121

2222
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
23-
2423
run_cdc_cli changefeed create --sink-uri="mysql://root@${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT}/"
24+
25+
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8400" --pd "http://${DOWN_PD_HOST}:${DOWN_PD_PORT}" --logsuffix "down"
26+
run_cdc_cli changefeed create --sink-uri="blackhole://" -c "changefeed-for-find-finished-ts" --server "http://127.0.0.1:8400"
2527
}
2628

2729
trap stop_tidb_cluster EXIT

tools/workload/main.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,10 @@ func doUpdate(conn *sql.Conn, workload schema.Workload, input chan updateTask) {
420420
res, err = execute(conn, updateSql, workload, task.Table)
421421
}
422422
if err != nil {
423-
plog.Info("update error", zap.Error(err), zap.String("sql", updateSql[:20]))
423+
if len(updateSql) > 20 {
424+
updateSql = updateSql[:20] + "..."
425+
}
426+
plog.Info("update error", zap.Error(err), zap.String("sql", updateSql))
424427
errCount.Add(1)
425428
}
426429
if res != nil {

0 commit comments

Comments
 (0)