Skip to content

Commit

Permalink
Merge pull request #29 from Overmuse/SR/remove_webserver
Browse files Browse the repository at this point in the history
remove webserver
  • Loading branch information
SebRollen authored Jun 18, 2021
2 parents 36cd8fd + 85d64f6 commit 47cd889
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 733 deletions.
578 changes: 5 additions & 573 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 1 addition & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
[package]
name = "polygon-data-relay"
version = "1.2.6"
version = "1.3.0"
authors = ["Sebastian Rollen <[email protected]>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
actix-web = "4.0.0-beta.4"
actix-service = "= 2.0.0-beta.5"
anyhow = "1.0"
config = "0.11.0"
dotenv = "0.15"
Expand All @@ -22,6 +20,4 @@ serde = "1.0"
serde_json = "1.0"
tokio = {version = "1.2", features = ["macros", "rt-multi-thread"]}
tracing = "0.1"
tracing-log = "0.1"
tracing-subscriber = "0.2"
tracing-bunyan-formatter = { version = "0.2.4", default-features = false }
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
pub mod relay;
pub mod server;
pub mod settings;
56 changes: 17 additions & 39 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,19 @@ use anyhow::Result;
use dotenv::dotenv;
use polygon::ws::Connection;
use polygon_data_relay::relay::run;
use polygon_data_relay::server::launch_server;
use polygon_data_relay::settings::Settings;
use sentry_anyhow::capture_anyhow;
use std::sync::mpsc::channel;
use std::thread;
use tracing::{debug, info, subscriber::set_global_default};
use tracing_log::LogTracer;
use tracing_subscriber::EnvFilter;

fn main() -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
let _ = dotenv();
let subscriber = tracing_subscriber::fmt()
.json()
.with_env_filter(EnvFilter::from_default_env())
.finish();
set_global_default(subscriber).expect("Failed to set subscriber");
LogTracer::init().expect("Failed to set logger");
let settings = Settings::new()?;
let _guard = sentry::init((
settings.sentry.address,
Expand Down Expand Up @@ -48,42 +44,24 @@ fn main() -> Result<()> {
debug!(subscription = "MinuteAggregates", "Subscribing");
}

let (tx, rx) = channel();

let kafka_settings = settings.kafka;
let server_settings = settings.server;
let polygon_settings = settings.polygon;

thread::spawn(move || {
let half_owned: Vec<_> = polygon_settings
.tickers
.iter()
.map(|x| x.as_ref())
.collect();
let connection = Connection::new(
&polygon_settings.base_url,
&polygon_settings.key_id,
&data,
&half_owned,
);
let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
tokio_runtime.block_on(async {
let res = run(&kafka_settings, connection, rx).await;
if let Err(e) = res {
capture_anyhow(&e);
}
});
});
let sys = actix_web::rt::System::new();
sys.block_on(async move {
let res = launch_server(&server_settings, tx)
.unwrap()
.await
.map_err(From::from);
if let Err(e) = res {
capture_anyhow(&e);
}
});
let half_owned: Vec<_> = polygon_settings
.tickers
.iter()
.map(|x| x.as_ref())
.collect();
let connection = Connection::new(
&polygon_settings.base_url,
&polygon_settings.key_id,
&data,
&half_owned,
);
let res = run(&kafka_settings, connection).await;
if let Err(e) = res {
capture_anyhow(&e);
}

Ok(())
}
31 changes: 6 additions & 25 deletions src/relay.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,15 @@
use anyhow::{anyhow, Context, Result};
use futures::{SinkExt, StreamExt};
use anyhow::{Context, Result};
use futures::StreamExt;
use kafka_settings::{producer, KafkaSettings};
use polygon::ws::{
Aggregate, Connection, PolygonAction, PolygonMessage, PolygonStatus, Quote, Trade,
};
use polygon::ws::{Aggregate, Connection, PolygonMessage, PolygonStatus, Quote, Trade};
use rdkafka::producer::FutureRecord;
use std::sync::mpsc::Receiver;
use std::time::Duration;
use tracing::{debug, error, info};
use tracing::{debug, error};

pub async fn run(
settings: &KafkaSettings,
connection: Connection<'_>,
rx: Receiver<PolygonAction>,
) -> Result<()> {
pub async fn run(settings: &KafkaSettings, connection: Connection<'_>) -> Result<()> {
let producer = producer(settings)?;
let ws = connection.connect().await.context("Failed to connect")?;
let (mut sink, stream) = ws.split::<String>();
tokio::spawn(async move {
loop {
let msg = rx.recv().expect("Failed to receive message");
let msg_str = serde_json::to_string(&msg).expect("Failed to serialize command");
info!(%msg_str);
sink.send(msg_str)
.await
.map_err(|_| anyhow!("Failed to send message to Sink"))
.unwrap();
}
});

let (_, stream) = ws.split::<String>();
stream
.for_each_concurrent(
10_000, // Equal to 1/10 the max buffer size in rdkafka
Expand Down
26 changes: 0 additions & 26 deletions src/server/mod.rs

This file was deleted.

5 changes: 0 additions & 5 deletions src/server/routes/health_check.rs

This file was deleted.

7 changes: 0 additions & 7 deletions src/server/routes/mod.rs

This file was deleted.

25 changes: 0 additions & 25 deletions src/server/routes/subscribe.rs

This file was deleted.

27 changes: 0 additions & 27 deletions src/server/routes/unsubscribe.rs

This file was deleted.

0 comments on commit 47cd889

Please sign in to comment.