Skip to content

Commit

Permalink
Merge pull request #27 from Overmuse/SR/disconnect_on_lost_connection
Browse files Browse the repository at this point in the history
panic on disconnection
  • Loading branch information
SebRollen authored Jun 10, 2021
2 parents d8b4cbf + ce3b5b3 commit 763f070
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 11 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polygon-data-relay"
version = "1.2.2"
version = "1.2.3"
authors = ["Sebastian Rollen <[email protected]>"]
edition = "2018"

Expand All @@ -14,7 +14,7 @@ config = "0.11.0"
dotenv = "0.15"
futures = "0.3"
kafka-settings = { git = "ssh://[email protected]/Overmuse/kafka-settings", tag = "v0.3.1" }
polygon = { git = "ssh://[email protected]/Overmuse/polygon", tag = "v0.6.3", default-features = false, features = ["ws"] }
polygon = { git = "ssh://[email protected]/Overmuse/polygon", tag = "v0.10.2", default-features = false, features = ["ws"] }
rdkafka = { version = "0.26", features = ["ssl-vendored"] }
sentry = { version = "0.21.0", features = ["anyhow"] }
sentry-anyhow = "0.21.0"
Expand Down
21 changes: 15 additions & 6 deletions src/relay.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use anyhow::{anyhow, Context, Result};
use futures::{SinkExt, StreamExt};
use kafka_settings::{producer, KafkaSettings};
use polygon::ws::{Aggregate, Connection, PolygonAction, PolygonMessage, Quote, Trade};
use polygon::ws::{
Aggregate, Connection, PolygonAction, PolygonMessage, PolygonStatus, Quote, Trade,
};
use rdkafka::producer::FutureRecord;
use std::sync::mpsc::Receiver;
use std::time::Duration;
Expand Down Expand Up @@ -32,10 +34,17 @@ pub async fn run(
10_000, // Equal to 1/10 the max buffer size in rdkafka
|message| async {
match message {
Ok(message) => {
let topic = get_topic(&message);
let key = get_key(&message);
let payload = serde_json::to_string(&message);
Ok(polygon_message) => {
if let PolygonMessage::Status { status, message } = &polygon_message {
if let PolygonStatus::MaxConnections | PolygonStatus::ForceDisconnect =
status
{
panic!("Disconnecting: {}", message)
}
}
let topic = get_topic(&polygon_message);
let key = get_key(&polygon_message);
let payload = serde_json::to_string(&polygon_message);
match payload {
Ok(payload) => {
debug!(
Expand All @@ -59,7 +68,7 @@ pub async fn run(
}
Err(e) => {
sentry_anyhow::capture_anyhow(&e.into());
error!("Failed to serialize payload: {:?}", &message)
error!("Failed to serialize payload: {:?}", &polygon_message)
}
}
}
Expand Down

0 comments on commit 763f070

Please sign in to comment.