-
Notifications
You must be signed in to change notification settings - Fork 0
/
influx.py
104 lines (85 loc) · 3.06 KB
/
influx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import time
import typing
import attrs
import requests
# Shared attrs validator used below for measurement names and for tag/field
# keys: an ASCII letter followed by any number of letters, digits or
# underscores.
# NOTE(review): attrs documents matches_re as using re.fullmatch by default,
# i.e. the whole string must match - confirm against the attrs version in use.
name_validator = attrs.validators.matches_re("[a-zA-Z][_a-zA-Z0-9]*")
def render_item(key, value):
    """Render one ``key=value`` pair using line-protocol field-value syntax.

    Formatting by type:

    * ``str``   -> double-quoted, with backslashes and double quotes escaped.
    * ``bool``  -> ``true`` / ``false``.
    * ``int``   -> decimal digits with an ``i`` suffix when the value fits in
      a signed 64-bit integer; otherwise it is rendered like a float, because
      an ``i``-suffixed literal outside that range cannot be stored.
    * ``float`` -> fixed-point notation via ``format(value, "f")``.

    Args:
        key: Name placed on the left of ``=``; written verbatim, unescaped.
        value: The ``str``, ``bool``, ``int`` or ``float`` to render.

    Returns:
        The string ``"<key>=<formatted value>"``.

    Raises:
        TypeError: If ``value`` is not one of the supported types.
    """
    if isinstance(value, str):
        # Escape backslashes first so the backslashes introduced for the
        # quotes are not doubled again.
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        formatted = f'"{escaped}"'
    elif isinstance(value, bool):
        # Must be tested before int: bool is a subclass of int.
        formatted = "true" if value else "false"
    elif isinstance(value, int):
        if -(1 << 63) <= value < (1 << 63):
            formatted = f"{value:d}i"
        else:
            # Outside the signed 64-bit range: fall back to a float literal.
            formatted = format(value, "f")
    elif isinstance(value, float):
        # NOTE(review): "f" uses six decimal places, so magnitudes below
        # ~5e-7 render as 0.000000 - confirm that precision is acceptable.
        formatted = format(value, "f")
    else:
        raise TypeError("unsupported value type", type(value))
    return f"{key}={formatted}"
@attrs.define()
class Point:
    """A single data point that can be serialized to one line-protocol line.

    Attributes are validated by attrs on construction: ``measurement`` and
    all tag/field keys must satisfy ``name_validator``; field values must be
    ``float``, ``int``, ``str`` or ``bool``; tag values must be ``str``.
    ``time`` defaults to ``time.time_ns()`` at construction.
    """

    measurement: str = attrs.field(validator=name_validator)
    fields: typing.Dict[str, typing.Union[float, int, str, bool]] = attrs.field(
        validator=attrs.validators.deep_mapping(
            key_validator=name_validator,
            value_validator=attrs.validators.instance_of((float, int, str, bool)),
        )
    )
    tags: typing.Dict[str, str] = attrs.field(
        factory=dict,
        validator=attrs.validators.deep_mapping(
            key_validator=name_validator,
            value_validator=attrs.validators.instance_of(str),
        )
    )
    # The right-hand side is evaluated before the class attribute ``time``
    # exists, so ``time.time_ns`` here still refers to the ``time`` module.
    time: int = attrs.field(factory=time.time_ns)

    @staticmethod
    def _escape_tag_value(value: str) -> str:
        """Backslash-escape the characters that delimit tags in a line."""
        return value.translate(
            str.maketrans({",": "\\,", "=": "\\=", " ": "\\ "})
        )

    def _render_tags(self) -> str:
        """Return the tag set as ``,k1=v1,k2=v2`` or ``""`` when empty.

        Tag values are written unquoted: line protocol treats quotes around
        a tag value as part of the value itself, so the field-value renderer
        must not be used for tags.
        """
        if not self.tags:
            return ""
        rendered = ",".join(
            f"{key}={self._escape_tag_value(value)}"
            for key, value in self.tags.items()
        )
        return f",{rendered}"

    def _render_fields(self) -> str:
        """Return the field set as comma-separated ``key=value`` pairs."""
        return ",".join(
            render_item(key, value) for key, value in self.fields.items()
        )

    def as_line(self) -> str:
        """Serialize as ``<measurement>[,<tags>] <fields> <time>`` plus newline."""
        return "{}{} {} {}\n".format(
            self.measurement,
            self._render_tags(),
            self._render_fields(),
            self.time,
        )
class Client:
    """Minimal HTTP client that posts points to ``<base_url>/api/v2/write``.

    Args:
        base_url: Server root URL; a trailing ``/`` is tolerated.
        org: Default organization used when ``write_points`` gets none.
        bucket: Default bucket used when ``write_points`` gets none.
        token: Optional API token, sent as ``Authorization: Token <token>``.
        timeout: Optional timeout forwarded to ``requests.post``. The default
            ``None`` keeps the previous behaviour of passing no timeout.
    """

    def __init__(self, *, base_url, org=None, bucket=None, token=None,
                 timeout=None):
        self.base_url = base_url
        self.org = org
        self.bucket = bucket
        self.token = token
        self.timeout = timeout

    def write_points(self, points: list[Point], *, org=None, bucket=None):
        """Serialize ``points`` and POST them in a single write request.

        Args:
            points: Points to write; each contributes one ``as_line()`` line.
            org: Organization for this call; falls back to the client default.
            bucket: Bucket for this call; falls back to the client default.

        Returns:
            The ``requests`` response object for the POST.

        Raises:
            ValueError: If no org or no bucket is given and no default is set.
            requests.HTTPError: Via ``raise_for_status()`` on an error status.
        """
        if org is None:
            org = self.org
        if bucket is None:
            bucket = self.bucket
        if org is None:
            raise ValueError("no org specified, no default")
        if bucket is None:
            raise ValueError("no bucket specified, no default")

        headers = {
            "Content-Type": "text/plain; charset=utf-8",
            "Accept": "application/json",
        }
        if self.token:
            # feels like it should be 'Bearer', but nope, 'Token'
            headers["Authorization"] = f"Token {self.token}"

        params = {
            "org": org,
            "bucket": bucket,
            "precision": "ns",
        }

        # Encode explicitly: a str body is not guaranteed to go out as UTF-8,
        # and string field values may contain non-ASCII text.
        payload = "".join(p.as_line() for p in points).encode("utf-8")

        resp = requests.post(
            # Strip a trailing slash so the path does not start with "//".
            url=self.base_url.rstrip("/") + "/api/v2/write",
            headers=headers,
            params=params,
            data=payload,
            timeout=self.timeout,
        )
        resp.raise_for_status()
        return resp