Skip to content

Commit b2610c7

Browse files
Saving the blob to the manager as it is returned from the invocation
1 parent 034ea9a commit b2610c7

File tree

4 files changed

+39
-169
lines changed

4 files changed

+39
-169
lines changed

src/labthings_fastapi/actions/__init__.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
InvocationCancelledError,
2020
invocation_logger,
2121
)
22-
from ..outputs.blob import BlobIOContextDep
22+
from ..outputs.blob import Blob, BlobDataManager, BlobRef
2323

2424
if TYPE_CHECKING:
2525
# We only need these imports for type hints, so this avoids circular imports.
@@ -40,6 +40,7 @@ def __init__(
4040
self,
4141
action: ActionDescriptor,
4242
thing: Thing,
43+
blob_data_manager: BlobDataManager,
4344
input: Optional[BaseModel] = None,
4445
dependencies: Optional[dict[str, Any]] = None,
4546
default_stop_timeout: float = 5,
@@ -56,6 +57,8 @@ def __init__(
5657
self.dependencies = dependencies if dependencies is not None else {}
5758
self.cancel_hook = cancel_hook
5859

60+
self._blob_data_manager = blob_data_manager
61+
5962
# A UUID for the Invocation (not the same as the threading.Thread ident)
6063
self._ID = id if id is not None else uuid.uuid4() # Task ID
6164

@@ -181,6 +184,9 @@ def run(self):
181184
ret = action.__get__(thing)(**kwargs, **self.dependencies)
182185

183186
with self._status_lock:
187+
if isinstance(ret, Blob):
188+
blob_id = self._blob_data_manager.add_blob(ret.data)
189+
ret.href=f"/blob/{blob_id}"
184190
self._return_value = ret
185191
self._status = InvocationStatus.COMPLETED
186192
self.action.emit_changed_event(self.thing, self._status)
@@ -241,7 +247,8 @@ def emit(self, record):
241247
class ActionManager:
242248
"""A class to manage a collection of actions"""
243249

244-
def __init__(self):
250+
def __init__(self, server):
251+
self._server = server
245252
self._invocations = {}
246253
self._invocations_lock = Lock()
247254

@@ -271,6 +278,7 @@ def invoke_action(
271278
dependencies=dependencies,
272279
id=id,
273280
cancel_hook=cancel_hook,
281+
blob_data_manager=self._server.blob_data_manager
274282
)
275283
self.append_invocation(thread)
276284
thread.start()
@@ -312,17 +320,15 @@ def attach_to_app(self, app: FastAPI):
312320
"""Add /action_invocations and /action_invocation/{id} endpoints to FastAPI"""
313321

314322
@app.get(ACTION_INVOCATIONS_PATH, response_model=list[InvocationModel])
315-
def list_all_invocations(request: Request, _blob_manager: BlobIOContextDep):
323+
def list_all_invocations(request: Request):
316324
return self.list_invocations(as_responses=True, request=request)
317325

318326
@app.get(
319327
ACTION_INVOCATIONS_PATH + "/{id}",
320328
response_model=InvocationModel,
321329
responses={404: {"description": "Invocation ID not found"}},
322330
)
323-
def action_invocation(
324-
id: uuid.UUID, request: Request, _blob_manager: BlobIOContextDep
325-
):
331+
def action_invocation(id: uuid.UUID, request: Request):
326332
try:
327333
with self._invocations_lock:
328334
return self._invocations[id].response(request=request)
@@ -346,7 +352,7 @@ def action_invocation(
346352
503: {"description": "No result is available for this invocation"},
347353
},
348354
)
349-
def action_invocation_output(id: uuid.UUID, _blob_manager: BlobIOContextDep):
355+
def action_invocation_output(id: uuid.UUID):
350356
"""Get the output of an action invocation
351357
352358
This returns just the "output" component of the action invocation. If the

src/labthings_fastapi/descriptors/action.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
input_model_from_signature,
2424
return_type,
2525
)
26-
from ..outputs.blob import BlobIOContextDep
26+
2727
from ..thing_description import type_to_dataschema
2828
from ..thing_description.model import ActionAffordance, ActionOp, Form, Union
2929
from ..utilities import labthings_data, get_blocking_portal
@@ -178,7 +178,6 @@ def add_to_fastapi(self, app: FastAPI, thing: Thing):
178178
# the function to the decorator.
179179
def start_action(
180180
action_manager: ActionManagerContextDep,
181-
_blob_manager: BlobIOContextDep,
182181
request: Request,
183182
body,
184183
id: InvocationID,

src/labthings_fastapi/outputs/blob.py

Lines changed: 24 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ def get_image(self) -> MyImageBlob:
4444

4545
from __future__ import annotations
4646
from contextvars import ContextVar
47+
import traceback
48+
import logging
4749
import io
4850
import os
4951
import re
@@ -69,7 +71,6 @@ def get_image(self) -> MyImageBlob:
6971
model_validator,
7072
)
7173
from labthings_fastapi.dependencies.thing_server import find_thing_server
72-
from starlette.exceptions import HTTPException
7374
from typing_extensions import Self, Protocol, runtime_checkable
7475

7576

@@ -187,6 +188,17 @@ def open(self) -> io.IOBase:
187188
def response(self) -> Response:
188189
return FileResponse(self._file_path, media_type=self.media_type)
189190

191+
class BlobRef(BaseModel):
192+
href: str
193+
"""The URL where the data may be retrieved. This will be `blob://local`
194+
if the data is stored locally."""
195+
rel: Literal["output"] = "output"
196+
description: str = (
197+
"The output from this action is not serialised to JSON, so it must be "
198+
"retrieved as a file. This link will return the file."
199+
)
200+
media_type: str
201+
190202

191203
class Blob(BaseModel):
192204
"""A container for binary data that may be retrieved over HTTP
@@ -202,89 +214,25 @@ class Blob(BaseModel):
202214
`media_type` attribute, as this will propagate to the auto-generated
203215
documentation.
204216
"""
205-
206-
href: str
217+
href: str = "blob://local"
207218
"""The URL where the data may be retrieved. This will be `blob://local`
208219
if the data is stored locally."""
209-
media_type: str = "*/*"
210-
"""The MIME type of the data. This should be overridden in subclasses."""
211220
rel: Literal["output"] = "output"
212221
description: str = (
213222
"The output from this action is not serialised to JSON, so it must be "
214223
"retrieved as a file. This link will return the file."
215224
)
216-
217-
_data: Optional[ServerSideBlobData] = None
218-
"""This object holds the data, either in memory or as a file.
225+
media_type: str = "*/*"
226+
"""The MIME type of the data. This should be overridden in subclasses."""
219227

220-
If `_data` is `None`, then the Blob has not been deserialised yet, and the
221-
`href` should point to a valid address where the data may be downloaded.
222-
"""
223-
224-
@model_validator(mode="after")
225-
def retrieve_data(self):
226-
"""Retrieve the data from the URL
227-
228-
When a [`Blob`](#labthings_fastapi.outputs.blob.Blob) is created
229-
using its constructor, [`pydantic`](https://docs.pydantic.dev/latest/)
230-
will attempt to deserialise it by retrieving the data from the URL
231-
specified in `href`. Currently, this must be a URL pointing to a
232-
[`Blob`](#labthings_fastapi.outputs.blob.Blob) that already exists on
233-
this server.
234-
235-
This validator will only work if the function to resolve URLs to
236-
[`BlobData`](#labthings_fastapi.outputs.blob.BlobData) objects
237-
has been set in the context variable
238-
[`url_to_blobdata_ctx`](#labthings_fastapi.outputs.blob.url_to_blobdata_ctx).
239-
This is done when actions are being invoked over HTTP by the
240-
[`BlobIOContextDep`](#labthings_fastapi.outputs.blob.BlobIOContextDep) dependency.
241-
"""
242-
if self.href == "blob://local":
243-
if self._data:
244-
return self
245-
raise ValueError("Blob objects must have data if the href is blob://local")
246-
try:
247-
url_to_blobdata = url_to_blobdata_ctx.get()
248-
self._data = url_to_blobdata(self.href)
249-
self.href = "blob://local"
250-
except LookupError:
251-
raise LookupError(
252-
"Blobs may only be created from URLs passed in over HTTP."
253-
f"The URL in question was {self.href}."
254-
)
255-
return self
228+
_data: ServerSideBlobData
229+
"""This object holds the data, either in memory or as a file."""
256230

257231
@model_serializer(mode="plain", when_used="always")
258232
def to_dict(self) -> Mapping[str, str]:
259-
"""Serialise the Blob to a dictionary and make it downloadable
260-
261-
When [`pydantic`](https://docs.pydantic.dev/latest/) serialises this object,
262-
it will call this method to convert it to a dictionary. There is a
263-
significant side-effect, which is that we will add the blob to the
264-
[`BlobDataManager`](#labthings_fastapi.outputs.blob.BlobDataManager) so it
265-
can be downloaded.
266-
267-
This serialiser will only work if the function to assign URLs to
268-
[`BlobData`](#labthings_fastapi.outputs.blob.BlobData) objects
269-
has been set in the context variable
270-
[`blobdata_to_url_ctx`](#labthings_fastapi.outputs.blob.blobdata_to_url_ctx).
271-
This is done when actions are being returned over HTTP by the
272-
[`BlobIOContextDep`](#labthings_fastapi.outputs.blob.BlobIOContextDep) dependency.
273-
"""
274-
if self.href == "blob://local":
275-
try:
276-
blobdata_to_url = blobdata_to_url_ctx.get()
277-
# MyPy seems to miss that `self.data` is a property, hence the ignore
278-
href = blobdata_to_url(self.data) # type: ignore[arg-type]
279-
except LookupError:
280-
raise LookupError(
281-
"Blobs may only be serialised inside the "
282-
"context created by BlobIOContextDep."
283-
)
284-
else:
285-
href = self.href
233+
"""Serialise the Blob to a dictionary and make it downloadable"""
286234
return {
287-
"href": href,
235+
"href": self.href,
288236
"media_type": self.media_type,
289237
"rel": self.rel,
290238
"description": self.description,
@@ -348,10 +296,7 @@ def open(self) -> io.IOBase:
348296
@classmethod
349297
def from_bytes(cls, data: bytes) -> Self:
350298
"""Create a BlobOutput from a bytes object"""
351-
return cls.model_construct( # type: ignore[return-value]
352-
href="blob://local",
353-
_data=BlobBytes(data, media_type=cls.default_media_type()),
354-
)
299+
return cls.model_construct(_data = BlobBytes(data, media_type=cls.default_media_type()))
355300

356301
@classmethod
357302
def from_temporary_directory(cls, folder: TemporaryDirectory, file: str) -> Self:
@@ -362,9 +307,8 @@ def from_temporary_directory(cls, folder: TemporaryDirectory, file: str) -> Self
362307
collected.
363308
"""
364309
file_path = os.path.join(folder.name, file)
365-
return cls.model_construct( # type: ignore[return-value]
366-
href="blob://local",
367-
_data=BlobFile(
310+
return cls.model_construct(
311+
_data = BlobFile(
368312
file_path,
369313
media_type=cls.default_media_type(),
370314
# Prevent the temporary directory from being cleaned up
@@ -381,36 +325,13 @@ def from_file(cls, file: str) -> Self:
381325
temporary. If you are using temporary files, consider creating your
382326
Blob with `from_temporary_directory` instead.
383327
"""
384-
return cls.model_construct( # type: ignore[return-value]
385-
href="blob://local",
386-
_data=BlobFile(file, media_type=cls.default_media_type()),
387-
)
328+
return cls.model_construct(_data = BlobFile(file, media_type=cls.default_media_type()))
388329

389330
def response(self):
390331
""" "Return a suitable response for serving the output"""
391332
return self.data.response()
392333

393334

394-
def blob_type(media_type: str) -> type[Blob]:
395-
"""Create a BlobOutput subclass for a given media type
396-
397-
This convenience function may confuse static type checkers, so it is usually
398-
clearer to make a subclass instead, e.g.:
399-
400-
```python
401-
class MyImageBlob(Blob):
402-
media_type = "image/png"
403-
```
404-
"""
405-
if "'" in media_type or "\\" in media_type:
406-
raise ValueError("media_type must not contain single quotes or backslashes")
407-
return create_model(
408-
f"{media_type.replace('/', '_')}_blob",
409-
__base__=Blob,
410-
media_type=(eval(f"Literal[r'{media_type}']"), media_type),
411-
)
412-
413-
414335
class BlobDataManager:
415336
"""A class to manage BlobData objects
416337
@@ -452,59 +373,3 @@ def download_blob(self, blob_id: uuid.UUID):
452373
def attach_to_app(self, app: FastAPI):
453374
"""Attach the BlobDataManager to a FastAPI app"""
454375
app.get("/blob/{blob_id}")(self.download_blob)
455-
456-
457-
blobdata_to_url_ctx = ContextVar[Callable[[ServerSideBlobData], str]]("blobdata_to_url")
458-
"""This context variable gives access to a function that makes BlobData objects
459-
downloadable, by assigning a URL and adding them to the
460-
[`BlobDataManager`](#labthings_fastapi.outputs.blob.BlobDataManager).
461-
462-
It is only available within a
463-
[`blob_serialisation_context_manager`](#labthings_fastapi.outputs.blob.blob_serialisation_context_manager)
464-
because it requires access to the `BlobDataManager` and the `url_for` function
465-
from the FastAPI app.
466-
"""
467-
468-
url_to_blobdata_ctx = ContextVar[Callable[[str], BlobData]]("url_to_blobdata")
469-
"""This context variable gives access to a function that makes BlobData objects
470-
from a URL, by retrieving them from the
471-
[`BlobDataManager`](#labthings_fastapi.outputs.blob.BlobDataManager).
472-
473-
It is only available within a
474-
[`blob_serialisation_context_manager`](#labthings_fastapi.outputs.blob.blob_serialisation_context_manager)
475-
because it requires access to the `BlobDataManager`.
476-
"""
477-
478-
479-
async def blob_serialisation_context_manager(request: Request):
480-
"""Set context variables to allow blobs to be [de]serialised"""
481-
thing_server = find_thing_server(request.app)
482-
blob_manager: BlobDataManager = thing_server.blob_data_manager
483-
url_for = request.url_for
484-
485-
def blobdata_to_url(blob: ServerSideBlobData) -> str:
486-
blob_id = blob_manager.add_blob(blob)
487-
return str(url_for("download_blob", blob_id=blob_id))
488-
489-
def url_to_blobdata(url: str) -> BlobData:
490-
m = re.search(r"blob/([0-9a-z\-]+)", url)
491-
if not m:
492-
raise HTTPException(
493-
status_code=404, detail="Could not find blob ID in href"
494-
)
495-
invocation_id = uuid.UUID(m.group(1))
496-
return blob_manager.get_blob(invocation_id)
497-
498-
t1 = blobdata_to_url_ctx.set(blobdata_to_url)
499-
t2 = url_to_blobdata_ctx.set(url_to_blobdata)
500-
try:
501-
yield blob_manager
502-
finally:
503-
blobdata_to_url_ctx.reset(t1)
504-
url_to_blobdata_ctx.reset(t2)
505-
506-
507-
BlobIOContextDep: TypeAlias = Annotated[
508-
BlobDataManager, Depends(blob_serialisation_context_manager)
509-
]
510-
"""A dependency that enables `Blob`s to be serialised and deserialised."""

src/labthings_fastapi/server/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def __init__(self, settings_folder: Optional[str] = None):
3030
self.app = FastAPI(lifespan=self.lifespan)
3131
self.set_cors_middleware()
3232
self.settings_folder = settings_folder or "./settings"
33-
self.action_manager = ActionManager()
33+
self.action_manager = ActionManager(self)
3434
self.action_manager.attach_to_app(self.app)
3535
self.blob_data_manager = BlobDataManager()
3636
self.blob_data_manager.attach_to_app(self.app)

0 commit comments

Comments
 (0)