Skip to content

Commit

Permalink
Extract a run_server function into cja
Browse files Browse the repository at this point in the history
  • Loading branch information
coreyja committed Mar 19, 2024
1 parent db0d69e commit c8f40b1
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 59 deletions.
55 changes: 28 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion cja/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ chrono.workspace = true
miette.workspace = true
base64.workspace = true

axum-core = "0.4.2"
async-trait = "0.1.77"
thiserror = "1.0.56"
tower-cookies = { version = "0.10.0", features = ["signed", "private"] }
http = "1.0.0"
tower-http = { version = "0.5.2", features = ["trace"] }
axum = "0.7.4"
tower-service = "0.3.2"
tower = "0.4.13"

[lints]
workspace = true
34 changes: 34 additions & 0 deletions cja/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,36 @@
use axum::{extract::Request, response::Response, serve::IncomingStream};
use miette::{Context, IntoDiagnostic};
use std::{convert::Infallible, net::SocketAddr};
use tokio::net::TcpListener;
use tower_cookies::CookieManagerLayer;
use tower_service::Service;

pub mod cookies;
pub mod session;

pub mod trace;

pub async fn run_server<AS: Clone + Sync + Send, S>(routes: axum::Router<AS>) -> miette::Result<()>
where
axum::Router<AS>:
for<'a> Service<IncomingStream<'a>, Error = Infallible, Response = S> + Send + 'static,
for<'a> <axum::Router<AS> as Service<IncomingStream<'a>>>::Future: Send,
S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
S::Future: Send,
{
let tracer = trace::Tracer;
let trace_layer = tower_http::trace::TraceLayer::new_for_http()
.make_span_with(tracer)
.on_response(tracer);

let app = routes.layer(trace_layer).layer(CookieManagerLayer::new());

let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
let listener = TcpListener::bind(&addr).await.unwrap();
tracing::debug!("listening on {}", addr);

axum::serve(listener, app)
.await
.into_diagnostic()
.wrap_err("Failed to run server")
}
2 changes: 1 addition & 1 deletion cja/src/server/session.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use async_trait::async_trait;
use axum_core::{body::Body, extract::FromRequestParts, response::IntoResponse};
use axum::{body::Body, extract::FromRequestParts, response::IntoResponse};
use http::{header, Response};
use miette::IntoDiagnostic as _;
use serde::{Deserialize, Serialize};
Expand Down
73 changes: 73 additions & 0 deletions cja/src/server/trace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use tower_http::trace::{MakeSpan, OnResponse};
use tracing::Level;

#[derive(Debug, Clone, Copy)]
pub(crate) struct Tracer;

impl<Body> MakeSpan<Body> for Tracer {
fn make_span(&mut self, request: &http::Request<Body>) -> tracing::Span {
let route = http_route(request);
let span_name = format!("{} {}", request.method(), route);

tracing::span!(
Level::INFO,
"server.request",
otel.name = span_name,
kind = "server",
uri = %request.uri(),
url.path = %request.uri().path(),
url.query = request.uri().query(),
url.scheme = request.uri().scheme_str(),
server.address = request.uri().host(),
server.port = request.uri().port_u16(),
http_version = ?request.version(),
user_agent.original = request.headers().get("user-agent").and_then(|h| h.to_str().ok()),
http.route = route,
http.request.method = %request.method(),
http.request.header.host = request.headers().get("host").and_then(|h| h.to_str().ok()),
http.request.header.forwarded_for = request.headers().get("x-forwarded-for").and_then(|h| h.to_str().ok()),
http.request.header.forwarded_proto = request.headers().get("x-forwarded-proto").and_then(|h| h.to_str().ok()),
http.request.header.host = request.headers().get("x-forwarded-ssl").and_then(|h| h.to_str().ok()),
http.request.header.referer = request.headers().get("referer").and_then(|h| h.to_str().ok()),
http.request.header.fly_forwarded_port = request.headers().get("fly-forwarded-port").and_then(|h| h.to_str().ok()),
http.request.header.fly_region = request.headers().get("fly-region").and_then(|h| h.to_str().ok()),
http.request.header.via = request.headers().get("via").and_then(|h| h.to_str().ok()),

http.response.status_code = tracing::field::Empty,
http.response.header.content_type = tracing::field::Empty,
)
}
}

impl<Body> OnResponse<Body> for Tracer {
fn on_response(
self,
response: &http::Response<Body>,
latency: std::time::Duration,
span: &tracing::Span,
) {
let status_code = response.status().as_u16();
tracing::event!(
Level::INFO,
status = status_code,
latency = format_args!("{} ms", latency.as_millis()),
"finished processing request"
);

span.record("http.response.status_code", status_code);
span.record(
"http.response.header.content_type",
response
.headers()
.get("content-type")
.and_then(|h| h.to_str().ok()),
);
}
}

#[inline]
fn http_route<B>(req: &http::Request<B>) -> &str {
req.extensions()
.get::<axum::extract::MatchedPath>()
.map_or_else(|| "", |mp| mp.as_str())
}
14 changes: 11 additions & 3 deletions server/src/http_server/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
use cja::jobs::worker::job_worker;
use cja::{jobs::worker::job_worker, server::run_server};
use miette::{IntoDiagnostic, Result};
use tokio::task::JoinError;
use tracing::info;

use crate::{cron::run_cron, http_server::run_axum, jobs::Jobs, AppState};
use crate::{cron::run_cron, http_server::routes, jobs::Jobs, AppState};

pub(crate) async fn serve() -> Result<()> {
let app_state = AppState::from_env().await?;

let job_registry = Jobs;

let syntax_css = syntect::html::css_for_theme_with_class_style(
&app_state.markdown_to_html_context.theme,
syntect::html::ClassStyle::Spaced,
)
.into_diagnostic()?;

info!("Spawning Tasks");
let futures = vec![
tokio::spawn(run_axum(app_state.clone())),
tokio::spawn(run_server(
routes::make_router(syntax_css).with_state(app_state.clone()),
)),
tokio::spawn(job_worker(app_state.clone(), job_registry)),
tokio::spawn(run_cron(app_state.clone())),
];
Expand Down
27 changes: 0 additions & 27 deletions server/src/http_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,33 +50,6 @@ const STATIC_ASSETS: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/static");

type ResponseResult<T = Response> = Result<T, MietteError>;

pub(crate) async fn run_axum(config: AppState) -> miette::Result<()> {
let syntax_css = syntect::html::css_for_theme_with_class_style(
&config.markdown_to_html_context.theme,
syntect::html::ClassStyle::Spaced,
)
.into_diagnostic()?;

let tracer = server_tracing::Tracer;
let trace_layer = TraceLayer::new_for_http()
.make_span_with(tracer)
.on_response(tracer);

let app = routes::make_router(syntax_css)
.with_state(config)
.layer(trace_layer)
.layer(CookieManagerLayer::new());

let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
let listener = TcpListener::bind(&addr).await.unwrap();
tracing::debug!("listening on {}", addr);

axum::serve(listener, app)
.await
.into_diagnostic()
.wrap_err("Failed to run server")
}

pub(crate) trait LinkTo {
fn relative_link(&self) -> String;

Expand Down

0 comments on commit c8f40b1

Please sign in to comment.