-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
minor refactor for pubsub #75
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -76,26 +76,26 @@ async fn subscriber_task(client: Arc<TopicClient>, cache_name: String, topic: St | |
PUBSUB_SUBSCRIBER_CURR.add(1); | ||
PUBSUB_SUBSCRIBE_OK.increment(); | ||
|
||
let msg_stamp = MessageValidator::new(); | ||
let validator = MessageValidator::new(); | ||
|
||
while RUNNING.load(Ordering::Relaxed) { | ||
match subscription.next().await { | ||
Some(SubscriptionItem::Value(v)) => { | ||
if let ValueKind::Binary(mut v) = v.kind { | ||
match msg_stamp.validate_msg(&mut v) { | ||
MessageValidationResult::Unexpected => { | ||
match validator.validate(&mut v) { | ||
Err(ValidationError::Unexpected) => { | ||
error!("pubsub: invalid message received"); | ||
RESPONSE_EX.increment(); | ||
PUBSUB_RECEIVE_INVALID.increment(); | ||
continue; | ||
} | ||
MessageValidationResult::Corrupted => { | ||
Err(ValidationError::Corrupted) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curious why one of these cases increments RESPONSE_EX and the other PUBSUB_RECEIVE, but both have a message suggesting they are pubsub messages. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This might be up for debate. Initially my thought was that invalid messages (perhaps produced by some non-rpc-perf process) should be ignored with just an invalid counter incremented. |
||
error!("pubsub: corrupt message received"); | ||
PUBSUB_RECEIVE.increment(); | ||
PUBSUB_RECEIVE_CORRUPT.increment(); | ||
continue; | ||
} | ||
MessageValidationResult::Validated(latency) => { | ||
Ok(latency) => { | ||
let _ = PUBSUB_LATENCY.increment(latency); | ||
PUBSUB_RECEIVE.increment(); | ||
PUBSUB_RECEIVE_OK.increment(); | ||
|
@@ -184,7 +184,7 @@ async fn publisher_task( | |
}) | ||
.to_string(); | ||
|
||
let msg_stamp = MessageValidator::new(); | ||
let validator = MessageValidator::new(); | ||
|
||
while RUNNING.load(Ordering::Relaxed) { | ||
let work_item = work_receiver | ||
|
@@ -201,7 +201,7 @@ async fn publisher_task( | |
partition: _, | ||
key: _, | ||
} => { | ||
msg_stamp.stamp_msg(&mut message); | ||
validator.stamp(&mut message); | ||
PUBSUB_PUBLISH.increment(); | ||
|
||
match timeout( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it worth moving the counter stuff into the validation function and having a much simpler match case for both consumers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great idea. Moved into the validator.