Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: added test_keys_expiration_during_migration #4558

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1870,6 +1870,57 @@ async def test_start_replication_during_migration(
assert await seeder.compare(m1_capture, r1_node.instance.port)


@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_keys_expiration_during_migration(df_factory: DflyInstanceFactory):
    # Check that keys written with a TTL around the start of a slot migration
    # are expired correctly, and that the migrated data on the target node
    # matches the pre-migration capture once all volatile keys are gone.
    instances = [
        df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
    ]

    df_factory.start_all(instances)

    nodes = [(await create_node_info(instance)) for instance in instances]
    nodes[0].slots = [(0, 16383)]
    nodes[1].slots = []

    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

    logging.debug("Start seeder")
    # Persistent (non-expiring) baseline data; must survive the migration intact.
    await nodes[0].client.execute_command("debug", "populate", "100000", "foo", "100", "RAND")

    capture_before = await StaticSeeder.capture(nodes[0].client)

    # Volatile keys with a 4s TTL, inserted while the migration is being set up,
    # so they expire while (or shortly after) the data is migrating.
    seeder = ExpirySeeder(timeout=4)
    seeder_task = asyncio.create_task(seeder.run(nodes[0].client))
    await asyncio.sleep(1)

    logging.debug("Start migration")
    nodes[0].migrations.append(
        MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], nodes[1].id)
    )
    # Request the seeder to stop before pushing the migration config so no new
    # writes race the final slot handoff; the task itself is awaited below.
    # (The original code called seeder.stop() a second time after waiting for
    # FINISHED — a redundant no-op, removed here.)
    seeder.stop()
    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

    await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED")

    logging.debug("Stop seeders")
    await seeder_task

    logging.debug("finish migration")
    nodes[0].slots = []
    nodes[1].slots = [(0, 16383)]
    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

    # Wait long enough for every volatile key (EX 4) to expire on the target.
    await asyncio.sleep(5)

    # Only the persistent baseline should remain, matching the source capture.
    assert await StaticSeeder.capture(nodes[1].client) == capture_before

    # The target node must have actively expired the migrated volatile keys.
    stats = await nodes[1].client.info("STATS")
    assert stats["expired_keys"] > 0


@pytest.mark.parametrize("migration_first", [False, True])
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "dbfilename": "snap_during_migration"})
async def test_snapshoting_during_migration(
Expand Down
20 changes: 14 additions & 6 deletions tests/dragonfly/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,18 +754,26 @@ def skip_if_not_in_github():


class ExpirySeeder:
def __init__(self):
def __init__(self, stop_on_failure=True, timeout=3):
self.stop_flag = False
self.i = 0
self.batch_size = 200
self.stop_on_failure = stop_on_failure
self.timeout = timeout

async def run(self, client):
    """Continuously write batches of `SET ... EX <timeout>` keys until stop().

    Each iteration pushes `batch_size` commands through a transactional
    pipeline; `self.i` advances per key so names never repeat.
    """
    while not self.stop_flag:
        try:
            pipeline = client.pipeline(transaction=True)
            for _ in range(0, self.batch_size):
                pipeline.execute_command(f"SET tmp{self.i} bar{self.i} EX {self.timeout}")
                self.i = self.i + 1
            await pipeline.execute()
        except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError):
            if self.stop_on_failure:
                return
            # Re-raise the original error rather than SystemExit(e): SystemExit
            # escapes `except Exception` handlers in the awaiting task and
            # hides the real failure from pytest's error reporting.
            raise

async def wait_until_n_inserts(self, count):
while not self.i > count:
Expand Down
Loading