Skip to content

Commit

Permalink
topping a scheduled task does not stop other tasks with the same exec…
Browse files Browse the repository at this point in the history
…ution time. (#44)
  • Loading branch information
onukristo authored Jan 3, 2024
1 parent e5fcab4 commit ff6e89a
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 63 deletions.
7 changes: 3 additions & 4 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ jobs:
max-parallel: 100
matrix:
spring_boot_version:
- 3.1.2
- 3.0.7
- 2.7.13
- 2.6.15
- 3.1.6
- 3.0.13
- 2.7.18
env:
SPRING_BOOT_VERSION: ${{ matrix.spring_boot_version }}
steps:
Expand Down
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.12.2] - 2024-01-05

### Fixed

* Stopping a scheduled task does not stop other tasks with the same execution time.

## [1.12.1] - 2023-10-29

### Added

* UuidUtils.add method.

## [1.12.1] - 2023-10-29

### Added
Expand Down
4 changes: 2 additions & 2 deletions build.libraries.gradle
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
ext {
springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: "2.6.15"}"
springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: "2.7.18"}"

libraries = [
// version defined
awaitility : "org.awaitility:awaitility:4.2.0",
guava : 'com.google.guava:guava:33.0.0-jre',
jakartaValidationApi : 'jakarta.validation:jakarta.validation-api:3.0.2',
javaxValidationApi : "javax.validation:validation-api:2.0.1.Final",
spockCore : "org.spockframework:spock-core:2.3-groovy-4.0",
springBootDependencies: "org.springframework.boot:spring-boot-dependencies:${springBootVersion}",

// versions managed by spring-boot-dependencies platform
commonsLang3 : "org.apache.commons:commons-lang3",
guava : 'com.google.guava:guava:31.1-jre',
junitJupiter : "org.junit.jupiter:junit-jupiter",
logbackClassic : "ch.qos.logback:logback-classic",
lombok : "org.projectlombok:lombok",
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=1.12.1
version=1.12.2
1 change: 1 addition & 0 deletions tw-base-utils/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {
implementation libraries.jacksonDatabind
implementation libraries.jacksonJsr310
implementation libraries.jacksonJdk8
implementation libraries.guava

testImplementation libraries.awaitility
testImplementation libraries.guava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void afterPropertiesSet() {
try {
if (!initialized) {
executorService = Executors.newCachedThreadPool(new CountingThreadFactory("tw-base"));
scheduledTaskExecutor = new SimpleScheduledTaskExecutor(null, executorService);
scheduledTaskExecutor = new SimpleScheduledTaskExecutor("gste", executorService);
scheduledTaskExecutor.start();
initialized = true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.transferwise.common.baseutils.concurrency;

import com.google.common.util.concurrent.RateLimiter;
import com.transferwise.common.baseutils.ExceptionUtils;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
Expand All @@ -16,14 +18,15 @@
@Slf4j
public class SimpleScheduledTaskExecutor implements ScheduledTaskExecutor {

private ExecutorService executorService;
private DelayQueue<ScheduledTask> taskQueue;
private final ExecutorService executorService;
private final DelayQueue<ScheduledTask> taskQueue;
@SuppressWarnings("checkstyle:MagicNumber")
private Duration tick = Duration.ofMillis(50);
private Clock clock;
private Lock stateLock;
private Condition stateCondition;
private AtomicInteger workingTasksCount;
private final Lock stateLock;
private final Condition stateCondition;
private final AtomicInteger workingTasksCount;
private final RateLimiter nextTaskLoggingRateLimiter = RateLimiter.create(1);

private volatile boolean started;
private volatile boolean stopRequested;
Expand Down Expand Up @@ -77,6 +80,14 @@ public void start() {
executorService.submit(() -> {
while (!stopRequested) {
ScheduledTask scheduledTask = ExceptionUtils.doUnchecked(() -> taskQueue.poll(tick.toMillis(), TimeUnit.MILLISECONDS));
if (log.isDebugEnabled() && scheduledTask == null) {
if (nextTaskLoggingRateLimiter.tryAcquire()) {
var nextScheduledTask = taskQueue.peek();
if (nextScheduledTask != null) {
log.debug("Next scheduled task is executed at '{}'.", Instant.ofEpochMilli(nextScheduledTask.nextExecutionTime));
}
}
}
if (scheduledTask != null && !stopRequested) {
executorService.submit(scheduledTask::execute);
}
Expand Down Expand Up @@ -130,14 +141,14 @@ private long currentTimeMillis() {

private static class ScheduledTask implements Delayed {

private Runnable runnable;
private TaskHandle taskHandle;
private Duration period;
private final Runnable runnable;
private final TaskHandle taskHandle;
private final Duration period;
private long nextExecutionTime;
private SimpleScheduledTaskExecutor taskExecutor;
private final SimpleScheduledTaskExecutor taskExecutor;

private Lock stateLock;
private Condition stateCondition;
private final Lock stateLock;
private final Condition stateCondition;

private volatile boolean stopRequested;
private volatile boolean working;
Expand All @@ -148,45 +159,7 @@ private ScheduledTask(SimpleScheduledTaskExecutor taskExecutor, Runnable task, D
this.stateLock = new ReentrantLock();
this.stateCondition = stateLock.newCondition();
this.taskExecutor = taskExecutor;

createTaskHandler();
}

private void createTaskHandler() {
this.taskHandle = new TaskHandle() {
@Override
public void stop() {
LockUtils.withLock(stateLock, () -> {
stopRequested = true;
taskExecutor.taskQueue.remove(ScheduledTask.this);
stateCondition.signalAll();
});
}

@Override
public boolean hasStopped() {
return LockUtils.withLock(stateLock, () -> stopRequested() && !working);
}

@Override
public boolean waitUntilStopped(Duration waitTime) {
long start = taskExecutor.currentTimeMillis();
while (taskExecutor.currentTimeMillis() >= start + waitTime.toMillis()) {
if (hasStopped()) {
return true;
}
LockUtils.withLock(stateLock, () -> ExceptionUtils.doUnchecked(() -> {
boolean ignored = stateCondition.await(start - taskExecutor.currentTimeMillis() + waitTime.toMillis(), TimeUnit.MILLISECONDS);
}));
}
return hasStopped();
}

@Override
public boolean isWorking() {
return working;
}
};
this.taskHandle = new DefaultTaskHandle();
}

private void execute() {
Expand Down Expand Up @@ -236,10 +209,7 @@ public int compareTo(Delayed o) {

@Override
public boolean equals(Object o) {
if (o instanceof Delayed) {
return compareTo((Delayed) o) == 0;
}
return false;
return this == o;
}

@Override
Expand All @@ -250,5 +220,44 @@ public int hashCode() {
protected boolean stopRequested() {
return stopRequested || taskExecutor.stopRequested;
}

class DefaultTaskHandle implements TaskHandle {

@Override
public void stop() {
LockUtils.withLock(stateLock, () -> {
stopRequested = true;
// This is O(n)
// TODO: Think if we should just leave the things into the queue.
taskExecutor.taskQueue.remove(ScheduledTask.this);
stateCondition.signalAll();
});
}

@Override
public boolean hasStopped() {
return LockUtils.withLock(stateLock, () -> stopRequested() && !working);
}

@Override
public boolean waitUntilStopped(Duration waitTime) {
long start = taskExecutor.currentTimeMillis();
while (taskExecutor.currentTimeMillis() >= start + waitTime.toMillis()) {
if (hasStopped()) {
return true;
}
LockUtils.withLock(stateLock, () -> ExceptionUtils.doUnchecked(() -> {
boolean ignored = stateCondition.await(start - taskExecutor.currentTimeMillis() + waitTime.toMillis(), TimeUnit.MILLISECONDS);
}));
}
return hasStopped();
}

@Override
public boolean isWorking() {
return working;
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.transferwise.common.baseutils.BaseTest;
import com.transferwise.common.baseutils.clock.TestClock;
import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor.TaskHandle;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Disabled;
Expand Down Expand Up @@ -211,4 +215,46 @@ public void testIfSchedulingOnceWorks() {

scheduledTaskExecutor.stop();
}

/*
Covers a bug, where ScheduledTask's equals method was considering tasks with the same execution time as equals.
Stopping one task, stopped all other tasks with the same execution time as well.
*/
@Test
public void testIfStoppingTasksWorksCorrectly() {
var testClock = new TestClock();
Map<Long, Boolean> results = new ConcurrentHashMap<>();

var executorService = Executors.newCachedThreadPool();
var scheduledTaskExecutor = new SimpleScheduledTaskExecutor("test", executorService).setTick(Duration.ofMillis(5))
.setClock(testClock);
scheduledTaskExecutor.start();

List<TaskHandle> taskHandleList = new ArrayList<>();
var finishedCount = new AtomicInteger();
for (int i = 0; i < 100; i++) {
long finalI = i;
taskHandleList.add(scheduledTaskExecutor.scheduleOnce(
() -> {
results.put(finalI, Boolean.TRUE);
finishedCount.incrementAndGet();
}, Duration.ofSeconds(1))
);
}

taskHandleList.get(50).stop();
testClock.tick(Duration.ofMillis(1001));

await().until(() -> finishedCount.get() == 99);

assertNull(results.get(50L));
for (long i = 0; i < 100; i++) {
if (i != 50) {
assertNotNull(results.get(i));
}
}

scheduledTaskExecutor.stop();
}
}

0 comments on commit ff6e89a

Please sign in to comment.