From 0917f14f6295415e6261eabf79bcc4488df647c9 Mon Sep 17 00:00:00 2001 From: Ligre Date: Sat, 12 Feb 2022 12:54:11 +0100 Subject: [PATCH] Reverted changes. The problem was with the connection pool of Resty Redis. We were using the same pool for all cluster nodes (so it always tried to connect the same node every time). --- .../proxy-cache-redis-cluster/redis.lua | 2 +- .../rediscluster.lua | 125 ++++++------------ 2 files changed, 45 insertions(+), 82 deletions(-) diff --git a/kong/plugins/proxy-cache-redis-cluster/redis.lua b/kong/plugins/proxy-cache-redis-cluster/redis.lua index debaad4..e00aa10 100644 --- a/kong/plugins/proxy-cache-redis-cluster/redis.lua +++ b/kong/plugins/proxy-cache-redis-cluster/redis.lua @@ -50,7 +50,7 @@ local function red_connect(opts) auth_user = (opts.cluster_user or nil), connect_opts = { ssl = (opts.cluster_use_ssl_connection or false), - pool = "redis-cluster-connection-pool", + -- pool = "", -- We do not use a unique pool, we want node of the cluster has its own pool (resty redis creates them automatically as host:port pools) -- we leave the 30 default pool, shared among pool_size and backlog https://github.com/openresty/lua-nginx-module#lua_socket_pool_size pool_size = 20, backlog = 10 diff --git a/kong/plugins/proxy-cache-redis-cluster/rediscluster.lua b/kong/plugins/proxy-cache-redis-cluster/rediscluster.lua index 185c0dc..838c861 100644 --- a/kong/plugins/proxy-cache-redis-cluster/rediscluster.lua +++ b/kong/plugins/proxy-cache-redis-cluster/rediscluster.lua @@ -46,7 +46,6 @@ local mt = { __index = _M } local slot_cache = {} local master_nodes = {} -local connections = {} local cmds_for_all_master = { ["flushall"] = true, @@ -58,57 +57,6 @@ local cluster_invalid_cmds = { ["shutdown"] = true } -local function name(ip, port) - return ip .. ":" .. tostring(port) -end - -local function new_connection(config, ip, port) - local redis_client = redis:new() - redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, - config.send_timeout or DEFAULT_SEND_TIMEOUT, - config.read_timeout or DEFAULT_READ_TIMEOUT) - local ok, connerr = redis_client:connect(ip, port, config.connect_opts) - if ok then - return redis_client, nil - else - return nil, connerr - end -end - -local function connect_to(config, ip, port) - local conn_name = name(ip, port) - local _, err - - -- Comprueba en conexiones si hay una no nil a ip:port - if not connections[conn_name] then - _, err = new_connection(config, ip, port) - if err then - ngx.log(ngx.ERR, "Error connecting Redis (" .. conn_name .. "): ", err) - return nil, err - else - connections[conn_name] = _ - end - else - -- si la hay, hace ping para ver si está habilitada y si no creará una nueva - _, err = connections[conn_name]:ping() - if err then - -- Creo una conexión nueva - _, err = new_connection(config, ip, port) - if err then - ngx.log(ngx.ERR, "Error connecting Redis (" .. conn_name .. "): ", err) - return nil, err - else - connections[conn_name] = _ - end - end - end - - -- devuelve el cliente de la conexión si todo está bien - return connections[conn_name], nil - - -- luego el release_connection deberá acceder a la conexion concreta para hacerle el keepalive -end - local function redis_slot(str) return redis_crc(parse_key(str)) end @@ -164,7 +112,11 @@ local function try_hosts_slots(self, serv_list) for i = 1, #serv_list do local ip = serv_list[i].ip local port = serv_list[i].port - local redis_client, ok, err, max_connection_timeout_err + local redis_client = redis:new() + local ok, err, max_connection_timeout_err + redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, + config.send_timeout or DEFAULT_SEND_TIMEOUT, + config.read_timeout or DEFAULT_READ_TIMEOUT) --attempt to connect DEFAULT_MAX_CONNECTION_ATTEMPTS times to redis for k = 1, config.max_connection_attempts or DEFAULT_MAX_CONNECTION_ATTEMPTS do @@ -176,14 +128,13 @@ local function try_hosts_slots(self, serv_list) break end - redis_client, err = connect_to(config, ip, port) - + ok, err = redis_client:connect(ip, port, self.config.connect_opts) + if ok then + break + end if err then ngx.log(ngx.ERR, "unable to connect, attempt nr ", k, " : error: ", err) table_insert(errors, err) - else - ok = true - break end end @@ -377,7 +328,7 @@ function _M.new(_, config) return inst end -local function pick_node(self, serv_list, slot, magic_random_seed) +local function pick_node(self, serv_list, slot, magic_radom_seed) local host local port local slave @@ -386,8 +337,8 @@ local function pick_node(self, serv_list, slot, magic_random_seed) return nil, nil, nil, "serv_list for slot " .. slot .. " is empty" end if self.config.enable_slave_read then - if magic_random_seed then - index = magic_random_seed % #serv_list + 1 + if magic_radom_seed then + index = magic_radom_seed % #serv_list + 1 else index = math.random(#serv_list) end @@ -475,7 +426,7 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c -- coroutine swich happens(eg. ngx.sleep, cosocket), very important! slots = nil - local ip, port, slave, err, ok + local ip, port, slave, err if target_ip ~= nil and target_port ~= nil then -- asking redirection should only happens at master nodes @@ -489,9 +440,13 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c end end - local redis_client, connerr = connect_to(config, ip, port) + local redis_client = redis:new() + redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, + config.send_timeout or DEFAULT_SEND_TIMEOUT, + config.read_timeout or DEFAULT_READ_TIMEOUT) + local ok, connerr = redis_client:connect(ip, port, self.config.connect_opts) - if not connerr then + if ok then local authok, autherr = check_auth(self, redis_client) if autherr then return nil, autherr @@ -584,12 +539,16 @@ end local function _do_cmd_master(self, cmd, key, ...) local errors = {} for _, master in ipairs(master_nodes) do - - local redis_client, err = connect_to(self.config, master.ip, master.port) + local redis_client = redis:new() + redis_client:set_timeouts(self.config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, + self.config.send_timeout or DEFAULT_SEND_TIMEOUT, + self.config.read_timeout or DEFAULT_READ_TIMEOUT) + local ok, err = redis_client:connect(master.ip, master.port, self.config.connect_opts) + if ok then + _, err = redis_client[cmd](redis_client, key, ...) + end if err then table_insert(errors, err) - else - _, err = redis_client[cmd](redis_client, key, ...) end release_connection(redis_client, self.config) end @@ -677,7 +636,7 @@ function _M.commit_pipeline(self) local _reqs = rawget(self, "_reqs") if not _reqs or #_reqs == 0 then - return nil, "no pipeline" + return end self._reqs = nil @@ -732,11 +691,11 @@ function _M.commit_pipeline(self) local port = v.port local reqs = v.reqs local slave = v.slave - - local redis_client, err = connect_to(config, ip, port) - if err then - return nil, err - end + local redis_client = redis:new() + redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, + config.send_timeout or DEFAULT_SEND_TIMEOUT, + config.read_timeout or DEFAULT_READ_TIMEOUT) + local ok, err = redis_client:connect(ip, port, self.config.connect_opts) local authok, autherr = check_auth(self, redis_client) if autherr then @@ -745,10 +704,10 @@ function _M.commit_pipeline(self) if slave then --set readonly - local ok2, err3 = redis_client:readonly() - if not ok2 then + local ok, err = redis_client:readonly() + if not ok then self:refresh_slots() - return nil, err3 + return nil, err end end if ok then @@ -765,11 +724,11 @@ function _M.commit_pipeline(self) redis_client[req.cmd](redis_client, req.key) end end - local res, err2 = redis_client:commit_pipeline() - if err2 then + local res, err = redis_client:commit_pipeline() + if err then --There might be node fail, we should also refresh slot cache self:refresh_slots() - return nil, err2 .. " return from " .. tostring(ip) .. ":" .. tostring(port) + return nil, err .. " return from " .. tostring(ip) .. ":" .. tostring(port) end if has_cluster_fail_signal_in_pipeline(res) then @@ -780,7 +739,11 @@ function _M.commit_pipeline(self) else --There might be node fail, we should also refresh slot cache self:refresh_slots() - return nil, err .. "pipeline commit failed while connecting to " .. tostring(ip) .. ":" .. tostring(port) + if not err then + return nil, err .. "pipeline commit failed while connecting to " .. tostring(ip) .. ":" .. tostring(port) + else + return nil, "pipeline commit failed while connecting to " .. tostring(ip) .. ":" .. tostring(port) + end end end