From 966bd9851902b9bb5c2f8a38e8156566ff7ce3f1 Mon Sep 17 00:00:00 2001 From: Borys Date: Tue, 4 Feb 2025 15:54:12 +0200 Subject: [PATCH 1/4] test: added test_keys_expiration_during_migration --- tests/dragonfly/cluster_test.py | 51 +++++++++++++++++++++++++++++++++ tests/dragonfly/utility.py | 20 +++++++++---- 2 files changed, 65 insertions(+), 6 deletions(-) diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index b6c433aeb57a..f16b89158288 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1876,6 +1876,57 @@ async def test_start_replication_during_migration( assert await seeder.compare(fake_capture, r1_node.instance.port) +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) +async def test_keys_expiration_during_migration(df_factory: DflyInstanceFactory): + # Check data migration from one node to another with expiration + instances = [ + df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2) + ] + + df_factory.start_all(instances) + + nodes = [(await create_node_info(instance)) for instance in instances] + nodes[0].slots = [(0, 16383)] + nodes[1].slots = [] + + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + logging.debug("Start seeder") + await nodes[0].client.execute_command("debug", "populate", "100000", "foo", "100", "RAND") + + capture_before = await StaticSeeder.capture(nodes[0].client) + + seeder = ExpirySeeder(timeout=4) + seeder_task = asyncio.create_task(seeder.run(nodes[0].client)) + await asyncio.sleep(1) + + logging.debug("Start migration") + nodes[0].migrations.append( + MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], nodes[1].id) + ) + seeder.stop() + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED") + + logging.debug("Stop seeders") + seeder.stop() + await seeder_task + + logging.debug("finish migration") + nodes[0].slots = [] + nodes[1].slots = [(0, 16383)] + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + # wait to expire all keys + await asyncio.sleep(5) + + assert await StaticSeeder.capture(nodes[1].client) == capture_before + + stats = await nodes[1].client.info("STATS") + assert stats["expired_keys"] > 0 + + @pytest.mark.parametrize("migration_first", [False, True]) @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_snapshoting_during_migration( diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py index fc661130d829..be3d8dfd951a 100644 --- a/tests/dragonfly/utility.py +++ b/tests/dragonfly/utility.py @@ -754,18 +754,26 @@ def skip_if_not_in_github(): class ExpirySeeder: - def __init__(self): + def __init__(self, stop_on_failure=True, timeout=3): self.stop_flag = False self.i = 0 self.batch_size = 200 + self.stop_on_failure = stop_on_failure + self.timeout = timeout async def run(self, client): while not self.stop_flag: - pipeline = client.pipeline(transaction=True) - for i in range(0, self.batch_size): - pipeline.execute_command(f"SET tmp{self.i} bar{self.i} EX 3") - self.i = self.i + 1 - await pipeline.execute() + try: + pipeline = client.pipeline(transaction=True) + for i in range(0, self.batch_size): + pipeline.execute_command(f"SET tmp{self.i} bar{self.i} EX {self.timeout}") + self.i = self.i + 1 + await pipeline.execute() + except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError) as e: + if self.stop_on_failure: + return + else: + raise SystemExit(e) async def wait_until_n_inserts(self, count): while not self.i > count: From a49c73db702df621619d10806dc2cb197dfaa922 Mon Sep 17 00:00:00 2001 From: Borys Date: Thu, 13 Feb 2025 10:56:44 +0200 Subject: [PATCH 2/4] refactor: address comments --- tests/dragonfly/cluster_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index f16b89158288..93d3974636e9 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1898,13 +1898,12 @@ async def test_keys_expiration_during_migration(df_factory: DflyInstanceFactory) seeder = ExpirySeeder(timeout=4) seeder_task = asyncio.create_task(seeder.run(nodes[0].client)) - await asyncio.sleep(1) + await seeder.wait_until_n_inserts(1000) logging.debug("Start migration") nodes[0].migrations.append( MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], nodes[1].id) ) - seeder.stop() await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED") From 4a884bc6974bc9a40aa1947279a35d4d9c8910bc Mon Sep 17 00:00:00 2001 From: Borys Date: Thu, 13 Feb 2025 13:22:26 +0200 Subject: [PATCH 3/4] refactor: remove extra pipeline usage --- tests/dragonfly/utility.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py index be3d8dfd951a..386cbf5710b7 100644 --- a/tests/dragonfly/utility.py +++ b/tests/dragonfly/utility.py @@ -764,11 +764,9 @@ def __init__(self, stop_on_failure=True, timeout=3): async def run(self, client): while not self.stop_flag: try: - pipeline = client.pipeline(transaction=True) for i in range(0, self.batch_size): - pipeline.execute_command(f"SET tmp{self.i} bar{self.i} EX {self.timeout}") + await client.execute_command(f"SET tmp{self.i} bar{self.i} EX {self.timeout}") self.i = self.i + 1 - await pipeline.execute() except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError) as e: if self.stop_on_failure: return From cd89b487c7ec62362cd2642c25ee55ab4e7c1e2d Mon Sep 17 00:00:00 2001 From: Borys Date: Thu, 13 Feb 2025 15:00:08 +0200 Subject: [PATCH 4/4] refactor: return pipeline into ExpirySeeder but without transaction --- tests/dragonfly/utility.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py index 386cbf5710b7..5b5195e7496f 100644 --- a/tests/dragonfly/utility.py +++ b/tests/dragonfly/utility.py @@ -764,9 +764,11 @@ def __init__(self, stop_on_failure=True, timeout=3): async def run(self, client): while not self.stop_flag: try: + pipeline = client.pipeline(transaction=False) for i in range(0, self.batch_size): - await client.execute_command(f"SET tmp{self.i} bar{self.i} EX {self.timeout}") + pipeline.execute_command(f"SET tmp{self.i} bar{self.i} EX {self.timeout}") self.i = self.i + 1 + await pipeline.execute() except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError) as e: if self.stop_on_failure: return