Skip to content

Commit

Permalink
Merge pull request codership#51 from codership/gh34
Browse files Browse the repository at this point in the history
Gh34
  • Loading branch information
ayurchen committed Mar 23, 2015
2 parents 9650cf3 + b4b07b7 commit b4934ce
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 80 deletions.
1 change: 1 addition & 0 deletions galera/src/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ libgaleraxx_srcs = [
'gcs_action_source.cpp',
'galera_info.cpp',
'replicator.cpp',
'ist_proto.cpp',
'ist.cpp',
'gcs_dummy.cpp',
'saved_state.cpp' ]
Expand Down
31 changes: 31 additions & 0 deletions galera/src/ist_proto.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//
// Copyright (C) 2015 Codership Oy <[email protected]>
//

#include "ist_proto.hpp"

std::ostream&
galera::ist::operator<< (std::ostream& os, const Message& m)
{
os << "ver: " << m.version()
<< ", type: " << m.type()
<< ", flags: " << m.flags()
<< ", ctrl: " << m.ctrl()
<< ", len: " << m.len()
<< ", seqno: " << m.seqno();

return os;
}

void
galera::ist::Message::throw_invalid_version(uint8_t const v)
{
gu_throw_error(EPROTO) << "invalid protocol version " << int(v)
<< ", expected " << version_;
}

void
galera::ist::Message::throw_corrupted_header()
{
gu_throw_error(EINVAL) << "Corrupted IST message header: " << *this;
}
64 changes: 45 additions & 19 deletions galera/src/ist_proto.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2011-2014 Codership Oy <[email protected]>
// Copyright (C) 2011-2015 Codership Oy <[email protected]>
//

#ifndef GALERA_IST_PROTO_HPP
Expand Down Expand Up @@ -75,12 +75,12 @@ namespace galera
ctrl_ (ctrl )
{}

int version() const { return version_; }
Type type() const { return type_ ; }
uint8_t flags() const { return flags_ ; }
int8_t ctrl() const { return ctrl_ ; }
uint32_t len() const { return len_ ; }
wsrep_seqno_t seqno() const { return seqno_; }
int version() const { return version_; }
Type type() const { return type_ ; }
uint8_t flags() const { return flags_ ; }
int8_t ctrl() const { return ctrl_ ; }
uint32_t len() const { return len_ ; }
wsrep_seqno_t seqno() const { return seqno_ ; }

void set_type_seqno(Type t, wsrep_seqno_t s)
{
Expand All @@ -93,7 +93,7 @@ namespace galera
{
// header: version 1 byte, type 1 byte, flags 1 byte,
// ctrl field 1 byte, length 4 bytes, seqno 8 bytes
return 4 + 4 + 8;
return 4 + 4 + 8 + sizeof(checksum_t);
}
else
{
Expand All @@ -106,9 +106,9 @@ namespace galera
size_t serialize(gu::byte_t* buf, size_t buflen, size_t offset)const
{
assert(version_ >= 4);
#ifndef NDEBUG
size_t orig_offset(offset);
#endif // NDEBUG

size_t const orig_offset(offset);

offset = gu::serialize1(uint8_t(version_), buf, buflen, offset);
offset = gu::serialize1(uint8_t(type_), buf, buflen, offset);
offset = gu::serialize1(flags_, buf, buflen, offset);
Expand All @@ -118,6 +118,11 @@ namespace galera
{
offset = gu::serialize4(len_, buf, buflen, offset);
offset = gu::serialize8(seqno_, buf, buflen, offset);

*reinterpret_cast<checksum_t*>(buf + offset) =
htog_checksum(buf + orig_offset, offset - orig_offset);

offset += sizeof(checksum_t);
}
else /**/
{
Expand All @@ -134,18 +139,13 @@ namespace galera
size_t offset)
{
assert(version_ >= 4);
#ifndef NDEBUG

size_t orig_offset(offset);
#endif // NDEBUG

uint8_t u8;
offset = gu::unserialize1(buf, buflen, offset, u8);

if (u8 != version_)
{
gu_throw_error(EPROTO) << "invalid protocol version "
<< int(u8)
<< ", expected " << version_;
}
if (gu_unlikely(u8 != version_)) throw_invalid_version(u8);

offset = gu::unserialize1(buf, buflen, offset, u8);
type_ = static_cast<Message::Type>(u8);
Expand All @@ -156,6 +156,16 @@ namespace galera
{
offset = gu::unserialize4(buf, buflen, offset, len_);
offset = gu::unserialize8(buf, buflen, offset, seqno_);

checksum_t const computed(htog_checksum(buf + orig_offset,
offset-orig_offset));
const checksum_t* expected
(reinterpret_cast<const checksum_t*>(buf + offset));

if (gu_unlikely(computed != *expected))
throw_corrupted_header();

offset += sizeof(checksum_t);
}
else
{
Expand All @@ -178,8 +188,24 @@ namespace galera
uint8_t version_;
uint8_t flags_;
int8_t ctrl_;

typedef uint64_t checksum_t;

// returns endian-adjusted checksum of buf
static checksum_t
htog_checksum(const void* const buf, size_t const size)
{
return
gu::htog<checksum_t>(gu::FastHash::digest<checksum_t>(buf,
size));
}

void throw_invalid_version(uint8_t v);
void throw_corrupted_header();
};

std::ostream& operator<< (std::ostream& os, const Message& m);

class Handshake : public Message
{
public:
Expand Down
2 changes: 1 addition & 1 deletion galera/src/trx_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ galera::TrxHandleSlave::sanity_checks() const
(F_ROLLBACK | F_BEGIN)))
{
log_warn << "Both F_BEGIN and F_ROLLBACK are set on trx. "
<< "This trx should not have been replicated at all: " << this;
<< "This trx should not have been replicated at all: " << *this;
}
}

Expand Down
91 changes: 68 additions & 23 deletions galerautils/src/gu_digest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class MMH3
~MMH3 () {}

template <typename T> static int
digest (const void* const in, size_t size, T& out)
digest (const void* const in, size_t const size, T& out)
{
byte_t tmp[16];
gu_mmh128(in, size, tmp);
Expand All @@ -39,7 +39,7 @@ class MMH3

/* experimental */
template <typename T> static T
digest (const void* const in, size_t size)
digest (const void* const in, size_t const size)
{
switch (sizeof(T))
{
Expand All @@ -56,7 +56,7 @@ class MMH3
gu_mmh128_append (&ctx_, buf, size);
}

template <size_t size>
template <size_t const size>
int gather (void* const buf) const
{
GU_COMPILE_ASSERT(size >= 16, wrong_buf_size);
Expand Down Expand Up @@ -90,25 +90,25 @@ class MMH3
}; /* class MMH3 */

template <> inline int
MMH3::digest (const void* const in, size_t size, uint8_t& out)
MMH3::digest (const void* const in, size_t const size, uint8_t& out)
{
out = gu_mmh128_32(in, size); return sizeof(out);
}

template <> inline int
MMH3::digest (const void* const in, size_t size, uint16_t& out)
MMH3::digest (const void* const in, size_t const size, uint16_t& out)
{
out = gu_mmh128_32(in, size); return sizeof(out);
}

template <> inline int
MMH3::digest (const void* const in, size_t size, uint32_t& out)
MMH3::digest (const void* const in, size_t const size, uint32_t& out)
{
out = gu_mmh128_32(in, size); return sizeof(out);
}

template <> inline int
MMH3::digest (const void* const in, size_t size, uint64_t& out)
MMH3::digest (const void* const in, size_t const size, uint64_t& out)
{
out = gu_mmh128_64(in, size); return sizeof(out);
}
Expand All @@ -133,7 +133,7 @@ class FastHash
public:

template <typename T> static int
digest (const void* const in, size_t size, T& out)
digest (const void* const in, size_t const size, T& out)
{
byte_t tmp[16];
gu_fast_hash128(in, size, tmp);
Expand All @@ -144,43 +144,88 @@ class FastHash

/* experimental */
template <typename T> static T
digest (const void* const in, size_t size)
{
switch (sizeof(T))
{
case 1: return gu_fast_hash32(in, size);
case 2: return gu_fast_hash32(in, size);
case 4: return gu_fast_hash32(in, size);
case 8: return gu_fast_hash64(in, size);
}
throw;
}
digest (const void* const in, size_t const size);
/* The above is undefined and should cause linking error in case that
* template gets instantiated instead of specialized ones below.
* Unfortunately GU_COMPILE_ASSERT() is unusable here - causes compilation
* errors in every unit that only includes this header (probably because
* method is static).
* Perhaps templating the class would have done the trick */

}; /* FastHash */

template <> inline int
FastHash::digest (const void* const in, size_t size, uint8_t& out)
FastHash::digest (const void* const in, size_t const size, uint8_t& out)
{
out = gu_fast_hash32(in, size); return sizeof(out);
}

template <> inline int
FastHash::digest (const void* const in, size_t size, uint16_t& out)
FastHash::digest (const void* const in, size_t const size, uint16_t& out)
{
out = gu_fast_hash32(in, size); return sizeof(out);
}

template <> inline int
FastHash::digest (const void* const in, size_t size, uint32_t& out)
FastHash::digest (const void* const in, size_t const size, uint32_t& out)
{
out = gu_fast_hash32(in, size); return sizeof(out);
}

template <> inline int
FastHash::digest (const void* const in, size_t size, uint64_t& out)
FastHash::digest (const void* const in, size_t const size, uint64_t& out)
{
out = gu_fast_hash64(in, size); return sizeof(out);
}

template <> inline uint8_t
FastHash::digest<uint8_t>(const void* const in, size_t const size)
{
return gu_fast_hash32(in, size);
}

template <> inline uint16_t
FastHash::digest<uint16_t>(const void* const in, size_t const size)
{
return gu_fast_hash32(in, size);
}

template <> inline uint32_t
FastHash::digest<uint32_t>(const void* const in, size_t const size)
{
return gu_fast_hash32(in, size);
}

template <> inline uint64_t
FastHash::digest<uint64_t>(const void* const in, size_t const size)
{
return gu_fast_hash64(in, size);
}

template <> inline int8_t
FastHash::digest<int8_t>(const void* const in, size_t const size)
{
return gu_fast_hash32(in, size);
}

template <> inline int16_t
FastHash::digest<int16_t>(const void* const in, size_t const size)
{
return gu_fast_hash32(in, size);
}

template <> inline int32_t
FastHash::digest<int32_t>(const void* const in, size_t const size)
{
return gu_fast_hash32(in, size);
}

template <> inline int64_t
FastHash::digest<int64_t>(const void* const in, size_t const size)
{
return gu_fast_hash64(in, size);
}

} /* namespace gu */

#endif /* GU_DIGEST_HPP */
Loading

0 comments on commit b4934ce

Please sign in to comment.