Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add kafka support #56

Merged
merged 18 commits into from
Oct 19, 2023
Merged
76 changes: 76 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ rand_distr = "0.4.3"
rand_xoshiro = "0.6.0"
ratelimit = "0.7.0"
redis = { version = "0.22.3", features = ["tokio-comp"] }
rdkafka = { version = "0.25", features = ["cmake-build"] }
ringlog = "0.3.0"
rpcperf-dataspec = { path = "lib/dataspec" }
serde = { workspace = true }
Expand Down
76 changes: 76 additions & 0 deletions configs/kafka.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# An example configuration for benchmarking Apache Kafka using the pubsub
# workload. Messages are published to a set of topics with configurable
# partition counts, topic name lengths, key lengths, and message sizes.

[general]
# specify the protocol to be used
protocol = "kafka"
# the interval for stats integration and reporting
interval = 1
# the number of intervals to run the test for
duration = 300
# optionally, we can write some detailed stats to a file during the run
#json_output = "stats.json"
# run the admin thread with a HTTP listener at the address provided, this allows
# stats exposition via HTTP
admin = "127.0.0.1:9090"
# optionally, set an initial seed for the PRNGs used to generate the workload.
# The default is to initialize from the OS entropy pool.
#initial_seed = "0"

[debug]
# choose from: error, warn, info, debug, trace
log_level = "error"
# optionally, log to the file below instead of standard out
# log_file = "rpc-perf.log"
# backup file name for use with log rotation
log_backup = "rpc-perf.log.old"
# trigger log rotation when the file grows beyond this size (in bytes). Set this
# option to '0' to disable log rotation.
log_max_size = 1073741824

[target]
# specify the Kafka broker endpoints (host:port)
endpoints = [
"127.0.0.1:9092"
]

[pubsub]
# the connect timeout in milliseconds
connect_timeout = 10000
# the publish timeout in milliseconds
publish_timeout = 1000
# the number of threads in the publisher runtime
publisher_threads = 4
# the number of threads in the subscriber runtime
subscriber_threads = 4
publisher_poolsize = 1
publisher_concurrency = 1


[workload]
# the number of threads that will be used to generate requests
threads = 1
# the global ratelimit
ratelimit = 100

# An example set of topics using a single consumer and multiple producers.
[[workload.topics]]
# the weight relative to other workload components
weight = 1
# the total number of Kafka consumer clients for this set of topics
subscriber_poolsize = 1
# the number of concurrent subscriptions per consumer client for this set of topics
subscriber_concurrency = 1
# sets the number of topics
topics = 1
# sets the number of partitions in each topic
partitions = 10
# set the length of the topic names, in bytes
topic_len = 10
# sets the value length, in bytes
message_len = 128
# sets the key length, in bytes
key_len = 4

4 changes: 4 additions & 0 deletions src/clients/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ pub fn launch_clients(config: &Config, work_receiver: Receiver<WorkItem>) -> Opt
Protocol::Resp => {
clients::redis::launch_tasks(&mut client_rt, config.clone(), work_receiver)
}
Protocol::Kafka => {
error!("keyspace is not supported for the selected protocol");
std::process::exit(1);
}
}

Some(client_rt)
Expand Down
1 change: 1 addition & 0 deletions src/config/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ pub enum Protocol {
Momento,
Ping,
Resp,
Kafka,
}
22 changes: 22 additions & 0 deletions src/config/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ use super::*;
/// Serde default helper: the default value `1` used by
/// `#[serde(default = "one")]` fields.
fn one() -> usize {
    1
}
/// Serde default helper: the default value `4` used by
/// `#[serde(default = "four")]` fields (e.g. `key_len`).
fn four() -> usize {
    4
}

#[derive(Clone, Deserialize)]
pub struct Workload {
Expand Down Expand Up @@ -44,22 +47,33 @@ impl Workload {
#[derive(Clone, Deserialize)]
pub struct Topics {
topics: usize,
#[serde(default = "one")]
partitions: usize,
topic_len: usize,
message_len: usize,
#[serde(default = "four")]
key_len: usize,
yangxi marked this conversation as resolved.
Show resolved Hide resolved
#[serde(default = "one")]
weight: usize,
#[serde(default = "one")]
subscriber_poolsize: usize,
#[serde(default = "one")]
subscriber_concurrency: usize,
#[serde(default)]
topic_distribution: Distribution,
#[serde(default)]
partition_distribution: Distribution,
}

impl Topics {
pub fn weight(&self) -> usize {
self.weight
}

pub fn partitions(&self) -> usize {
self.partitions
}

pub fn topics(&self) -> usize {
self.topics
}
Expand All @@ -68,6 +82,10 @@ impl Topics {
self.topic_len
}

pub fn key_len(&self) -> usize {
self.key_len
}

pub fn message_len(&self) -> usize {
self.message_len
}
Expand All @@ -83,6 +101,10 @@ impl Topics {
pub fn topic_distribution(&self) -> Distribution {
self.topic_distribution
}

pub fn partition_distribution(&self) -> Distribution {
self.partition_distribution
}
}

#[derive(Clone, Copy, PartialEq, Eq, Deserialize)]
Expand Down
5 changes: 4 additions & 1 deletion src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,10 @@ request!(SORTED_SET_SCORE, "sorted_set_score");
* PUBSUB
*/

request!(PUBSUB_PUBLISH, "publisher/publish");
counter!(PUBSUB_PUBLISH, "publisher/publish/total");
yangxi marked this conversation as resolved.
Show resolved Hide resolved
counter!(PUBSUB_PUBLISH_OK, "publisher/publish/ok");
counter!(PUBSUB_PUBLISH_EX, "publisher/publish/exception");
counter!(PUBSUB_PUBLISH_TIMEOUT, "publisher/publish/timeout");
counter!(PUBSUB_PUBLISH_RATELIMITED, "publisher/publish/ratelimiter");

request!(PUBSUB_SUBSCRIBE, "subscriber/subscribe");
Expand Down
1 change: 1 addition & 0 deletions src/output/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ fn pubsub_stats(snapshot: &mut MetricsSnapshot) {
// publisher stats
let pubsub_tx_ex = snapshot.counter_rate(PUBSUB_PUBLISH_EX_COUNTER);
let pubsub_tx_ok = snapshot.counter_rate(PUBSUB_PUBLISH_OK_COUNTER);

yangxi marked this conversation as resolved.
Show resolved Hide resolved
let pubsub_tx_timeout = snapshot.counter_rate(PUBSUB_PUBLISH_TIMEOUT_COUNTER);
let pubsub_tx_total = snapshot.counter_rate(PUBSUB_PUBLISH_COUNTER);

Expand Down
Loading
Loading