From 235ebb351b7281f2355ab35041148c3a88157ee9 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 18 Apr 2024 12:44:41 +0200 Subject: [PATCH 1/7] Try verbose simulate API to detect issues in pipelines --- internal/elasticsearch/ingest/pipeline.go | 45 ++++++++++++++++++----- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/internal/elasticsearch/ingest/pipeline.go b/internal/elasticsearch/ingest/pipeline.go index 16fde116b7..99e63b7f66 100644 --- a/internal/elasticsearch/ingest/pipeline.go +++ b/internal/elasticsearch/ingest/pipeline.go @@ -7,6 +7,7 @@ package ingest import ( "bytes" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -22,7 +23,15 @@ type simulatePipelineRequest struct { } type simulatePipelineResponse struct { - Docs []pipelineIngestedDocument `json:"docs"` + Docs []struct { + ProcessorResults []verboseProcessorResult `json:"processor_results"` + } +} + +type verboseProcessorResult struct { + Processor string `json:"processor_type"` + Status string `json:"status"` + Doc pipelineDocument `json:"doc"` } type pipelineDocument struct { @@ -30,10 +39,6 @@ type pipelineDocument struct { Source json.RawMessage `json:"_source"` } -type pipelineIngestedDocument struct { - Doc pipelineDocument `json:"doc"` -} - // Pipeline represents a pipeline resource loaded from a file type Pipeline struct { Path string // Path of the file with the pipeline definition. @@ -86,9 +91,10 @@ func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json return nil, fmt.Errorf("marshalling simulate request failed: %w", err) } - r, err := api.Ingest.Simulate(bytes.NewReader(requestBody), func(request *elasticsearch.IngestSimulateRequest) { - request.PipelineID = pipelineName - }) + r, err := api.Ingest.Simulate(bytes.NewReader(requestBody), + api.Ingest.Simulate.WithPipelineID(pipelineName), + api.Ingest.Simulate.WithVerbose(true), + ) if err != nil { return nil, fmt.Errorf("simulate API call failed (pipelineName: %s): %w", pipelineName, err) } @@ -110,10 +116,29 @@ func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json } processedEvents := make([]json.RawMessage, len(response.Docs)) + var errs []error for i, doc := range response.Docs { - processedEvents[i] = doc.Doc.Source + var source json.RawMessage + failed := false + for _, result := range doc.ProcessorResults { + switch result.Status { + case "success": + // Keep last successful document. + source = result.Doc.Source + case "skipped": + continue + case "failed": + failed = true + errs = append(errs, fmt.Errorf("%q processor failed (status: %s)", result.Processor, result.Status)) + } + } + + if !failed { + processedEvents[i] = source + } } - return processedEvents, nil + + return processedEvents, errors.Join(errs...) } func UninstallPipelines(api *elasticsearch.API, pipelines []Pipeline) error { From 2e540edb85c2630c85bd69b69d40ba975e28eb8b Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 18 Apr 2024 12:46:32 +0200 Subject: [PATCH 2/7] Missing JSON tag --- internal/elasticsearch/ingest/pipeline.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/elasticsearch/ingest/pipeline.go b/internal/elasticsearch/ingest/pipeline.go index 99e63b7f66..dc0b2b6022 100644 --- a/internal/elasticsearch/ingest/pipeline.go +++ b/internal/elasticsearch/ingest/pipeline.go @@ -25,7 +25,7 @@ type simulatePipelineRequest struct { type simulatePipelineResponse struct { Docs []struct { ProcessorResults []verboseProcessorResult `json:"processor_results"` - } + } `json:"docs"` } type verboseProcessorResult struct { From 315af076e7b379e7b1f36cd79c8fad9eb6175f6a Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 18 Apr 2024 13:19:54 +0200 Subject: [PATCH 3/7] Handle dropped events --- internal/elasticsearch/ingest/pipeline.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/elasticsearch/ingest/pipeline.go b/internal/elasticsearch/ingest/pipeline.go index dc0b2b6022..f9f3db8bda 100644 --- a/internal/elasticsearch/ingest/pipeline.go +++ b/internal/elasticsearch/ingest/pipeline.go @@ -125,11 +125,15 @@ func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json case "success": // Keep last successful document. source = result.Doc.Source + case "dropped": + source = nil case "skipped": continue case "failed": failed = true errs = append(errs, fmt.Errorf("%q processor failed (status: %s)", result.Processor, result.Status)) + default: + errs = append(errs, fmt.Errorf("unexpected result status %s", result.Status)) } } From 9951c42bc802eed3a8f4b03227823358190e45b0 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 18 Apr 2024 13:41:34 +0200 Subject: [PATCH 4/7] Handle errors --- internal/elasticsearch/ingest/pipeline.go | 29 +++++++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/internal/elasticsearch/ingest/pipeline.go b/internal/elasticsearch/ingest/pipeline.go index f9f3db8bda..511738d665 100644 --- a/internal/elasticsearch/ingest/pipeline.go +++ b/internal/elasticsearch/ingest/pipeline.go @@ -16,6 +16,7 @@ import ( "gopkg.in/yaml.v3" "github.com/elastic/elastic-package/internal/elasticsearch" + "github.com/elastic/elastic-package/internal/logger" ) type simulatePipelineRequest struct { @@ -29,9 +30,19 @@ type simulatePipelineResponse struct { } type verboseProcessorResult struct { - Processor string `json:"processor_type"` - Status string `json:"status"` - Doc pipelineDocument `json:"doc"` + Processor string `json:"processor_type"` + Status string `json:"status"` + Doc pipelineDocument `json:"doc"` + Error verboseProcessorError `json:"error"` + Ignored struct { + Error verboseProcessorError `json:"error"` + } `json:"ignored_error"` +} + +type verboseProcessorError struct { + Type string `json:"type"` + Reason string `json:"reason"` + RootCause json.RawMessage `json:"root_cause"` } type pipelineDocument struct { @@ -94,6 +105,8 @@ func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json r, err := api.Ingest.Simulate(bytes.NewReader(requestBody), api.Ingest.Simulate.WithPipelineID(pipelineName), api.Ingest.Simulate.WithVerbose(true), + + api.Ingest.Simulate.WithPretty(), ) if err != nil { return nil, fmt.Errorf("simulate API call failed (pipelineName: %s): %w", pipelineName, err) @@ -129,11 +142,17 @@ func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json source = nil case "skipped": continue + case "error_ignored": + logger.Debugf("error ignored for processor %s: [%s] %s", result.Processor, result.Ignored.Error.Type, result.Ignored.Error.Reason) + continue + case "error": + failed = true + errs = append(errs, fmt.Errorf("error in processor %s: [%s] %s", result.Processor, result.Error.Type, result.Error.Reason)) case "failed": failed = true - errs = append(errs, fmt.Errorf("%q processor failed (status: %s)", result.Processor, result.Status)) + errs = append(errs, fmt.Errorf("%q processor failed", result.Processor)) default: - errs = append(errs, fmt.Errorf("unexpected result status %s", result.Status)) + errs = append(errs, fmt.Errorf("unexpected result status %q for processor %q", result.Status, result.Processor)) } } From aede21130f1d9d6979d45c6add329139769bde0b Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 18 Apr 2024 13:42:16 +0200 Subject: [PATCH 5/7] Remove pretty --- internal/elasticsearch/ingest/pipeline.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/elasticsearch/ingest/pipeline.go b/internal/elasticsearch/ingest/pipeline.go index 511738d665..ef9df278ec 100644 --- a/internal/elasticsearch/ingest/pipeline.go +++ b/internal/elasticsearch/ingest/pipeline.go @@ -105,8 +105,6 @@ func SimulatePipeline(api *elasticsearch.API, pipelineName string, events []json r, err := api.Ingest.Simulate(bytes.NewReader(requestBody), api.Ingest.Simulate.WithPipelineID(pipelineName), api.Ingest.Simulate.WithVerbose(true), - - api.Ingest.Simulate.WithPretty(), ) if err != nil { return nil, fmt.Errorf("simulate API call failed (pipelineName: %s): %w", pipelineName, err) From c51f18e1e3b394e85532689a88a851dcc7976500 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 18 Apr 2024 16:18:28 +0200 Subject: [PATCH 6/7] Take into account on-failure handlers --- internal/elasticsearch/ingest/pipeline.go | 41 +++++++++++++++++++++-- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/internal/elasticsearch/ingest/pipeline.go b/internal/elasticsearch/ingest/pipeline.go index 274c85a1b4..bd6ea69514 100644 --- a/internal/elasticsearch/ingest/pipeline.go +++ b/internal/elasticsearch/ingest/pipeline.go @@ -13,6 +13,7 @@ import ( "io" "net/http" "strings" + "time" "gopkg.in/yaml.v3" @@ -46,9 +47,24 @@ type verboseProcessorError struct { RootCause json.RawMessage `json:"root_cause"` } +func (e verboseProcessorError) Error() string { + return fmt.Sprintf("[%s] %s", e.Type, e.Reason) +} + type pipelineDocument struct { - Index string `json:"_index"` - Source json.RawMessage `json:"_source"` + Index string `json:"_index"` + Source json.RawMessage `json:"_source"` + Ingest verboseProcessorIngest `json:"_ingest"` +} + +type verboseProcessorIngest struct { + Pipeline string `json:"pipeline"` + Timestamp time.Time `json:"timestamp"` + + OnFailurePipeline string `json:"on_failure_pipeline"` + OnFailureMessage string `json:"on_failure_message"` + OnFailureProcessorTag string `json:"on_failure_processor_tag"` + OnFailureProcessorType string `json:"on_failure_processor_type"` } // Pipeline represents a pipeline resource loaded from a file @@ -128,12 +144,31 @@ func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName return nil, fmt.Errorf("unmarshalling simulate request failed: %w", err) } + handleErrors := func(ingest verboseProcessorIngest, errs []error) []error { + var filtered []error + for _, err := range errs { + var processorError verboseProcessorError + if errors.As(err, &processorError) && processorError.Reason == ingest.OnFailureMessage { + continue + } + filtered = append(filtered, err) + } + return filtered + } + processedEvents := make([]json.RawMessage, len(response.Docs)) var errs []error for i, doc := range response.Docs { var source json.RawMessage failed := false for _, result := range doc.ProcessorResults { + if result.Doc.Ingest.OnFailureMessage != "" { + // This processor is in an on_failure handler, filter out the handled errors + // and assume that processing is going on. + errs = handleErrors(result.Doc.Ingest, errs) + failed = false + } + switch result.Status { case "success": // Keep last successful document. @@ -147,7 +182,7 @@ func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName continue case "error": failed = true - errs = append(errs, fmt.Errorf("error in processor %s: [%s] %s", result.Processor, result.Error.Type, result.Error.Reason)) + errs = append(errs, fmt.Errorf("error in pricessor %s: %w", result.Processor, result.Error)) case "failed": failed = true errs = append(errs, fmt.Errorf("%q processor failed", result.Processor)) From a3cb1ec1662eb021335b338cc3b95e91533dd976 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 18 Apr 2024 16:19:34 +0200 Subject: [PATCH 7/7] Typo --- internal/elasticsearch/ingest/pipeline.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/elasticsearch/ingest/pipeline.go b/internal/elasticsearch/ingest/pipeline.go index bd6ea69514..15b4e6719b 100644 --- a/internal/elasticsearch/ingest/pipeline.go +++ b/internal/elasticsearch/ingest/pipeline.go @@ -182,7 +182,7 @@ func SimulatePipeline(ctx context.Context, api *elasticsearch.API, pipelineName continue case "error": failed = true - errs = append(errs, fmt.Errorf("error in pricessor %s: %w", result.Processor, result.Error)) + errs = append(errs, fmt.Errorf("error in processor %s: %w", result.Processor, result.Error)) case "failed": failed = true errs = append(errs, fmt.Errorf("%q processor failed", result.Processor))