Merge pull request #211 from amosproj/158-aggregator
Aggregation / Timeseries creation in backend
fhilgers authored Jan 21, 2025
2 parents 925e0fe + a1767a5 commit 92753b9
Showing 15 changed files with 612 additions and 195 deletions.
152 changes: 84 additions & 68 deletions frontend/client/src/real/java/de.amosproj3.ziofa.client/RustClient.kt
@@ -15,6 +15,7 @@ import uniffi.client.jniMethodNameFromI32
import uniffi.client.sysFdActionFromI32
import uniffi.shared.Cmd
import uniffi.shared.EventData
import uniffi.shared.EventType
import uniffi.shared.JniMethodName
import uniffi.shared.SysFdAction

@@ -32,74 +33,89 @@ private fun uniffi.shared.Process.into() =
)

private fun uniffi.shared.Event.into() =
when (val d = eventData) {
is EventData.VfsWrite ->
Event.VfsWrite(
pid = d.v1.pid,
tid = d.v1.tid,
beginTimeStamp = d.v1.beginTimeStamp,
fp = d.v1.fp,
bytesWritten = d.v1.bytesWritten,
)
is EventData.SysSendmsg ->
Event.SysSendmsg(
pid = d.v1.pid,
tid = d.v1.tid,
beginTimeStamp = d.v1.beginTimeStamp,
fd = d.v1.fd,
durationNanoSecs = d.v1.durationNanoSec,
)
is EventData.JniReferences ->
Event.JniReferences(
pid = d.v1.pid,
tid = d.v1.tid,
beginTimeStamp = d.v1.beginTimeStamp,
jniMethodName =
when (jniMethodNameFromI32(d.v1.jniMethodName)) {
JniMethodName.ADD_LOCAL_REF -> Event.JniReferences.JniMethodName.AddLocalRef
JniMethodName.DELETE_LOCAL_REF ->
Event.JniReferences.JniMethodName.DeleteLocalRef
JniMethodName.ADD_GLOBAL_REF ->
Event.JniReferences.JniMethodName.AddGlobalRef
JniMethodName.DELETE_GLOBAL_REF ->
Event.JniReferences.JniMethodName.DeleteGlobalRef
JniMethodName.UNDEFINED -> null
},
)
is EventData.SysSigquit ->
Event.SysSigquit(
pid = d.v1.pid,
tid = d.v1.tid,
timeStamp = d.v1.timeStamp,
targetPid = d.v1.targetPid,
)
is EventData.Gc ->
Event.Gc(
pid = d.v1.pid,
tid = d.v1.tid,
targetFootprint = d.v1.targetFootprint,
numBytesAllocated = d.v1.numBytesAllocated,
gcsCompleted = d.v1.gcsCompleted,
gcCause = d.v1.gcCause,
durationNs = d.v1.durationNs,
freedObjects = d.v1.freedObjects,
freedBytes = d.v1.freedBytes,
freedLosObjects = d.v1.freedLosObjects,
freedLosBytes = d.v1.freedLosBytes,
pauseTimes = d.v1.pauseTimes,
)
is EventData.SysFdTracking ->
Event.SysFdTracking(
pid = d.v1.pid,
tid = d.v1.tid,
timeStamp = d.v1.timeStamp,
fdAction =
when (sysFdActionFromI32(d.v1.fdAction)) {
SysFdAction.CREATED -> Event.SysFdTracking.SysFdAction.Created
SysFdAction.DESTROYED -> Event.SysFdTracking.SysFdAction.Destroyed
SysFdAction.UNDEFINED -> null
},
)
when (val x = eventType) {
is EventType.Log ->
when (val d = x.v1.eventData) {
is EventData.VfsWrite ->
Event.VfsWrite(
pid = d.v1.pid,
tid = d.v1.tid,
beginTimeStamp = d.v1.beginTimeStamp,
fp = d.v1.fp,
bytesWritten = d.v1.bytesWritten,
)

is EventData.SysSendmsg ->
Event.SysSendmsg(
pid = d.v1.pid,
tid = d.v1.tid,
beginTimeStamp = d.v1.beginTimeStamp,
fd = d.v1.fd,
durationNanoSecs = d.v1.durationNanoSec,
)

is EventData.JniReferences ->
Event.JniReferences(
pid = d.v1.pid,
tid = d.v1.tid,
beginTimeStamp = d.v1.beginTimeStamp,
jniMethodName =
when (jniMethodNameFromI32(d.v1.jniMethodName)) {
JniMethodName.ADD_LOCAL_REF ->
Event.JniReferences.JniMethodName.AddLocalRef
JniMethodName.DELETE_LOCAL_REF ->
Event.JniReferences.JniMethodName.DeleteLocalRef

JniMethodName.ADD_GLOBAL_REF ->
Event.JniReferences.JniMethodName.AddGlobalRef

JniMethodName.DELETE_GLOBAL_REF ->
Event.JniReferences.JniMethodName.DeleteGlobalRef

JniMethodName.UNDEFINED -> null
},
)

is EventData.SysSigquit ->
Event.SysSigquit(
pid = d.v1.pid,
tid = d.v1.tid,
timeStamp = d.v1.timeStamp,
targetPid = d.v1.targetPid,
)

is EventData.Gc ->
Event.Gc(
pid = d.v1.pid,
tid = d.v1.tid,
targetFootprint = d.v1.targetFootprint,
numBytesAllocated = d.v1.numBytesAllocated,
gcsCompleted = d.v1.gcsCompleted,
gcCause = d.v1.gcCause,
durationNs = d.v1.durationNs,
freedObjects = d.v1.freedObjects,
freedBytes = d.v1.freedBytes,
freedLosObjects = d.v1.freedLosObjects,
freedLosBytes = d.v1.freedLosBytes,
pauseTimes = d.v1.pauseTimes,
)

is EventData.SysFdTracking ->
Event.SysFdTracking(
pid = d.v1.pid,
tid = d.v1.tid,
timeStamp = d.v1.timeStamp,
fdAction =
when (sysFdActionFromI32(d.v1.fdAction)) {
SysFdAction.CREATED -> Event.SysFdTracking.SysFdAction.Created
SysFdAction.DESTROYED -> Event.SysFdTracking.SysFdAction.Destroyed
SysFdAction.UNDEFINED -> null
},
)

null -> null
}
is EventType.TimeSeries -> null
null -> null
}
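
The client mapping above now matches on the outer event wrapper first: log events keep their per-call payloads, while the new TimeSeries variant is mapped to null (and dropped) until the UI consumes it. For orientation, here is a hedged Rust sketch of roughly what such a wrapped time-series event looks like on the backend side; the concrete values (500 ms window, empty pid map) are illustrative assumptions, while the struct, field and module names are taken from the aggregator code further down.

use std::collections::HashMap;

use shared::ziofa::event::EventType;
use shared::ziofa::time_series_event::EventTypeEnum;
use shared::ziofa::{Event, TimeSeriesEvent};

// Builds an Event wrapped in the TimeSeries variant, mirroring what the
// Aggregator below emits. Window length and the empty pid map are placeholders.
fn example_time_series_event(event_type: EventTypeEnum) -> Event {
    let time_series = TimeSeriesEvent {
        event_type_enum: event_type.into(), // prost enums are carried as i32
        timeframe_ms: 500,                  // assumed aggregation window
        time_series_map: HashMap::new(),    // pid -> TimeSeriesType, empty here
    };
    Event {
        event_type: Some(EventType::TimeSeries(time_series)),
    }
}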

176 changes: 176 additions & 0 deletions rust/backend/daemon/src/collector/aggregator.rs
@@ -0,0 +1,176 @@
// SPDX-FileCopyrightText: 2024 Benedikt Zinn <[email protected]>
//
// SPDX-License-Identifier: MIT

use crate::collector::time_series::TimeSeries;
use crate::constants::TIMESERIES_LENGTH;
use ractor::concurrency::{Duration, JoinHandle};
use ractor::{cast, Actor, ActorProcessingErr, ActorRef};
use shared::ziofa::event::EventType;
use shared::ziofa::log_event::EventData;
use shared::ziofa::time_series_event::EventTypeEnum;
use shared::ziofa::time_series_event::TimeSeriesType;
use shared::ziofa::{Event, TimeSeriesEvent as ZioTimeSeriesEvent};
use std::collections::HashMap;
use tokio::time;

pub struct Aggregator;
impl Aggregator {
fn convert_map_to_prototype(
time_series_map: HashMap<u32, TimeSeries>,
) -> HashMap<u32, TimeSeriesType> {
let mut map = HashMap::<u32, TimeSeriesType>::with_capacity(time_series_map.len());
for (id, time_series) in time_series_map {
map.insert(id, time_series.into());
}
map
}
}

impl Default for Aggregator {
fn default() -> Self {
Aggregator
}
}

pub struct AggregatorState {
event_type: EventTypeEnum,
event_count_map: HashMap<u32, u64>, // map pid to count
timeframe: Duration,
event_actor: ActorRef<Event>,
timer: Option<JoinHandle<()>>,
time_series_map: HashMap<u32, TimeSeries>,
}

pub struct AggregatorArguments {
event_actor: ActorRef<Event>,
timeframe: time::Duration,
event_type_enum: EventTypeEnum,
}

impl AggregatorArguments {
pub fn _new(
event_actor: ActorRef<Event>,
timeframe: Duration,
event_type_enum: EventTypeEnum,
) -> Self {
AggregatorArguments {
event_actor,
timeframe,
event_type_enum,
}
}
}

impl TryFrom<AggregatorArguments> for AggregatorState {
type Error = ActorProcessingErr;
fn try_from(args: AggregatorArguments) -> Result<AggregatorState, Self::Error> {
Ok(AggregatorState {
event_type: args.event_type_enum,
event_count_map: HashMap::new(),
timeframe: args.timeframe,
event_actor: args.event_actor,
timer: None,
time_series_map: HashMap::new(),
})
}
}

impl Actor for Aggregator {
type Msg = Event;
type State = AggregatorState;
type Arguments = AggregatorArguments;

async fn pre_start(
&self,
_: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Self::State::try_from(args)
}

async fn post_start(
&self,
myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
state.timer = Some(myself.send_interval(state.timeframe, || Event { event_type: None }));
Ok(())
}

async fn post_stop(
&self,
_: ActorRef<Self::Msg>,
state: &mut AggregatorState,
) -> Result<(), ActorProcessingErr> {
if let Some(timer) = state.timer.take() {
timer.abort();
}
Ok(())
}

async fn handle(
&self,
_: ActorRef<Self::Msg>,
msg: Self::Msg,
state: &mut AggregatorState,
) -> Result<(), ActorProcessingErr> {
match msg.event_type {
Some(EventType::Log(event)) => {
let pid = match event.event_data.clone() {
Some(EventData::VfsWrite(item)) => item.pid,
Some(EventData::SysSendmsg(item)) => item.pid,
Some(EventData::JniReferences(item)) => item.pid,
Some(EventData::SysSigquit(item)) => item.pid,
Some(EventData::Gc(item)) => item.pid,
_ => {
panic!("unexpected event type");
}
};

let msg_event_type = EventTypeEnum::from(event);

if msg_event_type != state.event_type {
panic!(
"event type mismatch in Aggregator -> I was initialized with {:?}, but was send an {:?}",
state.event_type, msg_event_type
);
}
// count this event for its pid in the current window
*state.event_count_map.entry(pid).or_insert(0) += 1;
}
_ => {
// event type is none -> timer was triggered -> send the metric
for (key, value) in state.event_count_map.iter() {
if !state.time_series_map.contains_key(key) {
let mut new_ts = TimeSeries::new(TIMESERIES_LENGTH);
new_ts.append(*value);
state.time_series_map.insert(*key, new_ts);
} else {
state.time_series_map.get_mut(key).unwrap().append(*value);
}
}

// convert the per-pid time series into the protobuf representation for sending

let time_series = ZioTimeSeriesEvent {
event_type_enum: state.event_type.into(),
timeframe_ms: state.timeframe.as_millis() as u32,
time_series_map: Self::convert_map_to_prototype(state.time_series_map.clone()),
};

cast!(
state.event_actor,
Event {
event_type: Some(EventType::TimeSeries(time_series))
}
)
.map_err(|_| ActorProcessingErr::from("Failed to send metric to event actor"))?;
for (_, value) in state.event_count_map.iter_mut() {
*value = 0;
}
}
}
Ok(())
}
}
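
A hedged sketch of how such an aggregator could be wired up, assuming an already-spawned actor (for example the event dispatcher below) whose ActorRef<Event> receives the emitted time series. The function name, the 500 ms window and the caller-supplied event type are illustrative assumptions; Aggregator, AggregatorArguments and the ractor calls come from the file above.

use ractor::concurrency::Duration;
use ractor::{Actor, ActorRef, SpawnErr};
use shared::ziofa::time_series_event::EventTypeEnum;
use shared::ziofa::Event;

use crate::collector::aggregator::{Aggregator, AggregatorArguments};

// Spawns one Aggregator per event type: it counts matching log events per pid,
// folds the counts into fixed-length time series, and casts a TimeSeries event
// to `event_actor` once per window.
async fn spawn_aggregator(
    event_actor: ActorRef<Event>,
    event_type: EventTypeEnum,
) -> Result<ActorRef<Event>, SpawnErr> {
    let args = AggregatorArguments::_new(event_actor, Duration::from_millis(500), event_type);
    let (aggregator_ref, _join_handle) = Actor::spawn(None, Aggregator, args).await?;
    Ok(aggregator_ref)
}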
@@ -23,9 +23,9 @@ impl EventDispatcherState


impl Actor for EventDispatcher {
type Arguments = EventDispatcherState;
type Msg = Event;
type State = EventDispatcherState;
type Arguments = EventDispatcherState;

async fn pre_start(
&self,