Skip to content
This repository has been archived by the owner on Oct 28, 2024. It is now read-only.

Commit

Permalink
feat(mqtt sink): expose retain config flag (vectordotdev#21291)
Browse files Browse the repository at this point in the history
  • Loading branch information
miquelruiz authored Sep 20, 2024
1 parent dcdb72d commit f99e052
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 1 deletion.
3 changes: 3 additions & 0 deletions changelog.d/21278-expose_mqtt_retain.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Expose a `retain` boolean config flag in the MQTT sink to tell the server to retain the messages.

authors: miquelruiz
9 changes: 9 additions & 0 deletions src/sinks/mqtt/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub struct MqttSinkConfig {
/// MQTT publish topic (templates allowed)
pub topic: Template,

/// Whether the messages should be retained by the server
#[serde(default = "default_retain")]
pub retain: bool,

#[configurable(derived)]
pub encoding: EncodingConfig,

Expand Down Expand Up @@ -112,6 +116,10 @@ const fn default_qos() -> MqttQoS {
MqttQoS::AtLeastOnce
}

const fn default_retain() -> bool {
false
}

impl Default for MqttSinkConfig {
fn default() -> Self {
Self {
Expand All @@ -124,6 +132,7 @@ impl Default for MqttSinkConfig {
clean_session: default_clean_session(),
tls: None,
topic: Template::try_from("vector").expect("Cannot parse as a template"),
retain: default_retain(),
encoding: JsonSerializerConfig::default().into(),
acknowledgements: AcknowledgementsConfig::default(),
quality_of_service: MqttQoS::default(),
Expand Down
4 changes: 3 additions & 1 deletion src/sinks/mqtt/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl MetaDescriptive for MqttRequest {
pub(super) struct MqttService {
pub(super) client: AsyncClient,
pub(super) quality_of_service: MqttQoS,
pub(super) retain: bool,
}

#[derive(Debug, Snafu)]
Expand All @@ -72,13 +73,14 @@ impl Service<MqttRequest> for MqttService {

fn call(&mut self, req: MqttRequest) -> Self::Future {
let quality_of_service = self.quality_of_service;
let retain = self.retain;
let client = self.client.clone();

Box::pin(async move {
let byte_size = req.body.len();

let res = client
.publish(&req.topic, quality_of_service.into(), false, req.body)
.publish(&req.topic, quality_of_service.into(), retain, req.body)
.await;
match res {
Ok(()) => Ok(MqttResponse {
Expand Down
3 changes: 3 additions & 0 deletions src/sinks/mqtt/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct MqttSink {
encoder: Encoder<()>,
connector: MqttConnector,
quality_of_service: MqttQoS,
retain: bool,
}

pub(super) struct MqttEvent {
Expand All @@ -70,6 +71,7 @@ impl MqttSink {
encoder,
connector,
quality_of_service: config.quality_of_service,
retain: config.retain,
})
}

Expand Down Expand Up @@ -115,6 +117,7 @@ impl MqttSink {
let service = ServiceBuilder::new().service(MqttService {
client,
quality_of_service: self.quality_of_service,
retain: self.retain,
});

let request_builder = MqttRequestBuilder {
Expand Down
5 changes: 5 additions & 0 deletions website/cue/reference/components/sinks/base/mqtt.cue
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ base: components: sinks: mqtt: configuration: {
}
}
}
retain: {
description: "Whether the messages should be retained by the server"
required: false
type: bool: default: false
}
tls: {
description: "Configures the TLS options for incoming/outgoing connections."
required: false
Expand Down

0 comments on commit f99e052

Please sign in to comment.