Skip to content

Commit

Permalink
use InfluxDBClient
Browse files Browse the repository at this point in the history
  • Loading branch information
bleykauf committed Jan 4, 2024
1 parent fb139eb commit 92e1540
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 43 deletions.
78 changes: 35 additions & 43 deletions linien-server/linien_server/influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
from threading import Event, Thread
from time import sleep

import requests
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
from linien_common.communication import ParameterValues
from linien_common.influxdb import InfluxDBCredentials, save_credentials
from linien_server.parameters import Parameters

Expand All @@ -27,10 +29,11 @@ class InfluxDBLogger:
def __init__(
self, credentials: InfluxDBCredentials, parameters: Parameters
) -> None:
self.credentials = credentials
self.parameters = parameters
self.credentials: InfluxDBClient = credentials
self.parameters: Parameters = parameters
self.stop_event = Event()
self.stop_event.set()
self.update_connection()

@property
def credentials(self) -> InfluxDBCredentials:
Expand All @@ -41,6 +44,14 @@ def credentials(self, value: InfluxDBCredentials) -> None:
self._credentials = value
save_credentials(value)

def update_connection(self) -> InfluxDBClient:
client = InfluxDBClient(
url=self.credentials.url,
token=self.credentials.token,
org=self.credentials.org,
)
self.write_api = client.write_api(write_options=SYNCHRONOUS)

def start_logging(self, interval: float) -> None:
conn_success, status_code, message = self.test_connection(self.credentials)
self.thread = Thread(
Expand Down Expand Up @@ -78,46 +89,27 @@ def test_connection(
self, credentials: InfluxDBCredentials
) -> tuple[bool, int, str]:
"""Write empty data to the server to test the connection"""
try:
response = self.write_data(credentials, data={})
success = response.status_code == 204
status_code = response.status_code
text = response.text
except requests.exceptions.ConnectionError:
success = False
status_code = 404
text = "Failed to establish connection."
return success, status_code, text
client = InfluxDBClient(
url=credentials.url,
token=credentials.token,
org=credentials.org,
)

# FIXME: This does not test the credentials, yet.
status_code = 0
message = ""
success = client.ping()
return success, status_code, message

def write_data(
self, credentials: InfluxDBCredentials, data: dict
) -> requests.Response:
self, credentials: InfluxDBCredentials, fields: dict[str, ParameterValues]
) -> None:
"""Write data to the database"""
endpoint = credentials.url + "/api/v2/write"
headers = {
"Authorization": "Token " + credentials.token,
"Content-Type": "text/plain; charset=utf-8",
"Accept": "application/json",
}
params = {
"org": credentials.org,
"bucket": credentials.bucket,
"precision": "ns",
}

point = self._convert_to_line_protocol(data)

response = requests.post(endpoint, headers=headers, params=params, data=point)
return response

def _convert_to_line_protocol(self, data: dict) -> str:
if not data:
return ""
point = self.credentials.measurement
for i, (key, value) in enumerate(data.items()):
if i == 0:
point += " "
else:
point += ","
point += f"{key}={value}"
return point
self.write_api.write(
bucket=credentials.bucket,
org=credentials.org,
record={
"measurement": credentials.measurement,
"fields": fields,
},
)
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ module = [
"pylpsd.*",
"pyqtgraph.*",
"fire",
"influxdb_client.*",
]
ignore_missing_imports = true

0 comments on commit 92e1540

Please sign in to comment.