From 64f8104454d56a4aefeb6819ca7e57232ef32f27 Mon Sep 17 00:00:00 2001 From: Xi Yang Date: Fri, 20 Oct 2023 01:45:51 +1100 Subject: [PATCH] feat: add kafka support (#56) Adds support for kafka pubsub --- Cargo.lock | 76 +++++++++++ Cargo.toml | 1 + configs/kafka.toml | 83 ++++++++++++ src/clients/mod.rs | 4 + src/config/protocol.rs | 1 + src/config/pubsub.rs | 32 +++++ src/config/workload.rs | 23 ++++ src/main.rs | 2 +- src/metrics/mod.rs | 8 +- src/pubsub/kafka.rs | 273 ++++++++++++++++++++++++++++++++++++++ src/pubsub/mod.rs | 90 ++++++++++++- src/pubsub/momento.rs | 113 ++++------------ src/workload/mod.rs | 87 ++++++++---- src/workload/publisher.rs | 2 + 14 files changed, 677 insertions(+), 118 deletions(-) create mode 100644 configs/kafka.toml create mode 100644 src/pubsub/kafka.rs diff --git a/Cargo.lock b/Cargo.lock index 6d7de35c..dfd87a23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1071,6 +1071,18 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libz-sys" +version = "1.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d97137b25e321a73eef1418d1d5d2eda4d77e12813f8e6dead84bc52c5870a7b" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linkme" version = "0.3.17" @@ -1411,6 +1423,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name 
= "object" version = "0.32.1" @@ -1566,6 +1599,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" + [[package]] name = "powerfmt" version = "0.2.0" @@ -1767,6 +1806,36 @@ dependencies = [ "thiserror", ] +[[package]] +name = "rdkafka" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8acd8f5c5482fdf89e8878227bafa442d8c4409f6287391c85549ca83626c27" +dependencies = [ + "futures", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "3.0.0+1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca35e95c88e08cdc643b25744e38ccee7c93c7e90d1ac6850fe74cbaa40803c3" +dependencies = [ + "cmake", + "libc", + "libz-sys", + "num_enum", + "pkg-config", +] + [[package]] name = "redis" version = "0.22.3" @@ -1906,6 +1975,7 @@ dependencies = [ "rand_distr", "rand_xoshiro", "ratelimit", + "rdkafka", "redis", "ringlog", "rpcperf-dataspec", @@ -2735,6 +2805,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.4" diff --git a/Cargo.toml b/Cargo.toml index 3cf7c7ff..d3adbd9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ rand_distr = "0.4.3" rand_xoshiro = "0.6.0" ratelimit = "0.7.0" redis = { version = "0.22.3", features = ["tokio-comp"] 
} +rdkafka = { version = "0.25", features = ["cmake-build"] } ringlog = "0.3.0" rpcperf-dataspec = { path = "lib/dataspec" } serde = { workspace = true } diff --git a/configs/kafka.toml b/configs/kafka.toml new file mode 100644 index 00000000..e93f0061 --- /dev/null +++ b/configs/kafka.toml @@ -0,0 +1,83 @@ +# An example configuration for benchmarking Apache Kafka (https://kafka.apache.org) +# using the pubsub workload. Publishers produce keyed messages to the configured +# topics and partitions, and subscribers consume them to measure end-to-end +# latency. + +[general] +# specify the protocol to be used +protocol = "kafka" +# the interval for stats integration and reporting +interval = 1 +# the number of intervals to run the test for +duration = 300 +# optionally, we can write some detailed stats to a file during the run +#json_output = "stats.json" +# run the admin thread with an HTTP listener at the address provided, this allows +# stats exposition via HTTP +admin = "127.0.0.1:9090" +# optionally, set an initial seed for the PRNGs used to generate the workload. +# The default is to initialize from the OS entropy pool. +#initial_seed = "0" + +[debug] +# choose from: error, warn, info, debug, trace +log_level = "error" +# optionally, log to the file below instead of standard out +# log_file = "rpc-perf.log" +# backup file name for use with log rotation +log_backup = "rpc-perf.log.old" +# trigger log rotation when the file grows beyond this size (in bytes). Set this +# option to '0' to disable log rotation. 
+log_max_size = 1073741824 + +[target] +# kafka broker ip:port +endpoints = [ + "127.0.0.1:9092" +] + +[pubsub] +# TODO the connect timeout in milliseconds +connect_timeout = 10000 +publish_timeout = 1000 +# the number of threads in the publisher runtime +publisher_threads = 4 +publisher_poolsize = 1 +publisher_concurrency = 20 +# the number of threads in the subscriber runtime +subscriber_threads = 4 +# kafka-specific client configurations +kafka_acks = "1" +kafka_linger_ms = "1" +#kafka_batch_size +#kafka_batch_num_messages +#kafka_fetch_message_max_bytes +#kafka_request_timeout_ms + +[workload] +# the number of threads that will be used to generate requests +threads = 1 +# the global ratelimit +ratelimit = 1000 + +# An example set of +# topics using a single consumer and multiple producers. +[[workload.topics]] +# the weight relative to other workload components +weight = 1 +# the total number of Kafka consumer clients for subscribers to this set of topics +subscriber_poolsize = 1 +# the number of subscriber tasks per Kafka consumer client for this set of topics +subscriber_concurrency = 1 +# sets the number of topics +topics = 1 +# set the length of the topic names, in bytes +topic_len = 5 +# set the topic names; if empty, or if the count or lengths do not match topics and topic_len, random names are generated +topic_names = ["hello"] +# sets the number of partitions in each topic +partitions = 10 +# sets the value length, in bytes +message_len = 512 +# sets the key length, in bytes +key_len = 8 \ No newline at end of file diff --git a/src/clients/mod.rs b/src/clients/mod.rs index b2d297a1..18f528c9 100644 --- a/src/clients/mod.rs +++ b/src/clients/mod.rs @@ -46,6 +46,10 @@ pub fn launch_clients(config: &Config, work_receiver: Receiver) -> Opt Protocol::Resp => { clients::redis::launch_tasks(&mut client_rt, config.clone(), work_receiver) } + Protocol::Kafka => { + error!("keyspace is not supported for the kafka protocol"); + std::process::exit(1); + } } 
Some(client_rt) diff --git 
a/src/config/protocol.rs b/src/config/protocol.rs index a73dc0c1..c9edef3e 100644 --- a/src/config/protocol.rs +++ b/src/config/protocol.rs @@ -9,4 +9,5 @@ pub enum Protocol { Momento, Ping, Resp, + Kafka, } diff --git a/src/config/pubsub.rs b/src/config/pubsub.rs index eb89776b..ce50b120 100644 --- a/src/config/pubsub.rs +++ b/src/config/pubsub.rs @@ -11,6 +11,14 @@ pub struct Pubsub { publisher_poolsize: usize, publisher_concurrency: usize, + + // kafka specific configs + kafka_acks: Option, + kafka_linger_ms: Option, + kafka_batch_size: Option, + kafka_batch_num_messages: Option, + kafka_fetch_message_max_bytes: Option, + kafka_request_timeout_ms: Option, } impl Pubsub { @@ -33,4 +41,28 @@ impl Pubsub { pub fn publisher_concurrency(&self) -> usize { self.publisher_concurrency } + + pub fn kafka_acks(&self) -> &Option { + &self.kafka_acks + } + + pub fn kafka_linger_ms(&self) -> &Option { + &self.kafka_linger_ms + } + + pub fn kafka_batch_size(&self) -> &Option { + &self.kafka_batch_size + } + + pub fn kafka_batch_num_messages(&self) -> &Option { + &self.kafka_batch_num_messages + } + + pub fn kafka_fetch_message_max_bytes(&self) -> &Option { + &self.kafka_fetch_message_max_bytes + } + + pub fn kafka_request_timeout_ms(&self) -> &Option { + &self.kafka_request_timeout_ms + } } diff --git a/src/config/workload.rs b/src/config/workload.rs index d4efd781..541840e2 100644 --- a/src/config/workload.rs +++ b/src/config/workload.rs @@ -44,15 +44,22 @@ impl Workload { #[derive(Clone, Deserialize)] pub struct Topics { topics: usize, + #[serde(default = "one")] + partitions: usize, topic_len: usize, + #[serde(default)] + topic_names: Vec, message_len: usize, #[serde(default = "one")] + key_len: usize, weight: usize, subscriber_poolsize: usize, #[serde(default = "one")] subscriber_concurrency: usize, #[serde(default)] topic_distribution: Distribution, + #[serde(default)] + partition_distribution: Distribution, } impl Topics { @@ -60,14 +67,26 @@ impl Topics { self.weight } 
+ pub fn partitions(&self) -> usize { + self.partitions + } + pub fn topics(&self) -> usize { self.topics } + pub fn topic_names(&self) -> &[String] { + &self.topic_names + } + pub fn topic_len(&self) -> usize { self.topic_len } + pub fn key_len(&self) -> usize { + self.key_len + } + pub fn message_len(&self) -> usize { self.message_len } @@ -83,6 +102,10 @@ impl Topics { pub fn topic_distribution(&self) -> Distribution { self.topic_distribution } + + pub fn partition_distribution(&self) -> Distribution { + self.partition_distribution + } } #[derive(Clone, Copy, PartialEq, Eq, Deserialize)] diff --git a/src/main.rs b/src/main.rs index 134ae086..4da9c6e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -154,7 +154,7 @@ fn main() { let client_runtime = launch_clients(&config, client_receiver); - let mut pubsub_runtimes = launch_pubsub(&config, pubsub_receiver, workload_components); + let mut pubsub_runtimes = launch_pubsub(&config, pubsub_receiver, &workload_components); // launch json log output { diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index 801b5f1e..b1adc5ff 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -316,7 +316,7 @@ macro_rules! request { }); paste! { #[allow(dead_code)] - pub static [<$ident _COUNTER>]: &'static str = $name; + pub static [<$ident _COUNTER>]: &'static str = concat!($name, "/total"); } paste! { @@ -330,7 +330,7 @@ macro_rules! request { }); paste! { #[allow(dead_code)] - pub static [<$ident _EX_COUNTER>]: &'static str = $name; + pub static [<$ident _EX_COUNTER>]: &'static str = concat!($name, "/exception"); } } @@ -345,7 +345,7 @@ macro_rules! request { }); paste! { #[allow(dead_code)] - pub static [<$ident _OK_COUNTER>]: &'static str = $name; + pub static [<$ident _OK_COUNTER>]: &'static str = concat!($name, "/ok"); } } @@ -360,7 +360,7 @@ macro_rules! request { }); paste! 
{ #[allow(dead_code)] - pub static [<$ident _TIMEOUT_COUNTER>]: &'static str = $name; + pub static [<$ident _TIMEOUT_COUNTER>]: &'static str = concat!($name, "/timeout"); } } } diff --git a/src/pubsub/kafka.rs b/src/pubsub/kafka.rs new file mode 100644 index 00000000..3bf07f33 --- /dev/null +++ b/src/pubsub/kafka.rs @@ -0,0 +1,273 @@ +use super::*; +use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; +use rdkafka::client::DefaultClientContext; +use rdkafka::config::ClientConfig; +use rdkafka::consumer::{Consumer, StreamConsumer}; +use rdkafka::producer::{FutureProducer, FutureRecord}; +use rdkafka::types::RDKafkaErrorCode::TopicAlreadyExists; +use rdkafka::Message; +use std::sync::Arc; + +fn get_kafka_producer(config: &Config) -> FutureProducer { + let bootstrap_servers = config.target().endpoints().join(","); + let timeout = format!("{}", config.pubsub().unwrap().publish_timeout().as_millis()); + let pubsub_config = config.pubsub().unwrap(); + let mut client_config = ClientConfig::new(); + client_config.set("bootstrap.servers", &bootstrap_servers); + client_config.set("message.timeout.ms", timeout); + if let Some(acks) = pubsub_config.kafka_acks() { + client_config.set("acks", acks); + } + if let Some(linger_ms) = pubsub_config.kafka_linger_ms() { + client_config.set("linger.ms", linger_ms); + } + if let Some(batch_size) = pubsub_config.kafka_batch_size() { + client_config.set("batch.size", batch_size); + } + if let Some(batch_num_messages) = pubsub_config.kafka_batch_num_messages() { + client_config.set("batch.num.messages", batch_num_messages); + } + if let Some(request_timeout_ms) = pubsub_config.kafka_request_timeout_ms() { + client_config.set("request.timeout.ms", request_timeout_ms); + } + client_config.create().unwrap() +} + +fn get_kafka_consumer(config: &Config, group_id: &str) -> StreamConsumer { + let bootstrap_servers = config.target().endpoints().join(","); + let pubsub_config = config.pubsub().unwrap(); + let mut 
client_config = ClientConfig::new(); + client_config + .set("bootstrap.servers", &bootstrap_servers) + .set("group.id", group_id) + .set("client.id", "rpcperf_subscriber") + .set("enable.partition.eof", "false") + .set("enable.auto.commit", "false") + .set("statistics.interval.ms", "500") + .set("api.version.request", "true") + .set("auto.offset.reset", "earliest") + .set("session.timeout.ms", "6000"); + if let Some(fetch_message_max_bytes) = pubsub_config.kafka_fetch_message_max_bytes() { + client_config.set("fetch_message_max_bytes", fetch_message_max_bytes); + } + client_config.create().unwrap() +} + +fn get_kafka_admin(config: &Config) -> AdminClient { + let bootstrap_servers = config.target().endpoints().join(","); + ClientConfig::new() + .set("bootstrap.servers", &bootstrap_servers) + .set("client.id", "rpcperf_admin") + .create() + .unwrap() +} + +fn validate_topic(runtime: &mut Runtime, config: &Config, topic: &str, partitions: usize) { + let _guard = runtime.enter(); + let consumer_client = get_kafka_consumer(config, "topic_validator"); + let timeout = Some(Duration::from_secs(1)); + let metadata = consumer_client + .fetch_metadata(Some(topic), timeout) + .map_err(|e| e.to_string()) + .unwrap(); + if metadata.topics().is_empty() { + error!("Invalidated topic"); + std::process::exit(1); + } + let topic_partitions = metadata.topics()[0].partitions().len(); + if topic_partitions != partitions { + error!( + "Invalidated partition: asked {} found {}\n Please delete or recreate the topic {}", + partitions, topic_partitions, topic + ); + std::process::exit(1); + } +} + +pub fn create_topics(runtime: &mut Runtime, config: Config, workload_components: &[Component]) { + let admin_client = get_kafka_admin(&config); + for component in workload_components { + if let Component::Topics(topics) = component { + let partitions = topics.partitions(); + for topic in topics.topics() { + let topic_results = runtime + .block_on(admin_client.create_topics( + &[NewTopic::new( + 
topic, + partitions as i32, + TopicReplication::Fixed(1), + )], + &AdminOptions::new(), + )) + .unwrap(); + for r in topic_results { + match r { + Ok(_) => {} + Err(err) => { + if err.1 == TopicAlreadyExists { + validate_topic(runtime, &config, topic, partitions); + } else { + error!("Failed to create the topic {}:{} ", err.0, err.1); + std::process::exit(1); + } + } + } + } + } + } + } +} + +/// Launch tasks with one channel per task as Kafka connection is mux-enabled. +pub fn launch_subscribers( + runtime: &mut Runtime, + config: Config, + workload_components: &[Component], +) { + let group_id = "rpc_subscriber"; + for component in workload_components { + if let Component::Topics(topics) = component { + let poolsize = topics.subscriber_poolsize(); + let concurrency = topics.subscriber_concurrency(); + + for _ in 0..poolsize { + let client = { + let _guard = runtime.enter(); + Arc::new(get_kafka_consumer(&config, group_id)) + }; + for _ in 0..concurrency { + let mut sub_topics: Vec = Vec::new(); + for t in topics.topics() { + sub_topics.push(t.to_string().clone()) + } + + runtime.spawn(subscriber_task(client.clone(), sub_topics)); + } + } + } + } +} + +async fn subscriber_task(client: Arc, topics: Vec) { + PUBSUB_SUBSCRIBE.increment(); + let sub_topics: Vec<&str> = topics.iter().map(AsRef::as_ref).collect(); + if client.subscribe(&sub_topics).is_ok() { + PUBSUB_SUBSCRIBER_CURR.add(1); + PUBSUB_SUBSCRIBE_OK.increment(); + let msg_stamp = MessageValidator::new(); + while RUNNING.load(Ordering::Relaxed) { + match client.recv().await { + Ok(m) => match m.payload_view::<[u8]>() { + Some(Ok(m)) => { + let mut v = m.to_owned(); + match msg_stamp.validate_msg(&mut v) { + MessageValidationResult::Unexpected => { + error!("pubsub: invalid message received"); + RESPONSE_EX.increment(); + PUBSUB_RECEIVE_INVALID.increment(); + continue; + } + MessageValidationResult::Corrupted => { + error!("pubsub: corrupt message received"); + PUBSUB_RECEIVE.increment(); + 
PUBSUB_RECEIVE_CORRUPT.increment(); + continue; + } + MessageValidationResult::Validated(latency) => { + let _ = PUBSUB_LATENCY.increment(latency); + PUBSUB_RECEIVE.increment(); + PUBSUB_RECEIVE_OK.increment(); + } + } + } + Some(Err(e)) => { + error!("Error in deserializing the message:{:?}", e); + PUBSUB_RECEIVE.increment(); + PUBSUB_RECEIVE_EX.increment(); + } + None => { + error!("Empty Message"); + PUBSUB_RECEIVE.increment(); + PUBSUB_RECEIVE_EX.increment(); + } + }, + Err(e) => { + debug!("Kafka Message Error {}", e); + PUBSUB_RECEIVE.increment(); + PUBSUB_RECEIVE_EX.increment(); + } + } + } + } else { + error!("Failed to create subscriber"); + PUBSUB_SUBSCRIBE_EX.increment(); + } +} + +/// Launch tasks with one channel per task as Kafka connection is mux-enabled. +pub fn launch_publishers(runtime: &mut Runtime, config: Config, work_receiver: Receiver) { + for _ in 0..config.pubsub().unwrap().publisher_poolsize() { + let client = { + let _guard = runtime.enter(); + Arc::new(get_kafka_producer(&config)) + }; + PUBSUB_PUBLISHER_CONNECT.increment(); + for _ in 0..config.pubsub().unwrap().publisher_concurrency() { + runtime.spawn(publisher_task(client.clone(), work_receiver.clone())); + } + } +} + +async fn publisher_task( + client: Arc, + work_receiver: Receiver, +) -> Result<()> { + PUBSUB_PUBLISHER_CURR.add(1); + let msg_stamp = MessageValidator::new(); + while RUNNING.load(Ordering::Relaxed) { + let work_item = work_receiver + .recv() + .await + .map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?; + REQUEST.increment(); + let start = Instant::now(); + let result = match work_item { + WorkItem::Publish { + topic, + partition, + key, + mut message, + } => { + let timestamp = msg_stamp.stamp_msg(&mut message); + PUBSUB_PUBLISH.increment(); + client + .send( + FutureRecord { + topic: &topic, + payload: Some(&message), + key: Some(&key), + partition: Some(partition as i32), + timestamp: Some(timestamp as i64), + headers: None, + }, + 
Duration::from_secs(0), + ) + .await + } + }; + let stop = Instant::now(); + match result { + Ok(_) => { + let latency = stop.duration_since(start).as_nanos(); + PUBSUB_PUBLISH_OK.increment(); + let _ = PUBSUB_PUBLISH_LATENCY.increment(latency); + } + Err(e) => { + debug!("Error in producing: {:?}", e); + PUBSUB_PUBLISH_EX.increment(); + } + } + } + PUBSUB_PUBLISHER_CURR.sub(1); + Ok(()) +} diff --git a/src/pubsub/mod.rs b/src/pubsub/mod.rs index 9a9898bc..301a2261 100644 --- a/src/pubsub/mod.rs +++ b/src/pubsub/mod.rs @@ -2,12 +2,83 @@ use crate::clients::*; use crate::workload::Component; use crate::workload::PublisherWorkItem as WorkItem; use crate::*; +use ahash::RandomState; use async_channel::Receiver; use std::io::{Error, ErrorKind, Result}; use tokio::runtime::Runtime; +mod kafka; mod momento; +struct MessageValidator { + hash_builder: RandomState, +} +pub enum MessageValidationResult { + // u64 is the end-to-end latency in nanosecond) + Validated(u64), + Unexpected, + Corrupted, +} +impl MessageValidator { + // Deterministic seeds are used so that multiple MessageStamp can stamp and validate messages + pub fn new() -> Self { + MessageValidator { + hash_builder: RandomState::with_seeds( + 0xd5b96f9126d61cee, + 0x50af85c9d1b6de70, + 0xbd7bdf2fee6d15b2, + 0x3dbe88bb183ac6f4, + ), + } + } + pub fn stamp_msg(&self, message: &mut [u8]) -> u64 { + let timestamp = (UnixInstant::now() - UnixInstant::from_nanos(0)).as_nanos(); + let ts = timestamp.to_be_bytes(); + // write the current unix time into the message + [ + message[16], + message[17], + message[18], + message[19], + message[20], + message[21], + message[22], + message[23], + ] = ts; + + // todo, write a sequence number into the message + + // checksum the message and put the checksum into the message + [ + message[8], + message[9], + message[10], + message[11], + message[12], + message[13], + message[14], + message[15], + ] = self.hash_builder.hash_one(&message).to_be_bytes(); + timestamp + } + pub fn 
validate_msg(&self, v: &mut Vec) -> MessageValidationResult { + let now_unix = UnixInstant::now(); + if [v[0], v[1], v[2], v[3], v[4], v[5], v[6], v[7]] + != [0x54, 0x45, 0x53, 0x54, 0x49, 0x4E, 0x47, 0x21] + { + return MessageValidationResult::Unexpected; + } + let csum = [v[8], v[9], v[10], v[11], v[12], v[13], v[14], v[15]]; + [v[8], v[9], v[10], v[11], v[12], v[13], v[14], v[15]] = [0; 8]; + if csum != self.hash_builder.hash_one(&v).to_be_bytes() { + return MessageValidationResult::Corrupted; + } + let ts = u64::from_be_bytes([v[16], v[17], v[18], v[19], v[20], v[21], v[22], v[23]]); + let latency = now_unix - UnixInstant::from_nanos(ts); + MessageValidationResult::Validated(latency.as_nanos()) + } +} + pub struct PubsubRuntimes { publisher_rt: Option, subscriber_rt: Option, @@ -27,15 +98,19 @@ impl PubsubRuntimes { pub fn launch_pubsub( config: &Config, work_receiver: Receiver, - workload_components: Vec, + workload_components: &[Component], ) -> PubsubRuntimes { PubsubRuntimes { - publisher_rt: launch_publishers(config, work_receiver), + publisher_rt: launch_publishers(config, work_receiver, workload_components), subscriber_rt: launch_subscribers(config, workload_components), } } -fn launch_publishers(config: &Config, work_receiver: Receiver) -> Option { +fn launch_publishers( + config: &Config, + work_receiver: Receiver, + workload_components: &[Component], +) -> Option { if config.pubsub().is_none() { debug!("No pubsub configuration specified"); return None; @@ -54,6 +129,10 @@ fn launch_publishers(config: &Config, work_receiver: Receiver) -> Opti Protocol::Momento => { momento::launch_publishers(&mut publisher_rt, config.clone(), work_receiver); } + Protocol::Kafka => { + kafka::create_topics(&mut publisher_rt, config.clone(), workload_components); + kafka::launch_publishers(&mut publisher_rt, config.clone(), work_receiver); + } _ => { error!("pubsub is not supported for the selected protocol"); std::process::exit(1); @@ -63,7 +142,7 @@ fn 
launch_publishers(config: &Config, work_receiver: Receiver) -> Opti Some(publisher_rt) } -fn launch_subscribers(config: &Config, workload_components: Vec) -> Option { +fn launch_subscribers(config: &Config, workload_components: &[Component]) -> Option { if config.pubsub().is_none() { debug!("No pubsub configuration specified"); return None; @@ -82,6 +161,9 @@ fn launch_subscribers(config: &Config, workload_components: Vec) -> O Protocol::Momento => { momento::launch_subscribers(&mut subscriber_rt, config.clone(), workload_components); } + Protocol::Kafka => { + kafka::launch_subscribers(&mut subscriber_rt, config.clone(), workload_components); + } _ => { error!("pubsub is not supported for the selected protocol"); std::process::exit(1); diff --git a/src/pubsub/momento.rs b/src/pubsub/momento.rs index 50de648a..d15e04a4 100644 --- a/src/pubsub/momento.rs +++ b/src/pubsub/momento.rs @@ -1,7 +1,6 @@ use super::*; use ::momento::preview::topics::{SubscriptionItem, TopicClient, ValueKind}; use ::momento::CredentialProviderBuilder; -use ahash::RandomState; use futures::stream::StreamExt; use std::sync::Arc; use tokio::time::timeout; @@ -10,7 +9,7 @@ use tokio::time::timeout; pub fn launch_subscribers( runtime: &mut Runtime, config: Config, - workload_components: Vec, + workload_components: &[Component], ) { debug!("launching momento subscriber tasks"); @@ -77,54 +76,31 @@ async fn subscriber_task(client: Arc, cache_name: String, topic: St PUBSUB_SUBSCRIBER_CURR.add(1); PUBSUB_SUBSCRIBE_OK.increment(); - // Create a new hasher state to validate the integrity of received - // messages. Deterministic seeds are used so that multiple processes can - // verify the messages. 
- let hash_builder = RandomState::with_seeds( - 0xd5b96f9126d61cee, - 0x50af85c9d1b6de70, - 0xbd7bdf2fee6d15b2, - 0x3dbe88bb183ac6f4, - ); + let msg_stamp = MessageValidator::new(); while RUNNING.load(Ordering::Relaxed) { match subscription.next().await { Some(SubscriptionItem::Value(v)) => { if let ValueKind::Binary(mut v) = v.kind { - let now_unix = UnixInstant::now(); - - if [v[0], v[1], v[2], v[3], v[4], v[5], v[6], v[7]] - != [0x54, 0x45, 0x53, 0x54, 0x49, 0x4E, 0x47, 0x21] - { - // unexpected message - error!("pubsub: invalid message received"); - RESPONSE_EX.increment(); - PUBSUB_RECEIVE_INVALID.increment(); - continue; - } - - // grab the checksum and zero it in the message - let csum = [v[8], v[9], v[10], v[11], v[12], v[13], v[14], v[15]]; - [v[8], v[9], v[10], v[11], v[12], v[13], v[14], v[15]] = [0; 8]; - - if csum != hash_builder.hash_one(&v).to_be_bytes() { - // corrupted message - error!("pubsub: corrupt message received"); - PUBSUB_RECEIVE.increment(); - PUBSUB_RECEIVE_CORRUPT.increment(); - continue; + match msg_stamp.validate_msg(&mut v) { + MessageValidationResult::Unexpected => { + error!("pubsub: invalid message received"); + RESPONSE_EX.increment(); + PUBSUB_RECEIVE_INVALID.increment(); + continue; + } + MessageValidationResult::Corrupted => { + error!("pubsub: corrupt message received"); + PUBSUB_RECEIVE.increment(); + PUBSUB_RECEIVE_CORRUPT.increment(); + continue; + } + MessageValidationResult::Validated(latency) => { + let _ = PUBSUB_LATENCY.increment(latency); + PUBSUB_RECEIVE.increment(); + PUBSUB_RECEIVE_OK.increment(); + } } - - let ts = u64::from_be_bytes([ - v[16], v[17], v[18], v[19], v[20], v[21], v[22], v[23], - ]); - - let latency = now_unix - UnixInstant::from_nanos(ts); - - let _ = PUBSUB_LATENCY.increment(latency.as_nanos()); - - PUBSUB_RECEIVE.increment(); - PUBSUB_RECEIVE_OK.increment(); } else { error!("there was a string in the topic"); // unexpected message @@ -208,15 +184,7 @@ async fn publisher_task( }) .to_string(); - 
// Create a new hasher state to validate the integrity of received - // messages. Deterministic seeds are used so that multiple processes can - // verify the messages. - let hash_builder = RandomState::with_seeds( - 0xd5b96f9126d61cee, - 0x50af85c9d1b6de70, - 0xbd7bdf2fee6d15b2, - 0x3dbe88bb183ac6f4, - ); + let msg_stamp = MessageValidator::new(); while RUNNING.load(Ordering::Relaxed) { let work_item = work_receiver @@ -226,39 +194,14 @@ async fn publisher_task( REQUEST.increment(); let start = Instant::now(); - let now_unix = UnixInstant::now(); let result = match work_item { - WorkItem::Publish { topic, mut message } => { - let ts = (now_unix - UnixInstant::from_nanos(0)) - .as_nanos() - .to_be_bytes(); - - // write the current unix time into the message - [ - message[16], - message[17], - message[18], - message[19], - message[20], - message[21], - message[22], - message[23], - ] = ts; - - // todo, write a sequence number into the message - - // checksum the message and put the checksum into the message - [ - message[8], - message[9], - message[10], - message[11], - message[12], - message[13], - message[14], - message[15], - ] = hash_builder.hash_one(&message).to_be_bytes(); - + WorkItem::Publish { + topic, + mut message, + partition: _, + key: _, + } => { + msg_stamp.stamp_msg(&mut message); PUBSUB_PUBLISH.increment(); match timeout( diff --git a/src/workload/mod.rs b/src/workload/mod.rs index 89b9c571..f37fe3e8 100644 --- a/src/workload/mod.rs +++ b/src/workload/mod.rs @@ -150,16 +150,24 @@ impl Generator { } fn generate_pubsub(&self, topics: &Topics, rng: &mut dyn RngCore) -> PublisherWorkItem { - let index = topics.topic_dist.sample(rng); - let topic = topics.topics[index].clone(); + let topic_index = topics.topic_dist.sample(rng); + let topic = topics.topics[topic_index].clone(); + let partition = topics.partition_dist.sample(rng); let mut m = vec![0_u8; topics.message_len]; // add a header [m[0], m[1], m[2], m[3], m[4], m[5], m[6], m[7]] = [0x54, 0x45, 
0x53, 0x54, 0x49, 0x4E, 0x47, 0x21]; rng.fill(&mut m[32..topics.message_len]); - - PublisherWorkItem::Publish { topic, message: m } + let mut k = vec![0_u8; topics.key_len]; + rng.fill(&mut k[0..topics.key_len]); + + PublisherWorkItem::Publish { + topic, + partition, + key: k, + message: m, + } } fn generate_request(&self, keyspace: &Keyspace, rng: &mut dyn RngCore) -> ClientWorkItem { @@ -374,7 +382,10 @@ pub enum Component { #[derive(Clone)] pub struct Topics { topics: Vec>, + partitions: usize, topic_dist: Distribution, + partition_dist: Distribution, + key_len: usize, message_len: usize, subscriber_poolsize: usize, subscriber_concurrency: usize, @@ -384,8 +395,12 @@ impl Topics { pub fn new(config: &Config, topics: &config::Topics) -> Self { // ntopics must be >= 1 let ntopics = std::cmp::max(1, topics.topics()); + // partitions must be >= 1 + let partitions = std::cmp::max(1, topics.partitions()); let topiclen = topics.topic_len(); let message_len = topics.message_len(); + // key_len must be >= 1 + let key_len = std::cmp::max(1, topics.key_len()); let subscriber_poolsize = topics.subscriber_poolsize(); let subscriber_concurrency = topics.subscriber_concurrency(); let topic_dist = match topics.topic_distribution() { @@ -394,30 +409,50 @@ impl Topics { Distribution::Zipf(ZipfDistribution::new(ntopics, 1.0).unwrap()) } }; - - // initialize a PRNG with the default initial seed - let mut rng = Xoshiro512PlusPlus::from_seed(config.general().initial_seed()); - - // generate the seed for topic name PRNG - let mut raw_seed = [0_u8; 64]; - rng.fill_bytes(&mut raw_seed); - let topic_name_seed = Seed512(raw_seed); - - // initialize topic name PRNG and generate a set of unique topics - let mut rng = Xoshiro512PlusPlus::from_seed(topic_name_seed); - let mut topics = HashSet::with_capacity(ntopics); - while topics.len() < ntopics { - let topic = (&mut rng) - .sample_iter(&Alphanumeric) - .take(topiclen) - .collect::>(); - let _ = topics.insert(unsafe { 
std::str::from_utf8_unchecked(&topic) }.to_string()); + let partition_dist = match topics.partition_distribution() { + config::Distribution::Uniform => Distribution::Uniform(Uniform::new(0, partitions)), + config::Distribution::Zipf => { + Distribution::Zipf(ZipfDistribution::new(partitions, 1.0).unwrap()) + } + }; + let topic_names: Vec>; + // if the given topic_names has the matched format, we use topic names there + if topics + .topic_names() + .iter() + .map(|n| n.len() == topiclen) + .fold(topics.topic_names().len() == ntopics, |acc, c| acc && c) + { + topic_names = topics + .topic_names() + .iter() + .map(|k| Arc::new((*k).clone())) + .collect(); + debug!("Use given topic names:{:?}", topic_names); + } else { + // initialize topic name PRNG and generate a set of unique topics + let mut rng = Xoshiro512PlusPlus::from_seed(config.general().initial_seed()); + let mut raw_seed = [0_u8; 64]; + rng.fill_bytes(&mut raw_seed); + let topic_name_seed = Seed512(raw_seed); + let mut rng = Xoshiro512PlusPlus::from_seed(topic_name_seed); + let mut topics = HashSet::with_capacity(ntopics); + while topics.len() < ntopics { + let topic = (&mut rng) + .sample_iter(&Alphanumeric) + .take(topiclen) + .collect::>(); + let _ = topics.insert(unsafe { std::str::from_utf8_unchecked(&topic) }.to_string()); + } + topic_names = topics.drain().map(|k| k.into()).collect(); } - let topics = topics.drain().map(|k| k.into()).collect(); Self { - topics, + topics: topic_names, + partitions, topic_dist, + partition_dist, + key_len, message_len, subscriber_poolsize, subscriber_concurrency, @@ -428,6 +463,10 @@ impl Topics { &self.topics } + pub fn partitions(&self) -> usize { + self.partitions + } + pub fn subscriber_poolsize(&self) -> usize { self.subscriber_poolsize } diff --git a/src/workload/publisher.rs b/src/workload/publisher.rs index 59b377de..2507c7aa 100644 --- a/src/workload/publisher.rs +++ b/src/workload/publisher.rs @@ -5,6 +5,8 @@ use std::sync::Arc; pub enum PublisherWorkItem { 
Publish { topic: Arc, + partition: usize, + key: Vec, message: Vec, }, }