-
Notifications
You must be signed in to change notification settings - Fork 0
/
pxld.go
158 lines (132 loc) · 3.38 KB
/
pxld.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package pxld
import (
"encoding/json"
"io"
"math"
"os"
"time"
)
// LogLine is the Go representation of a single binary record from a
// ProxySQL query log, as filled in by decodeLine.
//
// When marshalled with encoding/json, RawMessage is emitted as a base64
// string and Duration as an integer number of nanoseconds.
type LogLine struct {
	// MessageLength is the length prefix read at the start of the record.
	MessageLength uint64 `json:"message_length"`
	// RawMessage is the record's raw bytes, without the length prefix.
	RawMessage []byte `json:"raw_message"`
	// ThreadID is the thread id stored in the record.
	ThreadID uint64 `json:"thread_id"`
	// Username is the user name stored in the record.
	Username string `json:"username"`
	// Schema is the schema name stored in the record.
	Schema string `json:"schema"`
	// StartAt is the start time stored in the record.
	StartAt time.Time `json:"start_at"`
	// EndAt is the end time stored in the record.
	EndAt time.Time `json:"end_at"`
	// QueryDigest is the query digest stored in the record.
	QueryDigest string `json:"query_digest"`
	// HID is the record's HID. math.MaxUint64 is the "null" marker; note
	// that omitempty only drops a zero value, not that marker.
	HID uint64 `json:"hid,omitempty"`
	// ClientAddr is the client address stored in the record.
	ClientAddr string `json:"client_addr"`
	// ServerAddr is only decoded when HID is not the null marker; it is
	// left empty otherwise.
	ServerAddr string `json:"server_addr,omitempty"`
	// Query is the query text stored in the record.
	Query string `json:"query"`
	// Duration is EndAt minus StartAt.
	Duration time.Duration `json:"duration_ns"`
}

// String renders the log line as indented JSON.
//
// Marshalling can fail (for example when a timestamp's year is outside the
// range encoding/json accepts for time.Time). String cannot return an error,
// so instead of silently yielding an empty string it returns a short
// diagnostic message that starts with "pxld: ".
func (l *LogLine) String() string {
	raw, err := json.MarshalIndent(l, "", " ")
	if err != nil {
		return "pxld: cannot render LogLine as JSON: " + err.Error()
	}
	return string(raw)
}
// Decode reads ProxySQL query log records from r, one after another, and
// collects them into a slice of LogLine.
//
// Reading stops at the first error. An io.EOF is taken as the normal end of
// input and reported as a nil error; any other error is returned together
// with the records decoded before it. The returned slice is never nil.
func Decode(r io.Reader) (l []*LogLine, err error) {
	l = make([]*LogLine, 0)
	for {
		entry, decodeErr := decodeLine(r)
		if decodeErr == io.EOF {
			// end of input: everything read so far is the result
			return l, nil
		}
		if decodeErr != nil {
			return l, decodeErr
		}
		l = append(l, entry)
	}
}
// DecodeFile opens the ProxySQL query log file at path fp and decodes it
// into a slice of LogLine using Decode.
//
// The file is opened read-only: decoding never creates fp when it is missing
// and does not need write permission on it. An error from opening the file
// (including "file does not exist") is returned with a nil slice.
func DecodeFile(fp string) (l []*LogLine, err error) {
	var f *os.File
	f, err = os.Open(fp)
	if err != nil {
		return nil, err
	}
	// The file is only read, so an error from Close carries no data loss
	// and is not reported.
	defer f.Close()

	return Decode(f)
}
// decodeLine reads one log record from dataStream and returns it as a
// LogLine.
//
// The fields are consumed in a fixed order: message length, message body,
// the ProxySQL marker check, thread id, username, schema, client address,
// HID, server address (only when HID is not null), start time, end time,
// query digest and finally the query itself.
//
// An error from reading the message length is returned unchanged, so an
// io.EOF there still signals a clean end of input. Once the length has been
// read the record has started: an io.EOF from any later step means the record
// was cut short and is reported as io.ErrUnexpectedEOF, so that callers which
// treat io.EOF as "no more records" do not silently drop a truncated record.
//
// On error the returned LogLine is non-nil and holds whatever fields were
// decoded before the failure.
func decodeLine(dataStream io.Reader) (line *LogLine, err error) {
	line = &LogLine{}

	// the record begins with its length; this consumes the length prefix
	// from the stream
	line.MessageLength, err = GetMessageLength(dataStream)
	if err != nil {
		return line, err
	}

	// From here on we are in the middle of a record, so running out of
	// input is not a graceful end of stream.
	defer func() {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
	}()

	// read the whole message; every following field is read from the
	// reader GetMessage hands back, which replaces dataStream
	line.RawMessage, dataStream, err = GetMessage(line.MessageLength, dataStream)
	if err != nil {
		return line, err
	}

	// check that this is a ProxySQL query log record; per the original note
	// this consumes the next byte and fails when it is not 0
	if err = IsProxySQLQuery(dataStream); err != nil {
		return line, err
	}

	if line.ThreadID, err = GetThreadID(dataStream); err != nil {
		return line, err
	}

	if line.Username, err = GetUsername(dataStream); err != nil {
		return line, err
	}

	if line.Schema, err = GetSchema(dataStream); err != nil {
		return line, err
	}

	if line.ClientAddr, err = GetClientAddr(dataStream); err != nil {
		return line, err
	}

	if line.HID, err = GetHID(dataStream); err != nil {
		return line, err
	}

	// HID is null when it equals the maximum uint64 value; the server
	// address is only present when HID is not null
	if line.HID != math.MaxUint64 {
		if line.ServerAddr, err = GetServerAddr(dataStream); err != nil {
			return line, err
		}
	}

	if line.StartAt, err = GetStartAt(dataStream); err != nil {
		return line, err
	}

	if line.EndAt, err = GetEndAt(dataStream); err != nil {
		return line, err
	}

	// the duration is derived from the two timestamps, not read from the
	// stream
	line.Duration = line.EndAt.Sub(line.StartAt)

	if line.QueryDigest, err = GetQueryDigest(dataStream); err != nil {
		return line, err
	}

	// the actual query text comes last
	if line.Query, err = GetQuery(dataStream); err != nil {
		return line, err
	}

	return line, nil
}