Skip to content

Commit

Permalink
Fixing issue with many subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
gamedev8 committed Feb 14, 2024
1 parent d1b3f3d commit a4d43ce
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 28 deletions.
9 changes: 5 additions & 4 deletions server/MessageHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace gazellemq::server {
int fd{};
protected:
bool isZombie{};
bool mustDisconnect{};
bool isDisconnecting{};

public:
explicit MessageHandler(int fileDescriptor)
Expand All @@ -34,7 +34,7 @@ namespace gazellemq::server {
* @return
*/
[[nodiscard]] bool getMustDisconnect() const {
return mustDisconnect;
return isDisconnecting;
}

/**
Expand All @@ -43,13 +43,14 @@ namespace gazellemq::server {
void markForRemoval() {
if (!isZombie) {
isZombie = true;
isDisconnecting = false;
clientName.append(" [Zombie]");
}
}

void forceDisconnect() {
if (!mustDisconnect) {
mustDisconnect = true;
if (!isDisconnecting) {
isDisconnecting = true;
}
}

Expand Down
22 changes: 15 additions & 7 deletions server/MessagePublisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace gazellemq::server {
MessagePublisherState_receiveData,
MessagePublisherState_disconnect,
MessagePublisherState_sendAck,
MessagePublisherState_zombie,
};

char readBuffer[MAX_READ_BUF]{};
Expand All @@ -38,17 +39,18 @@ namespace gazellemq::server {
* Disconnects from the server
* @param ring
*/
void beginDisconnect(struct io_uring* ring) {
virtual void beginDisconnect(struct io_uring* ring) {
io_uring_sqe* sqe = io_uring_get_sqe(ring);
io_uring_prep_close(sqe, fd);
state = MessagePublisherState_disconnect;
io_uring_sqe_set_data(sqe, this);

state = MessagePublisherState_disconnect;
io_uring_submit(ring);
}

void onDisconnected(struct io_uring *ring, int res) {
printf("A publisher disconnected\n");
printf("A publisher disconnected [%s]\n", clientName.c_str());
state = MessagePublisherState_zombie;
markForRemoval();
}

Expand Down Expand Up @@ -92,7 +94,7 @@ namespace gazellemq::server {
if (res <= 0) {
// The client has disconnected
beginDisconnect(ring);
} else if (!mustDisconnect && !isZombie) {
} else if (!isDisconnecting && !isZombie) {
streamMessage(readBuffer, res);

beginReceiveData(ring);
Expand Down Expand Up @@ -180,7 +182,7 @@ namespace gazellemq::server {
std::swap(this->clientName, other.clientName);
std::swap(this->fd, other.fd);
std::swap(this->isZombie, other.isZombie);
std::swap(this->mustDisconnect, other.mustDisconnect);
std::swap(this->isDisconnecting, other.isDisconnecting);
}

MessagePublisher& operator=(MessagePublisher &&other) noexcept {
Expand All @@ -196,7 +198,7 @@ namespace gazellemq::server {
std::swap(this->clientName, other.clientName);
std::swap(this->fd, other.fd);
std::swap(this->isZombie, other.isZombie);
std::swap(this->mustDisconnect, other.mustDisconnect);
std::swap(this->isDisconnecting, other.isDisconnecting);
return *this;
}

Expand All @@ -210,7 +212,6 @@ namespace gazellemq::server {
}

void disconnect(struct io_uring *ring) {
mustDisconnect = false;
beginDisconnect(ring);
}

Expand All @@ -220,6 +221,11 @@ namespace gazellemq::server {
* @param res
*/
void handleEvent(struct io_uring *ring, int res) override {
if (isDisconnecting) {
onDisconnected(ring, res);
return;
}

switch (state) {
case MessagePublisherState_notSet:
beginSendAck(ring);
Expand All @@ -233,6 +239,8 @@ namespace gazellemq::server {
case MessagePublisherState_disconnect:
onDisconnected(ring, res);
break;
default:
break;
}
}
};
Expand Down
43 changes: 39 additions & 4 deletions server/MessageSubscriber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ namespace gazellemq::server {
MessageSubscriberState_receiveSubscriptions,
MessageSubscriberState_sendAck,
MessageSubscriberState_ready,
MessageSubscriberState_sendData
MessageSubscriberState_sendData,
MessageSubscriberState_disconnect,
MessageSubscriberState_zombie,
};

MessageSubscriberState state{MessageSubscriberState_notSet};
Expand Down Expand Up @@ -90,7 +92,7 @@ namespace gazellemq::server {
std::swap(this->clientName, other.clientName);
std::swap(this->fd, other.fd);
std::swap(this->isZombie, other.isZombie);
std::swap(this->mustDisconnect, other.mustDisconnect);
std::swap(this->isDisconnecting, other.isDisconnecting);
}

MessageSubscriber& operator=(MessageSubscriber &&other) noexcept {
Expand All @@ -104,7 +106,7 @@ namespace gazellemq::server {
std::swap(this->clientName, other.clientName);
std::swap(this->fd, other.fd);
std::swap(this->isZombie, other.isZombie);
std::swap(this->mustDisconnect, other.mustDisconnect);
std::swap(this->isDisconnecting, other.isDisconnecting);

return *this;
}
Expand All @@ -122,12 +124,31 @@ namespace gazellemq::server {
return true;
}

/**
* Disconnects from the server
* @param ring
*/
virtual void beginDisconnect(struct io_uring* ring) {
io_uring_sqe* sqe = io_uring_get_sqe(ring);
io_uring_prep_close(sqe, fd);
state = MessageSubscriberState_disconnect;
io_uring_sqe_set_data(sqe, this);

io_uring_submit(ring);
}

void onDisconnected(struct io_uring *ring, int res) {
printf("A subscriber disconnected [%s]\n", clientName.c_str());
state = MessageSubscriberState_zombie;
markForRemoval();
}

/**
* Returns true if this subscriber is not transferring data.
* @return
*/
[[nodiscard]] bool isIdle() const {
return currentMessage.isDone() && pendingMessages.empty() && state == MessageSubscriberState_ready;
return currentMessage.isDone() && pendingMessages.empty() && (state == MessageSubscriberState_ready || state == MessageSubscriberState_zombie);
}

/**
Expand Down Expand Up @@ -191,6 +212,7 @@ namespace gazellemq::server {
}
} else {
printError("onSendCurrentMessageComplete", res);
beginDisconnect(ring);
}
}

Expand All @@ -214,7 +236,17 @@ namespace gazellemq::server {
printf("Subscriber connected - %s\n", clientName.c_str());
}

/**
* Does the appropriate action based on the current state
* @param ring
* @param res
*/
void handleEvent(struct io_uring *ring, int res) override {
if (isDisconnecting) {
onDisconnected(ring, res);
return;
}

switch (state) {
case MessageSubscriberState_notSet:
beginSendAck(ring);
Expand All @@ -228,6 +260,9 @@ namespace gazellemq::server {
case MessageSubscriberState_sendData:
onSendCurrentMessageComplete(ring, res);
break;
case MessageSubscriberState_disconnect:
onDisconnected(ring, res);
break;
default:
break;
}
Expand Down
44 changes: 31 additions & 13 deletions server/SubscriberService.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,38 @@ namespace gazellemq::server {
}

/**
* Removes inactive subscribers/publishers, and runs handleEvent(...) on new publishers/subscribers
* Removes all zombie subscribers
*/
void onBeforeHandleMessages(io_uring* ring) {
void killZombies() {
subscribers.erase(
std::remove_if(subscribers.begin(), subscribers.end(), [](MessageSubscriber const& o) {
if (o.getIsZombie()) {
printf("Remove zombie subscriber [%s]\n", o.clientName.c_str());
}
return o.getIsZombie();
}), subscribers.end());
std::remove_if(subscribers.begin(), subscribers.end(), [](MessageSubscriber const& o) {
if (o.getIsZombie()) {
printf("Remove zombie subscriber [%s]\n", o.clientName.c_str());
}
return o.getIsZombie();
}), subscribers.end());
}

/**
* Initializes all new subscribers
* @param ring
*/
void initNewSubscribers(io_uring* ring) {
if (hasNewSubscribers.test()) {
hasNewSubscribers.clear();
std::for_each(subscribers.begin(), subscribers.end(), [ring](MessageSubscriber &o) {
return o.handleEvent(ring, 0);
});
}
}

/**
* Removes inactive subscribers/publishers, and runs handleEvent(...) on new publishers/subscribers
*/
void onBeforeHandleMessages(io_uring* ring) {
killZombies();
initNewSubscribers(ring);
}
public:
void go() {
using namespace std::chrono_literals;
Expand Down Expand Up @@ -83,11 +97,15 @@ namespace gazellemq::server {
messageQueue.afQueue.clear();

while (isRunning.test()) {
if (std::all_of(subscribers.begin(), subscribers.end(), [](MessageSubscriber const& o) {
return o.isIdle();
})) {
goto outer;
}
bool hasZombies{};
bool allIdle{true};
std::for_each(subscribers.begin(), subscribers.end(), [&hasZombies, &allIdle](MessageSubscriber const& o) {
if (!o.isIdle()) allIdle = false;
if (o.getIsZombie()) hasZombies = true;
});

if (hasZombies) killZombies();
if (allIdle) goto outer;

int ret = io_uring_wait_cqe_timeout(&ring, cqes.data(), &ts);
if (ret == -SIGILL || ret == TIMEOUT) {
Expand Down

0 comments on commit a4d43ce

Please sign in to comment.