Skip to content

Commit

Permalink
Ingestor support for Kustainer (#448)
Browse files Browse the repository at this point in the history
Kustainer has several limitations that impact Ingestor's interaction with Kusto.
- Kustainer has no auth
- Kustainer only supports inline ingestion

To support the lack of auth, this change inspects the Kusto cluster URL: if it does not start with `https://` (i.e. it is an insecure connection), we do not attempt to add auth to the Kusto connection object.

To support inline ingestion, we query Kusto's cluster details; if the cluster's name is "KustoPersonal", we use this as the trigger to switch into an inline ingestion mode.
  • Loading branch information
jessejlt authored Nov 25, 2024
1 parent 92b9b8f commit f815b6b
Show file tree
Hide file tree
Showing 7 changed files with 388 additions and 10 deletions.
8 changes: 5 additions & 3 deletions cmd/ingestor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/Azure/adx-mon/pkg/tls"
"github.com/Azure/adx-mon/schema"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/urfave/cli/v2"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -495,9 +494,12 @@ func newKubeClient(cCtx *cli.Context) (dynamic.Interface, kubernetes.Interface,
return dyCli, client, ctrlCli, nil
}

func newKustoClient(endpoint string) (ingest.QueryClient, error) {
func newKustoClient(endpoint string) (*kusto.Client, error) {
kcsb := kusto.NewConnectionStringBuilder(endpoint)
kcsb.WithDefaultAzureCredential()

if strings.HasPrefix(endpoint, "https://") {
kcsb.WithDefaultAzureCredential()
}

return kusto.New(kcsb)
}
Expand Down
84 changes: 77 additions & 7 deletions ingestor/adx/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ import (
"github.com/Azure/adx-mon/metrics"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/service"
"github.com/Azure/adx-mon/pkg/testutils"
"github.com/Azure/adx-mon/pkg/wal"
adxschema "github.com/Azure/adx-mon/schema"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/azure-kusto-go/kusto/kql"
)

const ConcurrentUploads = 50
Expand All @@ -35,7 +37,7 @@ type Uploader interface {
}

type uploader struct {
KustoCli ingest.QueryClient
KustoCli *kusto.Client
storageDir string
database string
opts UploaderOpts
Expand All @@ -44,9 +46,10 @@ type uploader struct {
queue chan *cluster.Batch
closeFn context.CancelFunc

wg sync.WaitGroup
mu sync.RWMutex
ingestors map[string]*ingest.Ingestion
wg sync.WaitGroup
mu sync.RWMutex
ingestors map[string]ingest.Ingestor
requireDirectIngest bool
}

type UploaderOpts struct {
Expand All @@ -59,7 +62,7 @@ type UploaderOpts struct {
FnStore FunctionStore
}

func NewUploader(kustoCli ingest.QueryClient, opts UploaderOpts) *uploader {
func NewUploader(kustoCli *kusto.Client, opts UploaderOpts) *uploader {
syncer := NewSyncer(kustoCli, opts.Database, opts.DefaultMapping, opts.SampleType, opts.FnStore)

return &uploader{
Expand All @@ -69,7 +72,7 @@ func NewUploader(kustoCli ingest.QueryClient, opts UploaderOpts) *uploader {
database: opts.Database,
opts: opts,
queue: make(chan *cluster.Batch, 10000),
ingestors: make(map[string]*ingest.Ingestion),
ingestors: make(map[string]ingest.Ingestor),
}
}

Expand All @@ -81,6 +84,15 @@ func (n *uploader) Open(ctx context.Context) error {
return err
}

requireDirectIngest, err := n.clusterRequiresDirectIngest(ctx)
if err != nil {
return err
}
if requireDirectIngest {
logger.Warnf("Cluster=%s requires direct ingest: %s", n.database, n.KustoCli.Endpoint())
n.requireDirectIngest = true
}

for i := 0; i < n.opts.ConcurrentUploads; i++ {
go n.upload(c)
}
Expand Down Expand Up @@ -144,7 +156,7 @@ func (n *uploader) uploadReader(reader io.Reader, database, table string, mappin
n.mu.RUnlock()

if ingestor == nil {
ingestor, err = func() (*ingest.Ingestion, error) {
ingestor, err = func() (ingest.Ingestor, error) {
n.mu.Lock()
defer n.mu.Unlock()

Expand All @@ -153,6 +165,12 @@ func (n *uploader) uploadReader(reader io.Reader, database, table string, mappin
return ingestor, nil
}

if n.requireDirectIngest {
ingestor = testutils.NewUploadReader(n.KustoCli, database, table)
n.ingestors[table] = ingestor
return ingestor, nil
}

ingestor, err = ingest.New(n.KustoCli, n.database, table)
if err != nil {
return nil, err
Expand Down Expand Up @@ -327,3 +345,55 @@ func (n *uploader) extractSchema(path string) (string, error) {
}
return string(b), nil
}

// clusterRequiresDirectIngest reports whether the target cluster must be fed
// through direct (inline) ingestion.
//
// It issues `.show cluster details` against n.database and inspects the first
// row it can read: a cluster whose Name is "KustoPersonal" is known to be a
// Kustainer, which does not support queued or streaming ingestion.
// https://learn.microsoft.com/en-us/azure/data-explorer/kusto-emulator-overview#limitations
//
// Rows that carry an inline error are skipped. If the result set ends without
// yielding a usable row, the method returns false with a nil error.
func (n *uploader) clusterRequiresDirectIngest(ctx context.Context) (bool, error) {
	iter, err := n.KustoCli.Mgmt(ctx, n.database, kql.New(".show cluster details"))
	if err != nil {
		return false, fmt.Errorf("failed to query cluster details: %w", err)
	}
	defer iter.Stop()

	for {
		row, inlineErr, finalErr := iter.NextRowOrError()
		switch {
		case finalErr == io.EOF:
			// Result set exhausted without a decodable row.
			return false, nil
		case inlineErr != nil:
			// Partial failure for this row only; try the next one.
			continue
		case finalErr != nil:
			return false, fmt.Errorf("failed to retrieve cluster details: %w", finalErr)
		}

		var details clusterDetails
		if err := row.ToStruct(&details); err != nil {
			return false, fmt.Errorf("failed to convert row to struct: %w", err)
		}
		// Only the first readable row is consulted.
		return details.Name == "KustoPersonal", nil
	}
}

// clusterDetails is the decode target for a row returned by the
// `.show cluster details` management command. Each field is bound to the
// result column named in its `kusto` struct tag.
type clusterDetails struct {
NodeId string `kusto:"NodeId"`
Address string `kusto:"Address"`
// Name is compared against "KustoPersonal" by clusterRequiresDirectIngest
// to detect a Kustainer.
Name string `kusto:"Name"`
StartTime time.Time `kusto:"StartTime"`
AssignedHotExtents int `kusto:"AssignedHotExtents"`
IsAdmin bool `kusto:"IsAdmin"`
MachineTotalMemory int64 `kusto:"MachineTotalMemory"`
MachineAvailableMemory int64 `kusto:"MachineAvailableMemory"`
ProcessorCount int `kusto:"ProcessorCount"`
HotExtentsOriginalSize int64 `kusto:"HotExtentsOriginalSize"`
HotExtentsSize int64 `kusto:"HotExtentsSize"`
EnvironmentDescription string `kusto:"EnvironmentDescription"`
ProductVersion string `kusto:"ProductVersion"`
Reserved0 int `kusto:"Reserved0"`
ClockDescription string `kusto:"ClockDescription"`
RuntimeDescription string `kusto:"RuntimeDescription"`
}
31 changes: 31 additions & 0 deletions ingestor/adx/uploader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package adx

import (
"context"
"testing"

"github.com/Azure/adx-mon/pkg/testutils/kustainer"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
)

// TestClusterRequiresDirectIngest starts a Kustainer container and checks that
// clusterRequiresDirectIngest reports true for it.
func TestClusterRequiresDirectIngest(t *testing.T) {
	const image = "mcr.microsoft.com/azuredataexplorer/kustainer-linux:latest"

	ctx := context.Background()
	container, err := kustainer.Run(ctx, image, kustainer.WithStarted())
	// Cleanup is registered before asserting on err.
	testcontainers.CleanupContainer(t, container)
	require.NoError(t, err)

	client, err := kusto.New(kusto.NewConnectionStringBuilder(container.ConnectionUrl()))
	require.NoError(t, err)
	defer client.Close()

	up := &uploader{
		KustoCli: client,
		database: "NetDefaultDB",
	}
	got, err := up.clusterRequiresDirectIngest(ctx)
	require.NoError(t, err)
	require.True(t, got)
}
146 changes: 146 additions & 0 deletions pkg/testutils/kql_verify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package testutils

import (
"context"
"io"
"testing"

"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/kql"
"github.com/stretchr/testify/require"
)

// TableExists reports whether a table named table is listed by `.show tables`
// in the given database of the Kusto cluster reachable at uri.
//
// A dedicated client is created for the call and closed on return. Failure to
// create the client or to issue the command aborts the test via require.
// Inline (partial) errors are logged and skipped. A final, non-EOF error is
// reported with t.Errorf and ends the scan with a false result: the loop must
// not fall through to decoding, because row is not usable in that case.
func TableExists(ctx context.Context, t *testing.T, database, table, uri string) bool {
	t.Helper()

	cb := kusto.NewConnectionStringBuilder(uri)
	client, err := kusto.New(cb)
	require.NoError(t, err)
	defer client.Close()

	stmt := kql.New(".show tables")
	rows, err := client.Mgmt(ctx, database, stmt)
	require.NoError(t, err)
	defer rows.Stop()

	for {
		row, errInline, errFinal := rows.NextRowOrError()
		if errFinal == io.EOF {
			break
		}
		if errInline != nil {
			t.Logf("Partial failure to retrieve tables: %v", errInline)
			continue
		}
		if errFinal != nil {
			// Stop here: continuing would decode an unusable row and could
			// spin on the same error.
			t.Errorf("Failed to retrieve tables: %v", errFinal)
			return false
		}

		var tbl Table
		if err := row.ToStruct(&tbl); err != nil {
			t.Errorf("Failed to convert row to struct: %v", err)
			continue
		}
		if tbl.TableName == table {
			return true
		}
	}

	return false
}

// Table is the decode target for a row returned by the `.show tables`
// management command, as used by TableExists. Each field is bound to the
// result column named in its `kusto` struct tag.
type Table struct {
// TableName is the field TableExists compares against the requested table.
TableName string `kusto:"TableName"`
DatabaseName string `kusto:"DatabaseName"`
Folder string `kusto:"Folder"`
DocString string `kusto:"DocString"`
}

// FunctionExists reports whether a function named function is listed by
// `.show functions` in the given database of the Kusto cluster reachable at uri.
//
// A dedicated client is created for the call and closed on return. Failure to
// create the client or to issue the command aborts the test via require.
// Inline (partial) errors are logged and skipped. A final, non-EOF error is
// reported with t.Errorf and ends the scan with a false result: the loop must
// not fall through to decoding, because row is not usable in that case.
func FunctionExists(ctx context.Context, t *testing.T, database, function, uri string) bool {
	t.Helper()

	cb := kusto.NewConnectionStringBuilder(uri)
	client, err := kusto.New(cb)
	require.NoError(t, err)
	defer client.Close()

	stmt := kql.New(".show functions")
	rows, err := client.Mgmt(ctx, database, stmt)
	require.NoError(t, err)
	defer rows.Stop()

	for {
		row, errInline, errFinal := rows.NextRowOrError()
		if errFinal == io.EOF {
			break
		}
		if errInline != nil {
			t.Logf("Partial failure to retrieve functions: %v", errInline)
			continue
		}
		if errFinal != nil {
			// Stop here: continuing would decode an unusable row and could
			// spin on the same error.
			t.Errorf("Failed to retrieve functions: %v", errFinal)
			return false
		}

		var fn Function
		if err := row.ToStruct(&fn); err != nil {
			t.Errorf("Failed to convert row to struct: %v", err)
			continue
		}
		if fn.Name == function {
			return true
		}
	}

	return false
}

// Function is the decode target for a row returned by the `.show functions`
// management command, as used by FunctionExists. Each field is bound to the
// result column named in its `kusto` struct tag.
type Function struct {
// Name is the field FunctionExists compares against the requested function.
Name string `kusto:"Name"`
Parameters string `kusto:"Parameters"`
Body string `kusto:"Body"`
Folder string `kusto:"Folder"`
DocString string `kusto:"DocString"`
}

// TableHasRows reports whether the query `<table> | count` returns a count
// greater than zero in the given database of the Kusto cluster reachable at uri.
//
// The table name is spliced into the query with AddUnsafe, so callers must
// pass a trusted identifier. A dedicated client is created for the call and
// closed on return. Failure to create the client or to issue the query aborts
// the test via require. Inline (partial) errors are logged and skipped. A
// final, non-EOF error is reported with t.Errorf and ends the scan with a
// false result: the loop must not fall through to decoding, because row is
// not usable in that case.
func TableHasRows(ctx context.Context, t *testing.T, database, table, uri string) bool {
	t.Helper()

	cb := kusto.NewConnectionStringBuilder(uri)
	client, err := kusto.New(cb)
	require.NoError(t, err)
	defer client.Close()

	query := kql.New("").AddUnsafe(table).AddLiteral(" | count")
	rows, err := client.Query(ctx, database, query)
	require.NoError(t, err)
	defer rows.Stop()

	for {
		row, errInline, errFinal := rows.NextRowOrError()
		if errFinal == io.EOF {
			break
		}
		if errInline != nil {
			t.Logf("Partial failure to retrieve row count: %v", errInline)
			continue
		}
		if errFinal != nil {
			// Stop here: continuing would decode an unusable row and could
			// spin on the same error.
			t.Errorf("Failed to retrieve row count: %v", errFinal)
			return false
		}

		var count RowCount
		if err := row.ToStruct(&count); err != nil {
			t.Errorf("Failed to convert row to struct: %v", err)
			continue
		}
		// The first decodable row decides the result.
		return count.Count > 0
	}

	t.Logf("Table %s has no rows", table)
	return false
}

// RowCount is the decode target for the single-column result of a
// `<table> | count` query, as used by TableHasRows.
type RowCount struct {
// Count is bound to the "Count" result column.
Count int64 `kusto:"Count"`
}
28 changes: 28 additions & 0 deletions pkg/testutils/kql_verify_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package testutils_test

import (
"context"
"testing"

"github.com/Azure/adx-mon/pkg/testutils"
"github.com/Azure/adx-mon/pkg/testutils/kustainer"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
)

// TestTableExists starts a Kustainer container and checks TableExists against
// NetDefaultDB: "Foo" must be reported absent and "Table_0" present.
func TestTableExists(t *testing.T) {
	const (
		image    = "mcr.microsoft.com/azuredataexplorer/kustainer-linux:latest"
		database = "NetDefaultDB"
	)

	ctx := context.Background()
	container, err := kustainer.Run(ctx, image, kustainer.WithStarted())
	// Cleanup is registered before asserting on err.
	testcontainers.CleanupContainer(t, container)
	require.NoError(t, err)

	missing := testutils.TableExists(ctx, t, database, "Foo", container.ConnectionUrl())
	require.False(t, missing)

	present := testutils.TableExists(ctx, t, database, "Table_0", container.ConnectionUrl())
	require.True(t, present)
}

// TestTableHasRows starts a Kustainer container and checks that TableHasRows
// reports false for Table_0 in NetDefaultDB.
func TestTableHasRows(t *testing.T) {
	const image = "mcr.microsoft.com/azuredataexplorer/kustainer-linux:latest"

	ctx := context.Background()
	container, err := kustainer.Run(ctx, image, kustainer.WithStarted())
	// Cleanup is registered before asserting on err.
	testcontainers.CleanupContainer(t, container)
	require.NoError(t, err)

	hasRows := testutils.TableHasRows(ctx, t, "NetDefaultDB", "Table_0", container.ConnectionUrl())
	require.False(t, hasRows)
}
Loading

0 comments on commit f815b6b

Please sign in to comment.