diff --git a/go.mod b/go.mod index fe0328031c..89d194b5d4 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.71 - github.com/openimsdk/tools v0.0.50-alpha.67 + github.com/openimsdk/protocol v0.0.72-alpha.70 + github.com/openimsdk/tools v0.0.50-alpha.64 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 0b8c863258..3792a5e341 100644 --- a/go.sum +++ b/go.sum @@ -317,12 +317,12 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= -github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.71 h1:R3utzOlqepaJWTAmnfJi4ccUM/XIoFasSyjQMOipM70= -github.com/openimsdk/protocol v0.0.72-alpha.71/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.67 h1:K7kguqvPbjldHAi7pGhcG2ERkctCqG9ZFlteT7UKaxM= -github.com/openimsdk/tools v0.0.50-alpha.67/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= +github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= +github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +github.com/openimsdk/protocol v0.0.72-alpha.70 h1:j7vB81+rTthijRda2b8tlli9oWvPxr4yXHwZ8nPZIBQ= +github.com/openimsdk/protocol v0.0.72-alpha.70/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= +github.com/openimsdk/tools v0.0.50-alpha.64 h1:KmtE8V2K8atQJJg1xq2ySSrPQyf8ldwk8fw6jRnsxCw= +github.com/openimsdk/tools v0.0.50-alpha.64/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/config_manager.go b/internal/api/config_manager.go index a3540d1cf2..7a36bb605f 100644 --- a/internal/api/config_manager.go +++ b/internal/api/config_manager.go @@ -1,313 +1,308 @@ package api -// -//import ( -// "encoding/json" -// "reflect" -// "strconv" -// "time" -// -// "github.com/gin-gonic/gin" -// "github.com/openimsdk/open-im-server/v3/pkg/apistruct" -// "github.com/openimsdk/open-im-server/v3/pkg/authverify" -// "github.com/openimsdk/open-im-server/v3/pkg/common/config" -// "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" -// "github.com/openimsdk/open-im-server/v3/version" -// "github.com/openimsdk/tools/apiresp" -// "github.com/openimsdk/tools/errs" -// "github.com/openimsdk/tools/log" -// "github.com/openimsdk/tools/utils/runtimeenv" -// clientv3 "go.etcd.io/etcd/client/v3" -//) -// -//const ( -// // wait for Restart http call return -// waitHttp = time.Millisecond * 200 -//) -// -//type ConfigManager struct { -// imAdminUserID []string -// config *config.AllConfig -// client *clientv3.Client -// -// configPath string -// runtimeEnv string -//} -// -//func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager { -// cm := &ConfigManager{ -// imAdminUserID: IMAdminUserID, -// config: cfg, -// client: client, -// configPath: configPath, -// runtimeEnv: runtimeEnv, -// } -// return cm -//} -// -//func (cm *ConfigManager) CheckAdmin(c *gin.Context) { -// if err := authverify.CheckAdmin(c, cm.imAdminUserID); err != nil { -// apiresp.GinError(c, err) -// c.Abort() -// } -//} -// -//func (cm *ConfigManager) GetConfig(c *gin.Context) { -// var req apistruct.GetConfigReq -// if err := c.BindJSON(&req); err != nil { -// apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) -// return -// } -// conf := cm.config.Name2Config(req.ConfigName) -// if conf == nil { -// apiresp.GinError(c, errs.ErrArgs.WithDetail("config name not found").Wrap()) -// return -// } -// b, err := json.Marshal(conf) -// if err != nil { -// apiresp.GinError(c, err) -// return -// } -// apiresp.GinSuccess(c, string(b)) -//} -// -//func (cm *ConfigManager) GetConfigList(c *gin.Context) { -// var resp apistruct.GetConfigListResp -// resp.ConfigNames = cm.config.GetConfigNames() -// resp.Environment = runtimeenv.PrintRuntimeEnvironment() -// resp.Version = version.Version -// -// apiresp.GinSuccess(c, resp) -//} -// -//func (cm *ConfigManager) SetConfig(c *gin.Context) { -// if cm.config.Discovery.Enable != config.ETCD { -// apiresp.GinError(c, errs.New("only etcd support set config").Wrap()) -// return -// } -// var req apistruct.SetConfigReq -// if err := c.BindJSON(&req); err != nil { -// apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) -// return -// } -// var err error -// switch req.ConfigName { -// case cm.config.Discovery.GetConfigFileName(): -// err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.Kafka.GetConfigFileName(): -// err = compareAndSave[config.Kafka](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.LocalCache.GetConfigFileName(): -// err = compareAndSave[config.LocalCache](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.Log.GetConfigFileName(): -// err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.Minio.GetConfigFileName(): -// err = compareAndSave[config.Minio](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.Mongo.GetConfigFileName(): -// err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.Notification.GetConfigFileName(): -// err = compareAndSave[config.Notification](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.API.GetConfigFileName(): -// err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.CronTask.GetConfigFileName(): -// err = compareAndSave[config.CronTask](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.MsgGateway.GetConfigFileName(): -// err = compareAndSave[config.MsgGateway](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.MsgTransfer.GetConfigFileName(): -// err = compareAndSave[config.MsgTransfer](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.Push.GetConfigFileName(): -// err = compareAndSave[config.Push](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.Auth.GetConfigFileName(): -// err = compareAndSave[config.Auth](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.Conversation.GetConfigFileName(): -// err = compareAndSave[config.Conversation](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.Friend.GetConfigFileName(): -// err = compareAndSave[config.Friend](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.Group.GetConfigFileName(): -// err = compareAndSave[config.Group](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.Msg.GetConfigFileName(): -// err = compareAndSave[config.Msg](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.Third.GetConfigFileName(): -// err = compareAndSave[config.Third](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.User.GetConfigFileName(): -// err = compareAndSave[config.User](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.Redis.GetConfigFileName(): -// err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.Share.GetConfigFileName(): -// err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// case cm.config.Webhooks.GetConfigFileName(): -// err = compareAndSave[config.Webhooks](c, cm.config.Name2Config(req.ConfigName), &req, cm) -// default: -// apiresp.GinError(c, errs.ErrArgs.Wrap()) -// return -// } -// if err != nil { -// apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) -// return -// } -// apiresp.GinSuccess(c, nil) -//} -// -//func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, cm *ConfigManager) error { -// conf := new(T) -// err := json.Unmarshal([]byte(req.Data), &conf) -// if err != nil { -// return errs.ErrArgs.WithDetail(err.Error()).Wrap() -// } -// eq := reflect.DeepEqual(old, conf) -// if eq { -// return nil -// } -// data, err := json.Marshal(conf) -// if err != nil { -// return errs.ErrArgs.WithDetail(err.Error()).Wrap() -// } -// _, err = cm.client.Put(c, etcd.BuildKey(req.ConfigName), string(data)) -// if err != nil { -// return errs.WrapMsg(err, "save to etcd failed") -// } -// return nil -//} -// -//func (cm *ConfigManager) ResetConfig(c *gin.Context) { -// go func() { -// if err := cm.resetConfig(c, true); err != nil { -// log.ZError(c, "reset config err", err) -// } -// }() -// apiresp.GinSuccess(c, nil) -//} -// -//func (cm *ConfigManager) resetConfig(c *gin.Context, checkChange bool, ops ...clientv3.Op) error { -// txn := cm.client.Txn(c) -// type initConf struct { -// old any -// new any -// } -// configMap := map[string]*initConf{ -// cm.config.Discovery.GetConfigFileName(): {old: &cm.config.Discovery, new: new(config.Discovery)}, -// cm.config.Kafka.GetConfigFileName(): {old: &cm.config.Kafka, new: new(config.Kafka)}, -// cm.config.LocalCache.GetConfigFileName(): {old: &cm.config.LocalCache, new: new(config.LocalCache)}, -// cm.config.Log.GetConfigFileName(): {old: &cm.config.Log, new: new(config.Log)}, -// cm.config.Minio.GetConfigFileName(): {old: &cm.config.Minio, new: new(config.Minio)}, -// cm.config.Mongo.GetConfigFileName(): {old: &cm.config.Mongo, new: new(config.Mongo)}, -// cm.config.Notification.GetConfigFileName(): {old: &cm.config.Notification, new: new(config.Notification)}, -// cm.config.API.GetConfigFileName(): {old: &cm.config.API, new: new(config.API)}, -// cm.config.CronTask.GetConfigFileName(): {old: &cm.config.CronTask, new: new(config.CronTask)}, -// cm.config.MsgGateway.GetConfigFileName(): {old: &cm.config.MsgGateway, new: new(config.MsgGateway)}, -// cm.config.MsgTransfer.GetConfigFileName(): {old: &cm.config.MsgTransfer, new: new(config.MsgTransfer)}, -// cm.config.Push.GetConfigFileName(): {old: &cm.config.Push, new: new(config.Push)}, -// cm.config.Auth.GetConfigFileName(): {old: &cm.config.Auth, new: new(config.Auth)}, -// cm.config.Conversation.GetConfigFileName(): {old: &cm.config.Conversation, new: new(config.Conversation)}, -// cm.config.Friend.GetConfigFileName(): {old: &cm.config.Friend, new: new(config.Friend)}, -// cm.config.Group.GetConfigFileName(): {old: &cm.config.Group, new: new(config.Group)}, -// cm.config.Msg.GetConfigFileName(): {old: &cm.config.Msg, new: new(config.Msg)}, -// cm.config.Third.GetConfigFileName(): {old: &cm.config.Third, new: new(config.Third)}, -// cm.config.User.GetConfigFileName(): {old: &cm.config.User, new: new(config.User)}, -// cm.config.Redis.GetConfigFileName(): {old: &cm.config.Redis, new: new(config.Redis)}, -// cm.config.Share.GetConfigFileName(): {old: &cm.config.Share, new: new(config.Share)}, -// cm.config.Webhooks.GetConfigFileName(): {old: &cm.config.Webhooks, new: new(config.Webhooks)}, -// } -// -// changedKeys := make([]string, 0, len(configMap)) -// for k, v := range configMap { -// err := config.Load( -// cm.configPath, -// k, -// config.EnvPrefixMap[k], -// cm.runtimeEnv, -// v.new, -// ) -// if err != nil { -// log.ZError(c, "load config failed", err) -// continue -// } -// equal := reflect.DeepEqual(v.old, v.new) -// if !checkChange || !equal { -// changedKeys = append(changedKeys, k) -// } -// } -// -// for _, k := range changedKeys { -// data, err := json.Marshal(configMap[k].new) -// if err != nil { -// log.ZError(c, "marshal config failed", err) -// continue -// } -// ops = append(ops, clientv3.OpPut(etcd.BuildKey(k), string(data))) -// } -// if len(ops) > 0 { -// txn.Then(ops...) -// _, err := txn.Commit() -// if err != nil { -// return errs.WrapMsg(err, "commit etcd txn failed") -// } -// } -// return nil -//} -// -//func (cm *ConfigManager) Restart(c *gin.Context) { -// go cm.restart(c) -// apiresp.GinSuccess(c, nil) -//} -// -//func (cm *ConfigManager) restart(c *gin.Context) { -// time.Sleep(waitHttp) // wait for Restart http call return -// t := time.Now().Unix() -// _, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t))) -// if err != nil { -// log.ZError(c, "restart etcd put key failed", err) -// } -//} -// -//func (cm *ConfigManager) SetEnableConfigManager(c *gin.Context) { -// if cm.config.Discovery.Enable != config.ETCD { -// apiresp.GinError(c, errs.New("only etcd support config manager").Wrap()) -// return -// } -// var req apistruct.SetEnableConfigManagerReq -// if err := c.BindJSON(&req); err != nil { -// apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) -// return -// } -// var enableStr string -// if req.Enable { -// enableStr = etcd.Enable -// } else { -// enableStr = etcd.Disable -// } -// resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey)) -// if err != nil { -// apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed")) -// return -// } -// if !(resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable) && req.Enable { -// go func() { -// time.Sleep(waitHttp) // wait for Restart http call return -// err := cm.resetConfig(c, false, clientv3.OpPut(etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr)) -// if err != nil { -// log.ZError(c, "resetConfig failed", err) -// } -// }() -// } else { -// _, err = cm.client.Put(c, etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr) -// if err != nil { -// apiresp.GinError(c, errs.WrapMsg(err, "setEnableConfigManager failed")) -// return -// } -// } -// -// apiresp.GinSuccess(c, nil) -//} -// -//func (cm *ConfigManager) GetEnableConfigManager(c *gin.Context) { -// resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey)) -// if err != nil { -// apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed")) -// return -// } -// var enable bool -// if resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable { -// enable = true -// } -// apiresp.GinSuccess(c, &apistruct.GetEnableConfigManagerResp{Enable: enable}) -//} +import ( + "encoding/json" + "reflect" + "strconv" + "time" + + "github.com/gin-gonic/gin" + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" + "github.com/openimsdk/open-im-server/v3/version" + "github.com/openimsdk/tools/apiresp" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/runtimeenv" + clientv3 "go.etcd.io/etcd/client/v3" +) + +const ( + // wait for Restart http call return + waitHttp = time.Millisecond * 200 +) + +type ConfigManager struct { + imAdminUserID []string + config *config.AllConfig + client *clientv3.Client + + configPath string + runtimeEnv string +} + +func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager { + cm := &ConfigManager{ + imAdminUserID: IMAdminUserID, + config: cfg, + client: client, + configPath: configPath, + runtimeEnv: runtimeEnv, + } + return cm +} + +func (cm *ConfigManager) CheckAdmin(c *gin.Context) { + if err := authverify.CheckAdmin(c, cm.imAdminUserID); err != nil { + apiresp.GinError(c, err) + c.Abort() + } +} + +func (cm *ConfigManager) GetConfig(c *gin.Context) { + var req apistruct.GetConfigReq + if err := c.BindJSON(&req); err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + conf := cm.config.Name2Config(req.ConfigName) + if conf == nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail("config name not found").Wrap()) + return + } + b, err := json.Marshal(conf) + if err != nil { + apiresp.GinError(c, err) + return + } + apiresp.GinSuccess(c, string(b)) +} + +func (cm *ConfigManager) GetConfigList(c *gin.Context) { + var resp apistruct.GetConfigListResp + resp.ConfigNames = cm.config.GetConfigNames() + resp.Environment = runtimeenv.PrintRuntimeEnvironment() + resp.Version = version.Version + + apiresp.GinSuccess(c, resp) +} + +func (cm *ConfigManager) SetConfig(c *gin.Context) { + if cm.config.Discovery.Enable != config.ETCD { + apiresp.GinError(c, errs.New("only etcd support set config").Wrap()) + return + } + var req apistruct.SetConfigReq + if err := c.BindJSON(&req); err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + var err error + switch req.ConfigName { + case cm.config.Discovery.GetConfigFileName(): + err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Kafka.GetConfigFileName(): + err = compareAndSave[config.Kafka](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.LocalCache.GetConfigFileName(): + err = compareAndSave[config.LocalCache](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Log.GetConfigFileName(): + err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Minio.GetConfigFileName(): + err = compareAndSave[config.Minio](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Mongo.GetConfigFileName(): + err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Notification.GetConfigFileName(): + err = compareAndSave[config.Notification](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.API.GetConfigFileName(): + err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.CronTask.GetConfigFileName(): + err = compareAndSave[config.CronTask](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.MsgGateway.GetConfigFileName(): + err = compareAndSave[config.MsgGateway](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.MsgTransfer.GetConfigFileName(): + err = compareAndSave[config.MsgTransfer](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Push.GetConfigFileName(): + err = compareAndSave[config.Push](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Auth.GetConfigFileName(): + err = compareAndSave[config.Auth](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Conversation.GetConfigFileName(): + err = compareAndSave[config.Conversation](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Friend.GetConfigFileName(): + err = compareAndSave[config.Friend](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Group.GetConfigFileName(): + err = compareAndSave[config.Group](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Msg.GetConfigFileName(): + err = compareAndSave[config.Msg](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Third.GetConfigFileName(): + err = compareAndSave[config.Third](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.User.GetConfigFileName(): + err = compareAndSave[config.User](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Redis.GetConfigFileName(): + err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Share.GetConfigFileName(): + err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm) + case cm.config.Webhooks.GetConfigFileName(): + err = compareAndSave[config.Webhooks](c, cm.config.Name2Config(req.ConfigName), &req, cm) + default: + apiresp.GinError(c, errs.ErrArgs.Wrap()) + return + } + if err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + apiresp.GinSuccess(c, nil) +} + +func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, cm *ConfigManager) error { + conf := new(T) + err := json.Unmarshal([]byte(req.Data), &conf) + if err != nil { + return errs.ErrArgs.WithDetail(err.Error()).Wrap() + } + eq := reflect.DeepEqual(old, conf) + if eq { + return nil + } + data, err := json.Marshal(conf) + if err != nil { + return errs.ErrArgs.WithDetail(err.Error()).Wrap() + } + _, err = cm.client.Put(c, etcd.BuildKey(req.ConfigName), string(data)) + if err != nil { + return errs.WrapMsg(err, "save to etcd failed") + } + return nil +} + +func (cm *ConfigManager) ResetConfig(c *gin.Context) { + go func() { + if err := cm.resetConfig(c, true); err != nil { + log.ZError(c, "reset config err", err) + } + }() + apiresp.GinSuccess(c, nil) +} + +func (cm *ConfigManager) resetConfig(c *gin.Context, checkChange bool, ops ...clientv3.Op) error { + txn := cm.client.Txn(c) + type initConf struct { + old any + new any + } + configMap := map[string]*initConf{ + cm.config.Discovery.GetConfigFileName(): {old: &cm.config.Discovery, new: new(config.Discovery)}, + cm.config.Kafka.GetConfigFileName(): {old: &cm.config.Kafka, new: new(config.Kafka)}, + cm.config.LocalCache.GetConfigFileName(): {old: &cm.config.LocalCache, new: new(config.LocalCache)}, + cm.config.Log.GetConfigFileName(): {old: &cm.config.Log, new: new(config.Log)}, + cm.config.Minio.GetConfigFileName(): {old: &cm.config.Minio, new: new(config.Minio)}, + cm.config.Mongo.GetConfigFileName(): {old: &cm.config.Mongo, new: new(config.Mongo)}, + cm.config.Notification.GetConfigFileName(): {old: &cm.config.Notification, new: new(config.Notification)}, + cm.config.API.GetConfigFileName(): {old: &cm.config.API, new: new(config.API)}, + cm.config.CronTask.GetConfigFileName(): {old: &cm.config.CronTask, new: new(config.CronTask)}, + cm.config.MsgGateway.GetConfigFileName(): {old: &cm.config.MsgGateway, new: new(config.MsgGateway)}, + cm.config.MsgTransfer.GetConfigFileName(): {old: &cm.config.MsgTransfer, new: new(config.MsgTransfer)}, + cm.config.Push.GetConfigFileName(): {old: &cm.config.Push, new: new(config.Push)}, + cm.config.Auth.GetConfigFileName(): {old: &cm.config.Auth, new: new(config.Auth)}, + cm.config.Conversation.GetConfigFileName(): {old: &cm.config.Conversation, new: new(config.Conversation)}, + cm.config.Friend.GetConfigFileName(): {old: &cm.config.Friend, new: new(config.Friend)}, + cm.config.Group.GetConfigFileName(): {old: &cm.config.Group, new: new(config.Group)}, + cm.config.Msg.GetConfigFileName(): {old: &cm.config.Msg, new: new(config.Msg)}, + cm.config.Third.GetConfigFileName(): {old: &cm.config.Third, new: new(config.Third)}, + cm.config.User.GetConfigFileName(): {old: &cm.config.User, new: new(config.User)}, + cm.config.Redis.GetConfigFileName(): {old: &cm.config.Redis, new: new(config.Redis)}, + cm.config.Share.GetConfigFileName(): {old: &cm.config.Share, new: new(config.Share)}, + cm.config.Webhooks.GetConfigFileName(): {old: &cm.config.Webhooks, new: new(config.Webhooks)}, + } + + changedKeys := make([]string, 0, len(configMap)) + for k, v := range configMap { + err := config.Load( + cm.configPath, + k, + config.EnvPrefixMap[k], + cm.runtimeEnv, + v.new, + ) + if err != nil { + log.ZError(c, "load config failed", err) + continue + } + equal := reflect.DeepEqual(v.old, v.new) + if !checkChange || !equal { + changedKeys = append(changedKeys, k) + } + } + + for _, k := range changedKeys { + data, err := json.Marshal(configMap[k].new) + if err != nil { + log.ZError(c, "marshal config failed", err) + continue + } + ops = append(ops, clientv3.OpPut(etcd.BuildKey(k), string(data))) + } + if len(ops) > 0 { + txn.Then(ops...) + _, err := txn.Commit() + if err != nil { + return errs.WrapMsg(err, "commit etcd txn failed") + } + } + return nil +} + +func (cm *ConfigManager) Restart(c *gin.Context) { + go cm.restart(c) + apiresp.GinSuccess(c, nil) +} + +func (cm *ConfigManager) restart(c *gin.Context) { + time.Sleep(waitHttp) // wait for Restart http call return + t := time.Now().Unix() + _, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t))) + if err != nil { + log.ZError(c, "restart etcd put key failed", err) + } +} + +func (cm *ConfigManager) SetEnableConfigManager(c *gin.Context) { + var req apistruct.SetEnableConfigManagerReq + if err := c.BindJSON(&req); err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + var enableStr string + if req.Enable { + enableStr = etcd.Enable + } else { + enableStr = etcd.Disable + } + resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey)) + if err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed")) + return + } + if !(resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable) && req.Enable { + go func() { + time.Sleep(waitHttp) // wait for Restart http call return + err := cm.resetConfig(c, false, clientv3.OpPut(etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr)) + if err != nil { + log.ZError(c, "resetConfig failed", err) + } + }() + } else { + _, err = cm.client.Put(c, etcd.BuildKey(etcd.EnableConfigCenterKey), enableStr) + if err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "setEnableConfigManager failed")) + return + } + } + + apiresp.GinSuccess(c, nil) +} + +func (cm *ConfigManager) GetEnableConfigManager(c *gin.Context) { + resp, err := cm.client.Get(c, etcd.BuildKey(etcd.EnableConfigCenterKey)) + if err != nil { + apiresp.GinError(c, errs.WrapMsg(err, "getEnableConfigManager failed")) + return + } + var enable bool + if resp.Count > 0 && string(resp.Kvs[0].Value) == etcd.Enable { + enable = true + } + apiresp.GinSuccess(c, &apistruct.GetEnableConfigManagerResp{Enable: enable}) +} diff --git a/internal/api/init.go b/internal/api/init.go index fe8ac1cd05..9b7524e1cf 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -43,9 +43,8 @@ func Start(ctx context.Context, index int, cfg *Config) error { var client discovery.SvcDiscoveryRegistry - // Determine whether zk is passed according to whether it is a clustered deployment - client, err = kdisc.NewDiscoveryRegister(&cfg.Discovery, &cfg.Share, []string{ - cfg.Share.RpcRegisterName.MessageGateway, + client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv, []string{ + config.Discovery.RpcService.MessageGateway, }) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") diff --git a/internal/api/router.go b/internal/api/router.go index 72cafc6b30..bfd4e94d0b 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -292,6 +292,25 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co proDiscoveryGroup.GET("/msg_gateway", pd.MessageGateway) proDiscoveryGroup.GET("/msg_transfer", pd.MessageTransfer) } + + var etcdClient *clientv3.Client + if cfg.Discovery.Enable == config.ETCD { + etcdClient = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + } + cm := NewConfigManager(cfg.Share.IMAdminUserID, cfg.AllConfig, etcdClient, cfg.ConfigPath, cfg.RuntimeEnv) + { + + configGroup := r.Group("/config", cm.CheckAdmin) + configGroup.POST("/get_config_list", cm.GetConfigList) + configGroup.POST("/get_config", cm.GetConfig) + configGroup.POST("/set_config", cm.SetConfig) + configGroup.POST("/reset_config", cm.ResetConfig) + configGroup.POST("/set_enable_config_manager", cm.SetEnableConfigManager) + configGroup.POST("/get_enable_config_manager", cm.GetEnableConfigManager) + } + { + r.POST("/restart", cm.CheckAdmin, cm.Restart) + } return r, nil } diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 2e6a0f2deb..e69a8649ca 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -62,6 +62,9 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error { []string{ conf.Share.RpcRegisterName.MessageGateway, }, + []string{ + conf.Discovery.RpcService.MessageGateway, + }, s.InitServer, ) } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 1ac97eeb11..9043767820 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -82,7 +82,7 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return err } - client, err := discRegister.NewDiscoveryRegister(&config.Discovery, &config.Share, nil) + client, err := discRegister.NewDiscoveryRegister(&config.Discovery, runTimeEnv, nil) if err != nil { return err } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 689244dd16..9c5a1c6cbe 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -18,10 +18,12 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" + disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/third" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mw" @@ -39,36 +41,47 @@ type CronTaskConfig struct { Discovery config.Discovery } -func Start(ctx context.Context, config *CronTaskConfig) error { - log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords) - if config.CronTask.RetainChatRecords < 1 { +func Start(ctx context.Context, conf *CronTaskConfig) error { + conf.runTimeEnv = runtimeenv.PrintRuntimeEnvironment() + + log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", conf.runTimeEnv, "chatRecordsClearTime", conf.CronTask.CronExecuteTime, "msgDestructTime", conf.CronTask.RetainChatRecords) + if conf.CronTask.RetainChatRecords < 1 { return errs.New("msg destruct time must be greater than 1").Wrap() } - client, err := kdisc.NewDiscoveryRegister(&config.Discovery, &config.Share, nil) + client, err := kdisc.NewDiscoveryRegister(&conf.Discovery, conf.runTimeEnv, nil) if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) - ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0]) + ctx = mcontext.SetOpUserID(ctx, conf.Share.IMAdminUserID[0]) - msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) + msgConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Msg) if err != nil { return err } - thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) + thirdConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Third) if err != nil { return err } - conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) + conversationConn, err := client.GetConn(ctx, conf.Discovery.RpcService.Conversation) if err != nil { return err } + if conf.Discovery.Enable == config.ETCD { + cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{ + conf.CronTask.GetConfigFileName(), + conf.Share.GetConfigFileName(), + conf.Discovery.GetConfigFileName(), + }) + cm.Watch(ctx) + } + srv := &cronServer{ ctx: ctx, - config: config, + config: conf, cron: cron.New(), msgClient: msg.NewMsgClient(msgConn), conversationClient: pbconversation.NewConversationClient(conversationConn), @@ -84,7 +97,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if err := srv.registerClearUserMsg(); err != nil { return err } - log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) + log.ZDebug(ctx, "start cron task", "CronExecuteTime", conf.CronTask.CronExecuteTime) srv.cron.Start() <-ctx.Done() return nil diff --git a/pkg/apistruct/config_manager.go b/pkg/apistruct/config_manager.go new file mode 100644 index 0000000000..9b8641c9d6 --- /dev/null +++ b/pkg/apistruct/config_manager.go @@ -0,0 +1,24 @@ +package apistruct + +type GetConfigReq struct { + ConfigName string `json:"configName"` +} + +type GetConfigListResp struct { + Environment string `json:"environment"` + Version string `json:"version"` + ConfigNames []string `json:"configNames"` +} + +type SetConfigReq struct { + ConfigName string `json:"configName"` + Data string `json:"data"` +} + +type SetEnableConfigManagerReq struct { + Enable bool `json:"enable"` +} + +type GetEnableConfigManagerResp struct { + Enable bool `json:"enable"` +} diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index 54f65bc377..6c11218f6c 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -60,5 +60,8 @@ func (a *AuthRpcCmd) runE() error { []string{ a.authConfig.Share.RpcRegisterName.MessageGateway, }, + []string{ + a.authConfig.Discovery.RpcService.MessageGateway, + }, auth.Start) } diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index 3d252e2673..c456971d2b 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -58,7 +58,15 @@ func (a *ConversationRpcCmd) Exec() error { func (a *ConversationRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports, - a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, - nil, + a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig, + []string{ + a.conversationConfig.RpcConfig.GetConfigFileName(), + a.conversationConfig.RedisConfig.GetConfigFileName(), + a.conversationConfig.MongodbConfig.GetConfigFileName(), + a.conversationConfig.NotificationConfig.GetConfigFileName(), + a.conversationConfig.Share.GetConfigFileName(), + a.conversationConfig.LocalCacheConfig.GetConfigFileName(), + a.conversationConfig.Discovery.GetConfigFileName(), + }, nil, conversation.Start) } diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index 363ca375f9..684f27e400 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -59,7 +59,16 @@ func (a *FriendRpcCmd) Exec() error { func (a *FriendRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports, - a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, - nil, + a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig, + []string{ + a.relationConfig.RpcConfig.GetConfigFileName(), + a.relationConfig.RedisConfig.GetConfigFileName(), + a.relationConfig.MongodbConfig.GetConfigFileName(), + a.relationConfig.NotificationConfig.GetConfigFileName(), + a.relationConfig.Share.GetConfigFileName(), + a.relationConfig.WebhooksConfig.GetConfigFileName(), + a.relationConfig.LocalCacheConfig.GetConfigFileName(), + a.relationConfig.Discovery.GetConfigFileName(), + }, nil, relation.Start) } diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index 44fa712f6e..456b201afb 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -60,7 +60,16 @@ func (a *GroupRpcCmd) Exec() error { func (a *GroupRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports, - a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, - nil, + a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig, + []string{ + a.groupConfig.RpcConfig.GetConfigFileName(), + a.groupConfig.RedisConfig.GetConfigFileName(), + a.groupConfig.MongodbConfig.GetConfigFileName(), + a.groupConfig.NotificationConfig.GetConfigFileName(), + a.groupConfig.Share.GetConfigFileName(), + a.groupConfig.WebhooksConfig.GetConfigFileName(), + a.groupConfig.LocalCacheConfig.GetConfigFileName(), + a.groupConfig.Discovery.GetConfigFileName(), + }, nil, group.Start, versionctx.EnableVersionCtx()) } diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index 64124696db..1a976410de 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -60,7 +60,17 @@ func (a *MsgRpcCmd) Exec() error { func (a *MsgRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports, - a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, - nil, + a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig, + []string{ + a.msgConfig.RpcConfig.GetConfigFileName(), + a.msgConfig.RedisConfig.GetConfigFileName(), + a.msgConfig.MongodbConfig.GetConfigFileName(), + a.msgConfig.KafkaConfig.GetConfigFileName(), + a.msgConfig.NotificationConfig.GetConfigFileName(), + a.msgConfig.Share.GetConfigFileName(), + a.msgConfig.WebhooksConfig.GetConfigFileName(), + a.msgConfig.LocalCacheConfig.GetConfigFileName(), + a.msgConfig.Discovery.GetConfigFileName(), + }, nil, msg.Start) } diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index 62bdfceafe..86612ffec8 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -64,5 +64,8 @@ func (a *PushRpcCmd) runE() error { []string{ a.pushConfig.Share.RpcRegisterName.MessageGateway, }, + []string{ + a.pushConfig.Discovery.RpcService.MessageGateway, + }, push.Start) } diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index 5edea43773..d01cd91c4c 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -87,6 +87,25 @@ func NewRootCmd(processName string, opts ...func(*CmdOpts)) *RootCmd { return rootCmd } +func (r *RootCmd) initEtcd() error { + configDirectory, _, err := r.getFlag(&r.Command) + if err != nil { + return err + } + disConfig := config.Discovery{} + env := runtimeenv.PrintRuntimeEnvironment() + err = config.Load(configDirectory, config.DiscoveryConfigFilename, config.EnvPrefixMap[config.DiscoveryConfigFilename], + env, &disConfig) + if err != nil { + return err + } + if disConfig.Enable == config.ETCD { + discov, _ := kdisc.NewDiscoveryRegister(&disConfig, env, nil) + r.etcdClient = discov.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + } + return nil +} + func (r *RootCmd) persistentPreRun(cmd *cobra.Command, opts ...func(*CmdOpts)) error { cmdOpts := r.applyOptions(opts...) if err := r.initializeConfiguration(cmd, cmdOpts); err != nil { @@ -96,7 +115,9 @@ func (r *RootCmd) persistentPreRun(cmd *cobra.Command, opts ...func(*CmdOpts)) e if err := r.initializeLogger(cmdOpts); err != nil { return errs.WrapMsg(err, "failed to initialize logger") } - + if err := r.etcdClient.Close(); err != nil { + return errs.WrapMsg(err, "failed to close etcd client") + } return nil } @@ -115,8 +136,65 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err } } // Load common log configuration file - return config.LoadConfig(filepath.Join(configDirectory, LogConfigFileName), - ConfigEnvPrefixMap[LogConfigFileName], &r.log) + return config.Load(configDirectory, config.LogConfigFileName, config.EnvPrefixMap[config.LogConfigFileName], runtimeEnv, &r.log) +} + +func (r *RootCmd) updateConfigFromEtcd(opts *CmdOpts) error { + if r.etcdClient == nil { + return nil + } + ctx := context.TODO() + + res, err := r.etcdClient.Get(ctx, disetcd.BuildKey(disetcd.EnableConfigCenterKey)) + if err != nil { + log.ZWarn(ctx, "root cmd updateConfigFromEtcd, etcd Get EnableConfigCenterKey err: %v", errs.Wrap(err)) + return nil + } + if res.Count == 0 { + return nil + } else { + if string(res.Kvs[0].Value) == disetcd.Disable { + return nil + } else if string(res.Kvs[0].Value) != disetcd.Enable { + return errs.New("unknown EnableConfigCenter value").Wrap() + } + } + + update := func(configFileName string, configStruct any) error { + key := disetcd.BuildKey(configFileName) + etcdRes, err := r.etcdClient.Get(ctx, key) + if err != nil { + log.ZWarn(ctx, "root cmd updateConfigFromEtcd, etcd Get err: %v", errs.Wrap(err)) + return nil + } + if etcdRes.Count == 0 { + data, err := json.Marshal(configStruct) + if err != nil { + return errs.ErrArgs.WithDetail(err.Error()).Wrap() + } + _, err = r.etcdClient.Put(ctx, disetcd.BuildKey(configFileName), string(data)) + if err != nil { + log.ZWarn(ctx, "root cmd updateConfigFromEtcd, etcd Put err: %v", errs.Wrap(err)) + } + return nil + } + err = json.Unmarshal(etcdRes.Kvs[0].Value, configStruct) + if err != nil { + return errs.WrapMsg(err, "failed to unmarshal config from etcd") + } + return nil + } + for configFileName, configStruct := range opts.configMap { + if err := update(configFileName, configStruct); err != nil { + return err + } + } + if err := update(config.LogConfigFileName, &r.log); err != nil { + return err + } + // Load common log configuration file + return nil + } func (r *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts { diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index 30828cd530..281b2d6425 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -59,7 +59,16 @@ func (a *ThirdRpcCmd) Exec() error { func (a *ThirdRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports, - a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, - nil, + a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig, + []string{ + a.thirdConfig.RpcConfig.GetConfigFileName(), + a.thirdConfig.RedisConfig.GetConfigFileName(), + a.thirdConfig.MongodbConfig.GetConfigFileName(), + a.thirdConfig.NotificationConfig.GetConfigFileName(), + a.thirdConfig.Share.GetConfigFileName(), + a.thirdConfig.MinioConfig.GetConfigFileName(), + a.thirdConfig.LocalCacheConfig.GetConfigFileName(), + a.thirdConfig.Discovery.GetConfigFileName(), + }, nil, third.Start) } diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index 54aae7382b..98e3d99be6 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -60,7 +60,17 @@ func (a *UserRpcCmd) Exec() error { func (a *UserRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports, - a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, - nil, + a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig, + []string{ + a.userConfig.RpcConfig.GetConfigFileName(), + a.userConfig.RedisConfig.GetConfigFileName(), + a.userConfig.MongodbConfig.GetConfigFileName(), + a.userConfig.KafkaConfig.GetConfigFileName(), + a.userConfig.NotificationConfig.GetConfigFileName(), + a.userConfig.Share.GetConfigFileName(), + a.userConfig.WebhooksConfig.GetConfigFileName(), + a.userConfig.LocalCacheConfig.GetConfigFileName(), + a.userConfig.Discovery.GetConfigFileName(), + }, nil, user.Start) } diff --git a/pkg/common/discovery/etcd/config_manager.go b/pkg/common/discovery/etcd/config_manager.go new file mode 100644 index 0000000000..70d37c3237 --- /dev/null +++ b/pkg/common/discovery/etcd/config_manager.go @@ -0,0 +1,106 @@ +package etcd + +import ( + "context" + "os" + "os/exec" + "runtime" + "sync" + "syscall" + + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" + clientv3 "go.etcd.io/etcd/client/v3" +) + +var ( + ShutDowns []func() error +) + +func RegisterShutDown(shutDown ...func() error) { + ShutDowns = append(ShutDowns, shutDown...) +} + +type ConfigManager struct { + client *clientv3.Client + watchConfigNames []string + lock sync.Mutex +} + +func BuildKey(s string) string { + return ConfigKeyPrefix + s +} + +func NewConfigManager(client *clientv3.Client, configNames []string) *ConfigManager { + return &ConfigManager{ + client: client, + watchConfigNames: datautil.Batch(func(s string) string { return BuildKey(s) }, append(configNames, RestartKey))} +} + +func (c *ConfigManager) Watch(ctx context.Context) { + chans := make([]clientv3.WatchChan, 0, len(c.watchConfigNames)) + for _, name := range c.watchConfigNames { + chans = append(chans, c.client.Watch(ctx, name, clientv3.WithPrefix())) + } + + doWatch := func(watchChan clientv3.WatchChan) { + for watchResp := range watchChan { + if watchResp.Err() != nil { + log.ZError(ctx, "watch err", errs.Wrap(watchResp.Err())) + continue + } + for _, event := range watchResp.Events { + if event.IsModify() { + if datautil.Contain(string(event.Kv.Key), c.watchConfigNames...) { + c.lock.Lock() + err := restartServer(ctx) + if err != nil { + log.ZError(ctx, "restart server err", err) + } + c.lock.Unlock() + } + } + } + } + } + for _, ch := range chans { + go doWatch(ch) + } +} + +func restartServer(ctx context.Context) error { + exePath, err := os.Executable() + if err != nil { + return errs.New("get executable path fail").Wrap() + } + + args := os.Args + env := os.Environ() + + cmd := exec.Command(exePath, args[1:]...) + cmd.Env = env + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Stdin = os.Stdin + + if runtime.GOOS != "windows" { + cmd.SysProcAttr = &syscall.SysProcAttr{} + } + log.ZInfo(ctx, "shutdown server") + for _, f := range ShutDowns { + if err = f(); err != nil { + log.ZError(ctx, "shutdown fail", err) + } + } + + log.ZInfo(ctx, "restart server") + err = cmd.Start() + if err != nil { + return errs.New("restart server fail").Wrap() + } + log.ZInfo(ctx, "cmd start over") + + os.Exit(0) + return nil +} diff --git a/pkg/common/discoveryregister/const.go b/pkg/common/discoveryregister/const.go new file mode 100644 index 0000000000..c9b00fc2c9 --- /dev/null +++ b/pkg/common/discoveryregister/const.go @@ -0,0 +1,9 @@ +package etcd + +const ( + ConfigKeyPrefix = "/open-im/config/" + RestartKey = "restart" + EnableConfigCenterKey = "enable-config-center" + Enable = "enable" + Disable = "disable" +) diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index f7175491ca..0c367d55f9 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -25,7 +25,15 @@ import ( ) // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. -func NewDiscoveryRegister(discovery *config.Discovery, share *config.Share, watchNames []string) (discovery.SvcDiscoveryRegistry, error) { +func NewDiscoveryRegister(discovery *config.Discovery, runtimeEnv string, watchNames []string) (discovery.SvcDiscoveryRegistry, error) { + if runtimeEnv == config.KUBERNETES { + return kubernetes.NewKubernetesConnManager(discovery.Kubernetes.Namespace, + grpc.WithDefaultCallOptions( + grpc.MaxCallSendMsgSize(1024*1024*20), + ), + ) + } + switch discovery.Enable { case "k8s": return kubernetes.NewK8sDiscoveryRegister(share.RpcRegisterName.MessageGateway) diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 3d4394c516..aa2e3cf35d 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -45,8 +45,8 @@ import ( // Start rpc server. func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, - registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, share *conf.Share, config T, - watchServiceNames []string, + registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, + watchConfigNames []string, watchServiceNames []string, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { @@ -85,7 +85,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf if autoSetPorts && discovery.Enable != conf.ETCD { return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap() } - client, err := kdisc.NewDiscoveryRegister(discovery, share, watchServiceNames) + client, err := kdisc.NewDiscoveryRegister(discovery, runTimeEnv, watchServiceNames) if err != nil { return err }