Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Status Updates on DiagramDetectBundle #1253

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions cognite/client/_api/diagrams.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,14 @@ def detect(
except CogniteAPIError as exc:
unposted_files.append({"error": str(exc), "files": batch})

res = (
DetectJobBundle(cognite_client=self._cognite_client, job_ids=[j.job_id for j in jobs if j.job_id])
if jobs
else None
)
return res, unposted_files
bundle = None
if jobs:
bundle = DetectJobBundle(
cognite_client=self._cognite_client, job_ids=[j.job_id for j in jobs if j.job_id]
)
bundle.update_status(timeout=10)

return bundle, unposted_files

return self._run_job(
job_path="/detect",
Expand Down
52 changes: 31 additions & 21 deletions cognite/client/data_classes/contextualization.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import time
import warnings
from copy import deepcopy
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Set, Tuple, Type, TypeVar, Union, cast
Expand Down Expand Up @@ -597,7 +598,7 @@ class PersonalProtectiveEquipmentDetectionParameters(VisionResource, ThresholdPa
class DetectJobBundle:
_RESOURCE_PATH = "/context/diagram/detect/"
_STATUS_PATH = "/context/diagram/detect/status"
_WAIT_TIME = 2
_WAIT_TIME = 0

def __init__(self, job_ids: List[int], cognite_client: CogniteClient = None):
warnings.warn(
Expand All @@ -608,15 +609,18 @@ def __init__(self, job_ids: List[int], cognite_client: CogniteClient = None):
self._cognite_client = cast("CogniteClient", cognite_client)
if not job_ids:
raise ValueError("You need to specify job_ids")
self.job_ids = job_ids
self._remaining_job_ids: List[int] = []
self.job_ids = deepcopy(job_ids)
self._remaining_job_ids: List[int] = deepcopy(job_ids)

self.jobs: List[Dict[str, Any]] = []
self.error: Optional[Dict[str, Any]] = None

self._result: List[Dict[str, Any]] = []

def __str__(self) -> str:
return f"DetectJobBundle({self.job_ids=}, {self.jobs=}, {self._result=}, {self._remaining_job_ids=})"
return (
f"DetectJobBundle({self.job_ids=}, {self.jobs=}, {self._result=}, {self._remaining_job_ids=} {self.error=})"
)

def _back_off(self) -> None:
"""
Expand All @@ -627,33 +631,39 @@ def _back_off(self) -> None:
if self._WAIT_TIME < 10:
self._WAIT_TIME += 2

def wait_for_completion(self, timeout: int = None) -> None:
"""Waits for all jobs to complete, generally not needed to call as it is called by result.
def update_status(self, timeout: int = None) -> List[Dict[str, Any]]:
"""Fetches the status for all jobs in the bundle. Blocks until the status is updated.

Args:
timeout (int): Time out after this many seconds. (None means wait indefinitely)
interval (int): Poll status every this many seconds.
"""
start = time.time()
self._remaining_job_ids = self.job_ids
while timeout is None or time.time() < start + timeout:
self._back_off()
try:
res = self._cognite_client.diagrams._post(self._STATUS_PATH, json={"items": self._remaining_job_ids})
res_json = self._cognite_client.diagrams._post(
self._STATUS_PATH, json={"items": self._remaining_job_ids}
).json()
self.error = res_json.get("error")
if self.error is None:
self.jobs = res_json["items"]
self._remaining_job_ids = [
j["jobId"] for j in self.jobs if JobStatus(j["status"]).is_not_finished()
]
return self.jobs
except CogniteAPIError:
self._back_off()
continue
if res.json().get("error"):
break
self.jobs = res.json()["items"]
pass
self.error = {"message": f"DetectJobBundle timed out after {timeout} seconds"}
return self.jobs

# Assign the jobs that aren't finished
self._remaining_job_ids = [j["jobId"] for j in self.jobs if JobStatus(j["status"]).is_not_finished()]
def wait_for_completion(self, timeout: int = None) -> None:
"""Waits for all jobs to complete, generally not needed to call as it is called by result.

if self._remaining_job_ids:
self._back_off()
else:
self._WAIT_TIME = 2
break
Args:
timeout (int): Time out after this many seconds. (None means wait indefinitely)
"""
while self.error is None and self._remaining_job_ids:
self.update_status(timeout=timeout)

def fetch_results(self) -> List[Dict[str, Any]]:
return [self._cognite_client.diagrams._get(f"{self._RESOURCE_PATH}{j}").json() for j in self.job_ids]
Expand Down