From fd8410bc613329f4532fffc2850d8be80b8b7b3d Mon Sep 17 00:00:00 2001 From: Julius Goryavsky Date: Tue, 30 Oct 2018 03:01:06 +0100 Subject: [PATCH] Sporadic Galera failures when testing MariaDB with mtr This patch fixes several bugs in Galera that lead to sporadic failures when testing MariaDB server with the mtr due to false error messages (or warnings) that are not related to the new fixes. The patch includes a some changes taken from the latest versions of Galera from Codership, as well as some changes taken from PXC version of Galera, as well as some corrections made by me. 1) Some mtr tests sometimes fails due to "[Warning] WSREP: gcs_caused() returned -1" warnings: Currently gcs_.caused() function works only when the group is primary, and fails if the group is non-primary or even if the group in a transient state (during configuration changes). Instead of failing immediately, this patch changes gcs_.caused() to return EAGAIN error code when function was called while group in a transient state. On receiving EAGAIN error code ReplicatorSMM::causal_read() retries to obtain a new seqno (by calling gcs_.caused() again). 2) Some mtr tests sometimes fails due to "[Warning] WSREP: Failed to report last committed " warnings: This is because when processing cluster configuration changes, the GCS layer does not always timely update the group->last_applied variable. To correct this error, I added an additional call to the group_redo_last_applied() function. In addition, to protect against other similar situations, I added a cycle to re-call gcs_.set_last_applied() in case of failure due to interruption of internal operations in the current Galera implementation. 3) Some mtr test sometimes fails when node is evicted from the cluster in middle of SST. Even when node evicted, the SST script may completes normally. After this, the node calls the gcs_join() function and tries to join the cluster. However, this is impossible, because the node is already evicted. Therefore, the _join() function (which called from gcs_join) fails. Then node does IST (which also fails), after/during which it is aborted. To fix this, we should avoid joining the cluster through gcs_join function if node is evicted. To do this, we should check the current connection state in the gcs_join() function to return from it immediately if the node's communication channel was closed. 4) If SST fails due to a network error, the node that acted as a donor sometimes does not return to its original state, which leads to failure due to the inability to continue the test execution (due to a timeout). If sst_sent() fails node should restore itself back to joined state. The sst_sent function can fail. commonly due to network errors, where DONOR may lose connectivity to JOINER (or existing cluster). But on re-join it should restore the original state without waiting for transition to JOINER state. SST failure on JOINER will gracefully shutdown the joiner. https://jira.mariadb.org/browse/MDEV-17565 --- galera/src/certification.cpp | 5 ++- galera/src/certification.hpp | 3 +- galera/src/galera_gcs.hpp | 31 +++++++++++++-- galera/src/galera_service_thd.cpp | 38 ++++++++++++++---- galera/src/replicator_smm.cpp | 57 +++++++++++++++++++++------ galera/src/replicator_str.cpp | 4 +- galera/tests/write_set_check.cpp | 8 ++-- gcache/src/GCache.hpp | 8 ++++ gcs/src/SConscript | 17 ++++---- gcs/src/gcs.cpp | 63 +++++++++++++++++++++--------- gcs/src/gcs.hpp | 8 +++- gcs/src/gcs_core.cpp | 64 ++++++++++++++++++++----------- gcs/src/gcs_core.hpp | 4 +- gcs/src/gcs_gcomm.cpp | 20 +++++++--- gcs/src/gcs_group.cpp | 16 ++++++-- gcs/src/gcs_node.cpp | 1 + gcs/src/gcs_test.cpp | 4 ++ gcs/src/unit_tests/SConscript | 1 + 18 files changed, 259 insertions(+), 93 deletions(-) diff --git a/galera/src/certification.cpp b/galera/src/certification.cpp index a087e2df7..dfd200a93 100644 --- a/galera/src/certification.cpp +++ b/galera/src/certification.cpp @@ -808,7 +808,7 @@ galera::Certification::do_test_preordered(TrxHandle* trx) } -galera::Certification::Certification(gu::Config& conf, ServiceThd& thd) +galera::Certification::Certification(gu::Config& conf, ServiceThd& thd, gcache::GCache& gcache) : version_ (-1), trx_map_ (), @@ -816,6 +816,7 @@ galera::Certification::Certification(gu::Config& conf, ServiceThd& thd) cert_index_ng_ (), deps_set_ (), service_thd_ (thd), + gcache_ (gcache), mutex_ (), trx_size_warn_count_ (0), initial_position_ (-1), @@ -1066,7 +1067,7 @@ wsrep_seqno_t galera::Certification::set_trx_committed(TrxHandle* trx) deps_set_.erase(i); } - if (gu_unlikely(index_purge_required())) + if (gu_unlikely(gcache_.cleanup_required() || index_purge_required())) { ret = get_safe_to_discard_seqno_(); } diff --git a/galera/src/certification.hpp b/galera/src/certification.hpp index cac4d4b14..d42b2e417 100644 --- a/galera/src/certification.hpp +++ b/galera/src/certification.hpp @@ -48,7 +48,7 @@ namespace galera TEST_FAILED } TestResult; - Certification(gu::Config& conf, ServiceThd& thd); + Certification(gu::Config& conf, ServiceThd& thd, gcache::GCache& gcache); ~Certification(); void assign_initial_position(wsrep_seqno_t seqno, int versiono); @@ -183,6 +183,7 @@ namespace galera CertIndexNG cert_index_ng_; DepsSet deps_set_; ServiceThd& service_thd_; + gcache::GCache& gcache_; gu::Mutex mutex_; size_t trx_size_warn_count_; wsrep_seqno_t initial_position_; diff --git a/galera/src/galera_gcs.hpp b/galera/src/galera_gcs.hpp index c3cb271a4..4f3eb367f 100644 --- a/galera/src/galera_gcs.hpp +++ b/galera/src/galera_gcs.hpp @@ -43,7 +43,8 @@ namespace galera virtual ssize_t replv(const WriteSetVector&, gcs_action& act, bool) = 0; virtual ssize_t repl (gcs_action& act, bool) = 0; - virtual gcs_seqno_t caused() = 0; + virtual void caused(gcs_seqno_t& seqno, + gu::datetime::Date& wait_until) = 0; virtual ssize_t schedule() = 0; virtual ssize_t interrupt(ssize_t) = 0; virtual ssize_t resume_recv() = 0; @@ -141,7 +142,23 @@ namespace galera return gcs_repl(conn_, &act, scheduled); } - gcs_seqno_t caused() { return gcs_caused(conn_); } + void caused(gcs_seqno_t& seqno, gu::datetime::Date& wait_until) + { + long err; + + while ((err = gcs_caused(conn_, seqno)) == -EAGAIN && + gu::datetime::Date::calendar() < wait_until) + { + usleep(1000); + } + + if (err == -EAGAIN) err = -ETIMEDOUT; + + if (err < 0) + { + gu_throw_error(-err); + } + } ssize_t schedule() { return gcs_schedule(conn_); } @@ -233,6 +250,11 @@ namespace galera size_t max_action_size() const { return GCS_MAX_ACT_SIZE; } + void join_notification() + { + gcs_join_notification(conn_); + } + private: Gcs(const Gcs&); @@ -311,7 +333,10 @@ namespace galera return ret; } - gcs_seqno_t caused() { return global_seqno_; } + void caused(gcs_seqno_t& seqno, gu::datetime::Date& wait_until) + { + seqno = global_seqno_; + } ssize_t schedule() { diff --git a/galera/src/galera_service_thd.cpp b/galera/src/galera_service_thd.cpp index cb9e5e877..8c04770ef 100644 --- a/galera/src/galera_service_thd.cpp +++ b/galera/src/galera_service_thd.cpp @@ -52,21 +52,43 @@ galera::ServiceThd::thd_func (void* arg) { if (data.act_ & A_LAST_COMMITTED) { - ssize_t const ret - (st->gcs_.set_last_applied(data.last_committed_)); + static const size_t max_set_attempts(4); + size_t attempts = 0; + ssize_t ret; - if (gu_unlikely(ret < 0)) + do { - log_warn << "Failed to report last committed " - << data.last_committed_ << ", " << ret - << " (" << strerror (-ret) << ')'; - // @todo: figure out what to do in this case + ret = st->gcs_.set_last_applied(data.last_committed_); + + if (gu_likely(ret != -EINTR)) + { + break; + } + + attempts++; + + // gcs_set_last_applied() may return EINTR if the send + // monitor was interruped, this is not an error and + // in this case there is no need to display a warning: + log_debug << "Reporting of last committed was " + "interrupted: " + << data.last_committed_ + << "\nRetrying " << attempts << "th time"; } - else + while (attempts != max_set_attempts); + + if (gu_likely(ret >= 0)) { log_debug << "Reported last committed: " << data.last_committed_; } + else + { + log_warn << "Failed to report last committed " + << data.last_committed_ << ", " << ret + << " (" << strerror (-ret) << ')'; + // @todo: figure out what to do in this case + } } if (data.act_ & A_RELEASE_SEQNO) diff --git a/galera/src/replicator_smm.cpp b/galera/src/replicator_smm.cpp index 041b2ba3b..07b6afc68 100644 --- a/galera/src/replicator_smm.cpp +++ b/galera/src/replicator_smm.cpp @@ -164,7 +164,7 @@ galera::ReplicatorSMM::ReplicatorSMM(const struct wsrep_init_args* args) ist_receiver_ (config_, slave_pool_, args->node_address), ist_senders_ (gcs_, gcache_), wsdb_ (), - cert_ (config_, service_thd_), + cert_ (config_, service_thd_, gcache_), local_monitor_ (), apply_monitor_ (), commit_monitor_ (), @@ -859,8 +859,8 @@ wsrep_status_t galera::ReplicatorSMM::replay_trx(TrxHandle* trx, void* trx_ctx) ApplyOrder ao(*trx); gu_trace(apply_monitor_.enter(ao)); trx->set_state(TrxHandle::S_MUST_REPLAY_CM); - // fall through } + // fall through case TrxHandle::S_MUST_REPLAY_CM: if (co_mode_ != CommitOrder::BYPASS) { @@ -895,6 +895,13 @@ wsrep_status_t galera::ReplicatorSMM::replay_trx(TrxHandle* trx, void* trx_ctx) catch (gu::Exception& e) { st_.mark_corrupt(); + + /* Before doing a graceful exit ensure that node isolate itself + from the cluster. This will cause the quorum to re-evaluate + and if minority nodes are left with different set of data + they can turn non-Primary to avoid further data consistency issue. */ + param_set("gmcast.isolate", "1"); + throw; } @@ -967,12 +974,19 @@ wsrep_status_t galera::ReplicatorSMM::post_rollback(TrxHandle* trx) wsrep_status_t galera::ReplicatorSMM::causal_read(wsrep_gtid_t* gtid) { - wsrep_seqno_t cseq(static_cast(gcs_.caused())); + wsrep_seqno_t cseq; + gu::datetime::Date wait_until(gu::datetime::Date::calendar() + + causal_read_timeout_); - if (cseq < 0) + try { - log_warn << "gcs_caused() returned " << cseq << " (" << strerror(-cseq) - << ')'; + gcs_.caused(cseq, wait_until); + assert(cseq >= 0); + } + catch (gu::Exception& e) + { + log_warn << "gcs_caused() returned " << -e.get_errno() + << " (" << strerror(e.get_errno()) << ")"; return WSREP_TRX_FAIL; } @@ -985,8 +999,6 @@ wsrep_status_t galera::ReplicatorSMM::causal_read(wsrep_gtid_t* gtid) // at monitor drain and disallowing further waits until // configuration change related operations (SST etc) have been // finished. - gu::datetime::Date wait_until(gu::datetime::Date::calendar() - + causal_read_timeout_); if (gu_likely(co_mode_ != CommitOrder::BYPASS)) { commit_monitor_.wait(cseq, wait_until); @@ -1198,6 +1210,13 @@ galera::ReplicatorSMM::sst_sent(const wsrep_gtid_t& state_id, int const rcode) if (state_() != S_DONOR) { log_error << "sst sent called when not SST donor, state " << state_(); + /* If sst_sent() fails node should restore itself back to the joined + state. The sst_sent function can fail. commonly due to network errors, + where DONOR may lose connectivity to JOINER (or existing cluster). + But on re-join it should restore the original state without waiting + for transition to JOINER state (DONOR->JOINER->JOINED->SYNCED). + SST failure on JOINER will gracefully shutdown the joiner.*/ + gcs_.join_notification(); return WSREP_CONN_FAIL; } @@ -1247,7 +1266,14 @@ void galera::ReplicatorSMM::process_trx(void* recv_ctx, TrxHandle* trx) log_fatal << "Failed to apply trx: " << *trx; log_fatal << e.what(); - log_fatal << "Node consistency compromized, aborting..."; + log_fatal << "Node consistency compromised, aborting..."; + + /* Before doing a graceful exit ensure that node isolate itself + from the cluster. This will cause the quorum to re-evaluate + and if minority nodes are left with different set of data + they can turn non-Primary to avoid further data consistency issue. */ + param_set("gmcast.isolate", "1"); + abort(); } break; @@ -1412,6 +1438,12 @@ galera::ReplicatorSMM::process_conf_change(void* recv_ctx, size_t app_req_len(0); const_cast(view_info).state_gap = st_required; + + // We need to set the protocol version BEFORE the view callback, so that + // any version-dependent code is run using the correct version instead of -1. + if (view_info.view >= 0) // Primary configuration + establish_protocol_versions (repl_proto); + wsrep_cb_status_t const rcode( view_cb_(app_ctx_, recv_ctx, &view_info, 0, 0, &app_req, &app_req_len)); @@ -1420,6 +1452,7 @@ galera::ReplicatorSMM::process_conf_change(void* recv_ctx, assert(app_req_len <= 0); log_fatal << "View callback failed. This is unrecoverable, " << "restart required."; + local_monitor_.leave(lo); close(); abort(); } @@ -1428,14 +1461,13 @@ galera::ReplicatorSMM::process_conf_change(void* recv_ctx, log_fatal << "Local state UUID " << state_uuid_ << " is different from group state UUID " << group_uuid << ", and SST request is null: restart required."; + local_monitor_.leave(lo); close(); abort(); } if (view_info.view >= 0) // Primary configuration { - establish_protocol_versions (repl_proto); - // we have to reset cert initial position here, SST does not contain // cert index yet (see #197). // Also this must be done before releasing GCache buffers. @@ -1528,6 +1560,7 @@ galera::ReplicatorSMM::process_conf_change(void* recv_ctx, { log_fatal << "Internal error: unexpected next state for " << "non-prim: " << next_state << ". Restart required."; + local_monitor_.leave(lo); close(); abort(); } @@ -1870,6 +1903,6 @@ galera::ReplicatorSMM::update_state_uuid (const wsrep_uuid_t& uuid) void galera::ReplicatorSMM::abort() { - gcs_.close(); + close(); gu_abort(); } diff --git a/galera/src/replicator_str.cpp b/galera/src/replicator_str.cpp index cb8272269..568f79710 100644 --- a/galera/src/replicator_str.cpp +++ b/galera/src/replicator_str.cpp @@ -536,7 +536,9 @@ ReplicatorSMM::prepare_state_request (const void* const sst_req, } catch (gu::Exception& e) { - log_warn + log_info << "State gap can't be serviced using IST." + " Switching to SST"; + log_info << "Failed to prepare for incremental state transfer: " << e.what() << ". IST will be unavailable."; } diff --git a/galera/tests/write_set_check.cpp b/galera/tests/write_set_check.cpp index b85708012..9943fc2e5 100644 --- a/galera/tests/write_set_check.cpp +++ b/galera/tests/write_set_check.cpp @@ -51,6 +51,7 @@ namespace gu::Config& conf() { return conf_; } galera::ServiceThd& thd() { return thd_; } + gcache::GCache& gcache() { return gcache_; } private: @@ -408,7 +409,7 @@ START_TEST(test_cert_hierarchical_v1) size_t nws(sizeof(wsi)/sizeof(wsi[0])); TestEnv env; - galera::Certification cert(env.conf(), env.thd()); + galera::Certification cert(env.conf(), env.thd(), env.gcache()); int const version(1); cert.assign_initial_position(0, version); galera::TrxHandle::Params const trx_params("", version,KeySet::MAX_VERSION); @@ -530,7 +531,7 @@ START_TEST(test_cert_hierarchical_v2) size_t nws(sizeof(wsi)/sizeof(wsi[0])); TestEnv env; - galera::Certification cert(env.conf(), env.thd()); + galera::Certification cert(env.conf(), env.thd(), env.gcache()); cert.assign_initial_position(0, version); galera::TrxHandle::Params const trx_params("", version,KeySet::MAX_VERSION); @@ -580,7 +581,8 @@ START_TEST(test_trac_726) const int version(2); TestEnv env; - galera::Certification cert(env.conf(), env.thd()); + galera::Certification cert(env.conf(), env.thd(), env.gcache()); + galera::TrxHandle::Params const trx_params("", version,KeySet::MAX_VERSION); wsrep_uuid_t uuid1 = {{1, }}; wsrep_uuid_t uuid2 = {{2, }}; diff --git a/gcache/src/GCache.hpp b/gcache/src/GCache.hpp index d1f61e1bb..ef74127de 100644 --- a/gcache/src/GCache.hpp +++ b/gcache/src/GCache.hpp @@ -99,6 +99,14 @@ namespace gcache int64_t& seqno_d, ssize_t& size); + /*! + * Implements the cleanup policy test. + */ + bool cleanup_required() + { + return (params.keep_pages_size() && ps.total_size() > params.keep_pages_size()); + } + class Buffer { public: diff --git a/gcs/src/SConscript b/gcs/src/SConscript index 6566e5ab6..9d91b61b3 100644 --- a/gcs/src/SConscript +++ b/gcs/src/SConscript @@ -7,6 +7,7 @@ libgcs_env = env.Clone() # Include paths libgcs_env.Append(CPPPATH = Split(''' + #/common #/galerautils/src #/gcomm/src #/gcache/src @@ -15,18 +16,18 @@ libgcs_env.Append(CPPPATH = Split(''' # Backends (TODO: Get from global options) libgcs_env.Append(CPPFLAGS = ' -DGCS_USE_GCOMM') # For C-style logging -libgcs_env.Append(CPPFLAGS = ' -DGALERA_LOG_H_ENABLE_CXX -Wno-variadic-macros') +libgcs_env.Append(CPPFLAGS = ' -DGALERA_LOG_H_ENABLE_CXX') # Disable old style cast warns until code is fixed -libgcs_env.Append(CPPFLAGS = ' -Wno-old-style-cast') +libgcs_env.Replace(CXXFLAGS = libgcs_env['CXXFLAGS'].replace('-Wold-style-cast', '')) +libgcs_env.Replace(CXXFLAGS = libgcs_env['CXXFLAGS'].replace('-Weffc++', '')) # Allow zero sized arrays libgcs_env.Replace(CCFLAGS = libgcs_env['CCFLAGS'].replace('-pedantic', '')) -libgcs_env.Append(CPPFLAGS = ' -Wno-missing-field-initializers') -libgcs_env.Append(CPPFLAGS = ' -Wno-effc++') +libgcs_env.Append(CCFLAGS = ' -Wno-missing-field-initializers') +libgcs_env.Append(CCFLAGS = ' -Wno-variadic-macros') -print libgcs_env['CFLAGS'] -print libgcs_env['CCFLAGS'] -print libgcs_env['CPPFLAGS'] -print libgcs_env['CXXFLAGS'] +print('gcs flags:') +for f in ['CFLAGS', 'CXXFLAGS', 'CCFLAGS', 'CPPFLAGS']: + print(f + ': ' + libgcs_env[f].strip()) gcs4garb_env = libgcs_env.Clone() diff --git a/gcs/src/gcs.cpp b/gcs/src/gcs.cpp index f038bcb50..62ef9e9eb 100644 --- a/gcs/src/gcs.cpp +++ b/gcs/src/gcs.cpp @@ -175,6 +175,10 @@ struct gcs_conn /* #603, #606 join control */ bool volatile need_to_join; gcs_seqno_t volatile join_seqno; + void join_notification() + { + need_to_join = true; + } /* sync control */ bool sync_sent_; @@ -880,17 +884,14 @@ _join (gcs_conn_t* conn, gcs_seqno_t seqno) while (-EAGAIN == (err = gcs_core_send_join (conn->core, seqno))) usleep (10000); - switch (err) + if (gu_unlikely(err < 0)) { - case -ENOTCONN: gu_warn ("Sending JOIN failed: %d (%s). " "Will retry in new primary component.", err, strerror(-err)); - case 0: - return 0; - default: - gu_error ("Sending JOIN failed: %d (%s).", err, strerror(-err)); return err; } + + return 0; } /*! Handles configuration action */ @@ -1035,8 +1036,8 @@ gcs_handle_act_state_req (gcs_conn_t* conn, /*! Allocates buffer with malloc to pass to the upper layer. */ static long -gcs_handle_state_change (gcs_conn_t* conn, - const struct gcs_act* act) +gcs_handle_state_change (gcs_conn_t* conn, + struct gcs_act* act) { gu_debug ("Got '%s' dated %lld", gcs_act_type_to_str (act->type), gcs_seqno_gtoh(*(gcs_seqno_t*)act->buf)); @@ -1378,14 +1379,11 @@ static void *gcs_recv_thread (void *arg) } else if (conn->my_idx == rcvd.sender_idx) { - gu_fatal("Protocol violation: unordered local action not in repl_q:" - " { {%p, %zd, %s}, %ld, %lld }.", + gu_debug("Discarding: unordered local action not in repl_q: " + "{ {%p, %zd, %s}, %ld, %lld }.", rcvd.act.buf, rcvd.act.buf_len, gcs_act_type_to_str(rcvd.act.type), rcvd.sender_idx, rcvd.id); - assert(0); - ret = -ENOTRECOVERABLE; - break; } else { @@ -1405,9 +1403,12 @@ static void *gcs_recv_thread (void *arg) } else if (ret < 0) { + /* We must set connection state to 'closed' to avoid the race + condition between gcs_recv_thread() and gcs_recv(), which + could lead to assertion in gcs_recv: */ + gcs_shift_state (conn, GCS_CONN_CLOSED); /* In case of error call _close() to release repl_q waiters. */ (void)_close(conn, false); - gcs_shift_state (conn, GCS_CONN_CLOSED); } gu_info ("RECV thread exiting %d: %s", ret, strerror(-ret)); return NULL; @@ -1597,9 +1598,9 @@ long gcs_interrupt (gcs_conn_t* conn, long handle) return gcs_sm_interrupt (conn->sm, handle); } -gcs_seqno_t gcs_caused(gcs_conn_t* conn) +long gcs_caused(gcs_conn_t* conn, gcs_seqno_t& seqno) { - return gcs_core_caused(conn->core); + return gcs_core_caused(conn->core, seqno); } /* Puts action in the send queue and returns after it is replicated */ @@ -1998,10 +1999,29 @@ gcs_set_last_applied (gcs_conn_t* conn, gcs_seqno_t seqno) long gcs_join (gcs_conn_t* conn, gcs_seqno_t seqno) { - conn->join_seqno = seqno; - conn->need_to_join = true; + // Even when node is evicted from the cluster in middle of SST, + // the SST may completes normally. After this, the node calls + // the gcs_join function and tries to join the cluster. However, + // this is impossible, because the node is already evicted. + // Therefore, the _join() function (which called from gcs_join) + // fails. Then node does IST (which also fails), after/during + // which it is aborted. To fix this, we should avoid joining + // the cluster through gcs_join function if node is evicted. + // To do this, we should check the current connection state + // in the gcs_join() function to return from it immediately + // if the node's communication channel was closed: + + if (conn->state < GCS_CONN_CLOSED) + { + conn->join_seqno = seqno; + conn->need_to_join = true; - return _join (conn, seqno); + return _join (conn, seqno); + } + else + { + return GCS_CLOSED_ERROR; + } } gcs_seqno_t gcs_local_sequence(gcs_conn_t* conn) @@ -2295,3 +2315,8 @@ const char* gcs_param_get (gcs_conn_t* conn, const char* key) return NULL; } + +void gcs_join_notification(gcs_conn_t* conn) +{ + conn->need_to_join = true; +} diff --git a/gcs/src/gcs.hpp b/gcs/src/gcs.hpp index 8e0e58bad..3af3eed9f 100644 --- a/gcs/src/gcs.hpp +++ b/gcs/src/gcs.hpp @@ -280,9 +280,11 @@ extern long gcs_resume_recv (gcs_conn_t* conn); * After action with this seqno is applied, this thread is guaranteed to see * all the changes made by the client, even on other nodes. * - * @return global sequence number or negative error code + * @retval 0 success + * @retval -EPERM operation not permitted (in NON_PRIMARY state) + * @retval -EAGAIN operation may be retried later (in transient state) */ -extern gcs_seqno_t gcs_caused(gcs_conn_t* conn); +extern long gcs_caused (gcs_conn_t* conn, gcs_seqno_t& seqno); /*! @brief Sends state transfer request * Broadcasts state transfer request which will be passed to one of the @@ -451,6 +453,8 @@ extern void gcs_flush_stats(gcs_conn_t *conn); void gcs_get_status(gcs_conn_t* conn, gu::Status& status); +extern void gcs_join_notification(gcs_conn_t *conn); + /*! A node with this name will be treated as a stateless arbitrator */ #define GCS_ARBITRATOR_NAME "garb" diff --git a/gcs/src/gcs_core.cpp b/gcs/src/gcs_core.cpp index 060922135..5af4c0cc0 100644 --- a/gcs/src/gcs_core.cpp +++ b/gcs/src/gcs_core.cpp @@ -97,6 +97,7 @@ core_act_t; typedef struct causal_act { gcs_seqno_t* act_id; + long* error; gu_mutex_t* mtx; gu_cond_t* cond; } causal_act_t; @@ -677,12 +678,18 @@ core_handle_last_msg (gcs_core_t* core, gcs_group_handle_last_msg (&core->group, msg); if (commit_cut) { /* commit cut changed */ - if ((act->buf = malloc (sizeof (commit_cut)))) { - act->type = GCS_ACT_COMMIT_CUT; + + int const buf_len(sizeof(commit_cut)); + void* const buf(malloc(buf_len)); + + if (gu_likely(NULL != (buf))) { /* #701 - everything that goes into the action buffer * is expected to be serialized. */ - *((gcs_seqno_t*)act->buf) = gcs_seqno_htog(commit_cut); - act->buf_len = sizeof(commit_cut); + *((gcs_seqno_t*)buf) = gcs_seqno_htog(commit_cut); + assert(NULL == act->buf); + act->buf = buf; + act->buf_len = buf_len; + act->type = GCS_ACT_COMMIT_CUT; return act->buf_len; } else { @@ -812,6 +819,7 @@ core_handle_comp_msg (gcs_core_t* core, "WAIT_STATE_MSG. Can't continue."); ret = -ENOTRECOVERABLE; assert(0); + // fall through default: gu_fatal ("Failed to handle component message: %d (%s)!", ret, strerror (-ret)); @@ -1021,23 +1029,33 @@ core_msg_to_action (gcs_core_t* core, static long core_msg_causal(gcs_core_t* conn, struct gcs_recv_msg* msg) { - causal_act_t* act; - if (gu_unlikely(msg->size != sizeof(*act))) + if (gu_unlikely(msg->size != sizeof(causal_act_t))) { gu_error("invalid causal act len %ld, expected %ld", - msg->size, sizeof(*act)); + msg->size, sizeof(causal_act_t)); return -EPROTO; } - gcs_seqno_t const causal_seqno = - GCS_GROUP_PRIMARY == conn->group.state ? - conn->group.act_id_ : GCS_SEQNO_ILL; - - act = (causal_act_t*)msg->buf; + causal_act_t* act= (causal_act_t*)msg->buf; gu_mutex_lock(act->mtx); - *act->act_id = causal_seqno; - gu_cond_signal(act->cond); + { + switch (conn->group.state) + { + case GCS_GROUP_PRIMARY: + *act->act_id = conn->group.act_id_; + break; + case GCS_GROUP_WAIT_STATE_UUID: + case GCS_GROUP_WAIT_STATE_MSG: + *act->error = -EAGAIN; + break; + default: + *act->error = -EPERM; + } + + gu_cond_signal(act->cond); + } gu_mutex_unlock(act->mtx); + return msg->size; } @@ -1324,20 +1342,22 @@ gcs_core_send_fc (gcs_core_t* core, const void* fc, size_t fc_size) return ret; } -gcs_seqno_t -gcs_core_caused(gcs_core_t* core) +long +gcs_core_caused (gcs_core_t* core, gcs_seqno_t& seqno) { - long ret; - gcs_seqno_t act_id = GCS_SEQNO_ILL; + long error = 0; gu_mutex_t mtx; gu_cond_t cond; - causal_act_t act = {&act_id, &mtx, &cond}; + causal_act_t act = {&seqno, &error, &mtx, &cond}; gu_mutex_init (&mtx, NULL); gu_cond_init (&cond, NULL); gu_mutex_lock (&mtx); { - ret = core_msg_send_retry (core, &act, sizeof(act), GCS_MSG_CAUSAL); + long ret = core_msg_send_retry (core, + &act, + sizeof(act), + GCS_MSG_CAUSAL); if (ret == sizeof(act)) { @@ -1346,14 +1366,14 @@ gcs_core_caused(gcs_core_t* core) else { assert (ret < 0); - act_id = ret; + error = ret; } } gu_mutex_unlock (&mtx); gu_mutex_destroy (&mtx); gu_cond_destroy (&cond); - return act_id; + return error; } long diff --git a/gcs/src/gcs_core.hpp b/gcs/src/gcs_core.hpp index 1b3bc9456..df3038d34 100644 --- a/gcs/src/gcs_core.hpp +++ b/gcs/src/gcs_core.hpp @@ -154,8 +154,8 @@ gcs_core_send_sync (gcs_core_t* core, gcs_seqno_t seqno); extern long gcs_core_send_fc (gcs_core_t* core, const void* fc, size_t fc_size); -extern gcs_seqno_t -gcs_core_caused(gcs_core_t* core); +extern long +gcs_core_caused (gcs_core_t* core, gcs_seqno_t& seqno); extern long gcs_core_param_set (gcs_core_t* core, const char* key, const char* value); diff --git a/gcs/src/gcs_gcomm.cpp b/gcs/src/gcs_gcomm.cpp index fc4b80873..8acc6410c 100644 --- a/gcs/src/gcs_gcomm.cpp +++ b/gcs/src/gcs_gcomm.cpp @@ -223,7 +223,7 @@ class GCommConn : public Consumer, public Toplay error_ = ENOTCONN; int err; - if ((err = pthread_create(&thd_, 0, &run_fn, this)) != 0) + if ((err = gu_thread_create(&thd_, 0, &run_fn, this)) != 0) { gu_throw_error(err) << "Failed to create thread"; } @@ -301,11 +301,18 @@ class GCommConn : public Consumer, public Toplay pthread_join(thd_, 0); { gcomm::Critical crit(*net_); - log_info << "gcomm: closing backend"; - tp_->close(error_ != 0 || force == true); - gcomm::disconnect(tp_, this); - delete tp_; - tp_ = 0; + if (tp_ == 0) + { + log_info << "gcomm: backend already closed"; + } + else + { + log_info << "gcomm: closing backend"; + tp_->close(error_ != 0 || force == true); + gcomm::disconnect(tp_, this); + delete tp_; + tp_ = 0; + } } const Message* msg; @@ -704,6 +711,7 @@ static GCS_BACKEND_RECV_FN(gcomm_recv) if (cm_size <= msg->buf_len) { memcpy(msg->buf, cm, cm_size); + msg->size = cm_size; recv_buf.pop_front(); msg->type = GCS_MSG_COMPONENT; } diff --git a/gcs/src/gcs_group.cpp b/gcs/src/gcs_group.cpp index 2e6eb40ed..15fa329c4 100644 --- a/gcs/src/gcs_group.cpp +++ b/gcs/src/gcs_group.cpp @@ -255,7 +255,7 @@ group_check_donor (gcs_group_t* group) gu_warn ("Donor %s is no longer in the group. State transfer cannot " "be completed, need to abort. Aborting...", donor_id); - gu_abort(); + // gu_abort(); } return; @@ -342,6 +342,8 @@ group_post_state_exchange (gcs_group_t* group) } assert (group->prim_num > 0); + + group_redo_last_applied(group); } else { // non-primary configuration @@ -757,9 +759,15 @@ gcs_group_handle_join_msg (gcs_group_t* group, const gcs_recv_msg_t* msg) } } else { - gu_info ("%d.%d (%s): State transfer %s %d.%d (%s) complete.", - sender_idx, sender->segment, sender->name, st_dir, - peer_idx, peer ? peer->segment : -1, peer_name); + if (GCS_NODE_STATE_JOINED == sender->status) { + gu_info ("%d.%d (%s): State transfer %s %d.%d (%s) complete.", + sender_idx, sender->segment, sender->name, st_dir, + peer_idx, peer ? peer->segment : -1, peer_name); + } + else { + assert(sender->desync_count > 0); + return 0; // don't deliver up + } } } } diff --git a/gcs/src/gcs_node.cpp b/gcs/src/gcs_node.cpp index 49a49d8f7..c9d0ac08f 100644 --- a/gcs/src/gcs_node.cpp +++ b/gcs/src/gcs_node.cpp @@ -181,6 +181,7 @@ gcs_node_update_status (gcs_node_t* node, const gcs_state_quorum_t* quorum) else { node->desync_count = 1; } + // fall through case GCS_NODE_STATE_SYNCED: node->count_last_applied = true; break; diff --git a/gcs/src/gcs_test.cpp b/gcs/src/gcs_test.cpp index c6472d544..fae878ad4 100644 --- a/gcs/src/gcs_test.cpp +++ b/gcs/src/gcs_test.cpp @@ -644,15 +644,19 @@ static long gcs_test_conf (gcs_test_conf_t *conf, long argc, char *argv[]) case 6: conf->n_recv = strtol (argv[5], &endptr, 10); if ('\0' != *endptr) goto error; + // fall through case 5: conf->n_send = strtol (argv[4], &endptr, 10); if ('\0' != *endptr) goto error; + // fall through case 4: conf->n_repl = strtol (argv[3], &endptr, 10); if ('\0' != *endptr) goto error; + // fall through case 3: conf->n_tries = strtol (argv[2], &endptr, 10); if ('\0' != *endptr) goto error; + // fall through case 2: conf->backend = argv[1]; break; diff --git a/gcs/src/unit_tests/SConscript b/gcs/src/unit_tests/SConscript index 57f33fad7..b4cd3f0f0 100644 --- a/gcs/src/unit_tests/SConscript +++ b/gcs/src/unit_tests/SConscript @@ -5,6 +5,7 @@ env = check_env.Clone() # Include paths env.Append(CPPPATH = Split(''' + #/common #/galerautils/src #/gcache/src #/gcs/src