Switch the foxglove client to requests.Session (#108)
Switch from calling the module-level `requests` functions to a shared `requests.Session`
object, which provides keep-alive connections and connection pooling out of the box
(see https://docs.python-requests.org/en/latest/user/advanced/). This makes the client
faster when issuing multiple API calls in succession, because it no longer has to
negotiate a new TCP connection for each call. The session also stores the default
headers once and applies them to every request.

Tested with the unit tests and `examples/downloading_data.py`.
rg-gravis authored Oct 11, 2024
1 parent 3b99d96 commit aa8a269
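
For readers unfamiliar with the pattern, here is a minimal standalone sketch (not taken from this diff) of what the switch amounts to. The host, endpoint, and header names mirror those in the diff below; the token is a placeholder.

```python
import requests

# Before: every call built its own connection and had to pass headers explicitly, e.g.
# requests.get("https://api.foxglove.dev/v1/devices", headers=headers)

# After: a Session stores default headers once and reuses pooled keep-alive
# connections for successive requests to the same host.
session = requests.Session()
session.headers.update(
    {
        "Content-type": "application/json",
        "Authorization": "Bearer <YOUR_API_TOKEN>",  # placeholder token
    }
)

response = session.get("https://api.foxglove.dev/v1/devices")
response.raise_for_status()
print(response.json())
```
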
Showing 2 changed files with 36 additions and 49 deletions.
83 changes: 35 additions & 48 deletions foxglove/client.py
@@ -151,10 +151,10 @@ def json_or_raise(response: requests.Response):

def _download_stream_with_progress(
url: str,
- headers: Optional[dict] = None,
+ session: requests.Session,
callback: Optional[ProgressCallback] = None,
):
- response = requests.get(url, headers=headers, stream=True)
+ response = session.get(url, stream=True)
response.raise_for_status()
data = BytesIO()
for chunk in response.iter_content(chunk_size=32 * 1024):
@@ -167,10 +167,13 @@ def _download_stream_with_progress(
class Client:
def __init__(self, token: str, host: str = "api.foxglove.dev"):
self.__token = token
- self.__headers = {
- "Content-type": "application/json",
- "Authorization": "Bearer " + self.__token,
- }
+ self.__session = requests.Session()
+ self.__session.headers.update(
+ {
+ "Content-type": "application/json",
+ "Authorization": "Bearer " + self.__token,
+ }
+ )
self.__host = host

def __url__(self, path: str):
@@ -208,9 +211,8 @@ def create_event(
"end": end.astimezone().isoformat(),
"metadata": metadata,
}
- response = requests.post(
+ response = self.__session.post(
self.__url__("/v1/events"),
- headers=self.__headers,
json={k: v for k, v in params.items() if v is not None},
)

@@ -226,10 +228,7 @@ def delete_event(
event_id: The id of the event to delete.
"""
- response = requests.delete(
- self.__url__(f"/v1/events/{event_id}"),
- headers=self.__headers,
- )
+ response = self.__session.delete(self.__url__(f"/v1/events/{event_id}"))
return json_or_raise(response)

def get_events(
@@ -271,9 +270,8 @@ def get_events(
"end": end.astimezone().isoformat() if end else None,
"query": query,
}
- response = requests.get(
+ response = self.__session.get(
self.__url__("/v1/events"),
- headers=self.__headers,
params={k: v for k, v in params.items() if v is not None},
)

@@ -350,7 +348,7 @@ def iter_messages(
end=end,
topics=topics,
)
- response = requests.get(stream_link, headers=self.__headers, stream=True)
+ response = self.__session.get(stream_link, stream=True)
response.raise_for_status()
if decoder_factories is None:
# We deep-copy here as these factories might be mutated
@@ -385,15 +383,16 @@ def download_recording_data(
"includeAttachments": include_attachments,
"outputFormat": output_format.value,
}
- link_response = requests.post(
+ link_response = self.__session.post(
self.__url__("/v1/data/stream"),
- headers=self.__headers,
json={k: v for k, v in params.items() if v is not None},
)

json = json_or_raise(link_response)

- return _download_stream_with_progress(json["link"], callback=callback)
+ return _download_stream_with_progress(
+ json["link"], self.__session, callback=callback
+ )

def _make_stream_link(
self,
@@ -416,9 +415,8 @@ def _make_stream_link(
"start": start.astimezone().isoformat(),
"topics": topics,
}
- link_response = requests.post(
+ link_response = self.__session.post(
self.__url__("/v1/data/stream"),
- headers=self.__headers,
json={k: v for k, v in params.items() if v is not None},
)

@@ -456,6 +454,7 @@ def download_data(
topics=topics,
output_format=output_format,
),
+ self.__session,
callback=callback,
)

@@ -484,9 +483,8 @@ def get_coverage(
"start": start.astimezone().isoformat(),
"end": end.astimezone().isoformat(),
}
- response = requests.get(
+ response = self.__session.get(
self.__url__("/v1/data/coverage"),
- headers=self.__headers,
params={k: v for k, v in params.items() if v is not None},
)
json = json_or_raise(response)
@@ -514,9 +512,8 @@ def get_device(
raise RuntimeError("device_id and device_name are mutually exclusive")
if device_name is None and device_id is None:
raise RuntimeError("device_id or device_name must be provided")
- response = requests.get(
+ response = self.__session.get(
self.__url__(f"/v1/devices/{device_name or device_id}"),
- headers=self.__headers,
)

device = json_or_raise(response)
@@ -531,9 +528,8 @@ def get_devices(self):
"""
Returns a list of all devices.
"""
- response = requests.get(
+ response = self.__session.get(
self.__url__("/v1/devices"),
- headers=self.__headers,
)

json = json_or_raise(response)
@@ -561,9 +557,8 @@ def create_device(
Each key must be defined as a custom property for your organization,
and each value must be of the appropriate type
"""
- response = requests.post(
+ response = self.__session.post(
self.__url__("/v1/devices"),
- headers=self.__headers,
json=without_nulls({"name": name, "properties": properties}),
)

@@ -598,9 +593,8 @@ def update_device(
if device_name is None and device_id is None:
raise RuntimeError("device_id or device_name must be provided")

- response = requests.patch(
+ response = self.__session.patch(
self.__url__(f"/v1/devices/{device_name or device_id}"),
- headers=self.__headers,
json=without_nulls({"name": new_name, "properties": properties}),
)

@@ -627,9 +621,8 @@ def delete_device(
raise RuntimeError("device_id and device_name are mutually exclusive")
if device_name is None and device_id is None:
raise RuntimeError("device_id or device_name must be provided")
- response = requests.delete(
+ response = self.__session.delete(
self.__url__(f"/v1/devices/{device_name or device_id}"),
- headers=self.__headers,
)
json_or_raise(response)

@@ -644,15 +637,14 @@ def delete_import(self, *, device_id: Optional[str] = None, import_id: str):
warnings.warn(
"The `device_id` parameter is deprecated.", DeprecationWarning
)
- response = requests.delete(
+ response = self.__session.delete(
self.__url__(f"/v1/data/imports/{import_id}"),
- headers=self.__headers,
)
json_or_raise(response)

def delete_recording(self, *, recording_id: str):
- response = requests.delete(
- self.__url__(f"/v1/recordings/{recording_id}"), headers=self.__headers
+ response = self.__session.delete(
+ self.__url__(f"/v1/recordings/{recording_id}"),
)
json_or_raise(response)

@@ -699,10 +691,9 @@ def get_imports(
"limit": limit,
"offset": offset,
}
- response = requests.get(
+ response = self.__session.get(
self.__url__("/v1/data/imports"),
params={k: v for k, v in all_params.items() if v is not None},
- headers=self.__headers,
)
json = json_or_raise(response)

@@ -770,10 +761,9 @@ def get_recordings(
"limit": limit,
"offset": offset,
}
- response = requests.get(
+ response = self.__session.get(
self.__url__("/v1/recordings"),
params={k: v for k, v in all_params.items() if v is not None},
- headers=self.__headers,
)
json = json_or_raise(response)

@@ -836,10 +826,9 @@ def get_attachments(
"limit": limit,
"offset": offset,
}
- response = requests.get(
+ response = self.__session.get(
self.__url__("/v1/recording-attachments"),
params={k: v for k, v in all_params.items() if v is not None},
- headers=self.__headers,
)
json = json_or_raise(response)
return [
@@ -872,7 +861,7 @@ def download_attachment(
"""
return _download_stream_with_progress(
self.__url__(f"/v1/recording-attachments/{id}/download"),
- headers=self.__headers,
+ self.__session,
callback=callback,
)

@@ -885,9 +874,8 @@ def get_topics(
end: datetime.datetime,
include_schemas: bool = False,
):
- response = requests.get(
+ response = self.__session.get(
self.__url__("/v1/data/topics"),
- headers=self.__headers,
params={
"deviceId": device_id,
"deviceName": device_name,
@@ -941,17 +929,16 @@ def upload_data(
"filename": filename,
"key": key,
}
- link_response = requests.post(
+ link_response = self.__session.post(
self.__url__("/v1/data/upload"),
- headers=self.__headers,
json={k: v for k, v in params.items() if v is not None},
)

json = json_or_raise(link_response)

link = json["link"]
buffer = ProgressBufferReader(data, callback=callback)
- upload_request = requests.put(
+ upload_request = self.__session.put(
link,
data=buffer,
headers={"Content-Type": "application/octet-stream"},
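
As a hypothetical usage sketch (not part of this commit): because every request in `client.py` now goes through the client's single internal `requests.Session`, successive calls from one `Client` instance can reuse a pooled keep-alive connection. The token below is a placeholder, and `get_devices()` is assumed to return a list, as its docstring above states.

```python
from foxglove.client import Client  # module path as shown in this diff

client = Client(token="<YOUR_API_TOKEN>")  # placeholder token

# Each call below goes through the same internal requests.Session, so only
# the first request should pay the TCP/TLS connection-setup cost.
for _ in range(3):
    devices = client.get_devices()
    print(len(devices))
```
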
2 changes: 1 addition & 1 deletion tests/test_stream_messages.py
@@ -22,7 +22,7 @@ def raise_for_status(self):
return Resp()


@patch("requests.get", side_effect=get_generated_data)
@patch("requests.Session.get", side_effect=get_generated_data)
def test_boot(arg):
client = Client("test")
client._make_stream_link = MagicMock(return_value="the_link")
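
The only test change is the patch target: since the client now issues requests through a `requests.Session`, mocks must patch `requests.Session.get` rather than the module-level `requests.get`. Below is a minimal sketch of that mechanism (hypothetical, not the repository's test); patching the class attribute intercepts calls made through any `Session` instance, including the one `Client` creates internally.

```python
from unittest.mock import MagicMock, patch

import requests


# Patching requests.Session.get replaces the method on the class itself, so
# every Session instance is intercepted, no matter where it was constructed.
@patch("requests.Session.get", return_value=MagicMock(status_code=200))
def test_session_get_is_intercepted(mock_get):
    session = requests.Session()
    response = session.get("https://api.foxglove.dev/v1/devices")
    assert response.status_code == 200
    assert mock_get.called
```
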
