Skip to content

Commit

Permalink
FluxWorkers.java
Browse files Browse the repository at this point in the history
  • Loading branch information
caoli5288 committed Dec 15, 2020
1 parent aa2fe8c commit 41d2c07
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 2 deletions.
76 changes: 76 additions & 0 deletions src/main/java/com/mengcraft/simpleorm/FluxWorkers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.mengcraft.simpleorm;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.bukkit.Bukkit;

import java.io.Closeable;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class FluxWorkers implements Executor, Closeable {

private final ServerWorker serverWorker = new ServerWorker();
private final int size;
private final List<ExecutorService> workers;
private int cursor;

public FluxWorkers(int size) {
this.size = size;
workers = Lists.newArrayListWithCapacity(size);
ThreadFactory factory = new ThreadFactoryBuilder()
.setNameFormat("SimpleORM/FancyWorkers/%s")
.build();
for (int i = 0; i < size; i++) {
workers.add(Executors.newSingleThreadExecutor(factory));
}
}

public Executor ofServer() {
return serverWorker;
}

public Executor of() {
return this;
}

public Executor of(String ns) {
cursor++;
return workers.get(ns.hashCode() % size);
}

@Override
public void execute(Runnable command) {
workers.get(cursor++ % size).execute(command);
}

@Override
public void close() {
for (ExecutorService service : workers) {
service.shutdown();
}
}

public void awaitClose(long mills) throws InterruptedException {
for (ExecutorService service : workers) {
Preconditions.checkState(service.awaitTermination(mills, TimeUnit.MILLISECONDS));
}
}

public static class ServerWorker implements Executor {

@Override
public void execute(Runnable command) {
if (Bukkit.isPrimaryThread()) {
command.run();
} else {
Bukkit.getScheduler().runTask(ORM.plugin, command);
}
}
}
}
34 changes: 33 additions & 1 deletion src/main/java/com/mengcraft/simpleorm/ORM.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.mengcraft.simpleorm.provider.IRedisProvider;
import com.mengcraft.simpleorm.redis.RedisProviders;
import com.zaxxer.hikari.HikariDataSource;
import lombok.Getter;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.val;
Expand All @@ -25,7 +26,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.logging.Level;

import static com.mengcraft.simpleorm.lib.Tuple.tuple;

Expand All @@ -38,9 +41,11 @@ public class ORM extends JavaPlugin {
private static RedisWrapper globalRedisWrapper;
private static MongoWrapper globalMongoWrapper;
private static DataSource sharedDs;
private static ORM plugin;
static ORM plugin;
private static IDataSourceProvider dataSourceProvider = new DataSourceProvider();
private static IRedisProvider redisProvider;
@Getter
private static FluxWorkers workers;

@Override
public void onLoad() {
Expand Down Expand Up @@ -78,6 +83,7 @@ private static void loadExtLibrary(JavaPlugin plugin) {
@SneakyThrows
public void onEnable() {
new MetricsLite(this);
workers = new FluxWorkers(getConfig().getInt("cpus", 8));
if (nil(globalRedisWrapper)) {
if (redisProvider == null) {
FileConfiguration config = getConfig();
Expand All @@ -97,6 +103,16 @@ public void onEnable() {
getLogger().info("Welcome!");
}

@Override
public void onDisable() {
workers.close();
try {
workers.awaitClose(Integer.MAX_VALUE);
} catch (InterruptedException e) {
getLogger().log(Level.WARNING, "Error occurred while await workers closed", e);
}
}

public static boolean nil(Object any) {
return any == null;
}
Expand Down Expand Up @@ -251,4 +267,20 @@ public static void setRedisProvider(IRedisProvider redisProvider) {
Preconditions.checkState(!isFullyEnabled(), "Cannot be set after ORM enabled");
ORM.redisProvider = redisProvider;
}

public static CompletableFuture<Void> enqueue(Runnable runnable) {
return CompletableFuture.runAsync(runnable, workers.of());
}

public static CompletableFuture<Void> enqueue(String ns, Runnable runnable) {
return CompletableFuture.runAsync(runnable, workers.of(ns));
}

public static <T> CompletableFuture<T> enqueue(Supplier<T> supplier) {
return CompletableFuture.supplyAsync(supplier, workers.of());
}

public static <T> CompletableFuture<T> enqueue(String ns, Supplier<T> supplier) {
return CompletableFuture.supplyAsync(supplier, workers.of(ns));
}
}
4 changes: 3 additions & 1 deletion src/main/resources/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ redis:
password: null

mongo:
url: mongodb://localhost
url: mongodb://localhost

cpus: 8

0 comments on commit 41d2c07

Please sign in to comment.