Skip to content

Commit

Permalink
#1538 SQLAlchemy Query Updates (#1573)
Browse files Browse the repository at this point in the history
Co-authored-by: EvanParish <[email protected]>
Co-authored-by: Kyle MacMillan <[email protected]>
Co-authored-by: Nikolai Efimov <[email protected]>
Co-authored-by: Kyle MacMillan <[email protected]>
  • Loading branch information
5 people authored Jan 5, 2024
1 parent 5aaf279 commit 88f3a23
Show file tree
Hide file tree
Showing 59 changed files with 1,794 additions and 1,165 deletions.
2 changes: 1 addition & 1 deletion .talismanrc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
fileignoreconfig:
- filename: requirements.txt
checksum: d87d89892a2c545da7e94ecf4f6dbf891139c3cfcb254bb07a43dee582755f6a
checksum: d87d89892a2c545da7e94ecf4f6dbf891139c3cfcb254bb07a43dee582755f6a
22 changes: 14 additions & 8 deletions app/celery/nightly_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
import pytz
from flask import current_app
from notifications_utils.statsd_decorators import statsd
from sqlalchemy import func
from sqlalchemy import func, select
from sqlalchemy.exc import SQLAlchemyError

from app import notify_celery, performance_platform_client, zendesk_client
from app import db, notify_celery, performance_platform_client, zendesk_client
from app.aws import s3
from app.celery.service_callback_tasks import (
send_delivery_status_to_service,
Expand Down Expand Up @@ -270,12 +270,18 @@ def raise_alert_if_letter_notifications_still_sending():
offset_days = 4
else:
offset_days = 2
still_sending = Notification.query.filter(
Notification.notification_type == LETTER_TYPE,
Notification.status == NOTIFICATION_SENDING,
Notification.key_type == KEY_TYPE_NORMAL,
func.date(Notification.sent_at) <= today - timedelta(days=offset_days)
).count()

stmt = (
select([func.count()])
.select_from(Notification)
.where(
Notification.notification_type == LETTER_TYPE,
Notification.status == NOTIFICATION_SENDING,
Notification.key_type == KEY_TYPE_NORMAL,
func.date(Notification.sent_at) <= today - timedelta(days=offset_days)
)
)
still_sending = db.session.scalar(stmt)

if still_sending:
message = "There are {} letters in the 'sending' state from {}".format(
Expand Down
10 changes: 3 additions & 7 deletions app/celery/process_delivery_status_result_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from app.dao.notifications_dao import (
dao_get_notification_by_reference,
dao_update_notification_by_id,
update_notification_status_by_id,
update_notification_delivery_status,
FINAL_STATUS_STATES
)

Expand Down Expand Up @@ -190,13 +190,9 @@ def _calculate_pricing(price_in_millicents_usd: float, notification: Notificatio
updated_at=datetime.utcnow()
)
else:
# notification_id - is the UID in the database for the notification
# status - is the notification platform status generated earlier
# current_status - is the notification.status
update_notification_status_by_id(
update_notification_delivery_status(
notification_id=notification.id,
status=notification_status,
current_status=notification.status
new_status=notification_status
)


Expand Down
17 changes: 11 additions & 6 deletions app/celery/scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import boto3
from flask import current_app
from notifications_utils.statsd_decorators import statsd
from sqlalchemy import and_
from sqlalchemy import and_, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm.exc import NoResultFound

from app import notify_celery, zendesk_client
from app import db, notify_celery, zendesk_client
from app.celery.tasks import process_job
from app.config import QueueNames, TaskNames
from app.dao.invited_org_user_dao import delete_org_invitations_created_more_than_two_days_ago
Expand Down Expand Up @@ -116,10 +116,15 @@ def check_job_status():
thirty_minutes_ago = datetime.utcnow() - timedelta(minutes=30)
thirty_five_minutes_ago = datetime.utcnow() - timedelta(minutes=35)

jobs_not_complete_after_30_minutes = Job.query.filter(
Job.job_status == JOB_STATUS_IN_PROGRESS,
and_(thirty_five_minutes_ago < Job.processing_started, Job.processing_started < thirty_minutes_ago)
).order_by(Job.processing_started).all()
stmt = (
select(Job)
.where(
Job.job_status == JOB_STATUS_IN_PROGRESS,
and_(thirty_five_minutes_ago < Job.processing_started, Job.processing_started < thirty_minutes_ago)
)
.order_by(Job.processing_started)
)
jobs_not_complete_after_30_minutes = db.session.scalars(stmt).all()

# temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks
# if they haven't been re-processed in time.
Expand Down
40 changes: 25 additions & 15 deletions app/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from click_datetime import Datetime as click_dt
from flask import current_app, json
from notifications_utils.template import SMSMessageTemplate
from sqlalchemy import select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import NoResultFound
from notifications_utils.statsd_decorators import statsd
Expand Down Expand Up @@ -97,7 +98,10 @@ def purge_functional_test_data(user_email_prefix):
users, services, etc. Give an email prefix. Probably "notify-test-preview".
"""
users = User.query.filter(User.email_address.like("{}%".format(user_email_prefix))).all()

stmt = select(User).where(User.email_address.like(f"{user_email_prefix}%"))
users = db.session.scalars(stmt).all()

for usr in users:
# Make sure the full email includes a uuid in it
# Just in case someone decides to use a similar email address.
Expand Down Expand Up @@ -249,11 +253,13 @@ def replay_service_callbacks(file_name, service_id, notification_status):
file = open(file_name)

for ref in file:
stmt = select(Notification).where(Notification.client_reference == ref.strip())

try:
notification = Notification.query.filter_by(client_reference=ref.strip()).one()
notification = db.session.scalars(stmt).one()
notifications.append(notification)
except NoResultFound:
errors.append("Reference: {} was not found in notifications.".format(ref))
errors.append(f"Reference: {ref} was not found in notifications.")

for e in errors:
print(e)
Expand Down Expand Up @@ -647,7 +653,8 @@ def boolean_or_none(field):
email_branding = None
email_branding_column = columns[5].strip()
if len(email_branding_column) > 0:
email_branding = EmailBranding.query.filter(EmailBranding.name == email_branding_column).one()
stmt = select(EmailBranding).where(EmailBranding.name == email_branding_column)
email_branding = db.session.scalars(stmt).one()
data = {
'name': columns[0],
'active': True,
Expand Down Expand Up @@ -713,12 +720,14 @@ def get_letter_details_from_zips_sent_file(file_paths):

@notify_command(name='associate-services-to-organisations')
def associate_services_to_organisations():
services = Service.get_history_model().query.filter_by(
version=1
).all()
service_history_model = Service.get_history_model()
stmt = select(service_history_model).where(service_history_model.version == 1)
services = db.session.scalars(stmt).all()

for s in services:
created_by_user = User.query.filter_by(id=s.created_by_id).first()
stmt = select(User).where(User.id == s.created_by_id)
created_by_user = db.session.scalars(stmt).first()

organisation = dao_get_organisation_by_email_address(created_by_user.email_address)
service = dao_fetch_service_by_id(service_id=s.id)
if organisation:
Expand Down Expand Up @@ -788,15 +797,15 @@ def populate_go_live(file_name):

@notify_command(name='fix-billable-units')
def fix_billable_units():
query = Notification.query.filter(
stmt = select(Notification).where(
Notification.notification_type == SMS_TYPE,
Notification.status != NOTIFICATION_CREATED,
Notification.sent_at == None, # noqa
Notification.sent_at.is_(None),
Notification.billable_units == 0,
Notification.key_type != KEY_TYPE_TEST,
)

for notification in query.all():
for notification in db.session.scalars(stmt).all():
template_model = dao_get_template_by_id(notification.template_id, notification.template_version)

template = SMSMessageTemplate(
Expand All @@ -807,10 +816,11 @@ def fix_billable_units():
)
print("Updating notification: {} with {} billable_units".format(notification.id, template.fragment_count))

Notification.query.filter(
Notification.id == notification.id
).update(
{"billable_units": template.fragment_count}
stmt = update(Notification).where(Notification.id == notification.id).values(
billable_units=template.fragment_count
)

db.session.execute(stmt)

db.session.commit()
print("End fix_billable_units")
12 changes: 7 additions & 5 deletions app/communication_item/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from app.schemas import communication_item_schema
from flask import Blueprint, current_app, jsonify, request
from jsonschema import validate, ValidationError
from sqlalchemy import delete, select
from sqlalchemy.exc import SQLAlchemyError

communication_item_blueprint = Blueprint("communication_item", __name__, url_prefix="/communication-item")
Expand Down Expand Up @@ -73,13 +74,13 @@ def create_communication_item():

@communication_item_blueprint.route('', methods=["GET"])
def get_all_communication_items():
communication_items = CommunicationItem.query.all()
communication_items = db.session.scalars(select(CommunicationItem)).all()
return jsonify(data=communication_item_schema.dump(communication_items, many=True).data)


@communication_item_blueprint.route("/<communication_item_id>", methods=["GET"])
def get_communication_item(communication_item_id):
communication_item = CommunicationItem.query.get(communication_item_id)
communication_item = db.session.get(CommunicationItem, communication_item_id)

if communication_item is None:
return {
Expand Down Expand Up @@ -114,7 +115,7 @@ def partially_update_communication_item(communication_item_id):
]
}, 400

communication_item = CommunicationItem.query.get(communication_item_id)
communication_item = db.session.get(CommunicationItem, communication_item_id)

if communication_item is None:
return {
Expand Down Expand Up @@ -164,10 +165,11 @@ def partially_update_communication_item(communication_item_id):

@communication_item_blueprint.route("/<communication_item_id>", methods=["DELETE"])
def delete_communication_item(communication_item_id):
rows_deleted = CommunicationItem.query.filter_by(id=communication_item_id).delete()
query = delete(CommunicationItem).where(CommunicationItem.id == communication_item_id)
rows_deleted = db.session.execute(query).rowcount
db.session.commit()

if rows_deleted:
if rows_deleted > 0:
return {}, 202

return {
Expand Down
4 changes: 3 additions & 1 deletion app/dao/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from sqlalchemy import update
from sqlalchemy.exc import SQLAlchemyError
from app import db

Expand All @@ -20,7 +21,8 @@ def create_instance(self, inst, _commit=True):
def update_instance(self, inst, update_dict, _commit=True):
# Make sure the id is not included in the update_dict
update_dict.pop('id')
self.Meta.model.query.filter_by(id=inst.id).update(update_dict)
stmt = update(self.Meta.model).where(self.Meta.model.id == inst.id).values(**update_dict)
db.session.execute(stmt)
if _commit:
db.session.commit()

Expand Down
44 changes: 26 additions & 18 deletions app/dao/annual_billing_dao.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from app import db
from app.dao.dao_utils import (
transactional,
)
from app.models import AnnualBilling
from app.dao.dao_utils import transactional
from app.dao.date_util import get_current_financial_year_start_year
from app.models import AnnualBilling
from sqlalchemy import select, update


@transactional
Expand All @@ -20,34 +19,43 @@ def dao_create_or_update_annual_billing_for_year(service_id, free_sms_fragment_l


def dao_get_annual_billing(service_id):
return AnnualBilling.query.filter_by(
service_id=service_id,
).order_by(AnnualBilling.financial_year_start).all()
stmt = select(AnnualBilling).where(
AnnualBilling.service_id == service_id
).order_by(
AnnualBilling.financial_year_start
)

return db.session.scalars(stmt).all()


@transactional
def dao_update_annual_billing_for_future_years(service_id, free_sms_fragment_limit, financial_year_start):
AnnualBilling.query.filter(
stmt = update(AnnualBilling).where(
AnnualBilling.service_id == service_id,
AnnualBilling.financial_year_start > financial_year_start
).update(
{'free_sms_fragment_limit': free_sms_fragment_limit}
)
).values(free_sms_fragment_limit=free_sms_fragment_limit)

db.session.execute(stmt)


def dao_get_free_sms_fragment_limit_for_year(service_id, financial_year_start=None):

if not financial_year_start:
financial_year_start = get_current_financial_year_start_year()

return AnnualBilling.query.filter_by(
service_id=service_id,
financial_year_start=financial_year_start
).first()
stmt = select(AnnualBilling).where(
AnnualBilling.service_id == service_id,
AnnualBilling.financial_year_start == financial_year_start
)

return db.session.scalars(stmt).first()


def dao_get_all_free_sms_fragment_limit(service_id):
stmt = select(AnnualBilling).where(
AnnualBilling.service_id == service_id
).order_by(
AnnualBilling.financial_year_start
)

return AnnualBilling.query.filter_by(
service_id=service_id,
).order_by(AnnualBilling.financial_year_start).all()
return db.session.scalars(stmt).all()
Loading

0 comments on commit 88f3a23

Please sign in to comment.