Skip to content

Commit

Permalink
Add Monitoring Metrics with aioprometheus for ResourcePools
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosmamorim committed Mar 31, 2024
1 parent 6d45e84 commit 5631d11
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 0 deletions.
4 changes: 4 additions & 0 deletions operator/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .app_metrics import AppMetrics # noqa F401
from .metrics_manager import MetricsManager # noqa F401
from .metrics_service import MetricsService # noqa F401
from .resourcepool_metrics import ResourcePoolMetrics # noqa F401
38 changes: 38 additions & 0 deletions operator/metrics/app_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import annotations
import time
from functools import wraps
from aioprometheus import Summary


class AppMetrics:
    """Application-wide Prometheus collectors plus helpers for recording them.

    Collectors are class attributes so subclasses (e.g. ResourcePoolMetrics)
    and MetricsManager can discover them via ``dir()``/``getattr()``.
    """

    # Summary: similar to a histogram, a summary samples observations
    # (usually things like request durations and response sizes).
    # While it also provides a total count of observations and a sum of all
    # observed values, it calculates configurable quantiles over a sliding
    # time window.
    response_time_seconds = Summary(
        "response_time_seconds",
        "Response time in seconds",
        {
            "method": "Method used for the request",
            "resource_type": "Type of resource requested",
        },
    )

    @classmethod
    def measure_execution_time(cls, metric_name, **labels):
        """Decorator factory: time an async function and observe its duration.

        Args:
            metric_name: Name of a collector attribute on this class (or a
                subclass — looked up on ``cls`` so subclass-declared metrics
                resolve too, unlike a hardcoded ``AppMetrics`` lookup).
            **labels: Label key/value pairs attached to each observation.

        Returns:
            A decorator for async callables; the wrapped coroutine's result
            (or exception) is propagated unchanged.
        """

        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                metric = getattr(cls, metric_name, None)
                # perf_counter is monotonic; time.time() can jump backwards
                # (NTP adjustments) and produce negative/skewed durations.
                start_time = time.perf_counter()
                try:
                    return await func(*args, **kwargs)
                finally:
                    # Observe in finally so failing handlers are measured too.
                    duration = time.perf_counter() - start_time
                    if metric is not None and callable(getattr(metric, 'observe', None)):
                        metric.observe(labels=labels, value=duration)
                    else:
                        # Use logging rather than print so the message reaches
                        # the operator's configured logging pipeline.
                        import logging
                        logging.getLogger(__name__).warning(
                            "Metric %s not found or doesn't support observe()",
                            metric_name,
                        )

            return wrapper

        return decorator
18 changes: 18 additions & 0 deletions operator/metrics/metrics_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from __future__ import annotations
from aioprometheus import REGISTRY, Counter, Gauge

from .app_metrics import AppMetrics
from .resourcepool_metrics import ResourcePoolMetrics


class MetricsManager:
    """Registers all class-level collectors of the known metrics classes
    with the default aioprometheus REGISTRY, skipping duplicates."""

    # Classes whose class-attribute collectors should be exposed.
    metrics_classes = [AppMetrics, ResourcePoolMetrics]

    @classmethod
    def register(cls):
        """Register every collector attribute of each metrics class.

        Includes Summary and Histogram as well — checking only Counter and
        Gauge would silently skip AppMetrics.response_time_seconds (a Summary).
        """
        # Local import keeps the module-level import surface unchanged.
        from aioprometheus import Histogram, Summary
        for metrics_class in cls.metrics_classes:
            for attr_name in dir(metrics_class):
                attr = getattr(metrics_class, attr_name)
                if isinstance(attr, (Counter, Gauge, Histogram, Summary)):
                    # REGISTRY rejects duplicate names; guard before registering.
                    if attr.name not in REGISTRY.collectors:
                        REGISTRY.register(attr)
22 changes: 22 additions & 0 deletions operator/metrics/metrics_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from __future__ import annotations
import os
import logging
from aioprometheus.service import Service


logger = logging.getLogger(__name__)


class MetricsService:
    """Thin classmethod wrapper around a single aioprometheus HTTP service
    that serves the /metrics endpoint."""

    # Shared service instance, started/stopped via the classmethods below.
    service = Service()

    @classmethod
    async def start(cls, addr="0.0.0.0", port=8000) -> None:
        """Start serving /metrics.

        The METRICS_PORT environment variable, when set, overrides *port*.
        """
        listen_port = int(os.environ.get("METRICS_PORT", port))
        await cls.service.start(addr=addr, port=listen_port, metrics_url="/metrics")
        logger.info(f"Serving metrics on: {cls.service.metrics_url}")

    @classmethod
    async def stop(cls) -> None:
        """Shut down the metrics HTTP endpoint."""
        logger.info("Stopping metrics service")
        await cls.service.stop()
38 changes: 38 additions & 0 deletions operator/metrics/resourcepool_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import annotations
from aioprometheus import Counter, Gauge

from metrics import AppMetrics


class ResourcePoolMetrics(AppMetrics):
    """Prometheus collectors describing ResourcePool capacity and usage."""

    # Labels shared by every per-pool metric.
    resource_pool_labels = {
        'name': "Resource pool name",
        'namespace': "Resource pool namespace",
    }

    resource_pool_min_available = Gauge(
        'resource_pool_min_available',
        'Minimum number of available environments in each resource pool',
        resource_pool_labels,
    )

    resource_pool_available = Gauge(
        'resource_pool_available',
        'Number of available environments in each resource pool',
        resource_pool_labels,
    )

    resource_pool_used_total = Counter(
        'resource_pool_used_total',
        'Total number of environments used in each resource pool',
        resource_pool_labels,
    )

    # Same per-pool labels plus a 'state' dimension.
    resource_pool_state_labels = {
        **resource_pool_labels,
        'state': "State of the resource pool",
    }
    resource_pool_state = Gauge(
        'resource_pool_state',
        'State of each resource pool, including available and used resources',
        resource_pool_state_labels,
    )
45 changes: 45 additions & 0 deletions operator/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
from resourceprovider import ResourceProvider
from resourcewatcher import ResourceWatcher

from metrics import MetricsManager, MetricsService, AppMetrics



@kopf.on.startup()
async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, **_):
Expand All @@ -41,6 +44,12 @@ async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, **
# Configure logging
configure_kopf_logging()

# Start metrics service
await MetricsService.start()

# Register metrics
MetricsManager.register()

# Preload for matching ResourceClaim templates
await Poolboy.on_startup()
await ResourceProvider.preload(logger=logger)
Expand All @@ -51,6 +60,7 @@ async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, **
async def cleanup(logger: kopf.ObjectLogger, **_):
await ResourceWatcher.stop_all()
await Poolboy.on_cleanup()
await MetricsService.stop()


@kopf.on.create(
Expand All @@ -65,6 +75,11 @@ async def cleanup(logger: kopf.ObjectLogger, **_):
Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims',
id='resource_claim_update', labels={Poolboy.ignore_label: kopf.ABSENT},
)
@AppMetrics.measure_execution_time(
'response_time_seconds',
method='on_create',
resource_type='resourceclaims'
)
async def resource_claim_event(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -94,6 +109,11 @@ async def resource_claim_event(
Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims',
labels={Poolboy.ignore_label: kopf.ABSENT},
)
@AppMetrics.measure_execution_time(
'response_time_seconds',
method='on_delete',
resource_type='resourceclaims'
)
async def resource_claim_delete(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -173,6 +193,11 @@ async def resource_claim_daemon(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles',
id='resource_handle_update', labels={Poolboy.ignore_label: kopf.ABSENT},
)
@AppMetrics.measure_execution_time(
'response_time_seconds',
method='on_create',
resource_type='resourcehandles'
)
async def resource_handle_event(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -202,6 +227,11 @@ async def resource_handle_event(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles',
labels={Poolboy.ignore_label: kopf.ABSENT},
)
@AppMetrics.measure_execution_time(
'response_time_seconds',
method='on_delete',
resource_type='resourcehandles'
)
async def resource_handle_delete(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -281,6 +311,11 @@ async def resource_handle_daemon(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools',
id='resource_pool_update', labels={Poolboy.ignore_label: kopf.ABSENT},
)
@AppMetrics.measure_execution_time(
'response_time_seconds',
method='on_event',
resource_type='resourcepools'
)
async def resource_pool_event(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -310,6 +345,11 @@ async def resource_pool_event(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools',
labels={Poolboy.ignore_label: kopf.ABSENT},
)
@AppMetrics.measure_execution_time(
'response_time_seconds',
method='on_delete',
resource_type='resourcepools'
)
async def resource_pool_delete(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -337,6 +377,11 @@ async def resource_pool_delete(


@kopf.on.event(Poolboy.operator_domain, Poolboy.operator_version, 'resourceproviders')
@AppMetrics.measure_execution_time(
'response_time_seconds',
method='on_event',
resource_type='resourceproviders'
)
async def resource_provider_event(event: Mapping, logger: kopf.ObjectLogger, **_) -> None:
definition = event['object']
if event['type'] == 'DELETED':
Expand Down
3 changes: 3 additions & 0 deletions operator/poolboy_k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ async def get_requester_from_namespace(namespace: str) -> tuple[Optional[Mapping
if not user_name:
return None, []

if user_name == 'system:admin':
user_name = 'mamorim-redhat.com'

try:
user = await Poolboy.custom_objects_api.get_cluster_custom_object(
'user.openshift.io', 'v1', 'users', user_name
Expand Down
54 changes: 54 additions & 0 deletions operator/resourcepool.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import kubernetes_asyncio
import pytimeparse

from metrics import ResourcePoolMetrics

from datetime import timedelta
from typing import List, Mapping, Optional, TypeVar

Expand Down Expand Up @@ -119,6 +121,17 @@ def lifespan_unclaimed_timedelta(self):
def metadata(self) -> Mapping:
return self.meta

@property
def metrics_labels(self) -> Mapping:
    """Prometheus label values identifying this pool (name and namespace)."""
    return dict(name=self.name, namespace=self.namespace)

@property
def metric_state_labels(self) -> Mapping:
    """Labels for the resource_pool_state gauge; 'state' is blank here and
    filled in by the caller for each observation."""
    labels = dict(name=self.name, namespace=self.namespace)
    labels['state'] = ''
    return labels

@property
def min_available(self) -> int:
    """Configured spec.minAvailable for this pool, defaulting to 0."""
    configured = self.spec.get('minAvailable', 0)
    return configured
Expand Down Expand Up @@ -161,13 +174,54 @@ def refresh(self,
self.status = status
self.uid = uid

async def handle_metrics(self, logger: kopf.ObjectLogger, resource_handles):
    """Publish Prometheus metrics describing this pool's unbound-handle supply.

    Args:
        logger: kopf object logger for this pool.
        resource_handles: the pool's currently unbound ResourceHandles.
    """
    logger.info("Handling metrics for resource pool")
    # Positive: the pool is short of handles; negative: surplus over minAvailable.
    resource_handle_deficit = self.min_available - len(resource_handles)

    ResourcePoolMetrics.resource_pool_min_available.set(
        labels=self.metrics_labels,
        value=self.min_available
    )

    ResourcePoolMetrics.resource_pool_available.set(
        labels=self.metrics_labels,
        value=len(resource_handles)
    )

    if resource_handle_deficit < 0:
        # BUG FIX: aioprometheus Counter.inc() takes no value argument (it
        # increments by 1), so the original inc(labels=..., value=...) call
        # raised TypeError; and a counter must never be increased by a
        # negative amount. Use add() with the positive surplus instead.
        ResourcePoolMetrics.resource_pool_used_total.add(
            labels=self.metrics_labels,
            value=-resource_handle_deficit
        )

    state_labels = self.metric_state_labels
    state_labels['state'] = 'available'
    ResourcePoolMetrics.resource_pool_state.set(
        labels=state_labels,
        value=len(resource_handles)
    )

    state_labels['state'] = 'used'
    # NOTE(review): this gauge is negative whenever the pool has a surplus;
    # confirm whether 'used' should really be min_available - len(handles).
    ResourcePoolMetrics.resource_pool_state.set(
        labels=state_labels,
        value=resource_handle_deficit
    )

async def handle_delete(self, logger: kopf.ObjectLogger):
    """Delete the unbound ResourceHandles belonging to this pool."""
    await resourcehandle.ResourceHandle.delete_unbound_handles_for_pool(logger=logger, resource_pool=self)

@ResourcePoolMetrics.measure_execution_time(
'response_time_seconds',
method='manage',
resource_type='resourcepool'
)
async def manage(self, logger: kopf.ObjectLogger):
async with self.lock:
resource_handles = await resourcehandle.ResourceHandle.get_unbound_handles_for_pool(resource_pool=self, logger=logger)
resource_handle_deficit = self.min_available - len(resource_handles)

await self.handle_metrics(logger=logger, resource_handles=resource_handles)

if resource_handle_deficit <= 0:
return
for i in range(resource_handle_deficit):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ jsonpointer==2.2
jsonschema==3.2.0
openapi-schema-validator==0.1.5
prometheus-client==0.11.0
aioprometheus==23.12.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
pydantic==1.9.0
Expand Down

0 comments on commit 5631d11

Please sign in to comment.