-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add kafka support #56
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clippy found more than 10 potential problems in the proposed changes. Check the Files changed tab for more details.
That’s because you’re using the wrong counter name. Should be
PUBSUB_PUBLISH_COUNTER like it was before?
…On Tue, Oct 17, 2023 at 6:09 PM Xi Yang ***@***.***> wrote:
***@***.**** commented on this pull request.
------------------------------
In src/config/workload.rs
<#56 (comment)>:
> topic_len: usize,
message_len: usize,
+ #[serde(default = "four")]
+ key_len: usize,
Since we explicitly set the partition when producing one message, we don't
need to control the keys, as long as there is a key, it's OK. 4 is just a
random number, I can change it to at least one.
------------------------------
In src/metrics/mod.rs
<#56 (comment)>:
> @@ -589,7 +589,10 @@ request!(SORTED_SET_SCORE, "sorted_set_score");
* PUBSUB
*/
-request!(PUBSUB_PUBLISH, "publisher/publish");
+counter!(PUBSUB_PUBLISH, "publisher/publish/total");
But using request! doesn't work. The
snapshot.counter_rate(REQUEST_COUNTER) returns 0. Here is the output when
using request!(PUBSUB_PUBLISH):
2023-10-18T00:06:25.662+00:00 Publishers: Current: 1
2023-10-18T00:06:25.662+00:00 Publisher Publish: Success: NaN % Timeout:
NaN %
2023-10-18T00:06:25.662+00:00 Publisher Publish Rate (/s): Ok: 0.00 Error:
0.00 Timeout: 0.00
2023-10-18T00:06:25.662+00:00 Subscribers: Current: 1
2023-10-18T00:06:25.662+00:00 Subscriber Receive: Success: 100.00 %
Corrupted: 0.00 %
2023-10-18T00:06:25.662+00:00 Subscriber Receive Rate (/s): Ok: 99.65
Error: 0.00 Corrupt: 0.00 Invalid: 0.00
2023-10-18T00:06:25.662+00:00 Pubsub Publish Latency (us): p25: 5341 p50:
5373 p75: 5373 p90: 10944 p99: 19136 p999: 19136 p9999: 19136
2023-10-18T00:06:25.662+00:00 Pubsub End-to-End Latency (us): p25: 5373
p50: 5406 p75: 5406 p90: 10944 p99: 19136 p999: 19136 p9999: 19136
—
Reply to this email directly, view it on GitHub
<#56 (comment)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/ACP6HFYF7WZSATOD6SU7TWLX74T5BAVCNFSM6AAAAAA6CZMHYKVHI2DSMVQWIX3LMV43YUDVNRWFEZLROVSXG5CSMV3GSZLXHMYTMOBTG4YDSNZZGY>
.
You are receiving this because you commented.Message ID:
***@***.***>
|
src/pubsub/mod.rs
Outdated
mod momento; | ||
|
||
struct MessageStamp { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we call this a message validator?
src/pubsub/kafka.rs
Outdated
} | ||
} | ||
|
||
pub fn create_topics(runtime: &mut Runtime, config: Config, workload_components: &Vec<Component>) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make this &[Component]
src/config/workload.rs
Outdated
@@ -4,6 +4,10 @@ fn one() -> usize { | |||
1 | |||
} | |||
|
|||
fn empty_names() -> Vec<String> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't necessary
src/config/workload.rs
Outdated
topic_len: usize, | ||
#[serde(default = "empty_names")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just #[serde(default)]
does what you want
src/pubsub/kafka.rs
Outdated
Ok(m) => match m.payload_view::<[u8]>() { | ||
Some(Ok(m)) => { | ||
let mut v = Vec::new(); | ||
v.extend_from_slice(m); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can just do let mut v = m.to_owned();
src/pubsub/mod.rs
Outdated
@@ -29,9 +100,20 @@ pub fn launch_pubsub( | |||
work_receiver: Receiver<WorkItem>, | |||
workload_components: Vec<Component>, | |||
) -> PubsubRuntimes { | |||
if config.pubsub().is_some() { | |||
if let Protocol::Kafka = config.general().protocol() { | |||
let mut topic_rt = Builder::new_multi_thread() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add some comment here explaining why we need an additional Tokio runtime for this. I'm unclear why we wouldn't handle this differently
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved kafka::create_topics to lauch_publishers, so it reuses the publisher_rt to create the topics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still requires some changes. Please address the clippy lints and my comments
No description provided.