Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IDS-9547: Max Processes Per Worker #126

Merged
merged 19 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,15 @@ ES_PASSWORD="na"
# Num of workers to start. 1 is the minimum.
NUM_WORKERS=1

# Default value is 16. 1 is the minimum.
WORKER_MAX_NUM_RUNNING_PROCS=16

# Default value is 1. Specifies the number of days (int) until the
# abandoned workers in the cws_workers database table are cleaned out.
WORKER_ABANDONED_DAYS=1

# Run the dev script
./dev.sh `pwd` ${USER} ${DB_TYPE} ${DB_HOST} ${DB_PORT} ${DB_NAME} ${DB_USER} ${DB_PASS} ${ES_PROTOCOL} ${ES_HOST} ${ES_PORT} ${ES_USE_AUTH} ${ES_USERNAME} ${ES_PASSWORD} ${CLOUD} ${SECURITY} ${HOSTNAME} ${EMAIL_LIST} ${ADMIN_FIRST} ${ADMIN_LAST} ${ADMIN_EMAIL} ${NUM_WORKERS} ${WORKER_ABANDONED_DAYS}
./dev.sh `pwd` ${USER} ${DB_TYPE} ${DB_HOST} ${DB_PORT} ${DB_NAME} ${DB_USER} ${DB_PASS} ${ES_PROTOCOL} ${ES_HOST} ${ES_PORT} ${ES_USE_AUTH} ${ES_USERNAME} ${ES_PASSWORD} ${CLOUD} ${SECURITY} ${HOSTNAME} ${EMAIL_LIST} ${ADMIN_FIRST} ${ADMIN_LAST} ${ADMIN_EMAIL} ${NUM_WORKERS} ${WORKER_MAX_NUM_RUNNING_PROCS} ${WORKER_ABANDONED_DAYS}
```

###### Run Personal Dev Script
Expand Down
89 changes: 73 additions & 16 deletions cws-core/src/main/java/jpl/cws/core/db/SchedulerDbService.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public class SchedulerDbService extends DbService implements InitializingBean {

public static final int DEFAULT_WORKER_PROC_DEF_MAX_INSTANCES = 1;
public static final int PROCESSES_PAGE_SIZE = 100;

// KEY FOR THIS IS: KEY `claimKey` (`status`,`proc_def_key`,`priority`,`created_time`)

public static final String FIND_CLAIMABLE_ROWS_SQL =
"SELECT uuid FROM cws_sched_worker_proc_inst " +
"WHERE " +
Expand Down Expand Up @@ -243,17 +242,24 @@ public int updateProcInstIdAndStartedByWorker(
return numUpdated;
}





/**
* Attempt to claim a process start request in the database.
*
* @param procDefKey -- only attempt to claim rows for this process definition
* @param workerProcsList -- attempts to claim rows for the active set of process definition(s)
* @return mappings of claimUuids and claimedRowUuids
*
*/
public Map<String,List<String>> claimHighestPriorityStartReq(String workerId, String procDefKey, int limit) {

public Map<String,List<String>> claimHighestPriorityStartReq(String workerId, Map<String,Integer> workerProcsList, Map<String,Integer> limitsPerProcs, int limit) {
List<String> claimUuids = new ArrayList<String>();
List<String> rowUuids = new ArrayList<String>();
List<String> rowUuidsPerProcDefKey = new ArrayList<String>();
LinkedHashMap<String, String> uuidAndProcDefKeyPair = new LinkedHashMap<String, String>();
List<String> clearOutUnclaimedInst = new ArrayList<String>();
List<String> unfilteredRowUuids = new ArrayList<String>();
List<String> claimedRowUuids = new ArrayList<String>();
long t0 = System.currentTimeMillis();
int numClaimed = 0;
Expand All @@ -266,11 +272,45 @@ public Map<String,List<String>> claimHighestPriorityStartReq(String workerId, St
try {
// Find claimable rows
//
rowUuids = jdbcTemplate.queryForList(
FIND_CLAIMABLE_ROWS_SQL, String.class,
new Object[] {procDefKey,
limit*2}); // over-find because some workers might compete with this set
for (Map.Entry<String, Integer> procs : limitsPerProcs.entrySet()) {
rowUuidsPerProcDefKey = jdbcTemplate.queryForList(FIND_CLAIMABLE_ROWS_SQL, String.class,
new Object[] {procs.getKey(), procs.getValue()*2});
// get list of uuids using array of procdefkeys IN (keys)
unfilteredRowUuids.addAll(rowUuidsPerProcDefKey);
}

Collections.sort(unfilteredRowUuids);
for (String id : unfilteredRowUuids) {
String procDefKeyString = getProcDefKeyFromUuid(id);
uuidAndProcDefKeyPair.put(id, procDefKeyString);
}

for (Map.Entry<String,Integer> procLimit : limitsPerProcs.entrySet()) {
Set<String> keys = uuidAndProcDefKeyPair.keySet();
int applyPerProcsCap = 0;
for (String key : keys) {

if (uuidAndProcDefKeyPair.get(key).equals(procLimit.getKey())) {
applyPerProcsCap = applyPerProcsCap + 1;
if (applyPerProcsCap > procLimit.getValue()) {
clearOutUnclaimedInst.add(key);
}
}
}
}

for (String removeUuidFromList : clearOutUnclaimedInst) {
uuidAndProcDefKeyPair.remove(removeUuidFromList);
}

Set<String> uuidKeys = uuidAndProcDefKeyPair.keySet();
// after its filtered add the uuids to rowUuids arraylist
for (String key : uuidKeys) {
rowUuids.add(key);
}

// make query that uses multi limit per ProcDefkey (JOIN)
// iterate to grab 30
if (!rowUuids.isEmpty()) {
// Found some claimable rows, so now try to claim them..
//
Expand All @@ -283,7 +323,7 @@ public Map<String,List<String>> claimHighestPriorityStartReq(String workerId, St
numClaimed++;
claimUuids.add(claimUuid);
claimedRowUuids.add(uuid);
log.debug("CLAIMED " + claimUuid + " (uuid=" +uuid+") for procDefKey '" + procDefKey + "'");
//log.debug("CLAIMED " + claimUuid + " (uuid=" +uuid+") for procDefKey '" + procDefKeyList + "'");
}

if (numClaimed == limit) {
Expand All @@ -293,12 +333,12 @@ public Map<String,List<String>> claimHighestPriorityStartReq(String workerId, St

if (numClaimed == 0) {
// other workers beat us to claiming the rows
log.warn("Attempted to claim " + rowUuids.size() + " rows for procDefKey '" + procDefKey + "', but claimed none! " +
log.warn("Attempted to claim " + rowUuids.size() + " rows for procDefKeys '" + workerProcsList.keySet() + "', but claimed none! " +
(attempts < 10 ? "Retrying..." : "GIVING UP!"));
continue; // retry finding claimable rows
}
else {
log.debug("Claimed (" + numClaimed + " of " + rowUuids.size() + ") for procDefKey '" + procDefKey + "'");
log.debug("Claimed (" + numClaimed + " of " + rowUuids.size() + ") for procDefKeys '" + workerProcsList.keySet() + "'");
}
}
else if (log.isTraceEnabled()) {
Expand Down Expand Up @@ -335,11 +375,11 @@ else if (log.isTraceEnabled()) {
if (numClaimed != claimUuids.size()) {
log.error("numUpdated != claimUuids.size()" );
}

Map<String,List<String>> ret = new HashMap<String,List<String>>();
ret.put("claimUuids", claimUuids);
ret.put("claimedRowUuids", claimedRowUuids);

return ret;
}

Expand All @@ -356,8 +396,25 @@ public String getProcInstRowStatus(String uuid) {
return null;
}
}



/**
 * Reads the max_num_running_procs column of the cws_worker row whose id
 * matches the given worker.
 *
 * @param workerId value compared against cws_worker.id
 * @return the max_num_running_procs value stored for that worker
 */
public int getMaxProcsValueForWorker(String workerId) {
    final String sql = "SELECT max_num_running_procs FROM cws_worker WHERE id=?";
    final Object[] args = new Object[] {workerId};
    // Auto-unboxed on return, exactly as a direct "return queryForObject(...)" would be.
    Integer maxProcs = jdbcTemplate.queryForObject(sql, args, Integer.class);
    return maxProcs;
}

/**
 * Counts the rows in cws_sched_worker_proc_inst that have the given
 * proc_def_key and whose claim_uuid is one of the supplied values.
 *
 * All values are sent as bind parameters instead of being spliced into the
 * SQL text, so a key or UUID containing a quote character can neither break
 * the statement nor inject SQL. It also avoids double-quoted string
 * literals, which ANSI SQL treats as identifiers.
 *
 * @param procDefKey   process definition key matched against proc_def_key
 * @param claimedUuids claim UUIDs matched against claim_uuid
 * @return number of matching rows; 0 when claimedUuids is null or empty
 *         (no query is issued in that case)
 */
public int getCountForClaimedProcInstPerKey(String procDefKey, List<String> claimedUuids) {
    if (claimedUuids == null || claimedUuids.isEmpty()) {
        // Nothing to match against; an empty IN () list is not valid SQL anyway.
        return 0;
    }

    // First bind parameter is the procDefKey, followed by one per claim UUID.
    Object[] args = new Object[claimedUuids.size() + 1];
    args[0] = procDefKey;

    StringBuilder placeholders = new StringBuilder();
    int argIndex = 1;
    for (String claimUuid : claimedUuids) {
        placeholders.append(argIndex == 1 ? "?" : ", ?");
        args[argIndex++] = claimUuid;
    }

    String query =
        "SELECT count(*) FROM cws_sched_worker_proc_inst " +
        "WHERE proc_def_key=? " +
        "AND claim_uuid IN (" + placeholders + ")";

    return jdbcTemplate.queryForObject(query, args, Integer.class);
}


/**
 * Looks up the proc_def_key of the cws_sched_worker_proc_inst row with the
 * given uuid.
 *
 * The uuid is passed as a bind parameter rather than concatenated into the
 * SQL text, so unusual characters in the value cannot alter the statement.
 *
 * @param uuid value matched against cws_sched_worker_proc_inst.uuid
 * @return the proc_def_key stored on the matching row
 */
public String getProcDefKeyFromUuid(String uuid) {
    return jdbcTemplate.queryForObject(
        "SELECT proc_def_key FROM cws_sched_worker_proc_inst WHERE uuid=?",
        new Object[] {uuid}, String.class);
}

public Map<String,Object> getProcInstRow(String uuid) {
List<Map<String,Object>> list = jdbcTemplate.queryForList(
"SELECT * FROM cws_sched_worker_proc_inst " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class EngineDbService extends DbService implements InitializingBean {
@Value("${cws.install.type}") private String cwsInstallType;
@Value("${cws.worker.type}") private String cwsWorkerType;
@Value("${camunda.executor.service.max.pool.size}") private int maxExecutorServicePoolSize;
@Value("${worker.max.num.running.procs}") private int workerMaxNumRunningProcs;

private Logger log;

Expand Down Expand Up @@ -180,8 +181,8 @@ public void createOrUpdateWorkerRow(String lockOwner) {
numUpdated = jdbcTemplate.update(
"INSERT INTO cws_worker" +
" (id, lock_owner, name, install_directory, cws_install_type, cws_worker_type, " +
" status, job_executor_max_pool_size, created_time, last_heartbeat_time) " +
"VALUES (?,?,?,?,?,?,?,?,?,?)",
" status, job_executor_max_pool_size, max_num_running_procs, created_time, last_heartbeat_time) " +
"VALUES (?,?,?,?,?,?,?,?,?,?,?)",
new Object[]{
workerId,
lockOwner,
Expand All @@ -191,6 +192,7 @@ public void createOrUpdateWorkerRow(String lockOwner) {
cwsWorkerType,
null, // status will be changed to null once the worker is fully initialized
maxExecutorServicePoolSize, // changeable later via the UI..
workerMaxNumRunningProcs,
tsNow, // created_time
tsNow // last_heartbeat_time
});
Expand Down
124 changes: 80 additions & 44 deletions cws-engine-service/src/main/java/jpl/cws/engine/WorkerService.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ public class WorkerService implements InitializingBean {
@Value("${cws.engine.jobexecutor.enabled}") private boolean jobExecutorEnabled;

@Value("${cws.tomcat.lib}") private String cwsTomcatLib;


@Value("${worker.max.num.running.procs}") private int workerMaxNumRunningProcs;

private Logger log;

public static AtomicInteger processorExecuteCount = new AtomicInteger(0);
Expand All @@ -84,8 +86,9 @@ public class WorkerService implements InitializingBean {

// Map of procDefKey and count of active process instances
public static Map<String,Integer> processCounters = new HashMap<String,Integer>();

private static Map<String,Integer> workerMaxProcInstances = new HashMap<String,Integer>();

private static Set<String> procStartReqUuidStartedThisWorker = new HashSet<String>();
private static Set<String> acceptingProcDefKeys = new HashSet<String>();
//private static Set<String> runningToCompleteTransitionUuids = new HashSet<String>();
Expand Down Expand Up @@ -185,7 +188,7 @@ public void initProcessCountersAndLimits() {

}

log.debug("AFTER INIT: limits: " + workerMaxProcInstances + ", counts: " + processCounters);
log.info("AFTER INIT: limits: " + workerMaxProcInstances + ", counts: " + processCounters);
}


Expand Down Expand Up @@ -312,7 +315,7 @@ public boolean updateProcessCountersAndLimits() {
//
String postConfig = "limits: " + workerMaxProcInstances + ", counts: " + processCounters;
if (lastProcCounterStatusMsg == null || !lastProcCounterStatusMsg.equals(postConfig)) {
log.debug("NEW: " + postConfig + ", OLD: " + lastProcCounterStatusMsg);
log.info("NEW: " + postConfig + ", OLD: " + lastProcCounterStatusMsg);
lastProcCounterStatusMsg = postConfig;
return true; // config changed
}
Expand Down Expand Up @@ -670,59 +673,92 @@ public List<Map<String,Object>> claimWithCounter(String limitToProcDefKey) {

synchronized (procStateLock) { // procCountsLock
t1 = System.currentTimeMillis();

int procSetSize = 0;
//int totalCurrentRunningProcsOnWorker = 0;
Map<String,Integer> currentCounts = new HashMap<String,Integer>();
Map<String,Integer> remainders = new HashMap<String,Integer>();
Map<String,Integer> queryLimitForProcSet = new HashMap<String,Integer>();
Map<String,Integer> limitToProcDefKeyObject = new HashMap<String,Integer>();

for (Entry<String,Integer> procMax : workerMaxProcInstances.entrySet()) {
String procDefKey = procMax.getKey();
if (limitToProcDefKey != null && !limitToProcDefKey.equals(procDefKey)) {
continue;
}

int procMaxNumber = procMax.getValue();
if (!acceptingProcDefKeys.contains(procDefKey)) {
//log.debug("skipping " + procDefKey + " BECAUSE IT NOT ACCEPTING RIGHT NOW!!!!");
continue;
}


currentCounts.put(procDefKey, processCounters.get(procDefKey));
remainders.put(procDefKey, procMaxNumber - currentCounts.get(procDefKey));
queryLimitForProcSet.put(procDefKey, Math.min(EXEC_SERVICE_MAX_POOL_SIZE, remainders.get(procDefKey)));

//log.trace("getting currentCount for procDefKey " + procDefKey);
int currentCount = processCounters.get(procDefKey);
//int currentCount = processCounters.get(procDefKey);
//log.trace("currentCount for " + procDefKey + " is " + currentCount);
int remainder = procMaxNumber - currentCount;
//int remainder = procMaxNumber - currentCount;
//log.trace("remainder for " + procDefKey + " is " + remainder);
int queryLimit = Math.min(EXEC_SERVICE_MAX_POOL_SIZE, remainder);
//int queryLimit = Math.min(EXEC_SERVICE_MAX_POOL_SIZE, remainder); // FIXME: needs revisit for proper min
//log.trace("queryLimit for " + procDefKey + " is " + queryLimit);

if (remainder > 0) {
// claim for remainder (marks DB rows as "claimedByWorker")
Map<String,List<String>> claimRowData =
schedulerDbService.claimHighestPriorityStartReq(
workerId, procDefKey, queryLimit);

List<String> claimed = claimRowData.get("claimUuids");

if (!claimed.isEmpty()) {
// increment counter by amount that was actually claimed
// in anticipation that the start will actually work.
// If the start turns out not to later worker, then this count will be decremented at that time.
//
processCounters.put(procDefKey, processCounters.get(procDefKey) + claimed.size());
// update uuid list
procStartReqUuidStartedThisWorker.addAll(claimRowData.get("claimedRowUuids"));
//log.debug("procStartReqUuidStartedThisWorker = " + procStartReqUuidStartedThisWorker);

log.debug("(CLAIMED " + claimed.size() + " / " + queryLimit + ", maxProcs=" + procMaxNumber + ") for procDef '" + procDefKey + "' (limitToProcDefKey="+limitToProcDefKey+")");

claimUuids.addAll(claimed);

} // end for loop

int totalCurrentRunningProcsOnWorker = 0;
for (Entry<String,Integer> entry : processCounters.entrySet()) {
totalCurrentRunningProcsOnWorker += entry.getValue().intValue();
}

// rename to workerMaxProcQueryLimit
int MaxNumForProcsOnWorker = schedulerDbService.getMaxProcsValueForWorker(workerId);
// this is for all procDefs cap
int workerMaxProcQueryLimit = MaxNumForProcsOnWorker - totalCurrentRunningProcsOnWorker;

int remaindersTotal = 0;
for (int r: remainders.values()) {
remaindersTotal += r;
}

if (remaindersTotal > 0 && workerMaxProcQueryLimit > 0) {
// claim for remainder (marks DB rows as "claimedByWorker")

int queryLimit = Math.min(MaxNumForProcsOnWorker, workerMaxProcQueryLimit);

Map<String,List<String>> claimRowData =
schedulerDbService.claimHighestPriorityStartReq(
workerId, currentCounts, queryLimitForProcSet, queryLimit); // pass list of procDefkey and a map of queryLimit per procDefKey

List<String> claimed = claimRowData.get("claimUuids");

if (!claimed.isEmpty()) {
// increment counter by amount that was actually claimed
// in anticipation that the start will actually work.
// If the start later turns out not to work, then this count will be decremented at that time.
//
for (Map.Entry<String,Integer> procDefKey : processCounters.entrySet()) {
int claimedInstCount = schedulerDbService.getCountForClaimedProcInstPerKey(procDefKey.getKey(), claimed);
processCounters.put(procDefKey.getKey(), processCounters.get(procDefKey.getKey()) + claimedInstCount);
}
//else {
// log.debug("NONE CLAIMED (queryLimit=" + queryLimit + ", max=" + procMaxNumber + ") for procDef '" + procDefKey + "' (limitToProcDefKey="+limitToProcDefKey+")");
//}
}
else {
log.debug("[" + procDefKey + "] remainder <= 0, so not attempting claim. " +
"(remainder = " + remainder +
", procMaxNumber = " + procMaxNumber +
", currentCount = " + currentCount + ")");

// update uuid list
procStartReqUuidStartedThisWorker.addAll(claimRowData.get("claimedRowUuids"));
//log.debug("procStartReqUuidStartedThisWorker = " + procStartReqUuidStartedThisWorker);

log.debug("(CLAIMED " + claimed.size() + " / " + queryLimit + ", maxProcs=" + workerMaxProcInstances.entrySet() + ") for procDefKeys '" + workerMaxProcInstances.keySet() + "' (limitToProcDefKey="+limitToProcDefKey+")" + ", workerMaxNumRunningProcs=" + MaxNumForProcsOnWorker);
claimUuids.addAll(claimed);
}

} // end for loop
//else {
// log.debug("NONE CLAIMED (queryLimit=" + queryLimit + ", max=" + procMaxNumber + ") for procDef '" + procDefKey + "' (limitToProcDefKey="+limitToProcDefKey+")");
//}
}
else {
log.debug("Remainder for Worker Max Process Limit [" + workerMaxProcQueryLimit + "] workerMaxProcQueryLimit <= 0 OR Total of remainders [" + remaindersTotal + "] is <=0, so not attempting claim. " +
"(remainders = " + remainders +
", procMaxNumbers = " + workerMaxProcInstances.entrySet() +
", currentCounts = " + currentCounts + ")");
}



} // release lock

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,15 @@ public void run() {
boolean endedOnThisWorker =
workerService.processEndedActions(procDefKey, uuid);
if (endedOnThisWorker) {
workerService.procStartReqAction(procDefKey, "processEndEventDetected message received");
workerService.procStartReqAction(null, "processEndEventDetected message received");
}
}
else if (eventType.equals("sync")) {
boolean processCounterStateChanged = workerService.syncCounters("received " + eventType + " message");

if (processCounterStateChanged) {
log.trace(eventType + " :: state changed");

// If the process counter state changed, then we potentially have more bandwidth to
// execute more processes of the type that was just completed/failed.
workerService.procStartReqAction(procDefKey, "sync message received");
workerService.procStartReqAction(null, "sync message received");
}
}
}
Expand Down
Loading