From b2c05bc19a0d683476e9958556da34550c5c7213 Mon Sep 17 00:00:00 2001 From: Ligre Date: Sat, 12 Feb 2022 01:31:03 +0100 Subject: [PATCH] Fixed connection problems with cluster nodes --- ...ache-redis-cluster-plugin-1.0.0-1.rockspec | 2 +- .../proxy-cache-redis-cluster/handler.lua | 2 +- .../proxy-cache-redis-cluster/redis.lua | 5 +- .../rediscluster.lua | 168 +++++++++++------- 4 files changed, 104 insertions(+), 73 deletions(-) rename kong-proxy-cache-redis-cluster-plugin-1.0.0-0.rockspec => kong-proxy-cache-redis-cluster-plugin-1.0.0-1.rockspec (98%) diff --git a/kong-proxy-cache-redis-cluster-plugin-1.0.0-0.rockspec b/kong-proxy-cache-redis-cluster-plugin-1.0.0-1.rockspec similarity index 98% rename from kong-proxy-cache-redis-cluster-plugin-1.0.0-0.rockspec rename to kong-proxy-cache-redis-cluster-plugin-1.0.0-1.rockspec index db2db41..59f7a20 100644 --- a/kong-proxy-cache-redis-cluster-plugin-1.0.0-0.rockspec +++ b/kong-proxy-cache-redis-cluster-plugin-1.0.0-1.rockspec @@ -1,5 +1,5 @@ package = "kong-proxy-cache-redis-cluster-plugin" -version = "1.0.0-0" +version = "1.0.0-1" source = { url = "git://github.com/ligreman/kong-proxy-cache-redis-cluster-plugin" diff --git a/kong/plugins/proxy-cache-redis-cluster/handler.lua b/kong/plugins/proxy-cache-redis-cluster/handler.lua index f8da9ce..ed7a80a 100644 --- a/kong/plugins/proxy-cache-redis-cluster/handler.lua +++ b/kong/plugins/proxy-cache-redis-cluster/handler.lua @@ -223,7 +223,7 @@ local function store_cache_value(premature, conf, req_body, status, proxy_cache) end local ProxyCacheHandler = { - VERSION = "1.0.0-0", + VERSION = "1.0.0-1", PRIORITY = 902, } diff --git a/kong/plugins/proxy-cache-redis-cluster/redis.lua b/kong/plugins/proxy-cache-redis-cluster/redis.lua index de05098..debaad4 100644 --- a/kong/plugins/proxy-cache-redis-cluster/redis.lua +++ b/kong/plugins/proxy-cache-redis-cluster/redis.lua @@ -47,6 +47,7 @@ local function red_connect(opts) max_redirection = (opts.cluster_max_redirection or 16), max_connection_attempts = (opts.cluster_max_connection_attempts or 3), auth = (opts.cluster_password or nil), + auth_user = (opts.cluster_user or nil), connect_opts = { ssl = (opts.cluster_use_ssl_connection or false), pool = "redis-cluster-connection-pool", @@ -61,8 +62,8 @@ local function red_connect(opts) end -- Support for ACL (we send AUTH username password) - if is_present(opts.cluster_user) and is_present(opts.cluster_password) then - config.auth = opts.cluster_user .. " " .. opts.cluster_password + if is_present(opts.cluster_user) then + config.auth_user = opts.cluster_user end local red, err_redis = redis_cluster:new(config) diff --git a/kong/plugins/proxy-cache-redis-cluster/rediscluster.lua b/kong/plugins/proxy-cache-redis-cluster/rediscluster.lua index 3d1df77..185c0dc 100644 --- a/kong/plugins/proxy-cache-redis-cluster/rediscluster.lua +++ b/kong/plugins/proxy-cache-redis-cluster/rediscluster.lua @@ -40,13 +40,13 @@ local function parse_key(key_str) end end - local _M = {} local mt = { __index = _M } local slot_cache = {} local master_nodes = {} +local connections = {} local cmds_for_all_master = { ["flushall"] = true, @@ -58,6 +58,57 @@ local cluster_invalid_cmds = { ["shutdown"] = true } +local function name(ip, port) + return ip .. ":" .. tostring(port) +end + +local function new_connection(config, ip, port) + local redis_client = redis:new() + redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, + config.send_timeout or DEFAULT_SEND_TIMEOUT, + config.read_timeout or DEFAULT_READ_TIMEOUT) + local ok, connerr = redis_client:connect(ip, port, config.connect_opts) + if ok then + return redis_client, nil + else + return nil, connerr + end +end + +local function connect_to(config, ip, port) + local conn_name = name(ip, port) + local _, err + + -- Comprueba en conexiones si hay una no nil a ip:port + if not connections[conn_name] then + _, err = new_connection(config, ip, port) + if err then + ngx.log(ngx.ERR, "Error connecting Redis (" .. conn_name .. "): ", err) + return nil, err + else + connections[conn_name] = _ + end + else + -- si la hay, hace ping para ver si está habilitada y si no creará una nueva + _, err = connections[conn_name]:ping() + if err then + -- Creo una conexión nueva + _, err = new_connection(config, ip, port) + if err then + ngx.log(ngx.ERR, "Error connecting Redis (" .. conn_name .. "): ", err) + return nil, err + else + connections[conn_name] = _ + end + end + end + + -- devuelve el cliente de la conexión si todo está bien + return connections[conn_name], nil + + -- luego el release_connection deberá acceder a la conexion concreta para hacerle el keepalive +end + local function redis_slot(str) return redis_crc(parse_key(str)) end @@ -67,7 +118,12 @@ local function check_auth(self, redis_client) local count, err = redis_client:get_reused_times() if count == 0 then local _ - _, err = redis_client:auth(self.config.auth) + + if self.config.auth_user then + _, err = redis_client:auth(self.config.auth_user, self.config.auth) + else + _, err = redis_client:auth(self.config.auth) + end end if not err then @@ -82,7 +138,7 @@ local function check_auth(self, redis_client) end local function release_connection(red, config) - local ok,err = red:set_keepalive(config.keepalive_timeout + local ok, err = red:set_keepalive(config.keepalive_timeout or DEFAULT_KEEPALIVE_TIMEOUT, config.keepalive_cons or DEFAULT_KEEPALIVE_CONS) if not ok then ngx.log(ngx.ERR, "set keepalive failed:", err) @@ -91,7 +147,7 @@ end local function split(s, delimiter) local result = {}; - for m in (s..delimiter):gmatch("(.-)"..delimiter) do + for m in (s .. delimiter):gmatch("(.-)" .. delimiter) do table_insert(result, m); end return result; @@ -108,11 +164,7 @@ local function try_hosts_slots(self, serv_list) for i = 1, #serv_list do local ip = serv_list[i].ip local port = serv_list[i].port - local redis_client = redis:new() - local ok, err, max_connection_timeout_err - redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, - config.send_timeout or DEFAULT_SEND_TIMEOUT, - config.read_timeout or DEFAULT_READ_TIMEOUT) + local redis_client, ok, err, max_connection_timeout_err --attempt to connect DEFAULT_MAX_CONNECTION_ATTEMPTS times to redis for k = 1, config.max_connection_attempts or DEFAULT_MAX_CONNECTION_ATTEMPTS do @@ -124,11 +176,14 @@ local function try_hosts_slots(self, serv_list) break end - ok, err = redis_client:connect(ip, port, self.config.connect_opts) - if ok then break end + redis_client, err = connect_to(config, ip, port) + if err then - ngx.log(ngx.ERR,"unable to connect, attempt nr ", k, " : error: ", err) + ngx.log(ngx.ERR, "unable to connect, attempt nr ", k, " : error: ", err) table_insert(errors, err) + else + ok = true + break end end @@ -152,7 +207,7 @@ local function try_hosts_slots(self, serv_list) -- generate new list of servers for j = 3, #sub_info do - servers.serv_list[#servers.serv_list + 1 ] = { ip = sub_info[j][1], port = sub_info[j][2] } + servers.serv_list[#servers.serv_list + 1] = { ip = sub_info[j][1], port = sub_info[j][2] } end for slot = start_slot, end_slot do @@ -209,7 +264,6 @@ local function try_hosts_slots(self, serv_list) return nil, errors end - function _M.fetch_slots(self) local serv_list = self.config.serv_list local serv_list_cached = slot_cache[self.config.name .. "serv_list"] @@ -240,11 +294,10 @@ function _M.fetch_slots(self) end end - function _M.refresh_slots(self) local worker_id = ngx.worker.id() local lock, err, elapsed, ok - lock, err = resty_lock:new(self.config.dict_name or DEFAULT_SHARED_DICT_NAME, {time_out = 0}) + lock, err = resty_lock:new(self.config.dict_name or DEFAULT_SHARED_DICT_NAME, { time_out = 0 }) if not lock then ngx.log(ngx.ERR, "failed to create lock in refresh slot cache: ", err) return nil, err @@ -264,7 +317,6 @@ function _M.refresh_slots(self) end end - function _M.init_slots(self) if slot_cache[self.config.name] then -- already initialized @@ -308,8 +360,6 @@ function _M.init_slots(self) return true end - - function _M.new(_, config) if not config.name then return nil, " redis cluster config name is empty" @@ -318,7 +368,6 @@ function _M.new(_, config) return nil, " redis cluster config serv_list is empty" end - local inst = { config = config } inst = setmetatable(inst, mt) local _, err = inst:init_slots() @@ -328,8 +377,7 @@ function _M.new(_, config) return inst end - -local function pick_node(self, serv_list, slot, magic_radom_seed) +local function pick_node(self, serv_list, slot, magic_random_seed) local host local port local slave @@ -338,8 +386,8 @@ local function pick_node(self, serv_list, slot, magic_radom_seed) return nil, nil, nil, "serv_list for slot " .. slot .. " is empty" end if self.config.enable_slave_read then - if magic_radom_seed then - index = magic_radom_seed % #serv_list + 1 + if magic_random_seed then + index = magic_random_seed % #serv_list + 1 else index = math.random(#serv_list) end @@ -361,10 +409,8 @@ local function pick_node(self, serv_list, slot, magic_radom_seed) return host, port, slave end - local ask_host_and_port = {} - local function parse_ask_signal(res) --ask signal sample:ASK 12191 127.0.0.1:7008, so we need to parse and get 127.0.0.1, 7008 if res ~= ngx.null then @@ -390,7 +436,6 @@ local function parse_ask_signal(res) return nil, nil end - local function has_moved_signal(res) if res ~= ngx.null then if type(res) == "string" and string.sub(res, 1, 5) == "MOVED" then @@ -408,7 +453,6 @@ local function has_moved_signal(res) return false end - local function handle_command_with_retry(self, target_ip, target_port, asking, cmd, key, ...) local config = self.config @@ -431,7 +475,7 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c -- coroutine swich happens(eg. ngx.sleep, cosocket), very important! slots = nil - local ip, port, slave, err + local ip, port, slave, err, ok if target_ip ~= nil and target_port ~= nil then -- asking redirection should only happens at master nodes @@ -445,13 +489,9 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c end end - local redis_client = redis:new() - redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, - config.send_timeout or DEFAULT_SEND_TIMEOUT, - config.read_timeout or DEFAULT_READ_TIMEOUT) - local ok, connerr = redis_client:connect(ip, port, self.config.connect_opts) + local redis_client, connerr = connect_to(config, ip, port) - if ok then + if not connerr then local authok, autherr = check_auth(self, redis_client) if autherr then return nil, autherr @@ -532,7 +572,6 @@ local function handle_command_with_retry(self, target_ip, target_port, asking, c return nil, "failed to execute command, reaches maximum redirection attempts" end - local function generate_magic_seed(self) --For pipeline, We don't want request to be forwarded to all channels, eg. if we have 3*3 cluster(3 master 2 replicas) we --alway want pick up specific 3 nodes for pipeline requests, instead of 9. @@ -545,16 +584,12 @@ end local function _do_cmd_master(self, cmd, key, ...) local errors = {} for _, master in ipairs(master_nodes) do - local redis_client = redis:new() - redis_client:set_timeouts(self.config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, - self.config.send_timeout or DEFAULT_SEND_TIMEOUT, - self.config.read_timeout or DEFAULT_READ_TIMEOUT) - local ok, err = redis_client:connect(master.ip, master.port, self.config.connect_opts) - if ok then - _, err = redis_client[cmd](redis_client, key, ...) - end + + local redis_client, err = connect_to(self.config, master.ip, master.port) if err then table_insert(errors, err) + else + _, err = redis_client[cmd](redis_client, key, ...) end release_connection(redis_client, self.config) end @@ -582,7 +617,6 @@ local function _do_cmd(self, cmd, key, ...) return res, err end - local function construct_final_pipeline_resp(self, node_res_map, node_req_map) --construct final result with origin index local finalret = {} @@ -622,7 +656,6 @@ local function construct_final_pipeline_resp(self, node_res_map, node_req_map) return finalret end - local function has_cluster_fail_signal_in_pipeline(res) for i = 1, #res do if res[i] ~= ngx.null and type(res[i]) == "table" then @@ -636,16 +669,15 @@ local function has_cluster_fail_signal_in_pipeline(res) return false end - function _M.init_pipeline(self) self._reqs = {} end - function _M.commit_pipeline(self) local _reqs = rawget(self, "_reqs") - if not _reqs or #_reqs == 0 then return + if not _reqs or #_reqs == 0 then + return nil, "no pipeline" end self._reqs = nil @@ -700,11 +732,11 @@ function _M.commit_pipeline(self) local port = v.port local reqs = v.reqs local slave = v.slave - local redis_client = redis:new() - redis_client:set_timeouts(config.connect_timeout or DEFAULT_CONNECTION_TIMEOUT, - config.send_timeout or DEFAULT_SEND_TIMEOUT, - config.read_timeout or DEFAULT_READ_TIMEOUT) - local ok, err = redis_client:connect(ip, port, self.config.connect_opts) + + local redis_client, err = connect_to(config, ip, port) + if err then + return nil, err + end local authok, autherr = check_auth(self, redis_client) if autherr then @@ -713,10 +745,10 @@ function _M.commit_pipeline(self) if slave then --set readonly - local ok, err = redis_client:readonly() - if not ok then + local ok2, err3 = redis_client:readonly() + if not ok2 then self:refresh_slots() - return nil, err + return nil, err3 end end if ok then @@ -733,11 +765,11 @@ function _M.commit_pipeline(self) redis_client[req.cmd](redis_client, req.key) end end - local res, err = redis_client:commit_pipeline() - if err then + local res, err2 = redis_client:commit_pipeline() + if err2 then --There might be node fail, we should also refresh slot cache self:refresh_slots() - return nil, err .. " return from " .. tostring(ip) .. ":" .. tostring(port) + return nil, err2 .. " return from " .. tostring(ip) .. ":" .. tostring(port) end if has_cluster_fail_signal_in_pipeline(res) then @@ -761,18 +793,17 @@ function _M.commit_pipeline(self) end end - function _M.cancel_pipeline(self) self._reqs = nil end local function _do_eval_cmd(self, cmd, ...) ---[[ -eval command usage: -eval(script, 1, key, arg1, arg2 ...) -eval(script, 0, arg1, arg2 ...) -]] - local args = {...} + --[[ + eval command usage: + eval(script, 1, key, arg1, arg2 ...) + eval(script, 0, arg1, arg2 ...) + ]] + local args = { ... } local keys_num = args[2] if type(keys_num) ~= "number" then return nil, "Cannot execute eval without keys number" @@ -786,8 +817,7 @@ end -- dynamic cmd setmetatable(_M, { __index = function(_, cmd) - local method = - function(self, ...) + local method = function(self, ...) if cmd == "eval" or cmd == "evalsha" then return _do_eval_cmd(self, cmd, ...) else