Skip to content

Commit

Permalink
delete useless code: event loop group and config parser
Browse files Browse the repository at this point in the history
  • Loading branch information
loveyacper committed Sep 3, 2021
1 parent 7e27471 commit 7c7e38e
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 381 deletions.
82 changes: 59 additions & 23 deletions net/Application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
#include <cstring>
#include <cstdio>

#include <memory>
#include <mutex>
#include <condition_variable>

#include "util/Util.h"
#include "Application.h"
#include "AnanasLogo.h"
#include "Socket.h"
#include "EventLoopGroup.h"
#include "AnanasDebug.h"

static void SignalHandler(int num) {
Expand All @@ -24,11 +27,6 @@ static void InitSignal() {
// ignore sigpipe
sig.sa_handler = SIG_IGN;
sigaction(SIGPIPE, &sig, NULL);

#ifdef ANANAS_LOGO
// logo
printf("%s\n", ananas::internal::logo);
#endif
}


Expand All @@ -44,12 +42,14 @@ Application& Application::Instance() {

void Application::SetNumOfWorker(size_t num) {
assert (state_ == State::eS_None);
workerGroup_->SetNumOfEventLoop(num);
assert (num <= 512);

numLoop_ = num;
}

size_t Application::NumOfWorker() const {
// plus one : the baseLoop
return 1 + workerGroup_->Size();
return 1 + numLoop_;
}

void Application::Run(int ac, char* av[]) {
Expand All @@ -71,25 +71,23 @@ void Application::Run(int ac, char* av[]) {
}
}

state_ = State::eS_Started;
workerGroup_->Start();

// start loops in thread pool
_StartWorkers();
BaseLoop()->Run();

baseGroup_->Wait();
printf("Stopped BaseEventLoopGroup ...\n");
printf("Stopped BaseEventLoop...\n");

workerGroup_->Wait();
printf("Stopped WorkerEventLoopGroup...\n");
pool_.JoinAll();
loops_.clear();
numLoop_ = 0;
printf("Stopped WorkerEventLoops...\n");
}

void Application::Exit() {
if (state_ == State::eS_Stopped)
return;

state_ = State::eS_Stopped;
baseGroup_->Stop();
workerGroup_->Stop();
}

bool Application::IsExit() const {
Expand Down Expand Up @@ -184,20 +182,58 @@ void Application::Connect(const char* ip,


EventLoop* Application::Next() {
//assert (BaseLoop()->IsInSameLoop());
auto loop = workerGroup_->Next();
if (state_ != State::eS_Started)
return nullptr;

if (loops_.empty())
return nullptr;

auto &loop = loops_[currentLoop_++ % loops_.size()];
if (loop)
return loop;
return loop.get();

return BaseLoop();
}

void Application::_StartWorkers() {
// only called by main thread
assert (state_ == State::eS_None);

std::mutex mutex;
std::condition_variable cond;

pool_.SetNumOfThreads(numLoop_);
for (size_t i = 0; i < numLoop_; ++i) {
pool_.Execute([this, &mutex, &cond]() {
EventLoop* loop(new EventLoop);

{
std::unique_lock<std::mutex> guard(mutex);
loops_.push_back(std::unique_ptr<EventLoop>(loop));
if (loops_.size() == numLoop_)
cond.notify_one();
}

loop->Run();
});
}

std::unique_lock<std::mutex> guard(mutex);
cond.wait(guard, [this] () {
return loops_.size() == numLoop_;
});

state_ = State::eS_Started;
}

Application::Application() :
baseGroup_(new internal::EventLoopGroup(0)),
base_(baseGroup_.get()),
workerGroup_(new internal::EventLoopGroup(0)),
state_ {State::eS_None} {
InitSignal();

// logo
fprintf(stdout, "%s", "\033[1;36;40m");
printf("%s\n", ananas::internal::logo);
fprintf(stdout, "%s", "\033[0m");
}

void Application::_DefaultBindCallback(bool succ, const SocketAddr& listenAddr) {
Expand Down
17 changes: 8 additions & 9 deletions net/Application.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,10 @@
#include "Typedefs.h"
#include "Poller.h"
#include "ananas/util/Timer.h"
#include "ananas/util/ThreadPool.h"

///@brief Namespace ananas
namespace ananas {

namespace internal {
class EventLoopGroup;
}

/// @file Application.h

///@brief Abstract for a process.
Expand Down Expand Up @@ -137,13 +133,16 @@ class Application {
private:
Application();

// baseGroup_ is empty, just a placeholder container for base_.
std::unique_ptr<internal::EventLoopGroup> baseGroup_;
void _StartWorkers();

// The default loop for accept/connect, or as worker if workerGroup_ is empty
// The default loop for accept/connect, or as worker if empty worker pool
EventLoop base_;

std::unique_ptr<internal::EventLoopGroup> workerGroup_;
// worker thread pool
ThreadPool pool_;
std::vector<std::unique_ptr<EventLoop>> loops_;
size_t numLoop_ {0};
mutable std::atomic<size_t> currentLoop_ {0};

enum class State {
eS_None,
Expand Down
9 changes: 5 additions & 4 deletions net/EventLoop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <thread>

#include "EventLoop.h"
#include "EventLoopGroup.h"
#include "Application.h"

#include "Acceptor.h"
#include "Connection.h"
Expand Down Expand Up @@ -34,8 +34,7 @@ void EventLoop::SetMaxOpenFd(rlim_t maxfdPlus1) {
s_maxOpenFdPlus1 = maxfdPlus1;
}

EventLoop::EventLoop(internal::EventLoopGroup* group) :
group_(group) {
EventLoop::EventLoop() {
assert (!g_thisLoop && "There must be only one EventLoop per thread");
g_thisLoop = this;

Expand Down Expand Up @@ -204,12 +203,14 @@ bool EventLoop::Cancel(TimerId id) {
}

void EventLoop::Run() {
assert (this->InThisLoop());

const DurationMs kDefaultPollTime(10);
const DurationMs kMinPollTime(1);

Register(internal::eET_Read, notifier_);

while (!group_->IsStopped()) {
while (!Application::Instance().IsExit()) {
auto timeout = std::min(kDefaultPollTime, timers_.NearestTimer());
timeout = std::max(kMinPollTime, timeout);

Expand Down
8 changes: 2 additions & 6 deletions net/EventLoop.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ struct SocketAddr;

namespace internal {
class Connector;
class EventLoopGroup;
}

///@brief EventLoop class
Expand All @@ -29,9 +28,7 @@ class EventLoopGroup;
class EventLoop : public Scheduler {
public:
///@brief Constructor
///@param group The group belong to
explicit
EventLoop(internal::EventLoopGroup* group);
EventLoop();
~EventLoop();

EventLoop(const EventLoop& ) = delete;
Expand Down Expand Up @@ -118,7 +115,7 @@ class EventLoop : public Scheduler {

///@brief Run application
///
/// It's a infinite loop, until belonging EventLoopGroup stopped
/// It's a infinite loop, until Application stopped
void Run();

bool Register(int events, std::shared_ptr<internal::Channel> src);
Expand Down Expand Up @@ -155,7 +152,6 @@ class EventLoop : public Scheduler {
private:
bool _Loop(DurationMs timeout);

internal::EventLoopGroup* group_;
std::unique_ptr<internal::Poller> poller_;

std::shared_ptr<internal::PipeChannel> notifier_;
Expand Down
83 changes: 0 additions & 83 deletions net/EventLoopGroup.cc

This file was deleted.

62 changes: 0 additions & 62 deletions net/EventLoopGroup.h

This file was deleted.

Loading

0 comments on commit 7c7e38e

Please sign in to comment.