Skip to content

Commit

Permalink
Reformatted.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgates108 committed Feb 6, 2025
1 parent 18c4bc0 commit 5209ddb
Show file tree
Hide file tree
Showing 35 changed files with 196 additions and 407 deletions.
11 changes: 4 additions & 7 deletions src/ccontrol/MergingHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
bool last = false;
char const* next = inBuf;
char const* const end = inBuf + inBufSize;
LOGS(_log, LOG_LVL_TRACE,
context << " next=" << (uint64_t)next << " end=" << (uint64_t)end);
LOGS(_log, LOG_LVL_TRACE, context << " next=" << (uint64_t)next << " end=" << (uint64_t)end);
while ((next < end) && !last) {
if (exec->getCancelled()) {
throw runtime_error(context + " query was cancelled");
Expand Down Expand Up @@ -216,8 +215,8 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
msgSizeBytes = 0;
} else {
LOGS(_log, LOG_LVL_TRACE,
context << " headerCount=" << headerCount << " incomplete read diff="
<< (msgSizeBytes - msgBufNext));
context << " headerCount=" << headerCount
<< " incomplete read diff=" << (msgSizeBytes - msgBufNext));
}
}
}
Expand Down Expand Up @@ -271,9 +270,7 @@ shared_ptr<http::ClientConnPool> const& MergingHandler::_getHttpConnPool() {
/// Construct a MergingHandler that will merge incoming result data into
/// `tableName` using `merger`; both are stored for later use by the handler.
MergingHandler::MergingHandler(std::shared_ptr<rproc::InfileMerger> merger, std::string const& tableName)
        : _infileMerger{merger}, _tableName{tableName} {}

MergingHandler::~MergingHandler() {
LOGS(_log, LOG_LVL_TRACE, __func__ << " " << _tableName);
}
MergingHandler::~MergingHandler() { LOGS(_log, LOG_LVL_TRACE, __func__ << " " << _tableName); }

void MergingHandler::errorFlush(std::string const& msg, int code) {
_setError(code, msg);
Expand Down
3 changes: 1 addition & 2 deletions src/ccontrol/UserQuerySelect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ void UserQuerySelect::submit() {
}
}


void UserQuerySelect::buildAndSendUberJobs() {
// TODO:UJ Is special handling needed for the dummy chunk, 1234567890 ?
string const funcN("UserQuerySelect::" + string(__func__) + " QID=" + to_string(_qMetaQueryId));
Expand Down Expand Up @@ -411,7 +410,7 @@ void UserQuerySelect::buildAndSendUberJobs() {
// attempt count will reach max and the query will be cancelled
auto lambdaMissingChunk = [&](string const& msg) {
missingChunks.push_back(chunkId);
auto logLvl = (missingChunks.size()%1000 == 1) ? LOG_LVL_WARN : LOG_LVL_TRACE;
auto logLvl = (missingChunks.size() % 1000 == 1) ? LOG_LVL_WARN : LOG_LVL_TRACE;
LOGS(_log, logLvl, msg);
};

Expand Down
18 changes: 9 additions & 9 deletions src/czar/Czar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,15 @@ Czar::Czar(string const& configFilePath, string const& czarName)
string vectMinRunningSizesStr = _czarConfig->getQdispVectMinRunningSizes();
vector<int> vectMinRunningSizes = util::String::parseToVectInt(vectMinRunningSizesStr, ":", 0);

{ //&&& delete this block
vectRunSizesStr = "200:200:50:50";
vectRunSizes = util::String::parseToVectInt(vectRunSizesStr, ":", 1);
qPoolSize = 200;
LOGS(_log, LOG_LVL_WARN,
"&&& qdisp config qPoolSize=" << qPoolSize << " maxPriority=" << maxPriority << " vectRunSizes="
<< vectRunSizesStr << " -> " << util::prettyCharList(vectRunSizes)
<< " vectMinRunningSizes=" << vectMinRunningSizesStr << " -> "
<< util::prettyCharList(vectMinRunningSizes));
{ //&&& delete this block
vectRunSizesStr = "200:200:50:50";
vectRunSizes = util::String::parseToVectInt(vectRunSizesStr, ":", 1);
qPoolSize = 200;
LOGS(_log, LOG_LVL_WARN,
"&&& qdisp config qPoolSize=" << qPoolSize << " maxPriority=" << maxPriority << " vectRunSizes="
<< vectRunSizesStr << " -> " << util::prettyCharList(vectRunSizes)
<< " vectMinRunningSizes=" << vectMinRunningSizesStr << " -> "
<< util::prettyCharList(vectMinRunningSizes));
}
LOGS(_log, LOG_LVL_INFO,
" qdisp config qPoolSize=" << qPoolSize << " maxPriority=" << maxPriority << " vectRunSizes="
Expand Down
1 change: 0 additions & 1 deletion src/czar/CzarChunkMap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,6 @@ bool CzarFamilyMap::_read() {
return true;
}


std::shared_ptr<CzarFamilyMap::FamilyMapType> CzarFamilyMap::makeNewMaps(
qmeta::QMetaChunkMap const& qChunkMap) {
// Create new maps.
Expand Down
1 change: 0 additions & 1 deletion src/qdisp/Executive.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ Executive::~Executive() {
}
if (_asyncTimer != nullptr) {
_asyncTimer->cancel();
//&&& qdisp::CzarStats::get()->untrackQueryProgress(_id);
}
qdisp::CzarStats::get()->untrackQueryProgress(_id);
}
Expand Down
2 changes: 1 addition & 1 deletion src/qdisp/Executive.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class Executive : public std::enable_shared_from_this<Executive> {

// The below value should probably be based on the user query, with longer sleeps for slower queries.
int getAttemptSleepSeconds() const { return 15; } // As above or until added to config file.
int getMaxAttempts() const { return 5; } // TODO:UJ Should be set by config
int getMaxAttempts() const { return 5; } // TODO:UJ Should be set by config

/// Calling this indicates all Jobs for this user query have been created.
void setAllJobsCreated() { _allJobsCreated = true; }
Expand Down
12 changes: 4 additions & 8 deletions src/qdisp/JobQuery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,13 @@ JobQuery::JobQuery(Executive::Ptr const& executive, JobDescription::Ptr const& j
LOGS(_log, LOG_LVL_TRACE, "JobQuery desc=" << _jobDescription);
}

JobQuery::~JobQuery() {
LOGS(_log, LOG_LVL_TRACE, "~JobQuery QID=" << _idStr);
}
JobQuery::~JobQuery() { LOGS(_log, LOG_LVL_TRACE, "~JobQuery QID=" << _idStr); }

/// Cancel response handling. Return true if this is the first time cancel has been called.
bool JobQuery::cancel(bool superfluous) {
QSERV_LOGCONTEXT_QUERY_JOB(getQueryId(), getJobId());
LOGS(_log, LOG_LVL_DEBUG, "JobQuery::cancel() " << superfluous);
LOGS(_log, LOG_LVL_WARN, "&&&JobQuery::cancel() " << superfluous);
if (_cancelled.exchange(true) == false) {
LOGS(_log, LOG_LVL_INFO, "JobQuery::cancel() " << superfluous);
VMUTEX_NOT_HELD(_jqMtx);
lock_guard lock(_jqMtx);

Expand All @@ -83,7 +80,7 @@ bool JobQuery::cancel(bool superfluous) {
}
return true;
}
LOGS(_log, LOG_LVL_TRACE, "cancel, skipping, already cancelled.");
LOGS(_log, LOG_LVL_TRACE, "JobQuery::cancel, skipping, already cancelled.");
return false;
}

Expand Down Expand Up @@ -129,8 +126,7 @@ bool JobQuery::unassignFromUberJob(UberJobId ujId) {
_uberJobId = -1;

auto exec = _executive.lock();
// Do not increase the count as it should have been increased when the job was started.
//&&&_jobDescription->incrAttemptCount(exec, false);
// Do not increase the attempt count as it should have been increased when the job was started.
return true;
}

Expand Down
1 change: 0 additions & 1 deletion src/qdisp/JobQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ class JobQuery {
QueryId getQueryId() const { return _qid; }
JobId getJobId() const { return _jobDescription->id(); }
std::string const& getIdStr() const { return _idStr; }
//&&&std::shared_ptr<ResponseHandler> getRespHandler() { return _jobDescription->respHandler(); }
JobDescription::Ptr getDescription() { return _jobDescription; }
qmeta::JobStatus::Ptr getStatus() { return _jobStatus; }

Expand Down
57 changes: 10 additions & 47 deletions src/qdisp/UberJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@
#include "qproc/ChunkQuerySpec.h"
#include "util/Bug.h"
#include "util/common.h"
#include "util/Histogram.h" //&&&
#include "util/QdispPool.h"
#include "util/InstanceCount.h"

// LSST headers
#include "lsst/log/Log.h"
Expand Down Expand Up @@ -80,7 +78,7 @@ UberJob::UberJob(Executive::Ptr const& executive, std::shared_ptr<ResponseHandle
_rowLimit(rowLimit),
_idStr("QID=" + to_string(_queryId) + "_ujId=" + to_string(uberJobId)),
_workerData(workerData) {
LOGS(_log, LOG_LVL_WARN, _idStr << " &&& created");
LOGS(_log, LOG_LVL_TRACE, _idStr << " created");
}

void UberJob::_setup() {
Expand All @@ -102,16 +100,16 @@ bool UberJob::addJob(JobQuery::Ptr const& job) {
return success;
}

util::HistogramRolling histoRunUberJob("&&&uj histoRunUberJob", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000);
util::HistogramRolling histoUJSerialize("&&&uj histoUJSerialize", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000);

void UberJob::runUberJob() { // &&& TODO:UJ this should probably check cancelled
void UberJob::runUberJob() {
LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " start");
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&&uj runuj start");
// Build the uberjob payload for each job.
nlohmann::json uj;
unique_lock<mutex> jobsLock(_jobsMtx);
auto exec = _executive.lock();
if (exec == nullptr || exec->getCancelled()) {
LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " executive shutdown");
return;
}

// Send the uberjob to the worker
auto const method = http::Method::POST;
Expand All @@ -129,35 +127,15 @@ void UberJob::runUberJob() { // &&& TODO:UJ this should probably check cancelle
auto uberJobMsg = protojson::UberJobMsg::create(
http::MetaModule::version, czarConfig->replicationInstanceId(), czarConfig->replicationAuthKey(),
czInfo, _wContactInfo, _queryId, _uberJobId, _rowLimit, maxTableSizeMB, scanInfoPtr, _jobs);
auto startserialize = CLOCK::now(); //&&&
json request = uberJobMsg->serializeJson();
auto endserialize = CLOCK::now(); //&&&
std::chrono::duration<double> secsserialize = endserialize - startserialize; // &&&
histoUJSerialize.addEntry(endserialize, secsserialize.count()); //&&&
LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoUJSerialize.getString(""));

jobsLock.unlock(); // unlock so other _jobsMtx threads can advance while this waits for transmit
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << "&&&uj runuj c");
/* &&&
{ // &&& testing only, delete
auto parsedReq = protojson::UberJobMsg::createFromJson(request);
json jsParsedReq = parsedReq->serializeJson();
if (request == jsParsedReq) {
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&&uj YAY!!! ");
} else {
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&&uj noYAY request != jsParsedReq");
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&&uj request=" << request);
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&&uj jsParsedReq=" << jsParsedReq);
}
}
*/

LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " REQ " << request);
LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " REQ " << request);
string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'";
LOGS(_log, LOG_LVL_TRACE,
cName(__func__) << " czarPost url=" << url << " request=" << request.dump()
<< " headers=" << headers[0]);
auto startclient = CLOCK::now(); //&&&

auto commandHttpPool = czar::Czar::getCzar()->getCommandHttpPool();
http::ClientConfig clientConfig;
Expand All @@ -170,10 +148,9 @@ void UberJob::runUberJob() { // &&& TODO:UJ this should probably check cancelle
bool transmitSuccess = false;
string exceptionWhat;
try {
util::InstanceCount ic("runUJ_T&&&priQ");
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << "&&&uj sending");
LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " sending");
json const response = client.readAsJson();
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << "&&&uj worker recv");
LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " worker recv");
if (0 != response.at("success").get<int>()) {
transmitSuccess = true;
} else {
Expand All @@ -183,10 +160,6 @@ void UberJob::runUberJob() { // &&& TODO:UJ this should probably check cancelle
LOGS(_log, LOG_LVL_WARN, requestContext + " ujresponse failed, ex: " + ex.what());
exceptionWhat = ex.what();
}
auto endclient = CLOCK::now(); //&&&
std::chrono::duration<double> secsclient = endclient - startclient; // &&&
histoRunUberJob.addEntry(endclient, secsclient.count()); //&&&
LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoRunUberJob.getString(""));
if (!transmitSuccess) {
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " transmit failure, try to send jobs elsewhere");
_unassignJobs(); // locks _jobsMtx
Expand All @@ -196,7 +169,6 @@ void UberJob::runUberJob() { // &&& TODO:UJ this should probably check cancelle
} else {
setStatusIfOk(qmeta::JobStatus::REQUEST, cName(__func__) + " transmitSuccess"); // locks _jobsMtx
}
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&&uj runuj end");
return;
}

Expand Down Expand Up @@ -269,7 +241,6 @@ bool UberJob::_setStatusIfOk(qmeta::JobStatus::State newState, string const& msg

void UberJob::callMarkCompleteFunc(bool success) {
LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " success=" << success);
LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& success=" << success);

lock_guard<mutex> lck(_jobsMtx);
// Need to set this uberJob's status, however exec->markCompleted will set
Expand All @@ -292,8 +263,6 @@ void UberJob::callMarkCompleteFunc(bool success) {
_jobs.clear();
}

util::HistogramRolling histoQueImp("&&&uj histoQueImp", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000);

json UberJob::importResultFile(string const& fileUrl, uint64_t rowCount, uint64_t fileSize) {
LOGS(_log, LOG_LVL_DEBUG,
cName(__func__) << " fileUrl=" << fileUrl << " rowCount=" << rowCount << " fileSize=" << fileSize);
Expand Down Expand Up @@ -327,15 +296,10 @@ json UberJob::importResultFile(string const& fileUrl, uint64_t rowCount, uint64_
}

weak_ptr<UberJob> ujThis = weak_from_this();
auto startQImp = CLOCK::now(); // &&&

// fileCollectFunc will be put on the queue to run later.
string const idStr = _idStr;
auto fileCollectFunc = [ujThis, fileUrl, rowCount, idStr, startQImp](util::CmdData*) {
auto endQImp = CLOCK::now(); //&&&
std::chrono::duration<double> secsQImp = endQImp - startQImp; // &&&
histoQueImp.addEntry(endQImp, secsQImp.count()); //&&&
LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoQueImp.getString(""));
auto fileCollectFunc = [ujThis, fileUrl, rowCount, idStr](util::CmdData*) {
auto ujPtr = ujThis.lock();
if (ujPtr == nullptr) {
LOGS(_log, LOG_LVL_DEBUG,
Expand Down Expand Up @@ -445,7 +409,6 @@ json UberJob::_importResultError(bool shouldCancel, string const& errorType, str

void UberJob::_importResultFinish(uint64_t resultRows) {
LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " start");
LOGS(_log, LOG_LVL_INFO, cName(__func__) << " &&& start");

auto exec = _executive.lock();
if (exec == nullptr) {
Expand Down
2 changes: 1 addition & 1 deletion src/qdisp/UberJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class UberJob : public std::enable_shared_from_this<UberJob> {
czar::CzarChunkMap::WorkerChunksData::Ptr _workerData; // TODO:UJ this may not be needed

// Contact information for the target worker.
protojson::WorkerContactInfo::Ptr _wContactInfo; // Change to ActiveWorker &&& ???
protojson::WorkerContactInfo::Ptr _wContactInfo; // TODO:UJ Maybe change to ActiveWorker?
};

} // namespace lsst::qserv::qdisp
Expand Down
23 changes: 12 additions & 11 deletions src/qhttp/Server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,18 @@ void Server::_accept() {
return;
}
try {
if (!ec) {
LOGS(_log, LOG_LVL_INFO, "&&&qhttp::Server::_accept ok");
LOGLS_INFO(_log, logger(self) << logger(socket) << "connect from " << socket->remote_endpoint());
boost::system::error_code ignore;
socket->set_option(ip::tcp::no_delay(true), ignore);
self->_readRequest(socket);
} else {
LOGLS_ERROR(_log, logger(self) << "accept failed: " << ec.message());
}
if (!ec) {
LOGS(_log, LOG_LVL_INFO, "&&&qhttp::Server::_accept ok");
LOGLS_INFO(_log, logger(self)
<< logger(socket) << "connect from " << socket->remote_endpoint());
boost::system::error_code ignore;
socket->set_option(ip::tcp::no_delay(true), ignore);
self->_readRequest(socket);
} else {
LOGLS_ERROR(_log, logger(self) << "accept failed: " << ec.message());
}
} catch (boost::system::system_error const& bEx) {
LOGS(_log, LOG_LVL_ERROR, "qhttp::Server::_accept lambda threw " << bEx.what());
LOGS(_log, LOG_LVL_ERROR, "qhttp::Server::_accept lambda threw " << bEx.what());
}
self->_accept(); // start accept again for the next incoming connection
});
Expand Down Expand Up @@ -218,7 +219,7 @@ void Server::_readRequest(std::shared_ptr<ip::tcp::socket> socket) {
chrono::duration<double, std::milli> elapsed = chrono::steady_clock::now() - startTime;
string logStr;
if (LOG_CHECK_LVL(_log, LOG_LVL_INFO)) {
logStr = string("request duration ") + to_string(elapsed.count()) + "ms";
logStr = string("request duration ") + to_string(elapsed.count()) + "ms";
}
if (!ec && *reuseSocket) {
LOGLS_INFO(_log, logger(self) << logger(socket) << logStr << " lingering");
Expand Down
2 changes: 0 additions & 2 deletions src/qproc/ChunkQuerySpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,13 @@ class ChunkQuerySpec {
using Ptr = std::shared_ptr<ChunkQuerySpec>;

ChunkQuerySpec() {}
//&&&ChunkQuerySpec(std::string const& db_, int chunkId_, protojson::ScanInfo const& scanInfo_,
ChunkQuerySpec(std::string const& db_, int chunkId_, protojson::ScanInfo::Ptr const& scanInfo_,
bool scanInteractive_)
: db(db_), chunkId(chunkId_), scanInfo(scanInfo_), scanInteractive(scanInteractive_) {}

// Contents could change
std::string db{""}; ///< dominant db
int chunkId{0};
//&&&protojson::ScanInfo scanInfo; ///< shared-scan candidates
protojson::ScanInfo::Ptr scanInfo; ///< shared-scan candidates
// Consider saving subChunkTable templates, and substituting the chunkIds
// and subChunkIds into them on-the-fly.
Expand Down
8 changes: 4 additions & 4 deletions src/replica/contr/HttpQservMonitorModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,10 @@ wbase::TaskSelector HttpQservMonitorModule::_translateTaskSelector(string const&
}
}
selector.maxTasks = query().optionalUInt("max_tasks", 0);
trace(func, "include_tasks=" + replica::bool2str(selector.includeTasks)
+ " query_ids=" + util::String::toString(selector.queryIds)
+ " task_states=" + util::String::toString(selector.taskStates)
+ " max_tasks=" + to_string(selector.maxTasks));
trace(func, "include_tasks=" + replica::bool2str(selector.includeTasks) +
" query_ids=" + util::String::toString(selector.queryIds) +
" task_states=" + util::String::toString(selector.taskStates) +
" max_tasks=" + to_string(selector.maxTasks));
return selector;
}

Expand Down
Loading

0 comments on commit 5209ddb

Please sign in to comment.