diff --git a/Adapter/RedisTagAwareAdapter.php b/Adapter/RedisTagAwareAdapter.php index eb416ab..6ec1038 100644 --- a/Adapter/RedisTagAwareAdapter.php +++ b/Adapter/RedisTagAwareAdapter.php @@ -60,14 +60,23 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter implements PruneableI * @var string|null detected eviction policy used on Redis server */ private $redisEvictionPolicy; + /** + * @var string|null detected redis version of Redis server + */ + private $redisVersion; + /** + * @var bool|null Indicate whether this "namespace" has been pruned and what the result was. + */ + private $pruneResult; private $namespace; /** * @param \Redis|\RedisArray|\RedisCluster|\Predis\ClientInterface|RedisProxy|RedisClusterProxy $redis The redis client * @param string $namespace The default namespace * @param int $defaultLifetime The default lifetime + * @param bool $pruneWithCompression Enable compressed prune. Way more resource intensive. */ - public function __construct($redis, string $namespace = '', int $defaultLifetime = 0, MarshallerInterface $marshaller = null) + public function __construct($redis, string $namespace = '', int $defaultLifetime = 0, MarshallerInterface $marshaller = null, bool $pruneWithCompression = false) { if ($redis instanceof \Predis\ClientInterface && $redis->getConnection() instanceof ClusterInterface && !$redis->getConnection() instanceof PredisCluster) { throw new InvalidArgumentException(sprintf('Unsupported Predis cluster connection: only "%s" is, "%s" given.', PredisCluster::class, get_debug_type($redis->getConnection()))); @@ -85,6 +94,7 @@ public function __construct($redis, string $namespace = '', int $defaultLifetime $this->init($redis, $namespace, $defaultLifetime, new TagAwareMarshaller($marshaller)); $this->namespace = $namespace; + $this->pruneWithCompression = $pruneWithCompression; } /** @@ -296,6 +306,36 @@ protected function doInvalidate(array $tagIds): bool return $success; } + /** + * @TODO Move to RedisTrait? It already has a version check - this would be handy. + * + * @return string + */ + private function getRedisVersion(): string + { + if (null !== $this->redisVersion) { + return $this->redisVersion; + } + + $hosts = $this->getHosts(); + $host = reset($hosts); + if ($host instanceof \Predis\Client && $host->getConnection() instanceof ReplicationInterface) { + // Predis supports info command only on the master in replication environments + $hosts = [$host->getClientFor('master')]; + } + + foreach ($hosts as $host) { + $info = $host->info('Server'); + + if ($info instanceof ErrorInterface) { + continue; + } + return $this->redisVersion = $info['redis_version']; + } + // Fallback to 2.0 like RedisTrait does. + return $this->redisVersion = '2.0'; + } + private function getRedisEvictionPolicy(): string { if (null !== $this->redisEvictionPolicy) { @@ -362,9 +402,15 @@ protected function getAllTagKeys(): array }); $setKeys = $results->valid() ? iterator_to_array($results) : []; - [$cursor, $ids] = $setKeys[$tagsPrefix] ?? [null, null]; - // merge the fetched ids together - $tagKeys = array_merge($tagKeys, $ids); + // $setKeys[$tagsPrefix] might be an RedisException object - + // check before just using it. + if (is_array($setKeys[$tagsPrefix])) { + [$cursor, $ids] = $setKeys[$tagsPrefix] ?? [null, null]; + // merge the fetched ids together + $tagKeys = array_merge($tagKeys, $ids); + } elseif (isset($setKeys[$tagsPrefix]) && $setKeys[$tagsPrefix] instanceof \Throwable) { + $this->logger->error($setKeys[$tagsPrefix]->getMessage()); + } } while ($cursor = (int) $cursor); return $tagKeys; @@ -425,15 +471,9 @@ private function getOrphanedTagsStats(bool $compressMode = false): array // referenced and existing cache keys differs collect the // missing references. if ($compressMode && \count($referencedCacheKeys) > $existingCacheKeysCount) { - // In order to create the delta each single reference - // has to be checked. - foreach ($referencedCacheKeys as $cacheKey) { - $existingCacheKeyResult = $this->pipeline(function () use ($cacheKey) { - yield 'exists' => [$cacheKey]; - }); - if ($existingCacheKeyResult->valid() && !$existingCacheKeyResult->current()) { - $orphanedTagReferenceKeys[$tagKey][] = $cacheKey; - } + $orphanedTagReferenceKeysInHash = $this->getOrphanedCacheKeys($referencedCacheKeys); + if (!empty($orphanedTagReferenceKeysInHash)) { + $orphanedTagReferenceKeys[$tagKey] = $orphanedTagReferenceKeysInHash; } } // Stop processing cursors in case compression mode is @@ -456,6 +496,57 @@ private function getOrphanedTagsStats(bool $compressMode = false): array return $stats; } + /** + * Accepts a list of cache keys and returns a list with orphaned keys. + * + * The method attempts to reduced optimize the testing of the keys by + * batching the key tests and reduce the amount of redis calls. + * + * @param array $cacheKeys + * @param int $chunks Number of chunks to create when processing cacheKeys. + * + * @return array + */ + private function getOrphanedCacheKeys(array $cacheKeys, int $chunks = 2) + { + $orphanedCacheKeys = []; + if (version_compare($this->getRedisVersion(), '2.0.3', '>=')) { + // If we can check multiple keys at once divide and conquer to have + // faster execution. + $cacheKeysChunks = array_chunk($cacheKeys, floor(count($cacheKeys) / $chunks), true); + foreach ($cacheKeysChunks as $cacheKeysChunk) { + $result = $this->pipeline(function () use ($cacheKeysChunk) { + yield 'exists' => [$cacheKeysChunk]; + }); + if ($result->valid()) { + $existingKeys = $result->current(); + if ($existingKeys === 0) { + // None of the chunk exists - register all. + $orphanedCacheKeys = array_merge($orphanedCacheKeys, $cacheKeysChunk); + } elseif ($existingKeys !== count($cacheKeysChunk)) { + // Some exists some don't - trigger another batch of chunks. + // @TODO At what chunk size is a single item comparison more efficient? + // @TODO The call could set an optimized number of chunks. At this point the number of existing keys and the number + // of keys to check is known - this could allow to guesstimate the optimal fragmentation. + $orphanedCacheKeys = array_merge($orphanedCacheKeys, $this->getOrphanedCacheKeys($cacheKeysChunk)); + } + } + } + } else { + // Without multi-key support in exists each single reference + // has to be checked individually to create the delta. + foreach ($cacheKeys as $cacheKey) { + $result = $this->pipeline(function () use ($cacheKey) { + yield 'exists' => [$cacheKey]; + }); + if ($result->valid() && !$result->current()) { + $orphanedCacheKeys[] = $cacheKey; + } + } + } + return $orphanedCacheKeys; + } + /** * @TODO Verify the LUA scripts are redis-cluster safe. */ @@ -497,11 +588,17 @@ private function pruneOrphanedTags(bool $compressMode = false): bool return $success; } - /** - * @TODO Make compression mode flag configurable. - */ public function prune(): bool { - return $this->pruneOrphanedTags(true); + // Only prune once per prune run. + if (!isset($this->pruneResult)) { + // First run without compression enabled to reduce data that is + // processed by the compression handling. + $this->pruneResult = $this->pruneOrphanedTags(); + if ($this->pruneResult && $this->pruneWithCompression) { + $this->pruneResult = $this->pruneOrphanedTags(true); + } + } + return $this->pruneResult; } }