Skip to content

Commit

Permalink
Improve logs
Browse files Browse the repository at this point in the history
  • Loading branch information
antoniosarosi committed Jan 22, 2023
1 parent fd67690 commit c864b80
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 54 deletions.
1 change: 1 addition & 0 deletions rxh.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[[server]]

listen = ["127.0.0.1:8100", "192.168.0.106:8100"]
name = "rxh/main"

match = [
{ uri = "/api", forward = "127.0.0.1:8080" },
Expand Down
16 changes: 15 additions & 1 deletion src/config/deser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ enum Field {
Forward,
Serve,
Uri,
Name,
}

/// Custom errors that can happen while manually deserializing [`Server`].
Expand Down Expand Up @@ -277,6 +278,7 @@ impl<'de> Visitor<'de> for ServerVisitor {
let mut listen: Vec<SocketAddr> = vec![];
let mut patterns: Vec<Pattern> = vec![];
let mut simple_pattern: Option<Pattern> = None;
let mut name = None;
let mut uri = super::default::uri();

while let Some(key) = map.next_key()? {
Expand Down Expand Up @@ -344,6 +346,14 @@ impl<'de> Visitor<'de> for ServerVisitor {

uri = map.next_value()?;
}

Field::Name => {
if name.is_some() {
return Err(de::Error::duplicate_field("name"));
}

name = Some(map.next_value()?);
}
}
}

Expand All @@ -360,6 +370,10 @@ impl<'de> Visitor<'de> for ServerVisitor {
return Err(de::Error::missing_field("listen"));
}

Ok(Server { listen, patterns })
Ok(Server {
listen,
patterns,
name,
})
}
}
3 changes: 3 additions & 0 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ pub struct Server {
/// Patterns that this server should match against.
#[serde(rename = "match")]
pub patterns: Vec<Pattern>,

/// Optional server name to show in logs and forwarded requests.
pub name: Option<String>,
}

/// This is a single element of a `match` list in the configuration of a server.
Expand Down
59 changes: 46 additions & 13 deletions src/http/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,25 @@ pub(crate) struct ProxyRequest<T> {

/// Local socket currently handling this request.
server_addr: SocketAddr,

/// Optional ID to use in the "by" parameter of the "Forwarded" header
/// instead of the IP address.
proxy_id: Option<String>,
}

impl<T> ProxyRequest<T> {
/// Creates a new [`ProxyRequest`].
pub fn new(request: Request<T>, client_addr: SocketAddr, server_addr: SocketAddr) -> Self {
pub fn new(
request: Request<T>,
client_addr: SocketAddr,
server_addr: SocketAddr,
proxy_id: Option<String>,
) -> Self {
Self {
request,
client_addr,
server_addr,
proxy_id,
}
}

Expand Down Expand Up @@ -116,11 +126,10 @@ impl<T> ProxyRequest<T> {
self.server_addr.to_string()
};

let by = self.proxy_id.unwrap_or(self.server_addr.to_string());

// TODO: Proto
let mut forwarded = format!(
"for={};by={};host={}",
self.client_addr, self.server_addr, host
);
let mut forwarded = format!("for={};by={};host={}", self.client_addr, by, host);

if let Some(value) = self.request.headers().get(header::FORWARDED) {
if let Ok(previous_proxies) = value.to_str() {
Expand All @@ -141,6 +150,17 @@ impl<T> ProxyRequest<T> {
mod tests {
use super::*;

fn forwarded_header<T>(request: &Request<T>) -> String {
let forwarded = request
.headers()
.get(header::FORWARDED)
.unwrap()
.to_str()
.unwrap();

String::from(forwarded)
}

#[test]
fn forwarded_request() {
let client = "127.0.0.1:8000".parse().unwrap();
Expand All @@ -150,20 +170,33 @@ mod tests {
Request::builder().body(crate::http::body::empty()).unwrap(),
client,
proxy,
None,
);

let forwarded = request.into_forwarded();
let expected = format!("for={client};by={proxy};host={proxy}");

assert!(forwarded.headers().contains_key(header::FORWARDED));
assert_eq!(
forwarded
.headers()
.get(header::FORWARDED)
.unwrap()
.to_str()
.unwrap(),
expected.as_str()
assert_eq!(forwarded_header(&forwarded), expected.as_str());
}

#[test]
fn forwarded_request_with_proxy_id() {
let client = "127.0.0.1:8000".parse().unwrap();
let proxy = "127.0.0.1:9000".parse().unwrap();
let proxy_id = String::from("rxh/main");

let request = ProxyRequest::new(
Request::builder().body(crate::http::body::empty()).unwrap(),
client,
proxy,
Some(proxy_id.clone()),
);

let forwarded = request.into_forwarded();
let expected = format!("for={client};by={proxy_id};host={proxy}");

assert!(forwarded.headers().contains_key(header::FORWARDED));
assert_eq!(forwarded_header(&forwarded), expected.as_str());
}
}
4 changes: 2 additions & 2 deletions src/service/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use crate::http::response::{BoxBodyResponse, LocalResponse};

/// Returns an HTTP response whose body is the content of a file. The file
/// must be located inside the root directory specified by the configuration
/// and must be readable, otherwise a 404 response is returned.
/// and must be readable, otherwise a 404 response is returned. This function
/// also assumes that `path` is relative, so it can't start with "/".
pub(super) async fn transfer(path: &str, root: &str) -> Result<BoxBodyResponse, hyper::Error> {
let path = std::path::Path::new(root).join(path);
println!("Send file {}", path.to_str().unwrap());

if !path
.canonicalize()
Expand Down
31 changes: 24 additions & 7 deletions src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{future::Future, net::SocketAddr, pin::Pin};
use hyper::{body::Incoming, service::Service, Request};

use crate::{
config::{self, Forward},
config::{self, Action, Forward},
http::{
request::ProxyRequest,
response::{BoxBodyResponse, LocalResponse},
Expand Down Expand Up @@ -65,24 +65,41 @@ impl Service<Request<Incoming>> for Rxh {
} = *self;

Box::pin(async move {
let uri = request.uri().to_string();
let method = request.method().to_string();

let maybe_pattern = config
.patterns
.iter()
.find(|pattern| request.uri().to_string().starts_with(pattern.uri.as_str()));
.find(|pattern| uri.starts_with(pattern.uri.as_str()));

let Some(pattern) = maybe_pattern else {
return Ok(LocalResponse::not_found());
};

match &pattern.action {
config::Action::Forward(Forward { scheduler, .. }) => {
let request = ProxyRequest::new(request, client_addr, server_addr);
let response = match &pattern.action {
Action::Forward(Forward { scheduler, .. }) => {
let by = config.name.as_ref().map(|name| name.clone());
let request = ProxyRequest::new(request, client_addr, server_addr, by);
proxy::forward(request, scheduler.next_server()).await
}
config::Action::Serve(directory) => {
files::transfer(&request.uri().path()[1..], directory).await

Action::Serve(directory) => {
let path = if request.uri().path().starts_with("/") {
&request.uri().path()[1..]
} else {
request.uri().path()
};
files::transfer(path, directory).await
}
};

if let Ok(response) = &response {
let status = response.status();
println!("{client_addr} -> {server_addr} {method} {uri} HTTP {status}");
}

response
})
}
}
29 changes: 1 addition & 28 deletions src/task/master.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{

use tokio::sync::{broadcast, watch};

use crate::{config::Config, Server, ShutdownState, State};
use crate::{config::Config, Server, State};

/// The master task is responsible for creating, spawning and shutting down all
/// the [`Server`] instances described in the configuration file. Both spawning
Expand Down Expand Up @@ -171,10 +171,6 @@ impl Master {
set.spawn(server.run());
}

for (addr, state) in self.states {
tokio::task::spawn(log_state_updates(addr, state));
}

let mut first_error = None;

tokio::select! {
Expand Down Expand Up @@ -206,26 +202,3 @@ impl Master {
self.states.iter().map(|(addr, _)| *addr).collect()
}
}

async fn log_state_updates(addr: SocketAddr, mut state: watch::Receiver<State>) {
loop {
if let Err(_) = state.changed().await {
println!("Could not receive state update from server at {addr}");
break;
}

match *state.borrow() {
State::Starting => println!("Server at {addr} is starting"),
State::Listening => println!("Server at {addr} is listening"),
State::ShuttingDown(shutdown) => match shutdown {
ShutdownState::Done => {
println!("Server at {addr} is down");
break;
}
ShutdownState::PendingConnections(n) => {
println!("Server at {addr} has {n} pending connections")
}
},
}
}
}
14 changes: 11 additions & 3 deletions src/task/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,14 @@ impl Server {
..
} = self;

let name = if let Some(ref id) = config.name {
format!("{address} ({id})")
} else {
address.to_string()
};

state.send_replace(State::Listening);
println!("{name} => Listening for requests");

// Leak the configuration to get a 'static lifetime, which we need to
// spawn tokio tasks. Later when all tasks have finished, we'll drop this
Expand All @@ -188,15 +195,16 @@ impl Server {
tokio::select! {
result = Self::listen(listener, config, &notifier) => {
if let Err(err) = result {
println!("Error while accepting connections: {err}");
println!("{name} => Error while accepting connections: {err}");
}
}
_ = shutdown => {
println!("Server at {address} received shutdown signal");
println!("{name} => Received shutdown signal");
}
}

if let Ok(num_tasks) = notifier.send(Notification::Shutdown) {
println!("{name} => Can't shutdown yet, {num_tasks} pending connections");
state.send_replace(State::ShuttingDown(ShutdownState::PendingConnections(
num_tasks,
)));
Expand All @@ -211,6 +219,7 @@ impl Server {
}

state.send_replace(State::ShuttingDown(ShutdownState::Done));
println!("{name} => Shutdown complete");

Ok(())
}
Expand All @@ -225,7 +234,6 @@ impl Server {
let (stream, client_addr) = listener.accept().await?;
let server_addr = stream.local_addr()?;
let mut subscription = notifier.subscribe();
println!("Connection from {client_addr}");

tokio::task::spawn(async move {
if let Err(err) = hyper::server::conn::http1::Builder::new()
Expand Down
2 changes: 2 additions & 0 deletions tests/util/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub mod proxy {
};

Server {
name: None,
listen: vec!["127.0.0.1:0".parse().unwrap()],
patterns: vec![Pattern {
uri: String::from(uri),
Expand All @@ -63,6 +64,7 @@ pub mod files {
/// Serves files from `root` if the request URI matchees `uri`.
pub fn serve_at_uri(root: &str, uri: &str) -> Server {
Server {
name: None,
listen: vec!["127.0.0.1:0".parse().unwrap()],
patterns: vec![Pattern {
uri: String::from(uri),
Expand Down

0 comments on commit c864b80

Please sign in to comment.