From 1514a18b4e0e314b4f71ceb6275bfc6258ec575c Mon Sep 17 00:00:00 2001 From: Sergio Troiano Date: Tue, 17 Sep 2024 09:13:22 +0200 Subject: [PATCH] Adding logging to test --- pkg/storage/redis/node_management_lua.go | 39 +-------- .../go_test/integration_test.go | 87 ++++++++++++++++++- 2 files changed, 88 insertions(+), 38 deletions(-) diff --git a/pkg/storage/redis/node_management_lua.go b/pkg/storage/redis/node_management_lua.go index 9d44050..f4aa3b4 100644 --- a/pkg/storage/redis/node_management_lua.go +++ b/pkg/storage/redis/node_management_lua.go @@ -99,80 +99,45 @@ if ARGV[1] == "monitor" then end if ARGV[1] == "add_latest_produced_offset" then - local log = "" -- Initialize an empty log string - local key = KEYS[1] -- Use the key from KEYS[1] local offset = tonumber(ARGV[2]) local newTimestamp = tonumber(ARGV[3]) local ttlSeconds = tonumber(ARGV[4]) -- TTL value in seconds local cleanupProbability = tonumber(ARGV[5]) -- Probability as a percentage (e.g., 20 for 20%) - log = log .. "Processing key: " .. key .. ", Offset: " .. offset .. ", Timestamp: " .. newTimestamp .. "\n" - log = log .. "TTL: " .. ttlSeconds .. "s, Cleanup Probability: " .. cleanupProbability .. "%\n" - - -- Log the current contents of the sorted set - local currentMembers = redis.call('ZRANGE', key, 0, -1, 'WITHSCORES') - log = log .. "Current members in the sorted set:\n" - for i = 1, #currentMembers, 2 do - local member = currentMembers[i] - local score = currentMembers[i + 1] - log = log .. " Member: " .. member .. ", Score: " .. score .. "\n" - end - -- Optionally perform cleanup - if math.random(100) < cleanupProbability then + if math.random(100) <= cleanupProbability then local expiredTimestamp = newTimestamp - (ttlSeconds * 1000) -- TTL converted to milliseconds - log = log .. "Cleanup triggered. Expired timestamp threshold: " .. expiredTimestamp .. "\n" -- Find and remove old entries by member (timestamp) local oldMembers = redis.call('ZRANGE', key, 0, -1, 'WITHSCORES') - log = log .. "Found " .. (#oldMembers / 2) .. " members in the sorted set.\n" - for i = 1, #oldMembers, 2 do local member = oldMembers[i] local timestamp = tonumber(member) - log = log .. "Checking member with timestamp: " .. timestamp .. "\n" if timestamp < expiredTimestamp then redis.call('ZREM', key, member) - log = log .. "Removed member with timestamp: " .. timestamp .. "\n" end end - else - log = log .. "Cleanup not triggered.\n" end -- Retrieve the last two entries in the ZSET local members = redis.call('ZRANGE', key, -2, -1, 'WITHSCORES') local memberCount = #members / 2 - log = log .. "There are " .. memberCount .. " members in the last two entries.\n" if memberCount == 2 then local secondLastOffset = tonumber(members[2]) local lastOffset = tonumber(members[4]) - log = log .. "Second last offset: " .. secondLastOffset .. ", Last offset: " .. lastOffset .. "\n" if secondLastOffset == offset and lastOffset == offset then local latestMember = members[3] redis.call('ZREM', key, latestMember) - log = log .. "Removed latest member with timestamp: " .. latestMember .. " due to duplicate offsets.\n" end end -- Add the new member (whether it's a replacement or a new entry) redis.call('ZADD', key, offset, newTimestamp) redis.call('EXPIRE', key, ttlSeconds) -- Renew TTL - log = log .. "Added/Updated member with timestamp: " .. newTimestamp .. " and offset: " .. offset .. "\n" - log = log .. "TTL set to " .. ttlSeconds .. " seconds for key: " .. key .. "\n" - - -- Log the contents of the sorted set after modification - local updatedMembers = redis.call('ZRANGE', key, 0, -1, 'WITHSCORES') - log = log .. "Updated members in the sorted set:\n" - for i = 1, #updatedMembers, 2 do - local member = updatedMembers[i] - local score = updatedMembers[i + 1] - log = log .. " Member: " .. member .. ", Score: " .. score .. "\n" - end - return log + return "Added or replaced member with timestamp " .. newTimestamp end ` diff --git a/test/integration/tests/redis_lua_scripts/go_test/integration_test.go b/test/integration/tests/redis_lua_scripts/go_test/integration_test.go index 7dfe5b4..adbbfdb 100644 --- a/test/integration/tests/redis_lua_scripts/go_test/integration_test.go +++ b/test/integration/tests/redis_lua_scripts/go_test/integration_test.go @@ -347,6 +347,91 @@ func TestMonitorAndRemoveFailedNodes(t *testing.T) { } // Test function for adding multiple offsets and verifying order in the sorted set +func TestAddLatestProducedOffset_MultipleEntries(t *testing.T) { + ctx := context.Background() + nodeTag := "test_key" // This key will be used for the test + + // Initialize the Redis test environment + rdb, sha, err := initRedisTest(ctx, redis_local.LuaScriptContent) + assert.NoError(t, err, "Failed to initialize Redis test environment") + defer cleanupRedisTest(ctx, rdb) + + // Define multiple offsets and timestamps + entries := []struct { + offset int + timestamp int64 + }{ + {100, time.Now().Unix()}, + {200, time.Now().Unix() + 1}, + {300, time.Now().Unix() + 2}, + } + ttlSeconds := 60 + + // Add multiple entries using the Lua script + for _, entry := range entries { + keys := []string{nodeTag} + args := []interface{}{"add_latest_produced_offset", entry.offset, entry.timestamp, ttlSeconds, 0} + + // Run the Lua script to add the offset and timestamp + res, err := rdb.EvalSha(ctx, sha, keys, args...).Result() + assert.NoError(t, err, "Error running Lua script") + assert.Equal(t, fmt.Sprintf("Added or replaced member with timestamp %d", entry.timestamp), res, "Unexpected result from Lua script") + } + + // Verify that all entries exist in the sorted set and in the correct order + members, err := rdb.ZRangeWithScores(ctx, nodeTag, 0, -1).Result() + assert.NoError(t, err, "Error fetching members from the sorted set") + assert.Len(t, members, 3, "There should be 3 members in the sorted set") + + // Verify the order of the entries by checking the offsets and timestamps + for i, entry := range entries { + assert.Equal(t, float64(entry.offset), members[i].Score, "The offset value should match") + assert.Equal(t, fmt.Sprintf("%d", entry.timestamp), members[i].Member.(string), "The timestamp should match") + } + + // Check that the TTL is correctly set for the key + ttl, err := rdb.TTL(ctx, nodeTag).Result() + assert.NoError(t, err, "Error checking TTL of the key") + assert.GreaterOrEqual(t, ttl.Seconds(), float64(ttlSeconds-1), "TTL should be close to 60 seconds") +} + +// Test function for TTL expiration +func TestAddLatestProducedOffset_TTLExpiration(t *testing.T) { + ctx := context.Background() + nodeTag := "test_key_ttl_expiration" // This key will be used for the test + + // Initialize the Redis test environment + rdb, sha, err := initRedisTest(ctx, redis_local.LuaScriptContent) + assert.NoError(t, err, "Failed to initialize Redis test environment") + defer cleanupRedisTest(ctx, rdb) + + // Define offset, timestamp, and short TTL (1 second) + offset := 100 + timestamp := time.Now().Unix() + ttlSeconds := 1 + + // Add the entry using the Lua script + keys := []string{nodeTag} + args := []interface{}{"add_latest_produced_offset", offset, timestamp, ttlSeconds, 0} + + // Run the Lua script to add the offset and timestamp + res, err := rdb.EvalSha(ctx, sha, keys, args...).Result() + assert.NoError(t, err, "Error running Lua script") + assert.Equal(t, fmt.Sprintf("Added or replaced member with timestamp %d", timestamp), res, "Unexpected result from Lua script") + + // Verify that the entry exists immediately after insertion + members, err := rdb.ZRangeWithScores(ctx, nodeTag, 0, -1).Result() + assert.NoError(t, err, "Error fetching members from the sorted set") + assert.Len(t, members, 1, "There should be 1 member in the sorted set") + + // Wait for 2 seconds to ensure the TTL has expired + time.Sleep(2 * time.Second) + + // Verify that the key is no longer present in Redis + exists, err := rdb.Exists(ctx, nodeTag).Result() + assert.NoError(t, err, "Error checking if key exists after TTL expiration") + assert.Equal(t, int64(0), exists, "The key should no longer exist after TTL expiration") +} // Test function for cleanup logic with probability func TestAddLatestProducedOffset_CleanupLogic(t *testing.T) { @@ -405,7 +490,7 @@ func TestAddLatestProducedOffset_CleanupLogic(t *testing.T) { // Now, test with 0% cleanup probability (no cleanup) // Add another entry but ensure no cleanup happens noCleanupOffset := 400 - noCleanupTimestamp := time.Now().UnixMilli() + noCleanupTimestamp := time.Now().UnixMilli() + 1 // Add 1 millisecond to avoid duplicate timestamp with the previous entry noCleanupProbability := 0 args = []interface{}{"add_latest_produced_offset", noCleanupOffset, noCleanupTimestamp, ttlSeconds, noCleanupProbability}