Skip to content

Commit 8d9d709

Browse files
nikunjyldemailly
authored andcommitted
Fixing bug, receiver sends invalid local checkpoints in long runing mode
Summary: Long running mode has been broken in the trunk, as well as in the new version. Receiver last checkpoint was not being cleared. How to see it broken: 1. Make a receiver in long running mode with an older version 2. Start a sender with newer version and do a transfer 3. Start another sender with newer version. This step should fail Third step fails because the new sender doesn't understand the checkpoint sent by the last version Reviewed By: ldemailly Differential Revision: D2581890 fb-gh-sync-id: 2968b3d96d23cd94597879f8bd126ac91686f0f7
1 parent 27ed573 commit 8d9d709

6 files changed

+83
-11
lines changed

CMakeLists.txt

+3-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ cmake_minimum_required(VERSION 3.2)
2222
# There is no C per se in WDT but if you use CXX only here many checks fail
2323
# Version is Major.Minor.YYMMDDX for up to 10 releases per day
2424
# Minor currently is also the protocol version - has to match with Protocol.cpp
25-
project("WDT" LANGUAGES C CXX VERSION 1.22.1510250)
25+
project("WDT" LANGUAGES C CXX VERSION 1.22.1510270)
2626

2727
# On MacOS this requires the latest (master) CMake (and/or CMake 3.1.1/3.2)
2828
set(CMAKE_CXX_STANDARD 11)
@@ -315,4 +315,6 @@ if (BUILD_TESTING)
315315
add_test(NAME ReceiverThrottlerRefCountTest COMMAND
316316
"${CMAKE_CURRENT_SOURCE_DIR}/test/receiver_throttler_ref_count_test.sh")
317317

318+
add_test(NAME WdtLongRunningTest COMMAND
319+
"${CMAKE_CURRENT_SOURCE_DIR}/test/wdt_long_running_test.py")
318320
endif(BUILD_TESTING)

ReceiverThread.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ ReceiverState ReceiverThread::sendLocalCheckpoint() {
223223
localCheckpoint.numBlocks = -1;
224224
checkpoints.emplace_back(localCheckpoint);
225225
} else {
226+
VLOG(1) << *this << " sending local checkpoint " << checkpoint_;
226227
checkpoints.emplace_back(checkpoint_);
227228
}
228229

@@ -289,7 +290,7 @@ ReceiverState ReceiverThread::processSettingsCmd() {
289290
return FINISH_WITH_ERROR;
290291
}
291292
if (senderProtocolVersion != threadProtocolVersion_) {
292-
LOG(ERROR) << "Receiver and sender protocol version mismatch "
293+
LOG(ERROR) << *this << " Receiver and sender protocol version mismatch "
293294
<< senderProtocolVersion << " " << threadProtocolVersion_;
294295
int negotiatedProtocol = Protocol::negotiateProtocol(
295296
senderProtocolVersion, threadProtocolVersion_);
@@ -899,6 +900,9 @@ void ReceiverThread::reset() {
899900
senderReadTimeout_ = senderWriteTimeout_ = -1;
900901
curConnectionVerified_ = false;
901902
threadStats_.reset();
903+
checkpoints_.clear();
904+
newCheckpoints_.clear();
905+
checkpoint_ = Checkpoint(socket_.getPort());
902906
}
903907

904908
ReceiverThread::~ReceiverThread() {

SenderThread.cpp

+14-7
Original file line numberDiff line numberDiff line change
@@ -85,16 +85,23 @@ SenderState SenderThread::readLocalCheckPoint() {
8585
numReconnectWithoutProgress_++;
8686
return CONNECT;
8787
}
88+
bool isValidCheckpoint = true;
8889
if (!Protocol::decodeCheckpoints(threadProtocolVersion_, buf_, decodeOffset,
8990
checkpointLen, checkpoints)) {
9091
LOG(ERROR) << "checkpoint decode failure "
91-
<< folly::humanify(std::string(buf_, checkpointLen));
92-
threadStats_.setErrorCode(PROTOCOL_ERROR);
93-
return END;
94-
}
95-
if (checkpoints.size() != 1 || checkpoints[0].port != port_) {
96-
LOG(ERROR) << "illegal local checkpoint "
97-
<< folly::humanify(std::string(buf_, checkpointLen));
92+
<< folly::humanify(std::string(buf_, numRead));
93+
isValidCheckpoint = false;
94+
} else if (checkpoints.size() != 1) {
95+
LOG(ERROR) << "Illegal local checkpoint, unexpected num checkpoints "
96+
<< checkpoints.size() << " "
97+
<< folly::humanify(std::string(buf_, numRead));
98+
isValidCheckpoint = false;
99+
} else if (checkpoints[0].port != port_) {
100+
LOG(ERROR) << "illegal checkpoint, checkpoint " << checkpoints[0]
101+
<< " doesn't match the port " << port_;
102+
isValidCheckpoint = false;
103+
}
104+
if (!isValidCheckpoint) {
98105
threadStats_.setErrorCode(PROTOCOL_ERROR);
99106
return END;
100107
}

WdtConfig.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88

99
#define WDT_VERSION_MAJOR 1
1010
#define WDT_VERSION_MINOR 22
11-
#define WDT_VERSION_BUILD 1510250
11+
#define WDT_VERSION_BUILD 1510270
1212
// Add -fbcode to version str
13-
#define WDT_VERSION_STR "1.22.1510250-fbcode"
13+
#define WDT_VERSION_STR "1.22.1510270-fbcode"
1414
// Tie minor and proto version
1515
#define WDT_PROTOCOL_VERSION WDT_VERSION_MINOR
1616

test/common_utils.py

+7
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@
1111
import tempfile
1212
import errno
1313

14+
def get_wdt_version():
15+
dummy_cmd = "_bin/wdt/wdt --version"
16+
dummy_process = subprocess.Popen(dummy_cmd.split(),
17+
stdout=subprocess.PIPE)
18+
protocol_string = dummy_process.stdout.readline().strip()
19+
return protocol_string.split()[4]
20+
1421
def start_receiver(receiver_cmd, root_dir, test_count):
1522
print("Receiver: " + receiver_cmd)
1623
server_log = "{0}/server{1}.log".format(root_dir, test_count)

test/wdt_long_running_test.py

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#!/usr/bin/env python
2+
import shutil
3+
from common_utils import *
4+
5+
def run_test(wdtbin, test_name, test_count, root_dir, connection_url):
6+
test_description = ("Test #{0}, [{1}]").format(test_count, test_name)
7+
print(test_description)
8+
sender_cmd = (wdtbin + " -directory {0}/src "
9+
"-connection_url \'{1}\'").format(root_dir, connection_url)
10+
transfer_status = run_sender(sender_cmd, root_dir, test_count)
11+
if transfer_status:
12+
print("Failed " + test_description + ". Check logs"
13+
+ " in " + root_dir)
14+
exit(transfer_status)
15+
16+
def main():
17+
wdt_version = get_wdt_version()
18+
print("wdt protocol version " + wdt_version)
19+
root_dir = create_test_directory("/tmp")
20+
src_dir = root_dir + "/src"
21+
generate_random_files(src_dir, 4096)
22+
23+
wdtbin_opts = "-full_reporting -num_ports 4"
24+
wdtbin = "_bin/wdt/wdt " + wdtbin_opts
25+
#receiver version should be one behind
26+
receiver_version = (int(wdt_version) - 1)
27+
receiver_cmd = (wdtbin + " -start_port 0 -run_as_daemon "
28+
"-skip_writes -protocol_version {0}").format(receiver_version)
29+
#start the receiver in long running mode
30+
print(receiver_cmd)
31+
(receiver_process, connection_url) = start_receiver(receiver_cmd,
32+
root_dir, 0)
33+
34+
run_test(wdtbin, "sender 1 same version", 1, root_dir, connection_url)
35+
run_test(wdtbin, "sender 2 same version", 2, root_dir, connection_url)
36+
37+
protocol_key = "recpv"
38+
prev_str = "{0}={1}".format(protocol_key, receiver_version)
39+
new_str = "{0}={1}".format(protocol_key, wdt_version)
40+
connection_url_new_version = connection_url.replace(prev_str, new_str)
41+
42+
run_test(wdtbin, "sender 1 newer version", 3, root_dir,
43+
connection_url_new_version)
44+
run_test(wdtbin, "sender 2 newer version", 4, root_dir,
45+
connection_url_new_version)
46+
#since receiver is in long running mode, kill it
47+
receiver_process.kill()
48+
print("Tests successful! Removing logs and data from " + root_dir)
49+
shutil.rmtree(root_dir)
50+
51+
if __name__ == "__main__":
52+
main()

0 commit comments

Comments
 (0)