From 3c8437973a10b9a118a203034826dac5bd3efe06 Mon Sep 17 00:00:00 2001 From: Benedikt Zinn Date: Sat, 14 Dec 2024 22:10:09 +0100 Subject: [PATCH 01/14] started work on aggregator Signed-off-by: Benedikt Zinn Co-authored-by: Mr-Kanister <68117355+Mr-Kanister@users.noreply.github.com> --- .../daemon/src/collector/aggregator.rs | 78 +++++++++++++++++++ rust/backend/daemon/src/collector/mod.rs | 2 + rust/backend/daemon/src/collector/ring_buf.rs | 2 +- 3 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 rust/backend/daemon/src/collector/aggregator.rs diff --git a/rust/backend/daemon/src/collector/aggregator.rs b/rust/backend/daemon/src/collector/aggregator.rs new file mode 100644 index 00000000..70deace6 --- /dev/null +++ b/rust/backend/daemon/src/collector/aggregator.rs @@ -0,0 +1,78 @@ +use crate::collector::IntoEvent; +use crate::registry::{RegistryGuard, RegistryItem, TypedRingBuffer}; +use backend_common::TryFromRaw; +use ractor::{cast, Actor, ActorProcessingErr, ActorRef}; +use shared::ziofa::Event; +use std::io; +use std::marker::PhantomData; +use tokio::io::unix::AsyncFd; + +pub struct Aggregator(PhantomData); + +impl Default for Aggregator { + fn default() -> Self {Self(PhantomData)} +} + + +pub struct AggregatorState { + map: AsyncFd>>, + event_actor: ActorRef, +} + +pub struct AggregatorArguments { + item: RegistryItem>, + event_actor: ActorRef, +} + +impl AggregatorArguments { + pub fn new(item: RegistryItem>, event_actor: ActorRef) -> Self { + Self { item, event_actor } + } +} + +impl TryFrom> for AggregatorState { + type Error = io::Error; + + fn try_from(value: AggregatorArguments) -> Result { + Ok(Self { + map: AsyncFd::new(value.item.take())?, + event_actor: value.event_actor, + }) + } +} + +impl Actor for Aggregator +where + T: TryFromRaw + IntoEvent + Send + Sync + 'static, +{ + type Msg = (); + type State = AggregatorState; + type Arguments = AggregatorState; + + async fn pre_start( + &self, + myself: ActorRef, + args: Self::Arguments, + ) -> Result{ + cast!(myself, ())?; + Ok(args.try_into()?) + } + + async fn handle( + &self, + myself: ActorRef, + message: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr>{ + let mut guard = state.map.readable_mut().await?; + let inner = guard.get_inner_mut(); + + while let Some(item) = inner.next().map(T::into_event) { + cast!(state.event_actor, item)?; + } + + guard.clear_ready(); + + Ok(cast!(myself, ())?) + } +} diff --git a/rust/backend/daemon/src/collector/mod.rs b/rust/backend/daemon/src/collector/mod.rs index e11eebd6..82dacd46 100644 --- a/rust/backend/daemon/src/collector/mod.rs +++ b/rust/backend/daemon/src/collector/mod.rs @@ -11,6 +11,8 @@ use shared::ziofa::jni_references_event::{JniMethodName}; mod ring_buf; mod supervisor; mod event_dipatcher; +mod aggregator; + pub use supervisor::{CollectorSupervisor, CollectorSupervisorArguments}; diff --git a/rust/backend/daemon/src/collector/ring_buf.rs b/rust/backend/daemon/src/collector/ring_buf.rs index 1b49bcd5..6fac04f7 100644 --- a/rust/backend/daemon/src/collector/ring_buf.rs +++ b/rust/backend/daemon/src/collector/ring_buf.rs @@ -58,9 +58,9 @@ impl TryFrom> for RingBufCollectorState { impl Actor for RingBufCollector where T: TryFromRaw + IntoEvent + Send + Sync + 'static { - type Arguments = RingBufCollectorArguments; type Msg = (); type State = RingBufCollectorState; + type Arguments = RingBufCollectorArguments; async fn pre_start( &self, From 5ad4c23e2511068082e8f43c9ce092e6485cd4a7 Mon Sep 17 00:00:00 2001 From: Benedikt Zinn Date: Sun, 15 Dec 2024 23:08:57 +0100 Subject: [PATCH 02/14] [untested] implemented aggregator Signed-off-by: Benedikt Zinn --- .../daemon/src/collector/aggregator.rs | 123 ++++++++++-------- ...event_dipatcher.rs => event_dispatcher.rs} | 0 rust/backend/daemon/src/collector/mod.rs | 73 ++++++----- .../daemon/src/collector/supervisor.rs | 10 +- rust/shared/proto/ziofa.proto | 17 +++ rust/shared/src/lib.rs | 33 +++++ 6 files changed, 161 insertions(+), 95 deletions(-) rename rust/backend/daemon/src/collector/{event_dipatcher.rs => event_dispatcher.rs} (100%) diff --git a/rust/backend/daemon/src/collector/aggregator.rs b/rust/backend/daemon/src/collector/aggregator.rs index 70deace6..3fada250 100644 --- a/rust/backend/daemon/src/collector/aggregator.rs +++ b/rust/backend/daemon/src/collector/aggregator.rs @@ -1,78 +1,87 @@ -use crate::collector::IntoEvent; -use crate::registry::{RegistryGuard, RegistryItem, TypedRingBuffer}; -use backend_common::TryFromRaw; +// SPDX-FileCopyrightText: 2024 Benedikt Zinn +// +// SPDX-License-Identifier: MIT + use ractor::{cast, Actor, ActorProcessingErr, ActorRef}; -use shared::ziofa::Event; -use std::io; -use std::marker::PhantomData; -use tokio::io::unix::AsyncFd; +use shared::ziofa::event::EventType; +use shared::ziofa::metric::EventTypeEnum; +use shared::ziofa::{Event, Metric}; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio::time::{self}; -pub struct Aggregator(PhantomData); +pub struct Aggregator; -impl Default for Aggregator { - fn default() -> Self {Self(PhantomData)} +pub struct AggregatorState { + event_type: EventTypeEnum, + event_count: Arc>, } - -pub struct AggregatorState { - map: AsyncFd>>, +pub struct AggregatorArguments { event_actor: ActorRef, + timeframe: time::Duration, + event_type: Event, } -pub struct AggregatorArguments { - item: RegistryItem>, - event_actor: ActorRef, -} - -impl AggregatorArguments { - pub fn new(item: RegistryItem>, event_actor: ActorRef) -> Self { - Self { item, event_actor } - } -} - -impl TryFrom> for AggregatorState { - type Error = io::Error; - - fn try_from(value: AggregatorArguments) -> Result { - Ok(Self { - map: AsyncFd::new(value.item.take())?, - event_actor: value.event_actor, - }) - } -} - -impl Actor for Aggregator -where - T: TryFromRaw + IntoEvent + Send + Sync + 'static, -{ - type Msg = (); - type State = AggregatorState; - type Arguments = AggregatorState; +impl Actor for Aggregator { + type Msg = Event; + type State = AggregatorState; + type Arguments = AggregatorArguments; async fn pre_start( &self, - myself: ActorRef, + _: ActorRef, args: Self::Arguments, - ) -> Result{ - cast!(myself, ())?; - Ok(args.try_into()?) + ) -> Result { + let count = Arc::new(Mutex::new(0)); + let l_count = count.clone(); + let event_type_enum: EventTypeEnum = args.event_type.try_into().expect("invalid event type (you are probably trying to aggregate an already aggregated event)"); + tokio::task::spawn(async move { + loop { + tokio::time::sleep(args.timeframe).await; + let c_val = l_count.lock().await; + + let metric = Metric { + timeframe_ms: args.timeframe.clone().as_millis() as u32, + event_count: *c_val, + event_type_enum: event_type_enum.into(), + }; + cast!( + args.event_actor, + Event { + event_type: Some(EventType::Metric(metric)) + } + ) + .expect("Event couldn't be send to next actor"); + } + }); + Ok(Self::State { + event_type: event_type_enum, + event_count: count, + }) } async fn handle( &self, - myself: ActorRef, - message: Self::Msg, - state: &mut Self::State, - ) -> Result<(), ActorProcessingErr>{ - let mut guard = state.map.readable_mut().await?; - let inner = guard.get_inner_mut(); + _: ActorRef, + msg: Self::Msg, + state: &mut AggregatorState, + ) -> Result<(), ActorProcessingErr> { + let AggregatorState { + event_type: state_event_type, + event_count: state_event_count, + } = state; - while let Some(item) = inner.next().map(T::into_event) { - cast!(state.event_actor, item)?; - } + let msg_event_type: EventTypeEnum = msg.try_into().expect("Invalid event type in my stream (The event type during initialization was correct, but i got a mismatching item at runtime)"); - guard.clear_ready(); + if *state_event_type != msg_event_type { + return Err(ActorProcessingErr::from( + "[Aggregator] Received event type does not match the type i was initialized with", + )); + } - Ok(cast!(myself, ())?) + let mut event_count_mut = state_event_count.lock().await; + *event_count_mut += 1; + Ok(()) } } diff --git a/rust/backend/daemon/src/collector/event_dipatcher.rs b/rust/backend/daemon/src/collector/event_dispatcher.rs similarity index 100% rename from rust/backend/daemon/src/collector/event_dipatcher.rs rename to rust/backend/daemon/src/collector/event_dispatcher.rs diff --git a/rust/backend/daemon/src/collector/mod.rs b/rust/backend/daemon/src/collector/mod.rs index 82dacd46..bda121dd 100644 --- a/rust/backend/daemon/src/collector/mod.rs +++ b/rust/backend/daemon/src/collector/mod.rs @@ -5,18 +5,18 @@ // SPDX-License-Identifier: MIT use backend_common::{JNICall, JNIMethodName, SysSendmsgCall, VfsWriteCall}; -use shared::ziofa::{Event, JniReferencesEvent, SysSendmsgEvent, VfsWriteEvent}; -use shared::ziofa::event::{EventData}; -use shared::ziofa::jni_references_event::{JniMethodName}; +use shared::ziofa::event::EventType; +use shared::ziofa::jni_references_event::JniMethodName; +use shared::ziofa::log::EventData; +use shared::ziofa::{Event, JniReferencesEvent, Log, SysSendmsgEvent, VfsWriteEvent}; + +mod aggregator; +mod event_dispatcher; mod ring_buf; mod supervisor; -mod event_dipatcher; -mod aggregator; pub use supervisor::{CollectorSupervisor, CollectorSupervisorArguments}; - - pub trait IntoEvent { fn into_event(self) -> Event; } @@ -24,13 +24,15 @@ pub trait IntoEvent { impl IntoEvent for VfsWriteCall { fn into_event(self) -> Event { Event { - event_data: Some(EventData::VfsWrite(VfsWriteEvent { - pid: self.pid, - tid: self.tid, - begin_time_stamp: self.begin_time_stamp, - fp: self.fp, - bytes_written: self.bytes_written as u64 - })) + event_type: Some(EventType::Log(Log { + event_data: Some(EventData::VfsWrite(VfsWriteEvent { + pid: self.pid, + tid: self.tid, + begin_time_stamp: self.begin_time_stamp, + fp: self.fp, + bytes_written: self.bytes_written as u64, + })), + })), } } } @@ -38,13 +40,15 @@ impl IntoEvent for VfsWriteCall { impl IntoEvent for SysSendmsgCall { fn into_event(self) -> Event { Event { - event_data: Some(EventData::SysSendmsg(SysSendmsgEvent { - pid: self.pid, - tid: self.tid, - begin_time_stamp: self.begin_time_stamp, - fd: self.fd, - duration_nano_sec: self.duration_nano_sec - })) + event_type: Some(EventType::Log(Log { + event_data: Some(EventData::SysSendmsg(SysSendmsgEvent { + pid: self.pid, + tid: self.tid, + begin_time_stamp: self.begin_time_stamp, + fd: self.fd, + duration_nano_sec: self.duration_nano_sec, + })), + })), } } } @@ -52,17 +56,20 @@ impl IntoEvent for SysSendmsgCall { impl IntoEvent for JNICall { fn into_event(self) -> Event { Event { - event_data: Some(EventData::JniReferences(JniReferencesEvent { - pid: self.pid, - tid: self.tid, - begin_time_stamp: self.begin_time_stamp, - jni_method_name: (match self.method_name { - JNIMethodName::AddLocalRef => JniMethodName::AddLocalRef, - JNIMethodName::DeleteLocalRef => JniMethodName::DeleteLocalRef, - JNIMethodName::AddGlobalRef => JniMethodName::AddGlobalRef, - JNIMethodName::DeleteGlobalRef => JniMethodName::DeleteGlobalRef, - }).into(), - })) + event_type: Some(EventType::Log(Log { + event_data: Some(EventData::JniReferences(JniReferencesEvent { + pid: self.pid, + tid: self.tid, + begin_time_stamp: self.begin_time_stamp, + jni_method_name: (match self.method_name { + JNIMethodName::AddLocalRef => JniMethodName::AddLocalRef, + JNIMethodName::DeleteLocalRef => JniMethodName::DeleteLocalRef, + JNIMethodName::AddGlobalRef => JniMethodName::AddGlobalRef, + JNIMethodName::DeleteGlobalRef => JniMethodName::DeleteGlobalRef, + }) + .into(), + })), + })), } } -} \ No newline at end of file +} diff --git a/rust/backend/daemon/src/collector/supervisor.rs b/rust/backend/daemon/src/collector/supervisor.rs index 6cead8f6..e72a03b0 100644 --- a/rust/backend/daemon/src/collector/supervisor.rs +++ b/rust/backend/daemon/src/collector/supervisor.rs @@ -12,7 +12,7 @@ use tracing::error; use crate::registry::{EbpfEventRegistry, RegistryItem, TypedRingBuffer}; -use super::{event_dipatcher::{EventDispatcher, EventDispatcherState}, ring_buf::{RingBufCollector, RingBufCollectorArguments}, IntoEvent}; +use super::{event_dispatcher::{EventDispatcher, EventDispatcherState}, ring_buf::{RingBufCollector, RingBufCollectorArguments}, IntoEvent}; @@ -80,15 +80,15 @@ impl CollectorSupervisorArguments { } impl Actor for CollectorSupervisor { - type Arguments = CollectorSupervisorArguments; type Msg = (); type State = CollectorSupervisorState; + type Arguments = CollectorSupervisorArguments; async fn pre_start( &self, - myself: ractor::ActorRef, + myself: ActorRef, args: Self::Arguments, - ) -> Result { + ) -> Result { let (events, _) = Actor::spawn_linked(None, EventDispatcher, EventDispatcherState::new(args.sender), myself.get_cell()).await?; let collectors = CollectorRefs::from_registry(args.registry.clone(), events.clone(), myself.get_cell()).await?; @@ -103,7 +103,7 @@ impl Actor for CollectorSupervisor { async fn handle_supervisor_evt( &self, myself: ActorRef, - message: ractor::SupervisionEvent, + message: SupervisionEvent, state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { if let SupervisionEvent::ActorFailed(actor_cell, error) = message { diff --git a/rust/shared/proto/ziofa.proto b/rust/shared/proto/ziofa.proto index a4a571de..35cd0bb8 100644 --- a/rust/shared/proto/ziofa.proto +++ b/rust/shared/proto/ziofa.proto @@ -65,6 +65,13 @@ message CmdlineData { } message Event { + oneof event_type{ + Metric metric = 1; + Log log = 2; + } +} + +message Log{ oneof event_data { VfsWriteEvent vfs_write = 1; SysSendmsgEvent sys_sendmsg = 2; @@ -102,3 +109,13 @@ message JniReferencesEvent { JniMethodName jni_method_name = 4; } +message Metric{ + enum EventTypeEnum{ + VfsWriteEvent = 0; + SysSendmsgEvent = 1; + JniReferencesEvent = 2; + } + uint32 timeframe_ms = 1; + uint32 event_count = 2; + EventTypeEnum event_type_enum = 3; +} \ No newline at end of file diff --git a/rust/shared/src/lib.rs b/rust/shared/src/lib.rs index 6127e0a5..3e4dc37c 100644 --- a/rust/shared/src/lib.rs +++ b/rust/shared/src/lib.rs @@ -4,6 +4,11 @@ // // SPDX-License-Identifier: MIT +use crate::ziofa::event::EventType; +use crate::ziofa::log::EventData; +use crate::ziofa::metric::EventTypeEnum; +use crate::ziofa::{Event, Log}; + #[cfg(feature = "uniffi")] uniffi::setup_scaffolding!(); @@ -18,3 +23,31 @@ pub mod ziofa { pub mod config { tonic::include_proto!("config"); } + +impl TryInto for Event { + type Error = (); + + fn try_into(self) -> Result { + match self { + Event { + event_type: + Some(EventType::Log(Log { + event_data: Some(EventData::VfsWrite(_)), + })), + } => Ok(EventTypeEnum::VfsWriteEvent), + Event { + event_type: + Some(EventType::Log(Log { + event_data: Some(EventData::SysSendmsg(_)), + })), + } => Ok(EventTypeEnum::JniReferencesEvent), + Event { + event_type: + Some(EventType::Log(Log { + event_data: Some(EventData::JniReferences(_)), + })), + } => Ok(EventTypeEnum::SysSendmsgEvent), + _ => Err(()), + } + } +} From 82f0b7885a7d466638cf5a58361a5e5bbb7d19d3 Mon Sep 17 00:00:00 2001 From: Benedikt Zinn Date: Tue, 17 Dec 2024 13:55:41 +0100 Subject: [PATCH 03/14] [untested] reworked aggregator Signed-off-by: Benedikt Zinn --- .../daemon/src/collector/aggregator.rs | 101 ++++++++++-------- rust/client/src/bin/cli.rs | 5 +- rust/shared/src/lib.rs | 18 ++++ 3 files changed, 80 insertions(+), 44 deletions(-) diff --git a/rust/backend/daemon/src/collector/aggregator.rs b/rust/backend/daemon/src/collector/aggregator.rs index 3fada250..7f2212b2 100644 --- a/rust/backend/daemon/src/collector/aggregator.rs +++ b/rust/backend/daemon/src/collector/aggregator.rs @@ -2,25 +2,27 @@ // // SPDX-License-Identifier: MIT +use ractor::concurrency::{Duration, JoinHandle}; use ractor::{cast, Actor, ActorProcessingErr, ActorRef}; use shared::ziofa::event::EventType; use shared::ziofa::metric::EventTypeEnum; use shared::ziofa::{Event, Metric}; -use std::sync::Arc; -use tokio::sync::Mutex; -use tokio::time::{self}; +use tokio::time; pub struct Aggregator; pub struct AggregatorState { event_type: EventTypeEnum, - event_count: Arc>, + event_count: u32, + timeframe: Duration, + event_actor: ActorRef, + timer: Option>, } pub struct AggregatorArguments { event_actor: ActorRef, timeframe: time::Duration, - event_type: Event, + event_type: EventTypeEnum, } impl Actor for Aggregator { @@ -33,55 +35,70 @@ impl Actor for Aggregator { _: ActorRef, args: Self::Arguments, ) -> Result { - let count = Arc::new(Mutex::new(0)); - let l_count = count.clone(); - let event_type_enum: EventTypeEnum = args.event_type.try_into().expect("invalid event type (you are probably trying to aggregate an already aggregated event)"); - tokio::task::spawn(async move { - loop { - tokio::time::sleep(args.timeframe).await; - let c_val = l_count.lock().await; - - let metric = Metric { - timeframe_ms: args.timeframe.clone().as_millis() as u32, - event_count: *c_val, - event_type_enum: event_type_enum.into(), - }; - cast!( - args.event_actor, - Event { - event_type: Some(EventType::Metric(metric)) - } - ) - .expect("Event couldn't be send to next actor"); - } - }); Ok(Self::State { - event_type: event_type_enum, - event_count: count, + event_type: args.event_type, + event_count: 0, + timeframe: args.timeframe.into(), + event_actor: args.event_actor, + timer: None, }) } + async fn post_start( + &self, + myself: ActorRef, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + state.timer = Some(myself.send_interval(state.timeframe, || Event { event_type: None })); + Ok(()) + } + + async fn post_stop( + &self, + _: ActorRef, + state: &mut AggregatorState, + ) -> Result<(), ActorProcessingErr> { + if let Some(timer) = state.timer.take() { + timer.abort(); + } + Ok(()) + } + async fn handle( &self, _: ActorRef, msg: Self::Msg, state: &mut AggregatorState, ) -> Result<(), ActorProcessingErr> { - let AggregatorState { - event_type: state_event_type, - event_count: state_event_count, - } = state; - - let msg_event_type: EventTypeEnum = msg.try_into().expect("Invalid event type in my stream (The event type during initialization was correct, but i got a mismatching item at runtime)"); + match msg.event_type { + Some(EventType::Log(event)) => { + let msg_event_type = EventTypeEnum::try_from(event).unwrap(); - if *state_event_type != msg_event_type { - return Err(ActorProcessingErr::from( - "[Aggregator] Received event type does not match the type i was initialized with", - )); + if msg_event_type != state.event_type { + panic!( + "event type mismatch -> I was initialized with {:?}, but was send an {:?}", + state.event_type, msg_event_type + ); + } + state.event_count +=1; + } + _ => { + // event type is none -> timer was triggered -> send the metric + let metric = Metric { + timeframe_ms: state.timeframe.as_millis() as u32, + event_count: state.event_count, + event_type_enum: state.event_type.into(), + }; + cast!( + state.event_actor, + Event { + event_type: Some(EventType::Metric(metric)) + } + ) + .map_err(|_| ActorProcessingErr::from("Failed to send metric to event actor"))?; + state.event_count = 0; + } } - - let mut event_count_mut = state_event_count.lock().await; - *event_count_mut += 1; Ok(()) } } diff --git a/rust/client/src/bin/cli.rs b/rust/client/src/bin/cli.rs index 97b0ef77..520ad739 100644 --- a/rust/client/src/bin/cli.rs +++ b/rust/client/src/bin/cli.rs @@ -1,5 +1,6 @@ // SPDX-FileCopyrightText: 2024 Felix Hilgers // SPDX-FileCopyrightText: 2024 Robin Seidl +// SPDX-FileCopyrightText: 2024 Benedikt Zinn // // SPDX-License-Identifier: MIT @@ -97,10 +98,10 @@ async fn set_config(client: &mut Client) -> Result<()> { .set_configuration(Configuration { uprobes: vec![], vfs_write: Some(VfsWriteConfig { - entries: std::collections::HashMap::new(), + entries: HashMap::new(), }), sys_sendmsg: Some(SysSendmsgConfig { - entries: std::collections::HashMap::new(), + entries: HashMap::new(), }), jni_references: Some(JniReferencesConfig { pids: vec![] }), }) diff --git a/rust/shared/src/lib.rs b/rust/shared/src/lib.rs index 3e4dc37c..4afd1449 100644 --- a/rust/shared/src/lib.rs +++ b/rust/shared/src/lib.rs @@ -51,3 +51,21 @@ impl TryInto for Event { } } } + +impl From for EventTypeEnum { + + fn from(value: Log) -> Self { + match value { + Log { + event_data: Some(EventData::VfsWrite(_)), + } => EventTypeEnum::VfsWriteEvent, + Log { + event_data: Some(EventData::SysSendmsg(_)), + } => EventTypeEnum::SysSendmsgEvent, + Log { + event_data: Some(EventData::JniReferences(_)), + } => EventTypeEnum::JniReferencesEvent, + _ => panic!() + } + } +} From 152401bedd34862302fca092e1b421d7d57a4035 Mon Sep 17 00:00:00 2001 From: Benedikt Zinn Date: Tue, 17 Dec 2024 17:52:41 +0100 Subject: [PATCH 04/14] changed initialization of aggregator Signed-off-by: Benedikt Zinn --- .../daemon/src/collector/aggregator.rs | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/rust/backend/daemon/src/collector/aggregator.rs b/rust/backend/daemon/src/collector/aggregator.rs index 7f2212b2..f89bb2cc 100644 --- a/rust/backend/daemon/src/collector/aggregator.rs +++ b/rust/backend/daemon/src/collector/aggregator.rs @@ -25,6 +25,21 @@ pub struct AggregatorArguments { event_type: EventTypeEnum, } + +impl TryFrom for AggregatorState { + type Error = ActorProcessingErr; + fn try_from(args: AggregatorArguments) -> Result { + Ok(AggregatorState { + event_type: args.event_type, + event_count: 0, + timeframe: args.timeframe, + event_actor: args.event_actor, + timer: None, + }) + } +} + + impl Actor for Aggregator { type Msg = Event; type State = AggregatorState; @@ -35,13 +50,7 @@ impl Actor for Aggregator { _: ActorRef, args: Self::Arguments, ) -> Result { - Ok(Self::State { - event_type: args.event_type, - event_count: 0, - timeframe: args.timeframe.into(), - event_actor: args.event_actor, - timer: None, - }) + Self::State::try_from(args) } async fn post_start( @@ -80,7 +89,7 @@ impl Actor for Aggregator { state.event_type, msg_event_type ); } - state.event_count +=1; + state.event_count += 1; } _ => { // event type is none -> timer was triggered -> send the metric @@ -95,7 +104,7 @@ impl Actor for Aggregator { event_type: Some(EventType::Metric(metric)) } ) - .map_err(|_| ActorProcessingErr::from("Failed to send metric to event actor"))?; + .map_err(|_| ActorProcessingErr::from("Failed to send metric to event actor"))?; state.event_count = 0; } } From 662bae1927b9a5946413da542bde8b62df2462f9 Mon Sep 17 00:00:00 2001 From: Benedikt Zinn Date: Tue, 14 Jan 2025 22:45:16 +0100 Subject: [PATCH 05/14] Added timeseries Signed-off-by: Benedikt Zinn --- .../daemon/src/collector/aggregator.rs | 46 +++-- rust/backend/daemon/src/collector/mod.rs | 1 + .../daemon/src/collector/supervisor.rs | 180 +++++++++++++----- .../daemon/src/collector/time_series.rs | 61 ++++++ rust/backend/daemon/src/constants.rs | 4 + rust/shared/proto/ziofa.proto | 12 +- rust/shared/src/lib.rs | 2 +- 7 files changed, 245 insertions(+), 61 deletions(-) create mode 100644 rust/backend/daemon/src/collector/time_series.rs diff --git a/rust/backend/daemon/src/collector/aggregator.rs b/rust/backend/daemon/src/collector/aggregator.rs index f89bb2cc..12b67c18 100644 --- a/rust/backend/daemon/src/collector/aggregator.rs +++ b/rust/backend/daemon/src/collector/aggregator.rs @@ -5,41 +5,63 @@ use ractor::concurrency::{Duration, JoinHandle}; use ractor::{cast, Actor, ActorProcessingErr, ActorRef}; use shared::ziofa::event::EventType; -use shared::ziofa::metric::EventTypeEnum; -use shared::ziofa::{Event, Metric}; +use shared::ziofa::time_series::EventTypeEnum; +use shared::ziofa::{Event, TimeSeries as ZioTimeSeries}; use tokio::time; +use crate::collector::time_series::TimeSeries; +use crate::constants::TIMESERIES_LENGTH; pub struct Aggregator; +impl Default for Aggregator { + fn default() -> Self { + Aggregator + } +} + pub struct AggregatorState { event_type: EventTypeEnum, event_count: u32, timeframe: Duration, event_actor: ActorRef, timer: Option>, + timeseries: TimeSeries } pub struct AggregatorArguments { event_actor: ActorRef, timeframe: time::Duration, - event_type: EventTypeEnum, + event_type_enum: EventTypeEnum, } +impl AggregatorArguments { + pub fn new( + event_actor: ActorRef, + timeframe: Duration, + event_type_enum: EventTypeEnum, + ) -> Self { + AggregatorArguments { + event_actor, + timeframe, + event_type_enum, + } + } +} impl TryFrom for AggregatorState { type Error = ActorProcessingErr; fn try_from(args: AggregatorArguments) -> Result { Ok(AggregatorState { - event_type: args.event_type, + event_type: args.event_type_enum, event_count: 0, timeframe: args.timeframe, event_actor: args.event_actor, timer: None, + timeseries: TimeSeries::new(TIMESERIES_LENGTH), }) } } - impl Actor for Aggregator { type Msg = Event; type State = AggregatorState; @@ -85,7 +107,7 @@ impl Actor for Aggregator { if msg_event_type != state.event_type { panic!( - "event type mismatch -> I was initialized with {:?}, but was send an {:?}", + "event type mismatch in Aggregator -> I was initialized with {:?}, but was send an {:?}", state.event_type, msg_event_type ); } @@ -93,18 +115,20 @@ impl Actor for Aggregator { } _ => { // event type is none -> timer was triggered -> send the metric - let metric = Metric { - timeframe_ms: state.timeframe.as_millis() as u32, - event_count: state.event_count, + state.timeseries.append(state.event_count as u64); + let time_series = ZioTimeSeries { event_type_enum: state.event_type.into(), + timeframe_ms: state.timeframe.as_millis() as u32, + data: state.timeseries.as_array(), }; + cast!( state.event_actor, Event { - event_type: Some(EventType::Metric(metric)) + event_type: Some(EventType::TimeSeries(time_series)) } ) - .map_err(|_| ActorProcessingErr::from("Failed to send metric to event actor"))?; + .map_err(|_| ActorProcessingErr::from("Failed to send metric to event actor"))?; state.event_count = 0; } } diff --git a/rust/backend/daemon/src/collector/mod.rs b/rust/backend/daemon/src/collector/mod.rs index bda121dd..9d443e50 100644 --- a/rust/backend/daemon/src/collector/mod.rs +++ b/rust/backend/daemon/src/collector/mod.rs @@ -14,6 +14,7 @@ mod aggregator; mod event_dispatcher; mod ring_buf; mod supervisor; +mod time_series; pub use supervisor::{CollectorSupervisor, CollectorSupervisorArguments}; diff --git a/rust/backend/daemon/src/collector/supervisor.rs b/rust/backend/daemon/src/collector/supervisor.rs index e72a03b0..9e3e8e37 100644 --- a/rust/backend/daemon/src/collector/supervisor.rs +++ b/rust/backend/daemon/src/collector/supervisor.rs @@ -4,17 +4,21 @@ use std::collections::HashMap; +use crate::collector::aggregator::{Aggregator, AggregatorArguments}; +use crate::constants::_DEFAULT_TIMEFRAME; +use crate::registry::{EbpfEventRegistry, RegistryItem, TypedRingBuffer}; use backend_common::TryFromRaw; use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent}; +use shared::ziofa::time_series::EventTypeEnum; use shared::ziofa::Event; use tonic::Status; use tracing::error; -use crate::registry::{EbpfEventRegistry, RegistryItem, TypedRingBuffer}; - -use super::{event_dispatcher::{EventDispatcher, EventDispatcherState}, ring_buf::{RingBufCollector, RingBufCollectorArguments}, IntoEvent}; - - +use super::{ + event_dispatcher::{EventDispatcher, EventDispatcherState}, + ring_buf::{RingBufCollector, RingBufCollectorArguments}, + IntoEvent, +}; #[derive(Clone, Copy)] enum CollectorT { @@ -31,14 +35,25 @@ pub struct CollectorSupervisorState { events: ActorRef, } +type AggregatorT = u32; + struct CollectorRefs { - collectors: HashMap + collectors: HashMap, + aggregators: HashMap, } impl CollectorRefs { - async fn from_registry(registry: EbpfEventRegistry, event_actor: ActorRef, supervisor: ActorCell) -> Result { - let mut this = Self { collectors: HashMap::new() }; - this.start_all(®istry, &event_actor, &supervisor).await?; + async fn from_registry( + registry: EbpfEventRegistry, + destination_actor: ActorRef, + supervisor: ActorCell, + ) -> Result { + let mut this = Self { + collectors: HashMap::new(), + aggregators: HashMap::new(), + }; + this.start_all(®istry, &destination_actor, &supervisor) + .await?; Ok(this) } fn who_is(&self, cell: &ActorCell) -> Option { @@ -47,35 +62,71 @@ impl CollectorRefs { fn remove(&mut self, cell: &ActorCell) -> Option { self.collectors.remove(cell) } - async fn start_all(&mut self, registry: &EbpfEventRegistry, event_actor: &ActorRef, supervisor: &ActorCell) -> Result<(), ActorProcessingErr> { - for who in [CollectorT::VfsWrite, CollectorT::SysSendmsg, CollectorT::JniCall] { - self.start(who, registry, event_actor, supervisor).await?; + async fn start_all( + &mut self, + registry: &EbpfEventRegistry, + destination_actor: &ActorRef, + supervisor: &ActorCell, + ) -> Result<(), ActorProcessingErr> { + for who in [ + CollectorT::VfsWrite, + CollectorT::SysSendmsg, + CollectorT::JniCall, + ] { + self.start(who, registry, destination_actor, supervisor) + .await?; } Ok(()) } - async fn start(&mut self, who: CollectorT, registry: &EbpfEventRegistry, event_actor: &ActorRef, supervisor: &ActorCell) -> Result<(), ActorProcessingErr> { + async fn start( + &mut self, + who: CollectorT, + registry: &EbpfEventRegistry, + destination_actor: &ActorRef, + supervisor: &ActorCell, + ) -> Result<(), ActorProcessingErr> { let actor_ref = match who { - CollectorT::VfsWrite => start_collector(registry.vfs_write_events.clone(), event_actor.clone(), supervisor.clone()).await?, - CollectorT::SysSendmsg => start_collector(registry.sys_sendmsg_events.clone(), event_actor.clone(), supervisor.clone()).await?, - CollectorT::JniCall => start_collector(registry.jni_ref_calls.clone(), event_actor.clone(), supervisor.clone()).await?, + CollectorT::VfsWrite => { + start_collector( + registry.vfs_write_events.clone(), + destination_actor.clone(), + supervisor.clone(), + ) + .await? + } + CollectorT::SysSendmsg => { + start_collector( + registry.sys_sendmsg_events.clone(), + destination_actor.clone(), + supervisor.clone(), + ) + .await? + } + CollectorT::JniCall => { + start_collector( + registry.jni_ref_calls.clone(), + destination_actor.clone(), + supervisor.clone(), + ) + .await? + } }; self.collectors.insert(actor_ref.get_cell(), who); Ok(()) } } - pub struct CollectorSupervisorArguments { registry: EbpfEventRegistry, - sender: async_broadcast::Sender> + sender: async_broadcast::Sender>, } impl CollectorSupervisorArguments { - pub fn new(registry: EbpfEventRegistry, sender: async_broadcast::Sender>) -> Self { - Self { - registry, - sender - } + pub fn new( + registry: EbpfEventRegistry, + sender: async_broadcast::Sender>, + ) -> Self { + Self { registry, sender } } } @@ -83,15 +134,22 @@ impl Actor for CollectorSupervisor { type Msg = (); type State = CollectorSupervisorState; type Arguments = CollectorSupervisorArguments; - + async fn pre_start( - &self, - myself: ActorRef, - args: Self::Arguments, - ) -> Result { - - let (events, _) = Actor::spawn_linked(None, EventDispatcher, EventDispatcherState::new(args.sender), myself.get_cell()).await?; - let collectors = CollectorRefs::from_registry(args.registry.clone(), events.clone(), myself.get_cell()).await?; + &self, + myself: ActorRef, + args: Self::Arguments, + ) -> Result { + let (events, _) = Actor::spawn_linked( + None, + EventDispatcher, + EventDispatcherState::new(args.sender), + myself.get_cell(), + ) + .await?; + let collectors = + CollectorRefs::from_registry(args.registry.clone(), events.clone(), myself.get_cell()) + .await?; Ok(CollectorSupervisorState { registry: args.registry.clone(), @@ -99,21 +157,27 @@ impl Actor for CollectorSupervisor { events, }) } - + async fn handle_supervisor_evt( - &self, - myself: ActorRef, - message: SupervisionEvent, - state: &mut Self::State, - ) -> Result<(), ActorProcessingErr> { + &self, + myself: ActorRef, + message: SupervisionEvent, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { if let SupervisionEvent::ActorFailed(actor_cell, error) = message { if let Some(who) = state.collectors.who_is(&actor_cell) { error!("Collector {:?} failed with {:?}", actor_cell, error); let _ = state.collectors.remove(&actor_cell); - state.collectors.start(who, &state.registry, &state.events, &myself.get_cell()).await?; + state + .collectors + .start(who, &state.registry, &state.events, &myself.get_cell()) + .await?; Ok(()) } else { - Err(ActorProcessingErr::from(format!("Fatal {:?} failed with {:?}", actor_cell, error))) + Err(ActorProcessingErr::from(format!( + "Fatal {:?} failed with {:?}", + actor_cell, error + ))) } } else { Ok(()) @@ -121,8 +185,38 @@ impl Actor for CollectorSupervisor { } } -async fn start_collector(item: RegistryItem>, event_actor: ActorRef, supervisor: ActorCell) -> Result, ActorProcessingErr> -where T: TryFromRaw + IntoEvent + Send + Sync + 'static { - let (actor_ref, _) = Actor::spawn_linked(None, RingBufCollector::default(), RingBufCollectorArguments::new(item, event_actor), supervisor).await?; +async fn start_collector( + item: RegistryItem>, + event_actor: ActorRef, + supervisor: ActorCell, +) -> Result, ActorProcessingErr> +where + T: TryFromRaw + IntoEvent + Send + Sync + 'static, +{ + let (actor_ref, _) = Actor::spawn_linked( + None, + RingBufCollector::default(), + RingBufCollectorArguments::new(item, event_actor), + supervisor, + ) + .await?; Ok(actor_ref) -} \ No newline at end of file +} + +async fn start_aggregator( + event_type_enum: EventTypeEnum, + destination_actor: ActorRef, + supervisor: ActorCell, +) -> Result, ActorProcessingErr> +where + T: TryFromRaw + IntoEvent + Send + Sync + 'static, +{ + let (actor_ref, _) = Actor::spawn_linked( + None, + Aggregator::default(), + AggregatorArguments::new(destination_actor, _DEFAULT_TIMEFRAME, event_type_enum), + supervisor, + ) + .await?; + Ok(actor_ref) +} diff --git a/rust/backend/daemon/src/collector/time_series.rs b/rust/backend/daemon/src/collector/time_series.rs new file mode 100644 index 00000000..fc9b93f1 --- /dev/null +++ b/rust/backend/daemon/src/collector/time_series.rs @@ -0,0 +1,61 @@ +// SPDX-FileCopyrightText: 2024 Benedikt Zinn +// +// SPDX-License-Identifier: MIT + +pub struct TimeSeries { + internal_time_series: Vec, + internal_head_pointer: usize, +} + +impl TimeSeries { + pub fn new(length: usize) -> TimeSeries { + let zeroed_ts: Vec = vec![0; length]; + + TimeSeries { + internal_time_series: zeroed_ts, + internal_head_pointer: 0, + } + } + + pub fn length(&self) -> usize { + self.internal_time_series.len() + } + + pub fn append(&mut self, value: u64) { + self.internal_time_series[self.internal_head_pointer] = value; + self.internal_head_pointer = (self.internal_head_pointer + 1) % self.length(); + } + + pub fn as_array(&self) -> Vec { + let len = self.length(); + let mut result = Vec::with_capacity(len); + + for i in 0..len { + let index = (self.internal_head_pointer + i) % len; + result.push(self.internal_time_series[index]); + } + + result + } +} + + + + + + +#[test] +fn some_test() { + let mut ts = TimeSeries::new(5); + + ts.append(10); + ts.append(20); + ts.append(30); + ts.append(40); + ts.append(50); + + println!("{:?}", ts.as_array()); + + ts.append(60); + println!("{:?}", ts.as_array()); +} diff --git a/rust/backend/daemon/src/constants.rs b/rust/backend/daemon/src/constants.rs index 47b1d4da..e7635c51 100644 --- a/rust/backend/daemon/src/constants.rs +++ b/rust/backend/daemon/src/constants.rs @@ -4,6 +4,7 @@ // SPDX-License-Identifier: MIT use std::net::SocketAddr; +use std::time; pub(crate) const DEV_DEFAULT_FILE_PATH: &str = "./ziofa.json"; @@ -13,3 +14,6 @@ pub fn sock_addr() -> SocketAddr { pub const OATDUMP_PATH: &str = "/data/local/tmp/dump.json"; pub const ZIOFA_EBPF_PATH: &str = "/sys/fs/bpf/ziofa"; + +pub const _DEFAULT_TIMEFRAME: time::Duration = time::Duration::from_secs(1); +pub const TIMESERIES_LENGTH: usize = 40; \ No newline at end of file diff --git a/rust/shared/proto/ziofa.proto b/rust/shared/proto/ziofa.proto index 35cd0bb8..5d542096 100644 --- a/rust/shared/proto/ziofa.proto +++ b/rust/shared/proto/ziofa.proto @@ -66,7 +66,7 @@ message CmdlineData { message Event { oneof event_type{ - Metric metric = 1; + TimeSeries time_series = 1; Log log = 2; } } @@ -109,13 +109,13 @@ message JniReferencesEvent { JniMethodName jni_method_name = 4; } -message Metric{ +message TimeSeries{ enum EventTypeEnum{ VfsWriteEvent = 0; SysSendmsgEvent = 1; JniReferencesEvent = 2; } - uint32 timeframe_ms = 1; - uint32 event_count = 2; - EventTypeEnum event_type_enum = 3; -} \ No newline at end of file + EventTypeEnum event_type_enum = 1; + uint32 timeframe_ms = 2; + repeated uint64 data = 3; +} diff --git a/rust/shared/src/lib.rs b/rust/shared/src/lib.rs index 4afd1449..007219ee 100644 --- a/rust/shared/src/lib.rs +++ b/rust/shared/src/lib.rs @@ -6,7 +6,7 @@ use crate::ziofa::event::EventType; use crate::ziofa::log::EventData; -use crate::ziofa::metric::EventTypeEnum; +use crate::ziofa::time_series::EventTypeEnum; use crate::ziofa::{Event, Log}; #[cfg(feature = "uniffi")] From 02c528fbe5cb91058807e98693eca64786498ebe Mon Sep 17 00:00:00 2001 From: Benedikt Zinn Date: Tue, 14 Jan 2025 23:39:14 +0100 Subject: [PATCH 06/14] Fixed some warnings Signed-off-by: Benedikt Zinn --- .../daemon/src/collector/event_dispatcher.rs | 2 +- .../daemon/src/collector/supervisor.rs | 2 +- rust/backend/daemon/src/symbols/actors.rs | 34 +++++++++---------- rust/backend/ebpf-test/tests/prog_test_run.rs | 10 +++--- 4 files changed, 25 insertions(+), 23 deletions(-) diff --git a/rust/backend/daemon/src/collector/event_dispatcher.rs b/rust/backend/daemon/src/collector/event_dispatcher.rs index fbb8eb1b..a1edb825 100644 --- a/rust/backend/daemon/src/collector/event_dispatcher.rs +++ b/rust/backend/daemon/src/collector/event_dispatcher.rs @@ -23,9 +23,9 @@ impl EventDispatcherState { impl Actor for EventDispatcher { - type Arguments = EventDispatcherState; type Msg = Event; type State = EventDispatcherState; + type Arguments = EventDispatcherState; async fn pre_start( &self, diff --git a/rust/backend/daemon/src/collector/supervisor.rs b/rust/backend/daemon/src/collector/supervisor.rs index 26d43a0d..22bae146 100644 --- a/rust/backend/daemon/src/collector/supervisor.rs +++ b/rust/backend/daemon/src/collector/supervisor.rs @@ -129,9 +129,9 @@ impl CollectorSupervisorArguments { } impl Actor for CollectorSupervisor { - type Arguments = CollectorSupervisorArguments; type Msg = (); type State = CollectorSupervisorState; + type Arguments = CollectorSupervisorArguments; async fn pre_start( &self, diff --git a/rust/backend/daemon/src/symbols/actors.rs b/rust/backend/daemon/src/symbols/actors.rs index d12e878d..246cbc9d 100644 --- a/rust/backend/daemon/src/symbols/actors.rs +++ b/rust/backend/daemon/src/symbols/actors.rs @@ -22,9 +22,9 @@ unsafe impl Sync for SymbolFilePathCollector where S: Stream + State {} impl Actor for SymbolFilePathCollector where S: Stream + State + Unpin { - type Arguments = (S, ActorRef); - type State = (S, ActorRef); type Msg = (); + type State = (S, ActorRef); + type Arguments = (S, ActorRef); async fn pre_start( &self, @@ -54,16 +54,16 @@ where S: Stream + State + Unpin { struct SymbolFileParserProxy; impl Actor for SymbolFileParserProxy { - type Arguments = (usize, ActorCell, ActorRef); - type State = ActorRef>; type Msg = SymbolFilePath; + type State = ActorRef>; + type Arguments = (usize, ActorCell, ActorRef); async fn pre_start( &self, _: ActorRef, args: Self::Arguments, ) -> Result { - let factory_def = Factory::<(), SymbolFilePath, ActorRef, SymbolFileParser, QueuerRouting<(), SymbolFilePath>, DefaultQueue::<(), SymbolFilePath>>::default(); + let factory_def = Factory::<(), SymbolFilePath, ActorRef, SymbolFileParser, QueuerRouting<(), SymbolFilePath>, DefaultQueue<(), SymbolFilePath>>::default(); let factory_args = FactoryArguments::builder() .num_initial_workers(args.0) .worker_builder(Box::new(SymbolFileParserBuilder(args.2.clone()))) @@ -75,22 +75,22 @@ impl Actor for SymbolFileParserProxy { Ok(actor_ref) } - - async fn handle( + + async fn post_stop( &self, _: ActorRef, - message: Self::Msg, state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { - Ok(cast!(state, FactoryMessage::Dispatch(Job { key: (), accepted: None, msg: message, options: JobOptions::default() }))?) + Ok(cast!(state, FactoryMessage::DrainRequests)?) } - - async fn post_stop( + + async fn handle( &self, _: ActorRef, + message: Self::Msg, state: &mut Self::State, ) -> Result<(), ActorProcessingErr> { - Ok(cast!(state, FactoryMessage::DrainRequests)?) + Ok(cast!(state, FactoryMessage::Dispatch(Job { key: (), accepted: None, msg: message, options: JobOptions::default() }))?) } async fn handle_supervisor_evt( @@ -122,9 +122,9 @@ impl WorkerBuilder> for SymbolFilePar struct SymbolFileParser; impl Actor for SymbolFileParser { - type Arguments = WorkerStartContext<(), SymbolFilePath, ActorRef>; - type State = WorkerStartContext<(), SymbolFilePath, ActorRef>; type Msg = WorkerMessage<(), SymbolFilePath>; + type State = WorkerStartContext<(), SymbolFilePath, ActorRef>; + type Arguments = WorkerStartContext<(), SymbolFilePath, ActorRef>; async fn pre_start( &self, @@ -167,9 +167,9 @@ impl Actor for SymbolFileParser { struct SymbolIndexer; impl Actor for SymbolIndexer { - type Arguments = Arc; - type State = (Field, Field, Field, Field, Arc); type Msg = SymbolWithPath; + type State = (Field, Field, Field, Field, Arc); + type Arguments = Arc; async fn pre_start( &self, @@ -240,8 +240,8 @@ pub enum SymbolActorMsg { impl Actor for SymbolActor { type Msg = SymbolActorMsg; - type Arguments = (); type State = Index; + type Arguments = (); async fn pre_start( &self, diff --git a/rust/backend/ebpf-test/tests/prog_test_run.rs b/rust/backend/ebpf-test/tests/prog_test_run.rs index 58a9fe2b..cd470ef7 100644 --- a/rust/backend/ebpf-test/tests/prog_test_run.rs +++ b/rust/backend/ebpf-test/tests/prog_test_run.rs @@ -39,10 +39,12 @@ fn prog_test_run_example() { let args = [0u64, 0u64, target_pid, signal]; let mut attr = unsafe { mem::zeroed::() }; - - attr.test.prog_fd = fd as u32; - attr.test.ctx_in = args.as_ptr() as u64; - attr.test.ctx_size_in = args.len() as u32 * 8; + + unsafe { + attr.test.prog_fd = fd as u32; + attr.test.ctx_in = args.as_ptr() as u64; + attr.test.ctx_size_in = args.len() as u32 * 8; + } let _ = { let ret = unsafe { syscall(SYS_bpf, bpf_cmd::BPF_PROG_TEST_RUN, &mut attr, size_of::()) }; From ae8cbcf259eb6aadbabbf428b0fb7d9c53ebd10f Mon Sep 17 00:00:00 2001 From: Benedikt Zinn Date: Tue, 14 Jan 2025 23:47:40 +0100 Subject: [PATCH 07/14] Made clippy happy Signed-off-by: Benedikt Zinn --- rust/backend/daemon/src/collector/aggregator.rs | 4 ++-- rust/backend/ebpf-test/tests/prog_test_run.rs | 8 +++----- rust/shared/build.rs | 4 +++- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/rust/backend/daemon/src/collector/aggregator.rs b/rust/backend/daemon/src/collector/aggregator.rs index 12b67c18..d967c3b2 100644 --- a/rust/backend/daemon/src/collector/aggregator.rs +++ b/rust/backend/daemon/src/collector/aggregator.rs @@ -35,7 +35,7 @@ pub struct AggregatorArguments { } impl AggregatorArguments { - pub fn new( + pub fn _new( event_actor: ActorRef, timeframe: Duration, event_type_enum: EventTypeEnum, @@ -103,7 +103,7 @@ impl Actor for Aggregator { ) -> Result<(), ActorProcessingErr> { match msg.event_type { Some(EventType::Log(event)) => { - let msg_event_type = EventTypeEnum::try_from(event).unwrap(); + let msg_event_type = EventTypeEnum::from(event); if msg_event_type != state.event_type { panic!( diff --git a/rust/backend/ebpf-test/tests/prog_test_run.rs b/rust/backend/ebpf-test/tests/prog_test_run.rs index cd470ef7..d600bf9d 100644 --- a/rust/backend/ebpf-test/tests/prog_test_run.rs +++ b/rust/backend/ebpf-test/tests/prog_test_run.rs @@ -40,11 +40,9 @@ fn prog_test_run_example() { let mut attr = unsafe { mem::zeroed::() }; - unsafe { - attr.test.prog_fd = fd as u32; - attr.test.ctx_in = args.as_ptr() as u64; - attr.test.ctx_size_in = args.len() as u32 * 8; - } + attr.test.prog_fd = fd as u32; + attr.test.ctx_in = args.as_ptr() as u64; + attr.test.ctx_size_in = args.len() as u32 * 8; let _ = { let ret = unsafe { syscall(SYS_bpf, bpf_cmd::BPF_PROG_TEST_RUN, &mut attr, size_of::()) }; diff --git a/rust/shared/build.rs b/rust/shared/build.rs index b4c85014..4e87c544 100644 --- a/rust/shared/build.rs +++ b/rust/shared/build.rs @@ -19,6 +19,8 @@ static UNIFFI_RECORDS: LazyLock> = LazyLock::new(|| { "EbpfEntry", "UprobeConfig", "Event", + "TimeSeries", + "Log", "VfsWriteEvent", "SysSendmsgEvent", "JniReferencesEvent", @@ -38,7 +40,7 @@ static UNIFFI_RECORDS: LazyLock> = LazyLock::new(|| { static UNIFFI_ENUMS: LazyLock> = LazyLock::new(|| { if cfg!(feature = "uniffi") { - vec!["Process.cmd", "Event.event_data", "JniReferencesEvent.JniMethodName"] + vec!["Process.cmd", "Event.event_data", "JniReferencesEvent.JniMethodName", "Event.event_type", "Log.event_data"] } else { vec![] } From 5695c731d8eaa17f8cc67f5d2fa12af2bab96e57 Mon Sep 17 00:00:00 2001 From: Felix Hilgers Date: Wed, 15 Jan 2025 10:11:19 +0100 Subject: [PATCH 08/14] fix: uniffi issues Signed-off-by: Felix Hilgers --- .../daemon/src/collector/aggregator.rs | 4 ++-- rust/backend/daemon/src/collector/mod.rs | 12 +++++----- rust/shared/build.rs | 6 ++--- rust/shared/proto/ziofa.proto | 10 ++++----- rust/shared/src/lib.rs | 22 +++++++++---------- 5 files changed, 27 insertions(+), 27 deletions(-) diff --git a/rust/backend/daemon/src/collector/aggregator.rs b/rust/backend/daemon/src/collector/aggregator.rs index d967c3b2..e8ab6268 100644 --- a/rust/backend/daemon/src/collector/aggregator.rs +++ b/rust/backend/daemon/src/collector/aggregator.rs @@ -5,8 +5,8 @@ use ractor::concurrency::{Duration, JoinHandle}; use ractor::{cast, Actor, ActorProcessingErr, ActorRef}; use shared::ziofa::event::EventType; -use shared::ziofa::time_series::EventTypeEnum; -use shared::ziofa::{Event, TimeSeries as ZioTimeSeries}; +use shared::ziofa::time_series_event::EventTypeEnum; +use shared::ziofa::{Event, TimeSeriesEvent as ZioTimeSeries}; use tokio::time; use crate::collector::time_series::TimeSeries; use crate::constants::TIMESERIES_LENGTH; diff --git a/rust/backend/daemon/src/collector/mod.rs b/rust/backend/daemon/src/collector/mod.rs index 831021b1..04bd08a8 100644 --- a/rust/backend/daemon/src/collector/mod.rs +++ b/rust/backend/daemon/src/collector/mod.rs @@ -7,8 +7,8 @@ use backend_common::{JNICall, JNIMethodName, SysSendmsgCall, SysSigquitCall, VfsWriteCall}; use shared::ziofa::event::EventType; use shared::ziofa::jni_references_event::JniMethodName; -use shared::ziofa::log::EventData; -use shared::ziofa::{Event, JniReferencesEvent, Log, SysSendmsgEvent, SysSigquitEvent, VfsWriteEvent}; +use shared::ziofa::log_event::EventData; +use shared::ziofa::{Event, JniReferencesEvent, LogEvent, SysSendmsgEvent, SysSigquitEvent, VfsWriteEvent}; mod aggregator; mod event_dispatcher; @@ -25,7 +25,7 @@ pub trait IntoEvent { impl IntoEvent for VfsWriteCall { fn into_event(self) -> Event { Event { - event_type: Some(EventType::Log(Log { + event_type: Some(EventType::Log(LogEvent { event_data: Some(EventData::VfsWrite(VfsWriteEvent { pid: self.pid, tid: self.tid, @@ -41,7 +41,7 @@ impl IntoEvent for VfsWriteCall { impl IntoEvent for SysSendmsgCall { fn into_event(self) -> Event { Event { - event_type: Some(EventType::Log(Log { + event_type: Some(EventType::Log(LogEvent { event_data: Some(EventData::SysSendmsg(SysSendmsgEvent { pid: self.pid, tid: self.tid, @@ -57,7 +57,7 @@ impl IntoEvent for SysSendmsgCall { impl IntoEvent for JNICall { fn into_event(self) -> Event { Event { - event_type: Some(EventType::Log(Log { + event_type: Some(EventType::Log(LogEvent { event_data: Some(EventData::JniReferences(JniReferencesEvent { pid: self.pid, tid: self.tid, @@ -78,7 +78,7 @@ impl IntoEvent for JNICall { impl IntoEvent for SysSigquitCall { fn into_event(self) -> Event { Event { - event_type: Some(EventType::Log(Log { + event_type: Some(EventType::Log(LogEvent { event_data: Some(EventData::SysSigquit(SysSigquitEvent { pid: self.pid, tid: self.tid, diff --git a/rust/shared/build.rs b/rust/shared/build.rs index 4e87c544..b3c53780 100644 --- a/rust/shared/build.rs +++ b/rust/shared/build.rs @@ -19,8 +19,8 @@ static UNIFFI_RECORDS: LazyLock> = LazyLock::new(|| { "EbpfEntry", "UprobeConfig", "Event", - "TimeSeries", - "Log", + "TimeSeriesEvent", + "LogEvent", "VfsWriteEvent", "SysSendmsgEvent", "JniReferencesEvent", @@ -40,7 +40,7 @@ static UNIFFI_RECORDS: LazyLock> = LazyLock::new(|| { static UNIFFI_ENUMS: LazyLock> = LazyLock::new(|| { if cfg!(feature = "uniffi") { - vec!["Process.cmd", "Event.event_data", "JniReferencesEvent.JniMethodName", "Event.event_type", "Log.event_data"] + vec!["Process.cmd", "Event.event_data", "JniReferencesEvent.JniMethodName", "Event.event_type", "LogEvent.event_data"] } else { vec![] } diff --git a/rust/shared/proto/ziofa.proto b/rust/shared/proto/ziofa.proto index d6eb6ca0..33e3e76e 100644 --- a/rust/shared/proto/ziofa.proto +++ b/rust/shared/proto/ziofa.proto @@ -89,13 +89,13 @@ message CmdlineData { } message Event { - oneof event_type{ - TimeSeries time_series = 1; - Log log = 2; + oneof event_type { + TimeSeriesEvent time_series = 1; + LogEvent log = 2; } } -message Log{ +message LogEvent { oneof event_data { VfsWriteEvent vfs_write = 1; SysSendmsgEvent sys_sendmsg = 2; @@ -141,7 +141,7 @@ message SysSigquitEvent { uint64 target_pid = 4; } -message TimeSeries{ +message TimeSeriesEvent { enum EventTypeEnum{ VfsWriteEvent = 0; SysSendmsgEvent = 1; diff --git a/rust/shared/src/lib.rs b/rust/shared/src/lib.rs index 007219ee..e00f1f7e 100644 --- a/rust/shared/src/lib.rs +++ b/rust/shared/src/lib.rs @@ -5,9 +5,9 @@ // SPDX-License-Identifier: MIT use crate::ziofa::event::EventType; -use crate::ziofa::log::EventData; -use crate::ziofa::time_series::EventTypeEnum; -use crate::ziofa::{Event, Log}; +use crate::ziofa::log_event::EventData; +use crate::ziofa::time_series_event::EventTypeEnum; +use crate::ziofa::{Event, LogEvent}; #[cfg(feature = "uniffi")] uniffi::setup_scaffolding!(); @@ -31,19 +31,19 @@ impl TryInto for Event { match self { Event { event_type: - Some(EventType::Log(Log { + Some(EventType::Log(LogEvent { event_data: Some(EventData::VfsWrite(_)), })), } => Ok(EventTypeEnum::VfsWriteEvent), Event { event_type: - Some(EventType::Log(Log { + Some(EventType::Log(LogEvent { event_data: Some(EventData::SysSendmsg(_)), })), } => Ok(EventTypeEnum::JniReferencesEvent), Event { event_type: - Some(EventType::Log(Log { + Some(EventType::Log(LogEvent { event_data: Some(EventData::JniReferences(_)), })), } => Ok(EventTypeEnum::SysSendmsgEvent), @@ -52,17 +52,17 @@ impl TryInto for Event { } } -impl From for EventTypeEnum { +impl From for EventTypeEnum { - fn from(value: Log) -> Self { + fn from(value: LogEvent) -> Self { match value { - Log { + LogEvent { event_data: Some(EventData::VfsWrite(_)), } => EventTypeEnum::VfsWriteEvent, - Log { + LogEvent { event_data: Some(EventData::SysSendmsg(_)), } => EventTypeEnum::SysSendmsgEvent, - Log { + LogEvent { event_data: Some(EventData::JniReferences(_)), } => EventTypeEnum::JniReferencesEvent, _ => panic!() From b0d1b92dd4027f437a3555c352688b4ceb07995b Mon Sep 17 00:00:00 2001 From: Benedikt Zinn Date: Mon, 20 Jan 2025 14:39:44 +0100 Subject: [PATCH 09/14] Fixed faulty cast Signed-off-by: Benedikt Zinn --- rust/shared/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/shared/src/lib.rs b/rust/shared/src/lib.rs index e00f1f7e..e89f4748 100644 --- a/rust/shared/src/lib.rs +++ b/rust/shared/src/lib.rs @@ -40,13 +40,13 @@ impl TryInto for Event { Some(EventType::Log(LogEvent { event_data: Some(EventData::SysSendmsg(_)), })), - } => Ok(EventTypeEnum::JniReferencesEvent), + } => Ok(EventTypeEnum::SysSendmsgEvent), Event { event_type: Some(EventType::Log(LogEvent { event_data: Some(EventData::JniReferences(_)), })), - } => Ok(EventTypeEnum::SysSendmsgEvent), + } => Ok(EventTypeEnum::JniReferencesEvent), _ => Err(()), } } From d0e8a3735ab8af6def7ecb2129e845bd7e86e033 Mon Sep 17 00:00:00 2001 From: Benedikt Zinn Date: Mon, 20 Jan 2025 22:54:35 +0100 Subject: [PATCH 10/14] Added pid matching Signed-off-by: Benedikt Zinn --- .../daemon/src/collector/aggregator.rs | 46 ++++++++++++++----- .../daemon/src/collector/time_series.rs | 9 ++++ rust/shared/proto/ziofa.proto | 10 +++- rust/shared/src/lib.rs | 13 +++++- 4 files changed, 65 insertions(+), 13 deletions(-) diff --git a/rust/backend/daemon/src/collector/aggregator.rs b/rust/backend/daemon/src/collector/aggregator.rs index e8ab6268..66dc2281 100644 --- a/rust/backend/daemon/src/collector/aggregator.rs +++ b/rust/backend/daemon/src/collector/aggregator.rs @@ -2,14 +2,16 @@ // // SPDX-License-Identifier: MIT +use crate::collector::time_series::TimeSeries; +use crate::constants::TIMESERIES_LENGTH; use ractor::concurrency::{Duration, JoinHandle}; use ractor::{cast, Actor, ActorProcessingErr, ActorRef}; use shared::ziofa::event::EventType; +use shared::ziofa::log_event::EventData; use shared::ziofa::time_series_event::EventTypeEnum; use shared::ziofa::{Event, TimeSeriesEvent as ZioTimeSeries}; +use std::collections::HashMap; use tokio::time; -use crate::collector::time_series::TimeSeries; -use crate::constants::TIMESERIES_LENGTH; pub struct Aggregator; @@ -21,11 +23,11 @@ impl Default for Aggregator { pub struct AggregatorState { event_type: EventTypeEnum, - event_count: u32, + event_count_map: HashMap, // map pid to count timeframe: Duration, event_actor: ActorRef, timer: Option>, - timeseries: TimeSeries + timeseries: HashMap, } pub struct AggregatorArguments { @@ -53,11 +55,11 @@ impl TryFrom for AggregatorState { fn try_from(args: AggregatorArguments) -> Result { Ok(AggregatorState { event_type: args.event_type_enum, - event_count: 0, + event_count_map: HashMap::new(), timeframe: args.timeframe, event_actor: args.event_actor, timer: None, - timeseries: TimeSeries::new(TIMESERIES_LENGTH), + timeseries: HashMap::new(), }) } } @@ -103,6 +105,14 @@ impl Actor for Aggregator { ) -> Result<(), ActorProcessingErr> { match msg.event_type { Some(EventType::Log(event)) => { + let pid = match event.clone() { + EventData::VfsWrite(item) => item.pid, + EventData::SysSendmsg(item) => item.pid, + EventData::JniReferences(item) => item.pid, + EventData::SysSigquit(item) => item.pid, + EventData::Gc(item) => item.pid, + }; + let msg_event_type = EventTypeEnum::from(event); if msg_event_type != state.event_type { @@ -111,17 +121,28 @@ impl Actor for Aggregator { state.event_type, msg_event_type ); } - state.event_count += 1; + if !state.event_count_map.contains_key(&pid) { + state.event_count_map.insert(pid, 1); + } } _ => { // event type is none -> timer was triggered -> send the metric - state.timeseries.append(state.event_count as u64); + for (key, value) in state.event_count_map.iter() { + if !state.timeseries.contains_key(key) { + let mut new_ts = TimeSeries::new(TIMESERIES_LENGTH); + new_ts.append(*value); + state.timeseries.insert(*key, new_ts); + } else { + state.timeseries.get_mut(key).unwrap().append(*value); + } + } + let time_series = ZioTimeSeries { event_type_enum: state.event_type.into(), timeframe_ms: state.timeframe.as_millis() as u32, - data: state.timeseries.as_array(), + time_series_map: state.timeseries.clone().into(), }; - + cast!( state.event_actor, Event { @@ -129,7 +150,10 @@ impl Actor for Aggregator { } ) .map_err(|_| ActorProcessingErr::from("Failed to send metric to event actor"))?; - state.event_count = 0; + for key in state.event_count_map.keys() { + let mut count = state.event_count_map.get_mut(key).unwrap(); + *count = 0; + } } } Ok(()) diff --git a/rust/backend/daemon/src/collector/time_series.rs b/rust/backend/daemon/src/collector/time_series.rs index fc9b93f1..83f3f03c 100644 --- a/rust/backend/daemon/src/collector/time_series.rs +++ b/rust/backend/daemon/src/collector/time_series.rs @@ -40,6 +40,15 @@ impl TimeSeries { } +impl Into for TimeSeries { + fn into(self) -> shared::ziofa::time_series_event::TimeSeries { + shared::ziofa::time_series_event::TimeSeries{ + list: self.as_array(), + } + } +} + + diff --git a/rust/shared/proto/ziofa.proto b/rust/shared/proto/ziofa.proto index a382b9fc..60ada927 100644 --- a/rust/shared/proto/ziofa.proto +++ b/rust/shared/proto/ziofa.proto @@ -147,12 +147,20 @@ message TimeSeriesEvent { VfsWriteEvent = 0; SysSendmsgEvent = 1; JniReferencesEvent = 2; + SysSigquitEvent = 3; + GcEvent = 4; } EventTypeEnum event_type_enum = 1; uint32 timeframe_ms = 2; - repeated uint64 data = 3; + + message time_series{ + repeated uint64 list = 1; + } + + maptime_series_map = 3; } + message GcEvent { uint32 pid = 1; uint32 tid = 2; diff --git a/rust/shared/src/lib.rs b/rust/shared/src/lib.rs index e89f4748..d9257cd9 100644 --- a/rust/shared/src/lib.rs +++ b/rust/shared/src/lib.rs @@ -46,7 +46,18 @@ impl TryInto for Event { Some(EventType::Log(LogEvent { event_data: Some(EventData::JniReferences(_)), })), - } => Ok(EventTypeEnum::JniReferencesEvent), + } => Ok(EventTypeEnum::JniReferencesEvent),Event { + event_type: + Some(EventType::Log(LogEvent { + event_data: Some(EventData::SysSigquit(_)), + })), + } => Ok(EventTypeEnum::SysSigquitEvent), + Event{ + event_type: + Some(EventType::Log(LogEvent { + event_data: Some(EventData::Gc(_)), + })) + } => Ok(EventTypeEnum::GcEvent), _ => Err(()), } } From 3bb755f4abc1fce6a05bd1e318e0dc584172d9a6 Mon Sep 17 00:00:00 2001 From: Benedikt Zinn Date: Mon, 20 Jan 2025 23:45:25 +0100 Subject: [PATCH 11/14] Made clippy happy Signed-off-by: Benedikt Zinn --- .../daemon/src/collector/aggregator.rs | 55 ++++++++++++------- .../daemon/src/collector/time_series.rs | 15 ++--- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/rust/backend/daemon/src/collector/aggregator.rs b/rust/backend/daemon/src/collector/aggregator.rs index 66dc2281..3960e718 100644 --- a/rust/backend/daemon/src/collector/aggregator.rs +++ b/rust/backend/daemon/src/collector/aggregator.rs @@ -9,11 +9,23 @@ use ractor::{cast, Actor, ActorProcessingErr, ActorRef}; use shared::ziofa::event::EventType; use shared::ziofa::log_event::EventData; use shared::ziofa::time_series_event::EventTypeEnum; -use shared::ziofa::{Event, TimeSeriesEvent as ZioTimeSeries}; +use shared::ziofa::time_series_event::TimeSeries as ZioTimeSeries; +use shared::ziofa::{Event, TimeSeriesEvent as ZioTimeSeriesEvent}; use std::collections::HashMap; use tokio::time; pub struct Aggregator; +impl Aggregator { + fn convert_map_to_prototype( + time_series_map: HashMap, + ) -> HashMap { + let mut map = HashMap::::with_capacity(time_series_map.len()); + for (id, time_series) in time_series_map { + map.insert(id, time_series.into()); + } + map + } +} impl Default for Aggregator { fn default() -> Self { @@ -27,7 +39,7 @@ pub struct AggregatorState { timeframe: Duration, event_actor: ActorRef, timer: Option>, - timeseries: HashMap, + time_series_map: HashMap, } pub struct AggregatorArguments { @@ -59,7 +71,7 @@ impl TryFrom for AggregatorState { timeframe: args.timeframe, event_actor: args.event_actor, timer: None, - timeseries: HashMap::new(), + time_series_map: HashMap::new(), }) } } @@ -105,12 +117,15 @@ impl Actor for Aggregator { ) -> Result<(), ActorProcessingErr> { match msg.event_type { Some(EventType::Log(event)) => { - let pid = match event.clone() { - EventData::VfsWrite(item) => item.pid, - EventData::SysSendmsg(item) => item.pid, - EventData::JniReferences(item) => item.pid, - EventData::SysSigquit(item) => item.pid, - EventData::Gc(item) => item.pid, + let pid = match event.event_data.clone() { + Some(EventData::VfsWrite(item)) => item.pid, + Some(EventData::SysSendmsg(item)) => item.pid, + Some(EventData::JniReferences(item)) => item.pid, + Some(EventData::SysSigquit(item)) => item.pid, + Some(EventData::Gc(item)) => item.pid, + _ => { + panic!("unexpected event type"); + } }; let msg_event_type = EventTypeEnum::from(event); @@ -121,26 +136,27 @@ impl Actor for Aggregator { state.event_type, msg_event_type ); } - if !state.event_count_map.contains_key(&pid) { - state.event_count_map.insert(pid, 1); - } + state.event_count_map.entry(pid).or_insert(1); } _ => { // event type is none -> timer was triggered -> send the metric for (key, value) in state.event_count_map.iter() { - if !state.timeseries.contains_key(key) { + if !state.time_series_map.contains_key(key) { let mut new_ts = TimeSeries::new(TIMESERIES_LENGTH); new_ts.append(*value); - state.timeseries.insert(*key, new_ts); + state.time_series_map.insert(*key, new_ts); } else { - state.timeseries.get_mut(key).unwrap().append(*value); + state.time_series_map.get_mut(key).unwrap().append(*value); } } - let time_series = ZioTimeSeries { + //convert type for sending + //ziofa::time_series_event::TimeSeries + + let time_series = ZioTimeSeriesEvent { event_type_enum: state.event_type.into(), timeframe_ms: state.timeframe.as_millis() as u32, - time_series_map: state.timeseries.clone().into(), + time_series_map: Self::convert_map_to_prototype(state.time_series_map.clone()), }; cast!( @@ -150,9 +166,8 @@ impl Actor for Aggregator { } ) .map_err(|_| ActorProcessingErr::from("Failed to send metric to event actor"))?; - for key in state.event_count_map.keys() { - let mut count = state.event_count_map.get_mut(key).unwrap(); - *count = 0; + for (_, value) in state.event_count_map.iter_mut() { + *value = 0; } } } diff --git a/rust/backend/daemon/src/collector/time_series.rs b/rust/backend/daemon/src/collector/time_series.rs index 83f3f03c..49f61964 100644 --- a/rust/backend/daemon/src/collector/time_series.rs +++ b/rust/backend/daemon/src/collector/time_series.rs @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: MIT +#[derive(Clone)] pub struct TimeSeries { internal_time_series: Vec, internal_head_pointer: usize, @@ -39,20 +40,14 @@ impl TimeSeries { } } - -impl Into for TimeSeries { - fn into(self) -> shared::ziofa::time_series_event::TimeSeries { - shared::ziofa::time_series_event::TimeSeries{ - list: self.as_array(), +impl From for shared::ziofa::time_series_event::TimeSeries { + fn from(series: TimeSeries) -> Self { + shared::ziofa::time_series_event::TimeSeries { + list: series.as_array(), } } } - - - - - #[test] fn some_test() { let mut ts = TimeSeries::new(5); From e66e49f3e3a577c6eb0d10ea9f5918a1558b022f Mon Sep 17 00:00:00 2001 From: Benedikt Zinn Date: Tue, 21 Jan 2025 00:00:26 +0100 Subject: [PATCH 12/14] Fixed uniffi problems Signed-off-by: Benedikt Zinn --- rust/backend/daemon/src/collector/aggregator.rs | 6 +++--- rust/backend/daemon/src/collector/time_series.rs | 4 ++-- rust/shared/build.rs | 1 + rust/shared/proto/ziofa.proto | 4 ++-- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/rust/backend/daemon/src/collector/aggregator.rs b/rust/backend/daemon/src/collector/aggregator.rs index 3960e718..bb3cf350 100644 --- a/rust/backend/daemon/src/collector/aggregator.rs +++ b/rust/backend/daemon/src/collector/aggregator.rs @@ -9,7 +9,7 @@ use ractor::{cast, Actor, ActorProcessingErr, ActorRef}; use shared::ziofa::event::EventType; use shared::ziofa::log_event::EventData; use shared::ziofa::time_series_event::EventTypeEnum; -use shared::ziofa::time_series_event::TimeSeries as ZioTimeSeries; +use shared::ziofa::time_series_event::TimeSeriesType; use shared::ziofa::{Event, TimeSeriesEvent as ZioTimeSeriesEvent}; use std::collections::HashMap; use tokio::time; @@ -18,8 +18,8 @@ pub struct Aggregator; impl Aggregator { fn convert_map_to_prototype( time_series_map: HashMap, - ) -> HashMap { - let mut map = HashMap::::with_capacity(time_series_map.len()); + ) -> HashMap { + let mut map = HashMap::::with_capacity(time_series_map.len()); for (id, time_series) in time_series_map { map.insert(id, time_series.into()); } diff --git a/rust/backend/daemon/src/collector/time_series.rs b/rust/backend/daemon/src/collector/time_series.rs index 49f61964..467545df 100644 --- a/rust/backend/daemon/src/collector/time_series.rs +++ b/rust/backend/daemon/src/collector/time_series.rs @@ -40,9 +40,9 @@ impl TimeSeries { } } -impl From for shared::ziofa::time_series_event::TimeSeries { +impl From for shared::ziofa::time_series_event::TimeSeriesType { fn from(series: TimeSeries) -> Self { - shared::ziofa::time_series_event::TimeSeries { + shared::ziofa::time_series_event::TimeSeriesType{ list: series.as_array(), } } diff --git a/rust/shared/build.rs b/rust/shared/build.rs index 8075a51c..83d8b6ad 100644 --- a/rust/shared/build.rs +++ b/rust/shared/build.rs @@ -34,6 +34,7 @@ static UNIFFI_RECORDS: LazyLock> = LazyLock::new(|| { "SysSigquitConfig", "GcConfig", "GcEvent", + "TimeSeriesType" ] } else { vec![] diff --git a/rust/shared/proto/ziofa.proto b/rust/shared/proto/ziofa.proto index 60ada927..ba12655f 100644 --- a/rust/shared/proto/ziofa.proto +++ b/rust/shared/proto/ziofa.proto @@ -153,11 +153,11 @@ message TimeSeriesEvent { EventTypeEnum event_type_enum = 1; uint32 timeframe_ms = 2; - message time_series{ + message TimeSeriesType{ repeated uint64 list = 1; } - maptime_series_map = 3; + maptime_series_map = 3; } From 07b59333ad0d6799423be2dcc9a1e9c4877eb5de Mon Sep 17 00:00:00 2001 From: Benedikt Zinn Date: Tue, 21 Jan 2025 22:16:25 +0100 Subject: [PATCH 13/14] Added the missing config to make test compile Signed-off-by: Benedikt Zinn --- rust/client/tests/base.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/client/tests/base.rs b/rust/client/tests/base.rs index 56cf40f6..dc4f7fb0 100644 --- a/rust/client/tests/base.rs +++ b/rust/client/tests/base.rs @@ -5,7 +5,7 @@ // SPDX-License-Identifier: MIT use client::Client; -use shared::config::{Configuration, GcConfig, SysSendmsgConfig, SysSigquitConfig, VfsWriteConfig}; +use shared::config::{Configuration, GcConfig, SysFdTrackingConfig, SysSendmsgConfig, SysSigquitConfig, VfsWriteConfig}; use shared::ziofa::process::Cmd; // client tests assume daemon is running! @@ -56,7 +56,8 @@ async fn set_get_empty_config() { // jni_references: Some(JniReferencesConfig { pids: vec![] }), jni_references: None, sys_sigquit: Some(SysSigquitConfig { pids: vec![] }), - gc: Some(GcConfig { }) + gc: Some(GcConfig { }), + sys_fd_tracking: Some(SysFdTrackingConfig { pids: vec![] }), }; client From a1767a5f13298734be11f3bf70e0b64461f573d3 Mon Sep 17 00:00:00 2001 From: Benedikt Zinn Date: Tue, 21 Jan 2025 22:23:08 +0100 Subject: [PATCH 14/14] ktfmtFormat Signed-off-by: Benedikt Zinn --- .../de.amosproj3.ziofa.client/RustClient.kt | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/frontend/client/src/real/java/de.amosproj3.ziofa.client/RustClient.kt b/frontend/client/src/real/java/de.amosproj3.ziofa.client/RustClient.kt index 72a541e4..ef3c8064 100644 --- a/frontend/client/src/real/java/de.amosproj3.ziofa.client/RustClient.kt +++ b/frontend/client/src/real/java/de.amosproj3.ziofa.client/RustClient.kt @@ -101,17 +101,17 @@ private fun uniffi.shared.Event.into() = ) is EventData.SysFdTracking -> - Event.SysFdTracking( - pid = d.v1.pid, - tid = d.v1.tid, - timeStamp = d.v1.timeStamp, - fdAction = - when (sysFdActionFromI32(d.v1.fdAction)) { - SysFdAction.CREATED -> Event.SysFdTracking.SysFdAction.Created - SysFdAction.DESTROYED -> Event.SysFdTracking.SysFdAction.Destroyed - SysFdAction.UNDEFINED -> null - }, - ) + Event.SysFdTracking( + pid = d.v1.pid, + tid = d.v1.tid, + timeStamp = d.v1.timeStamp, + fdAction = + when (sysFdActionFromI32(d.v1.fdAction)) { + SysFdAction.CREATED -> Event.SysFdTracking.SysFdAction.Created + SysFdAction.DESTROYED -> Event.SysFdTracking.SysFdAction.Destroyed + SysFdAction.UNDEFINED -> null + }, + ) null -> null }