diff --git a/operator/metrics/__init__.py b/operator/metrics/__init__.py new file mode 100644 index 0000000..064f66d --- /dev/null +++ b/operator/metrics/__init__.py @@ -0,0 +1,4 @@ +from .app_metrics import AppMetrics # noqa: F401 +from .metrics_manager import MetricsManager # noqa: F401 +from .metrics_service import MetricsService # noqa: F401 +from .resourcepool_metrics import ResourcePoolMetrics # noqa: F401 \ No newline at end of file diff --git a/operator/metrics/app_metrics.py b/operator/metrics/app_metrics.py new file mode 100644 index 0000000..7f8ae01 --- /dev/null +++ b/operator/metrics/app_metrics.py @@ -0,0 +1,38 @@ +from __future__ import annotations +import time +from functools import wraps +from aioprometheus import Summary + + +class AppMetrics: + # Summary: Similar to a histogram, a summary samples observations + # (usually things like request durations and response sizes). + # While it also provides a total count of observations and a sum of all observed values, + # it calculates configurable quantiles over a sliding time window.
+ response_time_seconds = Summary("response_time_seconds", + "Response time in seconds", + {"method": "Method used for the request", + "resource_type": "Type of resource requested" + } + ) + + @staticmethod + def measure_execution_time(metric_name, **labels): + + def decorator(func): + @wraps(func) + async def wrapper(*args, **kwargs): + metric = getattr(AppMetrics, metric_name, None) + start_time = time.time() + result = await func(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + if metric and callable(getattr(metric, 'observe', None)): + metric.observe(labels=labels, value=duration) + else: + print(f"Metric {metric_name} not found or doesn't support observe()") + return result + + return wrapper + + return decorator diff --git a/operator/metrics/metrics_manager.py b/operator/metrics/metrics_manager.py new file mode 100644 index 0000000..6757aad --- /dev/null +++ b/operator/metrics/metrics_manager.py @@ -0,0 +1,18 @@ +from __future__ import annotations +from aioprometheus import REGISTRY, Counter, Gauge, Summary + +from .app_metrics import AppMetrics +from .resourcepool_metrics import ResourcePoolMetrics + + +class MetricsManager: + metrics_classes = [AppMetrics, ResourcePoolMetrics] + + @classmethod + def register(cls): + for metrics_class in cls.metrics_classes: + for attr_name in dir(metrics_class): + attr = getattr(metrics_class, attr_name) + if isinstance(attr, (Counter, Gauge, Summary)): + if attr.name not in REGISTRY.collectors: + REGISTRY.register(attr) diff --git a/operator/metrics/metrics_service.py b/operator/metrics/metrics_service.py new file mode 100644 index 0000000..2a89beb --- /dev/null +++ b/operator/metrics/metrics_service.py @@ -0,0 +1,22 @@ +from __future__ import annotations +import os +import logging +from aioprometheus.service import Service + + +logger = logging.getLogger(__name__) + + +class MetricsService: + service = Service() + + @classmethod + async def start(cls, addr="0.0.0.0", port=8000) -> None: + port = 
int(os.environ.get("METRICS_PORT", port)) + await cls.service.start(addr=addr, port=port, metrics_url="/metrics") + logger.info(f"Serving metrics on: {cls.service.metrics_url}") + + @classmethod + async def stop(cls) -> None: + logger.info("Stopping metrics service") + await cls.service.stop() diff --git a/operator/metrics/resourcepool_metrics.py b/operator/metrics/resourcepool_metrics.py new file mode 100644 index 0000000..a2491cb --- /dev/null +++ b/operator/metrics/resourcepool_metrics.py @@ -0,0 +1,38 @@ +from __future__ import annotations +from aioprometheus import Counter, Gauge + +from .app_metrics import AppMetrics + + +class ResourcePoolMetrics(AppMetrics): + resource_pool_labels = {'name': "Resource pool name", + 'namespace': "Resource pool namespace" + } + + resource_pool_min_available = Gauge( + 'resource_pool_min_available', + 'Minimum number of available environments in each resource pool', + resource_pool_labels + ) + + resource_pool_available = Gauge( + 'resource_pool_available', + 'Number of available environments in each resource pool', + resource_pool_labels + ) + + resource_pool_used_total = Counter( + 'resource_pool_used_total', + 'Total number of environments used in each resource pool', + resource_pool_labels + ) + + resource_pool_state_labels = {'name': "Resource pool name", + 'namespace': "Resource pool namespace", + 'state': "State of the resource pool" + } + resource_pool_state = Gauge( + 'resource_pool_state', + 'State of each resource pool, including available and used resources', + resource_pool_state_labels + ) diff --git a/operator/operator.py b/operator/operator.py index 8ce280e..8de4f32 100755 --- a/operator/operator.py +++ b/operator/operator.py @@ -17,6 +17,9 @@ from resourceprovider import ResourceProvider from resourcewatcher import ResourceWatcher +from metrics import MetricsManager, MetricsService, AppMetrics + + @kopf.on.startup() async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, **_): @@ -41,6 +44,12
@@ async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, ** # Configure logging configure_kopf_logging() + # Start metrics service + await MetricsService.start() + + # Register metrics + MetricsManager.register() + # Preload for matching ResourceClaim templates await Poolboy.on_startup() await ResourceProvider.preload(logger=logger) @@ -51,6 +60,7 @@ async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, ** async def cleanup(logger: kopf.ObjectLogger, **_): await ResourceWatcher.stop_all() await Poolboy.on_cleanup() + await MetricsService.stop() @kopf.on.create( @@ -65,6 +75,11 @@ async def cleanup(logger: kopf.ObjectLogger, **_): Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims', id='resource_claim_update', labels={Poolboy.ignore_label: kopf.ABSENT}, ) +@AppMetrics.measure_execution_time( + 'response_time_seconds', + method='on_create', + resource_type='resourceclaims' + ) async def resource_claim_event( annotations: kopf.Annotations, labels: kopf.Labels, @@ -94,6 +109,11 @@ async def resource_claim_event( Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims', labels={Poolboy.ignore_label: kopf.ABSENT}, ) +@AppMetrics.measure_execution_time( + 'response_time_seconds', + method='on_delete', + resource_type='resourceclaims' + ) async def resource_claim_delete( annotations: kopf.Annotations, labels: kopf.Labels, @@ -173,6 +193,11 @@ async def resource_claim_daemon( Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles', id='resource_handle_update', labels={Poolboy.ignore_label: kopf.ABSENT}, ) +@AppMetrics.measure_execution_time( + 'response_time_seconds', + method='on_create', + resource_type='resourcehandles' + ) async def resource_handle_event( annotations: kopf.Annotations, labels: kopf.Labels, @@ -202,6 +227,11 @@ async def resource_handle_event( Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles', labels={Poolboy.ignore_label: kopf.ABSENT}, ) 
+@AppMetrics.measure_execution_time( + 'response_time_seconds', + method='on_delete', + resource_type='resourcehandles' +) async def resource_handle_delete( annotations: kopf.Annotations, labels: kopf.Labels, @@ -281,6 +311,11 @@ async def resource_handle_daemon( Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools', id='resource_pool_update', labels={Poolboy.ignore_label: kopf.ABSENT}, ) +@AppMetrics.measure_execution_time( + 'response_time_seconds', + method='on_event', + resource_type='resourcepools' + ) async def resource_pool_event( annotations: kopf.Annotations, labels: kopf.Labels, @@ -310,6 +345,11 @@ async def resource_pool_event( Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools', labels={Poolboy.ignore_label: kopf.ABSENT}, ) +@AppMetrics.measure_execution_time( + 'response_time_seconds', + method='on_delete', + resource_type='resourcepools' + ) async def resource_pool_delete( annotations: kopf.Annotations, labels: kopf.Labels, @@ -337,6 +377,11 @@ async def resource_pool_delete( @kopf.on.event(Poolboy.operator_domain, Poolboy.operator_version, 'resourceproviders') +@AppMetrics.measure_execution_time( + 'response_time_seconds', + method='on_event', + resource_type='resourceproviders' + ) async def resource_provider_event(event: Mapping, logger: kopf.ObjectLogger, **_) -> None: definition = event['object'] if event['type'] == 'DELETED': diff --git a/operator/resourcepool.py b/operator/resourcepool.py index e577680..e27f6f4 100644 --- a/operator/resourcepool.py
+++ b/operator/resourcepool.py @@ -3,6 +3,8 @@ import kubernetes_asyncio import pytimeparse +from metrics import ResourcePoolMetrics + from datetime import timedelta from typing import List, Mapping, Optional, TypeVar @@ -119,6 +121,17 @@ def lifespan_unclaimed_timedelta(self): def metadata(self) -> Mapping: return self.meta + @property + def metrics_labels(self) -> Mapping: + return { + 'name': self.name, + 'namespace': self.namespace, + } + + @property + def metric_state_labels(self) -> Mapping: + return {'name': self.name, 'namespace': self.namespace, 'state': ''} + @property def min_available(self) -> int: return self.spec.get('minAvailable', 0) @@ -161,13 +174,54 @@ def refresh(self, self.status = status self.uid = uid + async def handle_metrics(self, logger: kopf.ObjectLogger, resource_handles): + logger.info("Handling metrics for resource pool") + resource_handle_deficit = self.min_available - len(resource_handles) + + ResourcePoolMetrics.resource_pool_min_available.set( + labels=self.metrics_labels, + value=self.min_available + ) + + ResourcePoolMetrics.resource_pool_available.set( + labels=self.metrics_labels, + value=len(resource_handles) + ) + + if resource_handle_deficit < 0: + ResourcePoolMetrics.resource_pool_used_total.add( + labels=self.metrics_labels, + value=-resource_handle_deficit + ) + + state_labels = self.metric_state_labels + state_labels['state'] = 'available' + ResourcePoolMetrics.resource_pool_state.set( + labels=state_labels, + value=len(resource_handles) + ) + + state_labels['state'] = 'used' + ResourcePoolMetrics.resource_pool_state.set( + labels=state_labels, + value=resource_handle_deficit + ) + async def handle_delete(self, logger: kopf.ObjectLogger): await resourcehandle.ResourceHandle.delete_unbound_handles_for_pool(logger=logger, resource_pool=self) + @ResourcePoolMetrics.measure_execution_time( + 'response_time_seconds', + method='manage', + resource_type='resourcepool' + ) async def manage(self, logger: kopf.ObjectLogger): async
with self.lock: resource_handles = await resourcehandle.ResourceHandle.get_unbound_handles_for_pool(resource_pool=self, logger=logger) resource_handle_deficit = self.min_available - len(resource_handles) + + await self.handle_metrics(logger=logger, resource_handles=resource_handles) + if resource_handle_deficit <= 0: return for i in range(resource_handle_deficit): diff --git a/requirements.txt b/requirements.txt index a986470..511096f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ jsonpointer==2.2 jsonschema==3.2.0 openapi-schema-validator==0.1.5 prometheus-client==0.11.0 +aioprometheus==23.12.0 pyasn1==0.4.8 pyasn1-modules==0.2.8 pydantic==1.9.0