diff --git a/content/connectors/inbound/http.md b/content/connectors/inbound/http.md
index 8068ac982..30b849b23 100644
--- a/content/connectors/inbound/http.md
+++ b/content/connectors/inbound/http.md
@@ -1,5 +1,5 @@
 ---
-menu: HTTP
+title: HTTP
 ---

 {{% inline-embed file="embeds/connectors/inbound/http.md" %}}
\ No newline at end of file
diff --git a/content/connectors/inbound/kafka.md b/content/connectors/inbound/kafka.md
index fca54c8be..ae8bc1eb1 100644
--- a/content/connectors/inbound/kafka.md
+++ b/content/connectors/inbound/kafka.md
@@ -1,5 +1,5 @@
 ---
-menu: Kafka
+title: Kafka
 ---

 {{% inline-embed file="embeds/connectors/inbound/kafka.md" %}}
\ No newline at end of file
diff --git a/content/connectors/inbound/mqtt.md b/content/connectors/inbound/mqtt.md
index 668e85567..67ad50c5b 100644
--- a/content/connectors/inbound/mqtt.md
+++ b/content/connectors/inbound/mqtt.md
@@ -1,5 +1,5 @@
 ---
-menu: MQTT
+title: MQTT
 ---

 {{% inline-embed file="embeds/connectors/inbound/mqtt.md" %}}
\ No newline at end of file
diff --git a/content/connectors/outbound/duckdb.md b/content/connectors/outbound/duckdb.md
index 5ccdb55d2..1cf2ba5cd 100644
--- a/content/connectors/outbound/duckdb.md
+++ b/content/connectors/outbound/duckdb.md
@@ -1,5 +1,5 @@
 ---
-menu: DuckDB
+title: DuckDB
 ---

 {{% inline-embed file="embeds/connectors/outbound/duckdb.md" %}}
\ No newline at end of file
diff --git a/content/connectors/outbound/graphite.md b/content/connectors/outbound/graphite.md
index 41ba7b10c..c3115b1b9 100644
--- a/content/connectors/outbound/graphite.md
+++ b/content/connectors/outbound/graphite.md
@@ -1,5 +1,5 @@
 ---
-menu: Graphite
+title: Graphite
 ---

 {{% inline-embed file="embeds/connectors/outbound/graphite.md" %}}
\ No newline at end of file
diff --git a/content/connectors/outbound/sql.md b/content/connectors/outbound/sql.md
index dfd499022..c986f0b9a 100644
--- a/content/connectors/outbound/sql.md
+++ b/content/connectors/outbound/sql.md
@@ -1,5 +1,5 @@
 ---
-menu: SQL
+title: SQL
 ---

 {{% inline-embed file="embeds/connectors/outbound/sql.md" %}}
\ No newline at end of file
diff --git a/embeds/connectors/inbound/http.md b/embeds/connectors/inbound/http.md
index 5df5fb07c..783987812 100644
--- a/embeds/connectors/inbound/http.md
+++ b/embeds/connectors/inbound/http.md
@@ -15,18 +15,19 @@ See [docs](https://www.fluvio.io/connectors/inbound/http/) here.
 Tutorial for [HTTP to SQL Pipeline](https://www.fluvio.io/docs/tutorials/data-pipeline/).

 ### Configuration
-| Option       | default                    | type          | description                                                                                |
-| :------------| :--------------------------| :-------------| :------------------------------------------------------------------------------------------|
-| interval     | 10s                        | String        | Interval between each HTTP Request. This is in the form of "1s", "10ms", "1m", "1ns", etc. |
-| method       | GET                        | String        | GET, POST, PUT, HEAD                                                                       |
-| endpoint     | -                          | String        | HTTP URL endpoint. Use `ws://` for websocket URLs.                                         |
-| headers      | -                          | Array<String> | Request header(s) "Key:Value" pairs                                                        |
-| body         | -                          | String        | Request body e.g. in POST                                                                  |
-| user-agent   | "fluvio/http-source 0.1.0" | String        | Request user-agent                                                                         |
-| output_type  | text                       | String        | `text` = UTF-8 String Output, `json` = UTF-8 JSON Serialized String                        |
-| output_parts | body                       | String        | `body` = body only, `full` = all status, header and body parts                            |
-| stream       | false                      | bool          | Flag to indicate HTTP streaming mode                                                       |
-| delimiter    | '\n'                       | String        | Delimiter to separate records when producing from an HTTP streaming endpoint               |
+| Option           | default                    | type          | description                                                                                |
+|:-----------------|:---------------------------|:--------------|:--------------------------------------------------------------------------------------------|
+| interval         | 10s                        | String        | Interval between each HTTP Request. This is in the form of "1s", "10ms", "1m", "1ns", etc. |
+| method           | GET                        | String        | GET, POST, PUT, HEAD                                                                       |
+| endpoint         | -                          | String        | HTTP URL endpoint. Use `ws://` for websocket URLs.                                         |
+| headers          | -                          | Array<String> | Request header(s) "Key:Value" pairs                                                        |
+| body             | -                          | String        | Request body, e.g. in POST                                                                 |
+| user-agent       | "fluvio/http-source 0.1.0" | String        | Request user-agent                                                                         |
+| output_type      | text                       | String        | `text` = UTF-8 String Output, `json` = UTF-8 JSON Serialized String                        |
+| output_parts     | body                       | String        | `body` = body only, `full` = all status, header and body parts                            |
+| stream           | false                      | bool          | Flag to indicate HTTP streaming mode                                                       |
+| delimiter        | '\n'                       | String        | Delimiter to separate records when producing from an HTTP streaming endpoint               |
+| websocket_config | {}                         | Object        | WebSocket configuration object. See below.                                                |

 #### Record Type Output
 | Matrix                                                       | Output                                   |
 | :----------------------------------------------------------- | :---------------------------------------- |
 | output_type = text (default), output_parts = body (default)  | Only the body of the HTTP Response       |
 | output_type = text, output_parts = full                      | The full HTTP Response                   |
 | output_type = json, output_parts = body (default)            | Only the "body" in JSON struct           |
 | output_type = json, output_parts = full                      | HTTP "status", "body" and "header" JSON  |

@@ -36,6 +37,12 @@
+#### WebSocket Configuration
+| Option                | default | type          | description                                                                                                                                                           |
+|:----------------------|:--------|:--------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| subscription_messages | []      | Array<String> | List of messages to send to the server after the connection is established.                                                                                          |
+| ping_interval_ms      | 10000   | int           | Interval in milliseconds to send ping messages to the server.                                                                                                        |
+| subscription_message  | -       | String        | (deprecated) Message to send to the server after the connection is established. If provided together with subscription_messages, subscription_message is sent first. |

 ### Usage Example

@@ -45,7 +52,7 @@ This is an example of a simple connector config file for polling an endpoint:
 ```yaml
 # config-example.yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.3.7
+  version: 0.4.3
   name: cat-facts
   type: http-source
   topic: cat-facts
@@ -54,7 +61,7 @@ meta:
     - name: AUTHORIZATION_TOKEN
 http:
   endpoint: "https://catfact.ninja/fact"
-  interval: 10s 
+  interval: 10s
   headers:
     - "Authorization: token ${{ secrets.AUTHORIZATION_TOKEN }}"
     - "Cache-Control: no-cache"
 ```
@@ -75,7 +82,7 @@ Fluvio HTTP Source Connector supports Secrets in the `endpoint` and in the `headers` parameters:
 # config-example.yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.3.7
+  version: 0.4.3
   name: cat-facts
   type: http-source
   topic: cat-facts
@@ -84,10 +91,10 @@ meta:
     - name: MY_SECRET_URL
     - name: MY_AUTHORIZATION_HEADER
 http:
-  endpoint: 
+  endpoint:
     secret:
       name: MY_SECRET_URL
-  headers: 
+  headers:
     - "Authorization: ${{ secrets.MY_AUTHORIZATION_HEADER }}"
   interval: 10s
 ```
@@ -101,7 +108,7 @@ The previous example can be extended to add extra transformations to outgoing re
 # config-example.yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.3.7
+  version: 0.4.3
   name: cat-facts
   type: http-source
   topic: cat-facts
@@ -141,7 +148,7 @@ Provide the `stream` configuration option to enable streaming mode with `delimit
 # config-example.yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.3.7
+  version: 0.4.3
   name: wiki-updates
   type: http-source
   topic: wiki-updates
@@ -159,7 +166,7 @@ Connect to a websocket endpoint using a `ws://` URL. When reading text messages,
 # config-example.yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.3.7
+  version: 0.4.3
   name: websocket-connector
   type: http-source
   topic: websocket-updates
diff --git a/embeds/connectors/inbound/kafka.md b/embeds/connectors/inbound/kafka.md
index f989ebaf1..c4ea1a138 100644
--- a/embeds/connectors/inbound/kafka.md
+++ b/embeds/connectors/inbound/kafka.md
@@ -17,7 +17,7 @@ Example:
 ```yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.2.5
+  version: 0.2.8
   name: my-kafka-connector
   type: kafka-source
   topic: kafka-topic
@@ -28,12 +28,9 @@ kafka:
 ```

 ### Usage
-
 To try out the Kafka Source connector locally, you can use the Fluvio CDK tool:
-
-%copy%
 ```bash
-$ cdk deploy -p kafka-source start --config crates/kafka-source/sample-config.yaml
+cdk deploy -p kafka-source start --config crates/kafka-source/config-example.yaml
 ```

 ## Transformations
diff --git a/embeds/connectors/inbound/mqtt.md b/embeds/connectors/inbound/mqtt.md
index bf07925d5..384a5b042 100644
--- a/embeds/connectors/inbound/mqtt.md
+++ b/embeds/connectors/inbound/mqtt.md
@@ -33,7 +33,7 @@ url:

 #### Record Type Output

-JSON Serialized string with fields `mqtt_topic` and `payload` 
+JSON Serialized string with fields `mqtt_topic` and `payload`

 #### Payload Output Type

@@ -50,7 +50,7 @@ This is an example of a connector config file:
 # config-example.yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.2.5
+  version: 0.2.9
   name: my-mqtt-connector
   type: mqtt-source
   topic: mqtt-topic
@@ -104,7 +104,7 @@ The previous example can be extended to add extra transformations to outgoing re
 # config-example.yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.2.5
+  version: 0.2.9
   name: my-mqtt-connector
   type: mqtt-source
   topic: mqtt-topic
@@ -121,12 +121,12 @@ transforms:
     with:
       spec:
         - operation: shift
-          spec: 
+          spec:
            payload:
              device: "device"
         - operation: default
           spec:
-            source: "mqtt-connector" 
+            source: "mqtt-connector"
 ```

 The object `device` in the resulting record will be "unwrapped" and the additional field `source` with value `mqtt-connector` will be added.
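+
+For illustration, assuming the connector receives an MQTT record shaped like the documented output above (`mqtt_topic` plus `payload`; the device values here are hypothetical):
+
+```json
+{
+  "mqtt_topic": "ag-mqtt-topic",
+  "payload": {
+    "device": {
+      "device_id": 17,
+      "name": "device17"
+    }
+  }
+}
+```
+
+the shift and default operations above should yield a record roughly like:
+
+```json
+{
+  "device": {
+    "device_id": 17,
+    "name": "device17"
+  },
+  "source": "mqtt-connector"
+}
+```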
diff --git a/embeds/connectors/outbound/duckdb.md b/embeds/connectors/outbound/duckdb.md
index 8bc504bba..58c2c799b 100644
--- a/embeds/connectors/outbound/duckdb.md
+++ b/embeds/connectors/outbound/duckdb.md
@@ -41,7 +41,7 @@ To connect to Motherduck server, use prefix: `md`. For example, `md://motherduc
 ```yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.1.0
+  version: 0.1.3
   name: duckdb-connector
   type: duckdb-sink
   topic: fluvio-topic-source
@@ -99,7 +99,7 @@ Connector configuration file:
 apiVersion: 0.1.0
 meta:
   version: 0.1.0
-  name: duckdb-connector
+  name: json-sql-connector
   type: duckdb-sink
   topic: sql-topic
   create-topic: true
@@ -127,14 +127,16 @@ transforms:
 ```

 You can use the Fluvio `cdk` tool to deploy the connector:
+```bash
+fluvio install cdk
+```
+and then:
 ```bash
 cdk deploy start --config connector-config.yaml
 ```
-
 To delete the connector, run:
 ```bash
-cdk deploy shutdown --name duckdb-connector
+cdk deploy shutdown --config connector-config.yaml
 ```

 After you run the connector, you will see records in your database table.
diff --git a/embeds/connectors/outbound/graphite.md b/embeds/connectors/outbound/graphite.md
index 0a977bd1c..e0b63b1d6 100644
--- a/embeds/connectors/outbound/graphite.md
+++ b/embeds/connectors/outbound/graphite.md
@@ -15,7 +15,7 @@ server address is specified on the `addr` field.
 # sample-config.yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.1.2
+  version: 0.2.0
   name: my-graphite-connector-test-connector
   type: graphite-sink
   topic: test-graphite-connector-topic
@@ -85,22 +85,15 @@ With the Graphite instance set, we can move into [Setting Up Fluvio with Graphit

 In this section we are going to use the CDK to spin up the Graphite Sink Connector to send metrics from Fluvio Records to the Graphite instance.

-Make sure the Connector Development Kit is setup in your system by issuing the following command in your terminal.
-
-%copy%
-```bash
-cdk
-```

 > If you don't have the Fluvio CLI installed already, visit the [CLI][2] section

 Create a YAML file with the name `weather-monitor-config.yaml` and specify connector settings:

-%copy%
 ```yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.1.2
+  version: 0.2.0
   name: weather-monitor-sandiego
   type: graphite-sink
   topic: weather-ca-sandiego
@@ -112,16 +105,15 @@ graphite:

 Deploy the Connector using the CDK:

-
 ```bash
 cdk deploy start --config weather-monitor-config.yaml
 ```

-> Make sure your Graphite instance is running on `localhost:2003`, use the `cdk log` subcommand to read logs from the connector instance.
+> Make sure your Graphite instance is running on `localhost:2003`, use the
+> `cdk log` subcommand to read logs from the connector instance.

 Then produce records as usual:

-%copy%
 ```bash
 echo 120 | fluvio produce weather-ca-sandiego
 ```
@@ -131,12 +123,10 @@ echo 120 | fluvio produce weather-ca-sandiego
 ```

 Use Graphite's REST API to check on the stored data.
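+
+For reference, the render endpoint should reply with a JSON array of matching series, roughly of this shape (the datapoint values below are placeholders):
+
+```json
+[
+  {
+    "target": "weather.temperature.ca.sandiego",
+    "datapoints": [[120.0, 1721509980]]
+  }
+]
+```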
-%copy%
 ```bash
 curl -o ./data.json http://localhost:12345/render\?target\=weather.temperature.ca.sandiego\&format\=json\&noNullPoints
 ```

-
 [1]: https://infinyon.cloud/login
 [2]: https://www.fluvio.io/cli/
 [3]: https://github.com/infinyon/graphite-sink-connector/blob/main/CONTRIBUTING.md
diff --git a/embeds/connectors/outbound/http.md b/embeds/connectors/outbound/http.md
index d7d89ec8a..46e7bd45b 100644
--- a/embeds/connectors/outbound/http.md
+++ b/embeds/connectors/outbound/http.md
@@ -16,7 +16,7 @@ HTTP Sink is configured using a YAML file:
 # config-example.yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.2.9
+  version: 0.2.11
   name: my-http-sink
   type: http-sink
   topic: http-sink-topic
@@ -34,7 +34,7 @@ http:
 | method               | POST                      | String        | POST, PUT                           |
 | endpoint             | -                         | String        | HTTP URL endpoint                   |
 | headers              | -                         | Array<String> | Request header(s) "Key:Value" pairs |
-| user-agent           | `fluvio/http-sink 0.2.9`  | String        | Request user-agent                  |
+| user-agent           | `fluvio/http-sink 0.2.11` | String        | Request user-agent                  |
 | http_request_timeout | 1s                        | String        | HTTP Request Timeout                |
 | http_connect_timeout | 15s                       | String        | HTTP Connect Timeout                |
@@ -57,7 +57,7 @@ HTTP request to `http://httpbin.org/post`.
 # config.yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.2.9
+  version: 0.2.11
   name: httpbin
   type: http-sink
   topic: httpbin-send-post
@@ -133,7 +133,7 @@ The previous example can be extended to add extra transformations to outgoing re
 # config-example.yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.2.9
+  version: 0.2.11
   name: my-http-sink
   type: http-sink
   topic: http-sink-topic
@@ -165,7 +165,7 @@ See the example below:
 ```yaml
 apiVersion: 0.2.0
 meta:
-  version: 0.2.9
+  version: 0.2.11
   name: my-http-sink
   type: http-sink
   topic:
diff --git a/embeds/connectors/outbound/kafka.md b/embeds/connectors/outbound/kafka.md
index f481f9d6c..98c1ac32c 100644
--- a/embeds/connectors/outbound/kafka.md
+++ b/embeds/connectors/outbound/kafka.md
@@ -29,7 +29,7 @@ Example without security:
 ```yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.2.7
+  version: 0.2.10
   name: my-kafka-connector
   type: kafka-sink
   topic: kafka-topic
@@ -44,7 +44,7 @@ Example with security enabled:
 ```yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.2.7
+  version: 0.2.10
   name: my-kafka-connector
   type: kafka-sink
   topic: kafka-topic
@@ -68,9 +68,38 @@ kafka:
 ### Usage

 To try out the Kafka Sink connector locally, you can use the Fluvio CDK tool:
+```bash
+cdk deploy -p kafka-sink start --config crates/kafka-sink/config-example.yaml
+```
+
+### Offset Management
+The Fluvio Consumer Offset feature allows a connector to store its offset in the Fluvio cluster and use it on restart.
+To activate it, you need to provide the `consumer` id and set `strategy: auto`.
+See the example below:
+```yaml
+apiVersion: 0.2.0
+meta:
+  version: 0.2.10
+  name: my-kafka-connector
+  type: kafka-sink
+  topic:
+    meta:
+      name: kafka-sink-topic
+    consumer:
+      id: my-kafka-sink
+      offset:
+        strategy: auto
+kafka:
+  url: "localhost:9092"
+  topic: fluvio-topic
+  create-topic: true
+```
+After the connector has processed any records, you can check the last stored offset value via:
 ```bash
-cdk deploy -p kafka-sink start --config crates/kafka-sink/sample-config.yaml
+$ fluvio consumer list
+  CONSUMER       TOPIC             PARTITION  OFFSET  LAST SEEN
+  my-kafka-sink  kafka-sink-topic  0          0       3s
 ```

 ### Testing with security
diff --git a/embeds/connectors/outbound/sql.md b/embeds/connectors/outbound/sql.md
index 830b3d8de..90b4860fb 100644
--- a/embeds/connectors/outbound/sql.md
+++ b/embeds/connectors/outbound/sql.md
@@ -40,7 +40,7 @@ in the config.
 If a SmartModule requires configuration, it is passed via `with`
 ```yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.3.3
+  version: 0.4.3
   name: my-sql-connector
   type: sql-sink
   topic: sql-topic
@@ -62,7 +62,7 @@ The connector can use secrets in order to hide sensitive information.
 ```yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.3.3
+  version: 0.4.3
   name: my-sql-connector
   type: sql-sink
   topic: sql-topic
@@ -71,6 +71,37 @@ meta:
 sql:
   url: ${{ secrets.DATABASE_URL }}
 ```
+
+### Offset Management
+The Fluvio Consumer Offset feature allows a connector to store its offset in the Fluvio cluster and use it on restart.
+To activate it, you need to provide the `consumer` id and set `strategy: auto`.
+See the example below:
+```yaml
+apiVersion: 0.2.0
+meta:
+  version: 0.4.3
+  name: my-sql-connector
+  type: sql-sink
+  topic:
+    meta:
+      name: sql-sink-topic
+    consumer:
+      id: my-sql-sink
+      offset:
+        strategy: auto
+  secrets:
+    - name: DATABASE_URL
+sql:
+  url: ${{ secrets.DATABASE_URL }}
+```
+
+After the connector has processed any records, you can check the last stored offset value via:
+```bash
+$ fluvio consumer list
+  CONSUMER     TOPIC           PARTITION  OFFSET  LAST SEEN
+  my-sql-sink  sql-sink-topic  0          0       3s
+```
+
 ## Insert Usage Example

 Let's look at the example of the connector with one transformation named [infinyon/json-sql](https://github.com/infinyon/fluvio-connectors/blob/main/smartmodules/json-sql/README.md). The transformation takes records in JSON format and creates an SQL insert operation for the `topic_message` table. The value from `device.device_id`
@@ -95,7 +126,7 @@ Connector configuration file:
 # connector-config.yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.3.3
+  version: 0.4.3
   name: json-sql-connector
   type: sql-sink
   topic: sql-topic
@@ -133,8 +164,8 @@ To delete the connector run:
 ```bash
 cdk deploy shutdown --name json-sql-connector
-
 ```
+
 After you run the connector, you will see records in your database table.

 See more in our [Build MQTT to SQL Pipeline](https://www.fluvio.io/docs/tutorials/mqtt-to-sql/) and [Build HTTP to SQL Pipeline](https://www.fluvio.io/docs/tutorials/data-pipeline/) tutorials.
@@ -155,7 +186,7 @@ Connector configuration file for upsert (assuming `device_id` is a unique column):
 # connector-config.yaml
 apiVersion: 0.1.0
 meta:
-  version: 0.3.3
+  version: 0.4.3
   name: json-sql-connector
   type: sql-sink
   topic: sql-topic