Commit 57ecfb9

*: implement the diagnostics inspection framework (pingcap#14114)

Signed-off-by: Lonng <[email protected]>

1 parent 279ad0d · commit 57ecfb9

12 files changed: +576 −82

executor/builder.go (+7)

@@ -1310,6 +1310,13 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
 					extractor: v.Extractor.(*plannercore.ClusterLogTableExtractor),
 				},
 			}
+		case strings.ToLower(infoschema.TableInspectionResult):
+			return &ClusterReaderExec{
+				baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
+				retriever: &inspectionRetriever{
+					extractor: v.Extractor.(*plannercore.InspectionResultTableExtractor),
+				},
+			}
 		}
 	}
 	tb, _ := b.is.TableByID(v.Table.ID)
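The new case mirrors the wiring of the existing cluster memory tables: ClusterReaderExec owns execution and chunk conversion, and delegates row production to a per-table retriever. The interface definition itself is not part of the shown hunks; a minimal sketch of the contract, inferred from the `retrieve` signatures and the "implements the clusterRetriever interface" comments elsewhere in this commit:

	// Sketch only, reconstructed from this diff; not the verbatim TiDB source.
	type clusterRetriever interface {
		// retrieve returns the next batch of rows; (nil, nil) signals that the
		// retriever is drained. ctx carries cancellation from the caller, and
		// sctx is the SQL session (used for warnings and session variables).
		retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error)
	}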

executor/cluster_reader.go (+18 −20)

@@ -86,7 +86,7 @@ type clusterConfigRetriever struct {
 }
 
 // retrieve implements the clusterRetriever interface
-func (e *clusterConfigRetriever) retrieve(_ context.Context, ctx sessionctx.Context) ([][]types.Datum, error) {
+func (e *clusterConfigRetriever) retrieve(_ context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
 	if e.extractor.SkipRequest || e.retrieved {
 		return nil, nil
 	}
@@ -97,8 +97,7 @@ func (e *clusterConfigRetriever) retrieve(_ context.Context, ctx sessionctx.Cont
 		rows [][]types.Datum
 		err  error
 	}
-
-	serversInfo, err := infoschema.GetClusterServerInfo(ctx)
+	serversInfo, err := infoschema.GetClusterServerInfo(sctx)
 	failpoint.Inject("mockClusterConfigServerInfo", func(val failpoint.Value) {
 		if s := val.(string); len(s) > 0 {
 			// erase the error
@@ -118,7 +117,7 @@ func (e *clusterConfigRetriever) retrieve(_ context.Context, ctx sessionctx.Cont
 		address := srv.Address
 		statusAddr := srv.StatusAddr
 		if len(statusAddr) == 0 {
-			ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("%s node %s does not contain status address", typ, address))
+			sctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("%s node %s does not contain status address", typ, address))
 			continue
 		}
 		wg.Add(1)
@@ -195,7 +194,7 @@ func (e *clusterConfigRetriever) retrieve(_ context.Context, ctx sessionctx.Cont
 	var results []result
 	for result := range ch {
 		if result.err != nil {
-			ctx.GetSessionVars().StmtCtx.AppendWarning(result.err)
+			sctx.GetSessionVars().StmtCtx.AppendWarning(result.err)
 			continue
 		}
 		results = append(results, result)
@@ -214,13 +213,13 @@ type clusterServerInfoRetriever struct {
 }
 
 // retrieve implements the clusterRetriever interface
-func (e *clusterServerInfoRetriever) retrieve(_ context.Context, ctx sessionctx.Context) ([][]types.Datum, error) {
+func (e *clusterServerInfoRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
 	if e.extractor.SkipRequest || e.retrieved {
 		return nil, nil
 	}
 	e.retrieved = true
 
-	serversInfo, err := infoschema.GetClusterServerInfo(ctx)
+	serversInfo, err := infoschema.GetClusterServerInfo(sctx)
 	if err != nil {
 		return nil, err
 	}
@@ -253,7 +252,7 @@ func (e *clusterServerInfoRetriever) retrieve(_ context.Context, ctx sessionctx.
 		go func(index int, address, serverTP string) {
 			util.WithRecovery(func() {
 				defer wg.Done()
-				items, err := getServerInfoByGRPC(address, infoTp)
+				items, err := getServerInfoByGRPC(ctx, address, infoTp)
 				if err != nil {
 					ch <- result{idx: index, err: err}
 					return
@@ -269,7 +268,7 @@ func (e *clusterServerInfoRetriever) retrieve(_ context.Context, ctx sessionctx.
 	var results []result
 	for result := range ch {
 		if result.err != nil {
-			ctx.GetSessionVars().StmtCtx.AppendWarning(result.err)
+			sctx.GetSessionVars().StmtCtx.AppendWarning(result.err)
 			continue
 		}
 		results = append(results, result)
@@ -299,7 +298,7 @@ func serverInfoItemToRows(items []*diagnosticspb.ServerInfoItem, tp, addr string
 	return rows
 }
 
-func getServerInfoByGRPC(address string, tp diagnosticspb.ServerInfoType) ([]*diagnosticspb.ServerInfoItem, error) {
+func getServerInfoByGRPC(ctx context.Context, address string, tp diagnosticspb.ServerInfoType) ([]*diagnosticspb.ServerInfoItem, error) {
 	opt := grpc.WithInsecure()
 	security := config.GetGlobalConfig().Security
 	if len(security.ClusterSSLCA) != 0 {
@@ -321,8 +320,7 @@ func getServerInfoByGRPC(address string, tp diagnosticspb.ServerInfoType) ([]*di
 	}()
 
 	cli := diagnosticspb.NewDiagnosticsClient(conn)
-	// FIXME: use session context instead of context.Background().
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
 	defer cancel()
 	r, err := cli.ServerInfo(ctx, &diagnosticspb.ServerInfoRequest{Tp: tp})
 	if err != nil {
@@ -413,9 +411,9 @@ func (h *logResponseHeap) Pop() interface{} {
 	return x
 }
 
-func (e *clusterLogRetriever) startRetrieving(ctx sessionctx.Context) ([]chan logStreamResult, error) {
+func (e *clusterLogRetriever) startRetrieving(ctx context.Context, sctx sessionctx.Context) ([]chan logStreamResult, error) {
 	isFailpointTestMode := false
-	serversInfo, err := infoschema.GetClusterServerInfo(ctx)
+	serversInfo, err := infoschema.GetClusterServerInfo(sctx)
 	failpoint.Inject("mockClusterLogServerInfo", func(val failpoint.Value) {
 		if s := val.(string); len(s) > 0 {
 			// erase the error
@@ -488,7 +486,7 @@ func (e *clusterLogRetriever) startRetrieving(ctx sessionctx.Context) ([]chan lo
 		address := srv.Address
 		statusAddr := srv.StatusAddr
 		if len(statusAddr) == 0 {
-			ctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("%s node %s does not contain status address", typ, address))
+			sctx.GetSessionVars().StmtCtx.AppendWarning(errors.Errorf("%s node %s does not contain status address", typ, address))
 			continue
 		}
 		ch := make(chan logStreamResult)
@@ -511,7 +509,7 @@ func (e *clusterLogRetriever) startRetrieving(ctx sessionctx.Context) ([]chan lo
 			defer terror.Call(conn.Close)
 
 			cli := diagnosticspb.NewDiagnosticsClient(conn)
-			stream, err := cli.SearchLog(context.Background(), req)
+			stream, err := cli.SearchLog(ctx, req)
 			if err != nil {
 				ch <- logStreamResult{addr: address, typ: serverType, err: err}
 				return
@@ -535,14 +533,14 @@ func (e *clusterLogRetriever) startRetrieving(ctx sessionctx.Context) ([]chan lo
 	return results, nil
 }
 
-func (e *clusterLogRetriever) retrieve(_ context.Context, ctx sessionctx.Context) ([][]types.Datum, error) {
+func (e *clusterLogRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
 	if e.extractor.SkipRequest || e.isDrained {
 		return nil, nil
 	}
 
 	if !e.retrieving {
 		e.retrieving = true
-		results, err := e.startRetrieving(ctx)
+		results, err := e.startRetrieving(ctx, sctx)
 		if err != nil {
 			e.isDrained = true
 			return nil, err
@@ -554,7 +552,7 @@ func (e *clusterLogRetriever) retrieve(_ context.Context, ctx sessionctx.Context
 		result := <-ch
 		if result.err != nil || len(result.messages) == 0 {
 			if result.err != nil {
-				ctx.GetSessionVars().StmtCtx.AppendWarning(result.err)
+				sctx.GetSessionVars().StmtCtx.AppendWarning(result.err)
 			}
 			continue
 		}
@@ -581,7 +579,7 @@ func (e *clusterLogRetriever) retrieve(_ context.Context, ctx sessionctx.Context
 		if len(minTimeItem.messages) == 0 {
 			result := <-minTimeItem.next
 			if result.err != nil {
-				ctx.GetSessionVars().StmtCtx.AppendWarning(result.err)
+				sctx.GetSessionVars().StmtCtx.AppendWarning(result.err)
 				continue
 			}
 			if len(result.messages) > 0 {
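All of these hunks are one refactor seen from different angles: the sessionctx.Context parameter is renamed to sctx, and a genuine context.Context is threaded from the executor down to every gRPC call (cli.ServerInfo, cli.SearchLog), replacing context.Background() so that cancelling the statement cancels in-flight RPCs. A self-contained sketch of that pattern, with fetch standing in for a gRPC call (not a TiDB function):

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// fetch derives its timeout from the caller's ctx rather than from
	// context.Background(), so cancelling the caller aborts the call.
	func fetch(ctx context.Context, addr string) error {
		ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
		defer cancel()
		select {
		case <-time.After(50 * time.Millisecond): // simulated RPC latency
			return nil
		case <-ctx.Done():
			return ctx.Err() // caller cancelled or timed out first
		}
	}

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		cancel() // simulate the session being killed
		fmt.Println(fetch(ctx, "127.0.0.1:10080")) // prints: context canceled
	}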

executor/diagnostics.go (+179)

@@ -0,0 +1,179 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package executor
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"strings"
+
+	"github.com/pingcap/failpoint"
+	plannercore "github.com/pingcap/tidb/planner/core"
+	"github.com/pingcap/tidb/sessionctx"
+	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/set"
+	"github.com/pingcap/tidb/util/sqlexec"
+)
+
+type (
+	// inspectionResult represents an abnormal diagnosis result
+	inspectionResult struct {
+		// represents the diagnostics item, e.g: `ddl.lease` `raftstore.cpuusage`
+		item string
+		// diagnosis result value based on the current cluster status
+		value      string
+		reference  string
+		severity   string
+		suggestion string
+	}
+
+	inspectionRule interface {
+		name() string
+		inspect(ctx context.Context, sctx sessionctx.Context, filter set.StringSet) []inspectionResult
+	}
+)
+
+var inspectionRules = []inspectionRule{
+	&configInspection{},
+	&versionInspection{},
+}
+
+type inspectionRetriever struct {
+	retrieved bool
+	extractor *plannercore.InspectionResultTableExtractor
+}
+
+func (e *inspectionRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
+	if e.retrieved || e.extractor.SkipInspection {
+		return nil, nil
+	}
+	e.retrieved = true
+
+	// Some data of cluster-level memory tables will be retrieved many times in different inspection rules,
+	// and the cost of retrieving some data is expensive. We use the `TableSnapshot` to cache those data,
+	// obtain them lazily, and provide a consistent view of the inspection tables for each inspection rule.
+	// All cached snapshots are released at the end of retrieving, so all diagnosis rules should query
+	// `cluster_config/cluster_hardware/cluster_load/cluster_info` in `inspection_schema`,
+	// e.g:
+	//   SELECT * FROM inspection_schema.cluster_config
+	// instead of:
+	//   SELECT * FROM information_schema.cluster_config
+	sctx.GetSessionVars().InspectionTableCache = map[string]variable.TableSnapshot{}
+	defer func() { sctx.GetSessionVars().InspectionTableCache = nil }()
+
+	failpoint.InjectContext(ctx, "mockMergeMockInspectionTables", func() {
+		// Merge mock snapshots injected from failpoint for test purposes
+		mockTables, ok := ctx.Value("__mockInspectionTables").(map[string]variable.TableSnapshot)
+		if ok {
+			for name, snap := range mockTables {
+				sctx.GetSessionVars().InspectionTableCache[strings.ToLower(name)] = snap
+			}
+		}
+	})
+
+	rules := e.extractor.Rules
+	items := e.extractor.Items
+	var finalRows [][]types.Datum
+	for _, r := range inspectionRules {
+		name := r.name()
+		if len(rules) > 0 && !rules.Exist(name) {
+			continue
+		}
+		results := r.inspect(ctx, sctx, items)
+		if len(results) == 0 {
+			continue
+		}
+		// make the result stable
+		sort.Slice(results, func(i, j int) bool {
+			if lhs, rhs := results[i].item, results[j].item; lhs != rhs {
+				return lhs < rhs
+			}
+			return results[i].value < results[j].value
+		})
+		for _, result := range results {
+			finalRows = append(finalRows, types.MakeDatums(
+				name,
+				result.item,
+				result.value,
+				result.reference,
+				result.severity,
+				result.suggestion,
+			))
+		}
+	}
+	return finalRows, nil
+}
+
+type configInspection struct{}
+
+func (configInspection) name() string {
+	return "config"
+}
+
+func (configInspection) inspect(_ context.Context, sctx sessionctx.Context, filter set.StringSet) []inspectionResult {
+	// check the configuration consistency
+	sql := "select type, `key`, count(distinct value) as c from inspection_schema.cluster_config group by type, `key` having c > 1"
+	rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
+	if err != nil {
+		sctx.GetSessionVars().StmtCtx.AppendWarning(fmt.Errorf("check configuration consistency failed: %v", err))
+	}
+
+	var results []inspectionResult
+	for _, row := range rows {
+		if len(filter) > 0 && !filter.Exist(row.GetString(1)) {
+			continue
+		}
+		results = append(results, inspectionResult{
+			item:      row.GetString(1),                   // key
+			value:     fmt.Sprintf("%d", row.GetInt64(2)), // count
+			reference: "1",
+			severity:  "P2",
+			suggestion: fmt.Sprintf("select * from information_schema.cluster_config where type='%s' and `key`='%s'",
+				row.GetString(0), row.GetString(1)),
+		})
+	}
+	return results
+}
+
+type versionInspection struct{}
+
+func (versionInspection) name() string {
+	return "version"
+}
+
+func (versionInspection) inspect(_ context.Context, sctx sessionctx.Context, filter set.StringSet) []inspectionResult {
+	// check the version consistency
+	sql := "select type, count(distinct git_hash) as c from inspection_schema.cluster_info group by type having c > 1;"
+	rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
+	if err != nil {
+		sctx.GetSessionVars().StmtCtx.AppendWarning(fmt.Errorf("check version consistency failed: %v", err))
+	}
+
+	var results []inspectionResult
+	for _, row := range rows {
+		if len(filter) > 0 && !filter.Exist(row.GetString(0)) {
+			continue
+		}
+		results = append(results, inspectionResult{
+			item:       row.GetString(0),                   // type
+			value:      fmt.Sprintf("%d", row.GetInt64(1)), // count
+			reference:  "1",
+			severity:   "P1",
+			suggestion: fmt.Sprintf("select * from information_schema.cluster_info where type='%s'", row.GetString(0)),
+		})
+	}
+	return results
+}
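Extending the framework means implementing inspectionRule and appending it to inspectionRules. A hypothetical example, not part of this commit, sketching what a third rule could look like in the same package; the inspection_schema.cluster_load column names, the 0.9 threshold, and the rule name are illustrative assumptions, not the real schema:

	// currentLoadInspection is a hypothetical rule; the queried columns and
	// threshold below are assumptions, not the actual cluster_load layout.
	type currentLoadInspection struct{}

	func (currentLoadInspection) name() string {
		return "current-load"
	}

	func (currentLoadInspection) inspect(_ context.Context, sctx sessionctx.Context, filter set.StringSet) []inspectionResult {
		// Query the inspection_schema snapshot so this rule shares the
		// consistent cached view set up by inspectionRetriever.retrieve.
		sql := "select type, instance, value from inspection_schema.cluster_load where name='cpu-usage' and value > 0.9"
		rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
		if err != nil {
			sctx.GetSessionVars().StmtCtx.AppendWarning(fmt.Errorf("check current load failed: %v", err))
		}

		var results []inspectionResult
		for _, row := range rows {
			if len(filter) > 0 && !filter.Exist(row.GetString(0)) {
				continue
			}
			results = append(results, inspectionResult{
				item:       row.GetString(0), // type, e.g. tikv
				value:      row.GetString(2), // observed cpu usage
				reference:  "< 0.9",
				severity:   "P2",
				suggestion: fmt.Sprintf("check the workload of %s", row.GetString(1)),
			})
		}
		return results
	}

Once appended to inspectionRules, its results would surface through the new memory table, e.g. select * from information_schema.inspection_result where rule='current-load'.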
