From 6fe90d9a7e65670b8a8397369e0cd3f73a03fb03 Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Thu, 19 Oct 2023 08:23:46 -0700 Subject: [PATCH 1/5] minor refactor for pubsub Minor refactor for pubsub regarding the message validator and trying to make the code more idiomatic. --- src/pubsub/kafka.rs | 36 ++++++++++++++++++++++--------- src/pubsub/mod.rs | 49 ++++++++++++++++++++++++++++++------------- src/pubsub/momento.rs | 14 ++++++------- 3 files changed, 67 insertions(+), 32 deletions(-) diff --git a/src/pubsub/kafka.rs b/src/pubsub/kafka.rs index 3bf07f33..2b20c596 100644 --- a/src/pubsub/kafka.rs +++ b/src/pubsub/kafka.rs @@ -150,30 +150,35 @@ pub fn launch_subscribers( async fn subscriber_task(client: Arc, topics: Vec) { PUBSUB_SUBSCRIBE.increment(); + let sub_topics: Vec<&str> = topics.iter().map(AsRef::as_ref).collect(); + if client.subscribe(&sub_topics).is_ok() { PUBSUB_SUBSCRIBER_CURR.add(1); PUBSUB_SUBSCRIBE_OK.increment(); - let msg_stamp = MessageValidator::new(); + + let validator = MessageValidator::new(); + while RUNNING.load(Ordering::Relaxed) { match client.recv().await { - Ok(m) => match m.payload_view::<[u8]>() { - Some(Ok(m)) => { - let mut v = m.to_owned(); - match msg_stamp.validate_msg(&mut v) { - MessageValidationResult::Unexpected => { + Ok(message) => match message.payload_view::<[u8]>() { + Some(Ok(message)) => { + let mut message = message.to_owned(); + + match validator.validate(&mut message) { + Err(ValidationError::Unexpected) => { error!("pubsub: invalid message received"); RESPONSE_EX.increment(); PUBSUB_RECEIVE_INVALID.increment(); continue; } - MessageValidationResult::Corrupted => { + Err(ValidationError::Corrupted) => { error!("pubsub: corrupt message received"); PUBSUB_RECEIVE.increment(); PUBSUB_RECEIVE_CORRUPT.increment(); continue; } - MessageValidationResult::Validated(latency) => { + Ok(latency) => { let _ = PUBSUB_LATENCY.increment(latency); PUBSUB_RECEIVE.increment(); PUBSUB_RECEIVE_OK.increment(); @@ -211,7 +216,9 @@ pub fn launch_publishers(runtime: &mut Runtime, config: Config, work_receiver: R let _guard = runtime.enter(); Arc::new(get_kafka_producer(&config)) }; + PUBSUB_PUBLISHER_CONNECT.increment(); + for _ in 0..config.pubsub().unwrap().publisher_concurrency() { runtime.spawn(publisher_task(client.clone(), work_receiver.clone())); } @@ -223,14 +230,19 @@ async fn publisher_task( work_receiver: Receiver, ) -> Result<()> { PUBSUB_PUBLISHER_CURR.add(1); - let msg_stamp = MessageValidator::new(); + + let validator = MessageValidator::new(); + while RUNNING.load(Ordering::Relaxed) { let work_item = work_receiver .recv() .await .map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?; + REQUEST.increment(); + let start = Instant::now(); + let result = match work_item { WorkItem::Publish { topic, @@ -238,7 +250,7 @@ async fn publisher_task( key, mut message, } => { - let timestamp = msg_stamp.stamp_msg(&mut message); + let timestamp = validator.stamp(&mut message); PUBSUB_PUBLISH.increment(); client .send( @@ -255,7 +267,9 @@ async fn publisher_task( .await } }; + let stop = Instant::now(); + match result { Ok(_) => { let latency = stop.duration_since(start).as_nanos(); @@ -268,6 +282,8 @@ async fn publisher_task( } } } + PUBSUB_PUBLISHER_CURR.sub(1); + Ok(()) } diff --git a/src/pubsub/mod.rs b/src/pubsub/mod.rs index 301a2261..1bdf0c99 100644 --- a/src/pubsub/mod.rs +++ b/src/pubsub/mod.rs @@ -10,30 +10,38 @@ use tokio::runtime::Runtime; mod kafka; mod momento; +pub fn hasher() -> RandomState { + RandomState::with_seeds( + 0xd5b96f9126d61cee, + 0x50af85c9d1b6de70, + 0xbd7bdf2fee6d15b2, + 0x3dbe88bb183ac6f4, + ) +} + struct MessageValidator { hash_builder: RandomState, } -pub enum MessageValidationResult { - // u64 is the end-to-end latency in nanosecond) - Validated(u64), + +pub enum ValidationError { Unexpected, Corrupted, } + impl MessageValidator { - // Deterministic seeds are used so that multiple MessageStamp can stamp and validate messages + /// Deterministic seeds are used so that multiple validators can stamp and + /// validate messages produced by other instances. pub fn new() -> Self { MessageValidator { - hash_builder: RandomState::with_seeds( - 0xd5b96f9126d61cee, - 0x50af85c9d1b6de70, - 0xbd7bdf2fee6d15b2, - 0x3dbe88bb183ac6f4, - ), + hash_builder: hasher(), } } - pub fn stamp_msg(&self, message: &mut [u8]) -> u64 { + + /// Sets the checksum and timestamp in the message. Returns the timestamp. + pub fn stamp(&self, message: &mut [u8]) -> u64 { let timestamp = (UnixInstant::now() - UnixInstant::from_nanos(0)).as_nanos(); let ts = timestamp.to_be_bytes(); + // write the current unix time into the message [ message[16], @@ -59,23 +67,33 @@ impl MessageValidator { message[14], message[15], ] = self.hash_builder.hash_one(&message).to_be_bytes(); + timestamp } - pub fn validate_msg(&self, v: &mut Vec) -> MessageValidationResult { + + /// Validate the message checksum and returns a validation result. + pub fn validate(&self, v: &mut Vec) -> std::result::Result { let now_unix = UnixInstant::now(); + + // check if the magic bytes match if [v[0], v[1], v[2], v[3], v[4], v[5], v[6], v[7]] != [0x54, 0x45, 0x53, 0x54, 0x49, 0x4E, 0x47, 0x21] { - return MessageValidationResult::Unexpected; + return Err(ValidationError::Unexpected); } + + // validate the checksum let csum = [v[8], v[9], v[10], v[11], v[12], v[13], v[14], v[15]]; [v[8], v[9], v[10], v[11], v[12], v[13], v[14], v[15]] = [0; 8]; if csum != self.hash_builder.hash_one(&v).to_be_bytes() { - return MessageValidationResult::Corrupted; + return Err(ValidationError::Corrupted); } + + // calculate and return the end to end latency let ts = u64::from_be_bytes([v[16], v[17], v[18], v[19], v[20], v[21], v[22], v[23]]); let latency = now_unix - UnixInstant::from_nanos(ts); - MessageValidationResult::Validated(latency.as_nanos()) + + Ok(latency.as_nanos()) } } @@ -89,6 +107,7 @@ impl PubsubRuntimes { if let Some(rt) = self.publisher_rt.take() { rt.shutdown_timeout(duration); } + if let Some(rt) = self.subscriber_rt.take() { rt.shutdown_timeout(duration); } diff --git a/src/pubsub/momento.rs b/src/pubsub/momento.rs index d15e04a4..a4bf6965 100644 --- a/src/pubsub/momento.rs +++ b/src/pubsub/momento.rs @@ -76,26 +76,26 @@ async fn subscriber_task(client: Arc, cache_name: String, topic: St PUBSUB_SUBSCRIBER_CURR.add(1); PUBSUB_SUBSCRIBE_OK.increment(); - let msg_stamp = MessageValidator::new(); + let validator = MessageValidator::new(); while RUNNING.load(Ordering::Relaxed) { match subscription.next().await { Some(SubscriptionItem::Value(v)) => { if let ValueKind::Binary(mut v) = v.kind { - match msg_stamp.validate_msg(&mut v) { - MessageValidationResult::Unexpected => { + match validator.validate(&mut v) { + Err(ValidationError::Unexpected) => { error!("pubsub: invalid message received"); RESPONSE_EX.increment(); PUBSUB_RECEIVE_INVALID.increment(); continue; } - MessageValidationResult::Corrupted => { + Err(ValidationError::Corrupted) => { error!("pubsub: corrupt message received"); PUBSUB_RECEIVE.increment(); PUBSUB_RECEIVE_CORRUPT.increment(); continue; } - MessageValidationResult::Validated(latency) => { + Ok(latency) => { let _ = PUBSUB_LATENCY.increment(latency); PUBSUB_RECEIVE.increment(); PUBSUB_RECEIVE_OK.increment(); @@ -184,7 +184,7 @@ async fn publisher_task( }) .to_string(); - let msg_stamp = MessageValidator::new(); + let validator = MessageValidator::new(); while RUNNING.load(Ordering::Relaxed) { let work_item = work_receiver @@ -201,7 +201,7 @@ async fn publisher_task( partition: _, key: _, } => { - msg_stamp.stamp_msg(&mut message); + validator.stamp(&mut message); PUBSUB_PUBLISH.increment(); match timeout( From d0c2e4fdb71f5cfe6bf80762a4881ca7510c2390 Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Thu, 19 Oct 2023 08:29:02 -0700 Subject: [PATCH 2/5] simplify that --- src/pubsub/kafka.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/pubsub/kafka.rs b/src/pubsub/kafka.rs index 2b20c596..4271a86b 100644 --- a/src/pubsub/kafka.rs +++ b/src/pubsub/kafka.rs @@ -163,9 +163,7 @@ async fn subscriber_task(client: Arc, topics: Vec) { match client.recv().await { Ok(message) => match message.payload_view::<[u8]>() { Some(Ok(message)) => { - let mut message = message.to_owned(); - - match validator.validate(&mut message) { + match validator.validate(&mut message.to_owned()) { Err(ValidationError::Unexpected) => { error!("pubsub: invalid message received"); RESPONSE_EX.increment(); From 7b33373eef73fa6aab213468d63cafbc7c06d1dc Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Thu, 19 Oct 2023 08:35:07 -0700 Subject: [PATCH 3/5] make the validator nicer --- src/pubsub/mod.rs | 31 ++++++------------------------- 1 file changed, 6 insertions(+), 25 deletions(-) diff --git a/src/pubsub/mod.rs b/src/pubsub/mod.rs index 1bdf0c99..0572bf56 100644 --- a/src/pubsub/mod.rs +++ b/src/pubsub/mod.rs @@ -43,30 +43,13 @@ impl MessageValidator { let ts = timestamp.to_be_bytes(); // write the current unix time into the message - [ - message[16], - message[17], - message[18], - message[19], - message[20], - message[21], - message[22], - message[23], - ] = ts; + message[16..24].copy_from_slice(&ts[0..8]); // todo, write a sequence number into the message // checksum the message and put the checksum into the message - [ - message[8], - message[9], - message[10], - message[11], - message[12], - message[13], - message[14], - message[15], - ] = self.hash_builder.hash_one(&message).to_be_bytes(); + let checksum = self.hash_builder.hash_one(&message).to_be_bytes(); + message[8..16].copy_from_slice(&checksum); timestamp } @@ -76,15 +59,13 @@ impl MessageValidator { let now_unix = UnixInstant::now(); // check if the magic bytes match - if [v[0], v[1], v[2], v[3], v[4], v[5], v[6], v[7]] - != [0x54, 0x45, 0x53, 0x54, 0x49, 0x4E, 0x47, 0x21] - { + if v[0..8] != [0x54, 0x45, 0x53, 0x54, 0x49, 0x4E, 0x47, 0x21] { return Err(ValidationError::Unexpected); } // validate the checksum - let csum = [v[8], v[9], v[10], v[11], v[12], v[13], v[14], v[15]]; - [v[8], v[9], v[10], v[11], v[12], v[13], v[14], v[15]] = [0; 8]; + let csum = v[8..16].to_owned(); + v[8..16].copy_from_slice(&[0; 8]); if csum != self.hash_builder.hash_one(&v).to_be_bytes() { return Err(ValidationError::Corrupted); } From f0b42a25217d40e5af4ff9ef0307a030f2b0aee8 Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Thu, 19 Oct 2023 08:41:51 -0700 Subject: [PATCH 4/5] rustfmt --- src/pubsub/kafka.rs | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/src/pubsub/kafka.rs b/src/pubsub/kafka.rs index 4271a86b..948a158b 100644 --- a/src/pubsub/kafka.rs +++ b/src/pubsub/kafka.rs @@ -162,27 +162,25 @@ async fn subscriber_task(client: Arc, topics: Vec) { while RUNNING.load(Ordering::Relaxed) { match client.recv().await { Ok(message) => match message.payload_view::<[u8]>() { - Some(Ok(message)) => { - match validator.validate(&mut message.to_owned()) { - Err(ValidationError::Unexpected) => { - error!("pubsub: invalid message received"); - RESPONSE_EX.increment(); - PUBSUB_RECEIVE_INVALID.increment(); - continue; - } - Err(ValidationError::Corrupted) => { - error!("pubsub: corrupt message received"); - PUBSUB_RECEIVE.increment(); - PUBSUB_RECEIVE_CORRUPT.increment(); - continue; - } - Ok(latency) => { - let _ = PUBSUB_LATENCY.increment(latency); - PUBSUB_RECEIVE.increment(); - PUBSUB_RECEIVE_OK.increment(); - } + Some(Ok(message)) => match validator.validate(&mut message.to_owned()) { + Err(ValidationError::Unexpected) => { + error!("pubsub: invalid message received"); + RESPONSE_EX.increment(); + PUBSUB_RECEIVE_INVALID.increment(); + continue; } - } + Err(ValidationError::Corrupted) => { + error!("pubsub: corrupt message received"); + PUBSUB_RECEIVE.increment(); + PUBSUB_RECEIVE_CORRUPT.increment(); + continue; + } + Ok(latency) => { + let _ = PUBSUB_LATENCY.increment(latency); + PUBSUB_RECEIVE.increment(); + PUBSUB_RECEIVE_OK.increment(); + } + }, Some(Err(e)) => { error!("Error in deserializing the message:{:?}", e); PUBSUB_RECEIVE.increment(); From 861dd0dd955ac1bd53b18b146544e760f65344ef Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Thu, 19 Oct 2023 09:14:13 -0700 Subject: [PATCH 5/5] move stat increments into message validator --- src/pubsub/kafka.rs | 22 +++------------------- src/pubsub/mod.rs | 10 ++++++++++ src/pubsub/momento.rs | 20 +------------------- 3 files changed, 14 insertions(+), 38 deletions(-) diff --git a/src/pubsub/kafka.rs b/src/pubsub/kafka.rs index 948a158b..62b54429 100644 --- a/src/pubsub/kafka.rs +++ b/src/pubsub/kafka.rs @@ -162,25 +162,9 @@ async fn subscriber_task(client: Arc, topics: Vec) { while RUNNING.load(Ordering::Relaxed) { match client.recv().await { Ok(message) => match message.payload_view::<[u8]>() { - Some(Ok(message)) => match validator.validate(&mut message.to_owned()) { - Err(ValidationError::Unexpected) => { - error!("pubsub: invalid message received"); - RESPONSE_EX.increment(); - PUBSUB_RECEIVE_INVALID.increment(); - continue; - } - Err(ValidationError::Corrupted) => { - error!("pubsub: corrupt message received"); - PUBSUB_RECEIVE.increment(); - PUBSUB_RECEIVE_CORRUPT.increment(); - continue; - } - Ok(latency) => { - let _ = PUBSUB_LATENCY.increment(latency); - PUBSUB_RECEIVE.increment(); - PUBSUB_RECEIVE_OK.increment(); - } - }, + Some(Ok(message)) => { + let _ = validator.validate(&mut message.to_owned()); + } Some(Err(e)) => { error!("Error in deserializing the message:{:?}", e); PUBSUB_RECEIVE.increment(); diff --git a/src/pubsub/mod.rs b/src/pubsub/mod.rs index 0572bf56..68aef414 100644 --- a/src/pubsub/mod.rs +++ b/src/pubsub/mod.rs @@ -60,6 +60,9 @@ impl MessageValidator { // check if the magic bytes match if v[0..8] != [0x54, 0x45, 0x53, 0x54, 0x49, 0x4E, 0x47, 0x21] { + error!("pubsub: unexpected/invalid message received"); + RESPONSE_EX.increment(); + PUBSUB_RECEIVE_INVALID.increment(); return Err(ValidationError::Unexpected); } @@ -67,6 +70,9 @@ impl MessageValidator { let csum = v[8..16].to_owned(); v[8..16].copy_from_slice(&[0; 8]); if csum != self.hash_builder.hash_one(&v).to_be_bytes() { + error!("pubsub: corrupt message received"); + PUBSUB_RECEIVE.increment(); + PUBSUB_RECEIVE_CORRUPT.increment(); return Err(ValidationError::Corrupted); } @@ -74,6 +80,10 @@ impl MessageValidator { let ts = u64::from_be_bytes([v[16], v[17], v[18], v[19], v[20], v[21], v[22], v[23]]); let latency = now_unix - UnixInstant::from_nanos(ts); + let _ = PUBSUB_LATENCY.increment(latency.as_nanos()); + PUBSUB_RECEIVE.increment(); + PUBSUB_RECEIVE_OK.increment(); + Ok(latency.as_nanos()) } } diff --git a/src/pubsub/momento.rs b/src/pubsub/momento.rs index a4bf6965..f696e70d 100644 --- a/src/pubsub/momento.rs +++ b/src/pubsub/momento.rs @@ -82,25 +82,7 @@ async fn subscriber_task(client: Arc, cache_name: String, topic: St match subscription.next().await { Some(SubscriptionItem::Value(v)) => { if let ValueKind::Binary(mut v) = v.kind { - match validator.validate(&mut v) { - Err(ValidationError::Unexpected) => { - error!("pubsub: invalid message received"); - RESPONSE_EX.increment(); - PUBSUB_RECEIVE_INVALID.increment(); - continue; - } - Err(ValidationError::Corrupted) => { - error!("pubsub: corrupt message received"); - PUBSUB_RECEIVE.increment(); - PUBSUB_RECEIVE_CORRUPT.increment(); - continue; - } - Ok(latency) => { - let _ = PUBSUB_LATENCY.increment(latency); - PUBSUB_RECEIVE.increment(); - PUBSUB_RECEIVE_OK.increment(); - } - } + let _ = validator.validate(&mut v); } else { error!("there was a string in the topic"); // unexpected message