Skip to content

Commit

Permalink
Fix minor issue
Browse files Browse the repository at this point in the history
  • Loading branch information
yangxi committed Oct 18, 2023
1 parent fb5172d commit 4c3a4f7
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/pubsub/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub fn create_topics(runtime: &mut Runtime, config: Config, workload_components:
pub fn launch_subscribers(
runtime: &mut Runtime,
config: Config,
workload_components: &Vec<Component>, /* */
workload_components: &[Component],
) {
let group_id = "rpc_subscriber";
for component in workload_components {
Expand Down
4 changes: 2 additions & 2 deletions src/pubsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ fn launch_subscribers(config: &Config, workload_components: &Vec<Component>) ->

match config.general().protocol() {
Protocol::Momento => {
momento::launch_subscribers(&mut subscriber_rt, config.clone(), &workload_components);
momento::launch_subscribers(&mut subscriber_rt, config.clone(), workload_components);
}
Protocol::Kafka => {
kafka::launch_subscribers(&mut subscriber_rt, config.clone(), &workload_components);
kafka::launch_subscribers(&mut subscriber_rt, config.clone(), workload_components);
}
_ => {
error!("pubsub is not supported for the selected protocol");
Expand Down
3 changes: 2 additions & 1 deletion src/pubsub/momento.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::time::timeout;
pub fn launch_subscribers(
runtime: &mut Runtime,
config: Config,
workload_components: &Vec<Component>,
workload_components: &[Component],
) {
debug!("launching momento subscriber tasks");

Expand Down Expand Up @@ -203,6 +203,7 @@ async fn publisher_task(
} => {
msg_stamp.stamp_msg(&mut message);
PUBSUB_PUBLISH.increment();

match timeout(
config.pubsub().unwrap().publish_timeout(),
client.publish(cache_name.clone(), topic.to_string(), message),
Expand Down
4 changes: 3 additions & 1 deletion src/workload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ impl Topics {
let partitions = std::cmp::max(1, topics.partitions());
let topiclen = topics.topic_len();
let message_len = topics.message_len();
let key_len = topics.key_len();
// key_len must be >= 1
let key_len = std::cmp::max(1, topics.key_len());
let subscriber_poolsize = topics.subscriber_poolsize();
let subscriber_concurrency = topics.subscriber_concurrency();
let topic_dist = match topics.topic_distribution() {
Expand All @@ -415,6 +416,7 @@ impl Topics {
}
};
let topic_names: Vec<Arc<String>>;
// if the given topic_names has the matched format, we use topic names there
if topics
.topic_names()
.iter()
Expand Down

0 comments on commit 4c3a4f7

Please sign in to comment.