Update docs to use the new metadata() bloblang function #2588

Open · wants to merge 1 commit into main
6 changes: 3 additions & 3 deletions internal/bloblang/query/methods.go
@@ -158,7 +158,7 @@ func catchMethod(fn Function, args *ParsedParams) (Function, error) {
var _ = registerMethod(
NewMethodSpec(
"from",
"Modifies a target query such that certain functions are executed from the perspective of another message in the batch. This allows you to mutate events based on the contents of other messages. Functions that support this behaviour are `content`, `json` and `meta`.",
"Modifies a target query such that certain functions are executed from the perspective of another message in the batch. This allows you to mutate events based on the contents of other messages. Functions that support this behaviour are `content()`, `json()`, `metadata()` and `meta()`. It cannot be used with the `@` operator.",
NewExampleSpec("For example, the following map extracts the contents of the JSON field `foo` specifically from message index `1` of a batch, effectively overriding the field `foo` for all messages of a batch to that of message 1:",
`root = this
root.foo = json("foo").from(1)`,
@@ -200,8 +200,8 @@ func (f *fromMethod) QueryTargets(ctx TargetsContext) (TargetsContext, []TargetP
var _ = registerMethod(
NewMethodSpec(
"from_all",
"Modifies a target query such that certain functions are executed from the perspective of each message in the batch, and returns the set of results as an array. Functions that support this behaviour are `content`, `json` and `meta`.",
NewExampleSpec("",
"Modifies a target query such that certain functions are executed from the perspective of each message in the batch, and returns the set of results as an array. Functions that support this behaviour are `content()`, `json()`, `metadata()` and `meta()`. It cannot be used with the `@` operator.",
NewExampleSpec("Sum all the message `foo`s from the current batch:",
`root = this
root.foo_summed = json("foo").from_all().sum()`,
),
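For illustration, a minimal Bloblang sketch of how the renamed function composes with both methods (assuming a batch of at least two messages; the key names are illustrative):

```
root = this

# Read a metadata key from message index 1 instead of the current message:
root.origin_topic = metadata("kafka_topic").from(1)

# Collect the same key from every message in the batch into an array:
root.all_topics = metadata("kafka_topic").from_all()

# The `@` shorthand always refers to the current message, which is why it
# cannot be combined with `from` or `from_all`.
```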
2 changes: 1 addition & 1 deletion internal/impl/amqp09/output.go
@@ -95,7 +95,7 @@ The fields 'key', 'exchange' and 'type' can be dynamically set using function in
Description("Set the priority of each message with a dynamic interpolated expression.").
Advanced().
Example("0").
Example(`${! meta("amqp_priority") }`).
Example(`${! metadata("amqp_priority") }`).
Example(`${! json("doc.priority") }`).
Default(""),
service.NewOutputMaxInFlightField(),
2 changes: 1 addition & 1 deletion internal/impl/aws/input_s3.go
@@ -154,7 +154,7 @@ This input adds the following metadata fields to each message:
- All user defined metadata
`+"```"+`

You can access these metadata fields using [function interpolation](/docs/configuration/interpolation#bloblang-queries). Note that user defined metadata is case insensitive within AWS, and it is likely that the keys will be received in a capitalized form, if you wish to make them consistent you can map all metadata keys to lower or uppercase using a Bloblang mapping such as `+"`meta = meta().map_each_key(key -> key.lowercase())`"+`.`).
You can access these metadata fields using [function interpolation](/docs/configuration/interpolation#bloblang-queries). Note that user defined metadata is case insensitive within AWS, and it is likely that the keys will be received in a capitalized form. If you wish to make them consistent, you can map all metadata keys to lower- or uppercase using a Bloblang mapping such as `+"`meta = metadata().map_each_key(key -> key.string().lowercase())`"+`.`).
Fields(
service.NewStringField(s3iFieldBucket).
Description("The bucket to consume from. If the field `sqs.url` is specified this field is optional.").
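For context, a sketch of that key normalization inside a config (the bucket name and surrounding pipeline are illustrative, not part of this change):

```
input:
  aws_s3:
    bucket: example-bucket # illustrative
  processors:
    - mapping: |
        # AWS user-defined metadata keys may arrive capitalized, so rewrite
        # every key to lowercase in one pass over the metadata object.
        meta = metadata().map_each_key(key -> key.string().lowercase())
```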
4 changes: 2 additions & 2 deletions internal/impl/aws/output_dynamodb.go
@@ -80,7 +80,7 @@ The field `+"`string_columns`"+` is a map of column names to string values, wher
string_columns:
id: ${!json("id")}
title: ${!json("body.title")}
topic: ${!meta("kafka_topic")}
topic: ${!metadata("kafka_topic")}
full_content: ${!content()}
`+"```"+`

@@ -120,7 +120,7 @@ This output benefits from sending messages as a batch for improved performance.
Example(map[string]any{
"id": "${!json(\"id\")}",
"title": "${!json(\"body.title\")}",
"topic": "${!meta(\"kafka_topic\")}",
"topic": "${!metadata(\"kafka_topic\")}",
"full_content": "${!content()}",
}),
service.NewStringMapField(ddboFieldJSONMapColumns).
6 changes: 3 additions & 3 deletions internal/impl/aws/output_s3.go
@@ -154,7 +154,7 @@ output:
path: ${!count("files")}-${!timestamp_unix_nano()}.tar.gz
tags:
Key1: Value1
Timestamp: ${!meta("Timestamp")}
Timestamp: ${!metadata("Timestamp")}
`+"```"+`

### Credentials
@@ -202,14 +202,14 @@ output:
Description("The path of each message to upload.").
Default(`${!count("files")}-${!timestamp_unix_nano()}.txt`).
Example(`${!count("files")}-${!timestamp_unix_nano()}.txt`).
Example(`${!meta("kafka_key")}.json`).
Example(`${!metadata("kafka_key")}.json`).
Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`),
service.NewInterpolatedStringMapField(s3oFieldTags).
Description("Key/value pairs to store with the object as tags.").
Default(map[string]any{}).
Example(map[string]any{
"Key1": "Value1",
"Timestamp": `${!meta("Timestamp")}`,
"Timestamp": `${!metadata("Timestamp")}`,
}),
service.NewInterpolatedStringField(s3oFieldContentType).
Description("The content type to set for each object.").
2 changes: 1 addition & 1 deletion internal/impl/aws/processor_dynamodb_partiql.go
@@ -34,7 +34,7 @@ pipeline:
args_mapping: |
root = [
{ "S": this.foo },
{ "S": meta("kafka_topic") },
{ "S": metadata("kafka_topic") },
{ "S": this.document.content },
]
`,
2 changes: 1 addition & 1 deletion internal/impl/aws/processor_lambda.go
@@ -36,7 +36,7 @@ pipeline:
- aws_lambda:
function: foo
result_map: |
root = if meta().exists("lambda_function_error") {
root = if metadata().exists("lambda_function_error") {
throw("Invocation failed due to %v: %v".format(this.errorType, this.errorMessage))
} else {
this
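As a sketch of the two access styles shown above: calling `metadata("key")` resolves a single value (typically null when the key is unset), while `metadata()` with no argument resolves the whole metadata object, which is what makes the `.exists()` check possible:

```
# Resolve one value; this is null when the key is unset:
root.error_type = metadata("lambda_function_error")

# Resolve the full metadata object and test for a key explicitly:
root.failed = metadata().exists("lambda_function_error")
```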
2 changes: 1 addition & 1 deletion internal/impl/azure/output_blob_storage.go
@@ -85,7 +85,7 @@ If the `+"`storage_connection_string`"+` does not contain the `+"`AccountName`"+
service.NewInterpolatedStringField(bsoFieldPath).
Description("The path of each message to upload.").
Example(`${!count("files")}-${!timestamp_unix_nano()}.json`).
Example(`${!meta("kafka_key")}.json`).
Example(`${!metadata("kafka_key")}.json`).
Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`).
Default(`${!count("files")}-${!timestamp_unix_nano()}.txt`),
service.NewInterpolatedStringEnumField(bsoFieldBlobType, "BLOCK", "APPEND").
6 changes: 3 additions & 3 deletions internal/impl/azure/output_table_storage.go
@@ -110,7 +110,7 @@ properties:
Fields(
service.NewInterpolatedStringField(tsoFieldTableName).
Description("The table to store messages into.").
Example(`${! meta("kafka_topic") }`).Example(`${! json("table") }`),
Example(`${! metadata("kafka_topic") }`).Example(`${! json("table") }`),
service.NewInterpolatedStringField(tsoFieldPartitionKey).
Description("The partition key.").
Example(`${! json("date") }`).
Expand All @@ -124,12 +124,12 @@ properties:
Default(map[string]any{}),
service.NewInterpolatedStringEnumField(tsoFieldInsertType, `INSERT`, `INSERT_MERGE`, `INSERT_REPLACE`).
Description("Type of insert operation. Valid options are `INSERT`, `INSERT_MERGE` and `INSERT_REPLACE`").
Example(`${! json("operation") }`).Example(`${! meta("operation") }`).Example(`INSERT`).
Example(`${! json("operation") }`).Example(`${! metadata("operation") }`).Example(`INSERT`).
Advanced().Deprecated().
Default(""),
service.NewInterpolatedStringEnumField(tsoFieldTransactionType, `INSERT`, `INSERT_MERGE`, `INSERT_REPLACE`, `UPDATE_MERGE`, `UPDATE_REPLACE`, `DELETE`).
Description("Type of transaction operation.").
Example(`${! json("operation") }`).Example(`${! meta("operation") }`).Example(`INSERT`).
Example(`${! json("operation") }`).Example(`${! metadata("operation") }`).Example(`INSERT`).
Advanced().
Default("INSERT"),
service.NewOutputMaxInFlightField().
2 changes: 1 addition & 1 deletion internal/impl/azure/processor_cosmosdb.go
@@ -52,7 +52,7 @@ input:
database: testdb
container: blobfish
partition_keys_map: root = json("habitat")
item_id: ${! meta("id") }
item_id: ${! metadata("id") }
operation: Patch
patch_operations:
# Add a new /diet field
@@ -61,7 +61,7 @@ We will be considering alternative approaches in future so please [get in touch]
Field(service.NewURLField("url").Description("The base URL of the schema registry service.")).
Field(service.NewInterpolatedStringField("subject").Description("The schema subject to derive schemas from.").
Example("foo").
Example(`${! meta("kafka_topic") }`)).
Example(`${! metadata("kafka_topic") }`)).
Field(service.NewStringField("refresh_period").
Description("The period after which a schema is refreshed for each subject, this is done by polling the schema registry service.").
Default("10m").
2 changes: 1 addition & 1 deletion internal/impl/gcp/output_cloud_storage.go
@@ -132,7 +132,7 @@ output:
service.NewInterpolatedStringField(csoFieldPath).
Description("The path of each message to upload.").
Example(`${!count("files")}-${!timestamp_unix_nano()}.txt`).
Example(`${!meta("kafka_key")}.json`).
Example(`${!metadata("kafka_key")}.json`).
Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`).
Default(`${!count("files")}-${!timestamp_unix_nano()}.txt`),
service.NewInterpolatedStringField(csoFieldContentType).
2 changes: 1 addition & 1 deletion internal/impl/io/input_http_server.go
@@ -255,7 +255,7 @@ You can access these metadata fields using [function interpolation](/docs/config
service.NewObjectField(hsiFieldResponse,
service.NewInterpolatedStringField(hsiFieldResponseStatus).
Description("Specify the status code to return with synchronous responses. This is a string value, which allows you to customize it based on resulting payloads and their metadata.").
Examples(`${! json("status") }`, `${! meta("status") }`).
Examples(`${! json("status") }`, `${! metadata("status") }`).
Default("200"),
service.NewInterpolatedStringMapField(hsiFieldResponseHeaders).
Description("Specify headers to return with synchronous responses.").
2 changes: 1 addition & 1 deletion internal/impl/kafka/output_kafka_franz.go
@@ -46,7 +46,7 @@ This output often out-performs the traditional ` + "`kafka`" + ` output as well
Advanced().Optional()).
Field(service.NewInterpolatedStringField("partition").
Description("An optional explicit partition to set for each message. This field is only relevant when the `partitioner` is set to `manual`. The provided interpolation string must be a valid integer.").
Example(`${! meta("partition") }`).
Example(`${! metadata("partition") }`).
Optional()).
Field(service.NewStringField("client_id").
Description("An identifier for the client connection.").
2 changes: 1 addition & 1 deletion internal/impl/nats/output.go
@@ -28,7 +28,7 @@ func natsOutputConfig() *service.ConfigSpec {
Default(map[string]any{}).
Example(map[string]any{
"Content-Type": "application/json",
"Timestamp": `${!meta("Timestamp")}`,
"Timestamp": `${!metadata("Timestamp")}`,
})).
Field(service.NewMetadataFilterField("metadata").
Description("Determine which (if any) metadata values should be added to messages as headers.").
4 changes: 2 additions & 2 deletions internal/impl/nats/output_jetstream.go
@@ -23,14 +23,14 @@ func natsJetStreamOutputConfig() *service.ConfigSpec {
Field(service.NewInterpolatedStringField("subject").
Description("A subject to write to.").
Example("foo.bar.baz").
Example(`${! meta("kafka_topic") }`).
Example(`${! metadata("kafka_topic") }`).
Example(`foo.${! json("meta.type") }`)).
Field(service.NewInterpolatedStringMapField("headers").
Description("Explicit message headers to add to messages.").
Default(map[string]any{}).
Example(map[string]any{
"Content-Type": "application/json",
"Timestamp": `${!meta("Timestamp")}`,
"Timestamp": `${!metadata("Timestamp")}`,
}).Version("4.1.0")).
Field(service.NewMetadataFilterField("metadata").
Description("Determine which (if any) metadata values should be added to messages as headers.").
4 changes: 2 additions & 2 deletions internal/impl/nats/processor_request_reply.go
@@ -38,7 +38,7 @@ You can access these metadata fields using [function interpolation](/docs/config
Field(service.NewInterpolatedStringField("subject").
Description("A subject to write to.").
Example("foo.bar.baz").
Example(`${! meta("kafka_topic") }`).
Example(`${! metadata("kafka_topic") }`).
Example(`foo.${! json("meta.type") }`)).
Field(service.NewStringField("inbox_prefix").
Description("Set an explicit inbox prefix for the response subject").
Expand All @@ -50,7 +50,7 @@ You can access these metadata fields using [function interpolation](/docs/config
Default(map[string]any{}).
Example(map[string]any{
"Content-Type": "application/json",
"Timestamp": `${!meta("Timestamp")}`,
"Timestamp": `${!metadata("Timestamp")}`,
})).
Field(service.NewMetadataFilterField("metadata").
Description("Determine which (if any) metadata values should be added to messages as headers.").
2 changes: 1 addition & 1 deletion internal/impl/parquet/processor_decode.go
@@ -40,7 +40,7 @@ input:
output:
file:
codec: lines
path: './foos/${! meta("s3_key") }.jsonl'
path: './foos/${! metadata("s3_key") }.jsonl'
`)
}

4 changes: 2 additions & 2 deletions internal/impl/pure/buffer_system_window.go
@@ -53,7 +53,7 @@ A [Bloblang mapping](/docs/guides/bloblang/about) applied to each message during
The timestamp value assigned to `+"`root`"+` must either be a numerical unix time in seconds (with up to nanosecond precision via decimals), or a string in ISO 8601 format. If the mapping fails or provides an invalid result the message will be dropped (with logging to describe the problem).
`).
Default("root = now()").
Example("root = this.created_at").Example(`root = meta("kafka_timestamp_unix").number()`)).
Example("root = this.created_at").Example(`root = metadata("kafka_timestamp_unix")`)).
Field(service.NewStringField("size").
Description("A duration string describing the size of each window. By default windows are aligned to the zeroth minute and zeroth hour on the UTC clock, meaning windows of 1 hour duration will match the turn of each hour in the day, this can be adjusted with the `offset` field.").
Example("30s").Example("10m")).
@@ -110,7 +110,7 @@ pipeline:
root = if batch_index() == 0 {
{
"traffic_light": this.traffic_light,
"created_at": meta("window_end_timestamp"),
"created_at": metadata("window_end_timestamp"),
"total_cars": json("registration_plate").from_all().unique().length(),
"passengers": json("passengers").from_all().sum(),
}
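For reference, a sketch covering each timestamp form the mapping accepts (each line is an independent alternative; field and key names are illustrative):

```
# A numeric unix time in seconds, with decimals for sub-second precision:
root = this.created_at_unix

# An ISO 8601 string is accepted directly:
root = this.created_at

# From metadata: metadata() preserves the stored type, but a value that
# arrives as a string still needs an explicit cast:
root = metadata("kafka_timestamp_unix").number()
```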
2 changes: 1 addition & 1 deletion internal/impl/pure/output_cache.go
@@ -53,7 +53,7 @@ In order to create a unique `+"`key`"+` value per item you should use function i
Examples(
`${!count("items")}-${!timestamp_unix_nano()}`,
`${!json("doc.id")}`,
`${!meta("kafka_key")}`,
`${!metadata("kafka_key")}`,
).
Default(`${!count("items")}-${!timestamp_unix_nano()}`),
service.NewInterpolatedStringField(coFieldTTL).
2 changes: 1 addition & 1 deletion internal/impl/pure/processor_archive.go
@@ -36,7 +36,7 @@ The functionality of this processor depends on being applied across messages tha
Field(service.NewInterpolatedStringField("path").
Description("The path to set for each message in the archive (when applicable).").
Example("${!count(\"files\")}-${!timestamp_unix_nano()}.txt").
Example("${!meta(\"kafka_key\")}-${!json(\"id\")}.json").
Example("${!metadata(\"kafka_key\")}-${!json(\"id\")}.json").
Default("")).
Example("Tar Archive", `
If we had JSON messages in a batch each of the form:
8 changes: 4 additions & 4 deletions internal/impl/pure/processor_cached.go
@@ -29,8 +29,8 @@ func newCachedProcessorConfigSpec() *service.ConfigSpec {
Description("A key to be resolved for each message, if the key already exists in the cache then the cached result is used, otherwise the processors are applied and the result is cached under this key. The key could be static and therefore apply generally to all messages or it could be an interpolated expression that is potentially unique for each message.").
Example("my_foo_result").
Example(`${! this.document.id }`).
Example(`${! meta("kafka_key") }`).
Example(`${! meta("kafka_topic") }`)).
Example(`${! metadata("kafka_key") }`).
Example(`${! metadata("kafka_topic") }`)).
Field(service.NewInterpolatedStringField("ttl").
Description("An optional expiry period to set for each cache entry. Some caches only have a general TTL and will therefore ignore this setting.").
Optional()).
Expand All @@ -44,12 +44,12 @@ pipeline:
- branch:
processors:
- cached:
key: '${! meta("kafka_topic") }-${! meta("kafka_partition") }'
key: '${! metadata("kafka_topic") }-${! metadata("kafka_partition") }'
cache: foo_cache
processors:
- mapping: 'root = ""'
- http:
url: http://example.com/enrichment/${! meta("kafka_topic") }/${! meta("kafka_partition") }
url: http://example.com/enrichment/${! metadata("kafka_topic") }/${! metadata("kafka_partition") }
verb: GET
result_map: 'root.enrichment = this'

4 changes: 2 additions & 2 deletions internal/impl/pure/processor_dedupe.go
@@ -49,7 +49,7 @@ pipeline:
processors:
- dedupe:
cache: keycache
key: ${! meta("kafka_key") }
key: ${! metadata("kafka_key") }

cache_resources:
- label: keycache
Expand All @@ -62,7 +62,7 @@ cache_resources:
Description("The [`cache` resource](/docs/components/caches/about) to target with this processor."),
service.NewInterpolatedStringField(dedupFieldKey).
Description("An interpolated string yielding the key to deduplicate by for each message.").
Examples(`${! meta("kafka_key") }`, `${! content().hash("xxhash64") }`),
Examples(`${! metadata("kafka_key") }`, `${! content().hash("xxhash64") }`),
service.NewBoolField(dedupFieldDropOnCacheErr).
Description("Whether messages should be dropped when the cache returns a general error such as a network issue.").
Default(true),
2 changes: 1 addition & 1 deletion internal/impl/pure/processor_group_by.go
@@ -46,7 +46,7 @@ pipeline:
output:
switch:
cases:
- check: meta("grouping") == "foo"
- check: metadata("grouping") == "foo"
output:
gcp_pubsub:
project: foo_prod
6 changes: 3 additions & 3 deletions internal/impl/pure/processor_group_by_value.go
@@ -36,19 +36,19 @@ If we were consuming Kafka messages and needed to group them by their key, archi
pipeline:
processors:
- group_by_value:
value: ${! meta("kafka_key") }
value: ${! metadata("kafka_key") }
- archive:
format: tar
- compress:
algorithm: gzip
output:
aws_s3:
bucket: TODO
path: docs/${! meta("kafka_key") }/${! count("files") }-${! timestamp_unix_nano() }.tar.gz
path: docs/${! metadata("kafka_key") }/${! count("files") }-${! timestamp_unix_nano() }.tar.gz
`+"```"+``).
Field(service.NewInterpolatedStringField(gbvpFieldValue).
Description("The interpolated string to group based on.").
Examples("${! meta(\"kafka_key\") }", "${! json(\"foo.bar\") }-${! meta(\"baz\") }")),
Examples("${! metadata(\"kafka_key\") }", "${! json(\"foo.bar\") }-${! memetadataa(\"baz\") }")),
func(conf *service.ParsedConfig, res *service.Resources) (service.BatchProcessor, error) {
valueStr, err := conf.FieldString(gbvpFieldValue)
if err != nil {
4 changes: 2 additions & 2 deletions internal/impl/pure/processor_log.go
@@ -45,7 +45,7 @@ pipeline:
root.reason = "cus I wana"
root.id = this.id
root.age = this.user.age
root.kafka_topic = meta("kafka_topic")
root.kafka_topic = metadata("kafka_topic")
`+"```"+`
`).
Fields(
Expand All @@ -59,7 +59,7 @@ pipeline:
`root.reason = "cus I wana"
root.id = this.id
root.age = this.user.age.number()
root.kafka_topic = meta("kafka_topic")`,
root.kafka_topic = metadata("kafka_topic")`,
).
Optional(),
service.NewInterpolatedStringField(logPFieldMessage).