Skip to content

Commit

Permalink
Adding batching
Browse files Browse the repository at this point in the history
  • Loading branch information
gamedev8 committed Feb 14, 2024
1 parent 62d2743 commit 603b6fe
Show file tree
Hide file tree
Showing 9 changed files with 259 additions and 69 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set(CMAKE_CXX_STANDARD 20)

set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3")

add_executable(${PROJECT_NAME} main.cpp lib/MPMCQueue/MPMCQueue.hpp server/Hub.hpp server/MessageSubscriber.hpp server/MessagePublisher.hpp server/ClientConnection.hpp server/Consts.hpp server/EventLoopObject.hpp server/ServerConnection.hpp server/Enums.hpp server/MessageHandler.hpp server/SubscriberService.hpp server/StringUtils.hpp server/Message.hpp server/PublisherService.hpp server/MessageQueue.hpp)
add_executable(${PROJECT_NAME} main.cpp lib/MPMCQueue/MPMCQueue.hpp server/Hub.hpp server/MessageSubscriber.hpp server/MessagePublisher.hpp server/ClientConnection.hpp server/Consts.hpp server/EventLoopObject.hpp server/ServerConnection.hpp server/Enums.hpp server/MessageHandler.hpp server/SubscriberService.hpp server/StringUtils.hpp server/Message.hpp server/PublisherService.hpp server/MessageQueue.hpp server/MessageBatch.hpp)

find_package(PkgConfig REQUIRED)

Expand Down
1 change: 1 addition & 0 deletions server/Consts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
namespace gazellemq::server {
static constexpr addrinfo GAI_HINTS{0, AF_INET, SOCK_STREAM, 0, 0, nullptr, nullptr, nullptr};
static constexpr auto MAX_READ_BUF = 8192;
static constexpr auto DEFAULT_BUF_LENGTH = 256;
static constexpr auto MAX_OUT_BUF = 8192;
static constexpr auto BIG_READ_BUF = 8192;
static constexpr auto LAST_MSG_BUF_SIZE = 4096;
Expand Down
4 changes: 2 additions & 2 deletions server/Message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ namespace gazellemq::server {
public:
Message() noexcept = default;;

Message(std::string messageType, std::string const& buffer) noexcept
: messageType(std::move(messageType)), n(buffer.size()), content(buffer), i(0)
Message(std::string messageType, std::string const& messageContent) noexcept
: messageType(std::move(messageType)), n(messageContent.size()), content(messageContent), i(0)
{}

Message(Message const& other) noexcept {
Expand Down
174 changes: 174 additions & 0 deletions server/MessageBatch.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
#ifndef GAZELLEMQ_SERVER_MESSAGEBATCH_HPP
#define GAZELLEMQ_SERVER_MESSAGEBATCH_HPP

#include <cstring>
#include <cstdlib>
#include <algorithm>
#include <cmath>

namespace gazellemq::server {
struct MessageBatch {
private:
static constexpr size_t SIZEOF_CHAR = sizeof(char);
static constexpr size_t GROW_UNTIL_LENGTH = DEFAULT_BUF_LENGTH * 128;
std::string messageType{};
char *buffer{nullptr};
size_t maxLength{};
size_t bufferLength{};
size_t bufferPosition{};
unsigned int nbMessages{};
bool isBusy{};
public:
MessageBatch(MessageBatch const &) = delete;

MessageBatch &operator=(MessageBatch const &) = delete;

MessageBatch(): maxLength(DEFAULT_BUF_LENGTH) {
buffer = (char *) calloc(maxLength, sizeof(char));
}

MessageBatch(MessageBatch &&other) noexcept {
this->swap(std::move(other));
}

MessageBatch &operator=(MessageBatch &&other) noexcept {
this->swap(std::move(other));
return *this;
}

~MessageBatch() {
if (buffer != nullptr) {
free(buffer);
}
}
private:
void swap(MessageBatch &&other) noexcept {
std::swap(this->buffer, other.buffer);
std::swap(this->maxLength, other.maxLength);
std::swap(this->bufferLength, other.bufferLength);
std::swap(this->bufferPosition, other.bufferPosition);
std::swap(this->messageType, other.messageType);
std::swap(this->nbMessages, other.nbMessages);
std::swap(this->isBusy, other.isBusy);
}
public:
/**
* Tries to append the passed in chars. Returns false if the chars cannot fit in the remaining buffer space.
* @param chars
* @param len
* @return
*/
bool append(std::string&& message) {
if (bufferLength >= GROW_UNTIL_LENGTH) {
return false;
}

if (message.size() + bufferLength > maxLength) {
size_t byteSize{static_cast<size_t>((std::ceil(static_cast<double>(maxLength + message.size()) / DEFAULT_BUF_LENGTH) * DEFAULT_BUF_LENGTH))};
buffer = (buffer == nullptr)
? (char *) calloc(byteSize, SIZEOF_CHAR)
: (char *) realloc(buffer, byteSize);
maxLength += message.size();
}

memmove(&buffer[bufferLength], message.c_str(), SIZEOF_CHAR * message.size());
bufferLength += message.size();

return true;
}

void copy(MessageBatch const& other) {
if (other.bufferLength + bufferLength > maxLength) {
size_t byteSize{static_cast<size_t>((std::ceil(static_cast<double>(maxLength + other.maxLength) / DEFAULT_BUF_LENGTH) * DEFAULT_BUF_LENGTH))};
buffer = (buffer == nullptr)
? (char *) calloc(byteSize, SIZEOF_CHAR)
: (char *) realloc(buffer, byteSize);
maxLength += other.bufferLength;
}

memmove(&buffer[bufferLength], other.buffer, SIZEOF_CHAR * other.bufferLength);
bufferLength += other.bufferLength;

this->messageType = other.messageType;
this->nbMessages = other.nbMessages;
this->bufferPosition = 0;
this->isBusy = false;
}

[[nodiscard]] MessageBatch copy() const {
MessageBatch retVal{};
retVal.copy(*this);
return std::move(retVal);
}

/**
* Returns the number of characters in the buffer
* @return
*/
[[nodiscard]] size_t getBufferLength() const {
return bufferLength;
}

/**
* Returns the messageType
* @return
*/
[[nodiscard]] std::string getMessageType() const {
return messageType;
}

/**
* Returns the buffer starting from the read position
* @return
*/
[[nodiscard]] char const* getBufferRemaining() const {
return &buffer[bufferPosition];
}

/**
* Advances the read position
* @param amount
*/
void advance(size_t amount) {
bufferPosition = (bufferPosition + amount) <= maxLength ? (bufferPosition + amount) : maxLength;
bufferLength = (bufferLength - amount) >= 0 ? (bufferLength - amount) : 0;
}

void clearForNextMessage() {
if (this->buffer != nullptr) {
memset(this->buffer, 0, this->maxLength);
}

this->bufferLength = 0;
this->bufferPosition = 0;
this->nbMessages = 0;
this->isBusy = false;
}

[[nodiscard]] bool getIsBusy() const {
return isBusy;
}

[[nodiscard]] bool isFull() const {
return bufferLength > GROW_UNTIL_LENGTH;
}

void setBusy() {
isBusy = true;
}

void setMessageType(std::string const& value) {
this->messageType = value;
}

[[nodiscard]] bool getIsDone() const {
return bufferLength == 0;
}

[[nodiscard]] bool hasContent() const {
return bufferLength > 0;
}
};
}

#endif //GAZELLEMQ_SERVER_MESSAGEBATCH_HPP
71 changes: 43 additions & 28 deletions server/MessagePublisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <functional>
#include "MessageHandler.hpp"
#include "Consts.hpp"
#include "MessageBatch.hpp"

namespace gazellemq::server {
class MessagePublisher : public MessageHandler {
Expand All @@ -18,22 +19,26 @@ namespace gazellemq::server {

char readBuffer[MAX_READ_BUF]{};

MessagePublisherState state{MessagePublisherState_notSet};
MessagePublisherState state{};

enum ParseState {
ParseState_messageType,
ParseState_messageContentLength,
ParseState_messageContent,
};

ParseState parseState{ParseState_messageType};
ParseState parseState{};

std::string messageType;
std::string messageLengthBuffer;
std::string messageContent;
size_t messageContentLength{};
size_t nbContentBytesRead{};
std::function<void(std::string const& messageType, std::string && buffer)> fnPushToQueue;

MessageBatch currentBatch{};
MessageBatch nextBatch{};

std::function<void(MessageBatch&& batch)> fnPushToQueue;
private:
/**
* Disconnects from the server
Expand Down Expand Up @@ -148,7 +153,23 @@ namespace gazellemq::server {
message.push_back('|');
message.append(messageContent);

fnPushToQueue(messageType, std::move(message));
if (currentBatch.getMessageType().empty()) {
currentBatch.setMessageType(messageType);
} else if ((currentBatch.getMessageType() != messageType) || (currentBatch.isFull())) {
fnPushToQueue(std::move(currentBatch));
currentBatch.clearForNextMessage();
currentBatch.setMessageType(messageType);
}

currentBatch.append(std::move(message));

if (i == (bufferLength-1)) {
fnPushToQueue(std::move(currentBatch));
currentBatch.clearForNextMessage();
}


// fnPushToQueue(messageType, std::move(message));
messageContentLength = 0;
nbContentBytesRead = 0;
messageLengthBuffer.clear();
Expand All @@ -160,49 +181,43 @@ namespace gazellemq::server {
}
}

public:
explicit MessagePublisher(
int fileDescriptor,
std::string&& name,
std::function<void(std::string const& messageType, std::string && buffer)>&& fnPushToQueue
)
:MessageHandler(fileDescriptor, std::move(name)), fnPushToQueue(std::move(fnPushToQueue))
{}

MessagePublisher(MessagePublisher &&other) noexcept: MessageHandler(other.fd) {
void swap(MessagePublisher &&other) {
std::swap(this->readBuffer, other.readBuffer);
std::swap(this->state, other.state);
std::swap(this->parseState, other.parseState);
std::swap(this->messageType, other.messageType);
std::swap(this->messageLengthBuffer, other.messageLengthBuffer);
std::swap(this->messageContentLength, other.messageContentLength);
std::swap(this->nbContentBytesRead, other.nbContentBytesRead);
//std::swap(this->fnPushToQueue, other.fnPushToQueue);
std::swap(this->fnPushToQueue, other.fnPushToQueue);
std::swap(this->currentBatch, other.currentBatch);
std::swap(this->nextBatch, other.nextBatch);

std::swap(this->clientName, other.clientName);
std::swap(this->fd, other.fd);
std::swap(this->isZombie, other.isZombie);
std::swap(this->isDisconnecting, other.isDisconnecting);
}
public:
explicit MessagePublisher(
int fileDescriptor,
std::string&& name,
std::function<void(MessageBatch&& buffer)>&& fnPushToQueue
)
: MessageHandler(fileDescriptor, std::move(name)), fnPushToQueue(std::move(fnPushToQueue))
{}

MessagePublisher(MessagePublisher &&other) noexcept: MessageHandler(other.fd) {
this->swap(std::move(other));
}

MessagePublisher& operator=(MessagePublisher &&other) noexcept {
std::swap(this->readBuffer, other.readBuffer);
std::swap(this->state, other.state);
std::swap(this->parseState, other.parseState);
std::swap(this->messageType, other.messageType);
std::swap(this->messageLengthBuffer, other.messageLengthBuffer);
std::swap(this->messageContentLength, other.messageContentLength);
std::swap(this->nbContentBytesRead, other.nbContentBytesRead);
std::swap(this->fnPushToQueue, other.fnPushToQueue);

std::swap(this->clientName, other.clientName);
std::swap(this->fd, other.fd);
std::swap(this->isZombie, other.isZombie);
std::swap(this->isDisconnecting, other.isDisconnecting);
this->swap(std::move(other));
return *this;
}

MessagePublisher(MessagePublisher const& other) = default;
MessagePublisher(MessagePublisher const& other) = delete;
MessagePublisher& operator=(MessagePublisher const& other) = delete;

~MessagePublisher() override = default;
Expand Down
6 changes: 3 additions & 3 deletions server/MessageQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@
namespace gazellemq::server {
class MessageQueue {
private:
rigtorp::MPMCQueue<Message> messageQueue;
rigtorp::MPMCQueue<MessageBatch> messageQueue;
public:
std::atomic_flag afQueue{false};
public:
explicit MessageQueue(size_t messageQueueDepth)
:messageQueue(messageQueueDepth)
{}
public:
void push_back(Message &&chunk) {
void push_back(MessageBatch &&chunk) {
messageQueue.push(std::move(chunk));
afQueue.test_and_set();
afQueue.notify_one();
}

bool try_pop(Message& chunk) {
bool try_pop(MessageBatch& chunk) {
return messageQueue.try_pop(chunk);
}
};
Expand Down
Loading

0 comments on commit 603b6fe

Please sign in to comment.