Skip to content

Commit

Permalink
feat: specific error code FORGOTTEN_DUPLICATE
Browse files Browse the repository at this point in the history
  • Loading branch information
amalcaraz committed Feb 4, 2025
1 parent 333ab45 commit ff7cc94
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
logger = logging.getLogger("alembic")

# revision identifiers, used by Alembic.
revision = 'a3ef27f0db81'
down_revision = 'd8e9852e5775'
revision = "a3ef27f0db81"
down_revision = "d8e9852e5775"
branch_labels = None
depends_on = None


def refresh_vm_version(session: DbSession, vm_hash: str) -> None:
coalesced_ref = sa.func.coalesce(VmBaseDb.replaces, VmBaseDb.item_hash)
select_latest_revision_stmt = (
Expand Down Expand Up @@ -62,17 +63,22 @@ def refresh_vm_version(session: DbSession, vm_hash: str) -> None:
session.execute(sa.delete(VmVersionDb).where(VmVersionDb.vm_hash == vm_hash))
session.execute(upsert_stmt)


def do_delete_vms(session: DbSession) -> None:
# DELETE VMS

vm_hashes = session.execute(
"""
vm_hashes = (
session.execute(
"""
SELECT m.item_hash
FROM messages m
INNER JOIN forgotten_messages fm on (m.item_hash = fm.item_hash)
WHERE m.type = 'INSTANCE' or m.type = 'PROGRAM'
"""
).scalars().all()
)
.scalars()
.all()
)

logger.debug("DELETE VMS: %r", vm_hashes)

Expand All @@ -85,8 +91,9 @@ def do_delete_vms(session: DbSession) -> None:
FROM messages m
INNER JOIN forgotten_messages fm on (m.item_hash = fm.item_hash)
WHERE m.type = 'INSTANCE' or m.type = 'PROGRAM')
""")

"""
)

session.execute(
"""
DELETE
Expand All @@ -96,11 +103,13 @@ def do_delete_vms(session: DbSession) -> None:
FROM messages m
INNER JOIN forgotten_messages fm on (m.item_hash = fm.item_hash)
WHERE m.type = 'INSTANCE' or m.type = 'PROGRAM')
""")

"""
)

for item_hash in vm_hashes:
refresh_vm_version(session, item_hash)


def do_delete_store(session: DbSession) -> None:
# DELETE STORE

Expand All @@ -114,7 +123,9 @@ def do_delete_store(session: DbSession) -> None:
INNER JOIN forgotten_messages fm ON m.item_hash = fm.item_hash
WHERE m.type = 'STORE'
)
""")
"""
)


def do_delete_messages(session: DbSession) -> None:
# DELETE MESSAGES
Expand All @@ -125,18 +136,29 @@ def do_delete_messages(session: DbSession) -> None:
FROM messages m
using forgotten_messages fm
WHERE m.item_hash = fm.item_hash
""")
"""
)


def do_delete(session: DbSession) -> None:
"""
NOTE: We need to migrate (delete duplicates) from aggregate_elements, aggregates, file_tags and posts tables.
The issue that was causing this inconsistent state has been fixed and we have considered that it doesn't worth to clean
this tables for now as there are less than 1k orphan rows
NOTE: We need to migrate (delete duplicates) from aggregate_elements, aggregates, file_tags and posts tables.
The issue that was causing this inconsistent state has been fixed and we have considered that it doesn't worth to clean
this tables for now as there are less than 1k orphan rows
"""

op.execute(
"""
INSERT INTO error_codes(code, description) VALUES
(504, 'Cannot process a forgotten message')
"""
)

do_delete_vms(session)
do_delete_store(session)
do_delete_messages(session)


async def upgrade_async() -> None:
session = DbSession(bind=op.get_bind())
do_delete(session)
Expand All @@ -146,10 +168,12 @@ async def upgrade_async() -> None:
def upgrade_thread():
asyncio.run(upgrade_async())


def upgrade() -> None:
thread = Thread(target=upgrade_thread, daemon=True)
thread.start()
thread.join()


def downgrade() -> None:
pass
op.execute("DELETE FROM error_codes WHERE code = 504")
2 changes: 1 addition & 1 deletion src/aleph/handlers/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ async def process(
)
return RejectedMessage(
pending_message=pending_message,
error_code=ErrorCode.FORGET_TARGET_NOT_FOUND,
error_code=ErrorCode.FORGOTTEN_DUPLICATE,
)

message = await self.verify_and_fetch(
Expand Down
1 change: 1 addition & 0 deletions src/aleph/types/message_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class ErrorCode(IntEnum):
FORGET_TARGET_NOT_FOUND = 501
FORGET_FORGET = 502
FORGET_NOT_ALLOWED = 503
FORGOTTEN_DUPLICATE = 504


class MessageProcessingException(Exception):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async def test_duplicated_forgotten_message(
pending_message=m3,
)
assert isinstance(test3, RejectedMessage)
assert test3.error_code == ErrorCode.FORGET_TARGET_NOT_FOUND
assert test3.error_code == ErrorCode.FORGOTTEN_DUPLICATE

res3 = cast(
MessageDb,
Expand Down

0 comments on commit ff7cc94

Please sign in to comment.