diff --git a/CHANGELOG.md b/CHANGELOG.md index 115f1f3..81f2929 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # ilagent CHANGELOG +## 2024-10-07, Version 0.5.1 + +* added option to send event message payloads directly to integration endpoint targets + ## 2024-10-04, Version 0.5.0 * upgraded dependencies diff --git a/Cargo.toml b/Cargo.toml index f2a4a68..3af45b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ilagent" -version = "0.5.0" +version = "0.5.1" authors = ["Chris Froehlingsdorf "] edition = "2021" @@ -16,7 +16,7 @@ json = "0.12" reqwest = "0.12" chrono = "0.4" clap = "2" -ilert = "4.0.1" +ilert = "4.1.1" uuid = { version = "1.10", features = ["v4"] } ctrlc = { version = "3.4" } rusqlite = { version = "0.32", features = ["bundled"] } # SQLite 3.46.0 diff --git a/docs/mosquitto.conf b/docs/mosquitto.conf new file mode 100644 index 0000000..daa4137 --- /dev/null +++ b/docs/mosquitto.conf @@ -0,0 +1,2 @@ +listener 1883 +allow_anonymous true \ No newline at end of file diff --git a/docs/mqtt.md b/docs/mqtt.md new file mode 100644 index 0000000..77985b6 --- /dev/null +++ b/docs/mqtt.md @@ -0,0 +1,8 @@ +### Testing MQTT locally + +Run the broker: + +```sh +docker run -d -p 1883:1883 -p 9001:9001 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto +``` +Recommended UI: https://mqttx.app \ No newline at end of file diff --git a/examples/_mqtt b/examples/_mqtt index f2199af..6753f83 100755 --- a/examples/_mqtt +++ b/examples/_mqtt @@ -1,3 +1,3 @@ #!/bin/bash cargo run -- daemon -p 8977 -v -v --heartbeat il1hbt123 \ - -m 192.168.1.14 -q 1883 -n ilbro -e 'ilert/e' -r 'ilert/h' + -m localhost -q 1883 -n ilbro -e 'ilert/e' -r 'ilert/h' diff --git a/examples/_mqtt_overwrite b/examples/_mqtt_overwrite index 1524eab..f6bd43c 100755 --- a/examples/_mqtt_overwrite +++ b/examples/_mqtt_overwrite @@ -1,6 +1,6 @@ #!/bin/bash cargo run -- daemon -v -v \ - -m 127.0.0.1 -q 1883 -n ilagent -e '#' \ + -m localhost -q 1883 -n ilagent -e '#' \ --event_key 'il1api123...' \ --map_key_alert_key 'mCode' \ --map_key_summary 'comment' \ diff --git a/src/consumers/kafka.rs b/src/consumers/kafka.rs index 34d248c..4969f26 100644 --- a/src/consumers/kafka.rs +++ b/src/consumers/kafka.rs @@ -150,12 +150,15 @@ async fn handle_event_message(daemon_context: Arc, key: &str, pay let parsed = EventQueueItemJson::parse_event_json(&daemon_context.config, payload, topic); if let Some(mut event) = parsed { // info!("Event queue item: {:?}", event); - event.customDetails = Some(json!({ - "kafka_key": key, - "kafka_topic": topic - })); - let db_event_format = EventQueueItemJson::to_db(event); - let should_retry = poll::process_queued_event(&daemon_context.ilert_client, &db_event_format).await; + if event.customDetails.is_none() { + event.customDetails = Some(json!({ + "messageKey": key, + "topic": topic + })); + } + let event_api_path = format!("/v1/events/kafka/{}", event.apiKey.as_str()); + let db_event_format = EventQueueItemJson::to_db(event, Some(event_api_path)); + let should_retry = poll::send_queued_event(&daemon_context.ilert_client, &db_event_format).await; should_retry } else { false diff --git a/src/consumers/mqtt.rs b/src/consumers/mqtt.rs index 8465398..820aa9f 100644 --- a/src/consumers/mqtt.rs +++ b/src/consumers/mqtt.rs @@ -5,7 +5,7 @@ use std::{str, thread}; use std::sync::Arc; use std::sync::atomic::Ordering; use ilert::ilert::ILert; - +use serde_json::json; use crate::db::ILDatabase; use crate::config::ILConfig; use crate::{hbt, DaemonContext}; @@ -132,8 +132,14 @@ fn handle_heartbeat_message(payload: &str) -> () { fn handle_event_message(config: &ILConfig, db: &ILDatabase, payload: &str, topic: &str) -> () { let parsed = crate::models::event::EventQueueItemJson::parse_event_json(&config, payload, topic); - if let Some(event) = parsed { - let db_event = EventQueueItemJson::to_db(event); + if let Some(mut event) = parsed { + if event.customDetails.is_none() { + event.customDetails = Some(json!({ + "topic": topic + })); + } + let event_api_path = format!("/v1/events/mqtt/{}", event.apiKey.as_str()); + let db_event = EventQueueItemJson::to_db(event, Some(event_api_path)); let insert_result = db.create_il_event(&db_event); match insert_result { Ok(res) => match res { diff --git a/src/db.rs b/src/db.rs index a08a507..c5f01d1 100644 --- a/src/db.rs +++ b/src/db.rs @@ -10,6 +10,7 @@ use crate::models::event_db::EventQueueItem; const DB_MIGRATION_VAL: &str = "1"; const DB_MIGRATION_V1: &str = "mig_1"; const DB_MIGRATION_V2: &str = "mig_2"; +const DB_MIGRATION_V3: &str = "mig_3"; #[derive(Debug)] struct ILAgentItem { @@ -96,6 +97,19 @@ impl ILDatabase { info!("Database migrated to {}", DB_MIGRATION_V2); } + let mig_3 = self.get_il_value(DB_MIGRATION_V3); + if mig_3.is_none() { + + self.conn.execute( + "ALTER TABLE event_items ADD COLUMN event_api_path TEXT NULL", + [], + ).expect("Database migration failed (v3, 1)"); + + self.set_il_val(DB_MIGRATION_V3, DB_MIGRATION_VAL) + .expect("Database migration failed (v3, set)"); + info!("Database migrated to {}", DB_MIGRATION_V3); + } + /* Run simple db migrations, if needed, like this: @@ -186,13 +200,15 @@ impl ILDatabase { images: row.get(7).unwrap_or(None), links: row.get(8).unwrap_or(None), custom_details: row.get(9).unwrap_or(None), - details: row.get(10).unwrap_or(None) + details: row.get(10).unwrap_or(None), + event_api_path: row.get(11).unwrap_or(None) }) } pub fn get_il_event(&self, event_id: &str) -> Result, rusqlite::Error> { - let mut stmt = self.conn.prepare("SELECT * FROM event_items WHERE id = ?1").unwrap(); + let mut stmt = self.conn.prepare("SELECT id, api_key, event_type, alert_key, summary, created_at, + priority, images, links, custom_details, details, event_api_path FROM event_items WHERE id = ?1")?; let query_result = stmt .query_map(&[&event_id], |row| { ILDatabase::convert_db_row_to_event(row) @@ -231,7 +247,8 @@ impl ILDatabase { pub fn get_il_events(&self, limit: i32) -> Result, rusqlite::Error> { - let mut stmt = self.conn.prepare("SELECT * FROM event_items ORDER BY inserted_at ASC LIMIT ?1").unwrap(); + let mut stmt = self.conn.prepare("SELECT id, api_key, event_type, alert_key, summary, created_at, + priority, images, links, custom_details, details, event_api_path FROM event_items ORDER BY inserted_at ASC LIMIT ?1")?; let query_result = stmt .query_map(&[&limit], |row| { ILDatabase::convert_db_row_to_event(row) @@ -276,11 +293,12 @@ impl ILDatabase { let insert_result = self.conn.execute( "INSERT INTO event_items (api_key, event_type, alert_key, summary, created_at, id, - priority, images, links, custom_details, details) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)", + priority, images, links, custom_details, details, event_api_path) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)", &[&item.api_key as &dyn ToSql, &item.event_type, &item.alert_key, &item.summary, created_at, &item_id, - &item.priority, &item.images, &item.links, &item.custom_details, &item.details], + &item.priority, &item.images, &item.links, &item.custom_details, &item.details, + &item.event_api_path], ); match insert_result { diff --git a/src/http_server.rs b/src/http_server.rs index d2ee11d..5391bf9 100644 --- a/src/http_server.rs +++ b/src/http_server.rs @@ -54,7 +54,7 @@ async fn post_event(_req: HttpRequest, container: web::Data) -> () { event.images = images; event.links = links; - poll::process_queued_event(&ilert_client, &event).await; + poll::send_queued_event(&ilert_client, &event).await; } /** diff --git a/src/models/event.rs b/src/models/event.rs index 98a24d1..b1f0868 100644 --- a/src/models/event.rs +++ b/src/models/event.rs @@ -51,7 +51,7 @@ impl EventQueueItemJson { } } - pub fn to_db(item: EventQueueItemJson) -> EventQueueItem { + pub fn to_db(item: EventQueueItemJson, event_api_path: Option) -> EventQueueItem { let images = match item.images { Some(v) => { @@ -91,7 +91,8 @@ impl EventQueueItemJson { priority: item.priority, images, links, - custom_details + custom_details, + event_api_path } } diff --git a/src/models/event_db.rs b/src/models/event_db.rs index ad4ed5b..8ef4803 100644 --- a/src/models/event_db.rs +++ b/src/models/event_db.rs @@ -13,7 +13,8 @@ pub struct EventQueueItem { pub priority: Option, pub images: Option, pub links: Option, - pub custom_details: Option + pub custom_details: Option, + pub event_api_path: Option } impl EventQueueItem { @@ -30,7 +31,8 @@ impl EventQueueItem { priority: None, images: None, links: None, - custom_details: None + custom_details: None, + event_api_path: None } } @@ -47,7 +49,8 @@ impl EventQueueItem { priority: None, images: None, links: None, - custom_details: None + custom_details: None, + event_api_path: None } } } \ No newline at end of file diff --git a/src/poll.rs b/src/poll.rs index bb658da..fedaa38 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -37,7 +37,7 @@ pub async fn run_poll_job(daemon_ctx: Arc) -> () { async fn process_queued_events(daemon_ctx: Arc, events: Vec) -> () { for event in events.iter() { - let should_retry = process_queued_event(&daemon_ctx.ilert_client, event).await; + let should_retry = send_queued_event(&daemon_ctx.ilert_client, event).await; let event_id = event.id.clone().unwrap_or("".to_string()); if !should_retry { let del_result = daemon_ctx.db.lock().await.delete_il_event(event_id.as_str()); @@ -51,7 +51,7 @@ async fn process_queued_events(daemon_ctx: Arc, events: Vec bool { +pub async fn send_queued_event(ilert_client: &ILert, event: &EventQueueItem) -> bool { let parsed_event = EventQueueItemJson::from_db(event.clone()); @@ -79,8 +79,15 @@ pub async fn process_queued_event(ilert_client: &ILert, event: &EventQueueItem) None => None }; - let post_result = ilert_client - .create() + let mut post_request = ilert_client.create(); + + if let Some(event_api_path) = event.event_api_path.as_ref() { + post_request.builder.options.path = Some(event_api_path.to_string()); + } else { + post_request.builder.options.path = Some("/events".to_string()); + } + + let post_result = post_request .event_with_details( event.api_key.as_str(), event_type, @@ -92,7 +99,7 @@ pub async fn process_queued_event(ilert_client: &ILert, event: &EventQueueItem) parsed_event.links, parsed_event.customDetails, None - ) + ) .execute() .await; @@ -117,6 +124,11 @@ pub async fn process_queued_event(ilert_client: &ILert, event: &EventQueueItem) return true; // too many requests, retry } + if status == 404 { + warn!("Event {} failed with bad URL {}, potentially due to bad api key value", event_id, response.url); + return false; // no point in retrying + } + if status > 499 { warn!("Event {} failed server side exception", event_id); return true; // 500 exceptions, retry