Skip to content

Commit

Permalink
Add Redis Cluster support
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal committed Jul 16, 2018
1 parent ddf058b commit 9690c1e
Show file tree
Hide file tree
Showing 61 changed files with 3,724 additions and 499 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Gemfile.lock
/tmp/
/.idea
/.yardoc
/.bundle
/coverage/*
/doc/
/examples/sentinel/sentinel.conf
Expand All @@ -14,3 +15,5 @@ Gemfile.lock
/redis/*
/test/db
/test/test.conf
appendonly.aof
temp-rewriteaof-*.aof
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ before_install:
- gem update --system 2.6.14
- gem --version

script: make test
script: make

rvm:
- 2.2.2
Expand All @@ -25,7 +25,7 @@ before_script:
env:
global:
- VERBOSE=true
- TIMEOUT=1
- TIMEOUT=5
matrix:
- DRIVER=ruby REDIS_BRANCH=3.0
- DRIVER=ruby REDIS_BRANCH=3.2
Expand Down
99 changes: 94 additions & 5 deletions lib/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ def self.current=(redis)
# @option options [Boolean] :inherit_socket (false) Whether to use socket in forked process or not
# @option options [Array] :sentinels List of sentinels to contact
# @option options [Symbol] :role (:master) Role to fetch via Sentinel, either `:master` or `:slave`
# @option options [Array<String, Hash{Symbol => String, Integer}>] :cluster List of cluster nodes to contact
# @option options [Boolean] :replica Whether to use readonly replica nodes in Redis Cluster or not
# @option options [Class] :connector Class of custom connector
#
# @return [Redis] a new client instance
def initialize(options = {})
@options = options.dup
@original_client = @client = Client.new(options)
@cluster_mode = options.key?(:cluster)
client = @cluster_mode ? Cluster : Client
@original_client = @client = client.new(options)
@queue = Hash.new { |h, k| h[k] = [] }

super() # Monitor#initialize
Expand Down Expand Up @@ -274,9 +278,7 @@ def info(cmd = nil)
synchronize do |client|
client.call([:info, cmd].compact) do |reply|
if reply.kind_of?(String)
reply = Hash[reply.split("\r\n").map do |line|
line.split(":", 2) unless line =~ /^(#|$)/
end.compact]
reply = HashifyInfo.call(reply)

if cmd && cmd.to_s == "commandstats"
# Extract nested hashes for INFO COMMANDSTATS
Expand Down Expand Up @@ -2818,6 +2820,41 @@ def sentinel(subcommand, *args)
end
end

# Sends `CLUSTER *` command to random node and returns its reply.
#
# @see https://redis.io/commands#cluster Reference of cluster command
#
# @param subcommand [String, Symbol] the subcommand of cluster command
# e.g. `:slots`, `:nodes`, `:slaves`, `:info`
#
# @return [Object] depends on the subcommand
def cluster(subcommand, *args)
subcommand = subcommand.to_s.downcase
block = case subcommand
when 'slots' then HashifyClusterSlots
when 'nodes' then HashifyClusterNodes
when 'slaves' then HashifyClusterSlaves
when 'info' then HashifyInfo
else Noop
end

# @see https://github.com/antirez/redis/blob/unstable/src/redis-trib.rb#L127 raw reply expected
block = Noop unless @cluster_mode

synchronize do |client|
client.call([:cluster, subcommand] + args, &block)
end
end

# Sends `ASKING` command to random node and returns its reply.
#
# @see https://redis.io/topics/cluster-spec#ask-redirection ASK redirection
#
# @return [String] `'OK'`
def asking
synchronize { |client| client.call(%i[asking]) }
end

def id
@original_client.id
end
Expand All @@ -2831,6 +2868,8 @@ def dup
end

def connection
return @original_client.connection_info if @cluster_mode

{
host: @original_client.host,
port: @original_client.port,
Expand Down Expand Up @@ -2896,6 +2935,56 @@ def method_missing(command, *args)
end
}

HashifyInfo =
lambda { |reply|
Hash[reply.split("\r\n").map do |line|
line.split(':', 2) unless line =~ /^(#|$)/
end.compact]
}

HashifyClusterNodeInfo =
lambda { |str|
arr = str.split(' ')
{
'node_id' => arr[0],
'ip_port' => arr[1],
'flags' => arr[2].split(','),
'master_node_id' => arr[3],
'ping_sent' => arr[4],
'pong_recv' => arr[5],
'config_epoch' => arr[6],
'link_state' => arr[7],
'slots' => arr[8].nil? ? nil : Range.new(*arr[8].split('-'))
}
}

HashifyClusterSlots =
lambda { |reply|
reply.map do |arr|
first_slot, last_slot = arr[0..1]
master = { 'ip' => arr[2][0], 'port' => arr[2][1], 'node_id' => arr[2][2] }
replicas = arr[3..-1].map { |r| { 'ip' => r[0], 'port' => r[1], 'node_id' => r[2] } }
{
'start_slot' => first_slot,
'end_slot' => last_slot,
'master' => master,
'replicas' => replicas
}
end
}

HashifyClusterNodes =
lambda { |reply|
reply.split(/[\r\n]+/).map { |str| HashifyClusterNodeInfo.call(str) }
}

HashifyClusterSlaves =
lambda { |reply|
reply.map { |str| HashifyClusterNodeInfo.call(str) }
}

Noop = ->(reply) { reply }

def _geoarguments(*args, options: nil, sort: nil, count: nil)
args.push sort if sort
args.push 'count', count if count
Expand All @@ -2918,11 +3007,11 @@ def _subscription(method, timeout, channels, block)
@client = original
end
end

end

require_relative "redis/version"
require_relative "redis/connection"
require_relative "redis/client"
require_relative "redis/cluster"
require_relative "redis/pipeline"
require_relative "redis/subscribe"
Loading

0 comments on commit 9690c1e

Please sign in to comment.