Skip to content

Commit

Permalink
Merge pull request #23 from ivan770/codec
Browse files Browse the repository at this point in the history
Bincode framed codec
  • Loading branch information
ivan770 authored Jul 26, 2020
2 parents 51c71fe + 56ef71b commit 18caaa4
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 43 deletions.
22 changes: 22 additions & 0 deletions spartan/src/node/replication/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,25 @@ pub enum ReplicaRequest<'a> {
RecvRange,
QueueNotFound(Cow<'a, str>),
}

#[derive(Serialize, Deserialize)]
pub enum Request<'a> {
Primary(PrimaryRequest<'a>),
Replica(ReplicaRequest<'a>),
}

impl<'a> Request<'a> {
pub fn get_primary(self) -> Option<PrimaryRequest<'a>> {
match self {
Request::Primary(r) => Some(r),
Request::Replica(_) => None,
}
}

pub fn get_replica(self) -> Option<ReplicaRequest<'a>> {
match self {
Request::Replica(r) => Some(r),
Request::Primary(_) => None,
}
}
}
8 changes: 4 additions & 4 deletions spartan/src/node/replication/primary/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use tokio::io::Error as IoError;

#[derive(Error, Debug)]
pub enum PrimaryError {
#[error("Unable to serialize stream message: {0}")]
SerializationError(Box<ErrorKind>),
#[error("TCP connection error: {0}")]
SocketError(IoError),
#[error("Socket codec error")]
CodecError(#[from] Box<ErrorKind>),
#[error("TCP connection error")]
SocketError(#[from] IoError),
#[error("TCP socket is empty")]
EmptySocket,
#[error("Protocol mismatch")]
Expand Down
35 changes: 17 additions & 18 deletions spartan/src/node/replication/primary/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,52 @@ use crate::{
config::replication::Primary,
node::replication::{
event::Event,
message::{PrimaryRequest, ReplicaRequest},
message::{PrimaryRequest, ReplicaRequest, Request},
},
utils::codec::BincodeCodec,
};
use bincode::{deserialize, serialize};
use futures_util::{stream::iter, SinkExt, StreamExt, TryStreamExt};
use maybe_owned::MaybeOwned;
use std::borrow::Cow;
use tokio::net::TcpStream;
use tokio_util::codec::{BytesCodec, Decoder, Framed};
use tokio_util::codec::{Decoder, Framed};

pub struct Stream(Framed<TcpStream, BytesCodec>);
pub struct Stream(Framed<TcpStream, BincodeCodec>);

pub struct StreamPool(Box<[Stream]>);

impl<'a> Stream {
fn serialize(message: &PrimaryRequest) -> PrimaryResult<Vec<u8>> {
serialize(&message).map_err(PrimaryError::SerializationError)
}

pub async fn exchange(
&mut self,
message: &PrimaryRequest<'_>,
message: PrimaryRequest<'_>,
) -> PrimaryResult<ReplicaRequest<'_>> {
self.0
.send(Self::serialize(message)?.into())
.send(Request::Primary(message))
.await
.map_err(PrimaryError::SocketError)?;
.map_err(PrimaryError::CodecError)?;

self.0.flush().await.map_err(PrimaryError::SocketError)?;
SinkExt::<Request>::flush(&mut self.0)
.await
.map_err(PrimaryError::CodecError)?;

let buf = match self.0.next().await {
Some(r) => r.map_err(PrimaryError::SocketError)?,
Some(r) => r.map_err(PrimaryError::CodecError)?,
None => return Err(PrimaryError::EmptySocket),
};

Ok(deserialize(&buf).map_err(PrimaryError::SerializationError)?)
buf.get_replica()
.ok_or_else(|| PrimaryError::ProtocolMismatch)
}

async fn ping(&mut self) -> PrimaryResult<()> {
match self.exchange(&PrimaryRequest::Ping).await? {
match self.exchange(PrimaryRequest::Ping).await? {
ReplicaRequest::Pong => Ok(()),
_ => Err(PrimaryError::ProtocolMismatch),
}
}

async fn ask(&'a mut self) -> PrimaryResult<RecvIndex<'a>> {
match self.exchange(&PrimaryRequest::AskIndex).await? {
match self.exchange(PrimaryRequest::AskIndex).await? {
ReplicaRequest::RecvIndex(recv) => Ok(RecvIndex::new(self, recv)),
_ => Err(PrimaryError::ProtocolMismatch),
}
Expand All @@ -64,7 +63,7 @@ impl<'a> Stream {
range: Box<[(MaybeOwned<'a, u64>, MaybeOwned<'a, Event>)]>,
) -> PrimaryResult<()> {
match self
.exchange(&PrimaryRequest::SendRange(Cow::Borrowed(queue), range))
.exchange(PrimaryRequest::SendRange(Cow::Borrowed(queue), range))
.await?
{
ReplicaRequest::RecvRange => Ok(()),
Expand All @@ -83,7 +82,7 @@ impl<'a> StreamPool {

for host in &*config.destination {
pool.push(Stream(
BytesCodec::new().framed(
BincodeCodec::default().framed(
TcpStream::connect(host)
.await
.map_err(PrimaryError::SocketError)?,
Expand Down
14 changes: 8 additions & 6 deletions spartan/src/node/replication/replica/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ use tokio::io::Error as IoError;

#[derive(Error, Debug)]
pub enum ReplicaError {
#[error("Manager persistence error: {0}")]
PersistenceError(PersistenceError),
#[error("Manager persistence error")]
PersistenceError(#[from] PersistenceError),
#[error("Unable to find replica node config")]
ReplicaConfigNotFound,
#[error("TCP socket error: {0}")]
SocketError(IoError),
#[error("TCP socket error")]
SocketError(#[from] IoError),
#[error("Empty TCP socket")]
EmptySocket,
#[error("Packet serialization error: {0}")]
SerializationError(Box<ErrorKind>),
#[error("Socket codec error")]
CodecError(#[from] Box<ErrorKind>),
#[error("Protocol mismatch")]
ProtocolMismatch,
}

pub type ReplicaResult<T> = Result<T, ReplicaError>;
27 changes: 12 additions & 15 deletions spartan/src/node/replication/replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod error;
/// Replica node storage
pub mod storage;

use super::message::Request;
use crate::{
config::replication::Replica,
node::{
Expand All @@ -13,27 +14,27 @@ use crate::{
},
Manager,
},
utils::codec::BincodeCodec,
};
use actix_rt::time::delay_for;
use bincode::{deserialize, serialize};
use error::{ReplicaError, ReplicaResult};
use futures_util::{SinkExt, StreamExt};
use std::{future::Future, time::Duration};
use tokio::net::TcpStream;
use tokio_util::codec::{BytesCodec, Decoder, Framed};
use tokio_util::codec::{Decoder, Framed};

pub struct ReplicaSocket<'a> {
manager: &'a Manager<'a>,
config: &'a Replica,
socket: Framed<TcpStream, BytesCodec>,
socket: Framed<TcpStream, BincodeCodec>,
}

impl<'a> ReplicaSocket<'a> {
pub fn new(manager: &'a Manager<'a>, config: &'a Replica, socket: TcpStream) -> Self {
ReplicaSocket {
manager,
config,
socket: BytesCodec::new().framed(socket),
socket: BincodeCodec::default().framed(socket),
}
}

Expand Down Expand Up @@ -64,29 +65,25 @@ impl<'a> ReplicaSocket<'a> {
Fut: Future<Output = ReplicaRequest<'a>>,
{
let buf = match self.socket.next().await {
Some(r) => r.map_err(ReplicaError::SocketError)?,
Some(r) => r.map_err(ReplicaError::CodecError)?,
None => return Err(ReplicaError::EmptySocket),
};

let request = f(
deserialize(&buf).map_err(ReplicaError::SerializationError)?,
buf.get_primary()
.ok_or_else(|| ReplicaError::ProtocolMismatch)?,
self.manager,
)
.await;

self.socket
.send(
serialize(&request)
.map_err(ReplicaError::SerializationError)?
.into(),
)
.send(Request::Replica(request))
.await
.map_err(ReplicaError::SocketError)?;
.map_err(ReplicaError::CodecError)?;

self.socket
.flush()
SinkExt::<Request>::flush(&mut self.socket)
.await
.map_err(ReplicaError::SocketError)?;
.map_err(ReplicaError::CodecError)?;

Ok(())
}
Expand Down
35 changes: 35 additions & 0 deletions spartan/src/utils/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::node::replication::message::Request;
use actix_web::web::BytesMut;
use bincode::{deserialize, serialize, Error};
use serde::Serialize;
use tokio_util::codec::{Decoder, Encoder};

#[derive(Default)]
pub struct BincodeCodec;

impl<I> Encoder<I> for BincodeCodec
where
I: Serialize,
{
type Error = Error;

fn encode(&mut self, item: I, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.extend_from_slice(&serialize(&item)?);
Ok(())
}
}

impl Decoder for BincodeCodec {
// GAT's required to have generic impl
type Item = Request<'static>;

type Error = Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if !src.is_empty() {
Ok(Some(deserialize(&src.split_to(src.len()))?))
} else {
Ok(None)
}
}
}
2 changes: 2 additions & 0 deletions spartan/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
#[cfg(test)]
pub mod testing;

pub mod codec;

0 comments on commit 18caaa4

Please sign in to comment.