Skip to content

Commit

Permalink
run black
Browse files Browse the repository at this point in the history
  • Loading branch information
Moggach committed Dec 10, 2024
1 parent 8506ffd commit 4748637
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 52 deletions.
13 changes: 5 additions & 8 deletions hub/graphql/types/model_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,20 +809,17 @@ class BatchJobProgress:
number_of_jobs_ahead_in_queue: Optional[int] = None
send_email: bool = False


@strawberry_django.field
def in_queue(
self, info: Info
) -> bool:
if self.status == ProcrastinateJobStatus.doing.value or self.status == ProcrastinateJobStatus.todo.value:
def in_queue(self, info: Info) -> bool:
if (
self.status == ProcrastinateJobStatus.doing.value
or self.status == ProcrastinateJobStatus.todo.value
):
return True
else:
return False





@strawberry.enum
class CrmType(Enum):
airtable = "airtable"
Expand Down
83 changes: 40 additions & 43 deletions hub/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1209,13 +1209,11 @@ def get_scheduled_parent_job(self, filter: dict):
.first()
)
return original_job
def get_latest_parent_job(self, filter:dict):

def get_latest_parent_job(self, filter: dict):
latest_batch_job_for_this_source = (
self.event_log_queryset()
.filter(
**filter, args__request_id__isnull=False
)
.filter(**filter, args__request_id__isnull=False)
.first()
)
if latest_batch_job_for_this_source is None:
Expand All @@ -1229,28 +1227,22 @@ def get_latest_parent_job(self, filter:dict):
.first()
)
return original_job


def get_scheduled_import_job(self):
return self.get_scheduled_parent_job(
dict(task_name__contains="hub.tasks.import")
)

def get_scheduled_update_job(self):
return self.get_scheduled_parent_job(
dict(task_name__contains="hub.tasks.refresh")
)

def get_latest_import_job(self):
return self.get_latest_parent_job(
dict(task_name__contains="hub.tasks.import")
)

def get_latest_update_job(self):
return self.get_latest_parent_job(
dict(task_name__contains="hub.tasks.refresh")
)
return self.get_latest_parent_job(dict(task_name__contains="hub.tasks.import"))

def get_latest_update_job(self):
return self.get_latest_parent_job(dict(task_name__contains="hub.tasks.refresh"))

class BatchJobProgress(TypedDict):
status: str
Expand Down Expand Up @@ -1306,15 +1298,14 @@ def get_scheduled_batch_job_progress(self, parent_job: ProcrastinateJob, user=No
)

jobs = self.event_log_queryset().filter(args__request_id=request_id).all()
status = 'todo'

if any([job.status == 'doing' for job in jobs]):
status = 'doing'
elif any([job.status == 'failed' for job in jobs]):
status = 'failed'
elif all([job.status == 'succeeded' for job in jobs]):
status = 'succeeded'

status = "todo"

if any([job.status == "doing" for job in jobs]):
status = "doing"
elif any([job.status == "failed" for job in jobs]):
status = "failed"
elif all([job.status == "succeeded" for job in jobs]):
status = "succeeded"

total = 0
statuses = dict()
Expand All @@ -1327,10 +1318,18 @@ def get_scheduled_batch_job_progress(self, parent_job: ProcrastinateJob, user=No
else:
statuses[job.status] = job_record_count

done = statuses.get("succeeded", 0) + statuses.get("failed", 0) + statuses.get("doing", 0)

number_of_jobs_ahead_in_queue = ProcrastinateJob.objects.filter(id__lt=parent_job.id).filter(status__in=['todo', 'doing']).count()

done = (
statuses.get("succeeded", 0)
+ statuses.get("failed", 0)
+ statuses.get("doing", 0)
)

number_of_jobs_ahead_in_queue = (
ProcrastinateJob.objects.filter(id__lt=parent_job.id)
.filter(status__in=["todo", "doing"])
.count()
)

time_started = (
ProcrastinateEvent.objects.filter(job_id=parent_job.id)
.order_by("at")
Expand All @@ -1344,30 +1343,30 @@ def get_scheduled_batch_job_progress(self, parent_job: ProcrastinateJob, user=No
duration_per_record = time_so_far / (done or 1)
time_remaining = duration_per_record * remaining
estimated_finish_time = datetime.now(pytz.utc) + time_remaining
if status == 'succeeded' or status == 'failed':

if status == "succeeded" or status == "failed":
actual_finish_time = (
ProcrastinateEvent.objects.filter(job__in=jobs)
.order_by("-at")
.first()
.at.replace(tzinfo=pytz.utc)
)

else:
actual_finish_time = None

time_threshold = timedelta(minutes=5)
send_email = False
send_email = False
actual_job_duration = None

estimated_job_duration = estimated_finish_time - time_started

if actual_finish_time is not None:
actual_job_duration = actual_finish_time - time_started

if estimated_job_duration > time_threshold:
send_email = True

if actual_job_duration and actual_job_duration > time_threshold:
if status == "succeeded" and user and user.is_authenticated:
user_email = user.email
Expand All @@ -1381,7 +1380,7 @@ def get_scheduled_batch_job_progress(self, parent_job: ProcrastinateJob, user=No
to=[user_email],
)
email.send()

except Exception as e:
logger.error(f"Failed to send email: {e}")

Expand All @@ -1397,11 +1396,9 @@ def get_scheduled_batch_job_progress(self, parent_job: ProcrastinateJob, user=No
to=[user_email],
)
email.send()

except Exception as e:
logger.error(
f"Failed to send email to {user_email}: {e}"
)
logger.error(f"Failed to send email to {user_email}: {e}")

return self.BatchJobProgress(
send_email=send_email,
Expand Down Expand Up @@ -2219,7 +2216,7 @@ async def deferred_refresh_page(
async def deferred_refresh_many(
cls, external_data_source_id: str, members: list, request_id: str = None
):

if not cls.allow_updates:
logger.error(f"Updates requested for non-updatable CRM {cls}")
return
Expand Down Expand Up @@ -2283,7 +2280,7 @@ async def deferred_setup_webhooks(cls, external_data_source_id: str, refresh=Tru
@classmethod
async def deferred_import_many(
cls, external_data_source_id: str, members: list, request_id: str = None
):
):

external_data_source: ExternalDataSource = await cls.objects.aget(
id=external_data_source_id
Expand Down
2 changes: 1 addition & 1 deletion local_intelligence_hub/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@

ASYNC_CLIENT_TIMEOUT_SECONDS = 30

IMPORT_UPDATE_ALL_BATCH_SIZE = 500
IMPORT_UPDATE_ALL_BATCH_SIZE = 500
IMPORT_UPDATE_MANY_RETRY_COUNT = 3


Expand Down

0 comments on commit 4748637

Please sign in to comment.