Skip to content

Commit ef9b5bf

Browse files
authored
feat(s3stream): add trigger wal upload interval (#2347)
Signed-off-by: Robin Han <[email protected]>
1 parent 82df2f0 commit ef9b5bf

File tree

5 files changed

+21
-1
lines changed

5 files changed

+21
-1
lines changed

core/src/main/java/kafka/automq/AutoMQConfig.java

+5
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ public class AutoMQConfig {
7171
public static final String S3_WAL_UPLOAD_THRESHOLD_CONFIG = "s3.wal.upload.threshold";
7272
public static final String S3_WAL_UPLOAD_THRESHOLD_DOC = "The threshold at which WAL triggers upload to object storage. The configuration value needs to be less than s3.wal.cache.size. The larger the configuration value, the higher the data aggregation and the lower the cost of metadata storage.";
7373

74+
public static final String S3_WAL_UPLOAD_INTERVAL_MS_CONFIG = "s3.wal.upload.interval.ms";
75+
public static final String S3_WAL_UPLOAD_INTERVAL_MS_DOC = "The interval at which WAL triggers upload to object storage. -1 means only upload by size trigger";
76+
public static final long S3_WAL_UPLOAD_INTERVAL_MS_DEFAULT = -1L;
77+
7478
public static final String S3_STREAM_SPLIT_SIZE_CONFIG = "s3.stream.object.split.size";
7579
public static final String S3_STREAM_SPLIT_SIZE_DOC = "The S3 stream object split size threshold when upload delta WAL or compact stream set object.";
7680

@@ -246,6 +250,7 @@ public static void define(ConfigDef configDef) {
246250
.define(AutoMQConfig.S3_WAL_PATH_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_WAL_PATH_DOC)
247251
.define(AutoMQConfig.S3_WAL_CACHE_SIZE_CONFIG, LONG, -1L, MEDIUM, AutoMQConfig.S3_WAL_CACHE_SIZE_DOC)
248252
.define(AutoMQConfig.S3_WAL_UPLOAD_THRESHOLD_CONFIG, LONG, -1L, MEDIUM, AutoMQConfig.S3_WAL_UPLOAD_THRESHOLD_DOC)
253+
.define(AutoMQConfig.S3_WAL_UPLOAD_INTERVAL_MS_CONFIG, LONG, S3_WAL_UPLOAD_INTERVAL_MS_DEFAULT, LOW, AutoMQConfig.S3_WAL_UPLOAD_INTERVAL_MS_DOC)
249254
.define(AutoMQConfig.S3_STREAM_SPLIT_SIZE_CONFIG, INT, 8388608, MEDIUM, AutoMQConfig.S3_STREAM_SPLIT_SIZE_DOC)
250255
.define(AutoMQConfig.S3_OBJECT_BLOCK_SIZE_CONFIG, INT, 524288, MEDIUM, AutoMQConfig.S3_OBJECT_BLOCK_SIZE_DOC)
251256
.define(AutoMQConfig.S3_OBJECT_PART_SIZE_CONFIG, INT, 16777216, MEDIUM, AutoMQConfig.S3_OBJECT_PART_SIZE_DOC)

core/src/main/scala/kafka/log/stream/s3/ConfigUtils.java

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public static Config to(KafkaConfig s) {
2626
.walConfig(config.walConfig())
2727
.walCacheSize(s.s3WALCacheSize())
2828
.walUploadThreshold(s.s3WALUploadThreshold())
29+
.walUploadIntervalMs(s.s3WALUploadIntervalMs())
2930
.streamSplitSize(s.s3StreamSplitSize())
3031
.objectBlockSize(s.s3ObjectBlockSize())
3132
.objectPartSize(s.s3ObjectPartSize())

core/src/main/scala/kafka/server/KafkaConfig.scala

+1
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
775775
val s3ObjectPartSize = getInt(AutoMQConfig.S3_OBJECT_PART_SIZE_CONFIG)
776776
val s3StreamAllocatorPolicy = Enum.valueOf(classOf[ByteBufAllocPolicy], getString(AutoMQConfig.S3_STREAM_ALLOCATOR_POLICY_CONFIG))
777777
val (s3WALCacheSize, s3BlockCacheSize, s3WALUploadThreshold) = adjustS3Configs(s3StreamAllocatorPolicy)
778+
val s3WALUploadIntervalMs = getLong(AutoMQConfig.S3_WAL_UPLOAD_INTERVAL_MS_CONFIG)
778779
val s3StreamObjectCompactionTaskIntervalMinutes = getInt(AutoMQConfig.S3_STREAM_OBJECT_COMPACTION_INTERVAL_MINUTES_CONFIG)
779780
val s3StreamObjectCompactionMaxSizeBytes = getLong(AutoMQConfig.S3_STREAM_OBJECT_COMPACTION_MAX_SIZE_BYTES_CONFIG)
780781
val s3ControllerRequestRetryMaxCount = getInt(AutoMQConfig.S3_CONTROLLER_REQUEST_RETRY_MAX_COUNT_CONFIG)

s3stream/src/main/java/com/automq/stream/s3/Config.java

+11
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ public class Config {
2525
private String walConfig = "0@file:///tmp/s3stream_wal";
2626
private long walCacheSize = 200 * 1024 * 1024;
2727
private long walUploadThreshold = 100 * 1024 * 1024;
28+
// -1L means don't upload by time
29+
private long walUploadIntervalMs = -1L;
2830
private int streamSplitSize = 16777216;
2931
private int objectBlockSize = 1048576;
3032
private int objectPartSize = 16777216;
@@ -73,6 +75,10 @@ public long walUploadThreshold() {
7375
return walUploadThreshold;
7476
}
7577

78+
public long walUploadIntervalMs() {
79+
return walUploadIntervalMs;
80+
}
81+
7682
public int streamSplitSize() {
7783
return streamSplitSize;
7884
}
@@ -182,6 +188,11 @@ public Config walUploadThreshold(long s3WALObjectSize) {
182188
return this;
183189
}
184190

191+
public Config walUploadIntervalMs(long s3WALUploadIntervalMs) {
192+
this.walUploadIntervalMs = s3WALUploadIntervalMs;
193+
return this;
194+
}
195+
185196
public Config streamSplitSize(int s3StreamSplitSize) {
186197
this.streamSplitSize = s3StreamSplitSize;
187198
return this;

s3stream/src/main/java/com/automq/stream/s3/S3Storage.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ public S3Storage(Config config, WriteAheadLog deltaWAL, StreamManager streamMana
150150
this.drainBackoffTask = this.backgroundExecutor.scheduleWithFixedDelay(this::tryDrainBackoffRecords, 100, 100, TimeUnit.MILLISECONDS);
151151
S3StreamMetricsManager.registerInflightWALUploadTasksCountSupplier(this.inflightWALUploadTasks::size);
152152
S3StreamMetricsManager.registerDeltaWalPendingUploadBytesSupplier(this.pendingUploadBytes::get);
153+
if (config.walUploadIntervalMs() > 0) {
154+
this.backgroundExecutor.scheduleWithFixedDelay(this::maybeForceUpload, config.walUploadIntervalMs(), config.walUploadIntervalMs(), TimeUnit.MILLISECONDS);
155+
}
153156
}
154157

155158
/**
@@ -635,7 +638,6 @@ private boolean hasInflightForceUploadTask() {
635638
}
636639

637640
private CompletableFuture<Void> forceUpload() {
638-
LOGGER.info("force upload all streams");
639641
CompletableFuture<Void> cf = forceUpload(LogCache.MATCH_ALL_STREAMS);
640642
cf.whenComplete((nil, ignored) -> forceUploadCallback());
641643
return cf;

0 commit comments

Comments
 (0)