Skip to content

Commit

Permalink
Merge pull request codership#50 from codership/gh49
Browse files Browse the repository at this point in the history
Gh49
  • Loading branch information
ayurchen committed Mar 23, 2015
2 parents 60ad098 + fc35674 commit 9650cf3
Show file tree
Hide file tree
Showing 24 changed files with 318 additions and 221 deletions.
46 changes: 21 additions & 25 deletions galera/src/galera_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <galerautils.h>
#include <string.h>

#include <vector>

using namespace galera;

static size_t
Expand All @@ -15,57 +17,51 @@ view_info_size (int members)

/* create view info out of configuration message */
wsrep_view_info_t* galera_view_info_create (const gcs_act_cchange& conf,
int const my_idx,
wsrep_uuid_t& my_uuid)
{
wsrep_view_info_t* ret = static_cast<wsrep_view_info_t*>(
::malloc(view_info_size(conf.memb_num)));
::malloc(view_info_size(conf.memb.size())));

if (ret)
{
const char* str = conf.memb;
int m;

wsrep_uuid_t uuid;
memcpy(uuid.data, conf.uuid.data, sizeof(uuid.data));
wsrep_seqno_t seqno = conf.seqno != GCS_SEQNO_ILL ?
conf.seqno : WSREP_SEQNO_UNDEFINED;
wsrep_gtid_t gtid = { uuid, seqno };
wsrep_uuid_t const uuid(to_wsrep_uuid(conf.uuid));
wsrep_seqno_t const seqno
(conf.seqno != GCS_SEQNO_ILL ? conf.seqno : WSREP_SEQNO_UNDEFINED);
wsrep_gtid_t const gtid = { uuid, seqno };

ret->state_id = gtid;
ret->view = conf.conf_id;
ret->status = conf.conf_id != -1 ?
WSREP_VIEW_PRIMARY : WSREP_VIEW_NON_PRIMARY;
ret->my_idx = -1;
ret->memb_num = conf.memb_num;
ret->memb_num = conf.memb.size();
ret->proto_ver = conf.appl_proto_ver;

for (m = 0; m < ret->memb_num; m++) {
wsrep_member_info_t* member = &ret->members[m];
for (int m = 0; m < ret->memb_num; ++m)
{
const gcs_act_cchange::member& cm(conf.memb[m]); // from
wsrep_member_info_t& wm(ret->members[m]); // to

size_t id_len = strlen(str);
gu_uuid_scan (str, id_len,reinterpret_cast<gu_uuid_t*>(&member->id));
str = str + id_len + 1;
wm.id = to_wsrep_uuid(cm.uuid_);

if (member->id == my_uuid)
if (wm.id == my_uuid)
{
ret->my_idx = m;
}

strncpy(member->name, str, sizeof(member->name) - 1);
member->name[sizeof(member->name) - 1] = '\0';
str = str + strlen(str) + 1;
strncpy(wm.name, cm.name_.c_str(), sizeof(wm.name) - 1);
wm.name[sizeof(wm.name) - 1] = '\0';

strncpy(member->incoming, str, sizeof(member->incoming) - 1);
member->incoming[sizeof(member->incoming) - 1] = '\0';
str = str + strlen(str) + 1;
strncpy(wm.incoming, cm.incoming_.c_str(), sizeof(wm.incoming) - 1);
wm.incoming[sizeof(wm.incoming) - 1] = '\0';

str += sizeof(gcs_seqno_t); // skip cached seqno.
}

if (WSREP_UUID_UNDEFINED == my_uuid && conf.my_idx >= 0)
if (WSREP_UUID_UNDEFINED == my_uuid && my_idx >= 0)
{
assert(-1 == ret->my_idx);
ret->my_idx = conf.my_idx;
ret->my_idx = my_idx;
assert(ret->my_idx < ret->memb_num);
my_uuid = ret->members[ret->my_idx].id;
}
Expand Down
5 changes: 3 additions & 2 deletions galera/src/galera_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
#include "wsrep_api.h"

/* create view info out of configuration message
* if my_uuid is defined - use it to determine my_idx,
* otherwise set my_uuid according to conf.my_idx */
* if my_uuid is defined - use it to determine wsrep_view_info_t::my_idx,
* otherwise set my_uuid according to my_idx */
extern wsrep_view_info_t*
galera_view_info_create (const gcs_act_cchange& conf,
int my_idx,
wsrep_uuid_t& my_uuid);

/* make a copy of view info object */
Expand Down
43 changes: 24 additions & 19 deletions galera/src/gcs_dummy.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//
// Copyright (C) 2011-2012 Codership Oy <[email protected]>
// Copyright (C) 2011-2015 Codership Oy <[email protected]>
//

#include "galera_gcs.hpp"
#include "uuid.hpp"

namespace galera
{
Expand Down Expand Up @@ -74,31 +75,33 @@ namespace galera
{
gcs_act_cchange cc;

gcs_node_state_t const my_state
(primary ? GCS_NODE_STATE_JOINED : GCS_NODE_STATE_NON_PRIM);

if (primary)
{
++global_seqno_;
cc.seqno = global_seqno_;

cc.seqno = global_seqno_;
cc.conf_id = 1;
memcpy (cc.uuid.data, &uuid_, sizeof(uuid_));
cc.memb_num = 1;
cc.my_idx = 0;
cc.my_state = GCS_NODE_STATE_JOINED;
cc.uuid = uuid_;
cc.repl_proto_ver = repl_proto_ver_;
cc.appl_proto_ver = appl_proto_ver_;

char* const str(cc.memb);
ssize_t offt(0);
offt += gu_uuid_print (&uuid_, str, GU_UUID_STR_LEN+1) + 1;
offt += sprintf (str + offt, "%s", my_name_.c_str()) + 1;
sprintf (str + offt, "%s", incoming_.c_str());
/* we have single member here */
gcs_act_cchange::member m;

m.uuid_ = uuid_;
m.name_ = my_name_;
m.incoming_ = incoming_;
m.state_ = my_state;

cc.memb.push_back(m);
}
else
{
cc.seqno = GCS_SEQNO_ILL;
cc.conf_id = -1;
cc.memb_num = 0;
cc.my_idx = -1;
cc.my_state = GCS_NODE_STATE_NON_PRIM;
}

cc_size_ = cc.write(&cc_);
Expand All @@ -109,7 +112,6 @@ namespace galera
return -ENOMEM;
}


return cc_size_;
}

Expand All @@ -124,7 +126,6 @@ namespace galera

if (ret > 0)
{
// state_ = S_CONNECTED;
cond_.signal();
ret = 0;
}
Expand Down Expand Up @@ -204,14 +205,18 @@ namespace galera

gcs_act_cchange const cc(act.buf, act.size);

if (cc.my_idx < 0)
act.seqno_g = (cc.conf_id >= 0 ? 0 : -1);

int const my_idx(act.seqno_g);

if (my_idx < 0)
{
assert (0 == cc.memb_num);
assert (0 == cc.memb.size());
state_ = S_CLOSED;
}
else
{
assert (1 == cc.memb_num);
assert (1 == cc.memb.size());
state_ = S_CONNECTED;
}

Expand Down
15 changes: 8 additions & 7 deletions galera/src/ist_proto.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,22 +555,23 @@ namespace galera
case Message::T_CCHANGE:
case Message::T_SKIP:
{
size_t offset(0);
size_t offset(0);
int64_t seqno_g(msg.seqno()); // compatibility with 3.x

if (gu_unlikely(version_ < 8)) // compatibility with 3.x
{
assert(msg.type() == Message::T_TRX);

int64_t seqno_g, seqno_d;
int64_t seqno_d;

buf.resize(sizeof(seqno_g) + sizeof(seqno_d));

n = asio::read(socket, asio::buffer(&buf[0],buf.size()));
if (n != buf.size())
{
gu_throw_error(EPROTO) <<
"error reading trx meta data";
assert(0);
gu_throw_error(EPROTO)
<< "error reading trx meta data";
}

offset = gu::unserialize8(&buf[0],buf.size(),0,seqno_g);
Expand All @@ -597,10 +598,10 @@ namespace galera

msg.set_type_seqno(type, seqno_g);
}
else
else // end compatibility with 3.x
{
assert(seqno_g > 0);
} // end compatibility with 3.x
}

assert(msg.seqno() > 0);

Expand Down Expand Up @@ -631,7 +632,7 @@ namespace galera
* but it should not change below. Saving const for later
* assert(). */
Message::Type const msg_type(msg.type());
gcs_act_type const gcs_type
gcs_act_type const gcs_type
(msg_type == Message::T_CCHANGE ?
GCS_ACT_CCHANGE : GCS_ACT_WRITESET);

Expand Down
50 changes: 29 additions & 21 deletions galera/src/replicator_smm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ wsrep_status_t galera::ReplicatorSMM::async_recv(void* recv_ctx)
gcs_act_cchange const cc;
wsrep_uuid_t tmp(uuid_);
wsrep_view_info_t* const err_view
(galera_view_info_create(cc, tmp));
(galera_view_info_create(cc, -1, tmp));
view_cb_(app_ctx_, recv_ctx, err_view, 0, 0);
free(err_view);
}
Expand Down Expand Up @@ -1492,13 +1492,14 @@ galera::ReplicatorSMM::update_incoming_list(const wsrep_view_info_t& view)
}
}

static galera::Replicator::State state2repl(const gcs_act_cchange& conf)
static galera::Replicator::State state2repl(gcs_node_state const my_state,
int const my_idx)
{
switch (conf.my_state)
switch (my_state)
{
case GCS_NODE_STATE_NON_PRIM:
if (conf.my_idx >= 0) return galera::Replicator::S_CONNECTED;
else return galera::Replicator::S_CLOSING;
if (my_idx >= 0) return galera::Replicator::S_CONNECTED;
else return galera::Replicator::S_CLOSING;
case GCS_NODE_STATE_PRIM:
return galera::Replicator::S_CONNECTED;
case GCS_NODE_STATE_JOINER:
Expand All @@ -1509,10 +1510,11 @@ static galera::Replicator::State state2repl(const gcs_act_cchange& conf)
return galera::Replicator::S_SYNCED;
case GCS_NODE_STATE_DONOR:
return galera::Replicator::S_DONOR;
case GCS_NODE_STATE_MAX:;
case GCS_NODE_STATE_MAX:
assert(0);
}

gu_throw_fatal << "unhandled gcs state: " << conf.my_state;
gu_throw_fatal << "unhandled gcs state: " << my_state;
GU_DEBUG_NORETURN;
}

Expand All @@ -1524,8 +1526,6 @@ galera::ReplicatorSMM::process_conf_change(void* recv_ctx,

gcs_act_cchange const conf(cc.buf, cc.size);

assert(cc.seqno_g == conf.seqno);

bool const from_IST(0 == cc.seqno_l);

LocalOrder lo(cc.seqno_l);
Expand All @@ -1545,12 +1545,22 @@ galera::ReplicatorSMM::process_conf_change(void* recv_ctx,
assert(!from_IST || conf.repl_proto_ver >= 8);
}

// we must have either my_idx or uuid_ defined
assert(cc.seqno_g >= 0 || uuid_ != WSREP_UUID_UNDEFINED);

wsrep_uuid_t new_uuid(uuid_);
wsrep_view_info_t* const view_info(galera_view_info_create(conf, new_uuid));
wsrep_view_info_t* const view_info
(galera_view_info_create(conf, (!from_IST ? cc.seqno_g : -1), new_uuid));
int const my_idx(view_info->my_idx);
gcs_node_state_t const my_state
(my_idx >= 0 ? conf.memb[my_idx].state_ : GCS_NODE_STATE_NON_PRIM);

assert(my_state >= GCS_NODE_STATE_NON_PRIM);
assert(my_state < GCS_NODE_STATE_MAX);

wsrep_seqno_t const group_seqno(view_info->state_id.seqno);
const wsrep_uuid_t& group_uuid (view_info->state_id.uuid);
assert(group_seqno == cc.seqno_g);
assert(group_seqno == conf.seqno);

if (!from_IST)
{
Expand All @@ -1570,10 +1580,10 @@ galera::ReplicatorSMM::process_conf_change(void* recv_ctx,

log_info << "####### My UUID: " << uuid_;

if (cc.seqno_g != WSREP_SEQNO_UNDEFINED &&
cc.seqno_g <= sst_seqno_)
if (conf.seqno != WSREP_SEQNO_UNDEFINED &&
conf.seqno <= sst_seqno_)
{
log_info << "####### skipping CC " << cc.seqno_g
log_info << "####### skipping CC " << conf.seqno
<< (from_IST ? ", from IST" : ", local");

// applied already in SST/IST, skip
Expand All @@ -1593,12 +1603,11 @@ galera::ReplicatorSMM::process_conf_change(void* recv_ctx,

update_incoming_list(*view_info);

log_info << "####### processing CC " << cc.seqno_g
log_info << "####### processing CC " << conf.seqno
<< (from_IST ? ", from IST" : ", local");

bool const st_required
(state_transfer_required(*view_info,
conf.my_state == GCS_NODE_STATE_PRIM));
(state_transfer_required(*view_info, my_state == GCS_NODE_STATE_PRIM));

void* app_req(0);
size_t app_req_len(0);
Expand Down Expand Up @@ -1651,7 +1660,7 @@ galera::ReplicatorSMM::process_conf_change(void* recv_ctx,
}
}

Replicator::State const next_state(state2repl(conf));
Replicator::State const next_state(state2repl(my_state, my_idx));

if (conf.conf_id >= 0) // Primary configuration
{
Expand Down Expand Up @@ -1712,8 +1721,7 @@ galera::ReplicatorSMM::process_conf_change(void* recv_ctx,
else if (conf.seqno > cert_.position())
{
assert(!app_waits_sst);
assert(group_uuid == conf.uuid);
assert(group_seqno == cc.seqno_g);
assert(group_uuid == conf.uuid);
assert(group_seqno == conf.seqno);

/* since CC does not pass certification, need to adjust cert
Expand Down Expand Up @@ -1835,7 +1843,7 @@ galera::ReplicatorSMM::process_conf_change(void* recv_ctx,

free(view_info);

if (conf.conf_id < 0 && conf.memb_num == 0) {
if (conf.conf_id < 0 && conf.memb.size() == 0) {
assert(cc.seqno_l > 0);
assert(S_CLOSING == next_state);
log_debug << "Received SELF-LEAVE. Closing connection.";
Expand Down
10 changes: 10 additions & 0 deletions galera/src/uuid.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ namespace galera
return *reinterpret_cast<gu_uuid_t*>(&uuid);
}

inline const wsrep_uuid_t& to_wsrep_uuid(const gu_uuid_t& uuid)
{
return *reinterpret_cast<const wsrep_uuid_t*>(&uuid);
}

inline wsrep_uuid_t& to_wsrep_uuid(gu_uuid_t& uuid)
{
return *reinterpret_cast<wsrep_uuid_t*>(&uuid);
}

inline bool operator==(const wsrep_uuid_t& a, const wsrep_uuid_t& b)
{
return to_gu_uuid(a) == to_gu_uuid(b);
Expand Down
Loading

0 comments on commit 9650cf3

Please sign in to comment.