diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3992b126..61b23f38 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,7 +35,7 @@ jobs: - name: SonarQube Scan (Push) if: ${{ github.event_name == 'push' }} - uses: SonarSource/sonarcloud-github-action@v4.0.0 + uses: SonarSource/sonarcloud-github-action@v5.0.0 env: SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} @@ -46,7 +46,7 @@ jobs: - name: SonarQube Scan (Pull Request) if: ${{ github.event_name == 'pull_request' }} - uses: SonarSource/sonarcloud-github-action@v4.0.0 + uses: SonarSource/sonarcloud-github-action@v5.0.0 env: SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/CHANGES b/CHANGES index 31aa4cb8..a2c4a883 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,11 @@ +7.0.0 (Apr 24, 2025) +- BREAKING CHANGE: + - UniqueKeysTracker interface changed: + - Added Unique keys storage implementation. + - Added support for batch posting. +- Bump dependencies for vulnerability fixes. +- Updated flag specs handler. + 6.1.0 (Jan 14, 2025) - Added support for Impressions toggle. - Bump dependencies for vulnerability fixes. diff --git a/conf/conf.go b/conf/conf.go index e84026fd..e96b71aa 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -77,6 +77,8 @@ type AdvancedConfig struct { EventsQueueSize int ImpressionsQueueSize int ImpressionsBulkSize int64 + UniqueKeysQueueSize int64 + UniqueKeysBulkSize int64 StreamingEnabled bool AuthServiceURL string StreamingServiceURL string diff --git a/dtos/telemetry.go b/dtos/telemetry.go index 1de529ab..afa9062d 100644 --- a/dtos/telemetry.go +++ b/dtos/telemetry.go @@ -143,8 +143,8 @@ type Stats struct { // Key struct type Key struct { - Feature string `json:"f,omitempty"` - Keys []string `json:"ks,omitempty"` + Feature string `json:"f,omitempty"` + Keys []interface{} `json:"ks,omitempty"` } // Uniques struct diff --git a/provisional/impmanager_test.go b/provisional/impmanager_test.go index 9847f880..ce87de80 100644 --- a/provisional/impmanager_test.go +++ b/provisional/impmanager_test.go @@ -8,7 +8,9 @@ import ( "github.com/splitio/go-split-commons/v6/provisional/strategy" "github.com/splitio/go-split-commons/v6/storage/filter" "github.com/splitio/go-split-commons/v6/storage/inmemory" + "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexqueue" "github.com/splitio/go-split-commons/v6/telemetry" + "github.com/splitio/go-toolkit/v5/logging" ) func TestImpManagerInMemoryDebugListenerDisabled(t *testing.T) { @@ -122,7 +124,8 @@ func TestImpManagerInMemoryOptimized(t *testing.T) { func TestImpManagerInMemoryNone(t *testing.T) { counter := strategy.NewImpressionsCounter() filter := filter.NewBloomFilter(3000, 0.01) - uniqueTracker := strategy.NewUniqueKeysTracker(filter) + uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + uniqueTracker := strategy.NewUniqueKeysTracker(filter, uniqueKeysStorage) none := strategy.NewNoneImpl(counter, uniqueTracker, true) impManager := NewImpressionManager(none) @@ -190,7 +193,8 @@ func TestProcess(t *testing.T) { observer, _ := strategy.NewImpressionObserver(5000) debug := strategy.NewDebugImpl(observer, true) filter := filter.NewBloomFilter(3000, 0.01) - uniqueTracker := strategy.NewUniqueKeysTracker(filter) + uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + uniqueTracker := strategy.NewUniqueKeysTracker(filter, uniqueKeysStorage) counter := strategy.NewImpressionsCounter() none := strategy.NewNoneImpl(counter, uniqueTracker, false) diff --git a/provisional/strategy/none_test.go b/provisional/strategy/none_test.go index ee90b59b..7fd40c8c 100644 --- a/provisional/strategy/none_test.go +++ b/provisional/strategy/none_test.go @@ -6,13 +6,16 @@ import ( "github.com/splitio/go-split-commons/v6/dtos" "github.com/splitio/go-split-commons/v6/storage/filter" + "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexqueue" "github.com/splitio/go-split-commons/v6/util" + "github.com/splitio/go-toolkit/v5/logging" ) func TestNoneMode(t *testing.T) { now := time.Now().UTC().UnixNano() filter := filter.NewBloomFilter(1000, 0.01) - tracker := NewUniqueKeysTracker(filter) + uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + tracker := NewUniqueKeysTracker(filter, uniqueKeysStorage) counter := NewImpressionsCounter() none := NewNoneImpl(counter, tracker, true) @@ -52,7 +55,8 @@ func TestNoneMode(t *testing.T) { func TestApplySingleNone(t *testing.T) { now := time.Now().UTC().UnixNano() filter := filter.NewBloomFilter(1000, 0.01) - tracker := NewUniqueKeysTracker(filter) + uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + tracker := NewUniqueKeysTracker(filter, uniqueKeysStorage) counter := NewImpressionsCounter() none := NewNoneImpl(counter, tracker, true) diff --git a/provisional/strategy/uniquekeystracker.go b/provisional/strategy/uniquekeystracker.go index acae45a5..7bae3855 100644 --- a/provisional/strategy/uniquekeystracker.go +++ b/provisional/strategy/uniquekeystracker.go @@ -1,32 +1,25 @@ package strategy import ( - "sync" - - "github.com/splitio/go-split-commons/v6/dtos" "github.com/splitio/go-split-commons/v6/storage" - "github.com/splitio/go-toolkit/v5/datastructures/set" ) // UniqueKeysTracker interface type UniqueKeysTracker interface { Track(featureName string, key string) bool - PopAll() dtos.Uniques } // UniqueKeysTrackerImpl description type UniqueKeysTrackerImpl struct { - filter storage.Filter - cache map[string]*set.ThreadUnsafeSet - mutex *sync.RWMutex + filter storage.Filter + storage storage.UniqueKeysStorageProducer } // NewUniqueKeysTracker create new implementation -func NewUniqueKeysTracker(f storage.Filter) UniqueKeysTracker { +func NewUniqueKeysTracker(f storage.Filter, storage storage.UniqueKeysStorageProducer) UniqueKeysTracker { return &UniqueKeysTrackerImpl{ - filter: f, - cache: make(map[string]*set.ThreadUnsafeSet), - mutex: &sync.RWMutex{}, + filter: f, + storage: storage, } } @@ -37,49 +30,8 @@ func (t *UniqueKeysTrackerImpl) Track(featureName string, key string) bool { return false } - t.mutex.Lock() - defer t.mutex.Unlock() - t.filter.Add(fKey) - _, ok := t.cache[featureName] - if !ok { - t.cache[featureName] = set.NewSet() - } - - t.cache[featureName].Add(key) + t.storage.Push(featureName, key) return true } - -// PopAll returns all the elements stored in the cache and resets the cache -func (t *UniqueKeysTrackerImpl) PopAll() dtos.Uniques { - t.mutex.Lock() - defer t.mutex.Unlock() - toReturn := t.cache - t.cache = make(map[string]*set.ThreadUnsafeSet) - - return getUniqueKeysDto(toReturn) -} - -func getUniqueKeysDto(uniques map[string]*set.ThreadUnsafeSet) dtos.Uniques { - uniqueKeys := dtos.Uniques{ - Keys: make([]dtos.Key, 0, len(uniques)), - } - - for name, keys := range uniques { - list := keys.List() - keysDto := make([]string, 0, len(list)) - - for _, value := range list { - keysDto = append(keysDto, value.(string)) - } - keyDto := dtos.Key{ - Feature: name, - Keys: keysDto, - } - - uniqueKeys.Keys = append(uniqueKeys.Keys, keyDto) - } - - return uniqueKeys -} diff --git a/provisional/strategy/uniquekeystracker_test.go b/provisional/strategy/uniquekeystracker_test.go index f2ec0992..af2cdb5c 100644 --- a/provisional/strategy/uniquekeystracker_test.go +++ b/provisional/strategy/uniquekeystracker_test.go @@ -5,12 +5,14 @@ import ( "testing" "github.com/splitio/go-split-commons/v6/storage/filter" + "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexqueue" + "github.com/splitio/go-toolkit/v5/logging" ) -func Test(t *testing.T) { +func TestUniqueKeysTracker(t *testing.T) { bf := filter.NewBloomFilter(10000, 0.01) - - tracker := NewUniqueKeysTracker(bf) + uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + tracker := NewUniqueKeysTracker(bf, uniqueKeysStorage) for i := 0; i < 10; i++ { if !tracker.Track("feature-1", "key-"+fmt.Sprint(i)) { diff --git a/service/api/http_recorders_test.go b/service/api/http_recorders_test.go index 04b311fd..c3c22cd3 100644 --- a/service/api/http_recorders_test.go +++ b/service/api/http_recorders_test.go @@ -237,11 +237,11 @@ func TestJsonUniqueKeys(t *testing.T) { Keys: []dtos.Key{ { Feature: "feature-1", - Keys: []string{"key-1", "key-2"}, + Keys: []interface{}{"key-1", "key-2"}, }, { Feature: "feature-2", - Keys: []string{"key-10", "key-20"}, + Keys: []interface{}{"key-10", "key-20"}, }, }, } @@ -317,11 +317,11 @@ func TestPostUniqueKeys(t *testing.T) { keys := []dtos.Key{ { Feature: "feature-1", - Keys: []string{"key-1", "key-2"}, + Keys: []interface{}{"key-1", "key-2"}, }, { Feature: "feature-2", - Keys: []string{"key-3", "key-4"}, + Keys: []interface{}{"key-3", "key-4"}, }, } err := telemetryRecorder.RecordUniqueKeys(dtos.Uniques{ diff --git a/service/api/specs/specversion.go b/service/api/specs/specversion.go index 78fafcee..62a23cd2 100644 --- a/service/api/specs/specversion.go +++ b/service/api/specs/specversion.go @@ -1,24 +1,29 @@ package specs -import "fmt" +import ( + "fmt" + + "golang.org/x/exp/slices" +) const ( - FLAG_V1_0 = "1.0" - FLAG_V1_1 = "1.1" - FLAG_V1_2 = "1.2" + FLAG_V1_0 = "1.0" // default + FLAG_V1_1 = "1.1" // Semver + FLAG_V1_2 = "1.2" // Large Segment + FLAG_V1_3 = "1.3" // Rule-based Segment ) +var flagSpecs = []string{FLAG_V1_0, FLAG_V1_1, FLAG_V1_2} +var Latest = flagSpecs[len(flagSpecs)-1] + // Match returns the spec version if it is valid, otherwise it returns nil func Match(version string) *string { - switch version { - case FLAG_V1_0: - return &version - case FLAG_V1_1: - return &version - case FLAG_V1_2: - return &version + ok := slices.Contains(flagSpecs, version) + if !ok { + return nil } - return nil + + return &version } func ParseAndValidate(spec string) (string, error) { diff --git a/service/api/specs/splitversionfilter.go b/service/api/specs/splitversionfilter.go index fb612241..cbf67a8c 100644 --- a/service/api/specs/splitversionfilter.go +++ b/service/api/specs/splitversionfilter.go @@ -5,35 +5,34 @@ import ( ) type SplitVersionFilter struct { - v1_0 map[string]bool - v1_1 map[string]bool + data map[string]map[string]bool } func NewSplitVersionFilter() SplitVersionFilter { - v1_1 := map[string]bool{matchers.MatcherTypeInLargeSegment: true} - v1_0 := mergeMaps(map[string]bool{ + data := map[string]map[string]bool{ + FLAG_V1_1: {matchers.MatcherTypeInLargeSegment: true}, + } + + data[FLAG_V1_0] = mergeMaps(map[string]bool{ matchers.MatcherEqualToSemver: true, matchers.MatcherTypeLessThanOrEqualToSemver: true, matchers.MatcherTypeGreaterThanOrEqualToSemver: true, matchers.MatcherTypeBetweenSemver: true, matchers.MatcherTypeInListSemver: true, - }, v1_1) + }, data[FLAG_V1_1]) return SplitVersionFilter{ - v1_0: v1_0, - v1_1: v1_1, + data: data, } } func (f *SplitVersionFilter) ShouldFilter(matcher string, apiVersion string) bool { - switch apiVersion { - case FLAG_V1_1: - return f.v1_1[matcher] - case FLAG_V1_0: - return f.v1_0[matcher] + matchers, ok := f.data[apiVersion] + if !ok { + return false } - return false + return matchers[matcher] } func mergeMaps(versionMap map[string]bool, toMergeMap map[string]bool) map[string]bool { diff --git a/service/api/specs/splitversionfilter_test.go b/service/api/specs/splitversionfilter_test.go index e9f5cf0a..ab8e51cb 100644 --- a/service/api/specs/splitversionfilter_test.go +++ b/service/api/specs/splitversionfilter_test.go @@ -28,7 +28,7 @@ func TestParseAndValidate(t *testing.T) { } } -func TestsplitVersionFilter(t *testing.T) { +func TestSplitVersionFilter(t *testing.T) { filter := NewSplitVersionFilter() shouldFilter := filter.ShouldFilter(matchers.MatcherTypeBetweenSemver, FLAG_V1_0) if !shouldFilter { diff --git a/storage/inmemory/mutexqueue/uniquekeys.go b/storage/inmemory/mutexqueue/uniquekeys.go new file mode 100644 index 00000000..18121a0c --- /dev/null +++ b/storage/inmemory/mutexqueue/uniquekeys.go @@ -0,0 +1,104 @@ +package mutexqueue + +import ( + "container/list" + "sync" + + "github.com/splitio/go-split-commons/v6/dtos" + "github.com/splitio/go-split-commons/v6/storage" + "github.com/splitio/go-toolkit/v5/datastructures/set" + "github.com/splitio/go-toolkit/v5/logging" +) + +type UniqueKeyWrapper struct { + featureName string + key string +} + +type MQUniqueKeysStorage struct { + queue *list.List + maxSize int64 + mutexQueue *sync.Mutex + fullChan chan string //only write channel + logger logging.LoggerInterface +} + +func NewMQUniqueKeysStorage(maxSize int64, isFull chan string, logger logging.LoggerInterface) *MQUniqueKeysStorage { + return &MQUniqueKeysStorage{ + queue: list.New(), + maxSize: maxSize, + mutexQueue: &sync.Mutex{}, + fullChan: isFull, + logger: logger, + } +} + +func (s *MQUniqueKeysStorage) Push(featureName string, key string) { + s.mutexQueue.Lock() + defer s.mutexQueue.Unlock() + + s.queue.PushBack(UniqueKeyWrapper{featureName: featureName, key: key}) + if s.queue.Len() == int(s.maxSize) { + s.sendSignalIsFull() + } +} + +func (s *MQUniqueKeysStorage) PopN(n int64) dtos.Uniques { + s.mutexQueue.Lock() + defer s.mutexQueue.Unlock() + + var totalItems int + if int64(s.queue.Len()) >= n { + totalItems = int(n) + } else { + totalItems = s.queue.Len() + } + + uniques := make(map[string]*set.ThreadUnsafeSet) + for i := 0; i < totalItems; i++ { + item, ok := s.queue.Remove(s.queue.Front()).(UniqueKeyWrapper) + if !ok { + continue + } + + _, exists := uniques[item.featureName] + if !exists { + uniques[item.featureName] = set.NewSet() + } + + uniques[item.featureName].Add(item.key) + } + + return getUniqueKeysDto(uniques) +} + +func (s *MQUniqueKeysStorage) sendSignalIsFull() { + // Nom blocking select + select { + case s.fullChan <- "UNIQUE_KEYS_FULL": + // Send "queue is full" signal + break + default: + s.logger.Debug("Some error occurred on sending signal for unique keys") + } +} + +func getUniqueKeysDto(uniques map[string]*set.ThreadUnsafeSet) dtos.Uniques { + keysToReturn := make([]dtos.Key, 0) + + for name, keys := range uniques { + keyDto := dtos.Key{ + Feature: name, + Keys: keys.List(), + } + + keysToReturn = append(keysToReturn, keyDto) + } + + return dtos.Uniques{ + Keys: keysToReturn, + } +} + +var _ storage.UniqueKeysStorageConsumer = (*MQUniqueKeysStorage)(nil) +var _ storage.UniqueKeysStorageProducer = (*MQUniqueKeysStorage)(nil) diff --git a/storage/inmemory/mutexqueue/uniquekeys_test.go b/storage/inmemory/mutexqueue/uniquekeys_test.go new file mode 100644 index 00000000..430d8e63 --- /dev/null +++ b/storage/inmemory/mutexqueue/uniquekeys_test.go @@ -0,0 +1,83 @@ +package mutexqueue + +import ( + "testing" + + "github.com/splitio/go-split-commons/v6/dtos" + "github.com/splitio/go-toolkit/v5/logging" +) + +func TestMQUniqueKeysStorage(t *testing.T) { + isFull := make(chan string, 1) + logger := logging.NewLogger(&logging.LoggerOptions{}) + storage := NewMQUniqueKeysStorage(5, isFull, logger) + + // Push some items into the queue + storage.Push("feature-1", "key-1") + storage.Push("feature-1", "key-2") + storage.Push("feature-2", "key-3") + storage.Push("feature-2", "key-4") + storage.Push("feature-3", "key-5") + + // Test PopN with n less than the queue size + result := storage.PopN(3) + if len(result.Keys) != 2 { + t.Errorf("Expected 2 feature groups, got %d", len(result.Keys)) + } + + // Validate feature-1 keys + feature1Keys := findFeatureKeys(result.Keys, "feature-1") + if len(feature1Keys) != 2 || !contains(feature1Keys, "key-1") || !contains(feature1Keys, "key-2") { + t.Errorf("Unexpected keys for feature-1: %v", feature1Keys) + } + + // Validate feature-2 keys + feature2Keys := findFeatureKeys(result.Keys, "feature-2") + if len(feature2Keys) != 1 || !contains(feature2Keys, "key-3") { + t.Errorf("Unexpected keys for feature-2: %v", feature2Keys) + } + + // Test PopN with n greater than the remaining queue size + result = storage.PopN(5) + if len(result.Keys) != 2 { + t.Errorf("Expected 2 feature groups, got %d", len(result.Keys)) + } + + // Validate feature-2 keys + feature2Keys = findFeatureKeys(result.Keys, "feature-2") + if len(feature2Keys) != 1 || !contains(feature2Keys, "key-4") { + t.Errorf("Unexpected keys for feature-2: %v", feature2Keys) + } + + // Validate feature-3 keys + feature3Keys := findFeatureKeys(result.Keys, "feature-3") + if len(feature3Keys) != 1 || !contains(feature3Keys, "key-5") { + t.Errorf("Unexpected keys for feature-3: %v", feature3Keys) + } + + // Test PopN with an empty queue + result = storage.PopN(1) + if len(result.Keys) != 0 { + t.Errorf("Expected 0 feature groups, got %d", len(result.Keys)) + } +} + +// Helper function to find keys for a specific feature +func findFeatureKeys(keys []dtos.Key, feature string) []interface{} { + for _, key := range keys { + if key.Feature == feature { + return key.Keys + } + } + return nil +} + +// Helper function to check if a slice contains a specific value +func contains(slice []interface{}, value string) bool { + for _, v := range slice { + if v == value { + return true + } + } + return false +} diff --git a/storage/interfaces.go b/storage/interfaces.go index 357bd177..777f97be 100644 --- a/storage/interfaces.go +++ b/storage/interfaces.go @@ -265,3 +265,16 @@ type LargeSegmentsStorage interface { LargeSegmentStorageProducer LargeSegmentStorageConsumer } + +type UniqueKeysStorageConsumer interface { + PopN(n int64) dtos.Uniques +} + +type UniqueKeysStorageProducer interface { + Push(featureName string, key string) +} + +type UniqueKeysStorage interface { + UniqueKeysStorageConsumer + UniqueKeysStorageProducer +} diff --git a/storage/mocks/uniquekeys.go b/storage/mocks/uniquekeys.go index 14d7a736..cd6d61a1 100644 --- a/storage/mocks/uniquekeys.go +++ b/storage/mocks/uniquekeys.go @@ -1,15 +1,31 @@ package mocks +import ( + "github.com/splitio/go-split-commons/v6/dtos" +) + type MockUniqueKeysStorage struct { + PushCall func(featureName string, key string) + PopNCall func(bulkSize int64) dtos.Uniques +} + +func (m MockUniqueKeysStorage) Push(featureName string, key string) { + m.PushCall(featureName, key) +} + +func (m MockUniqueKeysStorage) PopN(bulkSize int64) dtos.Uniques { + return m.PopNCall(bulkSize) +} + +type MockUniqueKeysMultiSdkConsumer struct { CountCall func() int64 - PopNRawCall func(n int64) ([]string, int64, error) + PopNRawCall func(int64) ([]string, int64, error) } -// Count mock -func (m MockUniqueKeysStorage) Count() int64 { +func (m MockUniqueKeysMultiSdkConsumer) Count() int64 { return m.CountCall() } -func (m MockUniqueKeysStorage) PopNRaw(n int64) ([]string, int64, error) { +func (m MockUniqueKeysMultiSdkConsumer) PopNRaw(n int64) ([]string, int64, error) { return m.PopNRawCall(n) } diff --git a/storage/redis/telemetry_test.go b/storage/redis/telemetry_test.go index f99f81c6..c2ac217a 100644 --- a/storage/redis/telemetry_test.go +++ b/storage/redis/telemetry_test.go @@ -191,11 +191,11 @@ func TestRecordUniqueKeys(t *testing.T) { Keys: []dtos.Key{ { Feature: "feature-1", - Keys: []string{"key-1", "key-2"}, + Keys: []interface{}{"key-1", "key-2"}, }, { Feature: "feature-2", - Keys: []string{"key-1", "key-2"}, + Keys: []interface{}{"key-1", "key-2"}, }, }, }) diff --git a/storage/redis/uniquekeys_test.go b/storage/redis/uniquekeys_test.go index 2563b8d1..ce3c9608 100644 --- a/storage/redis/uniquekeys_test.go +++ b/storage/redis/uniquekeys_test.go @@ -47,15 +47,15 @@ func TestPopNRaw(t *testing.T) { Keys: []dtos.Key{ { Feature: "feature-test-1", - Keys: []string{"key-1", "key-2", "key-3"}, + Keys: []interface{}{"key-1", "key-2", "key-3"}, }, { Feature: "feature-test-2", - Keys: []string{"key-1", "key-2", "key-3"}, + Keys: []interface{}{"key-1", "key-2", "key-3"}, }, { Feature: "feature-test-3", - Keys: []string{"key-1", "key-2", "key-3"}, + Keys: []interface{}{"key-1", "key-2", "key-3"}, }, }, } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 948c488d..dca190fb 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -66,6 +66,7 @@ type SynchronizerImpl struct { inMememoryFullQueue chan string impressionBulkSize int64 eventBulkSize int64 + uniqueKeysBulkSize int64 splitsRefreshRate int segmentsRefreshRate int httpTiemoutSecs int @@ -84,6 +85,7 @@ func NewSynchronizer( sync := &SynchronizerImpl{ impressionBulkSize: confAdvanced.ImpressionsBulkSize, eventBulkSize: confAdvanced.EventsBulkSize, + uniqueKeysBulkSize: confAdvanced.UniqueKeysBulkSize, splitTasks: splitTasks, workers: workers, logger: logger, @@ -254,6 +256,9 @@ func (s *SynchronizerImpl) dataFlusher() { if err != nil { s.logger.Error("Error flushing storage queue", err) } + case "UNIQUE_KEYS_FULL": + s.logger.Debug("FLUSHING Unique Keys storage") + s.workers.TelemetryRecorder.SynchronizeUniqueKeys(s.uniqueKeysBulkSize) } } } diff --git a/synchronizer/synchronizer_test.go b/synchronizer/synchronizer_test.go index ccf86e10..9b12f237 100644 --- a/synchronizer/synchronizer_test.go +++ b/synchronizer/synchronizer_test.go @@ -64,13 +64,14 @@ func TestSyncAllErrorSplits(t *testing.T) { atomic.AddInt64(¬ifyEventCalled, 1) }, } + advanced := conf.AdvancedConfig{EventsQueueSize: 100, EventsBulkSize: 100, HTTPTimeout: 100, ImpressionsBulkSize: 100, ImpressionsQueueSize: 100, SegmentQueueSize: 50, SegmentWorkers: 5} workers := Workers{ SplitUpdater: split.NewSplitUpdater(splitMockStorage, splitAPI.SplitFetcher, logger, telemetryMockStorage, appMonitorMock, flagsets.NewFlagSetFilter(nil)), SegmentUpdater: segment.NewSegmentUpdater(splitMockStorage, storageMock.MockSegmentStorage{}, splitAPI.SegmentFetcher, logger, telemetryMockStorage, appMonitorMock), EventRecorder: event.NewEventRecorderSingle(storageMock.MockEventStorage{}, splitAPI.EventRecorder, logger, dtos.Metadata{}, telemetryMockStorage), ImpressionRecorder: impression.NewRecorderSingle(storageMock.MockImpressionStorage{}, splitAPI.ImpressionRecorder, logger, dtos.Metadata{}, conf.ImpressionsModeDebug, telemetryMockStorage), - TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage), + TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage, nil), } splitTasks := SplitTasks{ EventSyncTask: tasks.NewRecordEventsTask(workers.EventRecorder, advanced.EventsBulkSize, 10, logger), @@ -152,7 +153,7 @@ func TestSyncAllErrorInSegments(t *testing.T) { SegmentUpdater: segment.NewSegmentUpdater(splitMockStorage, segmentMockStorage, splitAPI.SegmentFetcher, logger, telemetryMockStorage, appMonitorMock), EventRecorder: event.NewEventRecorderSingle(storageMock.MockEventStorage{}, splitAPI.EventRecorder, logger, dtos.Metadata{}, telemetryMockStorage), ImpressionRecorder: impression.NewRecorderSingle(storageMock.MockImpressionStorage{}, splitAPI.ImpressionRecorder, logger, dtos.Metadata{}, conf.ImpressionsModeDebug, telemetryMockStorage), - TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage), + TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage, nil), } splitTasks := SplitTasks{ EventSyncTask: tasks.NewRecordEventsTask(workers.EventRecorder, advanced.EventsBulkSize, 10, logger), @@ -256,7 +257,7 @@ func TestSyncAllOk(t *testing.T) { SegmentUpdater: segment.NewSegmentUpdater(splitMockStorage, segmentMockStorage, splitAPI.SegmentFetcher, logger, telemetryMockStorage, appMonitorMock), EventRecorder: event.NewEventRecorderSingle(storageMock.MockEventStorage{}, splitAPI.EventRecorder, logger, dtos.Metadata{}, telemetryMockStorage), ImpressionRecorder: impression.NewRecorderSingle(storageMock.MockImpressionStorage{}, splitAPI.ImpressionRecorder, logger, dtos.Metadata{}, conf.ImpressionsModeDebug, telemetryMockStorage), - TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage), + TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage, nil), } splitTasks := SplitTasks{ EventSyncTask: tasks.NewRecordEventsTask(workers.EventRecorder, advanced.EventsBulkSize, 10, logger), @@ -360,7 +361,7 @@ func TestPeriodicFetching(t *testing.T) { SegmentUpdater: segment.NewSegmentUpdater(splitMockStorage, segmentMockStorage, splitAPI.SegmentFetcher, logger, telemetryMockStorage, appMonitorMock), EventRecorder: event.NewEventRecorderSingle(storageMock.MockEventStorage{}, splitAPI.EventRecorder, logger, dtos.Metadata{}, telemetryMockStorage), ImpressionRecorder: impression.NewRecorderSingle(storageMock.MockImpressionStorage{}, splitAPI.ImpressionRecorder, logger, dtos.Metadata{}, conf.ImpressionsModeDebug, telemetryMockStorage), - TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage), + TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage, nil), } splitTasks := SplitTasks{ EventSyncTask: tasks.NewRecordEventsTask(workers.EventRecorder, advanced.EventsBulkSize, 1, logger), @@ -484,7 +485,7 @@ func TestPeriodicRecording(t *testing.T) { SegmentUpdater: segment.NewSegmentUpdater(splitMockStorage, segmentMockStorage, splitAPI.SegmentFetcher, logger, telemetryMockStorage, appMonitorMock), EventRecorder: event.NewEventRecorderSingle(eventMockStorage, splitAPI.EventRecorder, logger, dtos.Metadata{}, telemetryMockStorage), ImpressionRecorder: impression.NewRecorderSingle(impressionMockStorage, splitAPI.ImpressionRecorder, logger, dtos.Metadata{}, conf.ImpressionsModeDebug, telemetryMockStorage), - TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, splitAPI.TelemetryRecorder, splitMockStorage, segmentMockStorage, logger, dtos.Metadata{}, telemetryMockStorage), + TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, splitAPI.TelemetryRecorder, splitMockStorage, segmentMockStorage, logger, dtos.Metadata{}, telemetryMockStorage, nil), } splitTasks := SplitTasks{ EventSyncTask: tasks.NewRecordEventsTask(workers.EventRecorder, advanced.EventsBulkSize, 1, logger), @@ -1000,7 +1001,7 @@ func TestSplitUpdateWithReferencedSegments(t *testing.T) { SplitUpdater: split.NewSplitUpdater(splitMockStorage, splitAPI.SplitFetcher, logger, telemetryMockStorage, appMonitorMock, flagsets.NewFlagSetFilter(nil)), SegmentUpdater: segment.NewSegmentUpdater(splitMockStorage, segmentMockStorage, splitAPI.SegmentFetcher, logger, telemetryMockStorage, appMonitorMock), EventRecorder: event.NewEventRecorderSingle(storageMock.MockEventStorage{}, splitAPI.EventRecorder, logger, dtos.Metadata{}, telemetryMockStorage), - TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage), + TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage, nil), } splitTasks := SplitTasks{ SegmentSyncTask: tasks.NewFetchSegmentsTask(workers.SegmentUpdater, 10, 5, 50, logger, appMonitorMock), diff --git a/tasks/telemetrysync_test.go b/tasks/telemetrysync_test.go index 455fff4b..d0e891ad 100644 --- a/tasks/telemetrysync_test.go +++ b/tasks/telemetrysync_test.go @@ -64,6 +64,7 @@ func TestTelemetrySyncTask(t *testing.T) { logging.NewLogger(&logging.LoggerOptions{}), dtos.Metadata{}, mockedTelemetryStorage, + nil, ), 2, logging.NewLogger(&logging.LoggerOptions{}), diff --git a/tasks/uniquekeyssync.go b/tasks/uniquekeyssync.go index a2c2b598..0cb390e3 100644 --- a/tasks/uniquekeyssync.go +++ b/tasks/uniquekeyssync.go @@ -1,7 +1,6 @@ package tasks import ( - "github.com/splitio/go-split-commons/v6/provisional/strategy" "github.com/splitio/go-split-commons/v6/telemetry" "github.com/splitio/go-toolkit/v5/asynctask" "github.com/splitio/go-toolkit/v5/logging" @@ -10,16 +9,16 @@ import ( // NewRecordUniqueKeysTask constructor func NewRecordUniqueKeysTask( recorder telemetry.TelemetrySynchronizer, - uniqueTracker strategy.UniqueKeysTracker, period int, logger logging.LoggerInterface, + bulkSize int64, ) *asynctask.AsyncTask { record := func(logger logging.LoggerInterface) error { - return recorder.SynchronizeUniqueKeys(uniqueTracker.PopAll()) + return recorder.SynchronizeUniqueKeys(bulkSize) } onStop := func(logger logging.LoggerInterface) { - recorder.SynchronizeUniqueKeys(uniqueTracker.PopAll()) + recorder.SynchronizeUniqueKeys(bulkSize) } return asynctask.NewAsyncTask("SubmitUniqueKeys", record, period, nil, onStop, logger) diff --git a/tasks/uniquekeyssync_test.go b/tasks/uniquekeyssync_test.go index 9046466b..d130037c 100644 --- a/tasks/uniquekeyssync_test.go +++ b/tasks/uniquekeyssync_test.go @@ -7,6 +7,7 @@ import ( "github.com/splitio/go-split-commons/v6/dtos" "github.com/splitio/go-split-commons/v6/provisional/strategy" "github.com/splitio/go-split-commons/v6/service/mocks" + "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexqueue" st "github.com/splitio/go-split-commons/v6/storage/mocks" "github.com/splitio/go-split-commons/v6/telemetry" "github.com/splitio/go-toolkit/v5/datastructures/set" @@ -55,6 +56,8 @@ func TestUniqueKeysTask(t *testing.T) { SegmentKeysCountCall: func() int64 { return 10 }, } + uniqueKeys := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + synchronizer := telemetry.NewTelemetrySynchronizer( mockedTelemetryStorage, mockedTelemetryHTTP, @@ -63,6 +66,7 @@ func TestUniqueKeysTask(t *testing.T) { logging.NewLogger(&logging.LoggerOptions{}), dtos.Metadata{}, mockedTelemetryStorage, + uniqueKeys, ) filter := st.MockFilter{ @@ -70,12 +74,13 @@ func TestUniqueKeysTask(t *testing.T) { ContainsCall: func(data string) bool { return false }, ClearCall: func() {}, } - tracker := strategy.NewUniqueKeysTracker(filter) + + tracker := strategy.NewUniqueKeysTracker(filter, uniqueKeys) task := NewRecordUniqueKeysTask( synchronizer, - tracker, 2, logging.NewLogger(&logging.LoggerOptions{}), + 5, ) if !tracker.Track("tratment-1", "key-1") { diff --git a/telemetry/interface.go b/telemetry/interface.go index ca6e0b13..8c68f6fc 100644 --- a/telemetry/interface.go +++ b/telemetry/interface.go @@ -1,10 +1,8 @@ package telemetry -import "github.com/splitio/go-split-commons/v6/dtos" - // TelemetrySynchronizer interface type TelemetrySynchronizer interface { SynchronizeConfig(cfg InitConfig, timedUntilReady int64, factoryInstances map[string]int64, tags []string) SynchronizeStats() error - SynchronizeUniqueKeys(uniques dtos.Uniques) error + SynchronizeUniqueKeys(bulkSizer int64) error } diff --git a/telemetry/localhost.go b/telemetry/localhost.go index fb95d74c..815291bc 100644 --- a/telemetry/localhost.go +++ b/telemetry/localhost.go @@ -1,9 +1,5 @@ package telemetry -import ( - "github.com/splitio/go-split-commons/v6/dtos" -) - type NoOp struct{} func (n *NoOp) SynchronizeConfig(cfg InitConfig, timedUntilReady int64, factoryInstances map[string]int64, tags []string) { @@ -13,6 +9,8 @@ func (n *NoOp) SynchronizeStats() error { return nil } -func (n *NoOp) SynchronizeUniqueKeys(uniques dtos.Uniques) error { +func (n *NoOp) SynchronizeUniqueKeys(bulkSize int64) error { return nil } + +var _ TelemetrySynchronizer = (*NoOp)(nil) diff --git a/telemetry/memory.go b/telemetry/memory.go index f7fa3d1d..97d77958 100644 --- a/telemetry/memory.go +++ b/telemetry/memory.go @@ -20,6 +20,7 @@ type RecorderSingle struct { logger logging.LoggerInterface metadata dtos.Metadata runtimeTelemetry storage.TelemetryRuntimeProducer + uniqueKeysStorage storage.UniqueKeysStorageConsumer } // NewTelemetrySynchronizer creates new event synchronizer for posting events @@ -31,6 +32,7 @@ func NewTelemetrySynchronizer( logger logging.LoggerInterface, metadata dtos.Metadata, runtimeTelemetry storage.TelemetryRuntimeProducer, + uniqueKeysStorage storage.UniqueKeysStorageConsumer, ) TelemetrySynchronizer { return &RecorderSingle{ telemetryStorage: telemetryStorage, @@ -40,6 +42,7 @@ func NewTelemetrySynchronizer( logger: logger, metadata: metadata, runtimeTelemetry: runtimeTelemetry, + uniqueKeysStorage: uniqueKeysStorage, } } @@ -133,7 +136,13 @@ func (e *RecorderSingle) SynchronizeConfig(cfg InitConfig, timedUntilReady int64 } // SynchronizeUniqueKeys syncs unique keys -func (e *RecorderSingle) SynchronizeUniqueKeys(uniques dtos.Uniques) error { +func (e *RecorderSingle) SynchronizeUniqueKeys(bulkSize int64) error { + if e.uniqueKeysStorage == nil { + return nil + } + + uniques := e.uniqueKeysStorage.PopN(bulkSize) + if len(uniques.Keys) < 1 { e.logger.Debug("Unique keys list is empty, nothing to synchronize.") return nil diff --git a/telemetry/memory_test.go b/telemetry/memory_test.go index 850957ab..d07d51f9 100644 --- a/telemetry/memory_test.go +++ b/telemetry/memory_test.go @@ -64,7 +64,7 @@ func TestTelemetryRecorderError(t *testing.T) { }, } - telemetrySync := NewTelemetrySynchronizer(mockedTelemetryStorage, telemetryRecorderMock, mockedSplitStorage, mockedSegmentStorage, logging.NewLogger(&logging.LoggerOptions{}), dtos.Metadata{}, mockedTelemetryStorage) + telemetrySync := NewTelemetrySynchronizer(mockedTelemetryStorage, telemetryRecorderMock, mockedSplitStorage, mockedSegmentStorage, logging.NewLogger(&logging.LoggerOptions{}), dtos.Metadata{}, mockedTelemetryStorage, nil) err := telemetrySync.SynchronizeStats() if err == nil { @@ -119,7 +119,7 @@ func TestTelemetryRecorder(t *testing.T) { RecordStatsCall: func(stats dtos.Stats, metadata dtos.Metadata) error { return nil }, } - telemetrySync := NewTelemetrySynchronizer(mockedTelemetryStorage, telemetryRecorderMock, mockedSplitStorage, mockedSegmentStorage, logging.NewLogger(&logging.LoggerOptions{}), dtos.Metadata{}, mockedTelemetryStorage) + telemetrySync := NewTelemetrySynchronizer(mockedTelemetryStorage, telemetryRecorderMock, mockedSplitStorage, mockedSegmentStorage, logging.NewLogger(&logging.LoggerOptions{}), dtos.Metadata{}, mockedTelemetryStorage, nil) err := telemetrySync.SynchronizeStats() if err != nil { @@ -201,7 +201,7 @@ func TestTelemetryRecorderSync(t *testing.T) { PopUpdatesFromSSECall: func() dtos.UpdatesFromSSE { return dtos.UpdatesFromSSE{} }, } - telemetryRecorder := NewTelemetrySynchronizer(mockedTelemetryStorage, httpTelemetryRecorder, mockedSplitStorage, mockedSegmentStorage, logging.NewLogger(&logging.LoggerOptions{}), dtos.Metadata{}, mockedTelemetryStorage) + telemetryRecorder := NewTelemetrySynchronizer(mockedTelemetryStorage, httpTelemetryRecorder, mockedSplitStorage, mockedSegmentStorage, logging.NewLogger(&logging.LoggerOptions{}), dtos.Metadata{}, mockedTelemetryStorage, nil) telemetryRecorder.SynchronizeStats() @@ -255,7 +255,7 @@ func TestConfig(t *testing.T) { }, } - sync := NewTelemetrySynchronizer(mockTelemetryStorage, mockRecorder, st.MockSplitStorage{}, st.MockSegmentStorage{}, logger, dtos.Metadata{SDKVersion: "go-test", MachineIP: "1.1.1.1", MachineName: "some"}, mockTelemetryStorage) + sync := NewTelemetrySynchronizer(mockTelemetryStorage, mockRecorder, st.MockSplitStorage{}, st.MockSegmentStorage{}, logger, dtos.Metadata{SDKVersion: "go-test", MachineIP: "1.1.1.1", MachineName: "some"}, mockTelemetryStorage, nil) factories := make(map[string]int64) factories["one"] = 1 factories["two"] = 1 diff --git a/telemetry/redis.go b/telemetry/redis.go index fb63cee5..5e3b27ef 100644 --- a/telemetry/redis.go +++ b/telemetry/redis.go @@ -8,15 +8,21 @@ import ( // SynchronizerRedis struct type SynchronizerRedis struct { - storage storage.TelemetryConfigProducer - logger logging.LoggerInterface + storage storage.TelemetryConfigProducer + logger logging.LoggerInterface + uniqueKeysStorage storage.UniqueKeysStorageConsumer } // NewSynchronizerRedis constructor -func NewSynchronizerRedis(storage storage.TelemetryConfigProducer, logger logging.LoggerInterface) TelemetrySynchronizer { +func NewSynchronizerRedis( + storage storage.TelemetryConfigProducer, + logger logging.LoggerInterface, + uniqueKeysStorage storage.UniqueKeysStorageConsumer, +) TelemetrySynchronizer { return &SynchronizerRedis{ - storage: storage, - logger: logger, + storage: storage, + logger: logger, + uniqueKeysStorage: uniqueKeysStorage, } } @@ -45,7 +51,8 @@ func (r *SynchronizerRedis) SynchronizeConfig(cfg InitConfig, timedUntilReady in } // SynchronizeUniqueKeys syncs unique keys -func (r *SynchronizerRedis) SynchronizeUniqueKeys(uniques dtos.Uniques) error { +func (r *SynchronizerRedis) SynchronizeUniqueKeys(bulkSize int64) error { + uniques := r.uniqueKeysStorage.PopN(bulkSize) if len(uniques.Keys) < 1 { r.logger.Debug("Unique keys list is empty, nothing to synchronize.") return nil diff --git a/telemetry/redis_test.go b/telemetry/redis_test.go index b6b9c288..b3b75610 100644 --- a/telemetry/redis_test.go +++ b/telemetry/redis_test.go @@ -34,7 +34,7 @@ func TestRecorderRedis(t *testing.T) { }, } - sender := NewSynchronizerRedis(redisMock, logger) + sender := NewSynchronizerRedis(redisMock, logger, nil) factories := make(map[string]int64) factories["one"] = 1 sender.SynchronizeConfig(InitConfig{}, 0, factories, []string{"sentinel"})