From 20548a6e33f72ede85e076e86bfa694c01acec91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Mon, 27 Jan 2025 13:12:57 +0200 Subject: [PATCH] store: refactor respSet MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We have a need to merge sorted sets of responses into a new one in different parts of Thanos e.g. rules, targets, etc. So, move the sorted set logic into a new package. Signed-off-by: Giedrius Statkevičius --- pkg/extprom/http/instrument_client.go | 2 +- pkg/query/endpointset.go | 2 +- pkg/responseset/set.go | 426 ++++++++++++++++++++++ pkg/runutil/runutil.go | 2 +- pkg/store/bucket.go | 5 +- pkg/store/proxy.go | 5 +- pkg/store/proxy_merge.go | 487 +++++--------------------- pkg/store/proxy_merge_test.go | 349 +++++++++++------- pkg/store/proxy_test.go | 24 +- 9 files changed, 765 insertions(+), 537 deletions(-) create mode 100644 pkg/responseset/set.go diff --git a/pkg/extprom/http/instrument_client.go b/pkg/extprom/http/instrument_client.go index ebdda5fbe1..e87165c5d3 100644 --- a/pkg/extprom/http/instrument_client.go +++ b/pkg/extprom/http/instrument_client.go @@ -23,7 +23,7 @@ type ClientMetrics struct { // NewClientMetrics creates a new instance of ClientMetrics. // It will also register the metrics with the included register. -// This ClientMetrics should be re-used for diff clients with the same purpose. +// This ClientMetrics should be reused for diff clients with the same purpose. // e.g. 1 ClientMetrics should be used for all the clients that talk to Alertmanager. func NewClientMetrics(reg prometheus.Registerer) *ClientMetrics { var m ClientMetrics diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 071e04a846..2f023fb3d1 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -366,7 +366,7 @@ func (e *EndpointSet) Update(ctx context.Context) { if er.HasStoreAPI() && (er.ComponentType() == component.Sidecar || er.ComponentType() == component.Rule) && stats[component.Sidecar.String()][extLset]+stats[component.Rule.String()][extLset] > 0 { - level.Warn(e.logger).Log("msg", "found duplicate storeEndpoints producer (sidecar or ruler). This is not advices as it will malform data in in the same bucket", + level.Warn(e.logger).Log("msg", "found duplicate storeEndpoints producer (sidecar or ruler). This is not advised as it will malform data in in the same bucket", "address", addr, "extLset", extLset, "duplicates", fmt.Sprintf("%v", stats[component.Sidecar.String()][extLset]+stats[component.Rule.String()][extLset]+1)) } stats[er.ComponentType().String()][extLset]++ diff --git a/pkg/responseset/set.go b/pkg/responseset/set.go new file mode 100644 index 0000000000..430c4a5089 --- /dev/null +++ b/pkg/responseset/set.go @@ -0,0 +1,426 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package responseset + +import ( + "context" + "io" + "sync" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/prometheus/prometheus/model/labels" + + "github.com/thanos-io/thanos/pkg/errors" + "github.com/thanos-io/thanos/pkg/store/labelpb" +) + +// ResponseSet is a generic abstraction over a set of responses. +// It allows to iterate over responses, close the set and get some metadata about the set. +type ResponseSet[T any] interface { + Close() + At() *T + Next() bool + StoreID() string + Labelset() string + StoreLabels() map[string]struct{} + Empty() bool + Error() error +} + +type Client[T any] interface { + Recv() (*T, error) + CloseSend() error +} + +type EagerResponseSet[T any] struct { + // Generic parameters. + span opentracing.Span + + cl Client[T] + closeSeries context.CancelFunc + frameTimeout time.Duration + + // postStreamHook allows performing some manipulations on data + // after it is fully received. + postStreamHook func(data []*T) + // postReceiveHook returns true if data needs to be kept. + postReceiveHook func(data *T) bool + + storeName string + storeLabels map[string]struct{} + storeLabelSets []labels.Labels + + // Internal bookkeeping. + bufferedResponses []*T + wg *sync.WaitGroup + i int + err error + + closeHook func() +} + +func (l *EagerResponseSet[T]) Close() { + if l.closeSeries != nil { + l.closeSeries() + } + _ = l.cl.CloseSend() + l.closeHook() +} + +func (l *EagerResponseSet[T]) At() *T { + l.wg.Wait() + + if len(l.bufferedResponses) == 0 { + return nil + } + + return l.bufferedResponses[l.i-1] +} + +func (l *EagerResponseSet[T]) Next() bool { + l.wg.Wait() + + l.i++ + + return l.i <= len(l.bufferedResponses) +} + +func (l *EagerResponseSet[T]) Empty() bool { + l.wg.Wait() + + return len(l.bufferedResponses) == 0 +} + +func (l *EagerResponseSet[T]) StoreID() string { + return l.storeName +} + +func (l *EagerResponseSet[T]) Labelset() string { + return labelpb.PromLabelSetsToString(l.storeLabelSets) +} + +func (l *EagerResponseSet[T]) StoreLabels() map[string]struct{} { + return l.storeLabels +} + +func (l *EagerResponseSet[T]) Error() error { + return l.err +} + +func NewEagerResponseSet[T any]( + span opentracing.Span, + frameTimeout time.Duration, + storeName string, + storeLabelSets []labels.Labels, + closeSeries context.CancelFunc, + cl Client[T], + postStreamHook func(data []*T), + postReceiveHook func(data *T) bool, + closeHook func(), +) ResponseSet[T] { + ret := &EagerResponseSet[T]{ + span: span, + cl: cl, + closeSeries: closeSeries, + frameTimeout: frameTimeout, + bufferedResponses: []*T{}, + wg: &sync.WaitGroup{}, + storeName: storeName, + storeLabelSets: storeLabelSets, + closeHook: closeHook, + postStreamHook: postStreamHook, + postReceiveHook: postReceiveHook, + } + ret.storeLabels = make(map[string]struct{}) + for _, ls := range storeLabelSets { + ls.Range(func(l labels.Label) { + ret.storeLabels[l.Name] = struct{}{} + }) + } + + ret.wg.Add(1) + + // Start a goroutine and immediately buffer everything. + go func(l *EagerResponseSet[T]) { + defer ret.wg.Done() + + // TODO(bwplotka): Consider improving readability by getting rid of anonymous functions and merging eager and + // lazyResponse into one struct. + handleRecvResponse := func(t *time.Timer) bool { + if t != nil { + defer t.Reset(frameTimeout) + } + + resp, err := cl.Recv() + if err != nil { + if err == io.EOF { + return false + } + + var rerr error + // If timer is already stopped + if t != nil && !t.Stop() { + if t.C != nil { + <-t.C // Drain the channel if it was already stopped. + } + rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, storeName) + } else { + rerr = errors.Wrapf(err, "receive series from %s", storeName) + } + + l.err = rerr + l.span.SetTag("err", rerr.Error()) + return false + } + + if !l.postReceiveHook(resp) { + return true + } + + l.bufferedResponses = append(l.bufferedResponses, resp) + return true + } + + var t *time.Timer + if frameTimeout > 0 { + t = time.AfterFunc(frameTimeout, closeSeries) + defer t.Stop() + } + + for { + if !handleRecvResponse(t) { + break + } + } + + l.postStreamHook(l.bufferedResponses) + l.span.Finish() + + }(ret) + + return ret +} + +// lazyRespSet is a lazy storepb.SeriesSet that buffers +// everything as fast as possible while at the same it permits +// reading response-by-response. It blocks if there is no data +// in Next(). +type lazyRespSet[T any] struct { + // Generic parameters. + span opentracing.Span + cl Client[T] + closeSeries context.CancelFunc + storeName string + storeLabelSets []labels.Labels + storeLabels map[string]struct{} + frameTimeout time.Duration + + // Internal bookkeeping. + dataOrFinishEvent *sync.Cond + bufferedResponses []*T + bufferedResponsesMtx *sync.Mutex + lastResp *T + err error + + noMoreData bool + initialized bool + + // postReceiveHook returns true if data needs to be kept. + postReceiveHook func(data *T) bool + + postStreamHook func(data []*T) + closeHook func() +} + +func (l *lazyRespSet[T]) Error() error { + return l.err +} + +func NewLazyResponseSet[T any]( + span opentracing.Span, + frameTimeout time.Duration, + storeName string, + storeLabelSets []labels.Labels, + closeSeries context.CancelFunc, + cl Client[T], + postStreamHook func(data []*T), + postReceiveHook func(data *T) bool, + closeHook func(), +) ResponseSet[T] { + bufferedResponses := make([]*T, 0) + bufferedResponsesMtx := &sync.Mutex{} + dataAvailable := sync.NewCond(bufferedResponsesMtx) + + respSet := &lazyRespSet[T]{ + frameTimeout: frameTimeout, + storeName: storeName, + storeLabelSets: storeLabelSets, + cl: cl, + closeSeries: closeSeries, + span: span, + dataOrFinishEvent: dataAvailable, + bufferedResponsesMtx: bufferedResponsesMtx, + bufferedResponses: bufferedResponses, + postReceiveHook: postReceiveHook, + postStreamHook: postStreamHook, + closeHook: closeHook, + } + respSet.storeLabels = make(map[string]struct{}) + for _, ls := range storeLabelSets { + ls.Range(func(l labels.Label) { + respSet.storeLabels[l.Name] = struct{}{} + }) + } + + go func(st string, l *lazyRespSet[T]) { + defer l.postStreamHook(nil) + + handleRecvResponse := func(t *time.Timer) bool { + if t != nil { + defer t.Reset(frameTimeout) + } + + resp, err := cl.Recv() + if err != nil { + if err == io.EOF { + l.bufferedResponsesMtx.Lock() + l.noMoreData = true + l.dataOrFinishEvent.Signal() + l.bufferedResponsesMtx.Unlock() + return false + } + + var rerr error + // If timer is already stopped + if t != nil && !t.Stop() { + if t.C != nil { + <-t.C // Drain the channel if it was already stopped. + } + rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st) + } else { + rerr = errors.Wrapf(err, "receive series from %s", st) + } + + l.span.SetTag("err", rerr.Error()) + + l.bufferedResponsesMtx.Lock() + l.err = rerr + l.noMoreData = true + l.dataOrFinishEvent.Signal() + l.bufferedResponsesMtx.Unlock() + return false + } + + if !l.postReceiveHook(resp) { + return true + } + + l.bufferedResponsesMtx.Lock() + l.bufferedResponses = append(l.bufferedResponses, resp) + l.dataOrFinishEvent.Signal() + l.bufferedResponsesMtx.Unlock() + return true + } + + var t *time.Timer + if frameTimeout > 0 { + t = time.AfterFunc(frameTimeout, closeSeries) + defer t.Stop() + } + for { + if !handleRecvResponse(t) { + return + } + } + }(storeName, respSet) + + return respSet +} + +func (l *lazyRespSet[T]) Close() { + l.bufferedResponsesMtx.Lock() + defer l.bufferedResponsesMtx.Unlock() + + l.closeSeries() + l.noMoreData = true + l.dataOrFinishEvent.Signal() + + _ = l.cl.CloseSend() + l.closeHook() +} + +func (l *lazyRespSet[T]) StoreID() string { + return l.storeName +} + +func (l *lazyRespSet[T]) Labelset() string { + return labelpb.PromLabelSetsToString(l.storeLabelSets) +} + +func (l *lazyRespSet[T]) StoreLabels() map[string]struct{} { + return l.storeLabels +} + +func (l *lazyRespSet[T]) Empty() bool { + l.bufferedResponsesMtx.Lock() + defer l.bufferedResponsesMtx.Unlock() + + // NOTE(GiedriusS): need to wait here for at least one + // response so that we could build the heap properly. + if l.noMoreData && len(l.bufferedResponses) == 0 { + return true + } + + for len(l.bufferedResponses) == 0 { + l.dataOrFinishEvent.Wait() + if l.noMoreData && len(l.bufferedResponses) == 0 { + break + } + } + + return len(l.bufferedResponses) == 0 && l.noMoreData +} + +// Next either blocks until more data is available or reads +// the next response. If it is not lazy then it waits for everything +// to finish. +func (l *lazyRespSet[T]) Next() bool { + l.bufferedResponsesMtx.Lock() + defer l.bufferedResponsesMtx.Unlock() + + l.initialized = true + + if l.noMoreData && len(l.bufferedResponses) == 0 { + l.lastResp = nil + + return false + } + + for len(l.bufferedResponses) == 0 { + l.dataOrFinishEvent.Wait() + if l.noMoreData && len(l.bufferedResponses) == 0 { + break + } + } + + if len(l.bufferedResponses) > 0 { + l.lastResp = l.bufferedResponses[0] + if l.initialized { + l.bufferedResponses = l.bufferedResponses[1:] + } + return true + } + + l.lastResp = nil + return false +} + +func (l *lazyRespSet[T]) At() *T { + if !l.initialized { + panic("please call Next before At") + } + + return l.lastResp +} diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index 9aaeeca615..a455a504b8 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -45,7 +45,7 @@ // The rununtil.Exhaust* family of functions provide the same functionality but // they take an io.ReadCloser and they exhaust the whole reader before closing // them. They are useful when trying to use http keep-alive connections because -// for the same connection to be re-used the whole response body needs to be +// for the same connection to be reused the whole response body needs to be // exhausted. package runutil diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 6e5a656e62..5abdabd880 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -53,6 +53,7 @@ import ( "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/pool" + "github.com/thanos-io/thanos/pkg/responseset" "github.com/thanos-io/thanos/pkg/runutil" storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/hintspb" @@ -1569,7 +1570,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant)) ctx = srv.Context() stats = &queryStats{} - respSets []respSet + respSets []responseset.ResponseSet[storepb.SeriesResponse] mtx sync.Mutex g, gctx = errgroup.WithContext(ctx) resHints = &hintspb.SeriesResponseHints{} @@ -1683,7 +1684,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store return errors.Wrapf(err, "fetch postings for block %s", blk.meta.ULID) } - var resp respSet + var resp responseset.ResponseSet[storepb.SeriesResponse] if s.sortingStrategy == sortingStrategyStore { resp = newEagerRespSet( span, diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 4ee987df29..33a3f5382e 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -25,6 +25,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/info/infopb" + "github.com/thanos-io/thanos/pkg/responseset" storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -305,7 +306,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. WithoutReplicaLabels: originalRequest.WithoutReplicaLabels, } - storeResponses := make([]respSet, 0, len(stores)) + storeResponses := make([]responseset.ResponseSet[storepb.SeriesResponse], 0, len(stores)) for _, st := range stores { st := st @@ -352,7 +353,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. } } - return nil + return respHeap.Error() } // LabelNames returns all known label names. diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_merge.go index 4442cf8fdb..f9ded2d652 100644 --- a/pkg/store/proxy_merge.go +++ b/pkg/store/proxy_merge.go @@ -6,7 +6,6 @@ package store import ( "context" "fmt" - "io" "sort" "sync" "time" @@ -18,6 +17,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/responseset" grpc_opentracing "github.com/thanos-io/thanos/pkg/tracing/tracing_middleware" "github.com/thanos-io/thanos/pkg/losertree" @@ -29,6 +29,7 @@ import ( type seriesStream interface { Next() bool At() *storepb.SeriesResponse + Error() error } type responseDeduplicator struct { @@ -45,6 +46,10 @@ type responseDeduplicator struct { chunkDedupMap map[uint64]storepb.AggrChunk } +func (d *responseDeduplicator) Error() error { + return d.h.Error() +} + // NewResponseDeduplicator returns a wrapper around a loser tree that merges duplicated series messages into one. // It also deduplicates identical chunks identified by the same checksum from each series message. func NewResponseDeduplicator(h seriesStream) *responseDeduplicator { @@ -165,9 +170,23 @@ func (d *responseDeduplicator) At() *storepb.SeriesResponse { return d.bufferedResp[d.buffRespI] } +type loserTreeError struct { + seriesSets []responseset.ResponseSet[storepb.SeriesResponse] + *losertree.Tree[*storepb.SeriesResponse, responseset.ResponseSet[storepb.SeriesResponse]] +} + +func (e *loserTreeError) Error() error { + for _, s := range e.seriesSets { + if err := s.Error(); err != nil { + return err + } + } + return nil +} + // NewProxyResponseLoserTree returns heap that k-way merge series together. // It's agnostic to duplicates and overlaps, it forwards all duplicated series in random order. -func NewProxyResponseLoserTree(seriesSets ...respSet) *losertree.Tree[*storepb.SeriesResponse, respSet] { +func NewProxyResponseLoserTree(seriesSets ...responseset.ResponseSet[storepb.SeriesResponse]) *loserTreeError { var maxVal *storepb.SeriesResponse = storepb.NewSeriesResponse(nil) less := func(a, b *storepb.SeriesResponse) bool { @@ -193,111 +212,14 @@ func NewProxyResponseLoserTree(seriesSets ...respSet) *losertree.Tree[*storepb.S return false } - return losertree.New[*storepb.SeriesResponse, respSet](seriesSets, maxVal, func(s respSet) *storepb.SeriesResponse { - return s.At() - }, less, func(s respSet) { - s.Close() - }) -} - -func (l *lazyRespSet) StoreID() string { - return l.storeName -} - -func (l *lazyRespSet) Labelset() string { - return labelpb.PromLabelSetsToString(l.storeLabelSets) -} - -func (l *lazyRespSet) StoreLabels() map[string]struct{} { - return l.storeLabels -} - -// lazyRespSet is a lazy storepb.SeriesSet that buffers -// everything as fast as possible while at the same it permits -// reading response-by-response. It blocks if there is no data -// in Next(). -type lazyRespSet struct { - // Generic parameters. - span opentracing.Span - cl storepb.Store_SeriesClient - closeSeries context.CancelFunc - storeName string - storeLabelSets []labels.Labels - storeLabels map[string]struct{} - frameTimeout time.Duration - - // Internal bookkeeping. - dataOrFinishEvent *sync.Cond - bufferedResponses []*storepb.SeriesResponse - bufferedResponsesMtx *sync.Mutex - lastResp *storepb.SeriesResponse - - noMoreData bool - initialized bool - - shardMatcher *storepb.ShardMatcher -} - -func (l *lazyRespSet) Empty() bool { - l.bufferedResponsesMtx.Lock() - defer l.bufferedResponsesMtx.Unlock() - - // NOTE(GiedriusS): need to wait here for at least one - // response so that we could build the heap properly. - if l.noMoreData && len(l.bufferedResponses) == 0 { - return true - } - - for len(l.bufferedResponses) == 0 { - l.dataOrFinishEvent.Wait() - if l.noMoreData && len(l.bufferedResponses) == 0 { - break - } - } - - return len(l.bufferedResponses) == 0 && l.noMoreData -} - -// Next either blocks until more data is available or reads -// the next response. If it is not lazy then it waits for everything -// to finish. -func (l *lazyRespSet) Next() bool { - l.bufferedResponsesMtx.Lock() - defer l.bufferedResponsesMtx.Unlock() - - l.initialized = true - - if l.noMoreData && len(l.bufferedResponses) == 0 { - l.lastResp = nil - - return false - } - - for len(l.bufferedResponses) == 0 { - l.dataOrFinishEvent.Wait() - if l.noMoreData && len(l.bufferedResponses) == 0 { - break - } - } - - if len(l.bufferedResponses) > 0 { - l.lastResp = l.bufferedResponses[0] - if l.initialized { - l.bufferedResponses = l.bufferedResponses[1:] - } - return true + return &loserTreeError{ + seriesSets: seriesSets, + Tree: losertree.New(seriesSets, maxVal, func(s responseset.ResponseSet[storepb.SeriesResponse]) *storepb.SeriesResponse { + return s.At() + }, less, func(s responseset.ResponseSet[storepb.SeriesResponse]) { + s.Close() + }), } - - l.lastResp = nil - return false -} - -func (l *lazyRespSet) At() *storepb.SeriesResponse { - if !l.initialized { - panic("please call Next before At") - } - - return l.lastResp } func newLazyRespSet( @@ -310,116 +232,44 @@ func newLazyRespSet( shardMatcher *storepb.ShardMatcher, applySharding bool, emptyStreamResponses prometheus.Counter, -) respSet { - bufferedResponses := []*storepb.SeriesResponse{} - bufferedResponsesMtx := &sync.Mutex{} - dataAvailable := sync.NewCond(bufferedResponsesMtx) - - respSet := &lazyRespSet{ - frameTimeout: frameTimeout, - storeName: storeName, - storeLabelSets: storeLabelSets, - cl: cl, - closeSeries: closeSeries, - span: span, - dataOrFinishEvent: dataAvailable, - bufferedResponsesMtx: bufferedResponsesMtx, - bufferedResponses: bufferedResponses, - shardMatcher: shardMatcher, - } - respSet.storeLabels = make(map[string]struct{}) - for _, ls := range storeLabelSets { - ls.Range(func(l labels.Label) { - respSet.storeLabels[l.Name] = struct{}{} - }) - } - - go func(st string, l *lazyRespSet) { - bytesProcessed := 0 - seriesStats := &storepb.SeriesStatsCounter{} - - defer func() { - l.span.SetTag("processed.series", seriesStats.Series) - l.span.SetTag("processed.chunks", seriesStats.Chunks) - l.span.SetTag("processed.samples", seriesStats.Samples) - l.span.SetTag("processed.bytes", bytesProcessed) - l.span.Finish() - }() - - numResponses := 0 - defer func() { +) responseset.ResponseSet[storepb.SeriesResponse] { + seriesStats := &storepb.SeriesStatsCounter{} + numResponses := 0 + bytesProcessed := 0 + + ret := responseset.NewLazyResponseSet( + span, + frameTimeout, + storeName, + storeLabelSets, + closeSeries, + cl, + func(_ []*storepb.SeriesResponse) { + span.SetTag("processed.series", seriesStats.Series) + span.SetTag("processed.chunks", seriesStats.Chunks) + span.SetTag("processed.samples", seriesStats.Samples) + span.SetTag("processed.bytes", bytesProcessed) if numResponses == 0 { emptyStreamResponses.Inc() } - }() - - handleRecvResponse := func(t *time.Timer) bool { - if t != nil { - defer t.Reset(frameTimeout) - } - - resp, err := cl.Recv() - if err != nil { - if err == io.EOF { - l.bufferedResponsesMtx.Lock() - l.noMoreData = true - l.dataOrFinishEvent.Signal() - l.bufferedResponsesMtx.Unlock() - return false - } - - var rerr error - // If timer is already stopped - if t != nil && !t.Stop() { - if t.C != nil { - <-t.C // Drain the channel if it was already stopped. - } - rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st) - } else { - rerr = errors.Wrapf(err, "receive series from %s", st) - } - - l.span.SetTag("err", rerr.Error()) - - l.bufferedResponsesMtx.Lock() - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) - l.noMoreData = true - l.dataOrFinishEvent.Signal() - l.bufferedResponsesMtx.Unlock() - return false - } - + }, + func(data *storepb.SeriesResponse) bool { numResponses++ - bytesProcessed += resp.Size() - - if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { - return true + bytesProcessed += data.Size() + if data.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(data.GetSeries().Labels) { + return false } - - if resp.GetSeries() != nil { - seriesStats.Count(resp.GetSeries()) + if data.GetSeries() != nil { + seriesStats.Count(data.GetSeries()) } - - l.bufferedResponsesMtx.Lock() - l.bufferedResponses = append(l.bufferedResponses, resp) - l.dataOrFinishEvent.Signal() - l.bufferedResponsesMtx.Unlock() return true - } - - var t *time.Timer - if frameTimeout > 0 { - t = time.AfterFunc(frameTimeout, closeSeries) - defer t.Stop() - } - for { - if !handleRecvResponse(t) { - return - } - } - }(storeName, respSet) + }, + func() { + shardMatcher.Close() + }, + ) - return respSet + return ret } // RetrievalStrategy stores what kind of retrieval strategy @@ -446,7 +296,7 @@ func newAsyncRespSet( shardInfo *storepb.ShardInfo, logger log.Logger, emptyStreamResponses prometheus.Counter, -) (respSet, error) { +) (responseset.ResponseSet[storepb.SeriesResponse], error) { var ( span opentracing.Span @@ -525,42 +375,6 @@ func newAsyncRespSet( } } -func (l *lazyRespSet) Close() { - l.bufferedResponsesMtx.Lock() - defer l.bufferedResponsesMtx.Unlock() - - l.closeSeries() - l.noMoreData = true - l.dataOrFinishEvent.Signal() - - l.shardMatcher.Close() - _ = l.cl.CloseSend() -} - -// eagerRespSet is a SeriesSet that blocks until all data is retrieved from -// the StoreAPI. -// NOTE(bwplotka): It also resorts the series (and emits warning) if the client.SupportsWithoutReplicaLabels() is false. -type eagerRespSet struct { - // Generic parameters. - span opentracing.Span - - cl storepb.Store_SeriesClient - closeSeries context.CancelFunc - frameTimeout time.Duration - - shardMatcher *storepb.ShardMatcher - removeLabels map[string]struct{} - - storeName string - storeLabels map[string]struct{} - storeLabelSets []labels.Labels - - // Internal bookkeeping. - bufferedResponses []*storepb.SeriesResponse - wg *sync.WaitGroup - i int -} - func newEagerRespSet( span opentracing.Span, frameTimeout time.Duration, @@ -572,112 +386,43 @@ func newEagerRespSet( applySharding bool, emptyStreamResponses prometheus.Counter, removeLabels map[string]struct{}, -) respSet { - ret := &eagerRespSet{ - span: span, - cl: cl, - closeSeries: closeSeries, - frameTimeout: frameTimeout, - bufferedResponses: []*storepb.SeriesResponse{}, - wg: &sync.WaitGroup{}, - shardMatcher: shardMatcher, - removeLabels: removeLabels, - storeName: storeName, - storeLabelSets: storeLabelSets, - } - ret.storeLabels = make(map[string]struct{}) - for _, ls := range storeLabelSets { - ls.Range(func(l labels.Label) { - ret.storeLabels[l.Name] = struct{}{} - }) - } - - ret.wg.Add(1) - - // Start a goroutine and immediately buffer everything. - go func(l *eagerRespSet) { - seriesStats := &storepb.SeriesStatsCounter{} - bytesProcessed := 0 - - defer func() { - l.span.SetTag("processed.series", seriesStats.Series) - l.span.SetTag("processed.chunks", seriesStats.Chunks) - l.span.SetTag("processed.samples", seriesStats.Samples) - l.span.SetTag("processed.bytes", bytesProcessed) - l.span.Finish() - ret.wg.Done() - }() - - numResponses := 0 - defer func() { +) responseset.ResponseSet[storepb.SeriesResponse] { + seriesStats := &storepb.SeriesStatsCounter{} + numResponses := 0 + bytesProcessed := 0 + + ret := responseset.NewEagerResponseSet( + span, + frameTimeout, + storeName, + storeLabelSets, + closeSeries, + cl, + func(data []*storepb.SeriesResponse) { + span.SetTag("processed.series", seriesStats.Series) + span.SetTag("processed.chunks", seriesStats.Chunks) + span.SetTag("processed.samples", seriesStats.Samples) + span.SetTag("processed.bytes", bytesProcessed) if numResponses == 0 { emptyStreamResponses.Inc() } - }() - - // TODO(bwplotka): Consider improving readability by getting rid of anonymous functions and merging eager and - // lazyResponse into one struct. - handleRecvResponse := func(t *time.Timer) bool { - if t != nil { - defer t.Reset(frameTimeout) - } - - resp, err := cl.Recv() - if err != nil { - if err == io.EOF { - return false - } - - var rerr error - // If timer is already stopped - if t != nil && !t.Stop() { - if t.C != nil { - <-t.C // Drain the channel if it was already stopped. - } - rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, storeName) - } else { - rerr = errors.Wrapf(err, "receive series from %s", storeName) - } - - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) - l.span.SetTag("err", rerr.Error()) - return false - } - + sortWithoutLabels(data, removeLabels) + }, + func(data *storepb.SeriesResponse) bool { numResponses++ - bytesProcessed += resp.Size() - - if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { - return true + bytesProcessed += data.Size() + if data.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(data.GetSeries().Labels) { + return false } - - if resp.GetSeries() != nil { - seriesStats.Count(resp.GetSeries()) + if data.GetSeries() != nil { + seriesStats.Count(data.GetSeries()) } - - l.bufferedResponses = append(l.bufferedResponses, resp) return true - } - - var t *time.Timer - if frameTimeout > 0 { - t = time.AfterFunc(frameTimeout, closeSeries) - defer t.Stop() - } - - for { - if !handleRecvResponse(t) { - break - } - } - - // This should be used only for stores that does not support doing this on server side. - // See docs/proposals-accepted/20221129-avoid-global-sort.md for details. - // NOTE. Client is not guaranteed to give a sorted response when extLset is added - // Generally we need to resort here. - sortWithoutLabels(l.bufferedResponses, l.removeLabels) - - }(ret) + }, + func() { + shardMatcher.Close() + }, + ) return ret } @@ -718,57 +463,3 @@ func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string] return labels.Compare(labelpb.ZLabelsToPromLabels(si.Labels), labelpb.ZLabelsToPromLabels(sj.Labels)) < 0 }) } - -func (l *eagerRespSet) Close() { - if l.closeSeries != nil { - l.closeSeries() - } - l.shardMatcher.Close() - _ = l.cl.CloseSend() -} - -func (l *eagerRespSet) At() *storepb.SeriesResponse { - l.wg.Wait() - - if len(l.bufferedResponses) == 0 { - return nil - } - - return l.bufferedResponses[l.i-1] -} - -func (l *eagerRespSet) Next() bool { - l.wg.Wait() - - l.i++ - - return l.i <= len(l.bufferedResponses) -} - -func (l *eagerRespSet) Empty() bool { - l.wg.Wait() - - return len(l.bufferedResponses) == 0 -} - -func (l *eagerRespSet) StoreID() string { - return l.storeName -} - -func (l *eagerRespSet) Labelset() string { - return labelpb.PromLabelSetsToString(l.storeLabelSets) -} - -func (l *eagerRespSet) StoreLabels() map[string]struct{} { - return l.storeLabels -} - -type respSet interface { - Close() - At() *storepb.SeriesResponse - Next() bool - StoreID() string - Labelset() string - StoreLabels() map[string]struct{} - Empty() bool -} diff --git a/pkg/store/proxy_merge_test.go b/pkg/store/proxy_merge_test.go index 387d5ff6a8..9ed53163b3 100644 --- a/pkg/store/proxy_merge_test.go +++ b/pkg/store/proxy_merge_test.go @@ -5,13 +5,16 @@ package store import ( "fmt" - "sync" + "io" "testing" + "time" "github.com/efficientgo/core/testutil" + "github.com/opentracing/opentracing-go" "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/thanos/pkg/errors" + "github.com/thanos-io/thanos/pkg/responseset" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -29,32 +32,48 @@ func TestRmLabelsCornerCases(t *testing.T) { func TestProxyResponseTreeSort(t *testing.T) { t.Parallel() + sp := opentracing.StartSpan("foo") + for _, tcase := range []struct { title string - input []respSet + input []responseset.ResponseSet[storepb.SeriesResponse] exp []*storepb.SeriesResponse }{ { title: "merge sets with different series and common labels", - input: []respSet{ - &eagerRespSet{ - closeSeries: func() {}, - cl: nopClientSendCloser{}, - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3", "d", "4")), + input: []responseset.ResponseSet[storepb.SeriesResponse]{ + responseset.NewEagerResponseSet( + sp, + 1*time.Second, + "foo", + []labels.Labels{labels.FromStrings("a", "1")}, + func() {}, + &nopClientSendCloser{ + responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3", "d", "4")), + }, }, - }, - &eagerRespSet{ - closeSeries: func() {}, - cl: nopClientSendCloser{}, - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", "e", "5")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "d", "4")), + func(data []*storepb.SeriesResponse) {}, + func(data *storepb.SeriesResponse) bool { return true }, + func() {}, + ), + responseset.NewEagerResponseSet( + sp, + 1*time.Second, + "foo", + []labels.Labels{labels.FromStrings("b", "1")}, + func() {}, + &nopClientSendCloser{ + responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", "e", "5")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "d", "4")), + }, }, - }, + func(data []*storepb.SeriesResponse) {}, + func(data *storepb.SeriesResponse) bool { return true }, + func() {}, + ), }, exp: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), @@ -65,26 +84,40 @@ func TestProxyResponseTreeSort(t *testing.T) { }, { title: "merge sets with different series and labels", - input: []respSet{ - &eagerRespSet{ - closeSeries: func() {}, - cl: nopClientSendCloser{}, - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("b", "2", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("g", "7", "h", "8", "i", "9")), + input: []responseset.ResponseSet[storepb.SeriesResponse]{ + responseset.NewEagerResponseSet( + sp, + 1*time.Second, + "foo", + []labels.Labels{labels.FromStrings("a", "1")}, + func() {}, + &nopClientSendCloser{ + responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("b", "2", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("g", "7", "h", "8", "i", "9")), + }, }, - }, - &eagerRespSet{ - closeSeries: func() {}, - cl: nopClientSendCloser{}, - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5")), - storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5", "f", "6")), + func(data []*storepb.SeriesResponse) {}, + func(data *storepb.SeriesResponse) bool { return true }, + func() {}, + ), + responseset.NewEagerResponseSet( + sp, + 1*time.Second, + "foo", + []labels.Labels{labels.FromStrings("b", "1")}, + func() {}, + &nopClientSendCloser{ + responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5")), + storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5", "f", "6")), + }, }, - }, + func(data []*storepb.SeriesResponse) {}, + func(data *storepb.SeriesResponse) bool { return true }, + func() {}, + ), }, exp: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), @@ -96,27 +129,39 @@ func TestProxyResponseTreeSort(t *testing.T) { }, { title: "merge repeated series in stores with different external labels", - input: []respSet{ - &eagerRespSet{ - closeSeries: func() {}, - cl: nopClientSendCloser{}, - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), + input: []responseset.ResponseSet[storepb.SeriesResponse]{ + responseset.NewEagerResponseSet( + sp, + 1*time.Second, + "foo", + []labels.Labels{labels.FromStrings("a", "1")}, + func() {}, + &nopClientSendCloser{ + responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), + }, }, - storeLabels: map[string]struct{}{"ext2": {}}, - }, - &eagerRespSet{ - closeSeries: func() {}, - cl: nopClientSendCloser{}, - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), + func(data []*storepb.SeriesResponse) {}, + func(data *storepb.SeriesResponse) bool { return true }, + func() {}, + ), + responseset.NewEagerResponseSet( + sp, + 1*time.Second, + "foo", + []labels.Labels{labels.FromStrings("b", "1")}, + func() {}, + &nopClientSendCloser{ + responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), + }, }, - storeLabels: map[string]struct{}{"ext1": {}, "ext2": {}}, - }, + func(data []*storepb.SeriesResponse) {}, + func(data *storepb.SeriesResponse) bool { return true }, + func() {}, + ), }, exp: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), @@ -127,27 +172,39 @@ func TestProxyResponseTreeSort(t *testing.T) { }, { title: "merge series with external labels at beginning of series", - input: []respSet{ - &eagerRespSet{ - closeSeries: func() {}, - cl: nopClientSendCloser{}, - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "2")), + input: []responseset.ResponseSet[storepb.SeriesResponse]{ + responseset.NewEagerResponseSet( + sp, + 1*time.Second, + "foo", + []labels.Labels{labels.FromStrings("a", "1")}, + func() {}, + &nopClientSendCloser{ + responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "2")), + }, }, - storeLabels: map[string]struct{}{"a": {}}, - }, - &eagerRespSet{ - closeSeries: func() {}, - cl: nopClientSendCloser{}, - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), + func(data []*storepb.SeriesResponse) {}, + func(data *storepb.SeriesResponse) bool { return true }, + func() {}, + ), + responseset.NewEagerResponseSet( + sp, + 1*time.Second, + "foo", + []labels.Labels{labels.FromStrings("b", "1")}, + func() {}, + &nopClientSendCloser{ + responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), + }, }, - storeLabels: map[string]struct{}{"a": {}}, - }, + func(data []*storepb.SeriesResponse) {}, + func(data *storepb.SeriesResponse) bool { return true }, + func() {}, + ), }, exp: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")), @@ -158,27 +215,39 @@ func TestProxyResponseTreeSort(t *testing.T) { }, { title: "merge series in stores with external labels not present in series (e.g. stripped during dedup)", - input: []respSet{ - &eagerRespSet{ - closeSeries: func() {}, - cl: nopClientSendCloser{}, - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), + input: []responseset.ResponseSet[storepb.SeriesResponse]{ + responseset.NewEagerResponseSet( + sp, + 1*time.Second, + "foo", + []labels.Labels{labels.FromStrings("a", "1")}, + func() {}, + &nopClientSendCloser{ + responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), + }, }, - storeLabels: map[string]struct{}{"ext2": {}, "replica": {}}, - }, - &eagerRespSet{ - closeSeries: func() {}, - cl: nopClientSendCloser{}, - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), + func(data []*storepb.SeriesResponse) {}, + func(data *storepb.SeriesResponse) bool { return true }, + func() {}, + ), + responseset.NewEagerResponseSet( + sp, + 1*time.Second, + "foo", + []labels.Labels{labels.FromStrings("b", "1")}, + func() {}, + &nopClientSendCloser{ + responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), + storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), + }, }, - storeLabels: map[string]struct{}{"ext1": {}, "ext2": {}, "replica": {}}, - }, + func(data []*storepb.SeriesResponse) {}, + func(data *storepb.SeriesResponse) bool { return true }, + func() {}, + ), }, exp: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), @@ -189,29 +258,42 @@ func TestProxyResponseTreeSort(t *testing.T) { }, { title: "test", - input: []respSet{ - &eagerRespSet{ - closeSeries: func() {}, - cl: nopClientSendCloser{}, - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), - storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), - storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + input: []responseset.ResponseSet[storepb.SeriesResponse]{ + responseset.NewEagerResponseSet( + sp, + 1*time.Second, + "foo", + []labels.Labels{labels.FromStrings("a", "1")}, + func() {}, + &nopClientSendCloser{ + responses: []*storepb.SeriesResponse{ + + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, }, - storeLabels: map[string]struct{}{"receive": {}, "tenant_id": {}, "thanos_replica": {}}, - }, - &eagerRespSet{ - closeSeries: func() {}, - cl: nopClientSendCloser{}, - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), - storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), - storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + func(data []*storepb.SeriesResponse) {}, + func(data *storepb.SeriesResponse) bool { return true }, + func() {}, + ), + responseset.NewEagerResponseSet( + sp, + 1*time.Second, + "foo", + []labels.Labels{labels.FromStrings("b", "1")}, + func() {}, + &nopClientSendCloser{ + responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, }, - storeLabels: map[string]struct{}{"cluster": {}, "prometheus": {}, "prometheus_replica": {}, "receive": {}, "tenant_id": {}, "thanos_replica": {}, "thanos_ruler_replica": {}}, - }, + func(data []*storepb.SeriesResponse) {}, + func(data *storepb.SeriesResponse) bool { return true }, + func() {}, + ), }, exp: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), @@ -237,6 +319,16 @@ func TestProxyResponseTreeSort(t *testing.T) { type nopClientSendCloser struct { storepb.Store_SeriesClient + responses []*storepb.SeriesResponse + i int +} + +func (c *nopClientSendCloser) Recv() (*storepb.SeriesResponse, error) { + if c.i >= len(c.responses) { + return nil, io.EOF + } + c.i++ + return c.responses[c.i-1], nil } func (c nopClientSendCloser) CloseSend() error { return nil } @@ -361,18 +453,25 @@ func BenchmarkSortWithoutLabels(b *testing.B) { func BenchmarkKWayMerge(b *testing.B) { for i := 0; i < b.N; i++ { - respSets := []respSet{} + respSets := []responseset.ResponseSet[storepb.SeriesResponse]{} for j := 0; j < 1000; j++ { - respSets = append(respSets, &eagerRespSet{ - closeSeries: func() {}, - cl: nopClientSendCloser{}, - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "3")), - storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "3", "d", "4")), - storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "4")), + respSets = append(respSets, responseset.NewEagerResponseSet( + opentracing.StartSpan("foo"), + 1*time.Second, + "foo", + []labels.Labels{labelsFromStrings("a", "1")}, + func() {}, + &nopClientSendCloser{ + responses: []*storepb.SeriesResponse{ + storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "3")), + storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "3", "d", "4")), + storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "4")), + }, }, - }) + func(data []*storepb.SeriesResponse) {}, + func(data *storepb.SeriesResponse) bool { return false }, + func() {}, + )) } lt := NewProxyResponseLoserTree(respSets...) diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index dd0ac551ff..59299c54e0 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -11,7 +11,6 @@ import ( "os" "path/filepath" "strings" - "sync" "testing" "time" @@ -20,17 +19,20 @@ import ( "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/stretchr/testify/require" "google.golang.org/grpc" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/info/infopb" + "github.com/thanos-io/thanos/pkg/responseset" storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -2361,6 +2363,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { }, }) + require.Error(t, err) testutil.Assert(t, strings.Contains(err.Error(), context.Canceled.Error())) }) }) @@ -2501,12 +2504,19 @@ func TestDedupRespHeap_Deduplication(t *testing.T) { } { t.Run(tcase.tname, func(t *testing.T) { h := NewResponseDeduplicator(NewProxyResponseLoserTree( - &eagerRespSet{ - closeSeries: func() {}, - cl: nopClientSendCloser{}, - wg: &sync.WaitGroup{}, - bufferedResponses: tcase.responses, - }, + responseset.NewEagerResponseSet( + opentracing.StartSpan("test"), + 1*time.Second, + "test", + []labels.Labels{labels.FromStrings("a", "1")}, + func() {}, + &nopClientSendCloser{ + responses: tcase.responses, + }, + func(data []*storepb.SeriesResponse) {}, + func(data *storepb.SeriesResponse) bool { return true }, + func() {}, + ), )) tcase.testFn(tcase.responses, h) })