Pipeline:input:kafka: style #1730

Merged
52 changes: 26 additions & 26 deletions pipeline/inputs/kafka.md
# Kafka

The _Kafka_ input plugin subscribes to one or more Kafka topics to collect messages from an [Apache Kafka](https://kafka.apache.org/) service.

This plugin uses the official [librdkafka C library](https://github.com/edenhill/librdkafka) as a built-in dependency.

## Configuration parameters

| Key | Description | Default |
| :--- | :--- | :--- |
| `brokers` | Single entry or comma-separated list of Kafka brokers. For example: `192.168.1.3:9092,192.168.1.4:9092`. | _none_ |
| `topics` | Single entry or comma-separated list of topics that Fluent Bit will subscribe to. | _none_ |
| `format` | Serialization format of the messages. If set to `json`, the payload is parsed as JSON. | _none_ |
| `client_id` | Client ID passed to librdkafka. | _none_ |
| `group_id` | Group ID passed to librdkafka. | `fluent-bit` |
| `poll_ms` | Kafka broker polling interval in milliseconds. | `500` |
| `Buffer_Max_Size` | Maximum size of the buffer used per cycle to poll Kafka messages from subscribed topics. Specify a larger size to increase throughput. | `4M` |
| `rdkafka.{property}` | `{property}` can be any [librdkafka property](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). | _none_ |
| `threaded` | Indicates whether to run this input in its own [thread](../../administration/multithreading.md#inputs). | `false` |
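
Any librdkafka property can be forwarded to the consumer through the `rdkafka.` prefix. As a hypothetical sketch, the following shows how a connection to a SASL/SSL-secured cluster might look; the broker address, topic, and credentials are illustrative placeholders, not values from this documentation:

```text
[INPUT]
    Name    kafka
    Brokers kafka.example.com:9093
    Topics  secure-topic
    # Placeholder values: any librdkafka property can be set
    # by prefixing its name with `rdkafka.`
    rdkafka.security.protocol  sasl_ssl
    rdkafka.sasl.mechanism     PLAIN
    rdkafka.sasl.username      my-user
    rdkafka.sasl.password      my-secret
    rdkafka.auto.offset.reset  earliest

[OUTPUT]
    Name stdout
```

The property names themselves (`security.protocol`, `sasl.mechanism`, `sasl.username`, `sasl.password`, `auto.offset.reset`) are standard librdkafka configuration keys documented in the librdkafka `CONFIGURATION.md` reference linked above.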

## Get started

To subscribe to or collect messages from Apache Kafka, run the plugin from the command line or through the configuration file:

### Command line

The Kafka plugin can read parameters through the `-p` argument (property):

```shell
fluent-bit -i kafka -o stdout -p brokers=192.168.1.3:9092 -p topics=some-topic
```

### Configuration file

In your main configuration file, append the following `[INPUT]` and `[OUTPUT]` sections:

```text
[INPUT]
    Name    kafka
    Brokers 192.168.1.3:9092
    Topics  some-topic

[OUTPUT]
    Name stdout
```

#### Example of using Kafka input and output plugins

The Fluent Bit source repository contains a full example of using Fluent Bit to process Kafka records:

```text
[INPUT]
    Name    kafka
    brokers kafka-broker:9092
    topics  fb-source
    poll_ms 100
    format  json

[FILTER]
    Name    lua
    Match   *
    script  kafka.lua
    # `call` names the Lua function in kafka.lua that processes
    # each record; the name here is illustrative.
    call    process_message

[OUTPUT]
    Name    kafka
    brokers kafka-broker:9092
    topics  fb-sink
```

The previous example will connect to the broker listening on `kafka-broker:9092` and subscribe to the `fb-source` topic, polling for new messages every 100 milliseconds.

Since the payload will be in JSON format, the plugin is configured to parse the payload with `format json`.

Every message received is then processed with `kafka.lua` and sent back to the `fb-sink` topic of the same broker.

The example can be executed locally with `make start` in the `examples/kafka_filter` directory (Docker Compose is used).
1 change: 1 addition & 0 deletions vale-styles/FluentBit/Spelling-exceptions.txt
```text
Kube
Kubernetes
Kusto
labelset
librdkafka
loadgenerator
Logstash
Lua
```