Skip to content

Commit

Permalink
Merge pull request #6256 from hotosm/fix/unlock-after-validation-api-…
Browse files Browse the repository at this point in the history
…threading-for-tasks

Unlock tasks after validation using in parallel using python threads
  • Loading branch information
robsavoye authored Mar 6, 2024
2 parents 7307769 + bafd7ba commit 10eb67c
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 51 deletions.
110 changes: 85 additions & 25 deletions backend/models/postgis/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,37 @@ def delete(self):
db.session.commit()

@staticmethod
def get_open_for_task(project_id, task_id):
def get_open_for_task(project_id, task_id, local_session=None):
if local_session:
return (
local_session.query(TaskInvalidationHistory)
.filter_by(task_id=task_id, project_id=project_id, is_closed=False)
.one_or_none()
)
return TaskInvalidationHistory.query.filter_by(
task_id=task_id, project_id=project_id, is_closed=False
).one_or_none()

@staticmethod
def close_all_for_task(project_id, task_id):
def close_all_for_task(project_id, task_id, local_session=None):
if local_session:
return (
local_session.query(TaskInvalidationHistory)
.filter_by(task_id=task_id, project_id=project_id, is_closed=False)
.update({"is_closed": True})
)
TaskInvalidationHistory.query.filter_by(
task_id=task_id, project_id=project_id, is_closed=False
).update({"is_closed": True})

@staticmethod
def record_invalidation(project_id, task_id, invalidator_id, history):
def record_invalidation(
project_id, task_id, invalidator_id, history, local_session=None
):
# Invalidation always kicks off a new entry for a task, so close any existing ones.
TaskInvalidationHistory.close_all_for_task(project_id, task_id)
TaskInvalidationHistory.close_all_for_task(
project_id, task_id, local_session=local_session
)

last_mapped = TaskHistory.get_last_mapped_action(project_id, task_id)
if last_mapped is None:
Expand All @@ -124,11 +140,18 @@ def record_invalidation(project_id, task_id, invalidator_id, history):
entry.invalidator_id = invalidator_id
entry.invalidated_date = history.action_date
entry.updated_date = timestamp()
db.session.add(entry)
if local_session:
local_session.add(entry)
else:
db.session.add(entry)

@staticmethod
def record_validation(project_id, task_id, validator_id, history):
entry = TaskInvalidationHistory.get_open_for_task(project_id, task_id)
def record_validation(
project_id, task_id, validator_id, history, local_session=None
):
entry = TaskInvalidationHistory.get_open_for_task(
project_id, task_id, local_session=local_session
)

# If no open invalidation to update, then nothing to do
if entry is None:
Expand Down Expand Up @@ -259,7 +282,7 @@ def delete(self):

@staticmethod
def update_task_locked_with_duration(
task_id: int, project_id: int, lock_action, user_id: int
task_id: int, project_id: int, lock_action, user_id: int, local_session=None
):
"""
Calculates the duration a task was locked for and sets it on the history record
Expand All @@ -270,13 +293,26 @@ def update_task_locked_with_duration(
:return:
"""
try:
last_locked = TaskHistory.query.filter_by(
task_id=task_id,
project_id=project_id,
action=lock_action.name,
action_text=None,
user_id=user_id,
).one()
if local_session:
last_locked = (
local_session.query(TaskHistory)
.filter_by(
task_id=task_id,
project_id=project_id,
action=lock_action.name,
action_text=None,
user_id=user_id,
)
.one()
)
else:
last_locked = TaskHistory.query.filter_by(
task_id=task_id,
project_id=project_id,
action=lock_action.name,
action_text=None,
user_id=user_id,
).one()
except NoResultFound:
# We suspect there's some kind or race condition that is occasionally deleting history records
# prior to user unlocking task. Most likely stemming from auto-unlock feature. However, given that
Expand All @@ -302,7 +338,10 @@ def update_task_locked_with_duration(
last_locked.action_text = (
(datetime.datetime.min + duration_task_locked).time().isoformat()
)
db.session.commit()
if local_session:
local_session.commit()
else:
db.session.commit()

@staticmethod
def remove_duplicate_task_history_rows(
Expand Down Expand Up @@ -539,9 +578,12 @@ def create(self):
db.session.add(self)
db.session.commit()

def update(self):
def update(self, local_session=None):
"""Updates the DB with the current state of the Task"""
db.session.commit()
if local_session:
local_session.commit()
else:
db.session.commit()

def delete(self):
"""Deletes the current model from the DB"""
Expand Down Expand Up @@ -592,15 +634,20 @@ def from_geojson_feature(cls, task_id, task_feature):
return task

@staticmethod
def get(task_id: int, project_id: int):
def get(task_id: int, project_id: int, local_session=None):
"""
Gets specified task
:param task_id: task ID in scope
:param project_id: project ID in scope
:return: Task if found otherwise None
"""
# LIKELY PROBLEM AREA

if local_session:
return (
local_session.query(Task)
.filter_by(id=task_id, project_id=project_id)
.one_or_none()
)
return Task.query.filter_by(id=task_id, project_id=project_id).one_or_none()

@staticmethod
Expand Down Expand Up @@ -781,7 +828,13 @@ def record_auto_unlock(self, lock_duration):
self.update()

def unlock_task(
self, user_id, new_state=None, comment=None, undo=False, issues=None
self,
user_id,
new_state=None,
comment=None,
undo=False,
issues=None,
local_session=None,
):
"""Unlock task and ensure duration task locked is saved in History"""
if comment:
Expand Down Expand Up @@ -812,25 +865,32 @@ def unlock_task(
self.mapped_by = user_id
elif new_state == TaskStatus.VALIDATED:
TaskInvalidationHistory.record_validation(
self.project_id, self.id, user_id, history
self.project_id, self.id, user_id, history, local_session=local_session
)
self.validated_by = user_id
elif new_state == TaskStatus.INVALIDATED:
TaskInvalidationHistory.record_invalidation(
self.project_id, self.id, user_id, history
self.project_id, self.id, user_id, history, local_session=local_session
)
self.mapped_by = None
self.validated_by = None

if not undo:
# Using a slightly evil side effect of Actions and Statuses having the same name here :)
TaskHistory.update_task_locked_with_duration(
self.id, self.project_id, TaskStatus(self.task_status), user_id
self.id,
self.project_id,
TaskStatus(self.task_status),
user_id,
local_session=local_session,
)

self.task_status = new_state.value
self.locked_by = None
self.update()
if local_session:
self.update(local_session=local_session)
else:
self.update()

def reset_lock(self, user_id, comment=None):
"""Removes a current lock from a task, resets to last status and
Expand Down
12 changes: 9 additions & 3 deletions backend/models/postgis/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,12 @@ def filter_users(user_filter: str, project_id: int, page: int) -> UserFilterDTO:
return dto

@staticmethod
def upsert_mapped_projects(user_id: int, project_id: int):
def upsert_mapped_projects(user_id: int, project_id: int, local_session=None):
"""Adds projects to mapped_projects if it doesn't exist"""
query = User.query.filter_by(id=user_id)
if local_session:
query = local_session.query(User).filter_by(id=user_id)
else:
query = User.query.filter_by(id=user_id)
result = query.filter(
User.projects_mapped.op("@>")("{}".format("{" + str(project_id) + "}"))
).count()
Expand All @@ -235,7 +238,10 @@ def upsert_mapped_projects(user_id: int, project_id: int):
if user.projects_mapped is None:
user.projects_mapped = []
user.projects_mapped.append(project_id)
db.session.commit()
if local_session:
local_session.commit()
else:
db.session.commit()

@staticmethod
def get_mapped_projects(
Expand Down
5 changes: 4 additions & 1 deletion backend/services/stats_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def update_stats_after_task_state_change(
last_state: TaskStatus,
new_state: TaskStatus,
action="change",
local_session=None,
):
"""Update stats when a task has had a state change"""

Expand All @@ -62,7 +63,9 @@ def update_stats_after_task_state_change(
project, user = StatsService._update_tasks_stats(
project, user, last_state, new_state, action
)
UserService.upsert_mapped_projects(user_id, project_id)
UserService.upsert_mapped_projects(
user_id, project_id, local_session=local_session
)
project.last_updated = timestamp()

# Transaction will be saved when task is saved
Expand Down
4 changes: 2 additions & 2 deletions backend/services/users/user_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,9 +604,9 @@ def get_countries_contributed(user_id: int):
return countries_dto

@staticmethod
def upsert_mapped_projects(user_id: int, project_id: int):
def upsert_mapped_projects(user_id: int, project_id: int, local_session=None):
"""Add project to mapped projects if it doesn't exist, otherwise return"""
User.upsert_mapped_projects(user_id, project_id)
User.upsert_mapped_projects(user_id, project_id, local_session=local_session)

@staticmethod
def get_mapped_projects(user_name: str, preferred_locale: str):
Expand Down
83 changes: 65 additions & 18 deletions backend/services/validator_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from flask import current_app
from sqlalchemy import text
from multiprocessing.dummy import Pool as ThreadPool
from sqlalchemy.orm import scoped_session, sessionmaker
import os

from backend import db
from backend.exceptions import NotFound
from backend.models.dtos.mapping_dto import TaskDTOs
from backend.models.dtos.stats_dto import Pagination
Expand Down Expand Up @@ -135,25 +139,24 @@ def _user_can_validate_task(user_id: int, mapped_by: int) -> bool:
return False

@staticmethod
def unlock_tasks_after_validation(
validated_dto: UnlockAfterValidationDTO,
) -> TaskDTOs:
"""
Unlocks supplied tasks after validation
:raises ValidatorServiceError
"""
validated_tasks = validated_dto.validated_tasks
project_id = validated_dto.project_id
user_id = validated_dto.user_id
tasks_to_unlock = ValidatorService.get_tasks_locked_by_user(
project_id, validated_tasks, user_id
)

# Unlock all tasks
dtos = []
message_sent_to = []
for task_to_unlock in tasks_to_unlock:
def _process_tasks(args):
(
app_context,
task_to_unlock,
project_id,
validated_dto,
message_sent_to,
dtos,
) = args
with app_context:
Session = scoped_session(sessionmaker(bind=db.engine))
local_session = Session()
task = task_to_unlock["task"]
task = (
local_session.query(Task)
.filter_by(id=task.id, project_id=project_id)
.one()
)

if task_to_unlock["comment"]:
# Parses comment to see if any users have been @'d
Expand Down Expand Up @@ -191,6 +194,7 @@ def unlock_tasks_after_validation(
validated_dto.user_id,
prev_status,
task_to_unlock["new_state"],
local_session=local_session,
)
task_mapping_issues = ValidatorService.get_task_mapping_issues(
task_to_unlock
Expand All @@ -200,8 +204,51 @@ def unlock_tasks_after_validation(
task_to_unlock["new_state"],
task_to_unlock["comment"],
issues=task_mapping_issues,
local_session=local_session,
)
dtos.append(task.as_dto_with_instructions(validated_dto.preferred_locale))
local_session.commit()
Session.remove()

@staticmethod
def unlock_tasks_after_validation(
validated_dto: UnlockAfterValidationDTO,
) -> TaskDTOs:
"""
Unlocks supplied tasks after validation
:raises ValidatorServiceError
"""
validated_tasks = validated_dto.validated_tasks
project_id = validated_dto.project_id
user_id = validated_dto.user_id
tasks_to_unlock = ValidatorService.get_tasks_locked_by_user(
project_id, validated_tasks, user_id
)

# Unlock all tasks
dtos = []
message_sent_to = []
args_list = []
for task_to_unlock in tasks_to_unlock:
args = (
current_app.app_context(),
task_to_unlock,
project_id,
validated_dto,
message_sent_to,
dtos,
)
args_list.append(args)

# Create a pool and Process the tasks in parallel
pool = ThreadPool(os.cpu_count())
pool.map(ValidatorService._process_tasks, args_list)

# Close the pool and wait for the threads to finish
pool.close()
pool.join()

# Send email on project progress
ProjectService.send_email_on_project_progress(validated_dto.project_id)
task_dtos = TaskDTOs()
task_dtos.tasks = dtos
Expand Down
Loading

0 comments on commit 10eb67c

Please sign in to comment.