Skip to content

Commit

Permalink
refactor: refactor the picker (#19)
Browse files Browse the repository at this point in the history
* refactor: refactor the picker

* refactor: refactor the picker

* refactor: refactor the picker
  • Loading branch information
chenquan authored Jul 9, 2023
1 parent 1e2c0dd commit 8b88615
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 72 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ require (
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/grpc v1.55.0 // indirect
google.golang.org/grpc v1.55.1 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,8 @@ google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTp
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag=
google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8=
google.golang.org/grpc v1.55.1 h1:36vzoa06ohIaveIgzr0qWDCImkKeVjvnNSV6uOmwnOw=
google.golang.org/grpc v1.55.1/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
68 changes: 3 additions & 65 deletions multiple.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ import (
"context"
"database/sql"
"strings"
"sync/atomic"
"time"

"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/proc"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"github.com/zeromicro/go-zero/core/trace"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -31,17 +27,14 @@ type (
DBConf struct {
Leader string
Followers []string `json:",optional"`
// BackLeader back to Leader when all Followers are not available.
BackLeader bool `json:",optional"`
FollowerHeartbeat time.Duration `json:",default=60s"`
}

SqlOption func(*multipleSqlConn)

multipleSqlConn struct {
leader sqlx.SqlConn
enableFollower bool
p2cPicker atomic.Value // picker
p2cPicker picker // picker
followers []sqlx.SqlConn
conf DBConf
accept func(error) bool
Expand Down Expand Up @@ -69,15 +62,7 @@ func NewMultipleSqlConn(driverName string, conf DBConf, opts ...SqlOption) sqlx.
opt(conn)
}

conn.p2cPicker.Store(newP2cPicker(followers, conn.accept))

if conn.enableFollower {
ctx, cancelFunc := context.WithCancel(context.Background())
proc.AddShutdownListener(func() {
cancelFunc()
})
go conn.startFollowerHeartbeat(ctx)
}
conn.p2cPicker = newP2cPicker(followers, conn.accept)

return conn
}
Expand Down Expand Up @@ -180,7 +165,7 @@ func (m *multipleSqlConn) getQueryDB(ctx context.Context, query string) queryDB
return queryDB{conn: m.leader}
}

result, err := m.p2cPicker.Load().(picker).pick()
result, err := m.p2cPicker.pick()
if err == nil {
return queryDB{
conn: result.conn,
Expand All @@ -190,41 +175,9 @@ func (m *multipleSqlConn) getQueryDB(ctx context.Context, query string) queryDB
}
}

if !m.conf.BackLeader {
return queryDB{error: err}
}

return queryDB{conn: m.leader}
}

func (m *multipleSqlConn) heartbeat() {
conns := make([]sqlx.SqlConn, 0, len(m.followers))
for i, follower := range m.followers {
err := pingDB(follower)
if err != nil {
logx.Errorw("follower db heartbeat failure, it will be automatically removed", logx.Field("err", err), logx.Field("db", i))

continue
}

conns = append(conns, follower)
}

m.p2cPicker.Store(newP2cPicker(conns, m.accept))
}

func (m *multipleSqlConn) startFollowerHeartbeat(ctx context.Context) {
ticker := time.NewTicker(m.conf.FollowerHeartbeat)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
m.heartbeat()
}
}
}

func (m *multipleSqlConn) startSpan(ctx context.Context) (context.Context, oteltrace.Span) {
tracer := trace.TracerFromContext(ctx)
ctx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindClient))
Expand Down Expand Up @@ -281,21 +234,6 @@ func (q *queryDB) query(ctx context.Context, query func(ctx context.Context, con
return query(ctx, q.conn)
}

func pingDB(conn sqlx.SqlConn) error {
return pingCtxDB(context.Background(), conn)
}

func pingCtxDB(ctx context.Context, conn sqlx.SqlConn) error {
db, err := conn.RawDB()
if err != nil {
return err
}

return db.PingContext(ctx)
}

// -------------

func WithAccept(accept func(err error) bool) SqlOption {
return func(conn *multipleSqlConn) {
conn.accept = accept
Expand Down
6 changes: 2 additions & 4 deletions multiple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ func TestNewMultipleSqlConn(t *testing.T) {
leaderMock.ExpectPing().WillDelayFor(time.Millisecond)

mysql := NewMultipleSqlConn(mockedDatasource, DBConf{
Leader: leader,
Followers: []string{follower1},
BackLeader: false,
FollowerHeartbeat: time.Minute,
Leader: leader,
Followers: []string{follower1},
})

follower1Mock.ExpectExec("any")
Expand Down

0 comments on commit 8b88615

Please sign in to comment.