Skip to content

Commit

Permalink
make RedisWrapper.subscribe more safe
Browse files Browse the repository at this point in the history
  • Loading branch information
caoli5288 committed May 27, 2020
1 parent fe5fd38 commit 11cb2ef
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 49 deletions.
105 changes: 65 additions & 40 deletions src/main/java/com/mengcraft/simpleorm/RedisWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,26 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.mengcraft.simpleorm.lib.Utils;
import com.mengcraft.simpleorm.redis.RedisLiveObjectBucket;
import com.mengcraft.simpleorm.redis.RedisMessageTopic;
import lombok.Cleanup;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import redis.clients.jedis.BinaryJedisPubSub;
import redis.clients.jedis.Client;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSentinelPool;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.params.SetParams;

import java.io.Closeable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
Expand All @@ -28,7 +31,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -37,7 +39,7 @@
public class RedisWrapper implements Closeable {

private final JedisResources resources;
private MessageFilter messageFilter;
private MessageFilter filter;

private RedisWrapper(JedisPool pool) {
resources = new GenericJedisResources(pool);
Expand Down Expand Up @@ -134,61 +136,58 @@ public void subscribe(String channel, Consumer<byte[]> consumer) {
}

public synchronized void subscribe(String channel, Consumer<byte[]> consumer, Executor executor) {
if (nil(messageFilter)) {
messageFilter = new MessageFilter();
messageFilter.handled.put(channel, consumer);
executor.execute(() -> open(client -> client.subscribe(messageFilter, channel.getBytes(StandardCharsets.UTF_8))));
} else if (!messageFilter.handled.containsKey(channel)) {
messageFilter.handled.put(channel, consumer);
messageFilter.subscribe(channel.getBytes(StandardCharsets.UTF_8));
if (nil(filter)) {
filter = new MessageFilter();
Jedis client = resources.getResource();
client.subscribe(filter, new byte[0]);// hacked
filter.addSubscriber(channel, consumer);
executor.execute(() -> {
try {
filter.execute();
} finally {
client.close();
}
});
} else if (!filter.isSubscribed(channel, consumer)) {
filter.addSubscriber(channel, consumer);
}
}

public synchronized void unsubscribeAll() {
if (nil(messageFilter)) {
if (nil(filter)) {
return;
}
messageFilter.unsubscribe();
messageFilter = null;
filter.unsubscribe();
filter = null;
}

@SneakyThrows
public synchronized void unsubscribe(String channel) {
if (nil(messageFilter)) {
if (nil(filter)) {
return;
}

if (messageFilter.handled.removeAll(channel).isEmpty()) {// no op
if (filter.subscribers.removeAll(channel).isEmpty()) {// no op
return;
}

messageFilter.unsubscribe(channel.getBytes(StandardCharsets.UTF_8));
if (!messageFilter.handled.isEmpty()) {
filter.unsubscribe(channel.getBytes(StandardCharsets.UTF_8));
if (!filter.subscribers.isEmpty()) {
return;
}

messageFilter = null;
filter = null;
}

public synchronized void unsubscribe(String channel, Consumer<byte[]> consumer) {
if (nil(messageFilter)) {
if (nil(filter)) {
return;
}

if (!messageFilter.handled.remove(channel, consumer)) {// no op
return;
}

if (messageFilter.handled.containsKey(channel)) {// if still handled any
return;
int subscribers = filter.removeSubscriber(channel, consumer);
if (subscribers == 0) {
filter = null;
}

messageFilter.unsubscribe(channel.getBytes(StandardCharsets.UTF_8));
if (!messageFilter.handled.isEmpty()) {
return;
}

messageFilter = null;
}

public void set(String key, Object obj) {
Expand Down Expand Up @@ -270,11 +269,27 @@ public void close() {

private static class MessageFilter extends BinaryJedisPubSub {

private final Multimap<String, Consumer<byte[]>> handled = HashMultimap.create();
private static final Method METHOD_process = Utils.getAccessibleMethod(BinaryJedisPubSub.class, "process", Client.class);
private static final Field FIELD_client = Utils.getAccessibleField(BinaryJedisPubSub.class, "client");

private final Multimap<String, Consumer<byte[]>> subscribers = HashMultimap.create();

void addSubscriber(String channel, Consumer<byte[]> consumer) {
if (!subscribers.containsKey(channel)) {
subscribe(channel.getBytes(StandardCharsets.UTF_8));
}
subscribers.put(channel, consumer);
}

int removeSubscriber(String channel, Consumer<byte[]> consumer) {
if (subscribers.remove(channel, consumer) && !subscribers.containsKey(channel)) {
unsubscribe(channel.getBytes(StandardCharsets.UTF_8));
}
return subscribers.size();
}

@SneakyThrows
public void onMessage(byte[] channel, byte[] message) {
Collection<Consumer<byte[]>> allConsumer = handled.get(new String(channel, StandardCharsets.UTF_8));
Collection<Consumer<byte[]>> allConsumer = subscribers.get(new String(channel, StandardCharsets.UTF_8));
if (allConsumer.isEmpty()) {
return;
}
Expand All @@ -284,14 +299,24 @@ public void onMessage(byte[] channel, byte[] message) {

@Override
@SneakyThrows
public void subscribe(@NonNull byte[]... channels) {
public void proceed(Client client, byte[]... channels) {
FIELD_client.set(this, client);
}

@SneakyThrows
void execute() {
Client client = (Client) FIELD_client.get(this);
client.setTimeoutInfinite();
try {
super.subscribe(channels);
} catch (NullPointerException e) {
TimeUnit.MILLISECONDS.sleep(0);
subscribe(channels);
METHOD_process.invoke(this, client);
} finally {
client.rollbackTimeout();
}
}

public boolean isSubscribed(String channel, Consumer<byte[]> consumer) {
return subscribers.containsEntry(channel, consumer);
}
}

}
10 changes: 1 addition & 9 deletions src/main/java/com/mengcraft/simpleorm/lib/GsonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import jdk.nashorn.api.scripting.ScriptObjectMirror;
import lombok.SneakyThrows;
import org.bukkit.configuration.serialization.ConfigurationSerializable;

import java.lang.reflect.Field;
Expand All @@ -29,7 +28,7 @@

public class GsonUtils {

private static final Field PRIMITIVE_VALUE = REFLECT_PRIMITIVE_VALUE();
private static final Field PRIMITIVE_VALUE = Utils.getAccessibleField(JsonPrimitive.class, "value");
private static TypeFunctionRegistry<Object> registry = new TypeFunctionRegistry<>();

static {
Expand All @@ -51,13 +50,6 @@ public class GsonUtils {
});
}

@SneakyThrows
private static Field REFLECT_PRIMITIVE_VALUE() {
Field value = JsonPrimitive.class.getDeclaredField("value");
value.setAccessible(true);
return value;
}

public static Object dump(JsonElement value) {
return registry.handle(value);
}
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/com/mengcraft/simpleorm/lib/Utils.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,27 @@
package com.mengcraft.simpleorm.lib;

import lombok.SneakyThrows;

import java.lang.reflect.Field;
import java.lang.reflect.Method;

public class Utils {

public static boolean isNullOrEmpty(String msg) {
return msg == null || msg.isEmpty();
}

@SneakyThrows
public static Method getAccessibleMethod(Class<?> cls, String methodName, Class<?>... classes) {
Method method = cls.getDeclaredMethod(methodName, classes);
method.setAccessible(true);
return method;
}

@SneakyThrows
public static Field getAccessibleField(Class<?> cls, String fieldName) {
Field field = cls.getDeclaredField(fieldName);
field.setAccessible(true);
return field;
}
}

0 comments on commit 11cb2ef

Please sign in to comment.