From dac376a406da17924d85efa164bbcbefd0a72627 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Thu, 25 Mar 2021 15:36:21 -0400 Subject: [PATCH 01/15] kick ci From 2a2824c8dcf8432dc3360a782bce927bb88048c2 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Thu, 25 Mar 2021 15:39:48 -0400 Subject: [PATCH 02/15] actually kick tests --- tests/integration_tests.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index a41029d..395e84b 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1,8 +1,10 @@ -use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; -use rdkafka::consumer::{Consumer, StreamConsumer}; -use rdkafka::producer::{FutureProducer, FutureRecord}; -use rdkafka::Message; -use rdkafka::{client::DefaultClientContext, ClientConfig}; +use rdkafka::{ + admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}, + client::DefaultClientContext, + consumer::{Consumer, StreamConsumer}, + producer::{FutureProducer, FutureRecord}, + ClientConfig, Message, +}; use stream_processor::*; use tracing_subscriber::{EnvFilter, FmtSubscriber}; From ca4a90ef697337adee6505e83e7679134c14d7ba Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Thu, 25 Mar 2021 15:40:34 -0400 Subject: [PATCH 03/15] listen for changes in tests --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7f761a9..b597758 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,12 +7,14 @@ on: - 'Cargo.lock' - 'Dockerfile' - 'src/**' + - 'tests/**' pull_request: paths: - 'Cargo.toml' - 'Cargo.lock' - 'Dockerfile' - 'src/**' + - 'tests/**' name: Continuous integration From 38956e0f3bd51044dc98668b1665b39f06eec96b Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Thu, 25 Mar 2021 21:10:29 -0400 Subject: [PATCH 04/15] fix script name --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b597758..f1e57ec 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -120,7 +120,7 @@ jobs: with: toolchain: stable - name: Initiate kafka - run: ./scripts/init_kafka + run: ./scripts/init_kafka.sh - name: Install tarpaulin run: cargo install cargo-tarpaulin - name: Generate coverage From 7854da7d3a02afffc7b06419755575eb050c2924 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Thu, 25 Mar 2021 21:21:57 -0400 Subject: [PATCH 05/15] make return type of handle_message optional --- src/processor.rs | 35 +++++++++++++++++++---------------- tests/integration_tests.rs | 4 ++-- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/processor.rs b/src/processor.rs index 2fb6e59..0054419 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -13,7 +13,7 @@ pub trait StreamProcessor { type Input: DeserializeOwned; type Output: Serialize + std::fmt::Debug; - fn handle_message(&self, input: Self::Input) -> Result; + fn handle_message(&self, input: Self::Input) -> Result>; fn assign_topic(&self, output: &Self::Output) -> &str; fn assign_key(&self, output: &Self::Output) -> &str; } @@ -37,7 +37,7 @@ impl StreamRunner { let consumer = consumer(&self.settings)?; let producer = producer(&self.settings)?; - let msg_stream = consumer.stream().map(|x| -> Result { + let msg_stream = consumer.stream().map(|x| -> Result> { let owned = x?.detach(); let payload = owned.payload().ok_or(Error::EmptyPayload)?; let deserialized: T::Input = serde_json::from_slice(payload)?; @@ -49,21 +49,24 @@ impl StreamRunner { error!("{:?}", msg); return; } - let msg = msg.expect("Guaranteed to be Ok"); debug!("Message received: {:?}", msg); - let serialized = serde_json::to_string(&msg).expect("Failed to serialize message"); - let topic = self.processor.assign_topic(&msg); - let key = self.processor.assign_key(&msg); - let record = FutureRecord::to(topic).key(key).payload(&serialized); - let res = producer.send(record, Duration::from_secs(0)).await; - match res { - Ok((partition, offset)) => trace!( - "Message successfully delivered to topic: {}, partition {}, offset {}", - topic, - partition, - offset - ), - Err((err, msg)) => error!("Message: {:?}\nError: {:?}", msg, err), + let msg = msg.expect("Guaranteed to be Ok"); + if let Some(msg) = msg { + let serialized = + serde_json::to_string(&msg).expect("Failed to serialize message"); + let topic = self.processor.assign_topic(&msg); + let key = self.processor.assign_key(&msg); + let record = FutureRecord::to(topic).key(key).payload(&serialized); + let res = producer.send(record, Duration::from_secs(0)).await; + match res { + Ok((partition, offset)) => trace!( + "Message successfully delivered to topic: {}, partition {}, offset {}", + topic, + partition, + offset + ), + Err((err, msg)) => error!("Message: {:?}\nError: {:?}", msg, err), + } } }) .await; diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 395e84b..90fa787 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -14,8 +14,8 @@ impl StreamProcessor for StreamDoubler { type Input = f64; type Output = f64; - fn handle_message(&self, input: Self::Input) -> Result { - Ok(input * 2.0) + fn handle_message(&self, input: Self::Input) -> Result, Error> { + Ok(Some(input * 2.0)) } fn assign_topic(&self, _output: &Self::Output) -> &str { From ac9ca2bdb69a83c2b88821b1431af057f29733f5 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Thu, 25 Mar 2021 21:29:46 -0400 Subject: [PATCH 06/15] return Cow from trait --- src/processor.rs | 9 ++++++--- tests/integration_tests.rs | 9 +++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/processor.rs b/src/processor.rs index 0054419..3a40eff 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -6,6 +6,7 @@ use crate::{ use futures::prelude::*; use rdkafka::{message::Message, producer::FutureRecord}; use serde::{de::DeserializeOwned, Serialize}; +use std::borrow::Cow; use std::time::Duration; use tracing::{debug, error, info, trace}; @@ -14,8 +15,8 @@ pub trait StreamProcessor { type Output: Serialize + std::fmt::Debug; fn handle_message(&self, input: Self::Input) -> Result>; - fn assign_topic(&self, output: &Self::Output) -> &str; - fn assign_key(&self, output: &Self::Output) -> &str; + fn assign_topic(&self, output: &Self::Output) -> Cow; + fn assign_key(&self, output: &Self::Output) -> Cow; } pub struct StreamRunner { @@ -56,7 +57,9 @@ impl StreamRunner { serde_json::to_string(&msg).expect("Failed to serialize message"); let topic = self.processor.assign_topic(&msg); let key = self.processor.assign_key(&msg); - let record = FutureRecord::to(topic).key(key).payload(&serialized); + let record = FutureRecord::to(topic.as_ref()) + .key(key.as_ref()) + .payload(&serialized); let res = producer.send(record, Duration::from_secs(0)).await; match res { Ok((partition, offset)) => trace!( diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 90fa787..a996525 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -5,6 +5,7 @@ use rdkafka::{ producer::{FutureProducer, FutureRecord}, ClientConfig, Message, }; +use std::borrow::Cow; use stream_processor::*; use tracing_subscriber::{EnvFilter, FmtSubscriber}; @@ -18,12 +19,12 @@ impl StreamProcessor for StreamDoubler { Ok(Some(input * 2.0)) } - fn assign_topic(&self, _output: &Self::Output) -> &str { - "test-output" + fn assign_topic(&self, _output: &Self::Output) -> Cow { + "test-output".into() } - fn assign_key(&self, _output: &Self::Output) -> &str { - "key" + fn assign_key(&self, _output: &Self::Output) -> Cow { + "key".into() } } From 1287963c3d816c43d2601500560598d1a462fe02 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Thu, 25 Mar 2021 22:05:48 -0400 Subject: [PATCH 07/15] make message handling async --- Cargo.toml | 1 + src/processor.rs | 67 +++++++++++++++++++++++--------------- tests/integration_tests.rs | 3 +- 3 files changed, 44 insertions(+), 27 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9edea37..f9f63f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1.48" futures = "0.3" rdkafka = { version = "0.26", features = ["ssl-vendored"] } serde = "1.0" diff --git a/src/processor.rs b/src/processor.rs index 3a40eff..69b8181 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -3,18 +3,22 @@ use crate::{ kafka::{consumer, producer}, settings::KafkaSettings, }; -use futures::prelude::*; +use futures::{ + future::{ready, Either}, + prelude::*, +}; use rdkafka::{message::Message, producer::FutureRecord}; use serde::{de::DeserializeOwned, Serialize}; use std::borrow::Cow; use std::time::Duration; use tracing::{debug, error, info, trace}; +#[async_trait::async_trait] pub trait StreamProcessor { type Input: DeserializeOwned; type Output: Serialize + std::fmt::Debug; - fn handle_message(&self, input: Self::Input) -> Result>; + async fn handle_message(&self, input: Self::Input) -> Result>; fn assign_topic(&self, output: &Self::Output) -> Cow; fn assign_key(&self, output: &Self::Output) -> Cow; } @@ -38,12 +42,26 @@ impl StreamRunner { let consumer = consumer(&self.settings)?; let producer = producer(&self.settings)?; - let msg_stream = consumer.stream().map(|x| -> Result> { - let owned = x?.detach(); - let payload = owned.payload().ok_or(Error::EmptyPayload)?; - let deserialized: T::Input = serde_json::from_slice(payload)?; - self.processor.handle_message(deserialized) - }); + let msg_stream = consumer + .stream() + .map(|x| -> Result { + let owned = x?.detach(); + let payload = owned.payload().ok_or(Error::EmptyPayload)?; + serde_json::from_slice(payload).map_err(From::from) + }) + .filter_map(|msg| match msg { + Ok(input) => { + let output = self + .processor + .handle_message(input) + .map(|msg| msg.transpose()); + Either::Left(output) + } + Err(e) => { + error!("Error: {:?}", e); + Either::Right(ready(None)) + } + }); msg_stream .for_each_concurrent(None, |msg| async { if msg.is_err() { @@ -52,24 +70,21 @@ impl StreamRunner { } debug!("Message received: {:?}", msg); let msg = msg.expect("Guaranteed to be Ok"); - if let Some(msg) = msg { - let serialized = - serde_json::to_string(&msg).expect("Failed to serialize message"); - let topic = self.processor.assign_topic(&msg); - let key = self.processor.assign_key(&msg); - let record = FutureRecord::to(topic.as_ref()) - .key(key.as_ref()) - .payload(&serialized); - let res = producer.send(record, Duration::from_secs(0)).await; - match res { - Ok((partition, offset)) => trace!( - "Message successfully delivered to topic: {}, partition {}, offset {}", - topic, - partition, - offset - ), - Err((err, msg)) => error!("Message: {:?}\nError: {:?}", msg, err), - } + let serialized = serde_json::to_string(&msg).expect("Failed to serialize message"); + let topic = self.processor.assign_topic(&msg); + let key = self.processor.assign_key(&msg); + let record = FutureRecord::to(topic.as_ref()) + .key(key.as_ref()) + .payload(&serialized); + let res = producer.send(record, Duration::from_secs(0)).await; + match res { + Ok((partition, offset)) => trace!( + "Message successfully delivered to topic: {}, partition {}, offset {}", + topic, + partition, + offset + ), + Err((err, msg)) => error!("Message: {:?}\nError: {:?}", msg, err), } }) .await; diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index a996525..71c018c 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -11,11 +11,12 @@ use tracing_subscriber::{EnvFilter, FmtSubscriber}; struct StreamDoubler; +#[async_trait::async_trait] impl StreamProcessor for StreamDoubler { type Input = f64; type Output = f64; - fn handle_message(&self, input: Self::Input) -> Result, Error> { + async fn handle_message(&self, input: Self::Input) -> Result, Error> { Ok(Some(input * 2.0)) } From ec73bc8dc307278593b6816c4835ba181a631d86 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Thu, 25 Mar 2021 22:26:03 -0400 Subject: [PATCH 08/15] allow Vec in output, add some documentation --- src/processor.rs | 59 +++++++++++++++++++++++--------------- tests/integration_tests.rs | 4 +-- 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/src/processor.rs b/src/processor.rs index 69b8181..bac030f 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -9,17 +9,27 @@ use futures::{ }; use rdkafka::{message::Message, producer::FutureRecord}; use serde::{de::DeserializeOwned, Serialize}; -use std::borrow::Cow; -use std::time::Duration; +use std::{borrow::Cow, time::Duration}; use tracing::{debug, error, info, trace}; #[async_trait::async_trait] +/// The common trait for all stream processors. pub trait StreamProcessor { + /// The input type to deserialize into from the Kafka input topics type Input: DeserializeOwned; + /// The output type from the stream processor, which will be serialized and sent to Kafka. type Output: Serialize + std::fmt::Debug; - async fn handle_message(&self, input: Self::Input) -> Result>; + /// Convert the input into a `impl Future>>>`. + /// [`futures::Future`] because we might want to `await` in the implementation. + /// [`Result`] because our function might fail. + /// [`Option`] because we might not want to send any output. If this is `None`, we skip sending + /// to Kafka. + /// [`Vec`] because we might want to send _many_ outputs for one input + async fn handle_message(&self, input: Self::Input) -> Result>>; + /// Decide which topic to send the output to. fn assign_topic(&self, output: &Self::Output) -> Cow; + /// Decide which key to assign to the output. fn assign_key(&self, output: &Self::Output) -> Cow; } @@ -63,28 +73,31 @@ impl StreamRunner { } }); msg_stream - .for_each_concurrent(None, |msg| async { - if msg.is_err() { - error!("{:?}", msg); + .for_each_concurrent(None, |msgs| async { + if msgs.is_err() { + error!("{:?}", msgs); return; } - debug!("Message received: {:?}", msg); - let msg = msg.expect("Guaranteed to be Ok"); - let serialized = serde_json::to_string(&msg).expect("Failed to serialize message"); - let topic = self.processor.assign_topic(&msg); - let key = self.processor.assign_key(&msg); - let record = FutureRecord::to(topic.as_ref()) - .key(key.as_ref()) - .payload(&serialized); - let res = producer.send(record, Duration::from_secs(0)).await; - match res { - Ok((partition, offset)) => trace!( - "Message successfully delivered to topic: {}, partition {}, offset {}", - topic, - partition, - offset - ), - Err((err, msg)) => error!("Message: {:?}\nError: {:?}", msg, err), + let msgs = msgs.expect("Guaranteed to be Ok"); + for msg in msgs { + debug!("Message received: {:?}", msg); + let serialized = + serde_json::to_string(&msg).expect("Failed to serialize message"); + let topic = self.processor.assign_topic(&msg); + let key = self.processor.assign_key(&msg); + let record = FutureRecord::to(topic.as_ref()) + .key(key.as_ref()) + .payload(&serialized); + let res = producer.send(record, Duration::from_secs(0)).await; + match res { + Ok((partition, offset)) => trace!( + "Message successfully delivered to topic: {}, partition {}, offset {}", + topic, + partition, + offset + ), + Err((err, msg)) => error!("Message: {:?}\nError: {:?}", msg, err), + } } }) .await; diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 71c018c..68ac2c4 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -16,8 +16,8 @@ impl StreamProcessor for StreamDoubler { type Input = f64; type Output = f64; - async fn handle_message(&self, input: Self::Input) -> Result, Error> { - Ok(Some(input * 2.0)) + async fn handle_message(&self, input: Self::Input) -> Result>, Error> { + Ok(Some(vec![input * 2.0])) } fn assign_topic(&self, _output: &Self::Output) -> Cow { From 15724e98953d9abb222d8898d44d40d01d25c153 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Thu, 25 Mar 2021 22:27:56 -0400 Subject: [PATCH 09/15] compile on nightly --- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f1e57ec..d0dee43 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -51,7 +51,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: stable + toolchain: nightly override: true - name: Install rustfmt run: rustup component add rustfmt @@ -85,7 +85,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: stable + toolchain: nightly override: true - name: Install clippy run: rustup component add clippy @@ -118,7 +118,7 @@ jobs: - name: Install toolchain uses: actions-rs/toolchain@v1 with: - toolchain: stable + toolchain: nightly - name: Initiate kafka run: ./scripts/init_kafka.sh - name: Install tarpaulin From 24374b326c6a91f97845f72fec9db6b67a8437f1 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Fri, 26 Mar 2021 10:05:06 -0400 Subject: [PATCH 10/15] include associated Error type --- src/processor.rs | 7 ++++++- tests/integration_tests.rs | 6 +++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/processor.rs b/src/processor.rs index bac030f..901cc02 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -19,6 +19,8 @@ pub trait StreamProcessor { type Input: DeserializeOwned; /// The output type from the stream processor, which will be serialized and sent to Kafka. type Output: Serialize + std::fmt::Debug; + /// The error type that might be thrown in [`handle_message`]. + type Error: std::fmt::Debug; /// Convert the input into a `impl Future>>>`. /// [`futures::Future`] because we might want to `await` in the implementation. @@ -26,7 +28,10 @@ pub trait StreamProcessor { /// [`Option`] because we might not want to send any output. If this is `None`, we skip sending /// to Kafka. /// [`Vec`] because we might want to send _many_ outputs for one input - async fn handle_message(&self, input: Self::Input) -> Result>>; + async fn handle_message( + &self, + input: Self::Input, + ) -> std::result::Result>, Self::Error>; /// Decide which topic to send the output to. fn assign_topic(&self, output: &Self::Output) -> Cow; /// Decide which key to assign to the output. diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 68ac2c4..120b8fa 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -15,8 +15,12 @@ struct StreamDoubler; impl StreamProcessor for StreamDoubler { type Input = f64; type Output = f64; + type Error = (); - async fn handle_message(&self, input: Self::Input) -> Result>, Error> { + async fn handle_message( + &self, + input: Self::Input, + ) -> Result>, Self::Error> { Ok(Some(vec![input * 2.0])) } From ce45522ec8016853c20cf664812b091d333bf313 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Sat, 27 Mar 2021 15:48:08 -0400 Subject: [PATCH 11/15] make settings deserializable --- src/settings.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/settings.rs b/src/settings.rs index 5742add..20f0129 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -1,3 +1,5 @@ +use serde::Deserialize; + #[derive(Debug, Clone)] pub enum SecurityProtocol { Plaintext, @@ -7,7 +9,7 @@ pub enum SecurityProtocol { }, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Deserialize)] pub struct KafkaSettings { pub bootstrap_servers: String, pub group_id: String, From 9897a06ef35a579e02de638f1625eeb97bf43130 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Sun, 28 Mar 2021 12:58:25 -0400 Subject: [PATCH 12/15] Clean up errors --- src/error.rs | 12 +++++--- src/processor.rs | 59 +++++++++++++++++++++----------------- src/settings.rs | 3 +- tests/integration_tests.rs | 2 +- 4 files changed, 44 insertions(+), 32 deletions(-) diff --git a/src/error.rs b/src/error.rs index 79d406e..a26e60f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,14 +5,18 @@ pub enum Error { #[error("Message received from Kafka with empty payload")] EmptyPayload, - #[error("IO error: {0:?}")] + #[error("IO error: {0}")] Io(#[from] std::io::Error), - #[error("Error from Kafka: {0:?}")] + #[error("Error from Kafka: {0}")] Kafka(#[from] rdkafka::error::KafkaError), - #[error("Error from Serde: {0:?}")] - Serde(#[from] serde_json::Error), + #[error("Error from Serde: {source}\n{msg}")] + Serde { + #[source] + source: serde_json::Error, + msg: String, + }, } pub type Result = std::result::Result; diff --git a/src/processor.rs b/src/processor.rs index 901cc02..af26b97 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -20,7 +20,7 @@ pub trait StreamProcessor { /// The output type from the stream processor, which will be serialized and sent to Kafka. type Output: Serialize + std::fmt::Debug; /// The error type that might be thrown in [`handle_message`]. - type Error: std::fmt::Debug; + type Error: std::fmt::Display; /// Convert the input into a `impl Future>>>`. /// [`futures::Future`] because we might want to `await` in the implementation. @@ -62,7 +62,11 @@ impl StreamRunner { .map(|x| -> Result { let owned = x?.detach(); let payload = owned.payload().ok_or(Error::EmptyPayload)?; - serde_json::from_slice(payload).map_err(From::from) + serde_json::from_slice(payload).map_err(|e| Error::Serde { + source: e, + msg: String::from_utf8(payload.to_vec()) + .expect("Failed to parse utf8 vec to string"), + }) }) .filter_map(|msg| match msg { Ok(input) => { @@ -73,35 +77,38 @@ impl StreamRunner { Either::Left(output) } Err(e) => { - error!("Error: {:?}", e); + error!("{}", e); Either::Right(ready(None)) } }); msg_stream .for_each_concurrent(None, |msgs| async { - if msgs.is_err() { - error!("{:?}", msgs); - return; - } - let msgs = msgs.expect("Guaranteed to be Ok"); - for msg in msgs { - debug!("Message received: {:?}", msg); - let serialized = - serde_json::to_string(&msg).expect("Failed to serialize message"); - let topic = self.processor.assign_topic(&msg); - let key = self.processor.assign_key(&msg); - let record = FutureRecord::to(topic.as_ref()) - .key(key.as_ref()) - .payload(&serialized); - let res = producer.send(record, Duration::from_secs(0)).await; - match res { - Ok((partition, offset)) => trace!( - "Message successfully delivered to topic: {}, partition {}, offset {}", - topic, - partition, - offset - ), - Err((err, msg)) => error!("Message: {:?}\nError: {:?}", msg, err), + match msgs { + Err(e) => { + error!("{}", e); + return; + } + Ok(msgs) => { + for msg in msgs { + debug!("Message received: {:?}", msg); + let serialized = + serde_json::to_string(&msg).expect("Failed to serialize message"); + let topic = self.processor.assign_topic(&msg); + let key = self.processor.assign_key(&msg); + let record = FutureRecord::to(topic.as_ref()) + .key(key.as_ref()) + .payload(&serialized); + let res = producer.send(record, Duration::from_secs(0)).await; + match res { + Ok((partition, offset)) => trace!( + "Message successfully delivered to topic: {}, partition {}, offset {}", + topic, + partition, + offset + ), + Err((err, msg)) => error!("Message: {:?}\nError: {}", msg, err), + } + } } } }) diff --git a/src/settings.rs b/src/settings.rs index 20f0129..ae803f7 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -1,6 +1,7 @@ use serde::Deserialize; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "UPPERCASE")] pub enum SecurityProtocol { Plaintext, SaslSsl { diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 120b8fa..eaed67f 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -15,7 +15,7 @@ struct StreamDoubler; impl StreamProcessor for StreamDoubler { type Input = f64; type Output = f64; - type Error = (); + type Error = String; async fn handle_message( &self, From 128a9257868f92a8f80d5a68014a3431099cd2f3 Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Fri, 2 Apr 2021 11:01:10 -0400 Subject: [PATCH 13/15] use kafka-settings --- Cargo.toml | 2 ++ src/error.rs | 3 -- src/kafka.rs | 28 ------------------ src/lib.rs | 3 -- src/processor.rs | 23 ++++++++------- src/settings.rs | 58 -------------------------------------- tests/integration_tests.rs | 1 + 7 files changed, 14 insertions(+), 104 deletions(-) delete mode 100644 src/kafka.rs delete mode 100644 src/settings.rs diff --git a/Cargo.toml b/Cargo.toml index f9f63f2..c6c80b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ edition = "2018" [dependencies] async-trait = "0.1.48" futures = "0.3" +kafka-settings = { git = "ssh://git@github.com/Overmuse/kafka-settings", tag = "v0.1.0" } rdkafka = { version = "0.26", features = ["ssl-vendored"] } serde = "1.0" serde_json = "1.0" @@ -16,5 +17,6 @@ thiserror = "1.0" tracing = "0.1" [dev-dependencies] +anyhow = "1.0.40" tokio = { version = "1.4", features = ["macros", "rt-multi-thread"] } tracing-subscriber = "0.2" diff --git a/src/error.rs b/src/error.rs index a26e60f..ae01295 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,9 +5,6 @@ pub enum Error { #[error("Message received from Kafka with empty payload")] EmptyPayload, - #[error("IO error: {0}")] - Io(#[from] std::io::Error), - #[error("Error from Kafka: {0}")] Kafka(#[from] rdkafka::error::KafkaError), diff --git a/src/kafka.rs b/src/kafka.rs deleted file mode 100644 index 4a0986a..0000000 --- a/src/kafka.rs +++ /dev/null @@ -1,28 +0,0 @@ -use crate::error::Result; -use crate::settings::KafkaSettings; -use rdkafka::{ - consumer::{Consumer, StreamConsumer}, - producer::FutureProducer, - ClientConfig, -}; - -pub fn consumer(settings: &KafkaSettings) -> Result { - let mut config = ClientConfig::new(); - let config = settings.config(&mut config); - let consumer: StreamConsumer = config - .set("group.id", &settings.group_id) - .set("enable.ssl.certificate.verification", "false") - .create()?; - let subscription_topics: Vec<_> = settings.input_topics.iter().map(String::as_str).collect(); - consumer.subscribe(&subscription_topics)?; - Ok(consumer) -} - -pub fn producer(settings: &KafkaSettings) -> Result { - let mut config = ClientConfig::new(); - let config = settings.config(&mut config); - let producer: FutureProducer = config - .set("enable.ssl.certificate.verification", "false") - .create()?; - Ok(producer) -} diff --git a/src/lib.rs b/src/lib.rs index 508a4f1..3e1ab46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,5 @@ pub mod error; -mod kafka; pub mod processor; -pub mod settings; pub use error::Error; pub use processor::{StreamProcessor, StreamRunner}; -pub use settings::{KafkaSettings, SecurityProtocol}; diff --git a/src/processor.rs b/src/processor.rs index af26b97..654ea78 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -1,12 +1,9 @@ -use crate::{ - error::{Error, Result}, - kafka::{consumer, producer}, - settings::KafkaSettings, -}; +use crate::error::{Error, Result}; use futures::{ future::{ready, Either}, prelude::*, }; +use kafka_settings::{consumer, producer, KafkaSettings}; use rdkafka::{message::Message, producer::FutureRecord}; use serde::{de::DeserializeOwned, Serialize}; use std::{borrow::Cow, time::Duration}; @@ -20,7 +17,7 @@ pub trait StreamProcessor { /// The output type from the stream processor, which will be serialized and sent to Kafka. type Output: Serialize + std::fmt::Debug; /// The error type that might be thrown in [`handle_message`]. - type Error: std::fmt::Display; + type Error: std::fmt::Display + std::fmt::Debug; /// Convert the input into a `impl Future>>>`. /// [`futures::Future`] because we might want to `await` in the implementation. @@ -62,10 +59,12 @@ impl StreamRunner { .map(|x| -> Result { let owned = x?.detach(); let payload = owned.payload().ok_or(Error::EmptyPayload)?; - serde_json::from_slice(payload).map_err(|e| Error::Serde { + let payload_string = + String::from_utf8(payload.to_vec()).expect("Every message should be utf8"); + debug!("Input: {:?}", payload_string); + serde_json::from_str(&payload_string).map_err(|e| Error::Serde { source: e, - msg: String::from_utf8(payload.to_vec()) - .expect("Failed to parse utf8 vec to string"), + msg: payload_string, }) }) .filter_map(|msg| match msg { @@ -77,7 +76,7 @@ impl StreamRunner { Either::Left(output) } Err(e) => { - error!("{}", e); + error!("{:#?}", e); Either::Right(ready(None)) } }); @@ -85,12 +84,12 @@ impl StreamRunner { .for_each_concurrent(None, |msgs| async { match msgs { Err(e) => { - error!("{}", e); + error!("{:#?}", e); return; } Ok(msgs) => { for msg in msgs { - debug!("Message received: {:?}", msg); + debug!("Output: {:?}", msg); let serialized = serde_json::to_string(&msg).expect("Failed to serialize message"); let topic = self.processor.assign_topic(&msg); diff --git a/src/settings.rs b/src/settings.rs deleted file mode 100644 index ae803f7..0000000 --- a/src/settings.rs +++ /dev/null @@ -1,58 +0,0 @@ -use serde::Deserialize; - -#[derive(Debug, Clone, Deserialize)] -#[serde(rename_all = "UPPERCASE")] -pub enum SecurityProtocol { - Plaintext, - SaslSsl { - sasl_username: String, - sasl_password: String, - }, -} - -#[derive(Debug, Clone, Deserialize)] -pub struct KafkaSettings { - pub bootstrap_servers: String, - pub group_id: String, - pub security_protocol: SecurityProtocol, - pub input_topics: Vec, -} - -impl KafkaSettings { - pub fn new( - bootstrap_servers: String, - group_id: String, - security_protocol: SecurityProtocol, - input_topics: Vec, - ) -> Self { - Self { - bootstrap_servers, - group_id, - security_protocol, - input_topics, - } - } - - pub(crate) fn config<'a>( - &self, - config: &'a mut rdkafka::ClientConfig, - ) -> &'a mut rdkafka::ClientConfig { - config.set("bootstrap.servers", &self.bootstrap_servers); - match &self.security_protocol { - SecurityProtocol::Plaintext => { - config.set("security.protocol", "PLAINTEXT"); - } - SecurityProtocol::SaslSsl { - sasl_username, - sasl_password, - } => { - config - .set("security.protocol", "SASL_SSL") - .set("sasl.mechanism", "PLAIN") - .set("sasl.username", sasl_username) - .set("sasl.password", sasl_password); - } - } - config - } -} diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index eaed67f..4e3938c 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1,3 +1,4 @@ +use anyhow::Error; use rdkafka::{ admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}, client::DefaultClientContext, From c72420f0ff2abb1e5dae692bdd6fb9ddf93cf7cf Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Fri, 2 Apr 2021 11:06:57 -0400 Subject: [PATCH 14/15] run on stable toolchain --- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d0dee43..f1e57ec 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -51,7 +51,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: nightly + toolchain: stable override: true - name: Install rustfmt run: rustup component add rustfmt @@ -85,7 +85,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: nightly + toolchain: stable override: true - name: Install clippy run: rustup component add clippy @@ -118,7 +118,7 @@ jobs: - name: Install toolchain uses: actions-rs/toolchain@v1 with: - toolchain: nightly + toolchain: stable - name: Initiate kafka run: ./scripts/init_kafka.sh - name: Install tarpaulin From 40e59e80ac0a1cc4085c6c75f692c5a348ebb04f Mon Sep 17 00:00:00 2001 From: Sebastian Rollen Date: Fri, 2 Apr 2021 11:22:48 -0400 Subject: [PATCH 15/15] fix tests --- src/processor.rs | 1 - tests/integration_tests.rs | 6 +++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/processor.rs b/src/processor.rs index 654ea78..540c0d3 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -85,7 +85,6 @@ impl StreamRunner { match msgs { Err(e) => { error!("{:#?}", e); - return; } Ok(msgs) => { for msg in msgs { diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 4e3938c..f98a04c 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1,4 +1,4 @@ -use anyhow::Error; +use kafka_settings::{KafkaSettings, SecurityProtocol}; use rdkafka::{ admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}, client::DefaultClientContext, @@ -68,6 +68,7 @@ async fn main() { // Create topics let admin_options = AdminOptions::new(); let admin = test_admin(); + tracing::debug!("Creating topics"); admin .create_topics( &[ @@ -98,6 +99,9 @@ async fn main() { test_consumer.subscribe(&["test-output"]).unwrap(); // Actual test + // TODO: Replace with liveness check + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + tracing::debug!("Sending input"); test_producer .send_result(FutureRecord::to("test-input").key("test").payload("2")) .unwrap();