Skip to content

Commit

Permalink
feat: Expose metric dictionary to make logging metrics as JSON easier
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Jan 31, 2024
1 parent 32f3dec commit 2125742
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 51 deletions.
33 changes: 19 additions & 14 deletions docs/implementation/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,36 +44,41 @@ Users of a tap can configure the SDK logging by setting the `SINGER_SDK_LOG_CONF
environment variable. The value of this variable should be a path to a YAML file in the
[Python logging dict format](https://docs.python.org/3/library/logging.config.html#dictionary-schema-details).

For example, to send [metrics](./metrics.md) (with logger name `singer_sdk.metrics`) to a file, you could use the following config:
### Metrics logging

The Singer SDK provides a logger named `singer_sdk.metrics` for logging [Singer metrics](./metrics.md). Metric log records contain an extra field `point` which is a dictionary containing the metric data. The `point` field is formatted as JSON by default.

To send metrics to a file in JSON format, you could use the following config:

```yaml
version: 1
disable_existing_loggers: false
formatters:
metrics:
format: "{asctime} {levelname} {message}"
(): pythonjsonlogger.jsonlogger.JsonFormatter
format: "{created} {message} {point}"
style: "{"
handlers:
metrics:
class: logging.FileHandler
formatter: metrics
filename: metrics.log
filename: metrics.jsonl
loggers:
singer_sdk.metrics:
level: INFO
handlers: [ metrics ]
propagate: yes
propagate: no
```
This will send metrics to a `metrics.log`:
This will send metrics to a `metrics.jsonl`:

```
2022-09-29 00:48:52,746 INFO METRIC: {"metric_type": "timer", "metric": "http_request_duration", "value": 0.501743, "tags": {"stream": "continents", "endpoint": "", "http_status_code": 200, "status": "succeeded"}}
2022-09-29 00:48:52,775 INFO METRIC: {"metric_type": "counter", "metric": "http_request_count", "value": 1, "tags": {"stream": "continents", "endpoint": ""}}
2022-09-29 00:48:52,776 INFO METRIC: {"metric_type": "timer", "metric": "sync_duration", "value": 0.7397160530090332, "tags": {"stream": "continents", "context": {}, "status": "succeeded"}}
2022-09-29 00:48:52,776 INFO METRIC: {"metric_type": "counter", "metric": "record_count", "value": 7, "tags": {"stream": "continents", "context": {}}}
2022-09-29 00:48:53,225 INFO METRIC: {"metric_type": "timer", "metric": "http_request_duration", "value": 0.392148, "tags": {"stream": "countries", "endpoint": "", "http_status_code": 200, "status": "succeeded"}}
2022-09-29 00:48:53,302 INFO METRIC: {"metric_type": "counter", "metric": "http_request_count", "value": 1, "tags": {"stream": "countries", "endpoint": ""}}
2022-09-29 00:48:53,302 INFO METRIC: {"metric_type": "timer", "metric": "sync_duration", "value": 0.5258760452270508, "tags": {"stream": "countries", "context": {}, "status": "succeeded"}}
2022-09-29 00:48:53,303 INFO METRIC: {"metric_type": "counter", "metric": "record_count", "value": 250, "tags": {"stream": "countries", "context": {}}}
```json
{"created": 1705709074.883021, "message": "METRIC", "point": {"type": "timer", "metric": "http_request_duration", "value": 0.501743, "tags": {"stream": "continents", "endpoint": "", "http_status_code": 200, "status": "succeeded"}}}
{"created": 1705709074.897184, "message": "METRIC", "point": {"type": "counter", "metric": "http_request_count", "value": 1, "tags": {"stream": "continents", "endpoint": ""}}}
{"created": 1705709074.897256, "message": "METRIC", "point": {"type": "timer", "metric": "sync_duration", "value": 0.7397160530090332, "tags": {"stream": "continents", "context": {}, "status": "succeeded"}}}
{"created": 1705709074.897292, "message": "METRIC", "point": {"type": "counter", "metric": "record_count", "value": 7, "tags": {"stream": "continents", "context": {}}}}
{"created": 1705709075.397254, "message": "METRIC", "point": {"type": "timer", "metric": "http_request_duration", "value": 0.392148, "tags": {"stream": "countries", "endpoint": "", "http_status_code": 200, "status": "succeeded"}}}
{"created": 1705709075.421888, "message": "METRIC", "point": {"type": "counter", "metric": "http_request_count", "value": 1, "tags": {"stream": "countries", "endpoint": ""}}}
{"created": 1705709075.422001, "message": "METRIC", "point": {"type": "timer", "metric": "sync_duration", "value": 0.5258760452270508, "tags": {"stream": "countries", "context": {}, "status": "succeeded"}}}
{"created": 1705709075.422047, "message": "METRIC", "point": {"type": "counter", "metric": "record_count", "value": 250, "tags": {"stream": "countries", "context": {}}}}
```
13 changes: 12 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pendulum = ">=2.1.0,<4"
PyJWT = "~=2.4"
python-dateutil = ">=2.8.2"
python-dotenv = ">=0.20"
python-json-logger = ">=2"
PyYAML = ">=6.0"
referencing = ">=0.30.0"
requests = ">=2.25.1"
Expand Down
13 changes: 13 additions & 0 deletions singer_sdk/default_logging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,24 @@ formatters:
console:
format: "{asctime:23s} | {levelname:8s} | {name:20s} | {message}"
style: "{"
metrics:
(): singer_sdk.metrics.SingerMetricsFormatter
format: "{asctime:23s} | {levelname:8s} | {name:20s} | {message}: {metric_json}"
style: "{"
handlers:
default:
class: logging.StreamHandler
formatter: console
stream: ext://sys.stderr
metrics:
class: logging.StreamHandler
formatter: metrics
stream: ext://sys.stderr
root:
level: INFO
handlers: [default]
loggers:
singer_sdk.metrics:
level: INFO
handlers: [ metrics ]
propagate: no
40 changes: 23 additions & 17 deletions singer_sdk/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,29 +67,35 @@ class Point(t.Generic[_TVal]):
value: _TVal
tags: dict[str, t.Any] = field(default_factory=dict)

def __str__(self) -> str:
"""Get string representation of this measurement.
def to_dict(self) -> dict[str, t.Any]:
"""Convert this measure to a dictionary.
Returns:
A string representation of this measurement.
A dictionary.
"""
return self.to_json()
return {
"type": self.metric_type,
"metric": self.metric.value,
"value": self.value,
"tags": self.tags,
}

def to_json(self) -> str:
"""Convert this measure to a JSON object.

class SingerMetricsFormatter(logging.Formatter):
"""Logging formatter that adds a ``metric_json`` field to the log record."""

def format(self, record: logging.LogRecord) -> str:
"""Format a log record.
Args:
record: A log record.
Returns:
A JSON object.
A formatted log record.
"""
return json.dumps(
{
"type": self.metric_type,
"metric": self.metric.value,
"value": self.value,
"tags": self.tags,
},
default=str,
)
point = record.__dict__.get("point")
record.__dict__["metric_json"] = json.dumps(point, default=str) if point else ""
return super().format(record)


def log(logger: logging.Logger, point: Point) -> None:
Expand All @@ -99,7 +105,7 @@ def log(logger: logging.Logger, point: Point) -> None:
logger: An logger instance.
point: A measurement.
"""
logger.info("METRIC: %s", point)
logger.info("METRIC", extra={"point": point.to_dict()})


class Meter(metaclass=abc.ABCMeta):
Expand Down
77 changes: 58 additions & 19 deletions tests/core/test_metrics.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import json
import logging
import time

import pytest
import time_machine

from singer_sdk import metrics

Expand All @@ -17,6 +18,32 @@ def __str__(self) -> str:
return f"{self.name}={self.value}"


def test_singer_metrics_formatter():
formatter = metrics.SingerMetricsFormatter(fmt="{metric_json}", style="{")

record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="test.py",
lineno=1,
msg="METRIC",
args=(),
exc_info=None,
)

assert formatter.format(record) == ""

metric_dict = {
"type": "counter",
"metric": "test_metric",
"tags": {"test_tag": "test_value"},
"value": 1,
}
record.__dict__["point"] = metric_dict

assert formatter.format(record) == json.dumps(metric_dict)


def test_meter():
class _MyMeter(metrics.Meter):
def __enter__(self):
Expand All @@ -38,6 +65,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):


def test_record_counter(caplog: pytest.LogCaptureFixture):
metrics_logger = logging.getLogger(metrics.METRICS_LOGGER_NAME)
metrics_logger.propagate = True

caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME)
custom_object = CustomObject("test", 1)

Expand All @@ -56,48 +86,57 @@ def test_record_counter(caplog: pytest.LogCaptureFixture):
total = 0

assert len(caplog.records) == 100 + 1
# raise AssertionError

for record in caplog.records:
assert record.levelname == "INFO"
assert record.msg == "METRIC: %s"
assert "test=1" in record.message
assert record.msg.startswith("METRIC")

point: metrics.Point[int] = record.args[0]
assert point.metric_type == "counter"
assert point.metric == "record_count"
assert point.tags == {
point = record.__dict__["point"]
assert point["type"] == "counter"
assert point["metric"] == "record_count"
assert point["tags"] == {
metrics.Tag.STREAM: "test_stream",
metrics.Tag.ENDPOINT: "test_endpoint",
"custom_tag": "pytest",
"custom_obj": custom_object,
}

total += point.value
total += point["value"]

assert total == 100


def test_sync_timer(caplog: pytest.LogCaptureFixture):
metrics_logger = logging.getLogger(metrics.METRICS_LOGGER_NAME)
metrics_logger.propagate = True

caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME)
with metrics.sync_timer("test_stream", custom_tag="pytest") as timer:
start_time = timer.start_time
for _ in range(1000):
time.sleep(0.001)
end_time = time.time()

traveller = time_machine.travel(0, tick=False)
traveller.start()

with metrics.sync_timer("test_stream", custom_tag="pytest"):
traveller.stop()

traveller = time_machine.travel(10, tick=False)
traveller.start()

traveller.stop()

assert len(caplog.records) == 1

record = caplog.records[0]
assert record.levelname == "INFO"
assert record.msg == "METRIC: %s"
assert record.msg.startswith("METRIC")

point: metrics.Point[float] = record.args[0]
assert point.metric_type == "timer"
assert point.metric == "sync_duration"
assert point.tags == {
point = record.__dict__["point"]
assert point["type"] == "timer"
assert point["metric"] == "sync_duration"
assert point["tags"] == {
metrics.Tag.STREAM: "test_stream",
metrics.Tag.STATUS: "succeeded",
"custom_tag": "pytest",
}

assert pytest.approx(point.value, rel=0.001) == end_time - start_time
assert pytest.approx(point["value"], rel=0.001) == 10.0

0 comments on commit 2125742

Please sign in to comment.