From 40b214fb327898a392d03d15abe118338c527f0f Mon Sep 17 00:00:00 2001 From: kl7sn Date: Mon, 19 Feb 2024 19:30:22 +0800 Subject: [PATCH 1/4] =?UTF-8?q?fix:=20=E5=85=BC=E5=AE=B9=20grpc.MaxCallRec?= =?UTF-8?q?vMsgSize=20=E5=86=99=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/egrpc/config.go | 4 +++- client/egrpc/container.go | 8 +++++--- client/egrpc/option_test.go | 17 ++++++++++++++++- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/client/egrpc/config.go b/client/egrpc/config.go index a625ad07..5c4a4bf9 100644 --- a/client/egrpc/config.go +++ b/client/egrpc/config.go @@ -10,6 +10,8 @@ import ( "github.com/gotomicro/ego/core/util/xtime" ) +const DefaultMaxCallRecvMsgSize = 1024 * 1024 * 4 + // Config ... type Config struct { Addr string // 连接地址,直连为127.0.0.1:9001,服务发现为etcd:///appname @@ -58,6 +60,6 @@ func DefaultConfig() *Config { EnableAccessInterceptorRes: false, EnableServiceConfig: true, // EnableCPUUsage: true, - MaxCallRecvMsgSize: 1024 * 1024 * 4, + MaxCallRecvMsgSize: DefaultMaxCallRecvMsgSize, } } diff --git a/client/egrpc/container.go b/client/egrpc/container.go index 9b5d1c27..fa0706bf 100644 --- a/client/egrpc/container.go +++ b/client/egrpc/container.go @@ -74,11 +74,13 @@ func (c *Container) Build(options ...Option) *Component { WithDialOption(grpc.WithChainUnaryInterceptor(c.metricUnaryClientInterceptor())), ) } - options = append(options, WithDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.config.MaxCallRecvMsgSize)))) - for _, option := range options { option(c) } - + // 兼容代码直接配置 grpc.MaxCallRecvMsgSize + // 并保持配置文件高优先级 + if c.config.MaxCallRecvMsgSize != DefaultMaxCallRecvMsgSize { + WithDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.config.MaxCallRecvMsgSize)))(c) + } return newComponent(c.name, c.config, c.logger) } diff --git a/client/egrpc/option_test.go b/client/egrpc/option_test.go index 5dd81e49..65061aa3 100644 --- a/client/egrpc/option_test.go +++ b/client/egrpc/option_test.go @@ -6,9 +6,11 @@ import ( "time" "github.com/BurntSushi/toml" - "github.com/gotomicro/ego/core/econf" "github.com/stretchr/testify/assert" + "google.golang.org/grpc" "google.golang.org/grpc/test/bufconn" + + "github.com/gotomicro/ego/core/econf" ) func newCmp(t *testing.T, opt Option) *Component { @@ -74,3 +76,16 @@ func TestWithName(t *testing.T) { cmp := newCmp(t, opt) assert.Equal(t, "hello", cmp.name) } + +func TestMaxCallRecvMsgSize(t *testing.T) { + opt := WithMaxRecvMsgSize(1024) + cmp := newCmp(t, opt) + assert.Equal(t, 1024, cmp.config.MaxCallRecvMsgSize) +} + +func TestMaxCallRecvMsgSizeWithDialOption(t *testing.T) { + var opts []grpc.DialOption + opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(2048))) + cmp := newCmp(t, WithDialOption(opts...)) + assert.Equal(t, "grpc", cmp.name) +} From 4c4c657806598dee8f01770be420542e55d4ae1c Mon Sep 17 00:00:00 2001 From: kl7sn Date: Tue, 20 Feb 2024 15:19:11 +0800 Subject: [PATCH 2/4] grpc client & golint Signed-off-by: kl7sn --- .github/workflows/go.yml | 2 +- client/egrpc/component.go | 4 ++++ client/egrpc/container.go | 39 +++++++++++-------------------- client/ehttp/resolver/resolver.go | 14 +++++------ core/util/xdebug/print_test.go | 5 ++-- server/egin/component_test.go | 8 ++++++- server/egin/options_test.go | 5 ++-- server/egrpc/interceptor.go | 2 +- server/egrpc/interceptor_test.go | 16 ++++++------- 9 files changed, 47 insertions(+), 48 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index f7a5395f..51ed6344 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -14,7 +14,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.19 + go-version: 1.20 - name: Build run: go build -v ./... diff --git a/client/egrpc/component.go b/client/egrpc/component.go index 3fd51437..c177957d 100644 --- a/client/egrpc/component.go +++ b/client/egrpc/component.go @@ -67,6 +67,10 @@ func newComponent(name string, config *Config, logger *elog.Component) *Componen dialOptions = append(dialOptions, grpc.FailOnNonTempDialError(config.EnableFailOnNonTempDialError)) + if config.MaxCallRecvMsgSize != DefaultMaxCallRecvMsgSize { + dialOptions = append(dialOptions, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(config.MaxCallRecvMsgSize))) + } + startTime := time.Now() cc, err := grpc.DialContext(ctx, config.Addr, dialOptions...) diff --git a/client/egrpc/container.go b/client/egrpc/container.go index fa0706bf..67df7a9b 100644 --- a/client/egrpc/container.go +++ b/client/egrpc/container.go @@ -42,45 +42,34 @@ func Load(key string) *Container { // Build constructs a specific component from container. func (c *Container) Build(options ...Option) *Component { + var unaryInterceptors []grpc.UnaryClientInterceptor + var streamInterceptors []grpc.StreamClientInterceptor // 最先执行trace if c.config.EnableTraceInterceptor { - options = append(options, - WithDialOption(grpc.WithChainUnaryInterceptor(c.traceUnaryClientInterceptor())), - WithDialOption(grpc.WithChainStreamInterceptor(c.traceStreamClientInterceptor())), - ) + unaryInterceptors = append(unaryInterceptors, c.traceUnaryClientInterceptor()) + streamInterceptors = append(streamInterceptors, c.traceStreamClientInterceptor()) } - - // 其次执行,自定义header头,这样才能赋值到ctx里 - // options = append(options, WithDialOption(grpc.WithChainUnaryInterceptor(customHeader(transport.CustomContextKeys())))) - // 默认日志 - options = append(options, WithDialOption(grpc.WithChainUnaryInterceptor(c.loggerUnaryClientInterceptor()))) - + unaryInterceptors = append(unaryInterceptors, c.loggerUnaryClientInterceptor()) if eapp.IsDevelopmentMode() { - options = append(options, WithDialOption(grpc.WithChainUnaryInterceptor(c.debugUnaryClientInterceptor()))) + unaryInterceptors = append(unaryInterceptors, c.debugUnaryClientInterceptor()) } - if c.config.EnableAppNameInterceptor { - options = append(options, WithDialOption(grpc.WithChainUnaryInterceptor(c.defaultUnaryClientInterceptor()))) - options = append(options, WithDialOption(grpc.WithChainStreamInterceptor(c.defaultStreamClientInterceptor()))) + unaryInterceptors = append(unaryInterceptors, c.defaultUnaryClientInterceptor()) + streamInterceptors = append(streamInterceptors, c.defaultStreamClientInterceptor()) } - if c.config.EnableTimeoutInterceptor { - options = append(options, WithDialOption(grpc.WithChainUnaryInterceptor(c.timeoutUnaryClientInterceptor()))) + unaryInterceptors = append(unaryInterceptors, c.timeoutUnaryClientInterceptor()) } - if c.config.EnableMetricInterceptor { - options = append(options, - WithDialOption(grpc.WithChainUnaryInterceptor(c.metricUnaryClientInterceptor())), - ) + unaryInterceptors = append(unaryInterceptors, c.metricUnaryClientInterceptor()) } for _, option := range options { option(c) } - // 兼容代码直接配置 grpc.MaxCallRecvMsgSize - // 并保持配置文件高优先级 - if c.config.MaxCallRecvMsgSize != DefaultMaxCallRecvMsgSize { - WithDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(c.config.MaxCallRecvMsgSize)))(c) - } + c.config.dialOptions = append(c.config.dialOptions, + grpc.WithChainStreamInterceptor(streamInterceptors...), + grpc.WithChainUnaryInterceptor(unaryInterceptors...), + ) return newComponent(c.name, c.config, c.logger) } diff --git a/client/ehttp/resolver/resolver.go b/client/ehttp/resolver/resolver.go index 9fdcb3a2..4ab35c9d 100644 --- a/client/ehttp/resolver/resolver.go +++ b/client/ehttp/resolver/resolver.go @@ -17,7 +17,7 @@ var ( // m is a map from scheme to resolver builder. m = make(map[string]Builder) // defaultScheme is the default scheme to use. - defaultScheme = "http" + // defaultScheme = "http" ) // Builder creates a resolver that will be used to watch name resolution updates. @@ -99,12 +99,12 @@ func (b *baseBuilder) Scheme() string { } type baseResolver struct { - target eregistry.Target // 使用ego的target,因为官方的target后续会不兼容 - stop chan struct{} - reg eregistry.Registry - cancel context.CancelFunc - addrSlices []string - nodeInfo map[string]*attributes.Attributes // node节点的属性 + target eregistry.Target // 使用ego的target,因为官方的target后续会不兼容 + stop chan struct{} + reg eregistry.Registry + cancel context.CancelFunc + // addrSlices []string + nodeInfo map[string]*attributes.Attributes // node节点的属性 } func (b *baseResolver) GetAddr() string { diff --git a/core/util/xdebug/print_test.go b/core/util/xdebug/print_test.go index 24f6971e..6b505128 100644 --- a/core/util/xdebug/print_test.go +++ b/core/util/xdebug/print_test.go @@ -6,11 +6,12 @@ import ( ) func TestMakeReqAndResError(t *testing.T) { - err := MakeReqAndResError("test", "test", "test", time.Now().Sub(time.Now()), "test", "test") + + err := MakeReqAndResError("test", "test", "test", time.Until(time.Now()), "test", "test") t.Log(err) } func TestMakeReqAndResInfo(t *testing.T) { - err := MakeReqAndResInfo("test", "test", "test", time.Now().Sub(time.Now()), "test", "test") + err := MakeReqAndResInfo("test", "test", "test", time.Until(time.Now()), "test", "test") t.Log(err) } diff --git a/server/egin/component_test.go b/server/egin/component_test.go index 128f64e7..3054dd17 100644 --- a/server/egin/component_test.go +++ b/server/egin/component_test.go @@ -326,7 +326,13 @@ func eginClient(ctx context.Context, gin *Component, url string) (err error) { return err } r, err := client.Do(req) + if err != nil { + return err + } _, err = io.ReadAll(r.Body) - r.Body.Close() + defer func() { _ = r.Body.Close() }() + if err != nil { + return err + } return nil } diff --git a/server/egin/options_test.go b/server/egin/options_test.go index c40dfcd3..47d33d03 100644 --- a/server/egin/options_test.go +++ b/server/egin/options_test.go @@ -8,8 +8,9 @@ import ( "time" "github.com/gin-gonic/gin" - "github.com/gotomicro/ego/core/elog" "github.com/stretchr/testify/assert" + + "github.com/gotomicro/ego/core/elog" ) func TestInterceptor(t *testing.T) { @@ -99,7 +100,7 @@ func TestWithContextTimeout(t *testing.T) { func TestWithListener(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // 处理http请求 - io.WriteString(w, "Hello,Client") + _, _ = io.WriteString(w, "Hello,Client") })) defer server.Close() diff --git a/server/egrpc/interceptor.go b/server/egrpc/interceptor.go index b1a64544..c5dde6c2 100644 --- a/server/egrpc/interceptor.go +++ b/server/egrpc/interceptor.go @@ -420,7 +420,7 @@ func (c *Container) sentinelInterceptor() grpc.UnaryServerInterceptor { return handler(ctx, req) } - var entry *sentinelBase.SentinelEntry = nil + // var entry *sentinelBase.SentinelEntry = nil entry, blockErr := sentinel.Entry( resourceName, sentinel.WithResourceType(sentinelBase.ResTypeRPC), diff --git a/server/egrpc/interceptor_test.go b/server/egrpc/interceptor_test.go index 3e3fd25c..cde5db5f 100644 --- a/server/egrpc/interceptor_test.go +++ b/server/egrpc/interceptor_test.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/stretchr/testify/assert" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" "google.golang.org/grpc/test/bufconn" @@ -43,7 +44,7 @@ func Test_getPeerIP(t *testing.T) { assert.Equal(t, "127.0.0.1", value) } -//func Test_enableCPUUsage(t *testing.T) { +// func Test_enableCPUUsage(t *testing.T) { // md := metadata.New(map[string]string{ // "enable-cpu-usage": "true", // }) @@ -61,7 +62,7 @@ func Test_getPeerIP(t *testing.T) { // ctx3 := metadata.NewIncomingContext(context.Background(), md3) // value3 := enableCPUUsage(ctx3) // assert.Equal(t, false, value3) -//} +// } func Test_ServerAccessLogger(t *testing.T) { // 使用非异步日志 @@ -81,7 +82,7 @@ func Test_ServerAccessLogger(t *testing.T) { }() client, err := grpc.Dial("", - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { return cmp.Listener().(*bufconn.Listener).Dial() })) @@ -113,7 +114,7 @@ func Test_ServerPanicAccessLogger(t *testing.T) { }() client, err := grpc.Dial("", - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { return cmp.Listener().(*bufconn.Listener).Dial() })) @@ -148,7 +149,7 @@ func Test_ServerAccessAppName(t *testing.T) { }() client, err := grpc.Dial("", - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { return cmp.Listener().(*bufconn.Listener).Dial() })) @@ -185,7 +186,7 @@ func TestPrometheus(t *testing.T) { }() client, err := grpc.Dial("", - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { return cmp.Listener().(*bufconn.Listener).Dial() })) @@ -237,7 +238,4 @@ type PanicGreeter struct { // SayHello ... func (g PanicGreeter) SayHello(context context.Context, request *helloworld.HelloRequest) (*helloworld.HelloResponse, error) { panic("we have a panic") - return &helloworld.HelloResponse{ - Message: "Hello", - }, nil } From 4d970f58e4b4b9e8458f3ae6a54208d2b69d8eff Mon Sep 17 00:00:00 2001 From: askuy Date: Tue, 20 Feb 2024 10:20:30 +0800 Subject: [PATCH 3/4] Update go.yml --- .github/workflows/go.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 51ed6344..0b406bab 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -14,8 +14,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.20 - + go-version: 1.21 - name: Build run: go build -v ./... From cff6cf6970e2c0cf621a5c060da3717520d29a86 Mon Sep 17 00:00:00 2001 From: kl7sn Date: Tue, 20 Feb 2024 15:19:11 +0800 Subject: [PATCH 4/4] grpc client & golint Signed-off-by: kl7sn --- .github/workflows/go.yml | 1 + client/egrpc/config_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 0b406bab..f8e8cd14 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -15,6 +15,7 @@ jobs: uses: actions/setup-go@v2 with: go-version: 1.21 + - name: Build run: go build -v ./... diff --git a/client/egrpc/config_test.go b/client/egrpc/config_test.go index 9bfc147a..02b1635f 100644 --- a/client/egrpc/config_test.go +++ b/client/egrpc/config_test.go @@ -33,5 +33,6 @@ func TestDefaultConfig(t *testing.T) { EnableServiceConfig: true, keepAlive: nil, dialOptions: nil, + MaxCallRecvMsgSize: DefaultMaxCallRecvMsgSize, }, DefaultConfig())) }