1
1
package com .ctrip .xpipe .redis .core .protocal .cmd ;
2
2
3
+ import com .ctrip .xpipe .api .command .Command ;
3
4
import com .ctrip .xpipe .api .command .CommandFuture ;
4
5
import com .ctrip .xpipe .api .command .CommandFutureListener ;
5
6
import com .ctrip .xpipe .api .payload .InOutPayload ;
8
9
import com .ctrip .xpipe .gtid .GtidSet ;
9
10
import com .ctrip .xpipe .lifecycle .LifecycleHelper ;
10
11
import com .ctrip .xpipe .netty .commands .NettyClient ;
11
- import com .ctrip .xpipe .pool .FixedObjectPool ;
12
12
import com .ctrip .xpipe .pool .ReturnObjectException ;
13
13
import com .ctrip .xpipe .redis .core .exception .RedisRuntimeException ;
14
- import com .ctrip .xpipe .redis .core .protocal .*;
14
+ import com .ctrip .xpipe .redis .core .protocal .RedisClientProtocol ;
15
+ import com .ctrip .xpipe .redis .core .protocal .Xsync ;
16
+ import com .ctrip .xpipe .redis .core .protocal .XsyncObserver ;
15
17
import com .ctrip .xpipe .redis .core .protocal .protocal .AbstractBulkStringParser ;
16
18
import com .ctrip .xpipe .redis .core .protocal .protocal .EofType ;
17
19
import com .ctrip .xpipe .redis .core .protocal .protocal .RdbBulkStringParser ;
20
22
import com .ctrip .xpipe .utils .StringUtil ;
21
23
import io .netty .buffer .ByteBuf ;
22
24
import io .netty .channel .Channel ;
23
- import io .netty .channel .ChannelFuture ;
24
- import io .netty .channel .ChannelFutureListener ;
25
25
26
26
import java .io .IOException ;
27
27
import java .nio .channels .WritableByteChannel ;
28
28
import java .util .LinkedList ;
29
29
import java .util .List ;
30
30
import java .util .concurrent .ScheduledExecutorService ;
31
+ import java .util .concurrent .atomic .AtomicLong ;
31
32
32
33
import static com .ctrip .xpipe .redis .core .protocal .Xsync .XSYNC_STATE .READING_COMMANDS ;
33
34
@@ -47,22 +48,34 @@ public class DefaultXsync extends AbstractRedisCommand<Object> implements Xsync,
47
48
48
49
private GtidSet rdbDataGtidSet ;
49
50
51
+ private long rdbOffset ;
52
+
50
53
private NettyClient nettyClient ;
51
54
52
55
private List <XsyncObserver > observers = new LinkedList <>();
53
56
54
57
protected Xsync .XSYNC_STATE xsyncState = XSYNC_STATE .XSYNC_COMMAND_WAIT_RESPONSE ;
55
58
59
+ private int listeningPort ;
60
+
61
+ private AtomicLong currentCommandOffset ;
62
+
56
63
public DefaultXsync (String host , int port , GtidSet gtidSetExcluded , Object vectorClockExcluded , ScheduledExecutorService scheduled ) {
57
64
super (host , port , scheduled );
58
65
this .gitdSetExcluded = gtidSetExcluded ;
59
66
this .vectorClockExcluded = vectorClockExcluded ;
60
67
}
61
68
62
- public DefaultXsync (SimpleObjectPool <NettyClient > clientPool , GtidSet gtidSetExcluded , Object vectorClockExcluded , ScheduledExecutorService scheduled ) {
69
+ public DefaultXsync (SimpleObjectPool <NettyClient > clientPool , GtidSet gtidSetExcluded , Object vectorClockExcluded , ScheduledExecutorService scheduled , int listeningPort ) {
63
70
super (clientPool , scheduled );
64
71
this .gitdSetExcluded = gtidSetExcluded ;
65
72
this .vectorClockExcluded = vectorClockExcluded ;
73
+ this .listeningPort = listeningPort ;
74
+ this .currentCommandOffset = new AtomicLong (0 );
75
+ }
76
+
77
+ public NettyClient getNettyClient () {
78
+ return nettyClient ;
66
79
}
67
80
68
81
@ Override
@@ -118,9 +131,11 @@ protected Object doReceiveResponse(Channel channel, ByteBuf byteBuf) throws Exce
118
131
break ;
119
132
120
133
case READING_COMMANDS :
134
+ int prevIndex = byteBuf .readerIndex ();
121
135
Object cmdPayload = super .doReceiveResponse (channel , byteBuf );
136
+ currentCommandOffset .addAndGet (byteBuf .readerIndex () - prevIndex );
122
137
if (cmdPayload instanceof Object []) {
123
- doOnCommand ((Object []) cmdPayload );
138
+ doOnCommand (currentCommandOffset . getAndSet ( 0 ), (Object []) cmdPayload );
124
139
} else if (null != cmdPayload ) {
125
140
getLogger ().info ("[doReceiveResponse][{}][unknown payload] {}, {}" , READING_COMMANDS , this , cmdPayload );
126
141
throw new RedisRuntimeException ("unknown payload:" + cmdPayload );
@@ -153,17 +168,24 @@ protected void handleRedisResponse(Channel channel, String xsync) throws IOExcep
153
168
}
154
169
155
170
if (split [0 ].equalsIgnoreCase (FULL_SYNC )) {
156
- if (split .length != 2 ) {
171
+ if (split .length < 2 ) {
157
172
throw new RedisRuntimeException ("unknown reply:" + xsync );
158
173
}
159
- this .rdbDataGtidSet = new GtidSet (split [1 ]);
174
+ this .rdbDataGtidSet = GtidSet .PLACE_HOLDER .equals (split [1 ]) ? new GtidSet (GtidSet .EMPTY_GTIDSET ) : new GtidSet (split [1 ]);
175
+ if (split .length > 2 ) {
176
+ this .rdbOffset = Long .parseLong (split [2 ]);
177
+ }
160
178
getLogger ().debug ("[readRedisResponse][FULL]{}, {} {}" , ChannelUtil .getDesc (channel ), this , rdbDataGtidSet );
161
179
xsyncState = XSYNC_STATE .READING_RDB ;
162
180
doOnFullSync ();
163
181
} else if (split [0 ].equalsIgnoreCase (PARTIAL_SYNC )) {
164
182
xsyncState = READING_COMMANDS ;
165
183
getLogger ().debug ("[readRedisResponse][PARTIAL]{}, {}" , ChannelUtil .getDesc (channel ), this );
166
- doOnContinue ();
184
+ long continueOffset = 0 ;
185
+ if (split .length > 1 ) {
186
+ continueOffset = Long .parseLong (split [1 ]);
187
+ }
188
+ doOnContinue (continueOffset );
167
189
} else {
168
190
throw new RedisRuntimeException ("unknown reply:" + xsync );
169
191
}
@@ -173,31 +195,31 @@ private void doOnFullSync() {
173
195
getLogger ().debug ("[doOnFullSync] {}" , this );
174
196
for (XsyncObserver observer : observers ) {
175
197
try {
176
- observer .onFullSync (rdbDataGtidSet );
198
+ observer .onFullSync (rdbDataGtidSet , rdbOffset );
177
199
} catch (Throwable th ) {
178
200
getLogger ().error ("[doOnFullSync][fail] {}" , observer , th );
179
201
}
180
202
}
181
203
resetClient ();
182
204
}
183
205
184
- private void doOnContinue () {
206
+ private void doOnContinue (long continueOffset ) {
185
207
getLogger ().debug ("[doOnContinue] {}" , this );
186
208
for (XsyncObserver observer : observers ) {
187
209
try {
188
- observer .onContinue (gitdSetExcluded );
210
+ observer .onContinue (gitdSetExcluded , continueOffset );
189
211
} catch (Throwable th ) {
190
212
getLogger ().error ("[doOnContinue][fail] {}" , observer , th );
191
213
}
192
214
}
193
215
resetClient ();
194
216
}
195
217
196
- private void doOnCommand (Object [] rawCmdArgs ) {
218
+ private void doOnCommand (long commandOffset , Object [] rawCmdArgs ) {
197
219
getLogger ().debug ("[doOnCommand] {}" , this );
198
220
for (XsyncObserver observer : observers ) {
199
221
try {
200
- observer .onCommand (rawCmdArgs );
222
+ observer .onCommand (commandOffset , rawCmdArgs );
201
223
} catch (Throwable th ) {
202
224
getLogger ().error ("[doOnCommand][fail] {}" , observer , th );
203
225
}
@@ -209,7 +231,7 @@ private void beginReadRdb() {
209
231
getLogger ().debug ("[beginReadRdb] {}" , this );
210
232
for (XsyncObserver observer : observers ) {
211
233
try {
212
- observer .beginReadRdb (eofType , rdbDataGtidSet );
234
+ observer .beginReadRdb (eofType , rdbDataGtidSet , rdbOffset );
213
235
} catch (Throwable th ) {
214
236
getLogger ().error ("[beginReadRdb][fail] {}" , observer , th );
215
237
}
@@ -223,36 +245,44 @@ private void onRdbData(ByteBuf byteBuf) {
223
245
observer .onRdbData (byteBuf .slice ());
224
246
} catch (Throwable th ) {
225
247
getLogger ().error ("[notifyRdbData][fail] {}" , observer , th );
248
+ throw th ;
226
249
}
227
250
}
228
251
}
229
252
230
253
private void endReadRdb () {
231
254
getLogger ().debug ("[endReadRdb] {}" , this );
232
- if (eofType .putOnLineOnAck ()) {
233
- Replconf replconf = new Replconf (new FixedObjectPool <>(nettyClient ), Replconf .ReplConfType .ACK , scheduled , "1" );
234
- replconf .execute ().addListener (commandFuture -> {
235
- if (!commandFuture .isSuccess ()) {
236
- getLogger ().info ("[endReadRdb][ack] fail" , commandFuture .cause ());
237
- future ().setFailure (new XpipeRuntimeException ("ack after rdb fail" , commandFuture .cause ()));
238
- }
239
- });
240
- }
241
255
242
256
for (XsyncObserver observer : observers ) {
243
257
try {
244
- observer .endReadRdb (eofType , rdbDataGtidSet );
258
+ observer .endReadRdb (eofType , rdbDataGtidSet , rdbOffset );
245
259
} catch (Throwable th ) {
246
260
getLogger ().error ("[notifyRdbData][fail] {}" , observer , th );
247
261
}
248
262
}
249
263
}
250
264
265
+ private Command <Object > replConfListeningPort () {
266
+
267
+ return new Replconf (getClientPool (), Replconf .ReplConfType .LISTENING_PORT , scheduled ,
268
+ String .valueOf (listeningPort ));
269
+ }
270
+
251
271
@ Override
252
272
protected void afterCommandExecute (NettyClient nettyClient ) {
253
273
// temporary solution, handle channel evicted by channel pool
254
274
255
275
this .nettyClient = nettyClient ;
276
+
277
+ replConfListeningPort ().execute ().addListener (new CommandFutureListener <Object >() {
278
+ @ Override
279
+ public void operationComplete (CommandFuture <Object > commandFuture ) throws Exception {
280
+ if (!commandFuture .isSuccess ()) {
281
+ close ();
282
+ }
283
+ }
284
+ });
285
+
256
286
nettyClient .channel ().closeFuture ().addListener (closeFuture -> {
257
287
if (!future ().isDone ()) {
258
288
future ().setFailure (new XpipeRuntimeException ("channel closed" ));
0 commit comments