Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Monitoring Metrics with aioprometheus for ResourcePools #103

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions helm/templates/service_monitor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{{- if .Values.serviceMonitor.create }}
# Prometheus Operator ServiceMonitor that scrapes the operator's metrics
# endpoint.  Rendered only when serviceMonitor.create is true; requires the
# monitoring.coreos.com CRDs (prometheus-operator) to exist in the cluster.
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: {{ include "poolboy.name" . }}
  namespace: {{ include "poolboy.namespaceName" . }}
  labels:
    {{- include "poolboy.labels" . | nindent 4 }}
spec:
  endpoints:
    # Scrape settings come from values.yaml (serviceMonitor.*); port must match
    # a named port on the Service selected below (e.g. "metrics").
    - interval: {{ .Values.serviceMonitor.interval }}
      path: {{ .Values.serviceMonitor.path }}
      port: {{ .Values.serviceMonitor.portName }}
      scrapeTimeout: {{ .Values.serviceMonitor.scrapeTimeout }}
  jobLabel: {{ include "poolboy.name" . }}
  # Restrict discovery to the operator's own namespace.
  namespaceSelector:
    matchNames:
    - {{ include "poolboy.namespaceName" . }}
  # Select the operator Service by the chart's standard selector labels.
  selector:
    matchLabels:
      {{- include "poolboy.selectorLabels" . | nindent 6 }}
{{- end }}
22 changes: 19 additions & 3 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,31 @@ serviceAccount:
# If not set and create is true, a name is generated using the fullname template
name:

serviceMonitor:
# ServiceMonitor requires prometheus-operator installed in the cluster
create: true
# The port name of the service monitor to use.
portName: metrics
# The path to scrape metrics from
path: /metrics
# How often Prometheus should scrape
interval: 30s
# How long Prometheus should wait for a scrape to complete
scrapeTimeout: 15s

service:
type: ClusterIP
port: 8000
port: 8080
ports:
- name: healthz
port: 8080
protocol: TCP
targetPort: 8080
- name: metrics
port: 8000
port: 80
protocol: TCP
targetPort: 8000

resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious
# choice for the user. This also increases chances charts run on environments with little
Expand Down
4 changes: 4 additions & 0 deletions operator/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .app_metrics import AppMetrics # noqa F401
from .metrics_manager import MetricsManager # noqa F401
from .metrics_service import MetricsService # noqa F401
from .resourcepool_metrics import ResourcePoolMetrics # noqa F401
19 changes: 19 additions & 0 deletions operator/metrics/app_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from __future__ import annotations
import time
from functools import wraps
from aioprometheus import Summary, Counter


class AppMetrics:
    """Process-wide application metrics shared by all kopf handlers."""

    # Handler latency, observed via the aioprometheus @timer decorator
    # applied to each kopf handler in operator.py.
    response_time_seconds = Summary(
        "response_time_seconds",
        "Response time in seconds",
        {
            "method": "Method used for the request",
            "resource_type": "Type of resource requested",
        },
    )

    # Count of exceptions raised by handler functions, keyed by handler name.
    request_handler_exceptions = Counter(
        "request_handler_exceptions",
        "Count of exceptions by handler function.",
        {"handler": "Handler function name"},
    )
18 changes: 18 additions & 0 deletions operator/metrics/metrics_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from __future__ import annotations
from aioprometheus import REGISTRY, Counter, Gauge

from .app_metrics import AppMetrics
from .resourcepool_metrics import ResourcePoolMetrics


class MetricsManager:
    """Registers the app's metric collectors with the default aioprometheus
    REGISTRY so they are exposed by the metrics service."""

    # Classes whose class-level collector attributes should be registered.
    metrics_classes = [AppMetrics, ResourcePoolMetrics]

    @classmethod
    def register(cls):
        """Register every collector found on the configured metric classes.

        Fix: the original isinstance check matched only Counter and Gauge,
        which silently skipped Summary collectors such as
        AppMetrics.response_time_seconds (and any future Histogram).
        """
        # Function-scope import keeps the module's top-level import list
        # unchanged for other users of this module.
        from aioprometheus import Histogram, Summary
        collector_types = (Counter, Gauge, Histogram, Summary)
        for metrics_class in cls.metrics_classes:
            for attr_name in dir(metrics_class):
                attr = getattr(metrics_class, attr_name)
                if isinstance(attr, collector_types):
                    # NOTE(review): aioprometheus collectors appear to
                    # auto-register with the default registry on construction,
                    # so this guard avoids a duplicate-registration error —
                    # confirm against the aioprometheus Registry docs.
                    if attr.name not in REGISTRY.collectors:
                        REGISTRY.register(attr)
22 changes: 22 additions & 0 deletions operator/metrics/metrics_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from __future__ import annotations
import os
import logging
from aioprometheus.service import Service


logger = logging.getLogger(__name__)


class MetricsService:
    """Thin wrapper around a single aioprometheus HTTP service that serves
    the /metrics endpoint for this process."""

    # One shared Service instance for the whole operator process.
    service = Service()

    @classmethod
    async def start(cls, addr="0.0.0.0", port=8000) -> None:
        """Start the metrics HTTP server.

        The METRICS_PORT environment variable, when set, overrides `port`.
        """
        env_port = os.environ.get("METRICS_PORT")
        port = int(port if env_port is None else env_port)
        await cls.service.start(addr=addr, port=port, metrics_url="/metrics")
        logger.info(f"Serving metrics on: {cls.service.metrics_url}")

    @classmethod
    async def stop(cls) -> None:
        """Shut down the metrics HTTP server."""
        logger.info("Stopping metrics service")
        await cls.service.stop()
38 changes: 38 additions & 0 deletions operator/metrics/resourcepool_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import annotations
from aioprometheus import Counter, Gauge

from metrics import AppMetrics


class ResourcePoolMetrics(AppMetrics):
    """Prometheus collectors describing ResourcePool capacity and usage,
    in addition to the app-level metrics inherited from AppMetrics."""

    # Label schema shared by every per-pool metric.
    resource_pool_labels = {
        'name': "Resource pool name",
        'namespace': "Resource pool namespace",
    }

    # Configured spec.minAvailable of each pool.
    resource_pool_min_available = Gauge(
        'resource_pool_min_available',
        'Minimum number of available environments in each resource pool',
        resource_pool_labels,
    )

    # Current count of available (unbound) environments per pool.
    resource_pool_available = Gauge(
        'resource_pool_available',
        'Number of available environments in each resource pool',
        resource_pool_labels,
    )

    # Monotonic usage counter per pool.
    resource_pool_used_total = Counter(
        'resource_pool_used_total',
        'Total number of environments used in each resource pool',
        resource_pool_labels,
    )

    # Same labels plus a 'state' dimension (e.g. "available" / "used").
    resource_pool_state_labels = {
        'name': "Resource pool name",
        'namespace': "Resource pool namespace",
        'state': "State of the resource pool",
    }

    resource_pool_state = Gauge(
        'resource_pool_state',
        'State of each resource pool, including available and used resources',
        resource_pool_state_labels,
    )
47 changes: 47 additions & 0 deletions operator/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import kopf
import logging

from aioprometheus import timer

from copy import deepcopy
from datetime import datetime, timedelta
from typing import Any, Mapping, Optional
Expand All @@ -17,8 +19,14 @@
from resourceprovider import ResourceProvider
from resourcewatcher import ResourceWatcher

from metrics import MetricsManager, MetricsService, AppMetrics


@kopf.on.startup()
@timer(
AppMetrics.response_time_seconds,
labels={'method': 'startup', 'resource_type': 'poolboy_operator'}
)
async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, **_):
# Store last handled configuration in status
settings.persistence.diffbase_storage = kopf.StatusDiffBaseStorage(field='status.diffBase')
Expand All @@ -41,16 +49,27 @@ async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, **
# Configure logging
configure_kopf_logging()

# Start metrics service
await MetricsService.start()

# Register metrics
MetricsManager.register()

# Preload for matching ResourceClaim templates
await Poolboy.on_startup()
await ResourceProvider.preload(logger=logger)
await ResourceHandle.preload(logger=logger)


@kopf.on.cleanup()
@timer(
AppMetrics.response_time_seconds,
labels={'method': 'cleanup', 'resource_type': 'poolboy_operator'}
)
async def cleanup(logger: kopf.ObjectLogger, **_):
await ResourceWatcher.stop_all()
await Poolboy.on_cleanup()
await MetricsService.stop()


@kopf.on.create(
Expand All @@ -65,6 +84,10 @@ async def cleanup(logger: kopf.ObjectLogger, **_):
Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims',
id='resource_claim_update', labels={Poolboy.ignore_label: kopf.ABSENT},
)
@timer(
AppMetrics.response_time_seconds,
labels={'method': 'resource_claim_event', 'resource_type': 'resourceclaims'}
)
async def resource_claim_event(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -94,6 +117,10 @@ async def resource_claim_event(
Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims',
labels={Poolboy.ignore_label: kopf.ABSENT},
)
@timer(
AppMetrics.response_time_seconds,
labels={'method': 'resource_claim_delete', 'resource_type': 'resourceclaims'}
)
async def resource_claim_delete(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -173,6 +200,10 @@ async def resource_claim_daemon(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles',
id='resource_handle_update', labels={Poolboy.ignore_label: kopf.ABSENT},
)
@timer(
AppMetrics.response_time_seconds,
labels={'method': 'resource_handle_event', 'resource_type': 'resourcehandles'}
)
async def resource_handle_event(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -202,6 +233,10 @@ async def resource_handle_event(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles',
labels={Poolboy.ignore_label: kopf.ABSENT},
)
@timer(
AppMetrics.response_time_seconds,
labels={'method': 'resource_handle_delete', 'resource_type': 'resourcehandles'}
)
async def resource_handle_delete(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -281,6 +316,10 @@ async def resource_handle_daemon(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools',
id='resource_pool_update', labels={Poolboy.ignore_label: kopf.ABSENT},
)
@timer(
AppMetrics.response_time_seconds,
labels={'method': 'resource_pool_event', 'resource_type': 'resourcepools'}
)
async def resource_pool_event(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -310,6 +349,10 @@ async def resource_pool_event(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools',
labels={Poolboy.ignore_label: kopf.ABSENT},
)
@timer(
AppMetrics.response_time_seconds,
labels={'method': 'resource_pool_delete', 'resource_type': 'resourcepools'}
)
async def resource_pool_delete(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -337,6 +380,10 @@ async def resource_pool_delete(


@kopf.on.event(Poolboy.operator_domain, Poolboy.operator_version, 'resourceproviders')
@timer(
AppMetrics.response_time_seconds,
labels={'method': 'resource_provider_event', 'resource_type': 'resourceproviders'}
)
async def resource_provider_event(event: Mapping, logger: kopf.ObjectLogger, **_) -> None:
definition = event['object']
if event['type'] == 'DELETED':
Expand Down
53 changes: 53 additions & 0 deletions operator/resourcepool.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import kubernetes_asyncio
import pytimeparse

from metrics import ResourcePoolMetrics


from datetime import timedelta
from typing import List, Mapping, Optional, TypeVar

Expand Down Expand Up @@ -119,6 +122,17 @@ def lifespan_unclaimed_timedelta(self):
def metadata(self) -> Mapping:
return self.meta

@property
def metrics_labels(self) -> Mapping:
    """Prometheus label values identifying this pool (name + namespace)."""
    return dict(name=self.name, namespace=self.namespace)

@property
def metric_state_labels(self) -> Mapping:
    """Label values for the per-state pool gauge; callers fill in 'state'.

    Returns a fresh dict on every access, so callers may mutate it freely.
    """
    return dict(name=self.name, namespace=self.namespace, state='')

@property
def min_available(self) -> int:
    """Desired minimum number of unbound handles (spec.minAvailable, default 0)."""
    if 'minAvailable' in self.spec:
        return self.spec['minAvailable']
    return 0
Expand Down Expand Up @@ -161,13 +175,52 @@ def refresh(self,
self.status = status
self.uid = uid

async def handle_metrics(self, logger: kopf.ObjectLogger, resource_handles):
    """Publish this pool's Prometheus metrics from the current unbound handles.

    Called from manage() while the pool lock is held.  `resource_handles` is
    the list of unbound ResourceHandles belonging to this pool.  Best-effort:
    any exception is logged and swallowed so that metrics problems never
    break pool reconciliation.
    """
    logger.info("Handling metrics for resource pool")
    # Positive deficit => pool is below minAvailable; negative => surplus.
    resource_handle_deficit = self.min_available - len(resource_handles)

    try:
        ResourcePoolMetrics.resource_pool_min_available.set(
            labels=self.metrics_labels,
            value=self.min_available
        )

        ResourcePoolMetrics.resource_pool_available.set(
            labels=self.metrics_labels,
            value=len(resource_handles)
        )

        # NOTE(review): this increments the counter by 1 on every reconcile
        # in which handles exceed minAvailable, not per environment actually
        # consumed — confirm this is the intended "used" semantics.
        if resource_handle_deficit < 0:
            ResourcePoolMetrics.resource_pool_used_total.inc(
                labels=self.metrics_labels,
            )

        # metric_state_labels returns a fresh dict, so mutating 'state'
        # here does not affect other callers.
        state_labels = self.metric_state_labels
        state_labels['state'] = 'available'
        ResourcePoolMetrics.resource_pool_state.set(
            labels=state_labels,
            value=len(resource_handles)
        )

        # NOTE(review): when the pool holds more handles than minAvailable
        # this publishes a negative value for state="used" — confirm whether
        # the negated deficit (or zero floor) was intended.
        state_labels['state'] = 'used'
        ResourcePoolMetrics.resource_pool_state.set(
            labels=state_labels,
            value=resource_handle_deficit
        )
    except Exception as e:
        # Deliberate broad catch: metrics are non-critical for the operator.
        logger.error(f"Error handling metrics for resource pool: {e}")
        return

async def handle_delete(self, logger: kopf.ObjectLogger):
    """On pool deletion, delete all unbound ResourceHandles owned by this pool.

    Bound handles are left untouched; only the pool's spare capacity is
    cleaned up by delete_unbound_handles_for_pool.
    """
    await resourcehandle.ResourceHandle.delete_unbound_handles_for_pool(logger=logger, resource_pool=self)

async def manage(self, logger: kopf.ObjectLogger):
async with self.lock:
resource_handles = await resourcehandle.ResourceHandle.get_unbound_handles_for_pool(resource_pool=self, logger=logger)
resource_handle_deficit = self.min_available - len(resource_handles)

await self.handle_metrics(logger=logger, resource_handles=resource_handles)

if resource_handle_deficit <= 0:
return
for i in range(resource_handle_deficit):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ jsonpointer==2.2
jsonschema==3.2.0
openapi-schema-validator==0.1.5
prometheus-client==0.11.0
aioprometheus==23.12.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
pydantic==1.9.0
Expand Down
Loading