Commit 880d879
MB-20054: Regression test - bucket is deleted with DCPBackfill running
Regression test for MB-20054 - the following abort is encountered when a
DCPBackfill task is still running when a bucket is deleted:

    Assertion failed: (engine), function verifyEngine,
    file ep-engine/src/objectregistry.cc, line 58.

This issue occurs because the DCPBackfill object (and associated objects -
ActiveStream and, importantly, ActiveStream's readyQ of Items) is not deleted
earlier in the shutdown sequence (via EvpDestroy), as we use ref-counted
pointers for it and there is still an outstanding reference held by the AuxIO
thread which is running the task. Hence the DCPBackfill object is only deleted
when we finally unregister the deleted bucket from the shared ExecutorPool -
see the following backtrace:

    #1  0x00007f513b75a085 in abort () from /lib64/libc.so.6
    #2  0x00007f51337034e2 in ObjectRegistry::onDeleteItem (pItem=<value optimized out>) at ep-engine/src/objectregistry.cc:157
    #3  0x00007f5133652094 in ~Item (this=<value optimized out>) at ep-engine/src/item.h:352
    #4  SingleThreadedRCPtr<Item>::~SingleThreadedRCPtr (this=<value optimized out>) at ep-engine/src/atomic.h:430
    #5  0x00007f51336c7f47 in ~MutationResponse (this=0x3cd87880) at ep-engine/src/dcp-response.h:275
    #6  MutationResponse::~MutationResponse (this=0x3cd87880) at ep-engine/src/dcp-response.h:275
    #7  0x00007f51336d86aa in clear_UNLOCKED (this=0x7a3f5fa0) at ep-engine/src/dcp-stream.cc:201
    #8  ~ActiveStream (this=0x7a3f5fa0) at ep-engine/src/dcp-stream.h:178
    #9  ActiveStream::~ActiveStream (this=0x7a3f5fa0) at ep-engine/src/dcp-stream.h:179
    #10 0x00007f51336cc808 in RCPtr<Stream>::~RCPtr (this=0xb1823780) at ep-engine/src/atomic.h:348
    #11 0x00007f51336d77c7 in ~DCPBackfill (this=0xb1823740) at ep-engine/src/dcp-stream.cc:114
    #12 DCPBackfill::~DCPBackfill (this=0xb1823740) at ep-engine/src/dcp-stream.cc:114
    #13 0x00007f513368d95f in ~SingleThreadedRCPtr (this=0x5b55a20, e=0x59c4000, taskType=NO_TASK_TYPE) at ep-engine/src/atomic.h:430
    #14 ExecutorPool::_stopTaskGroup (this=0x5b55a20, e=0x59c4000, taskType=NO_TASK_TYPE) at ep-engine/src/executorpool.cc:532
    #15 0x00007f513368dad3 in ExecutorPool::_unregisterBucket (this=0x5b55a20, engine=0x59c4000) at ep-engine/src/executorpool.cc:551
    #16 0x00007f513368e143 in ExecutorPool::unregisterBucket (this=0x5b55a20, engine=0x59c4000) at ep-engine/src/executorpool.cc:602
    #17 0x00007f5133655f82 in EventuallyPersistentStore::~EventuallyPersistentStore (this=0x59e6000) at ep-engine/src/ep.cc:365
    #18 0x00007f5133672a25 in EventuallyPersistentEngine::~EventuallyPersistentEngine (this=0x59c4000) at ep-engine/src/ep_engine.cc:5791
    #19 0x00007f5133672c95 in EvpDestroy (handle=0x59c4000, force=<value optimized out>) at ep-engine/src/ep_engine.cc:143

Actually reproducing the issue is somewhat involved - we need to orchestrate
the world such that we delete the engine while a DCPBackfill task is still
running. We spin up a separate thread which runs the DCPBackfill task
concurrently with destroy - specifically, DCPBackfill must start running (and
add items to the readyQ) before destroy(), and it must then continue running
until (stopping only after) _stopTaskGroup is invoked.

To achieve this we use a couple of condition variables to synchronise between
the two threads - the timeline needs to look like:

    auxIO thread:  [------- DCPBackfill ----------]
     main thread:     [--destroy()--]    [ExecutorPool::_stopTaskGroup]

    --------------------------------------------------------> time

Change-Id: Ic64c419cb8e4e0af2378efba9711b121aacee15b
Reviewed-on: http://review.couchbase.org/65520
Well-Formed: buildbot <[email protected]>
Tested-by: buildbot <[email protected]>
Reviewed-by: Jim Walker <[email protected]>
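Editor's note: the handshake described in the commit message can be sketched
with standard C++ primitives. This is a minimal illustration only - the real
test uses the engine's cb_thread_t, SyncObject and LockHolder wrappers, and
all names below are hypothetical:

    // Minimal sketch of the two-thread handshake, assuming
    // std::thread/std::condition_variable in place of the test's
    // cb_thread_t and SyncObject. Illustrative names only.
    #include <condition_variable>
    #include <mutex>
    #include <thread>

    std::mutex m;
    std::condition_variable backfill_cv;
    std::condition_variable destroy_cv;
    bool backfill_done = false;
    bool destroy_done = false;

    void auxio_thread_fn() {
        // 1. "Run" the backfill task (in the real test this pushes Items
        //    onto the ActiveStream's readyQ).
        {
            std::lock_guard<std::mutex> guard(m);
            backfill_done = true;          // [A] main may now call destroy()
        }
        backfill_cv.notify_one();

        // 2. Keep the task 'running' until destroy() has completed [B].
        std::unique_lock<std::mutex> lock(m);
        destroy_cv.wait(lock, [] { return destroy_done; });
        // 3. Only now release the last reference to the task, as
        //    _stopTaskGroup would during bucket unregistration.
    }

    int main() {
        std::thread auxio(auxio_thread_fn);

        // Main thread: wait for [A], i.e. backfill has run.
        {
            std::unique_lock<std::mutex> lock(m);
            backfill_cv.wait(lock, [] { return backfill_done; });
        }
        // destroy() the engine while the task is still 'running', then
        // signal [B] so the aux thread can drop its reference.
        {
            std::lock_guard<std::mutex> guard(m);
            destroy_done = true;
        }
        destroy_cv.notify_one();
        auxio.join();
        return 0;
    }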
1 parent a417b8f, commit 880d879

10 files changed: +254 −15 lines

src/connmap.cc (+9 −4)

@@ -109,8 +109,9 @@ void ConnNotifier::stop() {
 void ConnNotifier::notifyMutationEvent(void) {
     bool inverse = false;
     if (pendingNotification.compare_exchange_strong(inverse, true)) {
-        cb_assert(task > 0);
-        ExecutorPool::get()->wake(task);
+        if (task > 0) {
+            ExecutorPool::get()->wake(task);
+        }
     }
 }

@@ -754,7 +755,9 @@ bool TapConnMap::mapped(connection_t &tc) {
 void TapConnMap::shutdownAllConnections() {
     LOG(EXTENSION_LOG_WARNING, "Shutting down tap connections!");

-    connNotifier_->stop();
+    if (connNotifier_ != NULL) {
+        connNotifier_->stop();
+    }

     // Not safe to acquire both connsLock and releaseLock at the same time
     // (can trigger deadlock), so first acquire releaseLock to release all

@@ -1023,7 +1026,9 @@ DcpProducer *DcpConnMap::newProducer(const void* cookie,
 void DcpConnMap::shutdownAllConnections() {
     LOG(EXTENSION_LOG_WARNING, "Shutting down dcp connections!");

-    connNotifier_->stop();
+    if (connNotifier_ != NULL) {
+        connNotifier_->stop();
+    }

     // Not safe to acquire both connsLock and releaseLock at the same time
     // (can trigger deadlock), so first acquire releaseLock to release all

src/connmap.h (+4 −1)

@@ -254,7 +254,10 @@ class ConnMap {
 class ConnNotifier {
 public:
     ConnNotifier(conn_notifier_type ntype, ConnMap &cm)
-        : notifier_type(ntype), connMap(cm), pendingNotification(false) { }
+        : notifier_type(ntype),
+          connMap(cm),
+          task(0),
+          pendingNotification(false) { }

     void start();

src/ep_engine.cc (+1 −1)

@@ -5661,7 +5661,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::dcpOpen(const void* cookie,
                                                       uint32_t opaque,
                                                       uint32_t seqno,
                                                       uint32_t flags,
-                                                      void *stream_name,
+                                                      const void *stream_name,
                                                       uint16_t nname)
 {
     (void) opaque;

src/ep_engine.h (+1 −1)

@@ -429,7 +429,7 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
                                  uint32_t opaque,
                                  uint32_t seqno,
                                  uint32_t flags,
-                                 void *stream_name,
+                                 const void *stream_name,
                                  uint16_t nname);

     ENGINE_ERROR_CODE dcpAddStream(const void* cookie,

src/executorthread.cc (+1 −1)

@@ -78,6 +78,7 @@ void ExecutorThread::run() {
             break;
         }

+        now = gethrtime();
         if (TaskQueue *q = manager->nextTask(*this, tick)) {
             EventuallyPersistentEngine *engine = currentTask->getEngine();
             ObjectRegistry::onSwitchThread(engine);
@@ -90,7 +91,6 @@ void ExecutorThread::run() {

             // Measure scheduling overhead as difference between the time
             // that the task wanted to wake up and the current time
-            now = gethrtime();
             hrtime_t woketime = currentTask->getWaketime();
             engine->getEpStore()->logQTime(currentTask->getTypeId(),
                                            now > woketime ? now - woketime

src/fakes/fake_executorpool.h (+8 −1)

@@ -82,7 +82,7 @@ class SingleThreadedExecutorPool : public ExecutorPool {
 };

 /* A fake execution 'thread', to be used with the FakeExecutorPool. Allows
- * execution of tasks synchronously in the current thrad.
+ * execution of tasks synchronously in the current thread.
  */
 class FakeExecutorThread : public ExecutorThread {
 public:
@@ -93,8 +93,15 @@ class FakeExecutorThread : public ExecutorThread {
     void runCurrentTask() {
         // Only supports one-shot tasks
        EXPECT_FALSE(currentTask->run());
+        completeCurrentTask();
+    }
+
+    // 'completes' the current task; useful if the caller wants to separately
+    // run() the current task and then tidy up afterwards.
+    void completeCurrentTask() {
         manager->doneWork(curTaskType);
         manager->cancel(currentTask->getId(), true);
+        currentTask.reset();
     }

     ExTask& getCurrentTask() {
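Editor's note: the point of splitting completeCurrentTask() out of
runCurrentTask() is that a test can hold a task 'in flight' between the two
calls. A hypothetical caller (mirroring what the MB-20054 test further below
does; task_executor and lpAuxioQ are assumed to exist as in that test):

    // Hypothetical usage of the run()/completeCurrentTask() split.
    FakeExecutorThread auxio_thread(task_executor, AUXIO_TASK_IDX);
    auxio_thread.updateCurrentTime();
    lpAuxioQ->fetchNextTask(auxio_thread, false);
    auxio_thread.getCurrentTask()->run();  // executes, but is not cancelled
    // ... the task stays alive (reference held via currentTask) while
    // other work - e.g. engine destruction - proceeds ...
    auxio_thread.completeCurrentTask();    // doneWork(), cancel(), reset()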

src/warmup.cc (+1 −1)

@@ -361,7 +361,7 @@ void LoadValueCallback::callback(CacheLookup &lookup)


 Warmup::Warmup(EventuallyPersistentStore *st) :
-    state(), store(st), startTime(0), metadata(0), warmup(0),
+    state(), store(st), taskId(0), startTime(0), metadata(0), warmup(0),
     threadtask_count(0),
     estimateTime(0), estimatedItemCount(std::numeric_limits<size_t>::max()),
     cleanShutdown(true), corruptAccessLog(false), warmupComplete(false),

tests/module_tests/ep_unit_tests_main.cc (−4)

@@ -28,8 +28,6 @@
 #include "configuration.h"
 #include "stored-value.h"

-/* static storage for environment variable set by putenv(). */
-static char allow_no_stats_env[] = "ALLOW_NO_STATS_UPDATE=yeah";

 int main(int argc, char **argv) {
     bool log_to_stderr = false;
@@ -52,8 +50,6 @@ int main(int argc, char **argv) {
         }
     }

-    putenv(allow_no_stats_env);
-
     init_mock_server(log_to_stderr);
     get_mock_server_api()->log->set_level(EXTENSION_LOG_DEBUG);

tests/module_tests/evp_store_single_threaded_test.cc (+219)

@@ -18,6 +18,7 @@
 #include "evp_store_test.h"

 #include "fakes/fake_executorpool.h"
+#include "programs/engine_testapp/mock_server.h"
 #include "taskqueue.h"

 /*
@@ -31,3 +32,221 @@ class SingleThreadedEPStoreTest : public EventuallyPersistentStoreTest {
         EventuallyPersistentStoreTest::SetUp();
     }
 };
+
+static ENGINE_ERROR_CODE dummy_dcp_add_failover_cb(vbucket_failover_t* entry,
+                                                   size_t nentries,
+                                                   const void *cookie)
+{
+    return ENGINE_SUCCESS;
+}
+
+/* Arguments for the background thread used by
+ * MB20054_onDeleteItem_during_bucket_deletion
+ */
+typedef struct {
+    EventuallyPersistentEngine* engine;
+    FakeExecutorThread& fake_executor_thread;
+    SyncObject& backfill_cv;
+    SyncObject& destroy_cv;
+    TaskQueue* taskQ;
+} mb20054_backfill_thread_params;
+
+static void MB20054_run_backfill_task(void* arg) {
+    mb20054_backfill_thread_params* params =
+            static_cast<mb20054_backfill_thread_params*>(arg);
+    EventuallyPersistentEngine* engine = params->engine;
+    FakeExecutorThread& auxio_thread = params->fake_executor_thread;
+    SyncObject& backfill_cv = params->backfill_cv;
+    SyncObject& destroy_cv = params->destroy_cv;
+
+    TaskQueue* lpAuxioQ = params->taskQ;
+
+    ObjectRegistry::onSwitchThread(engine);
+
+    // Run the DCPBackfill task to push items to readyQ. Should return
+    // false (i.e. one-shot).
+    EXPECT_FALSE(auxio_thread.getCurrentTask()->run());
+
+    // Notify the main thread that it can progress with destroying the
+    // engine [A].
+    {
+        LockHolder lh(backfill_cv);
+        backfill_cv.notifyOne();
+    }
+
+    // Now wait ourselves for destroy to be completed [B].
+    LockHolder lh(destroy_cv);
+    destroy_cv.wait();
+
+    // This is the only "hacky" part of the test - we need to somehow
+    // keep the DCPBackfill task 'running' - i.e. not call
+    // completeCurrentTask - until the main thread is in
+    // ExecutorPool::_stopTaskGroup. However we have no way from the test
+    // to properly signal that we are *inside* _stopTaskGroup -
+    // called from EVPStore's destructor.
+    // Best we can do is spin on waiting for the DCPBackfill task to be
+    // set to 'dead' - and only then completeCurrentTask; which will
+    // cancel the task.
+    while (!auxio_thread.getCurrentTask()->isdead()) {
+        // spin.
+    }
+    auxio_thread.completeCurrentTask();
+
+    // Cleanup - fetch the next (final) task -
+    // ActiveStreamCheckpointProcessorTask - so it can be cancelled
+    // and executorpool shut down.
+    auxio_thread.updateCurrentTime();
+    EXPECT_TRUE(lpAuxioQ->fetchNextTask(auxio_thread, false));
+    EXPECT_EQ("Process checkpoint(s) for DCP producer",
+              auxio_thread.getTaskName());
+    auxio_thread.runCurrentTask();
+}
+
+// Check that if onDeleteItem() is called during bucket deletion, we do not
+// abort due to not having a valid thread-local 'engine' pointer. This
+// has been observed when we have a DCPBackfill task which is deleted during
+// bucket shutdown, which has a non-zero number of Items which are destructed
+// (and call onDeleteItem).
+TEST_F(SingleThreadedEPStoreTest, MB20054_onDeleteItem_during_bucket_deletion) {
+    SingleThreadedExecutorPool* task_executor =
+            reinterpret_cast<SingleThreadedExecutorPool*>(ExecutorPool::get());
+
+    // Should start with no tasks registered on any queues.
+    TaskQ& lp_task_q = task_executor->getLpTaskQ();
+    for (int i = 0; i < lp_task_q.size(); i++) {
+        ASSERT_EQ(0, lp_task_q[i]->getFutureQueueSize());
+        ASSERT_EQ(0, lp_task_q[i]->getReadyQueueSize());
+    }
+
+    // [1] Set our state to active. This should add a VBStatePersistTask to
+    // the WRITER queue.
+    EXPECT_EQ(ENGINE_SUCCESS,
+              store->setVBucketState(vbid, vbucket_state_active, false));
+
+    TaskQueue* lpWriterQ = task_executor->getLpTaskQ()[WRITER_TASK_IDX];
+    TaskQueue* lpAuxioQ = task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
+
+    EXPECT_EQ(1, lpWriterQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
+
+    // Use a FakeExecutorThread to fetch and run the persistTask.
+    FakeExecutorThread writer_thread(task_executor, WRITER_TASK_IDX);
+    writer_thread.updateCurrentTime();
+    EXPECT_TRUE(lpWriterQ->fetchNextTask(writer_thread, false));
+    EXPECT_EQ("Persisting a vbucket state for vbucket: 0",
+              writer_thread.getTaskName());
+    EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
+    writer_thread.runCurrentTask();
+
+    // Perform one SET, then close its checkpoint. This means that we no
+    // longer have all sequence numbers in memory checkpoints, forcing the
+    // DCP stream request to go to disk (backfill).
+    store_item(vbid, "key", "value");
+
+    // Force a new checkpoint.
+    RCPtr<VBucket> vb = store->getVbMap().getBucket(vbid);
+    CheckpointManager& ckpt_mgr = vb->checkpointManager;
+    ckpt_mgr.createNewCheckpoint();
+
+    EXPECT_EQ(0, lpWriterQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpWriterQ->getReadyQueueSize());
+
+    EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
+
+    // Directly flush the vbucket, ensuring data is on disk.
+    // (This would normally also wake up the checkpoint remover task, but
+    // as that task was never registered with the ExecutorPool in this test
+    // environment, we need to manually remove the prev checkpoint).
+    EXPECT_EQ(1, store->flushVBucket(vbid));
+
+    bool new_ckpt_created;
+    EXPECT_EQ(1,
+              ckpt_mgr.removeClosedUnrefCheckpoints(vb, new_ckpt_created));
+    vb.reset();
+
+    EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
+
+    // Create a DCP producer, and start a stream request.
+    std::string name("test_producer");
+    EXPECT_EQ(ENGINE_SUCCESS,
+              engine->dcpOpen(cookie, /*opaque:unused*/{}, /*seqno:unused*/{},
+                              DCP_OPEN_PRODUCER, name.data(), name.size()));
+
+    // Expect to have an ActiveStreamCheckpointProcessorTask, which is
+    // initially snoozed (so we can't run it).
+    EXPECT_EQ(1, lpAuxioQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
+
+    uint64_t rollbackSeqno;
+    // Actual stream request method (EvpDcpStreamReq) is static, so access via
+    // the engine_interface.
+    EXPECT_EQ(ENGINE_SUCCESS,
+              engine->dcp.stream_req(&engine->interface, cookie, /*flags*/0,
+                                     /*opaque*/0, /*vbucket*/vbid,
+                                     /*start_seqno*/0, /*end_seqno*/-1,
+                                     /*vb_uuid*/0xabcd, /*snap_start*/0,
+                                     /*snap_end*/0, &rollbackSeqno,
+                                     dummy_dcp_add_failover_cb));
+
+    // FutureQ should now have an additional DCPBackfill task.
+    EXPECT_EQ(2, lpAuxioQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
+
+    // Create an executor 'thread' to obtain shared ownership of the next
+    // AuxIO task (which should be DCPBackfill). As long as this
+    // object has its currentTask set to DCPBackfill, the DCPBackfill task
+    // will not be deleted.
+    // Essentially we are simulating a concurrent thread running this task.
+    FakeExecutorThread auxio_thread(task_executor, AUXIO_TASK_IDX);
+    auxio_thread.updateCurrentTime();
+    EXPECT_TRUE(lpAuxioQ->fetchNextTask(auxio_thread, false));
+    EXPECT_EQ("DCP backfill for vbucket 0", auxio_thread.getTaskName());
+
+    // This is the one action we really need to perform 'concurrently' - delete
+    // the engine while a DCPBackfill task is still running. We spin up a
+    // separate thread which will run the DCPBackfill task
+    // concurrently with destroy - specifically DCPBackfill must start running
+    // (and add items to the readyQ) before destroy(), and it must then
+    // continue running until (stopping only after) _stopTaskGroup is invoked.
+    // To achieve this we use a couple of condition variables to synchronise
+    // between the two threads - the timeline needs to look like:
+    //
+    //  auxIO thread:  [------- DCPBackfill ----------]
+    //   main thread:       [destroy()]       [ExecutorPool::_stopTaskGroup]
+    //
+    //  --------------------------------------------------------> time
+    //
+    SyncObject backfill_cv;
+    SyncObject destroy_cv;
+
+    cb_thread_t concurrent_task_thread;
+    mb20054_backfill_thread_params params = {engine, auxio_thread, backfill_cv,
+                                             destroy_cv, lpAuxioQ};
+
+    cb_create_thread(&concurrent_task_thread, MB20054_run_backfill_task, &params, 0);
+
+    // [A] Wait for DCPBackfill to complete.
+    LockHolder lh(backfill_cv);
+    backfill_cv.wait();
+
+    // 'Destroy' the engine - this doesn't delete the object, just shuts down
+    // connections, marks streams as dead etc.
+    engine->destroy(/*force*/false);
+    destroy_mock_event_callbacks();
+
+    {
+        LockHolder lh(destroy_cv);
+        destroy_cv.notifyOne();
+    }
+
+    // Need to have the current engine valid before deleting (this is what
+    // EvpDestroy does normally; however we have a smart ptr to the engine
+    // so must delete via that).
+    ObjectRegistry::onSwitchThread(engine);
+    delete engine;
+    engine = NULL;
+
+    cb_join_thread(concurrent_task_thread);
+}

tests/module_tests/evp_store_test.cc (+10 −1)

@@ -32,6 +32,7 @@
 #include "connmap.h"
 #include "ep_engine.h"
 #include "flusher.h"
+#include "tapthrottle.h"
 #include "../mock/mock_dcp_producer.h"

 #include "programs/engine_testapp/mock_server.h"
@@ -64,6 +65,12 @@ SynchronousEPEngine::SynchronousEPEngine(const std::string& extra_config)

     // checkpointConfig is needed by CheckpointManager (via EPStore).
     checkpointConfig = new CheckpointConfig(*this);
+
+    // tapConfig is needed by doTapStats().
+    tapConfig = new TapConfig(*this);
+
+    // tapThrottle is needed by doEngineStats().
+    tapThrottle = new TapThrottle(configuration, stats);
 }

 void SynchronousEPEngine::setEPStore(EventuallyPersistentStore* store) {
@@ -132,7 +139,9 @@ void EventuallyPersistentStoreTest::SetUp() {
 void EventuallyPersistentStoreTest::TearDown() {
     destroy_mock_cookie(cookie);
     destroy_mock_event_callbacks();
-    engine->getDcpConnMap().manageConnections();
+    if (engine) {
+        engine->getDcpConnMap().manageConnections();
+    }

     // Need to have the current engine valid before deleting (this is what
     // EvpDestroy does normally; however we have a smart ptr to the engine
