Skip to content

Commit 0050311

Browse files
authored
Fix ResourceTracker in benchmark to work fine with VIRTUAL_THREAD (#234)
1 parent 98290cf commit 0050311

File tree

4 files changed

+73
-12
lines changed

4 files changed

+73
-12
lines changed

benchmark/src/main/java/com/linecorp/decaton/benchmark/DecatonRunner.java

+65-6
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,24 @@
1616

1717
package com.linecorp.decaton.benchmark;
1818

19+
import java.lang.management.ThreadInfo;
1920
import java.time.Duration;
2021
import java.util.ArrayList;
2122
import java.util.HashMap;
2223
import java.util.List;
2324
import java.util.Map;
2425
import java.util.Properties;
2526
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.ForkJoinPool;
29+
import java.util.concurrent.ScheduledExecutorService;
30+
import java.util.concurrent.TimeUnit;
2631
import java.util.function.Function;
2732

2833
import org.apache.kafka.clients.consumer.ConsumerConfig;
2934

35+
import com.sun.management.ThreadMXBean;
36+
3037
import com.linecorp.decaton.processor.TaskMetadata;
3138
import com.linecorp.decaton.processor.metrics.Metrics;
3239
import com.linecorp.decaton.processor.runtime.DecatonTask;
@@ -43,7 +50,9 @@
4350
import io.micrometer.core.instrument.Clock;
4451
import io.micrometer.core.instrument.logging.LoggingMeterRegistry;
4552
import io.micrometer.core.instrument.logging.LoggingRegistryConfig;
53+
import lombok.extern.slf4j.Slf4j;
4654

55+
@Slf4j
4756
public class DecatonRunner implements Runner {
4857
private static final Map<String, Function<String, Object>> propertyConstructors =
4958
new HashMap<String, Function<String, Object>>() {{
@@ -52,6 +61,7 @@ public class DecatonRunner implements Runner {
5261
put(ProcessorProperties.CONFIG_LOGGING_MDC_ENABLED.name(), Boolean::parseBoolean);
5362
}};
5463

64+
private SubPartitionRuntime subPartitionRuntime;
5565
private ProcessorSubscription subscription;
5666
private LoggingMeterRegistry registry;
5767

@@ -71,7 +81,7 @@ public void init(Config config, Recording recording, ResourceTracker resourceTra
7181
// value than zero with the default "latest" reset policy.
7282
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
7383

74-
SubPartitionRuntime subPartitionRuntime = SubPartitionRuntime.THREAD_POOL;
84+
subPartitionRuntime = SubPartitionRuntime.THREAD_POOL;
7585
List<Property<?>> properties = new ArrayList<>();
7686
for (Map.Entry<String, String> entry : config.parameters().entrySet()) {
7787
String name = entry.getKey();
@@ -97,6 +107,8 @@ public String get(String key) {
97107
}, Clock.SYSTEM);
98108
Metrics.register(registry);
99109

110+
maybeSetupForkJoinPoolDrip();
111+
100112
CountDownLatch startLatch = new CountDownLatch(1);
101113

102114
subscription = SubscriptionBuilder
@@ -113,22 +125,69 @@ public String get(String key) {
113125
TaskMetadata.builder().build(), task, bytes);
114126
})
115127
.thenProcess(
116-
(ctx, task) -> {
117-
resourceTracker.track(Thread.currentThread().getId());
118-
recording.process(task);
119-
}))
128+
(ctx, task) -> recording.process(task)))
120129
.stateListener(state -> {
121130
if (state == SubscriptionStateListener.State.RUNNING) {
122131
startLatch.countDown();
123132
}
124133
})
125134
.build();
126-
resourceTracker.track(subscription.getId());
127135
subscription.start();
128136

129137
startLatch.await();
130138
}
131139

140+
/**
141+
* This value comes from VirtualThread#createDefaultScheduler(), which defaults keep-alive of ForkJoinPool
142+
* to 30 seconds (as of Java21).
143+
*/
144+
private static final long DRIP_INTERVAL = 30;
145+
/**
146+
* The intention of this method is to setup a scheduled work submitting some VirtualThread instances
147+
* periodically, in order to make sure the {@link ForkJoinPool} to keep-alive the created carrier thread
148+
* during warmup phase.
149+
* It is necessary because {@link #onWarmupComplete(ResourceTracker)} depends on the alive list of
150+
* ForkJoinPool threads to observe cpu time and memory allocation profile, which cannot be obtained from
151+
* VirtualThread instances directly.
152+
*/
153+
private void maybeSetupForkJoinPoolDrip() {
154+
if (subPartitionRuntime != SubPartitionRuntime.VIRTUAL_THREAD) {
155+
return;
156+
}
157+
ScheduledExecutorService executor = Executors.newScheduledThreadPool(
158+
1, Thread.ofPlatform().daemon().factory());
159+
executor.scheduleAtFixedRate(() -> {
160+
for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
161+
Thread.startVirtualThread(() -> {
162+
try {
163+
TimeUnit.SECONDS.sleep(DRIP_INTERVAL);
164+
} catch (InterruptedException e) {
165+
throw new RuntimeException(e);
166+
}
167+
});
168+
}
169+
}, 0, DRIP_INTERVAL, TimeUnit.SECONDS);
170+
}
171+
172+
@Override
173+
public void onWarmupComplete(ResourceTracker resourceTracker) {
174+
// As we support VIRTUAL_THREAD runtime now, we can't use the technique to obtain the target Thread ID
175+
// in the execution context, as in VirtualThread gets created and discarded for all tasks separately.
176+
// Instead we need to observe resource usage based on the carrier thread (platform thread) side, while
177+
// stdlib doesn't provide a way to resolve the carrier thread's ID.
178+
ThreadMXBean threadMxBean = ResourceTracker.getSunThreadMxBean();
179+
ThreadInfo[] threadInfos = threadMxBean.dumpAllThreads(true, true);
180+
for (ThreadInfo threadInfo : threadInfos) {
181+
String name = threadInfo.getThreadName();
182+
log.debug("Tracking target check for thread name: {}", name);
183+
if (name.startsWith("DecatonSubscriptionThread-") || name.startsWith("PartitionProcessorThread-") ||
184+
subPartitionRuntime == SubPartitionRuntime.VIRTUAL_THREAD && name.startsWith("ForkJoinPool-")) {
185+
log.info("Tracking resource of thread {} - {}", threadInfo.getThreadId(), name);
186+
resourceTracker.track(threadInfo.getThreadId());
187+
}
188+
}
189+
}
190+
132191
@Override
133192
public void close() throws Exception {
134193
if (registry != null) {

benchmark/src/main/java/com/linecorp/decaton/benchmark/InProcessExecution.java

+1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public BenchmarkResult execute(Config config, Consumer<Stage> stageCallback) thr
7878
awaitJITGetsSettled();
7979
}
8080
profiling.stop();
81+
runner.onWarmupComplete(resourceTracker);
8182

8283
profiling.start();
8384
JvmTracker jvmTracker = JvmTracker.create();

benchmark/src/main/java/com/linecorp/decaton/benchmark/ResourceTracker.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ public static class TrackingValues {
3838
long allocatedBytes;
3939
}
4040

41-
private static final ThreadLocal<Boolean> tracking = ThreadLocal.withInitial(() -> false);
4241
private final ConcurrentMap<Long, TrackingValues> targetThreadIds;
4342
private final ThreadMXBean threadMxBean;
4443

@@ -47,7 +46,7 @@ public ResourceTracker() {
4746
threadMxBean = getSunThreadMxBean();
4847
}
4948

50-
private static ThreadMXBean getSunThreadMxBean() {
49+
static ThreadMXBean getSunThreadMxBean() {
5150
java.lang.management.ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
5251
if (mxBean instanceof ThreadMXBean) {
5352
return (ThreadMXBean) mxBean;
@@ -65,10 +64,6 @@ private static ThreadMXBean getSunThreadMxBean() {
6564
* @param threadId thread ID to start tracking.
6665
*/
6766
public void track(long threadId) {
68-
if (tracking.get()) {
69-
return;
70-
}
71-
tracking.set(true);
7267
targetThreadIds.computeIfAbsent(
7368
threadId,
7469
key -> new TrackingValues(threadMxBean.getThreadCpuTime(threadId),

benchmark/src/main/java/com/linecorp/decaton/benchmark/Runner.java

+6
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ class Config {
4646
*/
4747
void init(Config config, Recording recording, ResourceTracker resourceTracker) throws InterruptedException;
4848

49+
/**
50+
* Called after all warmup tasks are processed and before the stage to proceed for actual measurement.
51+
* @param resourceTracker resource tracking interface (use is optional).
52+
*/
53+
void onWarmupComplete(ResourceTracker resourceTracker);
54+
4955
@Override
5056
default void close() throws Exception {
5157
// noop by default

0 commit comments

Comments
 (0)