Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(elasticsearch) add new elasticsearch_v8 output #3160

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

ooesili
Copy link
Contributor

@ooesili ooesili commented Jan 31, 2025

This new output uses the latest official Elasticsearch Go library.

  • Although the docs suggested it, I decided not to use esutil.NewBulkIndexer because it has a it's own internal concurrency and flushing logic that we don't really need since the benthos engine provides that for us.
  • I didn't support create action because it is not idempotent which I don't think would play well with the assumptions that Connect makes of it's plugins.

This new output uses the latest official Elasticsearch go library.
@ooesili ooesili added enhancement outputs Any tasks or issues relating specifically to outputs go Pull requests that update Go code labels Jan 31, 2025
@ooesili ooesili self-assigned this Jan 31, 2025
@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

Copy link
Collaborator

@rockwotj rockwotj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will review the tests a bit later

CHANGELOG.md Show resolved Hide resolved
Comment on lines +63 to +69
if os.Getenv("ELASTICSEARCH_DEBUG") != "" {
conf.clientOpts.Logger = &elastictransport.CurlLogger{
Output: os.Stdout,
EnableRequestBody: true,
EnableResponseBody: true,
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we wrap this logger with our service logger at trace level instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's not an easy way to do that, elastictransport.CurlLogger is a decorator around a custom roundtripper type interface. I could write my own struct for that but it doesn't really seem worth it for a debugger that I intended to really only be used by us during development.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make the env var more verbose then? REDPANDA_CONNECT_ELASTICSEARCH_DEBUG=1

internal/impl/elasticsearchv8/output.go Outdated Show resolved Hide resolved
internal/impl/elasticsearchv8/output.go Show resolved Hide resolved
internal/impl/elasticsearchv8/output.go Outdated Show resolved Hide resolved
Copy link
Collaborator

@rockwotj rockwotj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work! LGTM, just a couple smaller things and then we're good to go.

Comment on lines +63 to +69
if os.Getenv("ELASTICSEARCH_DEBUG") != "" {
conf.clientOpts.Logger = &elastictransport.CurlLogger{
Output: os.Stdout,
EnableRequestBody: true,
EnableResponseBody: true,
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make the env var more verbose then? REDPANDA_CONNECT_ELASTICSEARCH_DEBUG=1

@@ -35,6 +35,7 @@ import (
_ "github.com/redpanda-data/connect/v4/public/components/dgraph"
_ "github.com/redpanda-data/connect/v4/public/components/discord"
_ "github.com/redpanda-data/connect/v4/public/components/elasticsearch"
_ "github.com/redpanda-data/connect/v4/public/components/elasticsearchv8"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: up to you, but I would just put this in the same elasticsearch package. You could also use a v8 subdirectory if you do want to separate the packages.

@@ -70,6 +70,7 @@ drop_on ,output ,drop_on ,0.0.0 ,certif
dynamic ,input ,dynamic ,0.0.0 ,community ,n ,n ,n
dynamic ,output ,dynamic ,0.0.0 ,community ,n ,n ,n
elasticsearch ,output ,elasticsearch ,0.0.0 ,community ,n ,n ,n
elasticsearch_v8 ,output ,elasticsearch_v8 ,4.46.0 ,certified ,n ,y ,y
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
elasticsearch_v8 ,output ,elasticsearch_v8 ,4.46.0 ,certified ,n ,y ,y
elasticsearch_v8 ,output ,elasticsearch_v8 ,4.47.0 ,certified ,n ,y ,y

internal/impl/elasticsearchv8/output.go Show resolved Hide resolved
Comment on lines +350 to +353
action, err := batch.TryInterpolatedString(i, e.conf.actionStr)
if err != nil {
return fmt.Errorf("interpolating action: %w", err)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be using batch.InterpolationExecutor and reusing the executors across the batch to prevent N copies of the batch (TryInterpolatedString makes a shallow copy of the batch every time it's called).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement go Pull requests that update Go code outputs Any tasks or issues relating specifically to outputs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants