diff --git a/spartan/src/node/replication/message.rs b/spartan/src/node/replication/message.rs index eb12cea7..2f33a5b5 100644 --- a/spartan/src/node/replication/message.rs +++ b/spartan/src/node/replication/message.rs @@ -20,3 +20,25 @@ pub enum ReplicaRequest<'a> { RecvRange, QueueNotFound(Cow<'a, str>), } + +#[derive(Serialize, Deserialize)] +pub enum Request<'a> { + Primary(PrimaryRequest<'a>), + Replica(ReplicaRequest<'a>), +} + +impl<'a> Request<'a> { + pub fn get_primary(self) -> Option> { + match self { + Request::Primary(r) => Some(r), + Request::Replica(_) => None, + } + } + + pub fn get_replica(self) -> Option> { + match self { + Request::Replica(r) => Some(r), + Request::Primary(_) => None, + } + } +} diff --git a/spartan/src/node/replication/primary/error.rs b/spartan/src/node/replication/primary/error.rs index 29e5a48f..7b6bfc86 100644 --- a/spartan/src/node/replication/primary/error.rs +++ b/spartan/src/node/replication/primary/error.rs @@ -4,10 +4,10 @@ use tokio::io::Error as IoError; #[derive(Error, Debug)] pub enum PrimaryError { - #[error("Unable to serialize stream message: {0}")] - SerializationError(Box), - #[error("TCP connection error: {0}")] - SocketError(IoError), + #[error("Socket codec error")] + CodecError(#[from] Box), + #[error("TCP connection error")] + SocketError(#[from] IoError), #[error("TCP socket is empty")] EmptySocket, #[error("Protocol mismatch")] diff --git a/spartan/src/node/replication/primary/stream.rs b/spartan/src/node/replication/primary/stream.rs index b691388b..29a241e6 100644 --- a/spartan/src/node/replication/primary/stream.rs +++ b/spartan/src/node/replication/primary/stream.rs @@ -6,53 +6,52 @@ use crate::{ config::replication::Primary, node::replication::{ event::Event, - message::{PrimaryRequest, ReplicaRequest}, + message::{PrimaryRequest, ReplicaRequest, Request}, }, + utils::codec::BincodeCodec, }; -use bincode::{deserialize, serialize}; use futures_util::{stream::iter, SinkExt, StreamExt, TryStreamExt}; use maybe_owned::MaybeOwned; use std::borrow::Cow; use tokio::net::TcpStream; -use tokio_util::codec::{BytesCodec, Decoder, Framed}; +use tokio_util::codec::{Decoder, Framed}; -pub struct Stream(Framed); +pub struct Stream(Framed); pub struct StreamPool(Box<[Stream]>); impl<'a> Stream { - fn serialize(message: &PrimaryRequest) -> PrimaryResult> { - serialize(&message).map_err(PrimaryError::SerializationError) - } - pub async fn exchange( &mut self, - message: &PrimaryRequest<'_>, + message: PrimaryRequest<'_>, ) -> PrimaryResult> { self.0 - .send(Self::serialize(message)?.into()) + .send(Request::Primary(message)) .await - .map_err(PrimaryError::SocketError)?; + .map_err(PrimaryError::CodecError)?; - self.0.flush().await.map_err(PrimaryError::SocketError)?; + SinkExt::::flush(&mut self.0) + .await + .map_err(PrimaryError::CodecError)?; let buf = match self.0.next().await { - Some(r) => r.map_err(PrimaryError::SocketError)?, + Some(r) => r.map_err(PrimaryError::CodecError)?, None => return Err(PrimaryError::EmptySocket), }; - Ok(deserialize(&buf).map_err(PrimaryError::SerializationError)?) + buf.get_replica() + .ok_or_else(|| PrimaryError::ProtocolMismatch) } async fn ping(&mut self) -> PrimaryResult<()> { - match self.exchange(&PrimaryRequest::Ping).await? { + match self.exchange(PrimaryRequest::Ping).await? { ReplicaRequest::Pong => Ok(()), _ => Err(PrimaryError::ProtocolMismatch), } } async fn ask(&'a mut self) -> PrimaryResult> { - match self.exchange(&PrimaryRequest::AskIndex).await? { + match self.exchange(PrimaryRequest::AskIndex).await? { ReplicaRequest::RecvIndex(recv) => Ok(RecvIndex::new(self, recv)), _ => Err(PrimaryError::ProtocolMismatch), } @@ -64,7 +63,7 @@ impl<'a> Stream { range: Box<[(MaybeOwned<'a, u64>, MaybeOwned<'a, Event>)]>, ) -> PrimaryResult<()> { match self - .exchange(&PrimaryRequest::SendRange(Cow::Borrowed(queue), range)) + .exchange(PrimaryRequest::SendRange(Cow::Borrowed(queue), range)) .await? { ReplicaRequest::RecvRange => Ok(()), @@ -83,7 +82,7 @@ impl<'a> StreamPool { for host in &*config.destination { pool.push(Stream( - BytesCodec::new().framed( + BincodeCodec::default().framed( TcpStream::connect(host) .await .map_err(PrimaryError::SocketError)?, diff --git a/spartan/src/node/replication/replica/error.rs b/spartan/src/node/replication/replica/error.rs index 573dac0c..965abfb3 100644 --- a/spartan/src/node/replication/replica/error.rs +++ b/spartan/src/node/replication/replica/error.rs @@ -5,16 +5,18 @@ use tokio::io::Error as IoError; #[derive(Error, Debug)] pub enum ReplicaError { - #[error("Manager persistence error: {0}")] - PersistenceError(PersistenceError), + #[error("Manager persistence error")] + PersistenceError(#[from] PersistenceError), #[error("Unable to find replica node config")] ReplicaConfigNotFound, - #[error("TCP socket error: {0}")] - SocketError(IoError), + #[error("TCP socket error")] + SocketError(#[from] IoError), #[error("Empty TCP socket")] EmptySocket, - #[error("Packet serialization error: {0}")] - SerializationError(Box), + #[error("Socket codec error")] + CodecError(#[from] Box), + #[error("Protocol mismatch")] + ProtocolMismatch, } pub type ReplicaResult = Result; diff --git a/spartan/src/node/replication/replica/mod.rs b/spartan/src/node/replication/replica/mod.rs index 8e43ca1e..db58fb57 100644 --- a/spartan/src/node/replication/replica/mod.rs +++ b/spartan/src/node/replication/replica/mod.rs @@ -4,6 +4,7 @@ pub mod error; /// Replica node storage pub mod storage; +use super::message::Request; use crate::{ config::replication::Replica, node::{ @@ -13,19 +14,19 @@ use crate::{ }, Manager, }, + utils::codec::BincodeCodec, }; use actix_rt::time::delay_for; -use bincode::{deserialize, serialize}; use error::{ReplicaError, ReplicaResult}; use futures_util::{SinkExt, StreamExt}; use std::{future::Future, time::Duration}; use tokio::net::TcpStream; -use tokio_util::codec::{BytesCodec, Decoder, Framed}; +use tokio_util::codec::{Decoder, Framed}; pub struct ReplicaSocket<'a> { manager: &'a Manager<'a>, config: &'a Replica, - socket: Framed, + socket: Framed, } impl<'a> ReplicaSocket<'a> { @@ -33,7 +34,7 @@ impl<'a> ReplicaSocket<'a> { ReplicaSocket { manager, config, - socket: BytesCodec::new().framed(socket), + socket: BincodeCodec::default().framed(socket), } } @@ -64,29 +65,25 @@ impl<'a> ReplicaSocket<'a> { Fut: Future>, { let buf = match self.socket.next().await { - Some(r) => r.map_err(ReplicaError::SocketError)?, + Some(r) => r.map_err(ReplicaError::CodecError)?, None => return Err(ReplicaError::EmptySocket), }; let request = f( - deserialize(&buf).map_err(ReplicaError::SerializationError)?, + buf.get_primary() + .ok_or_else(|| ReplicaError::ProtocolMismatch)?, self.manager, ) .await; self.socket - .send( - serialize(&request) - .map_err(ReplicaError::SerializationError)? - .into(), - ) + .send(Request::Replica(request)) .await - .map_err(ReplicaError::SocketError)?; + .map_err(ReplicaError::CodecError)?; - self.socket - .flush() + SinkExt::::flush(&mut self.socket) .await - .map_err(ReplicaError::SocketError)?; + .map_err(ReplicaError::CodecError)?; Ok(()) } diff --git a/spartan/src/utils/codec.rs b/spartan/src/utils/codec.rs new file mode 100644 index 00000000..a51669ba --- /dev/null +++ b/spartan/src/utils/codec.rs @@ -0,0 +1,35 @@ +use crate::node::replication::message::Request; +use actix_web::web::BytesMut; +use bincode::{deserialize, serialize, Error}; +use serde::Serialize; +use tokio_util::codec::{Decoder, Encoder}; + +#[derive(Default)] +pub struct BincodeCodec; + +impl Encoder for BincodeCodec +where + I: Serialize, +{ + type Error = Error; + + fn encode(&mut self, item: I, dst: &mut BytesMut) -> Result<(), Self::Error> { + dst.extend_from_slice(&serialize(&item)?); + Ok(()) + } +} + +impl Decoder for BincodeCodec { + // GAT's required to have generic impl + type Item = Request<'static>; + + type Error = Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + if !src.is_empty() { + Ok(Some(deserialize(&src.split_to(src.len()))?)) + } else { + Ok(None) + } + } +} diff --git a/spartan/src/utils/mod.rs b/spartan/src/utils/mod.rs index d4e2083f..248541b7 100644 --- a/spartan/src/utils/mod.rs +++ b/spartan/src/utils/mod.rs @@ -1,2 +1,4 @@ #[cfg(test)] pub mod testing; + +pub mod codec;