Skip to content

Commit

Permalink
Expand wal.File type to include a key consisting of DB + Table
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse Thompson committed Aug 22, 2023
1 parent 3c79d01 commit 97ac4e9
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 19 deletions.
17 changes: 8 additions & 9 deletions ingestor/cluster/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (a *batcher) processSegments() ([][]string, [][]string, error) {
// need to be transferred to other nodes.
var (
owned, notOwned [][]string
lastMetric string
lastSegmentKey string
groupSize int
)
for _, v := range entries {
Expand All @@ -154,20 +154,19 @@ func (a *batcher) processSegments() ([][]string, [][]string, error) {
}
groupSize += int(fi.Size())

key := fmt.Sprintf("%s_%s", v.Database, v.Table)
createdAt, err := flake.ParseFlakeID(v.Epoch)
if err != nil {
logger.Warn("Failed to parse flake id: %s: %s", v.Epoch, err)
} else {
if lastMetric == "" || key != lastMetric {
metrics.IngestorSegmentsMaxAge.WithLabelValues(lastMetric).Set(time.Since(createdAt).Seconds())
metrics.IngestorSegmentsSizeBytes.WithLabelValues(lastMetric).Set(float64(groupSize))
if lastSegmentKey == "" || v.Key != lastSegmentKey {
metrics.IngestorSegmentsMaxAge.WithLabelValues(lastSegmentKey).Set(time.Since(createdAt).Seconds())
metrics.IngestorSegmentsSizeBytes.WithLabelValues(lastSegmentKey).Set(float64(groupSize))
groupSize = 0
}
}
lastMetric = key
lastSegmentKey = v.Key

metrics.IngestorSegmentsTotal.WithLabelValues(key).Inc()
metrics.IngestorSegmentsTotal.WithLabelValues(v.Key).Inc()

if a.Segmenter.IsActiveSegment(v.Path) {
if logger.IsDebug() {
Expand All @@ -176,10 +175,10 @@ func (a *batcher) processSegments() ([][]string, [][]string, error) {
continue
}

groups[key] = append(groups[key], v.Path)
groups[v.Key] = append(groups[v.Key], v.Path)
}

// For each metric, sort the segments by name. The last segment is the current segment.
// For each sample, sort the segments by name. The last segment is the current segment.
for k, v := range groups {
sort.Strings(v)

Expand Down
10 changes: 7 additions & 3 deletions ingestor/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,10 @@ func (s *LocalStore) WriteTimeSeries(ctx context.Context, database string, ts []
b := bytesPool.Get(256)
defer bytesPool.Put(b)

db := []byte(database)
for _, v := range ts {

key := SeriesKey(b[:0], database, v.Labels)
key := SegmentKey(b[:0], db, v.Labels)
wal, err := s.GetWAL(ctx, key)
if err != nil {
return err
Expand Down Expand Up @@ -227,8 +228,9 @@ func (s *LocalStore) Import(filename string, body io.ReadCloser) (int, error) {
return int(n), nil
}

func SeriesKey(dst []byte, database string, labels []prompb.Label) []byte {
dst = append([]byte(database+"_"), dst...)
func SegmentKey(dst []byte, database []byte, labels []prompb.Label) []byte {
dst = append(dst, database...)
dst = append(dst, delim...)
for _, v := range labels {
if bytes.Equal(v.Name, []byte("__name__")) {
// return fmt.Sprintf("%s%d", string(transform.Normalize(v.Value)), int(atomic.AddUint64(&idx, 1))%2)
Expand All @@ -237,3 +239,5 @@ func SeriesKey(dst []byte, database string, labels []prompb.Label) []byte {
}
return transform.AppendNormalize(dst, labels[0].Value)
}

// delim separates the database and table/metric components of a segment key
// (e.g. "adxmetrics" + "_" + "Foo"), matching the "%s_%s" form used by wal.File.Key.
var delim = []byte("_")
25 changes: 18 additions & 7 deletions ingestor/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ import (

func TestSeriesKey(t *testing.T) {
tests := []struct {
Database string
Database []byte
Labels []prompb.Label
Expect []byte
}{
{
Database: "adxmetrics",
Database: []byte("adxmetrics"),
Labels: newTimeSeries("foo", nil, 0, 0).Labels,
Expect: []byte("adxmetrics_Foo"),
},
}
for _, tt := range tests {
t.Run(string(tt.Expect), func(t *testing.T) {
b := make([]byte, 256)
require.Equal(t, string(tt.Expect), string(storage.SeriesKey(b[:0], tt.Database, tt.Labels)))
require.Equal(t, string(tt.Expect), string(storage.SegmentKey(b[:0], tt.Database, tt.Labels)))
})
}
}
Expand All @@ -52,19 +52,19 @@ func TestStore_Open(t *testing.T) {
require.Equal(t, 0, s.WALCount())

ts := newTimeSeries("foo", nil, 0, 0)
w, err := s.GetWAL(ctx, storage.SeriesKey(b[:0], database, ts.Labels))
w, err := s.GetWAL(ctx, storage.SegmentKey(b[:0], []byte(database), ts.Labels))
require.NoError(t, err)
require.NotNil(t, w)
require.NoError(t, s.WriteTimeSeries(context.Background(), database, []prompb.TimeSeries{ts}))

ts = newTimeSeries("foo", nil, 1, 1)
w, err = s.GetWAL(ctx, storage.SeriesKey(b[:0], database, ts.Labels))
w, err = s.GetWAL(ctx, storage.SegmentKey(b[:0], []byte(database), ts.Labels))
require.NoError(t, err)
require.NotNil(t, w)
require.NoError(t, s.WriteTimeSeries(context.Background(), database, []prompb.TimeSeries{ts}))

ts = newTimeSeries("bar", nil, 0, 0)
w, err = s.GetWAL(ctx, storage.SeriesKey(b[:0], database, ts.Labels))
w, err = s.GetWAL(ctx, storage.SegmentKey(b[:0], []byte(database), ts.Labels))
require.NoError(t, err)
require.NotNil(t, w)
require.NoError(t, s.WriteTimeSeries(context.Background(), database, []prompb.TimeSeries{ts}))
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestLocalStore_WriteTimeSeries(t *testing.T) {
require.Equal(t, 0, s.WALCount())

ts := newTimeSeries("foo", nil, 0, 0)
w, err := s.GetWAL(ctx, storage.SeriesKey(b[:0], database, ts.Labels))
w, err := s.GetWAL(ctx, storage.SegmentKey(b[:0], []byte(database), ts.Labels))
require.NoError(t, err)
require.NotNil(t, w)
require.NoError(t, s.WriteTimeSeries(context.Background(), database, []prompb.TimeSeries{ts}))
Expand Down Expand Up @@ -141,6 +141,17 @@ func TestStore_SkipNonCSV(t *testing.T) {
require.Equal(t, 0, s.WALCount())
}

// BenchmarkSegmentKey reports time and allocations per SegmentKey call.
// A single scratch buffer is reused across iterations (re-sliced to length
// zero each call) so the benchmark measures SegmentKey itself, not buffer
// growth.
func BenchmarkSegmentKey(b *testing.B) {
	scratch := make([]byte, 256)
	db := []byte("adxmetrics")
	labels := newTimeSeries("foo", nil, 0, 0).Labels

	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		storage.SegmentKey(scratch[:0], db, labels)
	}
}

func newTimeSeries(name string, labels map[string]string, ts int64, val float64) prompb.TimeSeries {
l := []prompb.Label{
{
Expand Down
2 changes: 2 additions & 0 deletions pkg/wal/filename.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type File struct {
Database string
Table string
Epoch string
Key string
}

func ListDir(storageDir string) ([]File, error) {
Expand All @@ -63,6 +64,7 @@ func ListDir(storageDir string) ([]File, error) {
Database: fields[0],
Table: fields[1],
Epoch: fields[2][:len(fields[2])-4],
Key: fmt.Sprintf("%s_%s", fields[0], fields[1]),
},
)
return nil
Expand Down

0 comments on commit 97ac4e9

Please sign in to comment.