From f815b6b5a06b4fe7007e67785014af59eaaf600b Mon Sep 17 00:00:00 2001 From: Jesse Thompson Date: Mon, 25 Nov 2024 09:45:58 -0700 Subject: [PATCH] Ingestor support for Kustainer (#448) Kustainer has several limitations that impact Ingestor's interaction with Kusto. - Kustainer has no auth - Kustainer only supports inline ingestion To support the lack of auth, this change inspects the Kusto cluster URL and if it's an insecure connection, we do not attempt to add auth to the Kusto connection object. To support inline ingestion, we query Kusto's cluster details and if the cluster is marked as "KustoPersonal", we use this as a trigger to switch into an inline ingestion mode. --- cmd/ingestor/main.go | 8 +- ingestor/adx/uploader.go | 84 ++++++++++++++++-- ingestor/adx/uploader_test.go | 31 +++++++ pkg/testutils/kql_verify.go | 146 +++++++++++++++++++++++++++++++ pkg/testutils/kql_verify_test.go | 28 ++++++ pkg/testutils/uploader.go | 51 +++++++++++ pkg/testutils/uploader_test.go | 50 +++++++++++ 7 files changed, 388 insertions(+), 10 deletions(-) create mode 100644 ingestor/adx/uploader_test.go create mode 100644 pkg/testutils/kql_verify.go create mode 100644 pkg/testutils/kql_verify_test.go create mode 100644 pkg/testutils/uploader.go create mode 100644 pkg/testutils/uploader_test.go diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index fa4e31039..a2c11778e 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -30,7 +30,6 @@ import ( "github.com/Azure/adx-mon/pkg/tls" "github.com/Azure/adx-mon/schema" "github.com/Azure/azure-kusto-go/kusto" - "github.com/Azure/azure-kusto-go/kusto/ingest" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/urfave/cli/v2" "k8s.io/client-go/dynamic" @@ -495,9 +494,12 @@ func newKubeClient(cCtx *cli.Context) (dynamic.Interface, kubernetes.Interface, return dyCli, client, ctrlCli, nil } -func newKustoClient(endpoint string) (ingest.QueryClient, error) { +func newKustoClient(endpoint string) (*kusto.Client, error) { kcsb := kusto.NewConnectionStringBuilder(endpoint) - kcsb.WithDefaultAzureCredential() + + if strings.HasPrefix(endpoint, "https://") { + kcsb.WithDefaultAzureCredential() + } return kusto.New(kcsb) } diff --git a/ingestor/adx/uploader.go b/ingestor/adx/uploader.go index 563c092e2..8cc185fd7 100644 --- a/ingestor/adx/uploader.go +++ b/ingestor/adx/uploader.go @@ -14,10 +14,12 @@ import ( "github.com/Azure/adx-mon/metrics" "github.com/Azure/adx-mon/pkg/logger" "github.com/Azure/adx-mon/pkg/service" + "github.com/Azure/adx-mon/pkg/testutils" "github.com/Azure/adx-mon/pkg/wal" adxschema "github.com/Azure/adx-mon/schema" "github.com/Azure/azure-kusto-go/kusto" "github.com/Azure/azure-kusto-go/kusto/ingest" + "github.com/Azure/azure-kusto-go/kusto/kql" ) const ConcurrentUploads = 50 @@ -35,7 +37,7 @@ type Uploader interface { } type uploader struct { - KustoCli ingest.QueryClient + KustoCli *kusto.Client storageDir string database string opts UploaderOpts @@ -44,9 +46,10 @@ type uploader struct { queue chan *cluster.Batch closeFn context.CancelFunc - wg sync.WaitGroup - mu sync.RWMutex - ingestors map[string]*ingest.Ingestion + wg sync.WaitGroup + mu sync.RWMutex + ingestors map[string]ingest.Ingestor + requireDirectIngest bool } type UploaderOpts struct { @@ -59,7 +62,7 @@ type UploaderOpts struct { FnStore FunctionStore } -func NewUploader(kustoCli ingest.QueryClient, opts UploaderOpts) *uploader { +func NewUploader(kustoCli *kusto.Client, opts UploaderOpts) *uploader { syncer := NewSyncer(kustoCli, opts.Database, opts.DefaultMapping, opts.SampleType, opts.FnStore) return &uploader{ @@ -69,7 +72,7 @@ func NewUploader(kustoCli ingest.QueryClient, opts UploaderOpts) *uploader { database: opts.Database, opts: opts, queue: make(chan *cluster.Batch, 10000), - ingestors: make(map[string]*ingest.Ingestion), + ingestors: make(map[string]ingest.Ingestor), } } @@ -81,6 +84,15 @@ func (n *uploader) Open(ctx context.Context) error { return err } + requireDirectIngest, err := n.clusterRequiresDirectIngest(ctx) + if err != nil { + return err + } + if requireDirectIngest { + logger.Warnf("Cluster=%s requires direct ingest: %s", n.database, n.KustoCli.Endpoint()) + n.requireDirectIngest = true + } + for i := 0; i < n.opts.ConcurrentUploads; i++ { go n.upload(c) } @@ -144,7 +156,7 @@ func (n *uploader) uploadReader(reader io.Reader, database, table string, mappin n.mu.RUnlock() if ingestor == nil { - ingestor, err = func() (*ingest.Ingestion, error) { + ingestor, err = func() (ingest.Ingestor, error) { n.mu.Lock() defer n.mu.Unlock() @@ -153,6 +165,12 @@ func (n *uploader) uploadReader(reader io.Reader, database, table string, mappin return ingestor, nil } + if n.requireDirectIngest { + ingestor = testutils.NewUploadReader(n.KustoCli, database, table) + n.ingestors[table] = ingestor + return ingestor, nil + } + ingestor, err = ingest.New(n.KustoCli, n.database, table) if err != nil { return nil, err @@ -327,3 +345,55 @@ func (n *uploader) extractSchema(path string) (string, error) { } return string(b), nil } + +// clusterRequiresDirectIngest checks if the cluster is configured to require direct ingest. +// In particular, if a cluster's details have a named marked as KustoPersonal, we know this +// cluster to be a Kustainer, which does not support queued or streaming ingestion. +// https://learn.microsoft.com/en-us/azure/data-explorer/kusto-emulator-overview#limitations +func (n *uploader) clusterRequiresDirectIngest(ctx context.Context) (bool, error) { + stmt := kql.New(".show cluster details") + rows, err := n.KustoCli.Mgmt(ctx, n.database, stmt) + if err != nil { + return false, fmt.Errorf("failed to query cluster details: %w", err) + } + defer rows.Stop() + + for { + row, errInline, errFinal := rows.NextRowOrError() + if errFinal == io.EOF { + break + } + if errInline != nil { + continue + } + if errFinal != nil { + return false, fmt.Errorf("failed to retrieve cluster details: %w", errFinal) + } + + var cs clusterDetails + if err := row.ToStruct(&cs); err != nil { + return false, fmt.Errorf("failed to convert row to struct: %w", err) + } + return cs.Name == "KustoPersonal", nil + } + return false, nil +} + +type clusterDetails struct { + NodeId string `kusto:"NodeId"` + Address string `kusto:"Address"` + Name string `kusto:"Name"` + StartTime time.Time `kusto:"StartTime"` + AssignedHotExtents int `kusto:"AssignedHotExtents"` + IsAdmin bool `kusto:"IsAdmin"` + MachineTotalMemory int64 `kusto:"MachineTotalMemory"` + MachineAvailableMemory int64 `kusto:"MachineAvailableMemory"` + ProcessorCount int `kusto:"ProcessorCount"` + HotExtentsOriginalSize int64 `kusto:"HotExtentsOriginalSize"` + HotExtentsSize int64 `kusto:"HotExtentsSize"` + EnvironmentDescription string `kusto:"EnvironmentDescription"` + ProductVersion string `kusto:"ProductVersion"` + Reserved0 int `kusto:"Reserved0"` + ClockDescription string `kusto:"ClockDescription"` + RuntimeDescription string `kusto:"RuntimeDescription"` +} diff --git a/ingestor/adx/uploader_test.go b/ingestor/adx/uploader_test.go new file mode 100644 index 000000000..7d7c81a0c --- /dev/null +++ b/ingestor/adx/uploader_test.go @@ -0,0 +1,31 @@ +package adx + +import ( + "context" + "testing" + + "github.com/Azure/adx-mon/pkg/testutils/kustainer" + "github.com/Azure/azure-kusto-go/kusto" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" +) + +func TestClusterRequiresDirectIngest(t *testing.T) { + ctx := context.Background() + k, err := kustainer.Run(ctx, "mcr.microsoft.com/azuredataexplorer/kustainer-linux:latest", kustainer.WithStarted()) + testcontainers.CleanupContainer(t, k) + require.NoError(t, err) + + cb := kusto.NewConnectionStringBuilder(k.ConnectionUrl()) + client, err := kusto.New(cb) + require.NoError(t, err) + defer client.Close() + + u := &uploader{ + KustoCli: client, + database: "NetDefaultDB", + } + requiresDirectIngest, err := u.clusterRequiresDirectIngest(ctx) + require.NoError(t, err) + require.True(t, requiresDirectIngest) +} diff --git a/pkg/testutils/kql_verify.go b/pkg/testutils/kql_verify.go new file mode 100644 index 000000000..d0b6a0a6d --- /dev/null +++ b/pkg/testutils/kql_verify.go @@ -0,0 +1,146 @@ +package testutils + +import ( + "context" + "io" + "testing" + + "github.com/Azure/azure-kusto-go/kusto" + "github.com/Azure/azure-kusto-go/kusto/kql" + "github.com/stretchr/testify/require" +) + +func TableExists(ctx context.Context, t *testing.T, database, table, uri string) bool { + t.Helper() + + cb := kusto.NewConnectionStringBuilder(uri) + client, err := kusto.New(cb) + require.NoError(t, err) + defer client.Close() + + stmt := kql.New(".show tables") + rows, err := client.Mgmt(ctx, database, stmt) + require.NoError(t, err) + defer rows.Stop() + + for { + row, errInline, errFinal := rows.NextRowOrError() + if errFinal == io.EOF { + break + } + if errInline != nil { + t.Logf("Partial failure to retrieve tables: %v", errInline) + continue + } + if errFinal != nil { + t.Errorf("Failed to retrieve tables: %v", errFinal) + } + + var tbl Table + if err := row.ToStruct(&tbl); err != nil { + t.Errorf("Failed to convert row to struct: %v", err) + continue + } + if tbl.TableName == table { + return true + } + } + + return false +} + +type Table struct { + TableName string `kusto:"TableName"` + DatabaseName string `kusto:"DatabaseName"` + Folder string `kusto:"Folder"` + DocString string `kusto:"DocString"` +} + +func FunctionExists(ctx context.Context, t *testing.T, database, function, uri string) bool { + t.Helper() + + cb := kusto.NewConnectionStringBuilder(uri) + client, err := kusto.New(cb) + require.NoError(t, err) + defer client.Close() + + stmt := kql.New(".show functions") + rows, err := client.Mgmt(ctx, database, stmt) + require.NoError(t, err) + defer rows.Stop() + + for { + row, errInline, errFinal := rows.NextRowOrError() + if errFinal == io.EOF { + break + } + if errInline != nil { + t.Logf("Partial failure to retrieve functions: %v", errInline) + continue + } + if errFinal != nil { + t.Errorf("Failed to retrieve functions: %v", errFinal) + } + + var fn Function + if err := row.ToStruct(&fn); err != nil { + t.Errorf("Failed to convert row to struct: %v", err) + continue + } + if fn.Name == function { + return true + } + } + + return false +} + +type Function struct { + Name string `kusto:"Name"` + Parameters string `kusto:"Parameters"` + Body string `kusto:"Body"` + Folder string `kusto:"Folder"` + DocString string `kusto:"DocString"` +} + +func TableHasRows(ctx context.Context, t *testing.T, database, table, uri string) bool { + t.Helper() + + cb := kusto.NewConnectionStringBuilder(uri) + client, err := kusto.New(cb) + require.NoError(t, err) + defer client.Close() + + query := kql.New("").AddUnsafe(table).AddLiteral(" | count") + rows, err := client.Query(ctx, database, query) + require.NoError(t, err) + defer rows.Stop() + + for { + row, errInline, errFinal := rows.NextRowOrError() + if errFinal == io.EOF { + break + } + if errInline != nil { + t.Logf("Partial failure to retrieve row count: %v", errInline) + continue + } + if errFinal != nil { + t.Errorf("Failed to retrieve row count: %v", errFinal) + } + + var count RowCount + if err := row.ToStruct(&count); err != nil { + t.Errorf("Failed to convert row to struct: %v", err) + continue + } + return count.Count > 0 + } + + t.Logf("Table %s has no rows", table) + return false +} + +type RowCount struct { + Count int64 `kusto:"Count"` +} diff --git a/pkg/testutils/kql_verify_test.go b/pkg/testutils/kql_verify_test.go new file mode 100644 index 000000000..78c94a1ea --- /dev/null +++ b/pkg/testutils/kql_verify_test.go @@ -0,0 +1,28 @@ +package testutils_test + +import ( + "context" + "testing" + + "github.com/Azure/adx-mon/pkg/testutils" + "github.com/Azure/adx-mon/pkg/testutils/kustainer" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" +) + +func TestTableExists(t *testing.T) { + k, err := kustainer.Run(context.Background(), "mcr.microsoft.com/azuredataexplorer/kustainer-linux:latest", kustainer.WithStarted()) + testcontainers.CleanupContainer(t, k) + require.NoError(t, err) + + require.False(t, testutils.TableExists(context.Background(), t, "NetDefaultDB", "Foo", k.ConnectionUrl())) + require.True(t, testutils.TableExists(context.Background(), t, "NetDefaultDB", "Table_0", k.ConnectionUrl())) +} + +func TestTableHasRows(t *testing.T) { + k, err := kustainer.Run(context.Background(), "mcr.microsoft.com/azuredataexplorer/kustainer-linux:latest", kustainer.WithStarted()) + testcontainers.CleanupContainer(t, k) + require.NoError(t, err) + + require.False(t, testutils.TableHasRows(context.Background(), t, "NetDefaultDB", "Table_0", k.ConnectionUrl())) +} diff --git a/pkg/testutils/uploader.go b/pkg/testutils/uploader.go new file mode 100644 index 000000000..df0dfe51a --- /dev/null +++ b/pkg/testutils/uploader.go @@ -0,0 +1,51 @@ +package testutils + +import ( + "context" + "errors" + "fmt" + "io" + + "github.com/Azure/azure-kusto-go/kusto" + "github.com/Azure/azure-kusto-go/kusto/ingest" + "github.com/Azure/azure-kusto-go/kusto/kql" +) + +type Uploader struct { + client *kusto.Client + database string + table string +} + +// NewUploadReader implements ingest.Ingestor +func NewUploadReader(client *kusto.Client, database string, table string) *Uploader { + return &Uploader{ + client: client, + database: database, + table: table, + } +} + +func (u *Uploader) Close() error { + return nil +} + +func (u *Uploader) FromFile(ctx context.Context, fPath string, options ...ingest.FileOption) (*ingest.Result, error) { + return nil, errors.New("not implemented") +} + +func (u *Uploader) FromReader(ctx context.Context, reader io.Reader, options ...ingest.FileOption) (*ingest.Result, error) { + // Kustainer is not able to using a streaming ingestor as there is no storage containers backing the Kusto cluster. + // We must instead ingest inline and thus consume the reader. + data, err := io.ReadAll(reader) + if err != nil { + return nil, fmt.Errorf("failed to read data: %w", err) + } + + stmt := kql.New(".ingest inline into table ").AddTable(u.table).AddLiteral(" <| ").AddUnsafe(string(data)) + if _, err = u.client.Mgmt(ctx, u.database, stmt); err != nil { + return nil, fmt.Errorf("failed to ingest data: %w", err) + } + + return &ingest.Result{}, nil +} diff --git a/pkg/testutils/uploader_test.go b/pkg/testutils/uploader_test.go new file mode 100644 index 000000000..0ccfdb4fd --- /dev/null +++ b/pkg/testutils/uploader_test.go @@ -0,0 +1,50 @@ +package testutils_test + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/Azure/adx-mon/pkg/testutils" + "github.com/Azure/adx-mon/pkg/testutils/kustainer" + "github.com/Azure/azure-kusto-go/kusto" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" +) + +func TestUploader(t *testing.T) { + var ( + database = "NetDefaultDB" + table = "Table_0" + ctx = context.Background() + ) + k, err := kustainer.Run(ctx, "mcr.microsoft.com/azuredataexplorer/kustainer-linux:latest", kustainer.WithStarted()) + testcontainers.CleanupContainer(t, k) + require.NoError(t, err) + + cb := kusto.NewConnectionStringBuilder(k.ConnectionUrl()) + client, err := kusto.New(cb) + require.NoError(t, err) + defer client.Close() + + t.Run("Ingest", func(t *testing.T) { + uploader := testutils.NewUploadReader(client, database, table) + r := strings.NewReader("a") + result, err := uploader.FromReader(ctx, r) + require.NoError(t, err) + require.NotNil(t, result) + }) + + t.Run("Table exists in Kusto", func(t *testing.T) { + require.Eventually(t, func() bool { + return testutils.TableExists(ctx, t, database, table, k.ConnectionUrl()) + }, time.Minute, time.Second) + }) + + t.Run("Table has rows", func(t *testing.T) { + require.Eventually(t, func() bool { + return testutils.TableHasRows(ctx, t, database, table, k.ConnectionUrl()) + }, time.Minute, time.Second) + }) +}