From 7b0d3c335376b14e04a2997e8372402ee8c89817 Mon Sep 17 00:00:00 2001 From: ErcanGorgulu Date: Wed, 22 Jul 2020 12:37:45 +0300 Subject: [PATCH 1/2] Adding support for cluster keyspace notifications to subscriber by allowing subscription to multiple nodes --- src/StackExchange.Redis/RedisChannel.cs | 22 +++- src/StackExchange.Redis/RedisSubscriber.cs | 131 ++++++++++++++++----- 2 files changed, 121 insertions(+), 32 deletions(-) diff --git a/src/StackExchange.Redis/RedisChannel.cs b/src/StackExchange.Redis/RedisChannel.cs index b830e6f27..4ea0d0576 100644 --- a/src/StackExchange.Redis/RedisChannel.cs +++ b/src/StackExchange.Redis/RedisChannel.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Text; namespace StackExchange.Redis @@ -8,8 +9,12 @@ namespace StackExchange.Redis /// public readonly struct RedisChannel : IEquatable { + private static readonly byte[] __keyBytes = Encoding.UTF8.GetBytes("__key"); + private static readonly byte[] __KEYBytes = Encoding.UTF8.GetBytes("__KEY"); + internal readonly byte[] Value; internal readonly bool IsPatternBased; + internal readonly bool IsKeyspaceChannel; /// /// Indicates whether the channel-name is either null or a zero-length value @@ -23,19 +28,28 @@ namespace StackExchange.Redis /// /// The name of the channel to create. /// The mode for name matching. - public RedisChannel(byte[] value, PatternMode mode) : this(value, DeterminePatternBased(value, mode)) {} + public RedisChannel(byte[] value, PatternMode mode) : this(value, DeterminePatternBased(value, mode)) { } /// /// Create a new redis channel from a string, explicitly controlling the pattern mode /// /// The string name of the channel to create. /// The mode for name matching. - public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) {} + public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) { } private RedisChannel(byte[] value, bool isPatternBased) { Value = value; IsPatternBased = isPatternBased; + if (value != null && value.Length >= __keyBytes.Length) + { + var prefix = new ArraySegment(value, 0, 5); + IsKeyspaceChannel = prefix.SequenceEqual(__keyBytes) || prefix.SequenceEqual(__KEYBytes); + } + else + { + IsKeyspaceChannel = false; + } } private static bool DeterminePatternBased(byte[] value, PatternMode mode) @@ -244,7 +258,7 @@ public static implicit operator RedisChannel(byte[] key) /// /// The channel to get a byte[] from. #pragma warning disable RCS1231 // Make parameter ref read-only. - public API - public static implicit operator byte[] (RedisChannel key) => key.Value; + public static implicit operator byte[](RedisChannel key) => key.Value; #pragma warning restore RCS1231 // Make parameter ref read-only. /// @@ -252,7 +266,7 @@ public static implicit operator RedisChannel(byte[] key) /// /// The channel to get a string from. #pragma warning disable RCS1231 // Make parameter ref read-only. - public API - public static implicit operator string (RedisChannel key) + public static implicit operator string(RedisChannel key) #pragma warning restore RCS1231 // Make parameter ref read-only. { var arr = key.Value; diff --git a/src/StackExchange.Redis/RedisSubscriber.cs b/src/StackExchange.Redis/RedisSubscriber.cs index 40684f105..fcb4aeea0 100644 --- a/src/StackExchange.Redis/RedisSubscriber.cs +++ b/src/StackExchange.Redis/RedisSubscriber.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Net; using System.Runtime.CompilerServices; using System.Threading; @@ -191,7 +192,7 @@ internal sealed class Subscription { private Action _handlers; private ChannelMessageQueue _queues; - private ServerEndPoint owner; + private readonly HashSet owners = new HashSet(); public void Add(Action handler, ChannelMessageQueue queue) { @@ -221,14 +222,51 @@ public bool Remove(Action handler, ChannelMessageQueue public Task SubscribeToServer(ConnectionMultiplexer multiplexer, in RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall) { - var selected = multiplexer.SelectServer(RedisCommand.SUBSCRIBE, flags, default(RedisKey)); + // subscribe to all masters in cluster for keyspace/keyevent notifications + if (channel.IsKeyspaceChannel) + { + return SubscribeToMasters(multiplexer, channel, flags, asyncState, internalCall); + } + return SubscribeToSelectedEndpoint(multiplexer, channel, flags, asyncState, internalCall); + } + + private Task SubscribeToMasters(ConnectionMultiplexer multiplexer, RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall) + { + var subscribeTasks = new List(); + var masters = multiplexer.GetServerSnapshot().ToArray() + .Where(s => !s.IsReplica && s.ClusterConfiguration != null && s.EndPoint.Equals(s.ClusterConfiguration.Origin)); + + lock (owners) + { + foreach (var master in masters) + { + if (owners.Contains(master)) continue; + owners.Add(master); + + var state = SubscribeToSelectedEndpoint(multiplexer, channel, flags, asyncState, internalCall); + subscribeTasks.Add(state ?? CompletedTask.Default(asyncState)); + } + } + + return Task.WhenAll(subscribeTasks); + } + + private Task SubscribeToSelectedEndpoint(ConnectionMultiplexer multiplexer, RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall) + { + var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE; + var selected = multiplexer.SelectServer(cmd, flags, default(RedisKey)); var bridge = selected?.GetBridge(ConnectionType.Subscription, true); if (bridge == null) return null; - // note: check we can create the message validly *before* we swap the owner over (Interlocked) var state = PendingSubscriptionState.Create(channel, this, flags, true, internalCall, asyncState, selected.IsReplica); - if (Interlocked.CompareExchange(ref owner, selected, null) != null) return null; + lock (owners) + { + if (!owners.Add(selected)) + { + return null; + } + } try { if (!bridge.TryEnqueueBackgroundSubscriptionWrite(state)) @@ -241,25 +279,48 @@ public Task SubscribeToServer(ConnectionMultiplexer multiplexer, in RedisChannel catch { // clear the owner if it is still us - Interlocked.CompareExchange(ref owner, null, selected); + lock (owners) + { + owners.Remove(selected); + } throw; } } public Task UnsubscribeFromServer(in RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall) { - var oldOwner = Interlocked.Exchange(ref owner, null); - var bridge = oldOwner?.GetBridge(ConnectionType.Subscription, false); - if (bridge == null) return null; + lock (owners) + { + if (owners.Count == 0) return null; + } - var state = PendingSubscriptionState.Create(channel, this, flags, false, internalCall, asyncState, oldOwner.IsReplica); + var queuedTasks = new List(); - if (!bridge.TryEnqueueBackgroundSubscriptionWrite(state)) + var cmd = channel.IsPatternBased ? RedisCommand.PUNSUBSCRIBE : RedisCommand.UNSUBSCRIBE; + + var msg = Message.Create(-1, flags, cmd, channel); + if (internalCall) msg.SetInternalCall(); + + lock (owners) { - state.Abort(); - return null; + foreach (var owner in owners) + { + var bridge = owner?.GetBridge(ConnectionType.Subscription, false); + if (bridge == null) return null; + + var state = PendingSubscriptionState.Create(channel, this, flags, false, internalCall, asyncState, owner.IsReplica); + + if (!bridge.TryEnqueueBackgroundSubscriptionWrite(state)) + { + state.Abort(); + return null; + } + queuedTasks.Add(state.Task); + } + + owners.Clear(); } - return state.Task; + return Task.WhenAll(queuedTasks); } internal readonly struct PendingSubscriptionState @@ -294,36 +355,50 @@ private PendingSubscriptionState(object asyncState, RedisChannel channel, Subscr } } - internal ServerEndPoint GetOwner() => Volatile.Read(ref owner); + internal ServerEndPoint GetOwner() + { + lock (owners) + { + return owners.FirstOrDefault(); + } + } internal void Resubscribe(in RedisChannel channel, ServerEndPoint server) { - if (server != null && Interlocked.CompareExchange(ref owner, server, server) == server) + bool hasOwner; + lock (owners) { - var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE; - var msg = Message.Create(-1, CommandFlags.FireAndForget, cmd, channel); - msg.SetInternalCall(); + hasOwner = owners.Contains(server); + } + if (server == null || !hasOwner) + { + return; + } + var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE; + var msg = Message.Create(-1, CommandFlags.FireAndForget, cmd, channel); + msg.SetInternalCall(); #pragma warning disable CS0618 - server.WriteDirectFireAndForgetSync(msg, ResultProcessor.TrackSubscriptions); + server.WriteDirectFireAndForgetSync(msg, ResultProcessor.TrackSubscriptions); #pragma warning restore CS0618 - } } internal bool Validate(ConnectionMultiplexer multiplexer, in RedisChannel channel) { bool changed = false; - var oldOwner = Volatile.Read(ref owner); - if (oldOwner != null && !oldOwner.IsSelectable(RedisCommand.PSUBSCRIBE)) + lock (owners) { - if (UnsubscribeFromServer(channel, CommandFlags.FireAndForget, null, true) != null) + if (owners.Count != 0 && !owners.All(o => o.IsSelectable(RedisCommand.PSUBSCRIBE))) + { + if (UnsubscribeFromServer(channel, CommandFlags.FireAndForget, null, true) != null) + { + changed = true; + } + owners.Clear(); + } + if (owners.Count == 0 && SubscribeToServer(multiplexer, channel, CommandFlags.FireAndForget, null, true) != null) { changed = true; } - oldOwner = null; - } - if (oldOwner == null && SubscribeToServer(multiplexer, channel, CommandFlags.FireAndForget, null, true) != null) - { - changed = true; } return changed; } From 4ed8b2d05a0eb69ad4f56e371646b1927dce6680 Mon Sep 17 00:00:00 2001 From: Nick Craver Date: Sun, 27 Jun 2021 07:35:10 -0400 Subject: [PATCH 2/2] Whitespace change cleanup --- src/StackExchange.Redis/RedisChannel.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/StackExchange.Redis/RedisChannel.cs b/src/StackExchange.Redis/RedisChannel.cs index 4ea0d0576..63cfbb0ca 100644 --- a/src/StackExchange.Redis/RedisChannel.cs +++ b/src/StackExchange.Redis/RedisChannel.cs @@ -28,14 +28,14 @@ namespace StackExchange.Redis /// /// The name of the channel to create. /// The mode for name matching. - public RedisChannel(byte[] value, PatternMode mode) : this(value, DeterminePatternBased(value, mode)) { } + public RedisChannel(byte[] value, PatternMode mode) : this(value, DeterminePatternBased(value, mode)) {} /// /// Create a new redis channel from a string, explicitly controlling the pattern mode /// /// The string name of the channel to create. /// The mode for name matching. - public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) { } + public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) {} private RedisChannel(byte[] value, bool isPatternBased) { @@ -258,7 +258,7 @@ public static implicit operator RedisChannel(byte[] key) /// /// The channel to get a byte[] from. #pragma warning disable RCS1231 // Make parameter ref read-only. - public API - public static implicit operator byte[](RedisChannel key) => key.Value; + public static implicit operator byte[] (RedisChannel key) => key.Value; #pragma warning restore RCS1231 // Make parameter ref read-only. /// @@ -266,7 +266,7 @@ public static implicit operator RedisChannel(byte[] key) /// /// The channel to get a string from. #pragma warning disable RCS1231 // Make parameter ref read-only. - public API - public static implicit operator string(RedisChannel key) + public static implicit operator string (RedisChannel key) #pragma warning restore RCS1231 // Make parameter ref read-only. { var arr = key.Value;