diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7f761a9..f1e57ec 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,12 +7,14 @@ on: - 'Cargo.lock' - 'Dockerfile' - 'src/**' + - 'tests/**' pull_request: paths: - 'Cargo.toml' - 'Cargo.lock' - 'Dockerfile' - 'src/**' + - 'tests/**' name: Continuous integration @@ -118,7 +120,7 @@ jobs: with: toolchain: stable - name: Initiate kafka - run: ./scripts/init_kafka + run: ./scripts/init_kafka.sh - name: Install tarpaulin run: cargo install cargo-tarpaulin - name: Generate coverage diff --git a/Cargo.toml b/Cargo.toml index 9edea37..c6c80b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,9 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1.48" futures = "0.3" +kafka-settings = { git = "ssh://git@github.com/Overmuse/kafka-settings", tag = "v0.1.0" } rdkafka = { version = "0.26", features = ["ssl-vendored"] } serde = "1.0" serde_json = "1.0" @@ -15,5 +17,6 @@ thiserror = "1.0" tracing = "0.1" [dev-dependencies] +anyhow = "1.0.40" tokio = { version = "1.4", features = ["macros", "rt-multi-thread"] } tracing-subscriber = "0.2" diff --git a/src/error.rs b/src/error.rs index 79d406e..ae01295 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,14 +5,15 @@ pub enum Error { #[error("Message received from Kafka with empty payload")] EmptyPayload, - #[error("IO error: {0:?}")] - Io(#[from] std::io::Error), - - #[error("Error from Kafka: {0:?}")] + #[error("Error from Kafka: {0}")] Kafka(#[from] rdkafka::error::KafkaError), - #[error("Error from Serde: {0:?}")] - Serde(#[from] serde_json::Error), + #[error("Error from Serde: {source}\n{msg}")] + Serde { + #[source] + source: serde_json::Error, + msg: String, + }, } pub type Result = std::result::Result; diff --git a/src/kafka.rs b/src/kafka.rs deleted file mode 100644 index 4a0986a..0000000 --- a/src/kafka.rs +++ /dev/null @@ -1,28 +0,0 @@ -use crate::error::Result; -use crate::settings::KafkaSettings; -use rdkafka::{ - consumer::{Consumer, StreamConsumer}, - producer::FutureProducer, - ClientConfig, -}; - -pub fn consumer(settings: &KafkaSettings) -> Result { - let mut config = ClientConfig::new(); - let config = settings.config(&mut config); - let consumer: StreamConsumer = config - .set("group.id", &settings.group_id) - .set("enable.ssl.certificate.verification", "false") - .create()?; - let subscription_topics: Vec<_> = settings.input_topics.iter().map(String::as_str).collect(); - consumer.subscribe(&subscription_topics)?; - Ok(consumer) -} - -pub fn producer(settings: &KafkaSettings) -> Result { - let mut config = ClientConfig::new(); - let config = settings.config(&mut config); - let producer: FutureProducer = config - .set("enable.ssl.certificate.verification", "false") - .create()?; - Ok(producer) -} diff --git a/src/lib.rs b/src/lib.rs index 508a4f1..3e1ab46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,5 @@ pub mod error; -mod kafka; pub mod processor; -pub mod settings; pub use error::Error; pub use processor::{StreamProcessor, StreamRunner}; -pub use settings::{KafkaSettings, SecurityProtocol}; diff --git a/src/processor.rs b/src/processor.rs index 2fb6e59..540c0d3 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -1,21 +1,38 @@ -use crate::{ - error::{Error, Result}, - kafka::{consumer, producer}, - settings::KafkaSettings, +use crate::error::{Error, Result}; +use futures::{ + future::{ready, Either}, + prelude::*, }; -use futures::prelude::*; +use kafka_settings::{consumer, producer, KafkaSettings}; use rdkafka::{message::Message, producer::FutureRecord}; use serde::{de::DeserializeOwned, Serialize}; -use std::time::Duration; +use std::{borrow::Cow, time::Duration}; use tracing::{debug, error, info, trace}; +#[async_trait::async_trait] +/// The common trait for all stream processors. pub trait StreamProcessor { + /// The input type to deserialize into from the Kafka input topics type Input: DeserializeOwned; + /// The output type from the stream processor, which will be serialized and sent to Kafka. type Output: Serialize + std::fmt::Debug; + /// The error type that might be thrown in [`handle_message`]. + type Error: std::fmt::Display + std::fmt::Debug; - fn handle_message(&self, input: Self::Input) -> Result; - fn assign_topic(&self, output: &Self::Output) -> &str; - fn assign_key(&self, output: &Self::Output) -> &str; + /// Convert the input into a `impl Future>>>`. + /// [`futures::Future`] because we might want to `await` in the implementation. + /// [`Result`] because our function might fail. + /// [`Option`] because we might not want to send any output. If this is `None`, we skip sending + /// to Kafka. + /// [`Vec`] because we might want to send _many_ outputs for one input + async fn handle_message( + &self, + input: Self::Input, + ) -> std::result::Result>, Self::Error>; + /// Decide which topic to send the output to. + fn assign_topic(&self, output: &Self::Output) -> Cow; + /// Decide which key to assign to the output. + fn assign_key(&self, output: &Self::Output) -> Cow; } pub struct StreamRunner { @@ -37,33 +54,60 @@ impl StreamRunner { let consumer = consumer(&self.settings)?; let producer = producer(&self.settings)?; - let msg_stream = consumer.stream().map(|x| -> Result { - let owned = x?.detach(); - let payload = owned.payload().ok_or(Error::EmptyPayload)?; - let deserialized: T::Input = serde_json::from_slice(payload)?; - self.processor.handle_message(deserialized) - }); - msg_stream - .for_each_concurrent(None, |msg| async { - if msg.is_err() { - error!("{:?}", msg); - return; + let msg_stream = consumer + .stream() + .map(|x| -> Result { + let owned = x?.detach(); + let payload = owned.payload().ok_or(Error::EmptyPayload)?; + let payload_string = + String::from_utf8(payload.to_vec()).expect("Every message should be utf8"); + debug!("Input: {:?}", payload_string); + serde_json::from_str(&payload_string).map_err(|e| Error::Serde { + source: e, + msg: payload_string, + }) + }) + .filter_map(|msg| match msg { + Ok(input) => { + let output = self + .processor + .handle_message(input) + .map(|msg| msg.transpose()); + Either::Left(output) } - let msg = msg.expect("Guaranteed to be Ok"); - debug!("Message received: {:?}", msg); - let serialized = serde_json::to_string(&msg).expect("Failed to serialize message"); - let topic = self.processor.assign_topic(&msg); - let key = self.processor.assign_key(&msg); - let record = FutureRecord::to(topic).key(key).payload(&serialized); - let res = producer.send(record, Duration::from_secs(0)).await; - match res { - Ok((partition, offset)) => trace!( - "Message successfully delivered to topic: {}, partition {}, offset {}", - topic, - partition, - offset - ), - Err((err, msg)) => error!("Message: {:?}\nError: {:?}", msg, err), + Err(e) => { + error!("{:#?}", e); + Either::Right(ready(None)) + } + }); + msg_stream + .for_each_concurrent(None, |msgs| async { + match msgs { + Err(e) => { + error!("{:#?}", e); + } + Ok(msgs) => { + for msg in msgs { + debug!("Output: {:?}", msg); + let serialized = + serde_json::to_string(&msg).expect("Failed to serialize message"); + let topic = self.processor.assign_topic(&msg); + let key = self.processor.assign_key(&msg); + let record = FutureRecord::to(topic.as_ref()) + .key(key.as_ref()) + .payload(&serialized); + let res = producer.send(record, Duration::from_secs(0)).await; + match res { + Ok((partition, offset)) => trace!( + "Message successfully delivered to topic: {}, partition {}, offset {}", + topic, + partition, + offset + ), + Err((err, msg)) => error!("Message: {:?}\nError: {}", msg, err), + } + } + } } }) .await; diff --git a/src/settings.rs b/src/settings.rs deleted file mode 100644 index 5742add..0000000 --- a/src/settings.rs +++ /dev/null @@ -1,55 +0,0 @@ -#[derive(Debug, Clone)] -pub enum SecurityProtocol { - Plaintext, - SaslSsl { - sasl_username: String, - sasl_password: String, - }, -} - -#[derive(Debug, Clone)] -pub struct KafkaSettings { - pub bootstrap_servers: String, - pub group_id: String, - pub security_protocol: SecurityProtocol, - pub input_topics: Vec, -} - -impl KafkaSettings { - pub fn new( - bootstrap_servers: String, - group_id: String, - security_protocol: SecurityProtocol, - input_topics: Vec, - ) -> Self { - Self { - bootstrap_servers, - group_id, - security_protocol, - input_topics, - } - } - - pub(crate) fn config<'a>( - &self, - config: &'a mut rdkafka::ClientConfig, - ) -> &'a mut rdkafka::ClientConfig { - config.set("bootstrap.servers", &self.bootstrap_servers); - match &self.security_protocol { - SecurityProtocol::Plaintext => { - config.set("security.protocol", "PLAINTEXT"); - } - SecurityProtocol::SaslSsl { - sasl_username, - sasl_password, - } => { - config - .set("security.protocol", "SASL_SSL") - .set("sasl.mechanism", "PLAIN") - .set("sasl.username", sasl_username) - .set("sasl.password", sasl_password); - } - } - config - } -} diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index a41029d..f98a04c 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1,27 +1,36 @@ -use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; -use rdkafka::consumer::{Consumer, StreamConsumer}; -use rdkafka::producer::{FutureProducer, FutureRecord}; -use rdkafka::Message; -use rdkafka::{client::DefaultClientContext, ClientConfig}; +use kafka_settings::{KafkaSettings, SecurityProtocol}; +use rdkafka::{ + admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}, + client::DefaultClientContext, + consumer::{Consumer, StreamConsumer}, + producer::{FutureProducer, FutureRecord}, + ClientConfig, Message, +}; +use std::borrow::Cow; use stream_processor::*; use tracing_subscriber::{EnvFilter, FmtSubscriber}; struct StreamDoubler; +#[async_trait::async_trait] impl StreamProcessor for StreamDoubler { type Input = f64; type Output = f64; + type Error = String; - fn handle_message(&self, input: Self::Input) -> Result { - Ok(input * 2.0) + async fn handle_message( + &self, + input: Self::Input, + ) -> Result>, Self::Error> { + Ok(Some(vec![input * 2.0])) } - fn assign_topic(&self, _output: &Self::Output) -> &str { - "test-output" + fn assign_topic(&self, _output: &Self::Output) -> Cow { + "test-output".into() } - fn assign_key(&self, _output: &Self::Output) -> &str { - "key" + fn assign_key(&self, _output: &Self::Output) -> Cow { + "key".into() } } @@ -59,6 +68,7 @@ async fn main() { // Create topics let admin_options = AdminOptions::new(); let admin = test_admin(); + tracing::debug!("Creating topics"); admin .create_topics( &[ @@ -89,6 +99,9 @@ async fn main() { test_consumer.subscribe(&["test-output"]).unwrap(); // Actual test + // TODO: Replace with liveness check + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + tracing::debug!("Sending input"); test_producer .send_result(FutureRecord::to("test-input").key("test").payload("2")) .unwrap();