diff --git a/src/Splitio/Services/Impressions/Classes/ImpressionsManager.cs b/src/Splitio/Services/Impressions/Classes/ImpressionsManager.cs index c3195c5d..0385f66f 100644 --- a/src/Splitio/Services/Impressions/Classes/ImpressionsManager.cs +++ b/src/Splitio/Services/Impressions/Classes/ImpressionsManager.cs @@ -63,11 +63,9 @@ public KeyImpression Build(TreatmentResult result, Key key, Dictionary> _cache; private readonly IImpressionsSenderAdapter _senderAdapter; private readonly ISplitTask _cacheLongTermCleaningTask; + private int _keySize; public UniqueKeysTracker(ComponentConfig config, IFilterAdapter filterAdapter, @@ -34,6 +35,7 @@ public UniqueKeysTracker(ComponentConfig config, _senderAdapter = senderAdapter; _cacheLongTermCleaningTask = cacheLongTermCleaningTask; _cacheLongTermCleaningTask.SetAction(_filterAdapter.Clear); + _keySize = 0; } #region Public Methods @@ -48,8 +50,8 @@ public bool Track(string key, string featureName) hashSet.Add(key); return hashSet; }); - - if (_cache.Count >= _cacheMaxSize) + _keySize++; + if (_keySize >= _maxBulkSize) { _taskBulkData.Start(); } @@ -76,8 +78,9 @@ protected override async Task SendBulkDataAsync() try { var uniques = new ConcurrentDictionary>(_cache); - + var keySize = _keySize; _cache.Clear(); + _keySize = 0; if (!uniques.Any()) return; @@ -85,23 +88,64 @@ protected override async Task SendBulkDataAsync() .Select(v => new Mtks(v.Key, v.Value)) .ToList(); - if (values.Count <= _maxBulkSize) + if (keySize <= _maxBulkSize) { await _senderAdapter.RecordUniqueKeysAsync(values); return; } + List bulksFlattened = FlattenBulks(values); + await SendAllBulks(bulksFlattened); + } + catch (Exception e) + { + _logger.Error("Exception caught sending Unique Keys.", e); + } + } - while (values.Count > 0) + protected async Task SendAllBulks(List bulksFlattened) + { + List bulksToSend = new List(); + int bulkSize = 0; + foreach (var unique in bulksFlattened) + { + if ((unique.Keys.Count() + bulkSize) > _maxBulkSize) { - var bulkToPost = Util.Helper.TakeFromList(values, _maxBulkSize); - - await _senderAdapter.RecordUniqueKeysAsync(bulkToPost); + await _senderAdapter.RecordUniqueKeysAsync(bulksToSend); + bulkSize = 0; + bulksToSend = new List(); } + bulkSize += unique.Keys.Count(); + bulksToSend.Add(unique); } - catch (Exception e) + await _senderAdapter.RecordUniqueKeysAsync(bulksToSend); + } + + protected List FlattenBulks(List values) + { + List bulksFlattened = new List(); + foreach (var unique in values) { - _logger.Error("Exception caught sending Unique Keys.", e); + var bulks = ConvertToBulks(unique); + bulksFlattened.AddRange(bulks); + } + return bulksFlattened; + } + + protected List ConvertToBulks(Mtks unique) + { + if (unique.Keys.Count > _maxBulkSize) + { + List chunks = new List(); + var uniqueTemp = new List(unique.Keys.ToArray()); + var bulks = Util.Helper.ChunkBy(uniqueTemp, _maxBulkSize); + foreach (var bulk in bulks) + { + chunks.Add(new Mtks(unique.Feature, new HashSet(bulk))); + } + return chunks; } + + return new List { unique }; } #endregion } diff --git a/src/Splitio/Services/Shared/Classes/ConfigService.cs b/src/Splitio/Services/Shared/Classes/ConfigService.cs index 7ecb0eaa..6e9a684e 100644 --- a/src/Splitio/Services/Shared/Classes/ConfigService.cs +++ b/src/Splitio/Services/Shared/Classes/ConfigService.cs @@ -71,7 +71,7 @@ public BaseConfig ReadRedisConfig(ConfigurationOptions config) baseConfig.UniqueKeysRefreshRate = 300; baseConfig.ImpressionsCounterRefreshRate = 300; baseConfig.ImpressionsCountBulkSize = 10000; - baseConfig.UniqueKeysBulkSize = 10000; + baseConfig.UniqueKeysBulkSize = 5000; return baseConfig; } @@ -93,7 +93,7 @@ public SelfRefreshingConfig ReadInMemoryConfig(ConfigurationOptions config) ImpressionsCounterCacheMaxSize = baseConfig.ImpressionsCounterCacheMaxSize, FlagSetsFilter = baseConfig.FlagSetsFilter, FlagSetsInvalid = baseConfig.FlagSetsInvalid, - UniqueKeysBulkSize = 30000, + UniqueKeysBulkSize = 5000, ImpressionsCountBulkSize = 30000, UniqueKeysRefreshRate = 3600, ImpressionsCounterRefreshRate = 1800, // Send bulk impressions count - Refresh rate: 30 min. diff --git a/src/Splitio/Util/Helper.cs b/src/Splitio/Util/Helper.cs index 1930434d..869967c8 100644 --- a/src/Splitio/Util/Helper.cs +++ b/src/Splitio/Util/Helper.cs @@ -3,6 +3,7 @@ using Splitio.Telemetry.Domain.Enums; using Splitio.Telemetry.Storages; using System.Collections.Generic; +using System.Linq; namespace Splitio.Util { @@ -49,5 +50,14 @@ public static bool HasNonASCIICharacters(string input) return false; } + + public static List> ChunkBy(List source, int chunkSize) + { + return source + .Select((x, i) => new { Index = i, Value = x }) + .GroupBy(x => x.Index / chunkSize) + .Select(x => x.Select(v => v.Value).ToList()) + .ToList(); + } } } diff --git a/tests/Splitio-tests/Unit Tests/Impressions/UniqueKeysTrackerTests.cs b/tests/Splitio-tests/Unit Tests/Impressions/UniqueKeysTrackerTests.cs index 419a616f..8df8e366 100644 --- a/tests/Splitio-tests/Unit Tests/Impressions/UniqueKeysTrackerTests.cs +++ b/tests/Splitio-tests/Unit Tests/Impressions/UniqueKeysTrackerTests.cs @@ -65,7 +65,7 @@ public async Task PeriodicTask_ShouldSendBulk() } [TestMethod] - public void Track_WithFullSize_ShouldSendBulk() + public void Track_WithFullSize_ShouldSendTwoBulk() { // Arrange. _cache.Clear(); @@ -101,16 +101,17 @@ public void Track_WithFullSize_ShouldSendBulk() Assert.AreEqual(1, values2.Count); _cache.TryGetValue("feature-name-test-3", out HashSet values3); Assert.AreEqual(1, values3.Count); + Assert.IsTrue(_uniqueKeysTracker.Track("key-test", "feature-name-test-5")); Assert.IsTrue(_uniqueKeysTracker.Track("key-test-2", "feature-name-test-5")); Thread.Sleep(500); - _senderAdapter.Verify(mock => mock.RecordUniqueKeysAsync(It.IsAny>()), Times.Once); + _senderAdapter.Verify(mock => mock.RecordUniqueKeysAsync(It.IsAny>()), Times.Exactly(2)); _cache.Clear(); } [TestMethod] - public void Track_WithFullSize_ShouldSendTwoBulk() + public void Track_WithFullSize_ShouldSplitBulks() { // Arrange. _cache.Clear(); @@ -122,15 +123,21 @@ public void Track_WithFullSize_ShouldSendTwoBulk() _uniqueKeysTracker = new UniqueKeysTracker(config, _filterAdapter.Object, _cache, _senderAdapter.Object, task, cacheLongTermCleaningTask, sendBulkDataTask); // Act && Assert. + Assert.IsTrue(_uniqueKeysTracker.Track("key-test-1", "feature-name-test")); Assert.IsTrue(_uniqueKeysTracker.Track("key-test-2", "feature-name-test")); + Assert.IsTrue(_uniqueKeysTracker.Track("key-test-3", "feature-name-test")); + Assert.IsTrue(_uniqueKeysTracker.Track("key-test-4", "feature-name-test")); + Assert.IsTrue(_uniqueKeysTracker.Track("key-test-5", "feature-name-test")); + Assert.IsTrue(_uniqueKeysTracker.Track("key-test-6", "feature-name-test")); + Assert.IsTrue(_uniqueKeysTracker.Track("key-test-1", "feature-name-test-2")); Assert.IsTrue(_uniqueKeysTracker.Track("key-test-2", "feature-name-test-2")); - Assert.IsTrue(_uniqueKeysTracker.Track("key-test-2", "feature-name-test-3")); - Assert.IsTrue(_uniqueKeysTracker.Track("key-test-2", "feature-name-test-4")); - Assert.IsTrue(_uniqueKeysTracker.Track("key-test-2", "feature-name-test-5")); - Assert.IsTrue(_uniqueKeysTracker.Track("key-test-2", "feature-name-test-6")); + Assert.IsTrue(_uniqueKeysTracker.Track("key-test-3", "feature-name-test-2")); + Assert.IsTrue(_uniqueKeysTracker.Track("key-test-4", "feature-name-test-2")); + Assert.IsTrue(_uniqueKeysTracker.Track("key-test-5", "feature-name-test-2")); + Assert.IsTrue(_uniqueKeysTracker.Track("key-test-6", "feature-name-test-2")); Thread.Sleep(500); - _senderAdapter.Verify(mock => mock.RecordUniqueKeysAsync(It.IsAny>()), Times.Exactly(2)); + _senderAdapter.Verify(mock => mock.RecordUniqueKeysAsync(It.IsAny>()), Times.Exactly(4)); _cache.Clear(); }