Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to the UI for importing and enriching data [MAP-347] [MAP-78] [MAP-77] #154

Merged
merged 29 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c684147
reduce import batch size temporarily for testing purposes
Moggach Dec 5, 2024
bd94f68
add inQueue property to BatchJobProgress which contains all jobs with…
Moggach Dec 5, 2024
2e1e32a
add get_latest_parent_job and get_latest_import_job methods to Extern…
Moggach Dec 5, 2024
bd038d0
revert update all batch size reset
Moggach Dec 9, 2024
5b46d8a
add get_latest_update_job method to ExternalDataSource model
Moggach Dec 9, 2024
b55727b
update base url for Codegen Config to 127.0.0.1
Moggach Dec 9, 2024
d270091
add actualFinishTime field to BatchJobProgress types
Moggach Dec 9, 2024
eda3365
imported message disappears 30 seconds after actual finish time of jo…
Moggach Dec 9, 2024
49afd28
add actual finish time property to ExternalDataSourceInspectPage quer…
Moggach Dec 9, 2024
ffb3e47
add number of jobs ahead in queue property to get_scheduled_batch_job…
Moggach Dec 9, 2024
59da27c
add auto generated types
Moggach Dec 9, 2024
8e9a783
add numberOfJobsAheadInQueue to import progress and update progress o…
Moggach Dec 9, 2024
6bd8f0c
add logic for sending an email when an import or update job will take…
Moggach Dec 10, 2024
6db1c6e
add send email flag to BatchJobProgress model types
Moggach Dec 10, 2024
de695d7
change default value of send_email in BatchJobProgress type to False
Moggach Dec 10, 2024
999cc26
update time threshold to 5 minutes
Moggach Dec 10, 2024
0de288a
add sendEmail flag to ExternalDataSourceInspectPage query and render …
Moggach Dec 10, 2024
8506ffd
merge latest from main
Moggach Dec 10, 2024
4748637
run black
Moggach Dec 10, 2024
8758315
run issort
Moggach Dec 10, 2024
b05986e
fix typescript errors in BatchJobProgress component
Moggach Dec 10, 2024
a9c5050
pass user into update_progress method to match import_progress
Moggach Dec 11, 2024
ce761e4
merge in latest from main
Moggach Dec 12, 2024
d6be198
make a BatchRequest model and create an instance of it with user info…
Moggach Dec 12, 2024
936a477
add check for presence of associated BatchRequest and its user before…
Moggach Dec 12, 2024
614bf4c
python linting
Moggach Dec 12, 2024
66123cc
refactor to use estimated job duration instead of actual job duration
Moggach Dec 12, 2024
80c7825
Merge remote-tracking branch 'origin/main' into improvements-to-impor…
janbaykara Dec 16, 2024
5598f1b
Fix migrations
janbaykara Dec 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions hub/graphql/types/model_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,9 +802,22 @@ class BatchJobProgress:
failed: Optional[int] = None
estimated_seconds_remaining: Optional[float] = None
estimated_finish_time: Optional[datetime] = None
actual_finish_time: Optional[datetime] = None
seconds_per_record: Optional[float] = None
done: Optional[int] = None
remaining: Optional[int] = None
number_of_jobs_ahead_in_queue: Optional[int] = None
send_email: bool = False

@strawberry_django.field
def in_queue(self, info: Info) -> bool:
if (
self.status == ProcrastinateJobStatus.doing.value
or self.status == ProcrastinateJobStatus.todo.value
):
return True
else:
return False


@strawberry.enum
Expand Down Expand Up @@ -863,10 +876,11 @@ def is_import_scheduled(self: models.ExternalDataSource, info: Info) -> bool:
def import_progress(
self: models.ExternalDataSource, info: Info
) -> Optional[BatchJobProgress]:
job = self.get_scheduled_import_job()
user = info.context.request.user
job = self.get_latest_import_job()
if job is None:
return None
progress = self.get_scheduled_batch_job_progress(job)
progress = self.get_scheduled_batch_job_progress(job, user=user)
if progress is None:
return None
return BatchJobProgress(**progress)
Expand All @@ -880,7 +894,7 @@ def is_update_scheduled(self: models.ExternalDataSource, info: Info) -> bool:
def update_progress(
self: models.ExternalDataSource, info: Info
) -> Optional[BatchJobProgress]:
job = self.get_scheduled_update_job()
job = self.get_latest_update_job()
if job is None:
return None
progress = self.get_scheduled_batch_job_progress(job)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to pass the user in here, too?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I've added that now

Expand Down
144 changes: 119 additions & 25 deletions hub/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
import math
import uuid
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import List, Optional, Self, Type, TypedDict, Union
from urllib.parse import urlencode, urljoin
Expand All @@ -15,6 +15,7 @@
from django.contrib.gis.db.models import MultiPolygonField, PointField
from django.contrib.gis.geos import Point
from django.core.cache import cache
from django.core.mail import EmailMessage
from django.db import models
from django.db.models import Avg, IntegerField, Max, Min, Q
from django.db.models.functions import Cast, Coalesce
Expand Down Expand Up @@ -1208,6 +1209,24 @@ def get_scheduled_parent_job(self, filter: dict):
)
return original_job

def get_latest_parent_job(self, filter: dict):
latest_batch_job_for_this_source = (
self.event_log_queryset()
.filter(**filter, args__request_id__isnull=False)
.first()
)
if latest_batch_job_for_this_source is None:
return None
request_id = latest_batch_job_for_this_source.args.get("request_id", None)
# Now find the oldest, first job with that request_id
original_job = (
self.event_log_queryset()
.filter(args__request_id=request_id)
.order_by("id")
.first()
)
return original_job

def get_scheduled_import_job(self):
return self.get_scheduled_parent_job(
dict(task_name__contains="hub.tasks.import")
Expand All @@ -1218,6 +1237,12 @@ def get_scheduled_update_job(self):
dict(task_name__contains="hub.tasks.refresh")
)

def get_latest_import_job(self):
return self.get_latest_parent_job(dict(task_name__contains="hub.tasks.import"))

def get_latest_update_job(self):
return self.get_latest_parent_job(dict(task_name__contains="hub.tasks.refresh"))

class BatchJobProgress(TypedDict):
status: str
id: str
Expand All @@ -1227,13 +1252,16 @@ class BatchJobProgress(TypedDict):
doing: int = 0
failed: int = 0
estimated_seconds_remaining: float = 0
actual_finish_time: Optional[datetime]
estimated_finish_time: Optional[datetime]
has_forecast: bool = True
seconds_per_record: float = 0
done: int = 0
remaining: int = 0
number_of_jobs_ahead_in_queue: int = 0
send_email: bool = False

def get_scheduled_batch_job_progress(self, parent_job: ProcrastinateJob):
def get_scheduled_batch_job_progress(self, parent_job: ProcrastinateJob, user=None):
# TODO: This doesn't work for import/refresh by page. How can it cover this case?
request_id = parent_job.args.get("request_id")

Expand Down Expand Up @@ -1269,25 +1297,36 @@ def get_scheduled_batch_job_progress(self, parent_job: ProcrastinateJob):
)

jobs = self.event_log_queryset().filter(args__request_id=request_id).all()
status = "todo"

if any([job.status == "doing" for job in jobs]):
status = "doing"
elif any([job.status == "failed" for job in jobs]):
status = "failed"
elif all([job.status == "succeeded" for job in jobs]):
status = "succeeded"

total = 2
total = 0
statuses = dict()

for job in jobs:
job_count = len(job.args.get("members", []))
total += job_count
job_record_count = len(job.args.get("members", []))
total += job_record_count
if statuses.get(job.status, None) is not None:
statuses[job.status] += job_count
statuses[job.status] += job_record_count
else:
statuses[job.status] = job_count
statuses[job.status] = job_record_count

done = (
int(
statuses.get("succeeded", 0)
+ statuses.get("failed", 0)
+ statuses.get("doing", 0)
)
+ 1
statuses.get("succeeded", 0)
+ statuses.get("failed", 0)
+ statuses.get("doing", 0)
)

number_of_jobs_ahead_in_queue = (
ProcrastinateJob.objects.filter(id__lt=parent_job.id)
.filter(status__in=["todo", "doing"])
.count()
)

time_started = (
Expand All @@ -1300,29 +1339,82 @@ def get_scheduled_batch_job_progress(self, parent_job: ProcrastinateJob):
remaining = total - done

time_so_far = datetime.now(pytz.utc) - time_started
duration_per_record = time_so_far / done
duration_per_record = time_so_far / (done or 1)
time_remaining = duration_per_record * remaining
estimated_finish_time = datetime.now() + time_remaining
estimated_finish_time = datetime.now(pytz.utc) + time_remaining

if status == "succeeded" or status == "failed":
actual_finish_time = (
ProcrastinateEvent.objects.filter(job__in=jobs)
.order_by("-at")
.first()
.at.replace(tzinfo=pytz.utc)
)

else:
actual_finish_time = None

time_threshold = timedelta(minutes=5)
send_email = False
actual_job_duration = None

estimated_job_duration = estimated_finish_time - time_started

if actual_finish_time is not None:
actual_job_duration = actual_finish_time - time_started

if estimated_job_duration > time_threshold:
send_email = True

if actual_job_duration and actual_job_duration > time_threshold:
if status == "succeeded" and user and user.is_authenticated:
user_email = user.email
email_subject = "Mapped Job Progress Notification"
email_body = "Your job has been successfully completed."
try:
email = EmailMessage(
subject=email_subject,
body=email_body,
from_email="[email protected]",
to=[user_email],
)
email.send()

except Exception as e:
logger.error(f"Failed to send email: {e}")

elif status == "failed" and user and user.is_authenticated:
user_email = user.email
email_subject = "Mapped Job Progress Notification"
email_body = "Your job has failed. Please check the details."
try:
email = EmailMessage(
subject=email_subject,
body=email_body,
from_email="[email protected]",
to=[user_email],
)
email.send()

except Exception as e:
logger.error(f"Failed to send email to {user_email}: {e}")

return self.BatchJobProgress(
status=(
"succeeded"
if remaining <= 0
else (
parent_job.status if parent_job.status != "succeeded" else "doing"
)
),
send_email=send_email,
status=status,
id=request_id,
started_at=time_started,
estimated_seconds_remaining=time_remaining,
estimated_finish_time=estimated_finish_time,
actual_finish_time=actual_finish_time,
seconds_per_record=duration_per_record.seconds,
total=total - 2,
done=done - 1,
remaining=remaining - 1,
total=total,
done=done,
remaining=remaining,
succeeded=statuses.get("succeeded", 0),
failed=statuses.get("failed", 0),
doing=statuses.get("doing", 0),
number_of_jobs_ahead_in_queue=number_of_jobs_ahead_in_queue,
)

def get_update_mapping(self) -> list[UpdateMapping]:
Expand Down Expand Up @@ -2123,6 +2215,7 @@ async def deferred_refresh_page(
async def deferred_refresh_many(
cls, external_data_source_id: str, members: list, request_id: str = None
):

if not cls.allow_updates:
logger.error(f"Updates requested for non-updatable CRM {cls}")
return
Expand Down Expand Up @@ -2187,6 +2280,7 @@ async def deferred_setup_webhooks(cls, external_data_source_id: str, refresh=Tru
async def deferred_import_many(
cls, external_data_source_id: str, members: list, request_id: str = None
):

external_data_source: ExternalDataSource = await cls.objects.aget(
id=external_data_source_id
)
Expand Down
2 changes: 1 addition & 1 deletion nextjs/codegen.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { CodegenConfig } from '@graphql-codegen/cli'

const config: CodegenConfig = {
schema: 'http://localhost:8000/graphql',
schema: 'http://127.0.0.1:8000/graphql',
documents: [
'src/components/**/*.{ts,tsx}',
'src/app/**/*.{ts,tsx}',
Expand Down
Loading
Loading