Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IDS-9547: Max Processes Per Worker #126

Merged
merged 19 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions cws-core/src/main/java/jpl/cws/core/db/SchedulerDbService.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ public int updateProcInstIdAndStartedByWorker(
return numUpdated;
}





/**
* Attempt to claim a process start request in the database.
*
Expand Down Expand Up @@ -356,8 +358,26 @@ public String getProcInstRowStatus(String uuid) {
return null;
}
}



public int getMaxProcsValueForWorker(String workerId) {

return jdbcTemplate.queryForObject(
"SELECT worker_max_num_running_procs FROM cws_worker WHERE id=?",
new Object[] {workerId}, Integer.class);
}

public int getProcDefKeyPerWorker(String workerId) {
return jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM cws_worker_proc_def WHERE worker_id=?",
new Object[] {workerId}, Integer.class);
}

public int getMaxInstancesValueForProcDef(String workerId, String procDefKey) {
return jdbcTemplate.queryForObject(
"SELECT max_instances FROM cws_worker_proc_def WHERE worker_id=? AND proc_def_key=?",
new Object[] {workerId, procDefKey}, Integer.class);
}

public Map<String,Object> getProcInstRow(String uuid) {
List<Map<String,Object>> list = jdbcTemplate.queryForList(
"SELECT * FROM cws_sched_worker_proc_inst " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class EngineDbService extends DbService implements InitializingBean {
@Value("${cws.install.type}") private String cwsInstallType;
@Value("${cws.worker.type}") private String cwsWorkerType;
@Value("${camunda.executor.service.max.pool.size}") private int maxExecutorServicePoolSize;
@Value("${worker.max.num.running.procs}") private int workerMaxNumRunningProcs;

private Logger log;

Expand Down Expand Up @@ -180,8 +181,8 @@ public void createOrUpdateWorkerRow(String lockOwner) {
numUpdated = jdbcTemplate.update(
"INSERT INTO cws_worker" +
" (id, lock_owner, name, install_directory, cws_install_type, cws_worker_type, " +
" status, job_executor_max_pool_size, created_time, last_heartbeat_time) " +
"VALUES (?,?,?,?,?,?,?,?,?,?)",
" status, job_executor_max_pool_size, worker_max_num_running_procs, created_time, last_heartbeat_time) " +
"VALUES (?,?,?,?,?,?,?,?,?,?,?)",
new Object[]{
workerId,
lockOwner,
Expand All @@ -191,6 +192,7 @@ public void createOrUpdateWorkerRow(String lockOwner) {
cwsWorkerType,
null, // status will be changed to null once the worker is fully initialized
maxExecutorServicePoolSize, // changeable later via the UI..
workerMaxNumRunningProcs,
tsNow, // created_time
tsNow // last_heartbeat_time
});
Expand Down
23 changes: 19 additions & 4 deletions cws-engine-service/src/main/java/jpl/cws/engine/WorkerService.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ public class WorkerService implements InitializingBean {
@Value("${cws.engine.jobexecutor.enabled}") private boolean jobExecutorEnabled;

@Value("${cws.tomcat.lib}") private String cwsTomcatLib;


@Value("${worker.max.num.running.procs}") private int workerMaxNumRunningProcs;

private Logger log;

public static AtomicInteger processorExecuteCount = new AtomicInteger(0);
Expand Down Expand Up @@ -688,9 +690,22 @@ public List<Map<String,Object>> claimWithCounter(String limitToProcDefKey) {
//log.trace("remainder for " + procDefKey + " is " + remainder);
int queryLimit = Math.min(EXEC_SERVICE_MAX_POOL_SIZE, remainder);
//log.trace("queryLimit for " + procDefKey + " is " + queryLimit);

if (remainder > 0) {

// total current running proc running
int totalCurrentRunningProcsOnWorker = 0;
for (Entry<String,Integer> entry : processCounters.entrySet()) {
totalCurrentRunningProcsOnWorker += entry.getValue().intValue();
}

int MaxNumForProcsOnWorker = schedulerDbService.getMaxProcsValueForWorker(workerId);
int cappedQueryLimit = MaxNumForProcsOnWorker - totalCurrentRunningProcsOnWorker;

if (remainder > 0 && totalCurrentRunningProcsOnWorker <= MaxNumForProcsOnWorker) {
// claim for remainder (marks DB rows as "claimedByWorker")
if (queryLimit > cappedQueryLimit) {
queryLimit = cappedQueryLimit;
}

Map<String,List<String>> claimRowData =
schedulerDbService.claimHighestPriorityStartReq(
workerId, procDefKey, queryLimit);
Expand All @@ -707,7 +722,7 @@ public List<Map<String,Object>> claimWithCounter(String limitToProcDefKey) {
procStartReqUuidStartedThisWorker.addAll(claimRowData.get("claimedRowUuids"));
//log.debug("procStartReqUuidStartedThisWorker = " + procStartReqUuidStartedThisWorker);

log.debug("(CLAIMED " + claimed.size() + " / " + queryLimit + ", maxProcs=" + procMaxNumber + ") for procDef '" + procDefKey + "' (limitToProcDefKey="+limitToProcDefKey+")");
log.debug("(CLAIMED " + claimed.size() + " / " + queryLimit + ", maxProcs=" + procMaxNumber + ") for procDef '" + procDefKey + "' (limitToProcDefKey="+limitToProcDefKey+")" + ", workerMaxNumRunningProcs=" + MaxNumForProcsOnWorker);

claimUuids.addAll(claimed);
}
Expand Down
43 changes: 43 additions & 0 deletions cws-installer/src/main/java/jpl/cws/task/CwsInstaller.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ public class CwsInstaller {
private static String user_provided_logstash;
private static String history_level;
private static String history_days_to_live;
private static String worker_max_num_running_procs;
private static String worker_abandoned_days;

private static String aws_default_region;
Expand Down Expand Up @@ -260,6 +261,7 @@ public static void main(String args[]) {
setupAwsSqs();
}
setupLimitToRemoveAbandonedWorkersByDays();
setupMaxLimitForNumberOfProcessesPerWorker();
genUniqueWorkerId();
setupStartupAutoregisterProcessDefs();
showInstallationInfo();
Expand Down Expand Up @@ -884,6 +886,42 @@ private static void setupHistoryDaysToLive() {
}
}


private static void setupMaxLimitForNumberOfProcessesPerWorker() {
worker_max_num_running_procs = getPreset("worker_max_num_running_procs");

if (worker_max_num_running_procs == null) {
worker_max_num_running_procs = getPreset("default_worker_max_num_running_procs");
}

// make sure preset is valid positive integer
try {
if (Integer.parseInt(worker_max_num_running_procs) <= 0) {
log.warn("Processes per worker value must be a positive integer. Got: " + worker_max_num_running_procs + ". Defaulting to 25.");
worker_max_num_running_procs = "25";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I'm okay with setting a default of 25 like this. I think the default should actually be based on something tangible like number of CPU cores, like is done elsewhere in the code.

Copy link
Collaborator Author

@voxparcxls voxparcxls Feb 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worker_max_num_running_procs = CORES + "";

commit: 8ca402b

}
} catch (NumberFormatException e) {
log.warn("Processes per worker value failed to parse as an integer. Got: " + worker_max_num_running_procs + ". Defaulting to 25.");
worker_max_num_running_procs = "25";
}

if (cws_installer_mode.equals("interactive")) {
boolean done = false;
while (!done) {
worker_max_num_running_procs = readLine("Enter the maximum number of processes that run on worker(s). " +
"Default is " + worker_max_num_running_procs + ": ", worker_max_num_running_procs);

// make sure input was valid
try {
done = Integer.parseInt(worker_max_num_running_procs) >= 1;
} catch (NumberFormatException e) {
// bad input, try again
}
}
}
}


private static void setupLimitToRemoveAbandonedWorkersByDays() {
worker_abandoned_days = getPreset("worker_abandoned_days");

Expand Down Expand Up @@ -1581,6 +1619,7 @@ private static void showInstallationInfo() {
print("CWS Notification Emails = " + cws_notification_emails);
print("CWS Token Expiration In Hours = " + cws_token_expiration_hours);
print("History Level = " + history_level);
print("Processes per Worker = " + worker_max_num_running_procs);
print("Days Remove Abandoned Workers = " + worker_abandoned_days);
if (installConsole) {
print("History Days to Live = " + history_days_to_live);
Expand Down Expand Up @@ -2309,6 +2348,7 @@ private static void updateWorkerProperties() throws IOException {
content = content.replace("__CWS_DB_PASSWORD__", cws_db_password);
content = content.replace("__CAMUNDA_EXEC_SVC_MAX_POOL_SIZE__", CORES+"");
content = content.replace("__AWS_DEFAULT_REGION__", aws_default_region);
content = content.replace("__CWS_WORKER_MAX_NUM_RUNNING_PROCS__", worker_max_num_running_procs);

// S3 Initiator might not be in use
if(aws_sqs_dispatcher_sqsUrl != null) {
Expand Down Expand Up @@ -2420,6 +2460,7 @@ private static void updateCwsUiProperties() throws IOException {
content = content.replace("__CWS_AUTH_SCHEME__", cws_auth_scheme);
content = content.replace("__CWS_HISTORY_DAYS_TO_LIVE__", history_days_to_live);
content = content.replace("__CWS_HISTORY_LEVEL__", history_level);
content = content.replace("__CWS_WORKER_MAX_NUM_RUNNING_PROCS__", worker_max_num_running_procs);
content = content.replace("__CWS_WORKER_ABANDONED_DAYS__", worker_abandoned_days);
content = content.replace("__AWS_DEFAULT_REGION__", aws_default_region);

Expand Down Expand Up @@ -2452,6 +2493,7 @@ private static void updateCwsUiConfig() throws IOException {
content = content.replace("__CWS_DB_PASSWORD__", cws_db_password);
content = content.replace("__JOB_EXECUTOR_ACTIVATE__", "false");
content = content.replace("__HISTORY_LEVEL__", history_level);
content = content.replace("__CWS_WORKER_MAX_NUM_RUNNING_PROCS__", worker_max_num_running_procs);
content = content.replace("__CWS_WORKER_ABANDONED_DAYS__", worker_abandoned_days);

content = content.replace("__CWS_AMQ_HOST__", cws_amq_host);
Expand Down Expand Up @@ -2808,6 +2850,7 @@ private static void writeOutConfigurationFile() throws IOException {
setPreset("user_provided_logstash", user_provided_logstash);
setPreset("history_level", history_level);
setPreset("history_days_to_live", history_days_to_live);
setPreset("worker_max_num_running_procs", worker_max_num_running_procs);
setPreset("worker_abandoned_days", worker_abandoned_days);
setPreset("aws_default_region", aws_default_region);
setPreset("aws_sqs_dispatcher_sqsUrl", aws_sqs_dispatcher_sqsUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,9 @@ public GsonUTCDateAdapter() {

Map<String, String> procs = cwsConsoleService.getWorkerNumRunningProcs();

log.info("*** LOG cwsConsoleService.getWorkerNumRunningProcs() : " + procs);


for (String workerId : procs.keySet()) {
int count = Integer.parseInt(procs.get(workerId));

Expand All @@ -968,7 +971,10 @@ public GsonUTCDateAdapter() {

procs.put(workerId, Integer.toString(total));
}


log.info("*** LOG return procs : " + procs);


return procs;
}

Expand Down
3 changes: 2 additions & 1 deletion dev.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ ADMIN_FIRSTNAME=${19}
ADMIN_LASTNAME=${20}
ADMIN_EMAIL=${21}
NUM_WORKERS=${22}
WORKER_ABANDONED_DAYS=${23}
WORKER_MAX_NUM_RUNNING_PROCS=${23}
WORKER_ABANDONED_DAYS=${24}

source ${ROOT}/utils.sh

Expand Down
1 change: 1 addition & 0 deletions install/cws-engine/cws-engine.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ cws.auth.scheme=__CWS_AUTH_SCHEME__
cws.worker.id=__CWS_WORKER_ID__
cws.worker.type=__CWS_WORKER_TYPE__
cws.worker.abandoned.days=__CWS_WORKER_ABANDONED_DAYS__
worker.max.num.running.procs=__CWS_WORKER_MAX_NUM_RUNNING_PROCS__
cws.jmx.service.url=service:jmx:rmi:///jndi/rmi://localhost:__CWS_JMX_PORT__/jmxrmi
camunda.executor.service.max.pool.size=__CAMUNDA_EXEC_SVC_MAX_POOL_SIZE__
default.smtp.host=smtp.localhost
Expand Down
1 change: 1 addition & 0 deletions install/cws-ui/cws-ui.properties
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ cws.history.days.to.live=__CWS_HISTORY_DAYS_TO_LIVE__
cws.history.level=__CWS_HISTORY_LEVEL__

cws.worker.abandoned.days=__CWS_WORKER_ABANDONED_DAYS__
worker.max.num.running.procs=__CWS_WORKER_MAX_NUM_RUNNING_PROCS__
#
# AWS CLOUD PROPERTIES
#
Expand Down
3 changes: 3 additions & 0 deletions install/example-cws-configuration.properties
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ history_days_to_live=7
# worker table. Allowed values are whole integers 1 <= n
worker_abandoned_days=1

# worker_max_num_running_procs
worker_max_num_running_procs=25
voxparcxls marked this conversation as resolved.
Show resolved Hide resolved

# Specifies the number of hours that a CWS security token is valid for. After this
# amount of time it will expire, and the User will be required to authenticate
# again to get a new one.
Expand Down
3 changes: 2 additions & 1 deletion install/installerPresets.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ default_elasticsearch_port=9200
default_user_provided_logstash=n
default_history_level=full
default_history_days_to_live=7
default_worker_max_num_running_procs=25
default_worker_abandoned_days=1
default_aws_cloudwatch_endpoint=monitoring.us-west-1.amazonaws.com
default_metrics_publishing_interval=10
default_metrics_publishing_interval=10
1 change: 1 addition & 0 deletions install/sql/core.sql.template
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ CREATE TABLE IF NOT EXISTS `cws_worker` (
`cws_worker_type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
`status` varchar(255) COLLATE utf8_bin DEFAULT NULL,
`job_executor_max_pool_size` int(11) DEFAULT NULL,
`worker_max_num_running_procs` int(11) DEFAULT NULL,
`active_count` int(11) DEFAULT NULL,
`created_time` datetime DEFAULT NULL,
`last_heartbeat_time` datetime NOT NULL,
Expand Down
9 changes: 6 additions & 3 deletions utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ function check_java_requirements () {

# Creates a complete installation configuration file for a CWS console/worker to streamline development installs.
function auto_conf_data () {
set -x
INSTALL_TYPE=${1}

# VALUES SET FROM dev.sh
Expand All @@ -203,9 +204,10 @@ function auto_conf_data () {
ADMIN_LASTNAME=${21}
ADMIN_EMAIL=${22}
NUM_WORKERS=${23}
WORKER_ABANDONED_DAYS=${24}
WORKER_MAX_NUM_RUNNING_PROCS=${24}
WORKER_ABANDONED_DAYS=${25}

OUTPUT_FILE=${25}
OUTPUT_FILE=${26}

source ${ROOT}/utils.sh

Expand Down Expand Up @@ -297,6 +299,7 @@ function auto_conf_data () {
cws_amq_jmx_port=${CWS_AMQ_JMX_PORT}
cws_jmx_port=${CWS_JMX_PORT}
history_days_to_live=1
worker_max_num_running_procs=${WORKER_MAX_NUM_RUNNING_PROCS}
worker_abandoned_days=${WORKER_ABANDONED_DAYS}
notify_users_email=y
email_subject=[CWS] You have been assigned a task (CWS_TASK_NAME)
Expand All @@ -316,5 +319,5 @@ function auto_conf_data () {
cws_token_expiration_hours=240
user_provided_logstash=n
EOF

set +x
}