Skip to content

Commit

Permalink
Merge branch 'tickets/DM-45548' into tickets/DM-43715
Browse files Browse the repository at this point in the history
  • Loading branch information
jgates108 committed Feb 13, 2025
2 parents bf7c791 + 5d8d91e commit 0515db5
Show file tree
Hide file tree
Showing 179 changed files with 6,238 additions and 8,021 deletions.
4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ add_subdirectory(mysql)
add_subdirectory(parser)
add_subdirectory(partition)
add_subdirectory(proto)
add_subdirectory(protojson)
add_subdirectory(proxy)
add_subdirectory(qana)
add_subdirectory(qdisp)
Expand All @@ -89,7 +90,6 @@ add_subdirectory(wpublish)
add_subdirectory(wsched)
add_subdirectory(www)
add_subdirectory(xrdlog)
add_subdirectory(xrdreq)
add_subdirectory(xrdsvc)

#-----------------------------------------------------------------------------
Expand All @@ -103,6 +103,7 @@ target_link_libraries(qserv_common PUBLIC
mysql
sql
util
protojson
)

install(
Expand Down Expand Up @@ -143,7 +144,6 @@ target_link_libraries(qserv_czar PUBLIC
rproc
qserv_css
qserv_meta
xrdreq
)

install(
Expand Down
2 changes: 0 additions & 2 deletions src/admin/templates/http/etc/qserv-czar.cnf.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ largestPriority = 3
vectRunSizes = 50:50:50:50
# Minimum number of threads running for each queue. No spaces. Values separated by ':'
vectMinRunningSizes = 0:1:3:3
# Maximum number of QueryRequests allowed to be running at one time.
qReqPseudoFifoMaxRunning = 299

[replication]

Expand Down
10 changes: 4 additions & 6 deletions src/admin/templates/proxy/etc/qserv-czar.cnf.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,19 @@ notifyWorkersOnCzarRestart = 1
#[debug]
#chunkLimit = -1

# Please see qdisp/QdispPool.h QdispPool::QdispPool for more information
# Please see util/QdispPool.h QdispPool::QdispPool for more information
[qdisppool]
#size of the pool
poolSize = 50
poolSize = 1000
# Low numbers are higher priority. Largest priority 3 creates 4 priority queues 0, 1, 2, 3
# Must be greater than 0.
largestPriority = 3
# Maximum number of threads running for each queue. No spaces. Values separated by ':'
# Using largestPriority = 2 and vectRunsizes = 3:5:8
# queue 0 would have runSize 3, queue 1 would have runSize 5, and queue 2 would have runSize 8.
vectRunSizes = 50:50:50:50
vectRunSizes = 800:800:500:500
# Minimum number of threads running for each queue. No spaces. Values separated by ':'
vectMinRunningSizes = 0:1:3:3
# Maximum number of QueryRequests allowed to be running at one time.
qReqPseudoFifoMaxRunning = 299
vectMinRunningSizes = 0:3:3:3

[replication]

Expand Down
7 changes: 3 additions & 4 deletions src/cconfig/CzarConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,17 @@ namespace lsst::qserv::cconfig {

std::mutex CzarConfig::_mtxOnInstance;

std::shared_ptr<CzarConfig> CzarConfig::_instance;
CzarConfig::Ptr CzarConfig::_instance;

std::shared_ptr<CzarConfig> CzarConfig::create(std::string const& configFileName,
std::string const& czarName) {
CzarConfig::Ptr CzarConfig::create(std::string const& configFileName, std::string const& czarName) {
std::lock_guard<std::mutex> const lock(_mtxOnInstance);
if (_instance == nullptr) {
_instance = std::shared_ptr<CzarConfig>(new CzarConfig(util::ConfigStore(configFileName), czarName));
}
return _instance;
}

std::shared_ptr<CzarConfig> CzarConfig::instance() {
CzarConfig::Ptr CzarConfig::instance() {
std::lock_guard<std::mutex> const lock(_mtxOnInstance);
if (_instance == nullptr) {
throw std::logic_error("CzarConfig::" + std::string(__func__) + ": instance has not been created.");
Expand Down
61 changes: 55 additions & 6 deletions src/cconfig/CzarConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace lsst::qserv::cconfig {
*/
class CzarConfig {
public:
using Ptr = std::shared_ptr<CzarConfig>;
/**
* Create an instance of CzarConfig and load parameters from the specifid file.
* @note One has to call this method at least once before trying to obtain
Expand All @@ -63,15 +64,15 @@ class CzarConfig {
* @param czarName - the unique name of Czar.
* @return the shared pointer to the configuration object
*/
static std::shared_ptr<CzarConfig> create(std::string const& configFileName, std::string const& czarName);
static Ptr create(std::string const& configFileName, std::string const& czarName);

/**
* Get a pointer to an instance that was created by the last call to
* the method 'create'.
* @return the shared pointer to the configuration object
* @throws std::logic_error when attempting to call the bethod before creating an instance.
*/
static std::shared_ptr<CzarConfig> instance();
static Ptr instance();

CzarConfig() = delete;
CzarConfig(CzarConfig const&) = delete;
Expand Down Expand Up @@ -117,7 +118,7 @@ class CzarConfig {
*/
std::string const& getXrootdFrontendUrl() const { return _xrootdFrontendUrl->getVal(); }

/* Get the maximum number of threads for xrootd to use.
/* Get the maximum number of threads for xrootd to use. // TODO:UJ delete
*
* @return the maximum number of threads for xrootd to use.
*/
Expand Down Expand Up @@ -198,6 +199,31 @@ class CzarConfig {
/// the OOM situation.
unsigned int czarStatsRetainPeriodSec() const { return _czarStatsRetainPeriodSec->getVal(); }

/// A worker is considered fully ALIVE if the last update from the worker has been
/// heard in less than _activeWorkerTimeoutAliveSecs seconds.
int getActiveWorkerTimeoutAliveSecs() const { return _activeWorkerTimeoutAliveSecs->getVal(); }

/// A worker is considered DEAD if it hasn't been heard from in more than
/// _activeWorkerTimeoutDeadSecs.
int getActiveWorkerTimeoutDeadSecs() const { return _activeWorkerTimeoutDeadSecs->getVal(); }

/// Max lifetime of a message to be sent to an active worker. If the czar has been
/// trying to send a message to a worker and has failed for this many seconds,
/// it gives up at this point, removing elements of the message to save memory.
int getActiveWorkerMaxLifetimeSecs() const { return _activeWorkerMaxLifetimeSecs->getVal(); }

/// The maximum number of chunks (basically Jobs) allowed in a single UberJob.
int getUberJobMaxChunks() const { return _uberJobMaxChunks->getVal(); }

/// Return the maximum number of http connections to use for czar commands.
int getCommandMaxHttpConnections() const { return _commandMaxHttpConnections->getVal(); }

/// Return the sleep time (in milliseconds) between messages sent to active workers.
int getMonitorSleepTimeMilliSec() const { return _monitorSleepTimeMilliSec->getVal(); }

/// Return true if family map chunk distribution should depend on chunk size.
bool getFamilyMapUsingChunkSize() const { return _familyMapUsingChunkSize->getVal(); }

// Parameters of the Czar management service

std::string const& replicationInstanceId() const { return _replicationInstanceId->getVal(); }
Expand Down Expand Up @@ -293,7 +319,7 @@ class CzarConfig {
CVTIntPtr _resultMaxConnections =
util::ConfigValTInt::create(_configValMap, "resultdb", "maxconnections", notReq, 40);
CVTIntPtr _resultMaxHttpConnections =
util::ConfigValTInt::create(_configValMap, "resultdb", "maxhttpconnections", notReq, 8192);
util::ConfigValTInt::create(_configValMap, "resultdb", "maxhttpconnections", notReq, 2000);
CVTIntPtr _oldestResultKeptDays =
util::ConfigValTInt::create(_configValMap, "resultdb", "oldestResultKeptDays", notReq, 30);

Expand Down Expand Up @@ -344,10 +370,15 @@ class CzarConfig {
CVTIntPtr _qdispMaxPriority =
util::ConfigValTInt::create(_configValMap, "qdisppool", "largestPriority", notReq, 2);
CVTStrPtr _qdispVectRunSizes =
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectRunSizes", notReq, "50:50:50:50");
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectRunSizes", notReq, "800:800:500:50");
CVTStrPtr _qdispVectMinRunningSizes =
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectMinRunningSizes", notReq, "0:1:3:3");
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectMinRunningSizes", notReq, "0:3:3:3");

// UberJobs
CVTIntPtr _uberJobMaxChunks =
util::ConfigValTInt::create(_configValMap, "uberjob", "maxChunks", notReq, 10000);

// TODO:UJ delete xrootd specific entries.
CVTIntPtr _xrootdSpread = util::ConfigValTInt::create(_configValMap, "tuning", "xrootdSpread", notReq, 4);
CVTIntPtr _qMetaSecsBetweenChunkCompletionUpdates = util::ConfigValTInt::create(
_configValMap, "tuning", "qMetaSecsBetweenChunkCompletionUpdates", notReq, 60);
Expand Down Expand Up @@ -385,6 +416,24 @@ class CzarConfig {
util::ConfigValTInt::create(_configValMap, "replication", "http_port", notReq, 0);
CVTUIntPtr _replicationNumHttpThreads =
util::ConfigValTUInt::create(_configValMap, "replication", "num_http_threads", notReq, 2);

// Active Worker
CVTIntPtr _activeWorkerTimeoutAliveSecs = // 5min
util::ConfigValTInt::create(_configValMap, "activeworker", "timeoutAliveSecs", notReq, 60 * 5);
CVTIntPtr _activeWorkerTimeoutDeadSecs = // 10min
util::ConfigValTInt::create(_configValMap, "activeworker", "timeoutDeadSecs", notReq, 60 * 10);
CVTIntPtr _activeWorkerMaxLifetimeSecs = // 1hr
util::ConfigValTInt::create(_configValMap, "activeworker", "maxLifetimeSecs", notReq, 60 * 60);
CVTIntPtr _monitorSleepTimeMilliSec = util::ConfigValTInt::create(
_configValMap, "activeworker", "monitorSleepTimeMilliSec", notReq, 15'000);

// FamilyMap
CVTBoolPtr _familyMapUsingChunkSize =
util::ConfigValTBool::create(_configValMap, "familymap", "usingChunkSize", notReq, 0);

/// This may impact `_resultMaxHttpConnections` as too many connections may cause kernel memory issues.
CVTIntPtr _commandMaxHttpConnections =
util::ConfigValTInt::create(_configValMap, "uberjob", "commandMaxHttpConnections", notReq, 2000);
};

} // namespace lsst::qserv::cconfig
Expand Down
2 changes: 0 additions & 2 deletions src/ccontrol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ target_link_libraries(ccontrol PUBLIC
parser
replica
sphgeom
xrdreq
XrdCl
)

Expand All @@ -51,7 +50,6 @@ FUNCTION(ccontrol_tests)
qserv_meta
query
rproc
xrdreq
Boost::unit_test_framework
Threads::Threads
)
Expand Down
Loading

0 comments on commit 0515db5

Please sign in to comment.