Skip to content

Commit

Permalink
Reverted changes.
Browse files Browse the repository at this point in the history
The problem was with the connection pool of Resty Redis. We were using the same pool for all cluster nodes (so it always tried to connect the same node every time).
  • Loading branch information
ligreman committed Feb 12, 2022
1 parent b2c05bc commit 0917f14
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 82 deletions.
2 changes: 1 addition & 1 deletion kong/plugins/proxy-cache-redis-cluster/redis.lua
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ local function red_connect(opts)
auth_user = (opts.cluster_user or nil),
connect_opts = {
ssl = (opts.cluster_use_ssl_connection or false),
pool = "redis-cluster-connection-pool",
-- pool = "", -- We do not use a unique pool, we want node of the cluster has its own pool (resty redis creates them automatically as host:port pools)
-- we leave the 30 default pool, shared among pool_size and backlog https://github.com/openresty/lua-nginx-module#lua_socket_pool_size
pool_size = 20,
backlog = 10
Expand Down
125 changes: 44 additions & 81 deletions kong/plugins/proxy-cache-redis-cluster/rediscluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ local mt = { __index = _M }

local slot_cache = {}
local master_nodes = {}
local connections = {}

local cmds_for_all_master = {
["flushall"] = true,
Expand All @@ -58,57 +57,6 @@ local cluster_invalid_cmds = {
["shutdown"] = true
}

local function name(ip, port)
return ip .. ":" .. tostring(port)
end

local function new_connection(config, ip, port)
local redis_client = redis:new()
redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT,
config.send_timeout or DEFAULT_SEND_TIMEOUT,
config.read_timeout or DEFAULT_READ_TIMEOUT)
local ok, connerr = redis_client:connect(ip, port, config.connect_opts)
if ok then
return redis_client, nil
else
return nil, connerr
end
end

local function connect_to(config, ip, port)
local conn_name = name(ip, port)
local _, err

-- Comprueba en conexiones si hay una no nil a ip:port
if not connections[conn_name] then
_, err = new_connection(config, ip, port)
if err then
ngx.log(ngx.ERR, "Error connecting Redis (" .. conn_name .. "): ", err)
return nil, err
else
connections[conn_name] = _
end
else
-- si la hay, hace ping para ver si está habilitada y si no creará una nueva
_, err = connections[conn_name]:ping()
if err then
-- Creo una conexión nueva
_, err = new_connection(config, ip, port)
if err then
ngx.log(ngx.ERR, "Error connecting Redis (" .. conn_name .. "): ", err)
return nil, err
else
connections[conn_name] = _
end
end
end

-- devuelve el cliente de la conexión si todo está bien
return connections[conn_name], nil

-- luego el release_connection deberá acceder a la conexion concreta para hacerle el keepalive
end

local function redis_slot(str)
return redis_crc(parse_key(str))
end
Expand Down Expand Up @@ -164,7 +112,11 @@ local function try_hosts_slots(self, serv_list)
for i = 1, #serv_list do
local ip = serv_list[i].ip
local port = serv_list[i].port
local redis_client, ok, err, max_connection_timeout_err
local redis_client = redis:new()
local ok, err, max_connection_timeout_err
redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT,
config.send_timeout or DEFAULT_SEND_TIMEOUT,
config.read_timeout or DEFAULT_READ_TIMEOUT)

--attempt to connect DEFAULT_MAX_CONNECTION_ATTEMPTS times to redis
for k = 1, config.max_connection_attempts or DEFAULT_MAX_CONNECTION_ATTEMPTS do
Expand All @@ -176,14 +128,13 @@ local function try_hosts_slots(self, serv_list)
break
end

redis_client, err = connect_to(config, ip, port)

ok, err = redis_client:connect(ip, port, self.config.connect_opts)
if ok then
break
end
if err then
ngx.log(ngx.ERR, "unable to connect, attempt nr ", k, " : error: ", err)
table_insert(errors, err)
else
ok = true
break
end
end

Expand Down Expand Up @@ -377,7 +328,7 @@ function _M.new(_, config)
return inst
end

local function pick_node(self, serv_list, slot, magic_random_seed)
local function pick_node(self, serv_list, slot, magic_radom_seed)
local host
local port
local slave
Expand All @@ -386,8 +337,8 @@ local function pick_node(self, serv_list, slot, magic_random_seed)
return nil, nil, nil, "serv_list for slot " .. slot .. " is empty"
end
if self.config.enable_slave_read then
if magic_random_seed then
index = magic_random_seed % #serv_list + 1
if magic_radom_seed then
index = magic_radom_seed % #serv_list + 1
else
index = math.random(#serv_list)
end
Expand Down Expand Up @@ -475,7 +426,7 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
-- coroutine swich happens(eg. ngx.sleep, cosocket), very important!
slots = nil

local ip, port, slave, err, ok
local ip, port, slave, err

if target_ip ~= nil and target_port ~= nil then
-- asking redirection should only happens at master nodes
Expand All @@ -489,9 +440,13 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c
end
end

local redis_client, connerr = connect_to(config, ip, port)
local redis_client = redis:new()
redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT,
config.send_timeout or DEFAULT_SEND_TIMEOUT,
config.read_timeout or DEFAULT_READ_TIMEOUT)
local ok, connerr = redis_client:connect(ip, port, self.config.connect_opts)

if not connerr then
if ok then
local authok, autherr = check_auth(self, redis_client)
if autherr then
return nil, autherr
Expand Down Expand Up @@ -584,12 +539,16 @@ end
local function _do_cmd_master(self, cmd, key, ...)
local errors = {}
for _, master in ipairs(master_nodes) do

local redis_client, err = connect_to(self.config, master.ip, master.port)
local redis_client = redis:new()
redis_client:set_timeouts(self.config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT,
self.config.send_timeout or DEFAULT_SEND_TIMEOUT,
self.config.read_timeout or DEFAULT_READ_TIMEOUT)
local ok, err = redis_client:connect(master.ip, master.port, self.config.connect_opts)
if ok then
_, err = redis_client[cmd](redis_client, key, ...)
end
if err then
table_insert(errors, err)
else
_, err = redis_client[cmd](redis_client, key, ...)
end
release_connection(redis_client, self.config)
end
Expand Down Expand Up @@ -677,7 +636,7 @@ function _M.commit_pipeline(self)
local _reqs = rawget(self, "_reqs")

if not _reqs or #_reqs == 0 then
return nil, "no pipeline"
return
end

self._reqs = nil
Expand Down Expand Up @@ -732,11 +691,11 @@ function _M.commit_pipeline(self)
local port = v.port
local reqs = v.reqs
local slave = v.slave

local redis_client, err = connect_to(config, ip, port)
if err then
return nil, err
end
local redis_client = redis:new()
redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT,
config.send_timeout or DEFAULT_SEND_TIMEOUT,
config.read_timeout or DEFAULT_READ_TIMEOUT)
local ok, err = redis_client:connect(ip, port, self.config.connect_opts)

local authok, autherr = check_auth(self, redis_client)
if autherr then
Expand All @@ -745,10 +704,10 @@ function _M.commit_pipeline(self)

if slave then
--set readonly
local ok2, err3 = redis_client:readonly()
if not ok2 then
local ok, err = redis_client:readonly()
if not ok then
self:refresh_slots()
return nil, err3
return nil, err
end
end
if ok then
Expand All @@ -765,11 +724,11 @@ function _M.commit_pipeline(self)
redis_client[req.cmd](redis_client, req.key)
end
end
local res, err2 = redis_client:commit_pipeline()
if err2 then
local res, err = redis_client:commit_pipeline()
if err then
--There might be node fail, we should also refresh slot cache
self:refresh_slots()
return nil, err2 .. " return from " .. tostring(ip) .. ":" .. tostring(port)
return nil, err .. " return from " .. tostring(ip) .. ":" .. tostring(port)
end

if has_cluster_fail_signal_in_pipeline(res) then
Expand All @@ -780,7 +739,11 @@ function _M.commit_pipeline(self)
else
--There might be node fail, we should also refresh slot cache
self:refresh_slots()
return nil, err .. "pipeline commit failed while connecting to " .. tostring(ip) .. ":" .. tostring(port)
if not err then
return nil, err .. "pipeline commit failed while connecting to " .. tostring(ip) .. ":" .. tostring(port)
else
return nil, "pipeline commit failed while connecting to " .. tostring(ip) .. ":" .. tostring(port)
end
end
end

Expand Down

0 comments on commit 0917f14

Please sign in to comment.