Store kusto database as part of wal filename #90

Merged: 5 commits, Aug 24, 2023
Changes from 2 commits
10 changes: 5 additions & 5 deletions cmd/ingestor/main.go
@@ -98,7 +98,7 @@ func realMain(ctx *cli.Context) error {
runtime.SetMutexProfileFraction(1)

var (
storageDir, kustoEndpoint, database string
storageDir, kustoEndpoint string
cacert, key string
insecureSkipVerify, disablePeerDiscovery bool
concurrentUploads int
@@ -248,7 +248,7 @@ func realMain(ctx *cli.Context) error {
dropMetrics = append(dropMetrics, metricRegex)
}

uploader, err := newUploader(kustoEndpoint, database, storageDir, concurrentUploads, defaultMapping)
uploader, err := newUploader(kustoEndpoint, storageDir, concurrentUploads, defaultMapping)
if err != nil {
logger.Fatal("Failed to create uploader: %s", err)
}
@@ -398,8 +398,8 @@ func newKustoClient(endpoint string) (ingest.QueryClient, error) {
return kusto.New(kcsb)
}

func newUploader(kustoEndpoint, database, storageDir string, concurrentUploads int, defaultMapping storage.SchemaMapping) (adx.Uploader, error) {
if kustoEndpoint == "" && database == "" {
func newUploader(kustoEndpoint, storageDir string, concurrentUploads int, defaultMapping storage.SchemaMapping) (adx.Uploader, error) {
if kustoEndpoint == "" {
logger.Warn("No kusto endpoint provided, using fake uploader")
return adx.NewFakeUploader(), nil
}
@@ -413,7 +413,7 @@ func newUploader(kustoEndpoint, database, storageDir string, concurrentUploads i
}

split := strings.Split(kustoEndpoint, "=")
database = split[0]
database := split[0]
kustoEndpoint = split[1]

if database == "" {
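Note: with the separate database variable gone from main.go, the target database rides along in the kusto endpoint flag value and is split back out inside newUploader (the strings.Split on "=" above). A minimal sketch of that parsing, assuming the value takes the form <database>=<endpoint>; the helper name and the stricter error handling are illustrative, not code from this PR:

```go
package main

import (
	"fmt"
	"strings"
)

// splitEndpoint mirrors the parsing newUploader performs: the endpoint flag
// value is expected to look like "<database>=<endpoint>". Helper name and
// error handling are illustrative additions.
func splitEndpoint(value string) (database, endpoint string, err error) {
	parts := strings.SplitN(value, "=", 2)
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", "", fmt.Errorf("expected <database>=<endpoint>, got %q", value)
	}
	return parts[0], parts[1], nil
}

func main() {
	db, endpoint, err := splitEndpoint("Metrics=https://ingest-mycluster.kusto.windows.net")
	if err != nil {
		panic(err)
	}
	fmt.Println(db, endpoint) // Metrics https://ingest-mycluster.kusto.windows.net
}
```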
5 changes: 4 additions & 1 deletion ingestor/adx/fake.go
@@ -38,6 +38,10 @@ func (f *fakeUploader) UploadQueue() chan []string {
return f.queue
}

func (f *fakeUploader) Database() string {
return ""
}

func (f *fakeUploader) upload(ctx context.Context) {
for {
select {
@@ -56,7 +60,6 @@ func (f *fakeUploader) upload(ctx context.Context) {

type fakeKustoMgmt struct {
expectedQuery, actualQuery string
expectedRows *kusto.MockRows
}

func (f *fakeKustoMgmt) Mgmt(ctx context.Context, db string, query kusto.Stmt, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
29 changes: 20 additions & 9 deletions ingestor/adx/uploader.go
@@ -4,8 +4,6 @@ import (
"context"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"

@@ -21,6 +19,8 @@ const ConcurrentUploads = 50
type Uploader interface {
service.Component

Database() string

// UploadQueue returns a channel that can be used to upload files to kusto.
UploadQueue() chan []string
}
@@ -99,7 +99,11 @@ func (n *uploader) UploadQueue() chan []string {
return n.queue
}

func (n *uploader) uploadReader(reader io.Reader, table string) error {
func (n *uploader) Database() string {
return n.database
}

func (n *uploader) uploadReader(reader io.Reader, database, table string) error {
// Ensure we wait for this upload to finish.
n.wg.Add(1)
defer n.wg.Done()
@@ -169,9 +173,13 @@ func (n *uploader) upload(ctx context.Context) error {
case paths := <-n.queue:

func() {
readers := make([]io.Reader, 0, len(paths))
files := make([]io.Closer, 0, len(paths))
var fields []string
var (
readers = make([]io.Reader, 0, len(paths))
files = make([]io.Closer, 0, len(paths))
database string
table string
err error
)
n.mu.Lock()
for _, path := range paths {

@@ -181,8 +189,11 @@ func (n *uploader) upload(ctx context.Context) error {
}
n.uploading[path] = struct{}{}

fileName := filepath.Base(path)
fields = strings.Split(fileName, "_")
database, table, _, err = wal.ParseFilename(path)
if err != nil {
logger.Error("Failed to parse file: %s", err.Error())
continue
}

f, err := wal.NewSegmentReader(path)
if os.IsNotExist(err) {
Expand Down Expand Up @@ -217,7 +228,7 @@ func (n *uploader) upload(ctx context.Context) error {
mr := io.MultiReader(readers...)

now := time.Now()
if err := n.uploadReader(mr, fields[0]); err != nil {
if err := n.uploadReader(mr, database, table); err != nil {
logger.Error("Failed to upload file: %s", err.Error())
return
}
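Note: the uploader now derives both the database and table for each upload from the segment path via wal.ParseFilename, whose implementation is not part of this diff. The sketch below assumes segment files are named <database>_<table>_<epoch>.wal; only the return order (database, table, epoch, err) is taken from the call sites above, everything else is an assumption:

```go
package wal

import (
	"fmt"
	"path/filepath"
	"strings"
)

// ParseFilename splits a segment path into its components. Sketch only: it
// assumes names of the form "<database>_<table>_<epoch>.wal"; the real
// pkg/wal implementation may validate differently.
func ParseFilename(path string) (database, table, epoch string, err error) {
	name := filepath.Base(path)
	if !strings.HasSuffix(name, ".wal") {
		return "", "", "", fmt.Errorf("not a wal segment: %s", path)
	}
	parts := strings.Split(strings.TrimSuffix(name, ".wal"), "_")
	if len(parts) != 3 {
		return "", "", "", fmt.Errorf("invalid segment filename: %s", path)
	}
	return parts[0], parts[1], parts[2], nil
}
```

With the database encoded in the filename, uploadReader can route each batch to the right database instead of relying on a single ingestor-wide setting.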
47 changes: 18 additions & 29 deletions ingestor/cluster/batcher.go
Expand Up @@ -4,16 +4,15 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"

"github.com/Azure/adx-mon/metrics"
"github.com/Azure/adx-mon/pkg/flake"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/service"
"github.com/Azure/adx-mon/pkg/wal"
)

type Segmenter interface {
@@ -126,12 +125,13 @@ func (a *batcher) BatchSegments() error {
// thresholds. In addition, the batches are ordered as oldest first to allow for prioritizing
// lagging segments over new ones.
func (a *batcher) processSegments() ([][]string, [][]string, error) {
entries, err := os.ReadDir(a.storageDir)
entries, err := wal.ListDir(a.storageDir)
if err != nil {
return nil, nil, fmt.Errorf("failed to read storage dir: %w", err)
}

sort.Slice(entries, func(i, j int) bool {
return entries[i].Name() < entries[j].Name()
return entries[i].Path < entries[j].Path
})

metrics.IngestorSegmentsTotal.Reset()
@@ -147,46 +147,36 @@ func (a *batcher) processSegments() ([][]string, [][]string, error) {
groupSize int
)
for _, v := range entries {
if v.IsDir() || !strings.HasSuffix(v.Name(), ".wal") {
continue
}

fi, err := v.Info()
fi, err := os.Stat(v.Path)
if err != nil {
logger.Warn("Failed to stat file: %s", filepath.Join(a.storageDir, v.Name()))
logger.Warn("Failed to stat file: %s", v.Path)
continue
}
groupSize += int(fi.Size())

parts := strings.Split(v.Name(), "_")
if len(parts) != 2 { // Cpu_1234.wal
logger.Warn("Invalid file name: %s", filepath.Join(a.storageDir, v.Name()))
continue
}

epoch := parts[1][:len(parts[1])-4]
createdAt, err := flake.ParseFlakeID(epoch)
key := fmt.Sprintf("%s_%s", v.Database, v.Table)
createdAt, err := flake.ParseFlakeID(v.Epoch)
if err != nil {
logger.Warn("Failed to parse flake id: %s: %s", epoch, err)
logger.Warn("Failed to parse flake id: %s: %s", v.Epoch, err)
} else {
if lastMetric == "" || parts[0] != lastMetric {
if lastMetric == "" || key != lastMetric {
metrics.IngestorSegmentsMaxAge.WithLabelValues(lastMetric).Set(time.Since(createdAt).Seconds())
metrics.IngestorSegmentsSizeBytes.WithLabelValues(lastMetric).Set(float64(groupSize))
groupSize = 0
}
}
lastMetric = parts[0]
lastMetric = key

metrics.IngestorSegmentsTotal.WithLabelValues(parts[0]).Inc()
metrics.IngestorSegmentsTotal.WithLabelValues(key).Inc()

if a.Segmenter.IsActiveSegment(filepath.Join(a.storageDir, v.Name())) {
if a.Segmenter.IsActiveSegment(v.Path) {
if logger.IsDebug() {
logger.Debug("Skipping active segment: %s", filepath.Join(a.storageDir, v.Name()))
logger.Debug("Skipping active segment: %s", v.Path)
}
continue
}

groups[parts[0]] = append(groups[parts[0]], filepath.Join(a.storageDir, v.Name()))
groups[key] = append(groups[key], v.Path)
}

// For each metric, sort the segments by name. The last segment is the current segment.
@@ -292,12 +282,11 @@ func minCreated(batch []string) time.Time {
}

func segmentCreationTime(filename string) (time.Time, error) {
parts := strings.Split(filepath.Base(filename), "_")
if len(parts) != 2 { // Cpu_1234.wal
return time.Time{}, fmt.Errorf("invalid file name: %s", filename)
_, _, epoch, err := wal.ParseFilename(filename)
if err != nil {
return time.Time{}, fmt.Errorf("invalid file name: %s: %w", filename, err)
}

epoch := parts[1][:len(parts[1])-4]
createdAt, err := flake.ParseFlakeID(epoch)
if err != nil {
return time.Time{}, fmt.Errorf("invalid file name: %s: %w", filename, err)
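Note: processSegments now relies on wal.ListDir, which appears to return pre-parsed segment entries; the batcher reads Path, Database, Table, and Epoch from each one and groups segments by a database_table key instead of the bare metric name. The type and function below are a sketch under that assumption, reusing the hypothetical ParseFilename from the previous sketch:

```go
package wal

import (
	"os"
	"path/filepath"
)

// SegmentInfo is a sketch of the entry type ListDir seems to return; the
// field names mirror how batcher.go uses them (v.Path, v.Database, v.Table,
// v.Epoch).
type SegmentInfo struct {
	Path     string
	Database string
	Table    string
	Epoch    string
}

// ListDir lists the *.wal segments in storageDir along with their parsed
// filename components. Sketch only: the real pkg/wal version may differ.
func ListDir(storageDir string) ([]SegmentInfo, error) {
	entries, err := os.ReadDir(storageDir)
	if err != nil {
		return nil, err
	}
	segments := make([]SegmentInfo, 0, len(entries))
	for _, e := range entries {
		if e.IsDir() || filepath.Ext(e.Name()) != ".wal" {
			continue
		}
		path := filepath.Join(storageDir, e.Name())
		database, table, epoch, err := ParseFilename(path)
		if err != nil {
			continue // skip files that do not look like segments
		}
		segments = append(segments, SegmentInfo{
			Path:     path,
			Database: database,
			Table:    table,
			Epoch:    epoch,
		})
	}
	return segments, nil
}
```

This is also why the .wal suffix check, the strings.Split on "_", and the filepath.Join calls disappear from the batcher: the listing already hands back full paths and parsed components.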
26 changes: 13 additions & 13 deletions ingestor/cluster/batcher_test.go
@@ -1,36 +1,36 @@
package cluster

import (
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/Azure/adx-mon/pkg/wal"
"github.com/davidnarayan/go-flake"
"github.com/stretchr/testify/require"
)

func TestBatcher_ClosedSegments(t *testing.T) {
dir := t.TempDir()

f, err := os.Create(filepath.Join(dir, "Cpu_aaaa.wal"))
f, err := os.Create(filepath.Join(dir, wal.Filename("db", "Cpu", "aaaa")))
require.NoError(t, err)
defer f.Close()

f1, err := os.Create(filepath.Join(dir, "Cpu_bbbb.wal"))
f1, err := os.Create(filepath.Join(dir, wal.Filename("db", "Cpu", "bbbb")))
require.NoError(t, err)
defer f1.Close()

a := &batcher{
hostname: "node1",
storageDir: dir,
Partitioner: &fakePartitioner{owner: "node1"},
Segmenter: &fakeSegmenter{active: filepath.Join(dir, "Cpu_bbbb.wal")},
Segmenter: &fakeSegmenter{active: filepath.Join(dir, wal.Filename("db", "Cpu", "bbbb"))},
}
owner, notOwned, err := a.processSegments()
require.NoError(t, err)
require.Equal(t, []string{filepath.Join(dir, "Cpu_aaaa.wal")}, owner[0])
require.Equal(t, []string{filepath.Join(dir, wal.Filename("db", "Cpu", "aaaa"))}, owner[0])
require.Equal(t, 0, len(notOwned))
}

@@ -41,13 +41,13 @@ func TestBatcher_NodeOwned(t *testing.T) {
require.NoError(t, err)
now := idgen.NextId()

fName := fmt.Sprintf("Cpu_%s.wal", now.String())
fName := wal.Filename("db", "Cpu", now.String())
f, err := os.Create(filepath.Join(dir, fName))
require.NoError(t, err)
defer f.Close()

now = idgen.NextId()
f1Name := fmt.Sprintf("Cpu_%s.wal", now.String())
f1Name := wal.Filename("db", "Cpu", now.String())
f1, err := os.Create(filepath.Join(dir, f1Name))
require.NoError(t, err)
defer f1.Close()
@@ -58,7 +58,7 @@ func TestBatcher_NodeOwned(t *testing.T) {
maxSegmentAge: 30 * time.Second,
maxSegmentSize: 100 * 1024 * 1024,
Partitioner: &fakePartitioner{owner: "node2"},
Segmenter: &fakeSegmenter{active: "Memory_aaaa.wal"},
Segmenter: &fakeSegmenter{active: wal.Filename("db", "Memory", "aaaa")},
}
owner, notOwned, err := a.processSegments()
require.NoError(t, err)
@@ -69,27 +69,27 @@ func TestBatcher_NodeOwned(t *testing.T) {
func TestBatcher_OldestFirst(t *testing.T) {
dir := t.TempDir()

f, err := os.Create(filepath.Join(dir, "Cpu_2359cdac7d6f0001.wal"))
f, err := os.Create(filepath.Join(dir, wal.Filename("db", "Cpu", "2359cdac7d6f0001")))
require.NoError(t, err)
defer f.Close()

// This segment is older, but lexicographically greater. It should be the in the first batch.
f1, err := os.Create(filepath.Join(dir, "Disk_2359cd7e3aef0001.wal"))
f1, err := os.Create(filepath.Join(dir, wal.Filename("db", "Disk", "2359cd7e3aef0001")))
require.NoError(t, err)
defer f1.Close()

a := &batcher{
hostname: "node1",
storageDir: dir,
Partitioner: &fakePartitioner{owner: "node1"},
Segmenter: &fakeSegmenter{active: "Memory_aaaa.wal"},
Segmenter: &fakeSegmenter{active: wal.Filename("db", "Memory", "aaaa")},
}
owner, notOwned, err := a.processSegments()
require.NoError(t, err)
require.Equal(t, 2, len(owner))
require.Equal(t, 0, len(notOwned))
require.Equal(t, []string{filepath.Join(dir, "Disk_2359cd7e3aef0001.wal")}, owner[0])
require.Equal(t, []string{filepath.Join(dir, "Cpu_2359cdac7d6f0001.wal")}, owner[1])
require.Equal(t, []string{filepath.Join(dir, wal.Filename("db", "Disk", "2359cd7e3aef0001"))}, owner[0])
require.Equal(t, []string{filepath.Join(dir, wal.Filename("db", "Cpu", "2359cdac7d6f0001"))}, owner[1])
}

type fakePartitioner struct {
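Note: the tests build segment names through wal.Filename(database, table, epoch) instead of hand-rolled fmt.Sprintf("Cpu_%s.wal", ...) strings. Assuming the same <database>_<table>_<epoch>.wal layout as above, the helper is presumably the inverse of ParseFilename; this is a sketch, not the pkg/wal code:

```go
package wal

import "fmt"

// Filename builds a segment filename from its components, e.g.
// Filename("db", "Cpu", "aaaa") == "db_Cpu_aaaa.wal". Sketch only.
func Filename(database, table, epoch string) string {
	return fmt.Sprintf("%s_%s_%s.wal", database, table, epoch)
}
```

Keeping the format behind Filename/ParseFilename means the batcher and these tests stay in sync if the layout changes again.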
8 changes: 4 additions & 4 deletions ingestor/cluster/coordinator.go
@@ -27,14 +27,14 @@ var (
bytesPool = pool.NewBytes(1024)
)

type TimeSeriesWriter func(ctx context.Context, ts []prompb.TimeSeries) error
type TimeSeriesWriter func(ctx context.Context, database string, ts []prompb.TimeSeries) error

type Coordinator interface {
MetricPartitioner
service.Component

// Write writes the time series to the correct peer.
Write(ctx context.Context, wr prompb.WriteRequest) error
Write(ctx context.Context, database string, wr prompb.WriteRequest) error
}

// Coordinator manages the cluster state and writes to the correct peer.
@@ -234,8 +234,8 @@ func (c *coordinator) Close() error {
return nil
}

func (c *coordinator) Write(ctx context.Context, wr prompb.WriteRequest) error {
return c.tsw(ctx, wr.Timeseries)
func (c *coordinator) Write(ctx context.Context, database string, wr prompb.WriteRequest) error {
return c.tsw(ctx, database, wr.Timeseries)
}

// syncPeers determines the active set of ingestors and reconfigures the partitioner.
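Note: Coordinator.Write and TimeSeriesWriter now take the target database explicitly rather than using a single configured database. A hypothetical caller, just to show how the argument threads through; the function name, how the database is chosen, and the import paths are assumptions:

```go
package example

import (
	"context"

	"github.com/Azure/adx-mon/ingestor/cluster"
	"github.com/Azure/adx-mon/pkg/prompb"
)

// writeForDatabase forwards a write request to the coordinator for a specific
// Kusto database. With this change the database travels with each request
// instead of being fixed at ingestor startup.
func writeForDatabase(ctx context.Context, c cluster.Coordinator, database string, wr prompb.WriteRequest) error {
	return c.Write(ctx, database, wr)
}
```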
4 changes: 2 additions & 2 deletions ingestor/metrics/fake.go
@@ -10,7 +10,7 @@ import (
type FakeRequestWriter struct {
}

func (f *FakeRequestWriter) Write(ctx context.Context, wr prompb.WriteRequest) error {
logger.Info("Received %d samples. Dropping", len(wr.Timeseries))
func (f *FakeRequestWriter) Write(ctx context.Context, database string, wr prompb.WriteRequest) error {
logger.Info("Received %d samples for database %s. Dropping", len(wr.Timeseries), database)
return nil
}