Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: modify cdc binary to always use new arch client except cdc server #1135

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 5 additions & 83 deletions cmd/cdc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,13 @@ package main

import (
"os"
"slices"
"strings"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/cmd/cdc/cli"
"github.com/pingcap/ticdc/cmd/cdc/server"
"github.com/pingcap/ticdc/cmd/cdc/version"
"github.com/pingcap/ticdc/cmd/util"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/tidb/pkg/util/collate"
tiflowCmd "github.com/pingcap/tiflow/pkg/cmd"
tiflowRedo "github.com/pingcap/tiflow/pkg/cmd/redo"
"github.com/spf13/cobra"
"go.uber.org/zap"
)

// NewCmd creates the root command.
Expand All @@ -42,89 +36,17 @@ func NewCmd() *cobra.Command {
}
}

func addNewArchCommandTo(cmd *cobra.Command) {
cmd.AddCommand(server.NewCmdServer())
cmd.AddCommand(cli.NewCmdCli())
cmd.AddCommand(version.NewCmdVersion())
}

func isNewArchEnabledByConfig(serverConfigFilePath string) bool {
cfg := config.GetDefaultServerConfig()
if len(serverConfigFilePath) > 0 {
// strict decode config file, but ignore debug item
if err := util.StrictDecodeFile(serverConfigFilePath, "TiCDC server", cfg, config.DebugConfigurationItem); err != nil {
log.Error("failed to parse server configuration, please check the config file for errors and try again.", zap.Error(err))
return false
}
}

return cfg.Newarch
}

// Utility to remove a flag from os.Args
func removeFlagFromArgs(flag string) []string {
result := []string{os.Args[0]} // keep the command name
for i := 1; i < len(os.Args); i++ {
if os.Args[i] != flag {
result = append(result, os.Args[i])
}
}
return result
}

func parseConfigFlagFromOSArgs() string {
var serverConfigFilePath string
for i, arg := range os.Args[1:] {
if strings.HasPrefix(arg, "--config=") {
serverConfigFilePath = strings.SplitN(arg, "=", 2)[1]
} else if arg == "--config" && i+2 < len(os.Args) {
serverConfigFilePath = os.Args[i+2]
}
}

// If the command is `cdc cli changefeed`, means it's not a server config file.
if slices.Contains(os.Args, "cli") && slices.Contains(os.Args, "changefeed") {
serverConfigFilePath = ""
}

return serverConfigFilePath
}

func parseNewarchFlagFromOSArgs() bool {
newarch := false
for _, arg := range os.Args[1:] {
if arg == "--newarch" {
newarch = true
os.Args = removeFlagFromArgs("--newarch")
} else if arg == "-x" {
newarch = true
os.Args = removeFlagFromArgs("-x")
}
}
return newarch
}

// Run runs the root command.
func main() {
cmd := NewCmd()

cmd.SetOut(os.Stdout)
cmd.SetErr(os.Stderr)

newarch := false
var serverConfigFilePath string
cmd.PersistentFlags().BoolVarP(&newarch, "newarch", "x", false, "Run the new architecture of TiCDC (experimental feature)")
cmd.ParseFlags(os.Args[1:])

// Double check to aviod some corner cases
serverConfigFilePath = parseConfigFlagFromOSArgs()
newarch = parseNewarchFlagFromOSArgs() || (os.Getenv("TICDC_NEWARCH") == "true")

if newarch || isNewArchEnabledByConfig(serverConfigFilePath) {
addNewArchCommandTo(cmd)
} else {
tiflowCmd.AddTiCDCCommandTo(cmd)
}
cmd.AddCommand(server.NewCmdServer())
cmd.AddCommand(cli.NewCmdCli())
cmd.AddCommand(version.NewCmdVersion())
cmd.AddCommand(tiflowRedo.NewCmdRedo())

setNewCollationEnabled()
if err := cmd.Execute(); err != nil {
Expand Down
110 changes: 100 additions & 10 deletions cmd/cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package server

import (
"context"
"encoding/json"
"os"
"slices"
"strings"

"github.com/fatih/color"
Expand All @@ -26,6 +28,8 @@ import (
"github.com/pingcap/ticdc/pkg/logger"
"github.com/pingcap/ticdc/pkg/version"
"github.com/pingcap/ticdc/server"
tiflowServer "github.com/pingcap/tiflow/pkg/cmd/server"
tiflowConfig "github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/security"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -54,6 +58,7 @@ func newOptions() *options {
// addFlags receives a *cobra.Command reference and binds
// flags related to template printing to it.
func (o *options) addFlags(cmd *cobra.Command) {
cmd.Flags().BoolVarP(&o.serverConfig.Newarch, "newarch", "x", o.serverConfig.Newarch, "Run the new architecture of TiCDC server")
cmd.Flags().StringVar(&o.serverConfig.ClusterID, "cluster-id", "default", "Set cdc cluster id")
cmd.Flags().StringVar(&o.serverConfig.Addr, "addr", o.serverConfig.Addr, "Set the listening address")
cmd.Flags().StringVar(&o.serverConfig.AdvertiseAddr, "advertise-addr", o.serverConfig.AdvertiseAddr, "Set the advertise listening address for client communication")
Expand Down Expand Up @@ -157,6 +162,8 @@ func (o *options) complete(command *cobra.Command) error {
cfg.ClusterID = o.serverConfig.ClusterID
case "pd", "config":
// do nothing
case "newarch", "x":
cfg.Newarch = o.serverConfig.Newarch
default:
log.Panic("unknown flag, please report a bug", zap.String("flagName", flag.Name))
}
Expand All @@ -181,6 +188,7 @@ func (o *options) validate() error {
if len(o.pdEndpoints) == 0 {
return errors.ErrInvalidServerOption.GenWithStack("empty PD address")
}
log.Info("validate pd address", zap.Strings("pd", o.pdEndpoints), zap.Int("pdCount", len(o.pdEndpoints)))
for _, ep := range o.pdEndpoints {
// NOTICE: The configuration used here is the one that has been completed,
// as it may be configured by the configuration file.
Expand All @@ -206,6 +214,83 @@ func (o *options) getCredential() *security.Credential {
}
}

func parseConfigFlagFromOSArgs() string {
var serverConfigFilePath string
for i, arg := range os.Args[1:] {
if strings.HasPrefix(arg, "--config=") {
serverConfigFilePath = strings.SplitN(arg, "=", 2)[1]
} else if arg == "--config" && i+2 < len(os.Args) {
serverConfigFilePath = os.Args[i+2]
}
}
log.Info("parse config file path from os.Args", zap.String("config", serverConfigFilePath))

// If the command is `cdc cli changefeed`, means it's not a server config file.
if slices.Contains(os.Args, "cli") && slices.Contains(os.Args, "changefeed") {
serverConfigFilePath = ""
}

return serverConfigFilePath
}

func isNewArchEnabledByConfig(serverConfigFilePath string) bool {
cfg := config.GetDefaultServerConfig()
if len(serverConfigFilePath) > 0 {
// strict decode config file, but ignore debug item
if err := util.StrictDecodeFile(serverConfigFilePath, "TiCDC server", cfg, config.DebugConfigurationItem); err != nil {
log.Error("failed to parse server configuration, please check the config file for errors and try again.", zap.Error(err))
return false
}
}

return cfg.Newarch
}

func isNewArchEnabled(o *options) bool {
newarch := o.serverConfig.Newarch
if newarch {
log.Debug("Set newarch from command line")
return newarch
}

newarch = os.Getenv("TICDC_NEWARCH") == "true"
if newarch {
log.Debug("Set newarch from environment variable")
return newarch
}

serverConfigFilePath := parseConfigFlagFromOSArgs()
newarch = isNewArchEnabledByConfig(serverConfigFilePath)
if newarch {
log.Debug("Set newarch from config file")
}
return newarch
}

func runTiFlowServer(o *options, cmd *cobra.Command) error {
cfgData, err := json.Marshal(o.serverConfig)
if err != nil {
return errors.Trace(err)
}

var oldCfg tiflowConfig.ServerConfig
err = json.Unmarshal(cfgData, &oldCfg)
if err != nil {
return errors.Trace(err)
}

var oldOptions tiflowServer.Options
oldOptions.ServerConfig = &oldCfg
oldOptions.ServerPdAddr = strings.Join(o.pdEndpoints, ",")
oldOptions.ServerConfigFilePath = o.serverConfigFilePath
oldOptions.CaPath = o.caPath
oldOptions.CertPath = o.certPath
oldOptions.KeyPath = o.keyPath
oldOptions.AllowedCertCN = o.allowedCertCN

return tiflowServer.Run(&oldOptions, cmd)
}

// NewCmdServer creates the `server` command.
func NewCmdServer() *cobra.Command {
o := newOptions()
Expand All @@ -215,17 +300,22 @@ func NewCmdServer() *cobra.Command {
Short: "Start a TiCDC server server",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
err := o.complete(cmd)
if err != nil {
return err
}
err = o.validate()
if err != nil {
return err
if isNewArchEnabled(o) {
log.Info("Running TiCDC server in new architecture")
err := o.complete(cmd)
if err != nil {
return err
}
err = o.validate()
if err != nil {
return err
}
err = o.run(cmd)
cobra.CheckErr(err)
return nil
}
err = o.run(cmd)
cobra.CheckErr(err)
return nil
log.Info("Running TiCDC server in old architecture")
return runTiFlowServer(o, cmd)
},
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/pingcap/tidb v1.1.0-beta.0.20241223052309-3735ed55a394
github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7
github.com/pingcap/tidb/pkg/parser v0.0.0-20241223052309-3735ed55a394
github.com/pingcap/tiflow v0.0.0-20250307070542-b67943012af2
github.com/pingcap/tiflow v0.0.0-20250318113613-e23fa78655fa
github.com/prometheus/client_golang v1.20.5
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -797,8 +797,8 @@ github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 h1:eFu98Fbf
github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c=
github.com/pingcap/tidb/pkg/parser v0.0.0-20241223052309-3735ed55a394 h1:Kd5UaT2mbA1gB0G19dkyRB4/Y5DYN8Sy/dgWYg9zNwI=
github.com/pingcap/tidb/pkg/parser v0.0.0-20241223052309-3735ed55a394/go.mod h1:Hju1TEWZvrctQKbztTRwXH7rd41Yq0Pgmq4PrEKcq7o=
github.com/pingcap/tiflow v0.0.0-20250307070542-b67943012af2 h1:V9eAn/kK3rTCzf/IdiULIbrG0ToiZKFzJtIczSyHC/g=
github.com/pingcap/tiflow v0.0.0-20250307070542-b67943012af2/go.mod h1:kLYTUjCGZfBFLtSTboIZrYY1Nv4nZJm2XoX+HgiT78s=
github.com/pingcap/tiflow v0.0.0-20250318113613-e23fa78655fa h1:9jGrp7qawtE8nYKYkDpk+r2b/J4WOU1kMY/ojCeUEEM=
github.com/pingcap/tiflow v0.0.0-20250318113613-e23fa78655fa/go.mod h1:kLYTUjCGZfBFLtSTboIZrYY1Nv4nZJm2XoX+HgiT78s=
github.com/pingcap/tipb v0.0.0-20241105053214-f91fdb81a69e h1:7DdrYVwWpYr4o1AyKl8T376B4h2RsMEjkmom8MxQuuM=
github.com/pingcap/tipb v0.0.0-20241105053214-f91fdb81a69e/go.mod h1:zrnYy8vReNODg8G0OiYaX9OK+kpq+rK1jHmvd1DnIWw=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
Expand Down