Skip to content

fix bugs when connect to redis(sentinel)3.x #44

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CSRedis/CSRedis.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,12 @@
<Compile Include="Internal\Fakes\FakeRedisSocket.cs" />
<Compile Include="Internal\Fakes\FakeStream.cs" />
<Compile Include="Internal\IO\AsyncConnector.cs" />
<Compile Include="Internal\IO\IAsyncConnector.cs" />
<Compile Include="Internal\IO\IOQueue.cs" />
<Compile Include="Internal\IO\SocketPool.cs" />
<Compile Include="Internal\IO\IRedisSocket.cs" />
<Compile Include="Internal\IO\RedisPooledSocket.cs" />
<Compile Include="Internal\IO\SSLAsyncConnector.cs" />
<Compile Include="RedisConnectionPool.cs" />
<Compile Include="Internal\IO\RedisIO.cs" />
<Compile Include="Internal\IO\RedisSocket.cs" />
Expand Down
18 changes: 17 additions & 1 deletion CSRedis/Internal/Fakes/FakeRedisSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace CSRedis.Internal.Fakes
{
Expand All @@ -20,6 +21,16 @@ class FakeRedisSocket : IRedisSocket

public int SendTimeout { get; set; }

public EndPoint EndPoint
{
get
{
throw new NotImplementedException();
}
}

public bool SSL { get { return false; } }

public FakeRedisSocket(params string[] responses)
: this(Encoding.UTF8, responses)
{ }
Expand All @@ -35,7 +46,7 @@ public FakeRedisSocket(params byte[][] responses)
_stream.AddResponse(response);
}

public void Connect(EndPoint endpoint)
public void Connect()
{
_connected = true;
}
Expand Down Expand Up @@ -78,5 +89,10 @@ static byte[][] ToBytes(Encoding encoding, string[] strings)
set[i] = encoding.GetBytes(strings[i]);
return set;
}

public Task<bool> ConnectAsync()
{
throw new NotImplementedException();
}
}
}
80 changes: 36 additions & 44 deletions CSRedis/Internal/IO/AsyncConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@

namespace CSRedis.Internal.IO
{
class AsyncConnector : IDisposable
class AsyncConnector : IAsyncConnector
{
readonly SocketAsyncEventArgs _asyncConnectArgs;
readonly SocketAsyncPool _asyncTransferPool;
readonly ConcurrentQueue<IRedisAsyncCommandToken> _asyncReadQueue;
readonly ConcurrentQueue<IRedisAsyncCommandToken> _asyncWriteQueue;
readonly IOQueue _ioQueue;
readonly object _readLock;
readonly object _writeLock;
readonly int _concurrency;
Expand All @@ -36,8 +35,7 @@ public AsyncConnector(IRedisSocket socket, EndPoint endpoint, RedisIO io, int co
_bufferSize = bufferSize;
_asyncTransferPool = new SocketAsyncPool(concurrency, bufferSize);
_asyncTransferPool.Completed += OnSocketCompleted;
_asyncReadQueue = new ConcurrentQueue<IRedisAsyncCommandToken>();
_asyncWriteQueue = new ConcurrentQueue<IRedisAsyncCommandToken>();
_ioQueue = new IOQueue();
_readLock = new object();
_writeLock = new object();
_asyncConnectArgs = new SocketAsyncEventArgs { RemoteEndPoint = endpoint };
Expand Down Expand Up @@ -69,8 +67,8 @@ public Task<bool> ConnectAsync()
public Task<T> CallAsync<T>(RedisCommand<T> command)
{
var token = new RedisAsyncCommandToken<T>(command);
_asyncWriteQueue.Enqueue(token);
ConnectAsync().ContinueWith(CallAsyncDeferred);
_ioQueue.Enqueue(token);
ConnectAsync().ContinueWith(CallAsync_Continued);
return token.TaskSource.Task;
}

Expand All @@ -82,33 +80,32 @@ void InitConnection()
_connectionTaskSource = new TaskCompletionSource<bool>();
}

void CallAsyncDeferred(Task t)
void CallAsync_Continued(Task t)
{
lock (_writeLock)
{
IRedisAsyncCommandToken token;
if (!_asyncWriteQueue.TryDequeue(out token))
throw new Exception();
IRedisAsyncCommandToken token = _ioQueue.DequeueForWrite();

_asyncReadQueue.Enqueue(token);

var args = _asyncTransferPool.Acquire();
int bytes;
try
{
bytes = _io.Writer.Write(token.Command, args.Buffer, args.Offset);
}
catch (ArgumentException e)
{
throw new RedisClientException("Could not write command '" + token.Command.Command + "'. Argument size exceeds buffer allocation of " + args.Count + ".", e);
}
SocketAsyncEventArgs args = _asyncTransferPool.Acquire();
int bytes = TryWriteBuffer(token.Command, args.Buffer, args.Offset);
args.SetBuffer(args.Offset, bytes);

if (!_redisSocket.SendAsync(args))
OnSocketSent(args);
}
}

int TryWriteBuffer(RedisCommand command, byte[] buffer, int offset)
{
try
{
return _io.Writer.Write(command, buffer, offset);
}
catch (ArgumentException e)
{
throw new RedisClientException("Could not write command '" + command.Command + "'. Argument size exceeds buffer size.", e);
}
}

void OnSocketCompleted(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
Expand Down Expand Up @@ -136,28 +133,23 @@ void OnSocketSent(SocketAsyncEventArgs args)
{
_asyncTransferPool.Release(args);

IRedisAsyncCommandToken token;
lock (_readLock)
{
if (_asyncReadQueue.TryDequeue(out token))
{
try
{
token.SetResult(_io.Reader);
}
/*catch (IOException) // TODO implement async retry
{
if (ReconnectAttempts == 0)
throw;
Reconnect();
_asyncWriteQueue.Enqueue(token);
ConnectAsync().ContinueWith(CallAsyncDeferred);
}*/
catch (Exception e)
{
token.SetException(e);
}
}
IRedisAsyncCommandToken token = _ioQueue.DequeueForRead();
TrySetResult(token);
}
}

void TrySetResult(IRedisAsyncCommandToken token)
{
try
{
token.SetResult(_io.Reader);
}
// TODO: catch IOException and reconnect
catch (Exception e)
{
token.SetException(e);
}
}

Expand Down
15 changes: 15 additions & 0 deletions CSRedis/Internal/IO/IAsyncConnector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace CSRedis.Internal.IO
{
interface IAsyncConnector : IDisposable
{
Task<bool> ConnectAsync();
Task<T> CallAsync<T>(RedisCommand<T> command);
event EventHandler Connected;
}
}
42 changes: 42 additions & 0 deletions CSRedis/Internal/IO/IOQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace CSRedis.Internal.IO
{
class IOQueue
{
readonly ConcurrentQueue<IRedisAsyncCommandToken> _readQueue;
readonly ConcurrentQueue<IRedisAsyncCommandToken> _writeQueue;

public IOQueue()
{
_readQueue = new ConcurrentQueue<IRedisAsyncCommandToken>();
_writeQueue = new ConcurrentQueue<IRedisAsyncCommandToken>();
}

public void Enqueue(IRedisAsyncCommandToken token)
{
_writeQueue.Enqueue(token);
}

public IRedisAsyncCommandToken DequeueForWrite()
{
IRedisAsyncCommandToken token;
if (!_writeQueue.TryDequeue(out token))
throw new Exception();
_readQueue.Enqueue(token);
return token;
}

public IRedisAsyncCommandToken DequeueForRead()
{
IRedisAsyncCommandToken token;
if (!_readQueue.TryDequeue(out token))
throw new Exception();
return token;
}
}
}
6 changes: 5 additions & 1 deletion CSRedis/Internal/IO/IRedisSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace CSRedis.Internal.IO
{
interface IRedisSocket : IDisposable
{
EndPoint EndPoint { get; }
bool SSL { get; }
bool Connected { get; }
int ReceiveTimeout { get; set; }
int SendTimeout { get; set; }
void Connect(EndPoint endpoint);
void Connect();
bool ConnectAsync(SocketAsyncEventArgs args);
Task<bool> ConnectAsync();
bool SendAsync(SocketAsyncEventArgs args);
Stream GetStream();
}
Expand Down
10 changes: 9 additions & 1 deletion CSRedis/Internal/IO/RedisPooledSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace CSRedis.Internal.IO
{
Expand All @@ -14,6 +15,8 @@ class RedisPooledSocket : IRedisSocket
readonly SocketPool _pool;

public bool Connected { get { return _socket == null ? false : _socket.Connected; } }
public bool SSL { get { return false; } }
public EndPoint EndPoint { get { return _pool.EndPoint; } }

public int ReceiveTimeout
{
Expand All @@ -32,7 +35,7 @@ public RedisPooledSocket(SocketPool pool)
_pool = pool;
}

public void Connect(EndPoint endpoint)
public void Connect()
{
_socket = _pool.Connect();
System.Diagnostics.Debug.WriteLine("Got socket #{0}", _socket.Handle);
Expand All @@ -57,5 +60,10 @@ public void Dispose()
{
_pool.Release(_socket);
}

public Task<bool> ConnectAsync()
{
throw new NotImplementedException();
}
}
}
43 changes: 30 additions & 13 deletions CSRedis/Internal/IO/RedisSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@
using System.Net.Security;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace CSRedis.Internal.IO
{
class RedisSocket : IRedisSocket
{
readonly bool _ssl;
readonly EndPoint _endPoint;
Socket _socket;
EndPoint _remote;

public bool SSL { get { return _ssl; } }
public EndPoint EndPoint { get { return _endPoint; } }

public bool Connected { get { return _socket == null ? false : _socket.Connected; } }

Expand All @@ -29,23 +33,37 @@ public int SendTimeout
set { _socket.SendTimeout = value; }
}

public RedisSocket(bool ssl)
public RedisSocket(EndPoint endPoint, bool ssl)
{
_ssl = ssl;
_endPoint = endPoint;
}

public void Connect(EndPoint endpoint)
public void Connect()
{
InitSocket(endpoint);
_socket.Connect(endpoint);
InitSocket();
_socket.Connect(_endPoint);
}

public bool ConnectAsync(SocketAsyncEventArgs args)
{
InitSocket(args.RemoteEndPoint);
args.RemoteEndPoint = _endPoint;
InitSocket();
return _socket.ConnectAsync(args);
}

public Task<bool> ConnectAsync()
{
InitSocket();
var tcs = new TaskCompletionSource<bool>();
_socket.BeginConnect(_endPoint, iar =>
{
_socket.EndConnect(iar);
tcs.SetResult(_socket.Connected);
}, null);
return tcs.Task;
}

public bool SendAsync(SocketAsyncEventArgs args)
{
return _socket.SendAsync(args);
Expand All @@ -67,23 +85,22 @@ public void Dispose()
_socket.Dispose();
}

void InitSocket(EndPoint endpoint)
void InitSocket()
{
if (_socket != null)
_socket.Dispose();

_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_remote = endpoint;
}

string GetHostForAuthentication()
{
if (_remote == null)
if (_endPoint == null)
throw new ArgumentNullException("Remote endpoint is not set");
else if (_remote is DnsEndPoint)
return (_remote as DnsEndPoint).Host;
else if (_remote is IPEndPoint)
return (_remote as IPEndPoint).Address.ToString();
else if (_endPoint is DnsEndPoint)
return (_endPoint as DnsEndPoint).Host;
else if (_endPoint is IPEndPoint)
return (_endPoint as IPEndPoint).Address.ToString();

throw new InvalidOperationException("Cannot get remote host");
}
Expand Down
Loading