Skip to content

Commit

Permalink
add locking to nmmsender which we need because navmerge --listener ad…
Browse files Browse the repository at this point in the history
…ds destinations dynamically
  • Loading branch information
berthubert committed Feb 3, 2024
1 parent c995b54 commit daa2771
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
9 changes: 6 additions & 3 deletions nmmsender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ void NMMSender::emitNMM(const NavMonMessage& nmm)

void NMMSender::emitNMM(const std::string& out)
{
std::lock_guard<std::mutex> l(d_destslock);
for(auto& d : d_dests) {
d->emitNMM(out, d_compress);
}
Expand Down Expand Up @@ -222,6 +223,7 @@ try
cout<<"Had a new connection from "<<remote.toStringWithPort()<<" on fd "<<fd<<endl;
auto nd = std::make_unique<Destination>();
nd->dst="source";
std::lock_guard<std::mutex> l(ns.d_destslock);
ns.d_dests.push_back(std::move(nd));

std::thread t(&NMMSender::sendTCPListenerThread, &ns, ns.d_dests.rbegin()->get(), fd, remote);
Expand Down Expand Up @@ -282,15 +284,16 @@ void NMMSender::sendTCPListenerThread(Destination* d, int fd, ComboAddress addr)
if (d_debug) { cerr<<humanTimeNow()<<" Sending thread for "<<d->dst <<" via "<< addr.toStringWithPort()<<" had error"; }

}
// need a lock here, but I think think this is the right one
std::lock_guard<std::mutex> mut(d->mut);
cerr<<"Done with serving client "<<addr.toStringWithPort()<<": "<<d_dests.size() <<endl;
std::lock_guard<std::mutex> l(d_destslock);


d_dests.erase(remove_if(d_dests.begin(), d_dests.end(), [d](const auto& a)
{
// cerr<<(void*) a.get()<< " ==? " <<(void*) d <<endl;
return a.get() == d;
}), d_dests.end());

cerr<<"Done with serving client "<<addr.toStringWithPort()<<": "<<d_dests.size() <<" destinations left"<<endl;
// cerr<<"Size now: "<<d_dests.size()<<endl;
// some kind of cleanup
}
Expand Down
4 changes: 4 additions & 0 deletions nmmsender.hh
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,22 @@ public:
{
auto d = std::make_unique<Destination>();
d->fd = fd;
std::lock_guard<std::mutex> l(d_destslock);
d_dests.push_back(std::move(d));
}
void addDestination(const std::string& dest)
{
auto d = std::make_unique<Destination>();
d->dst = dest;
std::lock_guard<std::mutex> l(d_destslock);
d_dests.push_back(std::move(d));
}
void addListener(const std::string& dest)
{
auto d = std::make_unique<Destination>();
d->dst = dest;
d->listener = true;
std::lock_guard<std::mutex> l(d_destslock);
d_dests.push_back(std::move(d));
}

Expand Down Expand Up @@ -78,6 +81,7 @@ public:
}

private:
std::mutex d_destslock;
std::vector<std::unique_ptr<Destination>> d_dests;
std::vector<std::unique_ptr<std::thread>> d_thread;
};

0 comments on commit daa2771

Please sign in to comment.