Skip to content

Commit

Permalink
fix: avoid crashing during the connection traversal (#2651)
Browse files Browse the repository at this point in the history
* fix: avoid crashing during the connection traversal

---------

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Feb 23, 2024
1 parent 9ed5513 commit 7f93a8f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 14 deletions.
41 changes: 32 additions & 9 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,11 @@ void Connection::DispatchOperations::operator()(const InvalidationMessage& msg)

Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
ServiceInterface* service)
: io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service), name_{} {
: io_buf_(kMinReadSize),
http_listener_(http_listener),
ssl_ctx_(ctx),
service_(service),
name_{} {
static atomic_uint32_t next_id{1};

protocol_ = protocol;
Expand Down Expand Up @@ -486,7 +490,7 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,

Connection::~Connection() {
#ifdef DFLY_USE_SSL
SSL_CTX_free(ctx_);
SSL_CTX_free(ssl_ctx_);
#endif
}

Expand Down Expand Up @@ -557,13 +561,18 @@ void Connection::HandleRequests() {

FiberSocketBase* peer = socket_.get();
#ifdef DFLY_USE_SSL
if (ctx_) {
if (ssl_ctx_) {
const bool no_tls_on_admin_port = absl::GetFlag(FLAGS_no_tls_on_admin_port);
if (!(IsPrivileged() && no_tls_on_admin_port)) {
unique_ptr<tls::TlsSocket> tls_sock = make_unique<tls::TlsSocket>(std::move(socket_));
tls_sock->InitSSL(ctx_);
FiberSocketBase::AcceptResult aresult = tls_sock->Accept();
SetSocket(tls_sock.release());
// Must be done atomically before the premption point in Accept so that at any
// point in time, the socket_ is defined.
{
FiberAtomicGuard fg;
unique_ptr<tls::TlsSocket> tls_sock = make_unique<tls::TlsSocket>(std::move(socket_));
tls_sock->InitSSL(ssl_ctx_);
SetSocket(tls_sock.release());
}
FiberSocketBase::AcceptResult aresult = socket_->Accept();

if (!aresult) {
LOG(WARNING) << "Error handshaking " << aresult.error().message();
Expand All @@ -582,13 +591,17 @@ void Connection::HandleRequests() {
if (http_res) {
if (*http_res) {
VLOG(1) << "HTTP1.1 identified";
is_http_ = true;
HttpConnection http_conn{http_listener_};
http_conn.SetSocket(peer);
auto ec = http_conn.ParseFromBuffer(io_buf_.InputBuffer());
io_buf_.ConsumeInput(io_buf_.InputLen());
if (!ec) {
http_conn.HandleRequests();
}

// Release the ownership of the socket from http_conn so it would stay with
// this connection.
http_conn.ReleaseSocket();
} else {
cc_.reset(service_->CreateContext(peer, this));
Expand All @@ -615,7 +628,12 @@ void Connection::RegisterBreakHook(BreakerCb breaker_cb) {
}

std::pair<std::string, std::string> Connection::GetClientInfoBeforeAfterTid() const {
CHECK(service_ && socket_);
if (!socket_) {
LOG(DFATAL) << "unexpected null socket_ "
<< " phase " << unsigned(phase_) << ", is_http: " << unsigned(is_http_);
return {};
}

CHECK_LT(unsigned(phase_), NUM_PHASES);

string before;
Expand All @@ -639,7 +657,12 @@ std::pair<std::string, std::string> Connection::GetClientInfoBeforeAfterTid() co
static_assert(PHASE_NAMES[SHUTTING_DOWN] == "shutting_down");

absl::StrAppend(&before, "id=", id_, " addr=", re, " laddr=", le);
absl::StrAppend(&before, " fd=", socket_->native_handle(), " name=", name_);
absl::StrAppend(&before, " fd=", socket_->native_handle());
if (is_http_) {
absl::StrAppend(&before, " http=true");
} else {
absl::StrAppend(&before, " name=", name_);
}

string after;
absl::StrAppend(&after, " irqmatch=", int(cpu == my_cpu_id));
Expand Down
11 changes: 6 additions & 5 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ class Connection : public util::Connection {
ConnectionStats* stats_ = nullptr;

util::HttpListenerBase* http_listener_;
SSL_CTX* ctx_;
SSL_CTX* ssl_ctx_;

ServiceInterface* service_;

Expand All @@ -416,12 +416,8 @@ class Connection : public util::Connection {
// Needed for access from different threads by EnsureAsyncMemoryBudget().
QueueBackpressure* queue_backpressure_;

// Connection migration vars, see RequestAsyncMigration() above.
bool migration_enabled_;
util::fb2::ProactorBase* migration_request_ = nullptr;

bool skip_next_squashing_ = false; // Forcefully skip next squashing

// Pooled pipeline messages per-thread
// Aggregated while handling pipelines, gradually released while handling regular commands.
static thread_local std::vector<PipelineMessagePtr> pipeline_req_pool_;
Expand All @@ -431,6 +427,11 @@ class Connection : public util::Connection {

// a flag indicating whether the client has turned on client tracking.
bool tracking_enabled_ = false;
bool skip_next_squashing_ = false; // Forcefully skip next squashing

// Connection migration vars, see RequestAsyncMigration() above.
bool migration_enabled_ = false;
bool is_http_ = false;
};

} // namespace facade

0 comments on commit 7f93a8f

Please sign in to comment.