
Commit

Merge pull request #2 from Overmuse/SR/kick_ci
Sr/kick ci
SebRollen authored Apr 2, 2021
2 parents ebe41f6 + 40e59e8 commit b3bc070
Showing 8 changed files with 116 additions and 139 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
@@ -7,12 +7,14 @@ on:
       - 'Cargo.lock'
       - 'Dockerfile'
       - 'src/**'
+      - 'tests/**'
   pull_request:
     paths:
       - 'Cargo.toml'
       - 'Cargo.lock'
       - 'Dockerfile'
       - 'src/**'
+      - 'tests/**'
 
 name: Continuous integration
 
@@ -118,7 +120,7 @@ jobs:
         with:
           toolchain: stable
       - name: Initiate kafka
-        run: ./scripts/init_kafka
+        run: ./scripts/init_kafka.sh
       - name: Install tarpaulin
         run: cargo install cargo-tarpaulin
       - name: Generate coverage
3 changes: 3 additions & 0 deletions Cargo.toml
@@ -7,13 +7,16 @@ edition = "2018"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+async-trait = "0.1.48"
 futures = "0.3"
+kafka-settings = { git = "ssh://git@github.com/Overmuse/kafka-settings", tag = "v0.1.0" }
 rdkafka = { version = "0.26", features = ["ssl-vendored"] }
 serde = "1.0"
 serde_json = "1.0"
 thiserror = "1.0"
 tracing = "0.1"
 
 [dev-dependencies]
+anyhow = "1.0.40"
 tokio = { version = "1.4", features = ["macros", "rt-multi-thread"] }
 tracing-subscriber = "0.2"
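
Note: the new async-trait dependency is what allows handle_message to become an `async fn` in the `StreamProcessor` trait below; stable Rust had no native async functions in traits when this was written. A minimal sketch of the pattern, separate from this crate (the `Doubler`/`Simple` names are invented for illustration):

use async_trait::async_trait;

#[async_trait]
trait Doubler {
    // The attribute macro rewrites this into a method returning a boxed future.
    async fn double(&self, x: f64) -> f64;
}

struct Simple;

#[async_trait]
impl Doubler for Simple {
    async fn double(&self, x: f64) -> f64 {
        x * 2.0
    }
}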
13 changes: 7 additions & 6 deletions src/error.rs
@@ -5,14 +5,15 @@ pub enum Error {
     #[error("Message received from Kafka with empty payload")]
     EmptyPayload,
 
-    #[error("IO error: {0:?}")]
-    Io(#[from] std::io::Error),
-
-    #[error("Error from Kafka: {0:?}")]
+    #[error("Error from Kafka: {0}")]
     Kafka(#[from] rdkafka::error::KafkaError),
 
-    #[error("Error from Serde: {0:?}")]
-    Serde(#[from] serde_json::Error),
+    #[error("Error from Serde: {source}\n{msg}")]
+    Serde {
+        #[source]
+        source: serde_json::Error,
+        msg: String,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
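
Note: the reworked `Serde` variant keeps the offending payload next to the underlying serde_json error, so a deserialization failure can be reported together with the exact input that caused it. A small sketch of how a caller constructs it, mirroring the `map_err` in src/processor.rs below (the bad payload is invented for illustration):

use stream_processor::Error;

fn parse(payload: &str) -> Result<f64, Error> {
    serde_json::from_str(payload).map_err(|e| Error::Serde {
        source: e,
        msg: payload.to_string(),
    })
}

fn main() {
    if let Err(e) = parse("not json") {
        // Prints the serde error followed by the payload, per the #[error(...)] format string.
        eprintln!("{}", e);
    }
}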
28 changes: 0 additions & 28 deletions src/kafka.rs

This file was deleted.

3 changes: 0 additions & 3 deletions src/lib.rs
@@ -1,8 +1,5 @@
 pub mod error;
-mod kafka;
 pub mod processor;
-pub mod settings;
 
 pub use error::Error;
 pub use processor::{StreamProcessor, StreamRunner};
-pub use settings::{KafkaSettings, SecurityProtocol};
114 changes: 79 additions & 35 deletions src/processor.rs
@@ -1,21 +1,38 @@
-use crate::{
-    error::{Error, Result},
-    kafka::{consumer, producer},
-    settings::KafkaSettings,
+use crate::error::{Error, Result};
+use futures::{
+    future::{ready, Either},
+    prelude::*,
 };
-use futures::prelude::*;
+use kafka_settings::{consumer, producer, KafkaSettings};
 use rdkafka::{message::Message, producer::FutureRecord};
 use serde::{de::DeserializeOwned, Serialize};
-use std::time::Duration;
+use std::{borrow::Cow, time::Duration};
 use tracing::{debug, error, info, trace};
 
+#[async_trait::async_trait]
+/// The common trait for all stream processors.
 pub trait StreamProcessor {
+    /// The input type to deserialize into from the Kafka input topics
     type Input: DeserializeOwned;
+    /// The output type from the stream processor, which will be serialized and sent to Kafka.
     type Output: Serialize + std::fmt::Debug;
+    /// The error type that might be thrown in [`handle_message`].
+    type Error: std::fmt::Display + std::fmt::Debug;
 
-    fn handle_message(&self, input: Self::Input) -> Result<Self::Output>;
-    fn assign_topic(&self, output: &Self::Output) -> &str;
-    fn assign_key(&self, output: &Self::Output) -> &str;
+    /// Convert the input into a `impl Future<Result<Option<Vec<Self::Output>>>>`.
+    /// [`futures::Future`] because we might want to `await` in the implementation.
+    /// [`Result`] because our function might fail.
+    /// [`Option`] because we might not want to send any output. If this is `None`, we skip sending
+    /// to Kafka.
+    /// [`Vec`] because we might want to send _many_ outputs for one input
+    async fn handle_message(
+        &self,
+        input: Self::Input,
+    ) -> std::result::Result<Option<Vec<Self::Output>>, Self::Error>;
+    /// Decide which topic to send the output to.
+    fn assign_topic(&self, output: &Self::Output) -> Cow<str>;
+    /// Decide which key to assign to the output.
+    fn assign_key(&self, output: &Self::Output) -> Cow<str>;
 }
 
 pub struct StreamRunner<T: StreamProcessor> {
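
Note: the new return type is the core of this change: `Option` lets a processor drop an input without emitting anything, and `Vec` lets one input fan out into several records. A hedged sketch of an implementation exercising both cases (the `Splitter` type and its topic name are invented for illustration):

use std::borrow::Cow;
use stream_processor::StreamProcessor;

struct Splitter;

#[async_trait::async_trait]
impl StreamProcessor for Splitter {
    type Input = Vec<f64>;
    type Output = f64;
    type Error = String;

    async fn handle_message(
        &self,
        input: Self::Input,
    ) -> Result<Option<Vec<Self::Output>>, Self::Error> {
        if input.is_empty() {
            // Returning None skips the Kafka send entirely.
            Ok(None)
        } else {
            // One input fans out into one output record per element.
            Ok(Some(input))
        }
    }

    fn assign_topic(&self, _output: &Self::Output) -> Cow<str> {
        "splitter-output".into()
    }

    fn assign_key(&self, _output: &Self::Output) -> Cow<str> {
        "key".into()
    }
}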
@@ -37,33 +54,60 @@ impl<T: StreamProcessor> StreamRunner<T> {
         let consumer = consumer(&self.settings)?;
         let producer = producer(&self.settings)?;
 
-        let msg_stream = consumer.stream().map(|x| -> Result<T::Output> {
-            let owned = x?.detach();
-            let payload = owned.payload().ok_or(Error::EmptyPayload)?;
-            let deserialized: T::Input = serde_json::from_slice(payload)?;
-            self.processor.handle_message(deserialized)
-        });
-        msg_stream
-            .for_each_concurrent(None, |msg| async {
-                if msg.is_err() {
-                    error!("{:?}", msg);
-                    return;
+        let msg_stream = consumer
+            .stream()
+            .map(|x| -> Result<T::Input> {
+                let owned = x?.detach();
+                let payload = owned.payload().ok_or(Error::EmptyPayload)?;
+                let payload_string =
+                    String::from_utf8(payload.to_vec()).expect("Every message should be utf8");
+                debug!("Input: {:?}", payload_string);
+                serde_json::from_str(&payload_string).map_err(|e| Error::Serde {
+                    source: e,
+                    msg: payload_string,
+                })
+            })
+            .filter_map(|msg| match msg {
+                Ok(input) => {
+                    let output = self
+                        .processor
+                        .handle_message(input)
+                        .map(|msg| msg.transpose());
+                    Either::Left(output)
                 }
-                let msg = msg.expect("Guaranteed to be Ok");
-                debug!("Message received: {:?}", msg);
-                let serialized = serde_json::to_string(&msg).expect("Failed to serialize message");
-                let topic = self.processor.assign_topic(&msg);
-                let key = self.processor.assign_key(&msg);
-                let record = FutureRecord::to(topic).key(key).payload(&serialized);
-                let res = producer.send(record, Duration::from_secs(0)).await;
-                match res {
-                    Ok((partition, offset)) => trace!(
-                        "Message successfully delivered to topic: {}, partition {}, offset {}",
-                        topic,
-                        partition,
-                        offset
-                    ),
-                    Err((err, msg)) => error!("Message: {:?}\nError: {:?}", msg, err),
+                Err(e) => {
+                    error!("{:#?}", e);
+                    Either::Right(ready(None))
+                }
+            });
+        msg_stream
+            .for_each_concurrent(None, |msgs| async {
+                match msgs {
+                    Err(e) => {
+                        error!("{:#?}", e);
+                    }
+                    Ok(msgs) => {
+                        for msg in msgs {
+                            debug!("Output: {:?}", msg);
+                            let serialized =
+                                serde_json::to_string(&msg).expect("Failed to serialize message");
+                            let topic = self.processor.assign_topic(&msg);
+                            let key = self.processor.assign_key(&msg);
+                            let record = FutureRecord::to(topic.as_ref())
+                                .key(key.as_ref())
+                                .payload(&serialized);
+                            let res = producer.send(record, Duration::from_secs(0)).await;
+                            match res {
+                                Ok((partition, offset)) => trace!(
+                                    "Message successfully delivered to topic: {}, partition {}, offset {}",
+                                    topic,
+                                    partition,
+                                    offset
+                                ),
+                                Err((err, msg)) => error!("Message: {:?}\nError: {}", msg, err),
+                            }
+                        }
+                    }
                 }
             })
             .await;
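
Note: the `Either`/`ready` pairing above deserves a word: `filter_map` requires every branch of its closure to return the same future type, so the happy path wraps the real `handle_message` future in `Either::Left` while the error branch wraps an immediately-ready `None` in `Either::Right`. A distilled, self-contained sketch of the same trick (the toy stream and doubling logic are invented for illustration; assumes `futures` and `tokio` as dependencies):

use futures::{
    future::{ready, Either},
    stream::{self, StreamExt},
};

#[tokio::main]
async fn main() {
    let items = vec![Ok(1), Err("bad input"), Ok(3)];
    let doubled: Vec<i32> = stream::iter(items)
        .filter_map(|item| match item {
            // The happy path can await real async work...
            Ok(n) => Either::Left(async move { Some(n * 2) }),
            // ...while failures resolve immediately to None, dropping the item.
            Err(_) => Either::Right(ready(None)),
        })
        .collect()
        .await;
    assert_eq!(doubled, vec![2, 6]);
}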
55 changes: 0 additions & 55 deletions src/settings.rs

This file was deleted.

35 changes: 24 additions & 11 deletions tests/integration_tests.rs
@@ -1,27 +1,36 @@
-use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
-use rdkafka::consumer::{Consumer, StreamConsumer};
-use rdkafka::producer::{FutureProducer, FutureRecord};
-use rdkafka::Message;
-use rdkafka::{client::DefaultClientContext, ClientConfig};
+use kafka_settings::{KafkaSettings, SecurityProtocol};
+use rdkafka::{
+    admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
+    client::DefaultClientContext,
+    consumer::{Consumer, StreamConsumer},
+    producer::{FutureProducer, FutureRecord},
+    ClientConfig, Message,
+};
+use std::borrow::Cow;
 use stream_processor::*;
 use tracing_subscriber::{EnvFilter, FmtSubscriber};
 
 struct StreamDoubler;
 
+#[async_trait::async_trait]
 impl StreamProcessor for StreamDoubler {
     type Input = f64;
     type Output = f64;
+    type Error = String;
 
-    fn handle_message(&self, input: Self::Input) -> Result<Self::Output, Error> {
-        Ok(input * 2.0)
+    async fn handle_message(
+        &self,
+        input: Self::Input,
+    ) -> Result<Option<Vec<Self::Output>>, Self::Error> {
+        Ok(Some(vec![input * 2.0]))
     }
 
-    fn assign_topic(&self, _output: &Self::Output) -> &str {
-        "test-output"
+    fn assign_topic(&self, _output: &Self::Output) -> Cow<str> {
+        "test-output".into()
     }
 
-    fn assign_key(&self, _output: &Self::Output) -> &str {
-        "key"
+    fn assign_key(&self, _output: &Self::Output) -> Cow<str> {
+        "key".into()
    }
 }

@@ -59,6 +68,7 @@ async fn main() {
     // Create topics
     let admin_options = AdminOptions::new();
     let admin = test_admin();
+    tracing::debug!("Creating topics");
     admin
         .create_topics(
             &[
@@ -89,6 +99,9 @@ async fn main() {
     test_consumer.subscribe(&["test-output"]).unwrap();
 
     // Actual test
+    // TODO: Replace with liveness check
+    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+    tracing::debug!("Sending input");
     test_producer
         .send_result(FutureRecord::to("test-input").key("test").payload("2"))
         .unwrap();
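
Note: the TODO above flags the fixed five-second sleep as a stopgap. One possible liveness check, sketched here rather than taken from this commit, polls broker metadata until the topic is visible (assumes `test_consumer` is in scope, as in this test):

use std::time::{Duration, Instant};

// Sketch only: wait until the broker reports partitions for the input topic.
let deadline = Instant::now() + Duration::from_secs(10);
loop {
    let ready = test_consumer
        .fetch_metadata(Some("test-input"), Duration::from_secs(1))
        .map(|m| m.topics().iter().any(|t| !t.partitions().is_empty()))
        .unwrap_or(false);
    if ready {
        break;
    }
    assert!(Instant::now() < deadline, "Kafka never became ready");
    tokio::time::sleep(Duration::from_millis(100)).await;
}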
