Skip to content

Commit aeafa81

Browse files
authored
debug(ticdc): add more logs to the kafka sink and enable sarama log (#11815)
close #11823
1 parent e2b6bb1 commit aeafa81

16 files changed

+132
-58
lines changed

cdc/sink/dmlsink/mq/dmlproducer/kafka_dml_producer.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,7 @@ func (k *kafkaDMLProducer) AsyncSendMessage(
119119
k.failpointCh <- errors.New("kafka sink injected error")
120120
failpoint.Return(nil)
121121
})
122-
return k.asyncProducer.AsyncSend(ctx, topic, partition,
123-
message.Key, message.Value, message.Callback)
122+
return k.asyncProducer.AsyncSend(ctx, topic, partition, message)
124123
}
125124

126125
func (k *kafkaDMLProducer) Close() {

cmd/kafka-consumer/main.go

-4
Original file line numberDiff line numberDiff line change
@@ -529,10 +529,6 @@ func (g *eventsGroup) Append(e *model.RowChangedEvent) {
529529
}
530530

531531
func (g *eventsGroup) Resolve(resolveTs uint64) []*model.RowChangedEvent {
532-
sort.Slice(g.events, func(i, j int) bool {
533-
return g.events[i].CommitTs < g.events[j].CommitTs
534-
})
535-
536532
i := sort.Search(len(g.events), func(i int) bool {
537533
return g.events[i].CommitTs > resolveTs
538534
})

dm/tests/download-integration-test-binaries.sh

+6-3
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,13 @@ branch=$1
4848
file_server_url="http://fileserver.pingcap.net"
4949

5050
# Get sha1 based on branch name.
51-
tidb_sha1=$(curl "${file_server_url}/download/refs/pingcap/tidb/${branch}/sha1")
52-
tikv_sha1=$(curl "${file_server_url}/download/refs/pingcap/tikv/${branch}/sha1")
53-
pd_sha1=$(curl "${file_server_url}/download/refs/pingcap/pd/${branch}/sha1")
51+
# tidb_sha1=$(curl "${file_server_url}/download/refs/pingcap/tidb/${branch}/sha1")
52+
# tikv_sha1=$(curl "${file_server_url}/download/refs/pingcap/tikv/${branch}/sha1")
53+
# pd_sha1=$(curl "${file_server_url}/download/refs/pingcap/pd/${branch}/sha1")
5454
tidb_tools_sha1=$(curl "${file_server_url}/download/refs/pingcap/tidb-tools/master/sha1")
55+
tidb_sha1="15a52d8b9c7eb373fa0b9b71a0ac346f652b7cdf"
56+
tikv_sha1="b4bddeeb995e7bedc1973ce9e856eeb2d856ce9b"
57+
pd_sha1="3f8217cc0efbf496effa2e11cc630f9c30b71ff7"
5558

5659
# All download links.
5760
tidb_download_url="${file_server_url}/download/builds/pingcap/tidb/${tidb_sha1}/centos7/tidb-server.tar.gz"

go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -394,3 +394,5 @@ replace sourcegraph.com/sourcegraph/appdash => github.com/sourcegraph/appdash v0
394394
replace sourcegraph.com/sourcegraph/appdash-data => github.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67
395395

396396
replace gorm.io/driver/mysql v1.4.5 => gorm.io/driver/mysql v1.3.3
397+
398+
replace github.com/IBM/sarama => github.com/3AceShowHand/sarama v0.0.0-20241204051647-318559e536ae

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
3030
cloud.google.com/go/storage v1.39.1 h1:MvraqHKhogCOTXTlct/9C3K3+Uy2jBmFYb3/Sp6dVtY=
3131
cloud.google.com/go/storage v1.39.1/go.mod h1:xK6xZmxZmo+fyP7+DEF6FhNc24/JAe95OLyOHCXFH1o=
3232
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
33+
github.com/3AceShowHand/sarama v0.0.0-20241204051647-318559e536ae h1:TZxjgE3hc0iM0GIxz1RywfrEheWz8CsSYAEXyo6EZV8=
34+
github.com/3AceShowHand/sarama v0.0.0-20241204051647-318559e536ae/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk=
3335
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs=
3436
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4=
3537
github.com/99designs/keyring v1.2.1 h1:tYLp1ULvO7i3fI5vE21ReQuj99QFSs7lGm0xWyJo87o=
@@ -62,8 +64,6 @@ github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ=
6264
github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
6365
github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM=
6466
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
65-
github.com/IBM/sarama v1.41.2 h1:ZDBZfGPHAD4uuAtSv4U22fRZBgst0eEwGFzLj0fb85c=
66-
github.com/IBM/sarama v1.41.2/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk=
6767
github.com/Jeffail/gabs/v2 v2.5.1 h1:ANfZYjpMlfTTKebycu4X1AgkVWumFVDYQl7JwOr4mDk=
6868
github.com/Jeffail/gabs/v2 v2.5.1/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI=
6969
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=

pkg/logutil/log.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -224,13 +224,11 @@ func initMySQLLogger() error {
224224
// initSaramaLogger hacks logger used in sarama lib
225225
func initSaramaLogger(level zapcore.Level) error {
226226
// only available less than info level
227-
if !zapcore.InfoLevel.Enabled(level) {
228-
logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level)
229-
if err != nil {
230-
return errors.Trace(err)
231-
}
232-
sarama.Logger = logger
227+
logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level)
228+
if err != nil {
229+
return errors.Trace(err)
233230
}
231+
sarama.Logger = logger
234232
return nil
235233
}
236234

pkg/sink/codec/canal/canal_json_row_event_encoder.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -399,14 +399,16 @@ func (c *JSONRowEventEncoder) AppendRowChangedEvent(
399399
return errors.Trace(err)
400400
}
401401
m := &common.Message{
402-
Key: nil,
403-
Value: value,
404-
Ts: e.CommitTs,
405-
Schema: &e.Table.Schema,
406-
Table: &e.Table.Table,
407-
Type: model.MessageTypeRow,
408-
Protocol: config.ProtocolCanalJSON,
409-
Callback: callback,
402+
Key: nil,
403+
Value: value,
404+
Ts: e.CommitTs,
405+
TableID: e.Table.TableID,
406+
HandleKey: e.GetHandleKeyColumnValues(),
407+
Schema: &e.Table.Schema,
408+
Table: &e.Table.Table,
409+
Type: model.MessageTypeRow,
410+
Protocol: config.ProtocolCanalJSON,
411+
Callback: callback,
410412
}
411413
m.IncRowsCount()
412414

pkg/sink/codec/common/message.go

+30-4
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
package common
1515

1616
import (
17+
"bytes"
1718
"encoding/binary"
1819
"encoding/json"
1920
"fmt"
21+
"strconv"
2022
"time"
2123

24+
"github.com/IBM/sarama"
2225
"github.com/pingcap/tiflow/cdc/model"
2326
"github.com/pingcap/tiflow/pkg/config"
2427
"github.com/tikv/client-go/v2/oracle"
@@ -35,7 +38,9 @@ const MaxRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
3538
type Message struct {
3639
Key []byte
3740
Value []byte
38-
Ts uint64 // reserved for possible output sorting
41+
Ts uint64 // reserved for possible output sorting
42+
TableID int64
43+
HandleKey []string
3944
Schema *string // schema
4045
Table *string // table
4146
Type model.MessageType // type
@@ -53,10 +58,31 @@ func (m *Message) String() string {
5358
}
5459

5560
// Length returns the expected size of the Kafka message
56-
// We didn't append any `Headers` when send the message, so ignore the calculations related to it.
57-
// If `ProducerMessage` Headers fields used, this method should also adjust.
5861
func (m *Message) Length() int {
59-
return len(m.Key) + len(m.Value) + MaxRecordOverhead
62+
headers := m.Headers()
63+
var headerLen int
64+
for _, header := range headers {
65+
headerLen += len(header.Key) + len(header.Value) + 2*binary.MaxVarintLen32
66+
}
67+
return headerLen + len(m.Key) + len(m.Value) + MaxRecordOverhead
68+
}
69+
70+
// Headers returns the headers of Kafka message
71+
func (m *Message) Headers() []sarama.RecordHeader {
72+
var handleKey bytes.Buffer
73+
for idx, key := range m.HandleKey {
74+
handleKey.WriteString(key)
75+
if idx != len(m.HandleKey)-1 {
76+
handleKey.WriteString(",")
77+
}
78+
}
79+
handle := handleKey.Bytes()
80+
headers := []sarama.RecordHeader{
81+
{Key: []byte("tableID"), Value: []byte(strconv.FormatUint(uint64(m.TableID), 10))},
82+
{Key: []byte("commitTs"), Value: []byte(strconv.FormatUint(m.Ts, 10))},
83+
{Key: []byte("handleKey"), Value: handle},
84+
}
85+
return headers
6086
}
6187

6288
// PhysicalTime returns physical time part of Ts in time.Time

pkg/sink/codec/open/open_protocol_encoder.go

+14-3
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,20 @@ func (d *BatchEncoder) AppendRowChangedEvent(
104104
return errors.Trace(err)
105105
}
106106

107-
// for single message that is longer than max-message-bytes
108-
// 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead`
109-
length := len(key) + len(value) + common.MaxRecordOverhead + 16 + 8
107+
m := &common.Message{
108+
Key: key,
109+
Value: value,
110+
Ts: e.CommitTs,
111+
TableID: e.Table.TableID,
112+
HandleKey: e.GetHandleKeyColumnValues(),
113+
Schema: &e.Table.Schema,
114+
Table: &e.Table.Table,
115+
Type: model.MessageTypeRow,
116+
Protocol: config.ProtocolOpen,
117+
Callback: callback,
118+
}
119+
120+
length := m.Length() + 16 + 8
110121
if length > d.config.MaxMessageBytes {
111122
if d.config.LargeMessageHandle.Disabled() {
112123
log.Warn("Single message is too large for open-protocol",

pkg/sink/codec/open/open_protocol_encoder_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func TestMaxMessageBytes(t *testing.T) {
175175
ctx := context.Background()
176176
topic := ""
177177
// just can hold it.
178-
a := 173
178+
a := 231
179179
codecConfig := common.NewConfig(config.ProtocolOpen).WithMaxMessageBytes(a)
180180
builder, err := NewBatchEncoderBuilder(ctx, codecConfig)
181181
require.NoError(t, err)

pkg/sink/kafka/factory.go

+41-8
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/pingcap/log"
2323
"github.com/pingcap/tiflow/cdc/model"
2424
cerror "github.com/pingcap/tiflow/pkg/errors"
25+
"github.com/pingcap/tiflow/pkg/sink/codec/common"
2526
"github.com/pingcap/tiflow/pkg/util"
2627
"go.uber.org/zap"
2728
)
@@ -76,8 +77,7 @@ type AsyncProducer interface {
7677
// AsyncSend is the input channel for the user to write messages to that they
7778
// wish to send.
7879
AsyncSend(ctx context.Context, topic string,
79-
partition int32, key []byte, value []byte,
80-
callback func()) error
80+
partition int32, message *common.Message) error
8181

8282
// AsyncRunCallback process the messages that has sent to kafka,
8383
// and run tha attached callback. the caller should call this
@@ -220,9 +220,16 @@ func (p *saramaAsyncProducer) Close() {
220220
}()
221221
}
222222

223+
type item struct {
224+
handleKey string
225+
commitTs string
226+
offset int64
227+
}
228+
223229
func (p *saramaAsyncProducer) AsyncRunCallback(
224230
ctx context.Context,
225231
) error {
232+
memo := make(map[string]map[int32]item)
226233
for {
227234
select {
228235
case <-ctx.Done():
@@ -242,6 +249,29 @@ func (p *saramaAsyncProducer) AsyncRunCallback(
242249
if callback != nil {
243250
callback()
244251
}
252+
tableID := string(ack.Headers[0].Value)
253+
commitTs := string(ack.Headers[1].Value)
254+
handleKey := string(ack.Headers[2].Value)
255+
256+
partitionMemo := memo[tableID]
257+
if partitionMemo == nil {
258+
partitionMemo = make(map[int32]item)
259+
memo[tableID] = partitionMemo
260+
}
261+
previous := partitionMemo[ack.Partition]
262+
if ack.Offset < previous.offset {
263+
log.Warn("kafka async producer receive an out-of-order message response",
264+
zap.String("tableID", tableID), zap.Int32("partition", ack.Partition),
265+
zap.Int64("oldOffset", previous.offset), zap.String("oldCommitTs", previous.commitTs),
266+
zap.String("oldHandleKey", previous.handleKey),
267+
zap.Int64("newOffset", ack.Offset), zap.String("commitTs", commitTs),
268+
zap.String("handleKey", handleKey))
269+
}
270+
partitionMemo[ack.Partition] = item{
271+
handleKey: handleKey,
272+
commitTs: commitTs,
273+
offset: ack.Offset,
274+
}
245275
}
246276
case err := <-p.producer.Errors():
247277
// We should not wrap a nil pointer if the pointer
@@ -262,21 +292,24 @@ func (p *saramaAsyncProducer) AsyncRunCallback(
262292
func (p *saramaAsyncProducer) AsyncSend(ctx context.Context,
263293
topic string,
264294
partition int32,
265-
key []byte,
266-
value []byte,
267-
callback func(),
295+
message *common.Message,
268296
) error {
297+
headers := message.Headers()
269298
msg := &sarama.ProducerMessage{
270299
Topic: topic,
271300
Partition: partition,
272-
Key: sarama.StringEncoder(key),
273-
Value: sarama.ByteEncoder(value),
274-
Metadata: callback,
301+
Headers: headers,
302+
Key: sarama.StringEncoder(message.Key),
303+
Value: sarama.ByteEncoder(message.Value),
304+
Metadata: message.Callback,
275305
}
276306
select {
277307
case <-ctx.Done():
278308
return errors.Trace(ctx.Err())
279309
case p.producer.Input() <- msg:
280310
}
311+
log.Info("async producer send message",
312+
zap.Int64("tableID", message.TableID), zap.ByteString("handleKey", headers[2].Value),
313+
zap.Int32("partition", partition), zap.Uint64("commitTs", message.Ts))
281314
return nil
282315
}

pkg/sink/kafka/mock_factory.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/pingcap/errors"
2323
"github.com/pingcap/tiflow/cdc/model"
2424
cerror "github.com/pingcap/tiflow/pkg/errors"
25+
"github.com/pingcap/tiflow/pkg/sink/codec/common"
2526
"github.com/pingcap/tiflow/pkg/util"
2627
)
2728

@@ -167,15 +168,14 @@ func (p *MockSaramaAsyncProducer) AsyncRunCallback(
167168

168169
// AsyncSend implement the AsyncProducer interface.
169170
func (p *MockSaramaAsyncProducer) AsyncSend(ctx context.Context, topic string,
170-
partition int32, key []byte, value []byte,
171-
callback func(),
171+
partition int32, message *common.Message,
172172
) error {
173173
msg := &sarama.ProducerMessage{
174174
Topic: topic,
175175
Partition: partition,
176-
Key: sarama.StringEncoder(key),
177-
Value: sarama.ByteEncoder(value),
178-
Metadata: callback,
176+
Key: sarama.StringEncoder(message.Key),
177+
Value: sarama.ByteEncoder(message.Value),
178+
Metadata: message.Callback,
179179
}
180180
select {
181181
case <-ctx.Done():

pkg/sink/kafka/v2/factory.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/pingcap/tiflow/cdc/model"
2727
"github.com/pingcap/tiflow/pkg/errors"
2828
"github.com/pingcap/tiflow/pkg/security"
29+
"github.com/pingcap/tiflow/pkg/sink/codec/common"
2930
pkafka "github.com/pingcap/tiflow/pkg/sink/kafka"
3031
"github.com/pingcap/tiflow/pkg/util"
3132
"github.com/segmentio/kafka-go"
@@ -364,8 +365,7 @@ func (a *asyncWriter) Close() {
364365
// AsyncSend is the input channel for the user to write messages to that they
365366
// wish to send.
366367
func (a *asyncWriter) AsyncSend(ctx context.Context, topic string,
367-
partition int32, key []byte, value []byte,
368-
callback func(),
368+
partition int32, message *common.Message,
369369
) error {
370370
select {
371371
case <-ctx.Done():
@@ -375,9 +375,9 @@ func (a *asyncWriter) AsyncSend(ctx context.Context, topic string,
375375
return a.w.WriteMessages(ctx, kafka.Message{
376376
Topic: topic,
377377
Partition: int(partition),
378-
Key: key,
379-
Value: value,
380-
WriterData: callback,
378+
Key: message.Key,
379+
Value: message.Value,
380+
WriterData: message.Callback,
381381
})
382382
}
383383

pkg/sink/kafka/v2/factory_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/pingcap/tiflow/cdc/model"
2424
cerror "github.com/pingcap/tiflow/pkg/errors"
2525
"github.com/pingcap/tiflow/pkg/security"
26+
"github.com/pingcap/tiflow/pkg/sink/codec/common"
2627
pkafka "github.com/pingcap/tiflow/pkg/sink/kafka"
2728
v2mock "github.com/pingcap/tiflow/pkg/sink/kafka/v2/mock"
2829
"github.com/pingcap/tiflow/pkg/util"
@@ -244,14 +245,14 @@ func TestAsyncWriterAsyncSend(t *testing.T) {
244245

245246
ctx, cancel := context.WithCancel(context.Background())
246247

247-
callback := func() {}
248+
message := &common.Message{Key: []byte{'1'}, Value: []byte{}, Callback: func() {}}
248249
mw.EXPECT().WriteMessages(gomock.Any(), gomock.Any()).Return(nil)
249-
err := w.AsyncSend(ctx, "topic", 1, []byte{'1'}, []byte{}, callback)
250+
err := w.AsyncSend(ctx, "topic", 1, message)
250251
require.NoError(t, err)
251252

252253
cancel()
253254

254-
err = w.AsyncSend(ctx, "topic", 1, []byte{'1'}, []byte{}, callback)
255+
err = w.AsyncSend(ctx, "topic", 1, nil)
255256
require.ErrorIs(t, err, context.Canceled)
256257
}
257258

scripts/download-integration-test-binaries.sh

+6-3
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,13 @@ function download_binaries() {
117117
file_server_url="http://fileserver.pingcap.net"
118118

119119
# Get sha1 based on branch name.
120-
tidb_sha1=$(curl "${file_server_url}/download/refs/pingcap/tidb/${branch}/sha1")
121-
tikv_sha1=$(curl "${file_server_url}/download/refs/pingcap/tikv/${branch}/sha1")
122-
pd_sha1=$(curl "${file_server_url}/download/refs/pingcap/pd/${branch}/sha1")
120+
# tidb_sha1=$(curl "${file_server_url}/download/refs/pingcap/tidb/${branch}/sha1")
121+
# tikv_sha1=$(curl "${file_server_url}/download/refs/pingcap/tikv/${branch}/sha1")
122+
# pd_sha1=$(curl "${file_server_url}/download/refs/pingcap/pd/${branch}/sha1")
123123
tiflash_sha1=$(curl "${file_server_url}/download/refs/pingcap/tiflash/${branch}/sha1")
124+
tidb_sha1="15a52d8b9c7eb373fa0b9b71a0ac346f652b7cdf"
125+
tikv_sha1="b4bddeeb995e7bedc1973ce9e856eeb2d856ce9b"
126+
pd_sha1="3f8217cc0efbf496effa2e11cc630f9c30b71ff7"
124127

125128
# All download links.
126129
tidb_download_url="${file_server_url}/download/builds/pingcap/tidb/${tidb_sha1}/centos7/tidb-server.tar.gz"

0 commit comments

Comments
 (0)