Description
There seems to be a bug in `CacheProxyConnection` when handling invalidation messages that were delivered to another connection.
In order to handle invalidation messages that may have been sent to stale connections, it currently runs this check: https://github.com/redis/redis-py/blob/v5.3.0b5/redis/connection.py#L871
and handles the invalidation messages by emptying the socket.
But it then returns unconditionally, which causes the subsequent call to `read_response`
to hang here, trying to read from the socket, if the cache entry has been invalidated:
https://github.com/redis/redis-py/blob/v5.3.0b5/redis/connection.py#L913
To reproduce:
```python
import redis
import redis.utils
from redis.cache import CacheConfig
import time

url = 'xxxxxx'
key = 'test_key'

# just to make the point clear, use 2 separate connection pools, one with client-side
# caching enabled and one without, and use the former for READs only and the latter
# for WRITEs only
r = redis.from_url(url, protocol=3, cache_config=CacheConfig())
r1 = redis.from_url(url)
r1.set(key, 'old')

def read():
    start = time.perf_counter()
    print(r.get(key))
    end = time.perf_counter()
    print(end - start)

# read and populate the cache for the key
read()

# quick and dirty way to create 1 extra CacheProxyConnection, so we have 2.
conn1 = r.connection_pool.get_connection('GET', 'zzz')  # conn1 is the one that cached the key
conn2 = r.connection_pool.get_connection('GET', 'def')
print(r._send_command_parse_response(conn1, 'GET', 'GET', 'zzz', keys=['zzz']))
print(r._send_command_parse_response(conn2, 'GET', 'GET', 'def', keys=['def']))

# releasing the connections in this order makes sure the next read() will use conn2,
# because self._available_connections.pop() in get_connection will return it
r.connection_pool.release(conn1)
r.connection_pool.release(conn2)
print(r.connection_pool.__dict__)

# this should trigger invalidation messages
r1.set(key, 'new')
# give enough time for invalidation messages to be sent to the connection which has the key cached
time.sleep(1)

# it'll hang here
read()
read()
print('done')
r.close()
r1.close()
```
The fix seems rather simple. Instead of returning unconditionally here: https://github.com/redis/redis-py/blob/v5.3.0b5/redis/connection.py#L876
we could make the return conditional, something along the lines of:

```python
# Check again whether the cache entry survived the invalidation handling above;
# if it was invalidated, fall through and do the remote READ
# so the cache entry for the key gets repopulated.
if self._cache.get(self._current_command_cache_key):
    return
```
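To make the intent concrete, here is a toy model of the two behaviours. All class and attribute names below are invented for illustration; this is not redis-py code, just a sketch contrasting the current unconditional return with the proposed conditional one:

```python
# Toy model of the control flow in a caching connection's read_response.
# All names are invented; this is NOT redis-py internals.

class ToyCache:
    def __init__(self):
        self._store = {}

    def get(self, key):
        return self._store.get(key)

    def set(self, key, value):
        self._store[key] = value

    def invalidate(self, key):
        self._store.pop(key, None)


class ToyConnection:
    def __init__(self, cache):
        self._cache = cache
        self._current_command_cache_key = None
        # Invalidation messages waiting on this connection's socket.
        self.pending_invalidations = []

    def send_command(self, key):
        self._current_command_cache_key = key

    def _empty_socket(self):
        # Mirrors the "empty the socket" step: apply queued invalidations.
        while self.pending_invalidations:
            self._cache.invalidate(self.pending_invalidations.pop(0))

    def read_response(self, conditional_return):
        self._empty_socket()
        if conditional_return:
            # Proposed fix: serve from cache only if the entry survived.
            entry = self._cache.get(self._current_command_cache_key)
            if entry is not None:
                return entry
        else:
            # Current behaviour: return unconditionally. The real caller then
            # tries to read a server reply that was never requested and blocks;
            # here the hang is modeled by returning the now-missing entry (None).
            return self._cache.get(self._current_command_cache_key)
        # Fall through: do the remote read and repopulate the cache.
        value = 'value-from-server'  # stand-in for the real round trip
        self._cache.set(self._current_command_cache_key, value)
        return value


cache = ToyCache()
cache.set('test_key', 'old')
conn = ToyConnection(cache)
conn.pending_invalidations.append('test_key')  # another client wrote the key
conn.send_command('test_key')
print(conn.read_response(conditional_return=False))  # None: models the hang

cache.set('test_key', 'old')
conn.pending_invalidations.append('test_key')
conn.send_command('test_key')
print(conn.read_response(conditional_return=True))  # value-from-server
```

With the conditional return, an invalidated entry no longer short-circuits the read; the connection falls through to the server round trip and the cache is repopulated.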