Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make lazy posting series match ratio configurable #8049

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ func runStore(
}),
store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled),
store.WithPostingGroupMaxKeySeriesRatio(conf.postingGroupMaxKeySeriesRatio),
store.WithSeriesMatchRatio(0.5), // TODO: expose series match ratio as config.
store.WithIndexHeaderLazyDownloadStrategy(
indexheader.IndexHeaderLazyDownloadStrategy(conf.indexHeaderLazyDownloadStrategy).StrategyToDownloadFunc(),
),
Expand Down
21 changes: 19 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ type BucketStore struct {
enableChunkHashCalculation bool

enabledLazyExpandedPostings bool
seriesMatchRatio float64
postingGroupMaxKeySeriesRatio float64

sortingStrategy sortingStrategy
Expand Down Expand Up @@ -583,6 +584,15 @@ func WithPostingGroupMaxKeySeriesRatio(postingGroupMaxKeySeriesRatio float64) Bu
}
}

// WithSeriesMatchRatio configures the estimated ratio of series expected to match when intersecting posting groups.
// It is used by the lazy expanded posting optimization strategy. The ratio should be within (0, 1):
// the closer it is to 1, the worse the matchers' selectivity is assumed to be.
func WithSeriesMatchRatio(seriesMatchRatio float64) BucketStoreOption {
return func(s *BucketStore) {
s.seriesMatchRatio = seriesMatchRatio
}
}

// WithDontResort disables series resorting in Store Gateway.
func WithDontResort(true bool) BucketStoreOption {
return func(s *BucketStore) {
Expand Down Expand Up @@ -1056,6 +1066,7 @@ type blockSeriesClient struct {
bytesLimiter BytesLimiter

lazyExpandedPostingEnabled bool
seriesMatchRatio float64
// Mark posting group as lazy if it adds too many keys. 0 to disable.
postingGroupMaxKeySeriesRatio float64
lazyExpandedPostingsCount prometheus.Counter
Expand Down Expand Up @@ -1102,6 +1113,7 @@ func newBlockSeriesClient(
chunkFetchDurationSum *prometheus.HistogramVec,
extLsetToRemove map[string]struct{},
lazyExpandedPostingEnabled bool,
seriesMatchRatio float64,
postingGroupMaxKeySeriesRatio float64,
lazyExpandedPostingsCount prometheus.Counter,
lazyExpandedPostingByReason *prometheus.CounterVec,
Expand Down Expand Up @@ -1139,6 +1151,7 @@ func newBlockSeriesClient(
chunkFetchDurationSum: chunkFetchDurationSum,

lazyExpandedPostingEnabled: lazyExpandedPostingEnabled,
seriesMatchRatio: seriesMatchRatio,
postingGroupMaxKeySeriesRatio: postingGroupMaxKeySeriesRatio,
lazyExpandedPostingsCount: lazyExpandedPostingsCount,
lazyExpandedPostingGroupByReason: lazyExpandedPostingByReason,
Expand Down Expand Up @@ -1193,7 +1206,7 @@ func (b *blockSeriesClient) ExpandPostings(
matchers sortedMatchers,
seriesLimiter SeriesLimiter,
) error {
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.postingGroupMaxKeySeriesRatio, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant)
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.seriesMatchRatio, b.postingGroupMaxKeySeriesRatio, b.lazyExpandedPostingSizeBytes, b.lazyExpandedPostingGroupByReason, b.tenant)
if err != nil {
return errors.Wrap(err, "expanded matching posting")
}
Expand Down Expand Up @@ -1626,6 +1639,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
s.metrics.chunkFetchDurationSum,
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.seriesMatchRatio,
s.postingGroupMaxKeySeriesRatio,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
Expand Down Expand Up @@ -1942,6 +1956,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
nil,
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.seriesMatchRatio,
s.postingGroupMaxKeySeriesRatio,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
Expand Down Expand Up @@ -2170,6 +2185,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
nil,
nil,
s.enabledLazyExpandedPostings,
s.seriesMatchRatio,
s.postingGroupMaxKeySeriesRatio,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingGroupsByReason,
Expand Down Expand Up @@ -2638,6 +2654,7 @@ func (r *bucketIndexReader) ExpandedPostings(
ms sortedMatchers,
bytesLimiter BytesLimiter,
lazyExpandedPostingEnabled bool,
seriesMatchRatio float64,
postingGroupMaxKeySeriesRatio float64,
lazyExpandedPostingSizeBytes prometheus.Counter,
lazyExpandedPostingGroupsByReason *prometheus.CounterVec,
Expand Down Expand Up @@ -2694,7 +2711,7 @@ func (r *bucketIndexReader) ExpandedPostings(
postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil))
}

ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant)
ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, seriesMatchRatio, postingGroupMaxKeySeriesRatio, lazyExpandedPostingSizeBytes, lazyExpandedPostingGroupsByReason, tenant)
if err != nil {
return nil, errors.Wrap(err, "fetch and expand postings")
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1305,7 +1305,7 @@ func benchmarkExpandedPostings(

t.ResetTimer()
for i := 0; i < t.N(); i++ {
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, 0.5, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
testutil.Equals(t, c.expectedLen, len(p.postings))
}
Expand Down Expand Up @@ -1344,7 +1344,7 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, 0.5, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
testutil.Equals(t, ps, (*lazyExpandedPostings)(nil))
// Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings.
Expand Down Expand Up @@ -1384,7 +1384,7 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) {
reg := prometheus.NewRegistry()
dummyCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test"})
dummyCounterVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{Name: "test_counter_vec"}, []string{"reason"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, 0.5, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
// We expect emptyLazyPostings rather than lazy postings with 0 length but with matchers.
testutil.Equals(t, ps, emptyLazyPostings)
Expand Down Expand Up @@ -2924,6 +2924,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
dummyHistogram,
nil,
false,
0.5,
0,
dummyCounter,
dummyCounterVec,
Expand Down Expand Up @@ -3584,7 +3585,7 @@ func TestExpandedPostingsRace(t *testing.T) {
wg.Add(1)

go func(i int, bb *bucketBlock) {
refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, 0.5, 0, dummyCounter, dummyCounterVec, tenancy.DefaultTenant)
testutil.Ok(t, err)
defer wg.Done()

Expand Down
3 changes: 2 additions & 1 deletion pkg/store/lazy_postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func fetchLazyExpandedPostings(
bytesLimiter BytesLimiter,
addAllPostings bool,
lazyExpandedPostingEnabled bool,
seriesMatchRatio float64,
postingGroupMaxKeySeriesRatio float64,
lazyExpandedPostingSizeBytes prometheus.Counter,
lazyExpandedPostingGroupsByReason *prometheus.CounterVec,
Expand All @@ -237,7 +238,7 @@ func fetchLazyExpandedPostings(
r,
postingGroups,
int64(r.block.estimatedMaxSeriesSize),
0.5, // TODO(yeya24): Expose this as a flag.
seriesMatchRatio,
postingGroupMaxKeySeriesRatio,
lazyExpandedPostingSizeBytes,
lazyExpandedPostingGroupsByReason,
Expand Down
17 changes: 17 additions & 0 deletions pkg/store/lazy_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,23 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
{name: "bar", addKeys: []string{"foo"}, cardinality: 250000, existentKeys: 1, lazy: true},
},
},
{
name: "two posting groups with add keys, group not marked as lazy because of lower series match ratio",
inputPostings: map[string]map[string]index.Range{
"foo": {"bar": index.Range{End: 44}},
"bar": {"foo": index.Range{Start: 44, End: 5052}},
},
seriesMaxSize: 1000,
seriesMatchRatio: 0.1,
postingGroups: []*postingGroup{
{name: "foo", addKeys: []string{"bar"}},
{name: "bar", addKeys: []string{"foo"}},
},
expectedPostingGroups: []*postingGroup{
{name: "foo", addKeys: []string{"bar"}, cardinality: 10, existentKeys: 1},
{name: "bar", addKeys: []string{"foo"}, cardinality: 1251, existentKeys: 1, lazy: false},
},
},
{
name: "three posting groups with add keys, two small posting group and a very large posting group, large one become lazy",
inputPostings: map[string]map[string]index.Range{
Expand Down
Loading