diff --git a/CSRedis/CSRedis.csproj b/CSRedis/CSRedis.csproj index b6a680c..71904f3 100644 --- a/CSRedis/CSRedis.csproj +++ b/CSRedis/CSRedis.csproj @@ -65,9 +65,12 @@ + + + diff --git a/CSRedis/Internal/Fakes/FakeRedisSocket.cs b/CSRedis/Internal/Fakes/FakeRedisSocket.cs index 5c15ae2..4d02d4c 100644 --- a/CSRedis/Internal/Fakes/FakeRedisSocket.cs +++ b/CSRedis/Internal/Fakes/FakeRedisSocket.cs @@ -6,6 +6,7 @@ using System.Net; using System.Net.Sockets; using System.Text; +using System.Threading.Tasks; namespace CSRedis.Internal.Fakes { @@ -20,6 +21,16 @@ class FakeRedisSocket : IRedisSocket public int SendTimeout { get; set; } + public EndPoint EndPoint + { + get + { + throw new NotImplementedException(); + } + } + + public bool SSL { get { return false; } } + public FakeRedisSocket(params string[] responses) : this(Encoding.UTF8, responses) { } @@ -35,7 +46,7 @@ public FakeRedisSocket(params byte[][] responses) _stream.AddResponse(response); } - public void Connect(EndPoint endpoint) + public void Connect() { _connected = true; } @@ -78,5 +89,10 @@ static byte[][] ToBytes(Encoding encoding, string[] strings) set[i] = encoding.GetBytes(strings[i]); return set; } + + public Task ConnectAsync() + { + throw new NotImplementedException(); + } } } diff --git a/CSRedis/Internal/IO/AsyncConnector.cs b/CSRedis/Internal/IO/AsyncConnector.cs index b27ff45..c67fad3 100644 --- a/CSRedis/Internal/IO/AsyncConnector.cs +++ b/CSRedis/Internal/IO/AsyncConnector.cs @@ -9,12 +9,11 @@ namespace CSRedis.Internal.IO { - class AsyncConnector : IDisposable + class AsyncConnector : IAsyncConnector { readonly SocketAsyncEventArgs _asyncConnectArgs; readonly SocketAsyncPool _asyncTransferPool; - readonly ConcurrentQueue _asyncReadQueue; - readonly ConcurrentQueue _asyncWriteQueue; + readonly IOQueue _ioQueue; readonly object _readLock; readonly object _writeLock; readonly int _concurrency; @@ -36,8 +35,7 @@ public AsyncConnector(IRedisSocket socket, EndPoint endpoint, RedisIO io, int co _bufferSize = bufferSize; _asyncTransferPool = new SocketAsyncPool(concurrency, bufferSize); _asyncTransferPool.Completed += OnSocketCompleted; - _asyncReadQueue = new ConcurrentQueue(); - _asyncWriteQueue = new ConcurrentQueue(); + _ioQueue = new IOQueue(); _readLock = new object(); _writeLock = new object(); _asyncConnectArgs = new SocketAsyncEventArgs { RemoteEndPoint = endpoint }; @@ -69,8 +67,8 @@ public Task ConnectAsync() public Task CallAsync(RedisCommand command) { var token = new RedisAsyncCommandToken(command); - _asyncWriteQueue.Enqueue(token); - ConnectAsync().ContinueWith(CallAsyncDeferred); + _ioQueue.Enqueue(token); + ConnectAsync().ContinueWith(CallAsync_Continued); return token.TaskSource.Task; } @@ -82,33 +80,32 @@ void InitConnection() _connectionTaskSource = new TaskCompletionSource(); } - void CallAsyncDeferred(Task t) + void CallAsync_Continued(Task t) { lock (_writeLock) { - IRedisAsyncCommandToken token; - if (!_asyncWriteQueue.TryDequeue(out token)) - throw new Exception(); + IRedisAsyncCommandToken token = _ioQueue.DequeueForWrite(); - _asyncReadQueue.Enqueue(token); - - var args = _asyncTransferPool.Acquire(); - int bytes; - try - { - bytes = _io.Writer.Write(token.Command, args.Buffer, args.Offset); - } - catch (ArgumentException e) - { - throw new RedisClientException("Could not write command '" + token.Command.Command + "'. Argument size exceeds buffer allocation of " + args.Count + ".", e); - } + SocketAsyncEventArgs args = _asyncTransferPool.Acquire(); + int bytes = TryWriteBuffer(token.Command, args.Buffer, args.Offset); args.SetBuffer(args.Offset, bytes); - if (!_redisSocket.SendAsync(args)) OnSocketSent(args); } } + int TryWriteBuffer(RedisCommand command, byte[] buffer, int offset) + { + try + { + return _io.Writer.Write(command, buffer, offset); + } + catch (ArgumentException e) + { + throw new RedisClientException("Could not write command '" + command.Command + "'. Argument size exceeds buffer size.", e); + } + } + void OnSocketCompleted(object sender, SocketAsyncEventArgs e) { switch (e.LastOperation) @@ -136,28 +133,23 @@ void OnSocketSent(SocketAsyncEventArgs args) { _asyncTransferPool.Release(args); - IRedisAsyncCommandToken token; lock (_readLock) { - if (_asyncReadQueue.TryDequeue(out token)) - { - try - { - token.SetResult(_io.Reader); - } - /*catch (IOException) // TODO implement async retry - { - if (ReconnectAttempts == 0) - throw; - Reconnect(); - _asyncWriteQueue.Enqueue(token); - ConnectAsync().ContinueWith(CallAsyncDeferred); - }*/ - catch (Exception e) - { - token.SetException(e); - } - } + IRedisAsyncCommandToken token = _ioQueue.DequeueForRead(); + TrySetResult(token); + } + } + + void TrySetResult(IRedisAsyncCommandToken token) + { + try + { + token.SetResult(_io.Reader); + } + // TODO: catch IOException and reconnect + catch (Exception e) + { + token.SetException(e); } } diff --git a/CSRedis/Internal/IO/IAsyncConnector.cs b/CSRedis/Internal/IO/IAsyncConnector.cs new file mode 100644 index 0000000..c695626 --- /dev/null +++ b/CSRedis/Internal/IO/IAsyncConnector.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace CSRedis.Internal.IO +{ + interface IAsyncConnector : IDisposable + { + Task ConnectAsync(); + Task CallAsync(RedisCommand command); + event EventHandler Connected; + } +} diff --git a/CSRedis/Internal/IO/IOQueue.cs b/CSRedis/Internal/IO/IOQueue.cs new file mode 100644 index 0000000..9ca7538 --- /dev/null +++ b/CSRedis/Internal/IO/IOQueue.cs @@ -0,0 +1,42 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace CSRedis.Internal.IO +{ + class IOQueue + { + readonly ConcurrentQueue _readQueue; + readonly ConcurrentQueue _writeQueue; + + public IOQueue() + { + _readQueue = new ConcurrentQueue(); + _writeQueue = new ConcurrentQueue(); + } + + public void Enqueue(IRedisAsyncCommandToken token) + { + _writeQueue.Enqueue(token); + } + + public IRedisAsyncCommandToken DequeueForWrite() + { + IRedisAsyncCommandToken token; + if (!_writeQueue.TryDequeue(out token)) + throw new Exception(); + _readQueue.Enqueue(token); + return token; + } + + public IRedisAsyncCommandToken DequeueForRead() + { + IRedisAsyncCommandToken token; + if (!_readQueue.TryDequeue(out token)) + throw new Exception(); + return token; + } + } +} diff --git a/CSRedis/Internal/IO/IRedisSocket.cs b/CSRedis/Internal/IO/IRedisSocket.cs index 48e1441..87853b6 100644 --- a/CSRedis/Internal/IO/IRedisSocket.cs +++ b/CSRedis/Internal/IO/IRedisSocket.cs @@ -5,16 +5,20 @@ using System.Net; using System.Net.Sockets; using System.Text; +using System.Threading.Tasks; namespace CSRedis.Internal.IO { interface IRedisSocket : IDisposable { + EndPoint EndPoint { get; } + bool SSL { get; } bool Connected { get; } int ReceiveTimeout { get; set; } int SendTimeout { get; set; } - void Connect(EndPoint endpoint); + void Connect(); bool ConnectAsync(SocketAsyncEventArgs args); + Task ConnectAsync(); bool SendAsync(SocketAsyncEventArgs args); Stream GetStream(); } diff --git a/CSRedis/Internal/IO/RedisPooledSocket.cs b/CSRedis/Internal/IO/RedisPooledSocket.cs index 584ea09..296fbf5 100644 --- a/CSRedis/Internal/IO/RedisPooledSocket.cs +++ b/CSRedis/Internal/IO/RedisPooledSocket.cs @@ -5,6 +5,7 @@ using System.Net; using System.Net.Sockets; using System.Text; +using System.Threading.Tasks; namespace CSRedis.Internal.IO { @@ -14,6 +15,8 @@ class RedisPooledSocket : IRedisSocket readonly SocketPool _pool; public bool Connected { get { return _socket == null ? false : _socket.Connected; } } + public bool SSL { get { return false; } } + public EndPoint EndPoint { get { return _pool.EndPoint; } } public int ReceiveTimeout { @@ -32,7 +35,7 @@ public RedisPooledSocket(SocketPool pool) _pool = pool; } - public void Connect(EndPoint endpoint) + public void Connect() { _socket = _pool.Connect(); System.Diagnostics.Debug.WriteLine("Got socket #{0}", _socket.Handle); @@ -57,5 +60,10 @@ public void Dispose() { _pool.Release(_socket); } + + public Task ConnectAsync() + { + throw new NotImplementedException(); + } } } diff --git a/CSRedis/Internal/IO/RedisSocket.cs b/CSRedis/Internal/IO/RedisSocket.cs index 4017bef..7ab6af9 100644 --- a/CSRedis/Internal/IO/RedisSocket.cs +++ b/CSRedis/Internal/IO/RedisSocket.cs @@ -6,14 +6,18 @@ using System.Net.Security; using System.Net.Sockets; using System.Text; +using System.Threading.Tasks; namespace CSRedis.Internal.IO { class RedisSocket : IRedisSocket { readonly bool _ssl; + readonly EndPoint _endPoint; Socket _socket; - EndPoint _remote; + + public bool SSL { get { return _ssl; } } + public EndPoint EndPoint { get { return _endPoint; } } public bool Connected { get { return _socket == null ? false : _socket.Connected; } } @@ -29,23 +33,37 @@ public int SendTimeout set { _socket.SendTimeout = value; } } - public RedisSocket(bool ssl) + public RedisSocket(EndPoint endPoint, bool ssl) { _ssl = ssl; + _endPoint = endPoint; } - public void Connect(EndPoint endpoint) + public void Connect() { - InitSocket(endpoint); - _socket.Connect(endpoint); + InitSocket(); + _socket.Connect(_endPoint); } public bool ConnectAsync(SocketAsyncEventArgs args) { - InitSocket(args.RemoteEndPoint); + args.RemoteEndPoint = _endPoint; + InitSocket(); return _socket.ConnectAsync(args); } + public Task ConnectAsync() + { + InitSocket(); + var tcs = new TaskCompletionSource(); + _socket.BeginConnect(_endPoint, iar => + { + _socket.EndConnect(iar); + tcs.SetResult(_socket.Connected); + }, null); + return tcs.Task; + } + public bool SendAsync(SocketAsyncEventArgs args) { return _socket.SendAsync(args); @@ -67,23 +85,22 @@ public void Dispose() _socket.Dispose(); } - void InitSocket(EndPoint endpoint) + void InitSocket() { if (_socket != null) _socket.Dispose(); _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - _remote = endpoint; } string GetHostForAuthentication() { - if (_remote == null) + if (_endPoint == null) throw new ArgumentNullException("Remote endpoint is not set"); - else if (_remote is DnsEndPoint) - return (_remote as DnsEndPoint).Host; - else if (_remote is IPEndPoint) - return (_remote as IPEndPoint).Address.ToString(); + else if (_endPoint is DnsEndPoint) + return (_endPoint as DnsEndPoint).Host; + else if (_endPoint is IPEndPoint) + return (_endPoint as IPEndPoint).Address.ToString(); throw new InvalidOperationException("Cannot get remote host"); } diff --git a/CSRedis/Internal/IO/RedisWriter.cs b/CSRedis/Internal/IO/RedisWriter.cs index 3c0328a..b12e071 100644 --- a/CSRedis/Internal/IO/RedisWriter.cs +++ b/CSRedis/Internal/IO/RedisWriter.cs @@ -4,6 +4,7 @@ using System.IO; using System.Linq; using System.Text; +using System.Threading.Tasks; namespace CSRedis.Internal.IO { @@ -28,6 +29,19 @@ public int Write(RedisCommand command, Stream stream) return data.Length; } + public Task WriteAsync(RedisCommand command, Stream stream) + { + var tcs = new TaskCompletionSource(); + string prepared = Prepare(command); + byte[] data = _io.Encoding.GetBytes(prepared); + stream.BeginWrite(data, 0, data.Length, iar => + { + stream.EndWrite(iar); + tcs.SetResult(data.Length); + }, null); + return tcs.Task; + } + public int Write(RedisCommand command, byte[] buffer, int offset) { string prepared = Prepare(command); diff --git a/CSRedis/Internal/IO/SSLAsyncConnector.cs b/CSRedis/Internal/IO/SSLAsyncConnector.cs new file mode 100644 index 0000000..fb2530b --- /dev/null +++ b/CSRedis/Internal/IO/SSLAsyncConnector.cs @@ -0,0 +1,94 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Text; +using System.Threading.Tasks; + +namespace CSRedis.Internal.IO +{ + class SSLAsyncConnector : IAsyncConnector + { + readonly IRedisSocket _socket; + readonly RedisIO _io; + readonly EndPoint _endPoint; + readonly IOQueue _ioQueue; + readonly object _readLock; + readonly object _writeLock; + TaskCompletionSource _connected; + bool _connecting; + + public event EventHandler Connected; + + public SSLAsyncConnector(IRedisSocket socket, EndPoint endPoint, RedisIO io) + { + _socket = socket; + _io = io; + _endPoint = endPoint; + _ioQueue = new IOQueue(); + _writeLock = new object(); + _readLock = new object(); + _connected = new TaskCompletionSource(); + } + + public Task ConnectAsync() + { + if (_connecting) + return _connected.Task; + + _connecting = true; + + return _socket.ConnectAsync().ContinueWith(t => + { + if (Connected != null) + Connected(this, new EventArgs()); + _connected.SetResult(t.Result); + return t.Result; + }); + + } + + public Task CallAsync(RedisCommand command) + { + var token = new RedisAsyncCommandToken(command); + _ioQueue.Enqueue(token); + ConnectAsync().ContinueWith(CallAsync_Continued); + return token.TaskSource.Task; + } + + public void Dispose() + { } + + void CallAsync_Continued(Task t) + { + lock (_writeLock) + { + IRedisAsyncCommandToken token = _ioQueue.DequeueForWrite(); + _io.Writer.WriteAsync(token.Command, _io.Stream).ContinueWith(WriteAsync_Continued); + } + } + + void WriteAsync_Continued(Task t) + { + lock (_readLock) + { + IRedisAsyncCommandToken token = _ioQueue.DequeueForRead(); + TrySetResult(token); + } + } + + void TrySetResult(IRedisAsyncCommandToken token) + { + try + { + token.SetResult(_io.Reader); + } + // TODO: catch IOException and reconnect + catch (Exception e) + { + token.SetException(e); + } + } + } +} diff --git a/CSRedis/Internal/IO/SocketPool.cs b/CSRedis/Internal/IO/SocketPool.cs index 07410af..afd36c1 100644 --- a/CSRedis/Internal/IO/SocketPool.cs +++ b/CSRedis/Internal/IO/SocketPool.cs @@ -14,6 +14,8 @@ class SocketPool : IDisposable readonly ConcurrentStack _pool; readonly int _max; + public EndPoint EndPoint { get { return _endPoint; } } + public SocketPool(EndPoint endPoint, int max) { _max = max; diff --git a/CSRedis/Internal/RedisConnector.cs b/CSRedis/Internal/RedisConnector.cs index fc6b47a..eaba5d3 100644 --- a/CSRedis/Internal/RedisConnector.cs +++ b/CSRedis/Internal/RedisConnector.cs @@ -16,14 +16,13 @@ class RedisConnector { readonly int _concurrency; readonly int _bufferSize; - readonly Lazy _asyncConnector; + readonly Lazy _asyncConnector; readonly IRedisSocket _redisSocket; readonly EndPoint _endPoint; readonly RedisIO _io; public event EventHandler Connected; - public AsyncConnector Async { get { return _asyncConnector.Value; } } public bool IsConnected { get { return _redisSocket.Connected; } } public EndPoint EndPoint { get { return _endPoint; } } public bool IsPipelined { get { return _io.IsPipelined; } } @@ -44,7 +43,7 @@ public Encoding Encoding get { return _io.Encoding; } set { _io.Encoding = value; } } - + IAsyncConnector _async { get { return _asyncConnector.Value; } } public RedisConnector(EndPoint endPoint, IRedisSocket socket, int concurrency, int bufferSize) { @@ -53,12 +52,12 @@ public RedisConnector(EndPoint endPoint, IRedisSocket socket, int concurrency, i _endPoint = endPoint; _redisSocket = socket; _io = new RedisIO(); - _asyncConnector = new Lazy(AsyncConnectorFactory); + _asyncConnector = new Lazy(AsyncConnectorFactory); } public bool Connect() { - _redisSocket.Connect(_endPoint); + _redisSocket.Connect(); if (_redisSocket.Connected) OnConnected(); @@ -68,7 +67,7 @@ public bool Connect() public Task ConnectAsync() { - return Async.ConnectAsync(); + return _async.ConnectAsync(); } public T Call(RedisCommand command) @@ -94,7 +93,7 @@ public T Call(RedisCommand command) public Task CallAsync(RedisCommand command) { - return Async.CallAsync(command); + return _async.CallAsync(command); } public void Write(RedisCommand command) @@ -210,9 +209,14 @@ void OnAsyncConnected(object sender, EventArgs args) OnConnected(); } - AsyncConnector AsyncConnectorFactory() + IAsyncConnector AsyncConnectorFactory() { - var connector = new AsyncConnector(_redisSocket, _endPoint, _io, _concurrency, _bufferSize); + IAsyncConnector connector; + if (_redisSocket.SSL) + connector = new SSLAsyncConnector(_redisSocket, _endPoint, _io); + else + connector = new AsyncConnector(_redisSocket, _endPoint, _io, _concurrency, _bufferSize); + connector.Connected += OnAsyncConnected; return connector; } diff --git a/CSRedis/RedisClient.cs b/CSRedis/RedisClient.cs index 8aefebb..d87c051 100644 --- a/CSRedis/RedisClient.cs +++ b/CSRedis/RedisClient.cs @@ -197,7 +197,7 @@ public RedisClient(EndPoint endpoint, int asyncConcurrency, int asyncBufferSize) /// Max concurrent threads (default 1000) /// Async thread buffer size (default 10240 bytes) public RedisClient(EndPoint endpoint, bool ssl, int asyncConcurrency, int asyncBufferSize) - : this(new RedisSocket(ssl), endpoint, asyncConcurrency, asyncBufferSize) + : this(new RedisSocket(endpoint, ssl), endpoint, asyncConcurrency, asyncBufferSize) { } internal RedisClient(IRedisSocket socket, EndPoint endpoint) diff --git a/CSRedis/RedisConnectionPool.cs b/CSRedis/RedisConnectionPool.cs index bf22e68..77f3501 100644 --- a/CSRedis/RedisConnectionPool.cs +++ b/CSRedis/RedisConnectionPool.cs @@ -7,6 +7,7 @@ using System.Net; using System.Net.Sockets; using System.Text; +using System.Threading.Tasks; namespace CSRedis { @@ -95,6 +96,8 @@ class RedisPooledSocket : IRedisSocket readonly SocketPool _pool; public bool Connected { get { return _socket == null ? false : _socket.Connected; } } + public EndPoint EndPoint { get { return _pool.EndPoint; } } + public bool SSL { get { return false; } } public int ReceiveTimeout { @@ -113,7 +116,7 @@ public RedisPooledSocket(SocketPool pool) _pool = pool; } - public void Connect(EndPoint endpoint) + public void Connect() { _socket = _pool.Connect(); System.Diagnostics.Debug.WriteLine("Got socket #{0}", _socket.Handle); @@ -138,5 +141,10 @@ public void Dispose() { _pool.Release(_socket); } + + public Task ConnectAsync() + { + throw new NotImplementedException(); + } } } diff --git a/CSRedis/RedisSentinelClient.cs b/CSRedis/RedisSentinelClient.cs index 27a87c3..192af6c 100644 --- a/CSRedis/RedisSentinelClient.cs +++ b/CSRedis/RedisSentinelClient.cs @@ -116,7 +116,11 @@ public RedisSentinelClient(string host, int port) /// Redis sentinel port /// Set to true if remote Redis server expects SSL public RedisSentinelClient(string host, int port, bool ssl) - : this(new RedisSocket(ssl), new DnsEndPoint(host, port), DefaultConcurrency, DefaultBufferSize) + : this(new DnsEndPoint(host, port), ssl, DefaultConcurrency, DefaultBufferSize) + { } + + internal RedisSentinelClient(EndPoint endPoint, bool ssl, int concurrency, int bufferSize) + : this(new RedisSocket(endPoint, ssl), endPoint) { } internal RedisSentinelClient(IRedisSocket socket, EndPoint endpoint)