Skip to content

Commit

Permalink
Merge branch 'v1.66.x' into release_version_1.66.1-dev
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal authored Aug 28, 2024
2 parents 6db0828 + 00d3ec8 commit e81b6b6
Show file tree
Hide file tree
Showing 53 changed files with 2,131 additions and 1,146 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ jobs:
goversion: '1.22'
testflags: -race

- type: tests
goversion: '1.22'
testflags: '-race -tags=buffer_pooling'

- type: tests
goversion: '1.22'
goarch: 386
Expand Down
10 changes: 10 additions & 0 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,16 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve
setConnectedAddress(&scs, curAddr)
}
acbw.stateListener(scs)
acbw.ac.mu.Lock()
defer acbw.ac.mu.Unlock()
if s == connectivity.Ready {
// When changing states to READY, reset stateReadyChan. Wait until
// after we notify the LB policy's listener(s) in order to prevent
// ac.getTransport() from unblocking before the LB policy starts
// tracking the subchannel as READY.
close(acbw.ac.stateReadyChan)
acbw.ac.stateReadyChan = make(chan struct{})
}
})
}

Expand Down
34 changes: 30 additions & 4 deletions benchmark/benchmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ import (
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/experimental"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/mem"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/test/bufconn"

Expand Down Expand Up @@ -153,6 +153,33 @@ const (
warmuptime = time.Second
)

var useNopBufferPool atomic.Bool

type swappableBufferPool struct {
mem.BufferPool
}

func (p swappableBufferPool) Get(length int) *[]byte {
var pool mem.BufferPool
if useNopBufferPool.Load() {
pool = mem.NopBufferPool{}
} else {
pool = p.BufferPool
}
return pool.Get(length)
}

func (p swappableBufferPool) Put(i *[]byte) {
if useNopBufferPool.Load() {
return
}
p.BufferPool.Put(i)
}

func init() {
internal.SetDefaultBufferPoolForTesting.(func(mem.BufferPool))(swappableBufferPool{mem.DefaultBufferPool()})
}

var (
allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll}
allCompModes = []string{compModeOff, compModeGzip, compModeNop, compModeAll}
Expand Down Expand Up @@ -343,10 +370,9 @@ func makeClients(bf stats.Features) ([]testgrpc.BenchmarkServiceClient, func())
}
switch bf.RecvBufferPool {
case recvBufferPoolNil:
// Do nothing.
useNopBufferPool.Store(true)
case recvBufferPoolSimple:
opts = append(opts, experimental.WithRecvBufferPool(grpc.NewSharedBufferPool()))
sopts = append(sopts, experimental.RecvBufferPool(grpc.NewSharedBufferPool()))
// Do nothing as buffering is enabled by default.
default:
logger.Fatalf("Unknown shared recv buffer pool type: %v", bf.RecvBufferPool)
}
Expand Down
25 changes: 11 additions & 14 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,14 +825,14 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.
}

ac := &addrConn{
state: connectivity.Idle,
cc: cc,
addrs: copyAddresses(addrs),
scopts: opts,
dopts: cc.dopts,
channelz: channelz.RegisterSubChannel(cc.channelz, ""),
resetBackoff: make(chan struct{}),
stateChan: make(chan struct{}),
state: connectivity.Idle,
cc: cc,
addrs: copyAddresses(addrs),
scopts: opts,
dopts: cc.dopts,
channelz: channelz.RegisterSubChannel(cc.channelz, ""),
resetBackoff: make(chan struct{}),
stateReadyChan: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Start with our address set to the first address; this may be updated if
Expand Down Expand Up @@ -1179,8 +1179,8 @@ type addrConn struct {
addrs []resolver.Address // All addresses that the resolver resolved to.

// Use updateConnectivityState for updating addrConn's connectivity state.
state connectivity.State
stateChan chan struct{} // closed and recreated on every state change.
state connectivity.State
stateReadyChan chan struct{} // closed and recreated on every READY state change.

backoffIdx int // Needs to be stateful for resetConnectBackoff.
resetBackoff chan struct{}
Expand All @@ -1193,9 +1193,6 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
if ac.state == s {
return
}
// When changing states, reset the state change channel.
close(ac.stateChan)
ac.stateChan = make(chan struct{})
ac.state = s
ac.channelz.ChannelMetrics.State.Store(&s)
if lastErr == nil {
Expand Down Expand Up @@ -1513,7 +1510,7 @@ func (ac *addrConn) getReadyTransport() transport.ClientTransport {
func (ac *addrConn) getTransport(ctx context.Context) (transport.ClientTransport, error) {
for ctx.Err() == nil {
ac.mu.Lock()
t, state, sc := ac.transport, ac.state, ac.stateChan
t, state, sc := ac.transport, ac.state, ac.stateReadyChan
ac.mu.Unlock()
if state == connectivity.Ready {
return t, nil
Expand Down
69 changes: 62 additions & 7 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,73 @@ package grpc
import (
"google.golang.org/grpc/encoding"
_ "google.golang.org/grpc/encoding/proto" // to register the Codec for "proto"
"google.golang.org/grpc/mem"
)

// baseCodec contains the functionality of both Codec and encoding.Codec, but
// omits the name/string, which vary between the two and are not needed for
// anything besides the registry in the encoding package.
// baseCodec captures the new encoding.CodecV2 interface without the Name
// function, allowing it to be implemented by older Codec and encoding.Codec
// implementations. The omitted Name function is only needed for the register in
// the encoding package and is not part of the core functionality.
type baseCodec interface {
Marshal(v any) ([]byte, error)
Unmarshal(data []byte, v any) error
Marshal(v any) (mem.BufferSlice, error)
Unmarshal(data mem.BufferSlice, v any) error
}

// getCodec returns an encoding.CodecV2 for the codec of the given name (if
// registered). Initially checks the V2 registry with encoding.GetCodecV2 and
// returns the V2 codec if it is registered. Otherwise, it checks the V1 registry
// with encoding.GetCodec and if it is registered wraps it with newCodecV1Bridge
// to turn it into an encoding.CodecV2. Returns nil otherwise.
func getCodec(name string) encoding.CodecV2 {
if codecV1 := encoding.GetCodec(name); codecV1 != nil {
return newCodecV1Bridge(codecV1)
}

return encoding.GetCodecV2(name)
}

func newCodecV0Bridge(c Codec) baseCodec {
return codecV0Bridge{codec: c}
}

func newCodecV1Bridge(c encoding.Codec) encoding.CodecV2 {
return codecV1Bridge{
codecV0Bridge: codecV0Bridge{codec: c},
name: c.Name(),
}
}

var _ baseCodec = codecV0Bridge{}

type codecV0Bridge struct {
codec interface {
Marshal(v any) ([]byte, error)
Unmarshal(data []byte, v any) error
}
}

func (c codecV0Bridge) Marshal(v any) (mem.BufferSlice, error) {
data, err := c.codec.Marshal(v)
if err != nil {
return nil, err
}
return mem.BufferSlice{mem.NewBuffer(&data, nil)}, nil
}

func (c codecV0Bridge) Unmarshal(data mem.BufferSlice, v any) (err error) {
return c.codec.Unmarshal(data.Materialize(), v)
}

var _ baseCodec = Codec(nil)
var _ baseCodec = encoding.Codec(nil)
var _ encoding.CodecV2 = codecV1Bridge{}

type codecV1Bridge struct {
codecV0Bridge
name string
}

func (c codecV1Bridge) Name() string {
return c.name
}

// Codec defines the interface gRPC uses to encode and decode messages.
// Note that implementations of this interface must be thread safe;
Expand Down
2 changes: 1 addition & 1 deletion codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func (s) TestGetCodecForProtoIsNotNil(t *testing.T) {
if encoding.GetCodec(proto.Name) == nil {
if encoding.GetCodecV2(proto.Name) == nil {
t.Fatalf("encoding.GetCodec(%q) must not be nil by default", proto.Name)
}
}
27 changes: 5 additions & 22 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/mem"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/stats"
)
Expand Down Expand Up @@ -60,7 +61,7 @@ func init() {
internal.WithBinaryLogger = withBinaryLogger
internal.JoinDialOptions = newJoinDialOption
internal.DisableGlobalDialOptions = newDisableGlobalDialOptions
internal.WithRecvBufferPool = withRecvBufferPool
internal.WithBufferPool = withBufferPool
}

// dialOptions configure a Dial call. dialOptions are set by the DialOption
Expand Down Expand Up @@ -92,7 +93,6 @@ type dialOptions struct {
defaultServiceConfigRawJSON *string
resolvers []resolver.Builder
idleTimeout time.Duration
recvBufferPool SharedBufferPool
defaultScheme string
maxCallAttempts int
}
Expand Down Expand Up @@ -677,11 +677,11 @@ func defaultDialOptions() dialOptions {
WriteBufferSize: defaultWriteBufSize,
UseProxy: true,
UserAgent: grpcUA,
BufferPool: mem.DefaultBufferPool(),
},
bs: internalbackoff.DefaultExponential,
healthCheckFunc: internal.HealthCheckFunc,
idleTimeout: 30 * time.Minute,
recvBufferPool: nopBufferPool{},
defaultScheme: "dns",
maxCallAttempts: defaultMaxCallAttempts,
}
Expand Down Expand Up @@ -758,25 +758,8 @@ func WithMaxCallAttempts(n int) DialOption {
})
}

// WithRecvBufferPool returns a DialOption that configures the ClientConn
// to use the provided shared buffer pool for parsing incoming messages. Depending
// on the application's workload, this could result in reduced memory allocation.
//
// If you are unsure about how to implement a memory pool but want to utilize one,
// begin with grpc.NewSharedBufferPool.
//
// Note: The shared buffer pool feature will not be active if any of the following
// options are used: WithStatsHandler, EnableTracing, or binary logging. In such
// cases, the shared buffer pool will be ignored.
//
// Deprecated: use experimental.WithRecvBufferPool instead. Will be deleted in
// v1.60.0 or later.
func WithRecvBufferPool(bufferPool SharedBufferPool) DialOption {
return withRecvBufferPool(bufferPool)
}

func withRecvBufferPool(bufferPool SharedBufferPool) DialOption {
func withBufferPool(bufferPool mem.BufferPool) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.recvBufferPool = bufferPool
o.copts.BufferPool = bufferPool
})
}
5 changes: 3 additions & 2 deletions encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type Codec interface {
Name() string
}

var registeredCodecs = make(map[string]Codec)
var registeredCodecs = make(map[string]any)

// RegisterCodec registers the provided Codec for use with all gRPC clients and
// servers.
Expand Down Expand Up @@ -126,5 +126,6 @@ func RegisterCodec(codec Codec) {
//
// The content-subtype is expected to be lowercase.
func GetCodec(contentSubtype string) Codec {
return registeredCodecs[contentSubtype]
c, _ := registeredCodecs[contentSubtype].(Codec)
return c
}
Loading

0 comments on commit e81b6b6

Please sign in to comment.