diff --git a/Cargo.lock b/Cargo.lock index d453adb..4a047fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1437,7 +1437,7 @@ dependencies = [ [[package]] name = "polygon-data-relay" -version = "1.2.4" +version = "1.2.5" dependencies = [ "actix-service", "actix-web", diff --git a/Cargo.toml b/Cargo.toml index 29f714a..b48dc63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "polygon-data-relay" -version = "1.2.4" +version = "1.2.5" authors = ["Sebastian Rollen "] edition = "2018" diff --git a/src/main.rs b/src/main.rs index 5d1e72e..d1f889c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,16 +8,15 @@ use sentry_anyhow::capture_anyhow; use std::sync::mpsc::channel; use std::thread; use tracing::{debug, info, subscriber::set_global_default}; -use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer}; use tracing_log::LogTracer; -use tracing_subscriber::{layer::SubscriberExt, Registry}; +use tracing_subscriber::EnvFilter; fn main() -> Result<()> { let _ = dotenv(); - let formatting_layer = BunyanFormattingLayer::new("polygon-data-relay".into(), std::io::stdout); - let subscriber = Registry::default() - .with(JsonStorageLayer) - .with(formatting_layer); + let subscriber = tracing_subscriber::fmt() + .json() + .with_env_filter(EnvFilter::from_default_env()) + .finish(); set_global_default(subscriber).expect("Failed to set subscriber"); LogTracer::init().expect("Failed to set logger"); let settings = Settings::new()?; @@ -28,25 +27,25 @@ fn main() -> Result<()> { ..Default::default() }, )); - info!("Starting polygon-data-relay"); + info!("Starting"); let mut data: Vec<&str> = vec![]; if settings.polygon.quotes { data.push("Q"); - debug!("Will subscribe to Quotes"); + debug!(subscription = "Quotes", "Subscribing"); } if settings.polygon.trades { data.push("T"); - debug!("Will subscribe to Trades"); + debug!(subscription = "Trades", "Subscribing"); } if settings.polygon.second_aggregates { data.push("A"); - debug!("Will subscribe to SecondAggregates"); + debug!(subscription = "SecondAggregates", "Subscribing"); } if settings.polygon.minute_aggregates { data.push("AM"); - debug!("Will subscribe to MinuteAggregates"); + debug!(subscription = "MinuteAggregates", "Subscribing"); } let (tx, rx) = channel(); diff --git a/src/relay.rs b/src/relay.rs index cea3433..0576b11 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -47,10 +47,7 @@ pub async fn run( let payload = serde_json::to_string(&polygon_message); match payload { Ok(payload) => { - debug!( - "Message received: {}. Assigning key: {}, sending to topic: {}", - &payload, &key, &topic - ); + debug!(?polygon_message, ?key, ?topic); let res = producer .send( FutureRecord::to(topic).key(key).payload(&payload), @@ -60,15 +57,13 @@ pub async fn run( if let Err((e, msg)) = res { let e = e.into(); sentry_anyhow::capture_anyhow(&e); - error!( - "Failed to send message to kafka. Message: {:?}\nError: {}", - msg, e - ) + error!(?msg, %e) } } Err(e) => { - sentry_anyhow::capture_anyhow(&e.into()); - error!("Failed to serialize payload: {:?}", &polygon_message) + let e = e.into(); + sentry_anyhow::capture_anyhow(&e); + error!(%e, ?polygon_message) } } } @@ -76,7 +71,7 @@ pub async fn run( polygon::errors::Error::Serde { .. } => { let e = e.into(); sentry_anyhow::capture_anyhow(&e); - error!("Failed to reveive message from the WebSocket: {}", e) + error!(%e) } _ => panic!("Failed to receive message from the WebSocket: {}", e), },