Skip to content

Commit c33da57

Browse files
committed
Adding support for cluster keyspace notifications to subscriber by allowing subscription to multiple nodes
1 parent fb4a630 commit c33da57

File tree

2 files changed

+62
-12
lines changed

2 files changed

+62
-12
lines changed

StackExchange.Redis/StackExchange/Redis/RedisChannel.cs

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ public struct RedisChannel : IEquatable<RedisChannel>
1212

1313
internal readonly byte[] Value;
1414
internal readonly bool IsPatternBased;
15+
internal readonly bool IsKeyspaceChannel;
1516

1617
/// <summary>
1718
/// Indicates whether the channel-name is either null or a zero-length value
@@ -38,6 +39,7 @@ private RedisChannel(byte[] value, bool isPatternBased)
3839
{
3940
Value = value;
4041
IsPatternBased = isPatternBased;
42+
IsKeyspaceChannel = value != null && Encoding.UTF8.GetString(value).ToLower().StartsWith("__key");
4143
}
4244

4345
private static bool DeterminePatternBased(byte[] value, PatternMode mode)

StackExchange.Redis/StackExchange/Redis/RedisSubscriber.cs

+60-12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Linq;
34
using System.Net;
45
using System.Threading;
56
using System.Threading.Tasks;
@@ -144,7 +145,7 @@ internal long ValidateSubscriptions()
144145
private sealed class Subscription
145146
{
146147
private Action<RedisChannel, RedisValue> handler;
147-
private ServerEndPoint owner;
148+
private List<ServerEndPoint> owners = new List<ServerEndPoint>();
148149

149150
public Subscription(Action<RedisChannel, RedisValue> value) => handler = value;
150151

@@ -170,33 +171,81 @@ public bool Remove(Action<RedisChannel, RedisValue> value)
170171
}
171172

172173
public Task SubscribeToServer(ConnectionMultiplexer multiplexer, RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall)
174+
{
175+
// subscribe to all masters in cluster for keyspace/keyevent notifications
176+
if (channel.IsKeyspaceChannel) {
177+
return SubscribeToMasters(multiplexer, channel, flags, asyncState, internalCall);
178+
}
179+
return SubscribeToSingleServer(multiplexer, channel, flags, asyncState, internalCall);
180+
}
181+
182+
private Task SubscribeToSingleServer(ConnectionMultiplexer multiplexer, RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall)
173183
{
174184
var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE;
175185
var selected = multiplexer.SelectServer(-1, cmd, flags, default(RedisKey));
176186

177-
if (selected == null || Interlocked.CompareExchange(ref owner, selected, null) != null) return null;
187+
lock (owners)
188+
{
189+
if (selected == null || owners.Contains(selected)) return null;
190+
owners.Add(selected);
191+
}
178192

179193
var msg = Message.Create(-1, flags, cmd, channel);
180-
194+
if (internalCall) msg.SetInternalCall();
181195
return selected.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState);
182196
}
183197

198+
private Task SubscribeToMasters(ConnectionMultiplexer multiplexer, RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall)
199+
{
200+
List<Task> subscribeTasks = new List<Task>();
201+
var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE;
202+
var masters = multiplexer.GetServerSnapshot().Where(s => !s.IsSlave && s.EndPoint.Equals(s.ClusterConfiguration.Origin));
203+
204+
lock (owners)
205+
{
206+
foreach (var master in masters)
207+
{
208+
if (owners.Contains(master)) continue;
209+
owners.Add(master);
210+
var msg = Message.Create(-1, flags, cmd, channel);
211+
if (internalCall) msg.SetInternalCall();
212+
subscribeTasks.Add(master.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState));
213+
}
214+
}
215+
216+
return Task.WhenAll(subscribeTasks);
217+
}
218+
184219
public Task UnsubscribeFromServer(RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall)
185220
{
186-
var oldOwner = Interlocked.Exchange(ref owner, null);
187-
if (oldOwner == null) return null;
221+
if (owners.Count == 0) return null;
188222

223+
List<Task> queuedTasks = new List<Task>();
189224
var cmd = channel.IsPatternBased ? RedisCommand.PUNSUBSCRIBE : RedisCommand.UNSUBSCRIBE;
190225
var msg = Message.Create(-1, flags, cmd, channel);
191226
if (internalCall) msg.SetInternalCall();
192-
return oldOwner.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState);
227+
foreach (var owner in owners)
228+
queuedTasks.Add(owner.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState));
229+
owners.Clear();
230+
return Task.WhenAll(queuedTasks.ToArray());
193231
}
194232

195-
internal ServerEndPoint GetOwner() => Interlocked.CompareExchange(ref owner, null, null);
233+
internal ServerEndPoint GetOwner()
234+
{
235+
var owner = owners?[0]; // we subscribe to arbitrary server, so why not return one
236+
return Interlocked.CompareExchange(ref owner, null, null);
237+
}
196238

197239
internal void Resubscribe(RedisChannel channel, ServerEndPoint server)
198240
{
199-
if (server != null && Interlocked.CompareExchange(ref owner, server, server) == server)
241+
bool hasOwner;
242+
243+
lock (owners)
244+
{
245+
hasOwner = owners.Contains(server);
246+
}
247+
248+
if (server != null && hasOwner)
200249
{
201250
var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE;
202251
var msg = Message.Create(-1, CommandFlags.FireAndForget, cmd, channel);
@@ -208,16 +257,15 @@ internal void Resubscribe(RedisChannel channel, ServerEndPoint server)
208257
internal bool Validate(ConnectionMultiplexer multiplexer, RedisChannel channel)
209258
{
210259
bool changed = false;
211-
var oldOwner = Interlocked.CompareExchange(ref owner, null, null);
212-
if (oldOwner != null && !oldOwner.IsSelectable(RedisCommand.PSUBSCRIBE))
260+
if (owners.Count != 0 && !owners.All(o => o.IsSelectable(RedisCommand.PSUBSCRIBE)))
213261
{
214262
if (UnsubscribeFromServer(channel, CommandFlags.FireAndForget, null, true) != null)
215263
{
216264
changed = true;
217265
}
218-
oldOwner = null;
266+
owners.Clear();
219267
}
220-
if (oldOwner == null && SubscribeToServer(multiplexer, channel, CommandFlags.FireAndForget, null, true) != null)
268+
if (owners.Count == 0 && SubscribeToServer(multiplexer, channel, CommandFlags.FireAndForget, null, true) != null)
221269
{
222270
changed = true;
223271
}

0 commit comments

Comments
 (0)