From 41d2c0704f03dbead380a7f302abbcc9cbde5fd4 Mon Sep 17 00:00:00 2001 From: caoli5288 Date: Tue, 15 Dec 2020 17:29:19 +0800 Subject: [PATCH] FluxWorkers.java --- .../com/mengcraft/simpleorm/FluxWorkers.java | 76 +++++++++++++++++++ .../java/com/mengcraft/simpleorm/ORM.java | 34 ++++++++- src/main/resources/config.yml | 4 +- 3 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/mengcraft/simpleorm/FluxWorkers.java diff --git a/src/main/java/com/mengcraft/simpleorm/FluxWorkers.java b/src/main/java/com/mengcraft/simpleorm/FluxWorkers.java new file mode 100644 index 0000000..a988b37 --- /dev/null +++ b/src/main/java/com/mengcraft/simpleorm/FluxWorkers.java @@ -0,0 +1,76 @@ +package com.mengcraft.simpleorm; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.bukkit.Bukkit; + +import java.io.Closeable; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +public class FluxWorkers implements Executor, Closeable { + + private final ServerWorker serverWorker = new ServerWorker(); + private final int size; + private final List workers; + private int cursor; + + public FluxWorkers(int size) { + this.size = size; + workers = Lists.newArrayListWithCapacity(size); + ThreadFactory factory = new ThreadFactoryBuilder() + .setNameFormat("SimpleORM/FancyWorkers/%s") + .build(); + for (int i = 0; i < size; i++) { + workers.add(Executors.newSingleThreadExecutor(factory)); + } + } + + public Executor ofServer() { + return serverWorker; + } + + public Executor of() { + return this; + } + + public Executor of(String ns) { + cursor++; + return workers.get(ns.hashCode() % size); + } + + @Override + public void execute(Runnable command) { + workers.get(cursor++ % size).execute(command); + } + + @Override + public void close() { + for (ExecutorService service : workers) { + service.shutdown(); + } + } + + public void awaitClose(long mills) throws InterruptedException { + for (ExecutorService service : workers) { + Preconditions.checkState(service.awaitTermination(mills, TimeUnit.MILLISECONDS)); + } + } + + public static class ServerWorker implements Executor { + + @Override + public void execute(Runnable command) { + if (Bukkit.isPrimaryThread()) { + command.run(); + } else { + Bukkit.getScheduler().runTask(ORM.plugin, command); + } + } + } +} diff --git a/src/main/java/com/mengcraft/simpleorm/ORM.java b/src/main/java/com/mengcraft/simpleorm/ORM.java index d0194df..2d1984e 100644 --- a/src/main/java/com/mengcraft/simpleorm/ORM.java +++ b/src/main/java/com/mengcraft/simpleorm/ORM.java @@ -10,6 +10,7 @@ import com.mengcraft.simpleorm.provider.IRedisProvider; import com.mengcraft.simpleorm.redis.RedisProviders; import com.zaxxer.hikari.HikariDataSource; +import lombok.Getter; import lombok.NonNull; import lombok.SneakyThrows; import lombok.val; @@ -25,7 +26,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import java.util.logging.Level; import static com.mengcraft.simpleorm.lib.Tuple.tuple; @@ -38,9 +41,11 @@ public class ORM extends JavaPlugin { private static RedisWrapper globalRedisWrapper; private static MongoWrapper globalMongoWrapper; private static DataSource sharedDs; - private static ORM plugin; + static ORM plugin; private static IDataSourceProvider dataSourceProvider = new DataSourceProvider(); private static IRedisProvider redisProvider; + @Getter + private static FluxWorkers workers; @Override public void onLoad() { @@ -78,6 +83,7 @@ private static void loadExtLibrary(JavaPlugin plugin) { @SneakyThrows public void onEnable() { new MetricsLite(this); + workers = new FluxWorkers(getConfig().getInt("cpus", 8)); if (nil(globalRedisWrapper)) { if (redisProvider == null) { FileConfiguration config = getConfig(); @@ -97,6 +103,16 @@ public void onEnable() { getLogger().info("Welcome!"); } + @Override + public void onDisable() { + workers.close(); + try { + workers.awaitClose(Integer.MAX_VALUE); + } catch (InterruptedException e) { + getLogger().log(Level.WARNING, "Error occurred while await workers closed", e); + } + } + public static boolean nil(Object any) { return any == null; } @@ -251,4 +267,20 @@ public static void setRedisProvider(IRedisProvider redisProvider) { Preconditions.checkState(!isFullyEnabled(), "Cannot be set after ORM enabled"); ORM.redisProvider = redisProvider; } + + public static CompletableFuture enqueue(Runnable runnable) { + return CompletableFuture.runAsync(runnable, workers.of()); + } + + public static CompletableFuture enqueue(String ns, Runnable runnable) { + return CompletableFuture.runAsync(runnable, workers.of(ns)); + } + + public static CompletableFuture enqueue(Supplier supplier) { + return CompletableFuture.supplyAsync(supplier, workers.of()); + } + + public static CompletableFuture enqueue(String ns, Supplier supplier) { + return CompletableFuture.supplyAsync(supplier, workers.of(ns)); + } } diff --git a/src/main/resources/config.yml b/src/main/resources/config.yml index 23823ca..1b79f5b 100644 --- a/src/main/resources/config.yml +++ b/src/main/resources/config.yml @@ -10,4 +10,6 @@ redis: password: null mongo: - url: mongodb://localhost \ No newline at end of file + url: mongodb://localhost + +cpus: 8 \ No newline at end of file