Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add kafka support #56

Merged
merged 18 commits into from
Oct 19, 2023
Merged
76 changes: 76 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ rand_distr = "0.4.3"
rand_xoshiro = "0.6.0"
ratelimit = "0.7.0"
redis = { version = "0.22.3", features = ["tokio-comp"] }
rdkafka = { version = "0.25", features = ["cmake-build"] }
ringlog = "0.3.0"
rpcperf-dataspec = { path = "lib/dataspec" }
serde = { workspace = true }
Expand Down
83 changes: 83 additions & 0 deletions configs/kafka.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# An example configuration for benchmarking Momento (https://www.gomomento.com)
# and demonstrating the use of the preview functionality for collections. Each
# command family is using its own keyspace and covers key-value, hash, list,
# set, and sorted set.

[general]
# specify the protocol to be used
protocol = "kafka"
# the interval for stats integration and reporting
interval = 1
# the number of intervals to run the test for
duration = 300
# optionally, we can write some detailed stats to a file during the run
#json_output = "stats.json"
# run the admin thread with a HTTP listener at the address provided, this allows
# stats exposition via HTTP
admin = "127.0.0.1:9090"
# optionally, set an initial seed for the PRNGs used to generate the workload.
# The default is to intialize from the OS entropy pool.
#initial_seed = "0"

[debug]
# choose from: error, warn, info, debug, trace
log_level = "error"
# optionally, log to the file below instead of standard out
# log_file = "rpc-perf.log"
# backup file name for use with log rotation
log_backup = "rpc-perf.log.old"
# trigger log rotation when the file grows beyond this size (in bytes). Set this
# option to '0' to disable log rotation.
log_max_size = 1073741824

[target]
# kafka broker ip:port
endpoints = [
"127.0.0.1:9092"
]

[pubsub]
# TODO the connect timeout in milliseconds
connect_timeout = 10000
publish_timeout = 1000
# the number of threads in the publisher runtime
publisher_threads = 4
publisher_poolsize = 1
publisher_concurrency = 20
# the number of threads in the subscriber runtime
subscriber_threads = 4
# kafka-specific client configurations
kafka_acks = "1"
kafka_linger_ms = "1"
#kafka_batch_size
#kafka_batch_num_messages
#kafka_fetch_message_max_bytes
#kafka_request_timeout_ms

[workload]
# the number of threads that will be used to generate requests
threads = 1
# the global ratelimit
ratelimit = 1000

# An example set of
#topics using a single consumer multiple producer.
[[workload.topics]]
# the weight relative to other workload components
weight = 1
# the total number of Momento clients for subscribers to this set of topics
subscriber_poolsize = 1
# the total number of gRPC sessions per Momento client for this set of topics
subscriber_concurrency = 1
# sets the number of topics
topics = 1
# set the length of the topic names, in bytes
topic_len = 5
# set the topic names, if empty or the length and the number do not match topics and topic_len, generate random names
topic_names = ["hello"]
# sets the number of partitions in each topic
partitions = 10
# sets the value length, in bytes
message_len = 512
# sets the key length, in bytes
key_len = 8
4 changes: 4 additions & 0 deletions src/clients/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ pub fn launch_clients(config: &Config, work_receiver: Receiver<WorkItem>) -> Opt
Protocol::Resp => {
clients::redis::launch_tasks(&mut client_rt, config.clone(), work_receiver)
}
Protocol::Kafka => {
error!("keyspace is not supported for the kafka protocol");
std::process::exit(1);
}
}

Some(client_rt)
Expand Down
1 change: 1 addition & 0 deletions src/config/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ pub enum Protocol {
Momento,
Ping,
Resp,
Kafka,
}
32 changes: 32 additions & 0 deletions src/config/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ pub struct Pubsub {

publisher_poolsize: usize,
publisher_concurrency: usize,

// kafka specific configs
kafka_acks: Option<String>,
kafka_linger_ms: Option<String>,
kafka_batch_size: Option<String>,
kafka_batch_num_messages: Option<String>,
kafka_fetch_message_max_bytes: Option<String>,
kafka_request_timeout_ms: Option<String>,
}

impl Pubsub {
Expand All @@ -33,4 +41,28 @@ impl Pubsub {
pub fn publisher_concurrency(&self) -> usize {
self.publisher_concurrency
}

pub fn kafka_acks(&self) -> &Option<String> {
&self.kafka_acks
}

pub fn kafka_linger_ms(&self) -> &Option<String> {
&self.kafka_linger_ms
}

pub fn kafka_batch_size(&self) -> &Option<String> {
&self.kafka_batch_size
}

pub fn kafka_batch_num_messages(&self) -> &Option<String> {
&self.kafka_batch_num_messages
}

pub fn kafka_fetch_message_max_bytes(&self) -> &Option<String> {
&self.kafka_fetch_message_max_bytes
}

pub fn kafka_request_timeout_ms(&self) -> &Option<String> {
&self.kafka_request_timeout_ms
}
}
23 changes: 23 additions & 0 deletions src/config/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,30 +44,49 @@ impl Workload {
#[derive(Clone, Deserialize)]
pub struct Topics {
topics: usize,
#[serde(default = "one")]
partitions: usize,
topic_len: usize,
#[serde(default)]
topic_names: Vec<String>,
message_len: usize,
#[serde(default = "one")]
key_len: usize,
weight: usize,
subscriber_poolsize: usize,
#[serde(default = "one")]
subscriber_concurrency: usize,
#[serde(default)]
topic_distribution: Distribution,
#[serde(default)]
partition_distribution: Distribution,
}

impl Topics {
pub fn weight(&self) -> usize {
self.weight
}

pub fn partitions(&self) -> usize {
self.partitions
}

pub fn topics(&self) -> usize {
self.topics
}

pub fn topic_names(&self) -> &[String] {
&self.topic_names
}

pub fn topic_len(&self) -> usize {
self.topic_len
}

pub fn key_len(&self) -> usize {
self.key_len
}

pub fn message_len(&self) -> usize {
self.message_len
}
Expand All @@ -83,6 +102,10 @@ impl Topics {
pub fn topic_distribution(&self) -> Distribution {
self.topic_distribution
}

pub fn partition_distribution(&self) -> Distribution {
self.partition_distribution
}
}

#[derive(Clone, Copy, PartialEq, Eq, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ fn main() {

let client_runtime = launch_clients(&config, client_receiver);

let mut pubsub_runtimes = launch_pubsub(&config, pubsub_receiver, workload_components);
let mut pubsub_runtimes = launch_pubsub(&config, pubsub_receiver, &workload_components);

// launch json log output
{
Expand Down
8 changes: 4 additions & 4 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ macro_rules! request {
});
paste! {
#[allow(dead_code)]
pub static [<$ident _COUNTER>]: &'static str = $name;
pub static [<$ident _COUNTER>]: &'static str = concat!($name, "/total");
}

paste! {
Expand All @@ -330,7 +330,7 @@ macro_rules! request {
});
paste! {
#[allow(dead_code)]
pub static [<$ident _EX_COUNTER>]: &'static str = $name;
pub static [<$ident _EX_COUNTER>]: &'static str = concat!($name, "/exception");
}
}

Expand All @@ -345,7 +345,7 @@ macro_rules! request {
});
paste! {
#[allow(dead_code)]
pub static [<$ident _OK_COUNTER>]: &'static str = $name;
pub static [<$ident _OK_COUNTER>]: &'static str = concat!($name, "/ok");
}
}

Expand All @@ -360,7 +360,7 @@ macro_rules! request {
});
paste! {
#[allow(dead_code)]
pub static [<$ident _TIMEOUT_COUNTER>]: &'static str = $name;
pub static [<$ident _TIMEOUT_COUNTER>]: &'static str = concat!($name, "/timeout");
}
}
}
Expand Down
Loading
Loading