Skip to content

Commit

Permalink
Add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Aug 30, 2024
1 parent a98e712 commit 3c57f54
Show file tree
Hide file tree
Showing 11 changed files with 382 additions and 9 deletions.
314 changes: 311 additions & 3 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ env_logger = "0.11.5"
futures = "0.3.30"
gcloud-sdk = { version = "0.25.5", features = ["google-pubsub-v1"] }
indexmap = "2.4.0"
metrics = "0.23.0"
metrics-exporter-prometheus = "0.15.3"
neo4rs = "0.8.0"
nostr-sdk = "0.33.0"
rand = { version = "0.8.5", features = ["std"] }
regex = "1.10.6"
rustls = { version = "0.23.12", features = ["ring"] }
serde = { version = "1.0.209", features = ["derive"] }
serde_json = "1.0.127"
thiserror = "1.0.63"
Expand Down
9 changes: 9 additions & 0 deletions prometheus.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
global:
scrape_interval: 15s
evaluation_interval: 30s
scrape_configs:
- job_name: 'followers-monitoring'
metrics_path: /metrics
honor_labels: true
static_configs:
- targets: ['host.docker.internal:3000']
5 changes: 5 additions & 0 deletions src/domain/follow_change.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chrono::{DateTime, Utc};
use metrics::counter;
use nostr_sdk::prelude::*;
use serde::{Deserialize, Serialize, Serializer};
use std::fmt;
Expand Down Expand Up @@ -34,6 +35,8 @@ impl Eq for FollowChange {}

impl FollowChange {
pub fn new_followed(at: DateTime<Utc>, follower: PublicKey, followee: PublicKey) -> Self {
counter!("follows").increment(1);

Self {
change_type: ChangeType::Followed,
at,
Expand All @@ -45,6 +48,8 @@ impl FollowChange {
}

pub fn new_unfollowed(at: DateTime<Utc>, follower: PublicKey, followee: PublicKey) -> Self {
counter!("unfollows").increment(1);

Self {
change_type: ChangeType::Unfollowed,
at,
Expand Down
2 changes: 2 additions & 0 deletions src/domain/follows_differ.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
worker_pool::{WorkerTask, WorkerTaskItem},
};
use chrono::{DateTime, Utc};
use metrics::counter;
use nostr_sdk::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -215,6 +216,7 @@ where
info!("{}", log_line);
}

counter!("contact_lists_processed").increment(1);
Ok(())
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/follow_change_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ impl<T: GetEventsOf> WorkerTask<FollowChange> for FollowChangeHandler<T> {
let WorkerTaskItem {
item: mut follow_change,
} = worker_task_item;
// Fetch friendly IDs for the pubkeys or get it from DB if it takes more
// than timeout_secs. Whatever if found through the network is cached.
// Fetch friendly IDs for the pubkeys or fallback to the DB if it takes
// more than timeout_secs. Whatever is found through the network is
// cached.
let (friendly_follower, friendly_followee) = tokio::select!(
result = fetch_friendly_ids(
&self.repo,
Expand Down
3 changes: 3 additions & 0 deletions src/google_pubsub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use gcloud_sdk::{
google::pubsub::v1::{publisher_client::PublisherClient, PublishRequest, PubsubMessage},
*,
};
use metrics::counter;
use thiserror::Error;
use tracing::info;

Expand Down Expand Up @@ -114,6 +115,8 @@ impl PublishEvents for GooglePubSubClient {
len, self.google_full_topic
);

counter!("pubsub_messages").increment(len as u64);

Ok(())
}
}
34 changes: 33 additions & 1 deletion src/http_server/router.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use anyhow::Result;
use axum::{http::HeaderMap, response::Html};
use axum::{response::IntoResponse, routing::get, Router};
use metrics::describe_counter;
use metrics_exporter_prometheus::PrometheusBuilder;
use std::time::Duration;
use tower_http::trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer};
use tower_http::LatencyUnit;
Expand All @@ -17,10 +19,40 @@ pub fn create_router() -> Result<Router> {
)
.on_failure(DefaultOnFailure::new().level(Level::ERROR));

let metrics_handle = setup_metrics()?;
Ok(Router::new()
.route("/", get(serve_root_page))
.layer(tracing_layer)
.layer(TimeoutLayer::new(Duration::from_secs(1))))
.layer(TimeoutLayer::new(Duration::from_secs(1)))
.route("/metrics", get(|| async move { metrics_handle.render() })))
}

fn setup_metrics() -> Result<metrics_exporter_prometheus::PrometheusHandle, anyhow::Error> {
describe_counter!(
"pubsub_messages",
"Number of messages published to google pubsub"
);
describe_counter!(
"contact_lists_processed",
"Number of contact lists processed"
);
describe_counter!("follows", "Number of follows");
describe_counter!("unfollows", "Number of unfollows");
describe_counter!(
"worker_lagged",
"Number of times a worker lagged behind and missed messaged, consider increasing worker pool size or channel buffer size"
);
describe_counter!("worker_closed", "Number of times a worker channel closed");
describe_counter!(
"worker_failures",
"Number of times a worker failed to process an item"
);
describe_counter!("worker_timeouts", "Number of times a worker timedout");
describe_counter!("verified_nip05", "Number of verified NIP05 ids fetched");

let prometheus_builder = PrometheusBuilder::new();
let prometheus_handle = prometheus_builder.install_recorder()?;
Ok(prometheus_handle)
}

async fn serve_root_page(_headers: HeaderMap) -> impl IntoResponse {
Expand Down
8 changes: 6 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod repo;
mod worker_pool;

use config::{Config, Settings};
use core::panic;
use domain::{FollowChange, FollowsDiffer};
use follow_change_handler::FollowChangeHandler;
use http_server::HttpServer;
Expand All @@ -19,8 +20,7 @@ use neo4rs::Graph;
use nostr_sdk::prelude::*;
use relay_subscriber::{create_client, start_nostr_subscription};
use repo::Repo;

use core::panic;
use rustls::crypto::ring;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
Expand All @@ -37,6 +37,10 @@ async fn main() -> Result<()> {
.with(EnvFilter::from_default_env())
.init();

ring::default_provider()
.install_default()
.expect("Failed to install ring crypto provider");

// Load the configuration
let config = Config::new("config")?;
let settings = config.get::<Settings>()?;
Expand Down
2 changes: 2 additions & 0 deletions src/refresh_friendly_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
use cached::proc_macro::cached;
use cached::TimedSizedCache;
use chrono::{DateTime, Utc};
use metrics::counter;
use nostr_sdk::prelude::*;
use std::sync::Arc;
use tracing::{debug, error};
Expand Down Expand Up @@ -171,6 +172,7 @@ async fn verified_friendly_id(

if let Some(nip05_value) = metadata.nip05 {
if nip05_verifier.verify_nip05(public_key, &nip05_value).await {
counter!("verified_nip05").increment(1);
return FriendlyId::Nip05(nip05_value);
}
return name_or_npub_or_pubkey;
Expand Down
6 changes: 5 additions & 1 deletion src/worker_pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use futures::Future;
use metrics::counter;
use std::error::Error;
use std::sync::Arc;
use tokio::sync::{
Expand Down Expand Up @@ -56,9 +57,11 @@ impl WorkerPool {
match result {
Ok(Ok(())) => {},
Ok(Err(e)) => {
counter!("worker_failures").increment(1);
error!("Worker failed: {}", e);
},
Err(_) => {
counter!("worker_timeouts").increment(1);
error!("Worker task timed out after {} seconds", worker_timeout_secs);
}
}
Expand Down Expand Up @@ -95,16 +98,17 @@ impl WorkerPool {

let worker_item = WorkerTaskItem { item };

// This is a single item channel so we don't use send_with_checks
if let Err(e) = worker_tx.send(worker_item).await {
error!("Failed to send to worker: {}", e);
break;
}
}
Err(RecvError::Lagged(n)) => {
counter!("worker_lagged").increment(1);
warn!("Receiver lagged and missed {} messages", n);
}
Err(RecvError::Closed) => {
counter!("worker_closed").increment(1);
error!("Item receiver channel closed");
break;
}
Expand Down

0 comments on commit 3c57f54

Please sign in to comment.