From 455394e1617949f7dae1aea0166799dc6e3583f4 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Wed, 4 May 2022 17:10:49 +0200 Subject: [PATCH 1/2] [Storage] Garbage collector Added a new process in charge of deleting files from local storage. Files can now be marked for deletion by being listed in the scheduled_deletions collection. The garbage collector process will periodically look up this collection and delete all the files for which the `delete_by` datetime field is in the past. Files are now automatically marked for deletion when the user posts them using the /storage/add_json and /storage/add_file endpoints. The deletion is cancelled if a user creates a message using this content before a given period of time (one hour by default). Added a migration script that goes through all the files currently stored on a CCN and schedules all the files not related to an Aleph message for deletion. --- deployment/migrations/config_updater.py | 4 +- .../scripts/0003-clean-local-storage.py | 99 +++++++++++++++++++ docs/architecture.rst | 6 -- docs/architecture/garbage_collector.rst | 33 +++++++ docs/architecture/index.rst | 11 +++ docs/index.rst | 2 +- src/aleph/config.py | 10 +- src/aleph/handlers/storage.py | 14 ++- src/aleph/jobs/__init__.py | 19 +++- src/aleph/jobs/garbage_collector.py | 64 ++++++++++++ src/aleph/model/__init__.py | 5 +- src/aleph/model/scheduled_deletions.py | 46 +++++++++ src/aleph/storage.py | 38 +++++-- src/aleph/web/controllers/storage.py | 21 +++- tests/garbage_collector/test_delete_file.py | 57 +++++++++++ .../test_scheduled_deletions.py | 76 ++++++++++++++ 16 files changed, 474 insertions(+), 31 deletions(-) create mode 100644 deployment/migrations/scripts/0003-clean-local-storage.py delete mode 100644 docs/architecture.rst create mode 100644 docs/architecture/garbage_collector.rst create mode 100644 docs/architecture/index.rst create mode 100644 src/aleph/jobs/garbage_collector.py create mode 100644 src/aleph/model/scheduled_deletions.py 
create mode 100644 tests/garbage_collector/test_delete_file.py create mode 100644 tests/garbage_collector/test_scheduled_deletions.py diff --git a/deployment/migrations/config_updater.py b/deployment/migrations/config_updater.py index 0608ac85b..2554db12c 100755 --- a/deployment/migrations/config_updater.py +++ b/deployment/migrations/config_updater.py @@ -159,6 +159,6 @@ async def main(args: argparse.Namespace): if __name__ == "__main__": try: asyncio.run(main(cli_parse())) - except Exception as e: - LOGGER.error("%s", str(e)) + except Exception: + LOGGER.exception("Failed to run migration scripts") sys.exit(1) diff --git a/deployment/migrations/scripts/0003-clean-local-storage.py b/deployment/migrations/scripts/0003-clean-local-storage.py new file mode 100644 index 000000000..44b7b35bc --- /dev/null +++ b/deployment/migrations/scripts/0003-clean-local-storage.py @@ -0,0 +1,99 @@ +""" +This migration checks all the files stored in local storage (=GridFS) and compares them to the list +of messages already on the node. The files that are not linked to any message are scheduled for +deletion. +""" + +import asyncio +import datetime as dt +import logging +from dataclasses import asdict +from typing import Optional, FrozenSet, Any, List + +from aleph_message.models import MessageType +from configmanager import Config + +import aleph.model +from aleph.config import get_defaults +from aleph.model import init_db_globals +from aleph.model.messages import Message +from aleph.model.scheduled_deletions import ScheduledDeletion, ScheduledDeletionInfo + +logger = logging.getLogger() + + +async def async_upgrade(config_file: Optional[str], **kwargs): + config = Config(schema=get_defaults()) + if config_file is not None: + config.yaml.load(config_file) + + init_db_globals(config=config) + collections = await aleph.model.db.list_collection_names() + if ScheduledDeletion.COLLECTION in collections: + logging.info( + "%s collection is already present. 
Skipping migration.", + ScheduledDeletion.COLLECTION, + ) + return + + # Get a set of all the files currently in GridFS + gridfs_files = frozenset( + [ + file["filename"] + async for file in aleph.model.db["fs.files"].find( + projection={"filename": 1}, batch_size=1000 + ) + ] + ) + + logger.info("Found %d files in local storage", len(gridfs_files)) + + # Get all the messages that potentially store data in local storage: + # * AGGREGATEs with item_type=="storage" + # * POSTs with item_type=="storage" + # * STOREs with content.item_type=="storage" + async def get_hashes( + msg_type: MessageType, item_type_field: str, item_hash_field: str + ) -> FrozenSet[str]: + def rgetitem(dictionary: Any, fields: List[str]) -> Any: + value = dictionary[fields[0]] + if len(fields) > 1: + return rgetitem(value, fields[1:]) + return value + + return frozenset( + [ + rgetitem(msg, item_hash_field.split(".")) + async for msg in Message.collection.find( + {"type": msg_type, item_type_field: "storage"}, + {item_hash_field: 1}, + batch_size=1000, + ) + ] + ) + + aggregates = await get_hashes(MessageType.aggregate, "item_type", "item_hash") + posts = await get_hashes(MessageType.post, "item_type", "item_hash") + stores = await get_hashes( + MessageType.store, "content.item_type", "content.item_hash" + ) + + files_to_preserve = aggregates | posts | stores + files_to_delete = gridfs_files - files_to_preserve + delete_by = dt.datetime.utcnow() + if files_to_delete: + await ScheduledDeletion.collection.insert_many( + [ + asdict(ScheduledDeletionInfo(filename=file_to_delete, delete_by=delete_by)) + for file_to_delete in files_to_delete + ] + ) + + +def upgrade(config_file: str, **kwargs): + asyncio.run(async_upgrade(config_file=config_file, **kwargs)) + + +def downgrade(**kwargs): + # Nothing to do, the scheduled deletions created by the upgrade are simply left in place. 
+ pass diff --git a/docs/architecture.rst b/docs/architecture.rst deleted file mode 100644 index 4c471ce52..000000000 --- a/docs/architecture.rst +++ /dev/null @@ -1,6 +0,0 @@ -============ -Architecture -============ - -.. image:: figures/architecture-stack.* - :width: 100% diff --git a/docs/architecture/garbage_collector.rst b/docs/architecture/garbage_collector.rst new file mode 100644 index 000000000..d4fa4f4fb --- /dev/null +++ b/docs/architecture/garbage_collector.rst @@ -0,0 +1,33 @@ +***************** +Garbage collector +***************** + +Core Channel Nodes dispose of unneeded files through a process called garbage collection. +Two kinds of garbage collection are in place, one for the local storage system and +one for the IPFS service. + +Local storage +============= + +CCNs have a dedicated process to dispose of files, the garbage collector. +This process monitors the files on the local storage and deletes them once +their scheduled deletion time has passed. + +Files can be scheduled for deletion for a number of reasons: +- They were temporary files that ended up being unused by the user that pushed them +- The user decided to delete them +- The payment plan of the user no longer covers them. + +In any of these situations, a date and time of deletion is assigned to the file. +The garbage collector runs periodically and simply deletes the files for which +this date and time has passed. + +By default, the garbage collector runs once every hour. Temporary files uploaded +using the /storage/add_[json|file] endpoints are given a lifetime of one hour +before deletion. + +IPFS +==== + +The IPFS daemon has its own garbage collector process. You can read more about it +in their `official documentation <https://docs.ipfs.tech/concepts/persistence/#garbage-collection>`_. diff --git a/docs/architecture/index.rst b/docs/architecture/index.rst new file mode 100644 index 000000000..bc637a715 --- /dev/null +++ b/docs/architecture/index.rst @@ -0,0 +1,11 @@ +============ +Architecture +============ + +.. 
image:: ../figures/architecture-stack.* + :width: 100% + +.. toctree:: + :maxdepth: 2 + + garbage_collector diff --git a/docs/index.rst b/docs/index.rst index cfd93f5de..d9731e506 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -16,7 +16,7 @@ Contents .. toctree:: :maxdepth: 2 - architecture + architecture/index guides/index node-synchronisation protocol/index diff --git a/src/aleph/config.py b/src/aleph/config.py index 1f84d8d49..c944c004f 100644 --- a/src/aleph/config.py +++ b/src/aleph/config.py @@ -30,7 +30,13 @@ def get_defaults(): "/ip4/62.210.93.220/tcp/4025/p2p/QmXdci5feFmA2pxTg8p3FCyWmSKnWYAAmr7Uys1YCTFD8U", ], }, - "storage": {"folder": "./data/", "store_files": False, "engine": "mongodb"}, + "storage": { + "folder": "./data/", + "store_files": False, + "engine": "mongodb", + "delete_interval": 3600, + "garbage_collector": {"period": 3600}, + }, "nuls": { "chain_id": 8964, "enabled": False, @@ -80,7 +86,7 @@ def get_defaults(): "peers": [ "/dnsaddr/api1.aleph.im/ipfs/12D3KooWNgogVS6o8fVsPdzh2FJpCdJJLVSgJT38XGE1BJoCerHx", "/ip4/51.159.57.71/tcp/4001/p2p/12D3KooWBH3JVSBwHLNzxv7EzniBP3tDmjJaoa3EJBF9wyhZtHt2", - "/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF" + "/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF", ], }, "sentry": { diff --git a/src/aleph/handlers/storage.py b/src/aleph/handlers/storage.py index b6fe55d2f..6353165e7 100644 --- a/src/aleph/handlers/storage.py +++ b/src/aleph/handlers/storage.py @@ -14,13 +14,15 @@ import aioipfs from aioipfs import InvalidCIDError +from aleph_message.models import StoreMessage +from pydantic import ValidationError + from aleph.config import get_config from aleph.exceptions import AlephStorageException, UnknownHashError +from aleph.model.scheduled_deletions import ScheduledDeletion from aleph.services.ipfs.common import get_ipfs_api -from aleph.storage import get_hash_content +from aleph.storage import get_hash_content, ContentSource 
from aleph.types import ItemType -from aleph_message.models import StoreMessage -from pydantic import ValidationError LOGGER = logging.getLogger("HANDLERS.STORAGE") @@ -111,6 +113,12 @@ async def handle_new_storage(message: Dict, content: Dict): except AlephStorageException: return None + # If the file was found locally, it might be marked for deletion. + # Ensure that we keep the content in the DB by removing every scheduled + # deletion entry for it (the same content may have been uploaded twice). + if file_content.source == ContentSource.DB: + await ScheduledDeletion.collection.delete_many({"filename": item_hash}) + size = len(file_content) content["size"] = size diff --git a/src/aleph/jobs/__init__.py b/src/aleph/jobs/__init__.py index 5a455ff5e..5dfa50e35 100644 --- a/src/aleph/jobs/__init__.py +++ b/src/aleph/jobs/__init__.py @@ -2,7 +2,11 @@ from multiprocessing import Process from typing import Dict, List, Coroutine -from aleph.jobs.process_pending_messages import pending_messages_subprocess, retry_messages_task +from aleph.jobs.garbage_collector import garbage_collector_subprocess +from aleph.jobs.process_pending_messages import ( + pending_messages_subprocess, + retry_messages_task, +) from aleph.jobs.process_pending_txs import pending_txs_subprocess, handle_txs_task from aleph.jobs.reconnect_ipfs import reconnect_ipfs_job @@ -20,7 +24,7 @@ def start_jobs( if use_processes: config_values = config.dump_values() - p1 = Process( + pending_messages_job = Process( target=pending_messages_subprocess, args=( config_values, @@ -28,12 +32,17 @@ def start_jobs( api_servers, ), ) - p2 = Process( + pending_txs_job = Process( target=pending_txs_subprocess, args=(config_values, api_servers), ) - p1.start() - p2.start() + + garbage_collector_job = Process( + target=garbage_collector_subprocess, args=(config_values,) + ) + pending_messages_job.start() + pending_txs_job.start() + garbage_collector_job.start() else: tasks.append(retry_messages_task(shared_stats=shared_stats)) tasks.append(handle_txs_task()) diff --git 
a/src/aleph/jobs/garbage_collector.py b/src/aleph/jobs/garbage_collector.py new file mode 100644 index 000000000..e734319eb --- /dev/null +++ b/src/aleph/jobs/garbage_collector.py @@ -0,0 +1,64 @@ +""" +Job in charge of deleting files from local storage when they are scheduled for deletion. +""" + +import asyncio +import datetime as dt +import logging +from typing import Dict + +import sentry_sdk +from setproctitle import setproctitle + +from aleph.logging import setup_logging +from aleph.model.hashes import delete_value as delete_gridfs_file +from aleph.model.scheduled_deletions import ScheduledDeletion, ScheduledDeletionInfo +from .job_utils import prepare_loop + +LOGGER = logging.getLogger("jobs.garbage_collector") + + +async def delete_file(file_to_delete: ScheduledDeletionInfo) -> None: + await delete_gridfs_file(key=file_to_delete.filename) + LOGGER.info("Deleted '%s' from local storage", file_to_delete.filename) + + +async def garbage_collector_task(job_period: int): + while True: + try: + async for file_to_delete in ScheduledDeletion.files_to_delete( + delete_by=dt.datetime.utcnow() + ): + try: + await delete_file(file_to_delete) + finally: + await ScheduledDeletion.collection.delete_one( + {"_id": file_to_delete.object_id} + ) + + except Exception: + LOGGER.exception("Error in garbage collector job") + # Sleep to avoid overloading the logs in case of a repeating error + await asyncio.sleep(5) + + await asyncio.sleep(job_period) + + +def garbage_collector_subprocess(config_values: Dict): + setproctitle("aleph.jobs.garbage_collector") + loop, config = prepare_loop(config_values) + + sentry_sdk.init( + dsn=config.sentry.dsn.value, + traces_sample_rate=config.sentry.traces_sample_rate.value, + ignore_errors=[KeyboardInterrupt], + ) + setup_logging( + loglevel=config.logging.level.value, + filename="/tmp/aleph_ccn_garbage_collector.log", + max_log_file_size=config.logging.max_log_file_size.value, + ) + + loop.run_until_complete( + 
garbage_collector_task(job_period=config.storage.garbage_collector.period.value) + ) diff --git a/src/aleph/model/__init__.py b/src/aleph/model/__init__.py index ac3e4cc9f..f363e841d 100644 --- a/src/aleph/model/__init__.py +++ b/src/aleph/model/__init__.py @@ -1,9 +1,9 @@ -import asyncio from logging import getLogger from configmanager import Config from aleph.model.filepin import PermanentPin +from aleph.model.scheduled_deletions import ScheduledDeletion try: from pymongo import MongoClient @@ -56,8 +56,7 @@ def init_db(config: Config, ensure_indexes: bool = True): Peer.ensure_indexes(sync_db) PermanentPin.ensure_indexes(sync_db) - # from aleph.model.hashes import Hash - # Hash.ensure_indexes(sync_db) + ScheduledDeletion.ensure_indexes(sync_db) from aleph.model.messages import Message diff --git a/src/aleph/model/scheduled_deletions.py b/src/aleph/model/scheduled_deletions.py new file mode 100644 index 000000000..f5dbedc3e --- /dev/null +++ b/src/aleph/model/scheduled_deletions.py @@ -0,0 +1,46 @@ +import datetime as dt +from dataclasses import asdict, dataclass +from typing import AsyncIterator, Dict, Optional + +from bson.objectid import ObjectId +from pymongo import ASCENDING, IndexModel + +from aleph.model.base import BaseClass + + +@dataclass +class ScheduledDeletionInfo: + filename: str + delete_by: dt.datetime + object_id: Optional[ObjectId] = None + + @classmethod + def from_db(cls, db_value: Dict) -> "ScheduledDeletionInfo": + return cls( + filename=db_value["filename"], + delete_by=db_value["delete_by"], + object_id=db_value["_id"], + ) + + +class ScheduledDeletion(BaseClass): + COLLECTION = "scheduled_deletions" + + IndexModel([("delete_by", ASCENDING)]) + IndexModel([("filename", ASCENDING)]) + + @classmethod + async def insert(cls, scheduled_deletion: ScheduledDeletionInfo): + await cls.collection.insert_one(asdict(scheduled_deletion)) + + @classmethod + async def files_to_delete( + cls, delete_by: dt.datetime + ) -> 
AsyncIterator[ScheduledDeletionInfo]: + + query = cls.collection.find(filter={"delete_by": {"$lte": delete_by}}).sort( + [("delete_by", 1)] + ) + + async for result in query: + yield ScheduledDeletionInfo.from_db(result) diff --git a/src/aleph/storage.py b/src/aleph/storage.py index 4340c8ba2..368e5372d 100644 --- a/src/aleph/storage.py +++ b/src/aleph/storage.py @@ -2,6 +2,7 @@ Basically manages the IPFS storage. """ import asyncio +import datetime as dt import json import logging from dataclasses import dataclass @@ -9,8 +10,11 @@ from hashlib import sha256 from typing import Any, AnyStr, Dict, IO, Optional +from aleph.config import get_config from aleph.exceptions import InvalidContent, ContentCurrentlyUnavailable +from aleph.model.scheduled_deletions import ScheduledDeletion, ScheduledDeletionInfo from aleph.services.filestore import get_value, set_value +from aleph.services.ipfs.common import get_cid_version from aleph.services.ipfs.storage import add_bytes as add_ipfs_bytes from aleph.services.ipfs.storage import add_file as ipfs_add_file from aleph.services.ipfs.storage import get_ipfs_content @@ -19,8 +23,6 @@ from aleph.services.p2p.singleton import get_streamer from aleph.types import ItemType from aleph.utils import run_in_executor, get_sha256 -from aleph.services.ipfs.common import get_cid_version -from aleph.config import get_config LOGGER = logging.getLogger("STORAGE") @@ -242,16 +244,33 @@ async def pin_hash(chash: str, timeout: int = 2, tries: int = 1): return await ipfs_pin_add(chash, timeout=timeout, tries=tries) -async def add_json(value: Any, engine: ItemType = ItemType.IPFS) -> str: +async def schedule_for_deletion(filename: str, delete_interval: int) -> None: + """ + Schedules a file stored in local storage for deletion. + :param filename: Name of the file to delete. + :param delete_interval: Interval before the deletion of the file, in seconds. 
+ """ + + delete_by = dt.datetime.utcnow() + dt.timedelta(seconds=delete_interval) + await ScheduledDeletion.insert( + ScheduledDeletionInfo(filename=filename, delete_by=delete_by) + ) + + +async def add_json( + value: Any, engine: ItemType = ItemType.IPFS, delete_interval: Optional[int] = None +) -> str: # TODO: determine which storage engine to use content = await run_in_executor(None, json.dumps, value) content = content.encode("utf-8") + if engine == ItemType.IPFS: chash = await add_ipfs_bytes(content) elif engine == ItemType.Storage: - if isinstance(content, str): - content = content.encode("utf-8") chash = sha256(content).hexdigest() + if delete_interval: + await schedule_for_deletion(chash, delete_interval) + else: raise NotImplementedError("storage engine %s not supported" % engine) @@ -259,7 +278,11 @@ async def add_json(value: Any, engine: ItemType = ItemType.IPFS) -> str: return chash -async def add_file(fileobject: IO, engine: ItemType = ItemType.IPFS) -> str: +async def add_file( + fileobject: IO, + engine: ItemType = ItemType.IPFS, + delete_interval: Optional[int] = None, +) -> str: if engine == ItemType.IPFS: output = await ipfs_add_file(fileobject) @@ -271,6 +294,9 @@ async def add_file(fileobject: IO, engine: ItemType = ItemType.IPFS) -> str: file_content = fileobject.read() file_hash = sha256(file_content).hexdigest() + if delete_interval: + await schedule_for_deletion(file_hash, delete_interval) + else: raise ValueError(f"Unsupported item type: {engine}") diff --git a/src/aleph/web/controllers/storage.py b/src/aleph/web/controllers/storage.py index 59536fb65..8cafe9a59 100644 --- a/src/aleph/web/controllers/storage.py +++ b/src/aleph/web/controllers/storage.py @@ -21,10 +21,17 @@ async def add_ipfs_json_controller(request): async def add_storage_json_controller(request): - """Forward the json content to IPFS server and return an hash""" + """Store the json content in local storage and return its hash""" data = await request.json() - output = 
{"status": "success", "hash": await add_json(data, engine=ItemType.Storage)} + delete_interval = request.app["config"].storage.delete_interval.value + output = { + "status": "success", + "hash": await add_json( + data, engine=ItemType.Storage, delete_interval=delete_interval + ), + } + return web.json_response(output) @@ -32,7 +39,15 @@ async def storage_add_file(request): # No need to pin it here anymore. # TODO: find a way to specify linked ipfs hashes in posts/aggr. post = await request.post() - file_hash = await add_file(post["file"].file, engine=ItemType.Storage) + + # Schedule the file for deletion + delete_interval = request.app["config"].storage.delete_interval.value + + file_hash = await add_file( + post["file"].file, + engine=ItemType.Storage, + delete_interval=delete_interval, + ) output = {"status": "success", "hash": file_hash} return web.json_response(output) diff --git a/tests/garbage_collector/test_delete_file.py b/tests/garbage_collector/test_delete_file.py new file mode 100644 index 000000000..6cef53c6d --- /dev/null +++ b/tests/garbage_collector/test_delete_file.py @@ -0,0 +1,57 @@ +import datetime as dt + +import pytest +import pytz + +from aleph.jobs.garbage_collector import delete_file +from aleph.model.hashes import ( + get_value as get_gridfs_file, + set_value as store_gridfs_file, +) +from aleph.model.scheduled_deletions import ScheduledDeletionInfo + + +@pytest.mark.asyncio +async def test_delete_gridfs_file(mocker, test_db): + """ + Checks that the garbage collector can delete a file from local (GridFS) storage. 
+ """ + + file_content = b"Some data stored using GridFS+MongoDB" + filename = "file_to_delete" + + await store_gridfs_file(filename, file_content) + + # Check that the file was properly inserted + db_content = await get_gridfs_file(filename) + assert db_content == file_content + + deletion = ScheduledDeletionInfo( + filename=filename, + delete_by=pytz.utc.localize(dt.datetime(2022, 1, 1, 0, 0, 0)), + ) + + await delete_file(deletion) + db_content = await get_gridfs_file(filename) + assert db_content is None + + +@pytest.mark.asyncio +async def test_delete_nonexisting_file(mocker, test_db): + """ + Checks that the delete_file function does not raise an exception + if the file does not exist. + """ + + filename = "the_mystery_file" + + # Check that the file indeed does not exist + db_content = await get_gridfs_file(filename) + assert db_content is None + + deletion = ScheduledDeletionInfo( + filename=filename, + delete_by=pytz.utc.localize(dt.datetime(2022, 1, 1, 0, 0, 0)), + ) + + await delete_file(deletion) diff --git a/tests/garbage_collector/test_scheduled_deletions.py b/tests/garbage_collector/test_scheduled_deletions.py new file mode 100644 index 000000000..30541939c --- /dev/null +++ b/tests/garbage_collector/test_scheduled_deletions.py @@ -0,0 +1,76 @@ +import datetime as dt + +import pytest +import pytz + +from aleph.model.scheduled_deletions import ScheduledDeletion, ScheduledDeletionInfo + + +@pytest.mark.asyncio +async def test_insert_deletion(mocker, test_db): + deletion = ScheduledDeletionInfo( + filename="test-filename", + delete_by=pytz.utc.localize(dt.datetime(2022, 1, 1)), + ) + await ScheduledDeletion.insert(deletion) + + deletion_from_db = await ScheduledDeletion.collection.find_one( + filter={"filename": deletion.filename} + ) + assert deletion_from_db is not None + assert deletion_from_db["filename"] == deletion.filename + assert deletion_from_db["delete_by"] == deletion.delete_by + + +@pytest.mark.asyncio +async def 
test_list_files_to_delete(mocker, test_db): + deletions = ( + ScheduledDeletionInfo( + filename="test-file-1", + delete_by=pytz.utc.localize(dt.datetime(2022, 1, 1, 5, 0, 0)), + ), + ScheduledDeletionInfo( + filename="test-file-2", + delete_by=pytz.utc.localize(dt.datetime(2022, 1, 1, 0, 0, 0)), + ), + ScheduledDeletionInfo( + filename="test-file-3", + delete_by=pytz.utc.localize(dt.datetime(2023, 1, 1, 0, 0, 0)), + ), + ) + + for deletion in deletions: + await ScheduledDeletion.insert(deletion) + + # Just after the first message + deletions_from_db = [ + d + async for d in ScheduledDeletion.files_to_delete( + delete_by=deletions[1].delete_by + ) + ] + assert len(deletions_from_db) == 1 + + deletion = deletions_from_db[0] + assert deletion.filename == deletions[1].filename + + # Some date in the future + deletions_from_db = [ + d + async for d in ScheduledDeletion.files_to_delete( + delete_by=pytz.utc.localize(dt.datetime(2024, 1, 1)) + ) + ] + # Check that the items were sorted + assert deletions_from_db[0].filename == deletions[1].filename + assert deletions_from_db[1].filename == deletions[0].filename + assert deletions_from_db[2].filename == deletions[2].filename + + # Some date before all the scheduled deletions + deletions_from_db = [ + d + async for d in ScheduledDeletion.files_to_delete( + delete_by=pytz.utc.localize(dt.datetime(2000, 1, 1)) + ) + ] + assert len(deletions_from_db) == 0 From 54c37396e4bbed8ec908092b25b53078d9dd317a Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 10 May 2022 13:01:46 +0200 Subject: [PATCH 2/2] [Garbage collector] Additional tests New tests: * Tests based on the API, checking that files added with the /storage/add_* endpoints are scheduled for deletion. * A test to check that a STORE message will dequeue the deletion of the underlying file. 
--- src/aleph/model/scheduled_deletions.py | 5 +- tests/api/fixtures/fixture_messages.json | 4 +- tests/conftest.py | 32 +++++-- .../test_api_storage_and_gc.py | 92 +++++++++++++++++++ .../test_cancel_scheduled_deletion.py | 56 +++++++++++ tests/storage/conftest.py | 16 ---- 6 files changed, 178 insertions(+), 27 deletions(-) create mode 100644 tests/garbage_collector/test_api_storage_and_gc.py create mode 100644 tests/garbage_collector/test_cancel_scheduled_deletion.py delete mode 100644 tests/storage/conftest.py diff --git a/src/aleph/model/scheduled_deletions.py b/src/aleph/model/scheduled_deletions.py index f5dbedc3e..f2b7fcd83 100644 --- a/src/aleph/model/scheduled_deletions.py +++ b/src/aleph/model/scheduled_deletions.py @@ -22,6 +22,9 @@ def from_db(cls, db_value: Dict) -> "ScheduledDeletionInfo": object_id=db_value["_id"], ) + def to_dict(self): + return {"filename": self.filename, "delete_by": self.delete_by} + class ScheduledDeletion(BaseClass): COLLECTION = "scheduled_deletions" @@ -31,7 +34,7 @@ class ScheduledDeletion(BaseClass): @classmethod async def insert(cls, scheduled_deletion: ScheduledDeletionInfo): - await cls.collection.insert_one(asdict(scheduled_deletion)) + await cls.collection.insert_one(scheduled_deletion.to_dict()) @classmethod async def files_to_delete( diff --git a/tests/api/fixtures/fixture_messages.json b/tests/api/fixtures/fixture_messages.json index bcbc6cc04..d52776a6b 100644 --- a/tests/api/fixtures/fixture_messages.json +++ b/tests/api/fixtures/fixture_messages.json @@ -6,6 +6,7 @@ "type": "POST", "time": 1652126646.5008686, "item_content": "{\"address\":\"0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106\",\"time\":1652126646.5007327,\"content\":{\"title\":\"My first blog post using Aleph.im\",\"body\":\"Using Aleph.im, we can make a decentralized blog.\"},\"type\":\"test\"}", + "item_type": "inline", "item_hash": "4c33dd1ebf61bbb4342d8258b591fcd52cca73fd7c425542f78311d8f45ba274", "signature": 
"0x999ab556b92351e6edf894b4a67f01f0344c7023883eb5bafdf4cd0b98ca91781692ac6b95246c1bf940eedcedfd6dc04751accfbc417ee1b1ae13893634e7eb1c", "confirmed": false, @@ -27,7 +28,8 @@ "time": 1652126721.497669, "item_content": "{\"address\":\"0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106\",\"time\":1652126721.4974446,\"item_type\":\"storage\",\"item_hash\":\"5ccdd7bccfbc5955e2e40166dd0cdea0b093154fd87bc2bea57e7c768cde2f21\",\"mime_type\":\"text/plain\"}", "item_hash": "2953f0b52beb79fc0ed1bc455346fdcb530611605e16c636778a0d673d7184af", - "signature": "0xa10129dd561c1bc93e8655daf09520e9f1694989263e25f330b403ad33563f4b64c9ae18f6cbfb33e8a47a095be7a181b140a369e6205fd04eef55397624a7121b", + "item_type": "inline", + "signature": "0xa10129dd561c1bc93e8655daf09520e9f1694989263e25f330b403ad33563f4b64c9ae18f6cbfb33e8a47a095be7a181b140a369e6205fd04eef55397624a7121b", "content": { "address": "0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106", "time": 1652126721.4974446, diff --git a/tests/conftest.py b/tests/conftest.py index 12d399a40..ab14d15e7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,30 +1,44 @@ -import pytest -from aleph.model import init_db -from aleph.config import get_defaults -from configmanager import Config import pymongo +import pytest import pytest_asyncio +from configmanager import Config +import aleph.config +from aleph.config import get_defaults +from aleph.model import init_db TEST_DB = "ccn_automated_tests" +@pytest.fixture +def mock_config(): + config = Config(aleph.config.get_defaults()) + # To test handle_new_storage + config.storage.store_files.value = True + + # We set the global variable directly instead of patching it because of an issue + # with mocker.patch. mocker.patch uses hasattr to determine the properties of + # the mock, which does not work well with configmanager Config objects. 
+ aleph.config.app_config = config + return config + + def drop_db(db_name: str, config: Config): client = pymongo.MongoClient(config.mongodb.uri.value) client.drop_database(db_name) @pytest_asyncio.fixture -async def test_db(): +async def test_db(mock_config): """ Initializes and cleans a MongoDB database dedicated to automated tests. """ - config = Config(schema=get_defaults()) - config.mongodb.database.value = TEST_DB + mock_config.mongodb.database.value = TEST_DB - drop_db(TEST_DB, config) - init_db(config, ensure_indexes=True) + drop_db(TEST_DB, mock_config) + init_db(mock_config, ensure_indexes=True) from aleph.model import db + yield db diff --git a/tests/garbage_collector/test_api_storage_and_gc.py b/tests/garbage_collector/test_api_storage_and_gc.py new file mode 100644 index 000000000..1a9a6d87d --- /dev/null +++ b/tests/garbage_collector/test_api_storage_and_gc.py @@ -0,0 +1,92 @@ +""" +Tests for the storage API to check that temporary files are properly marked for deletion. +""" + + +import datetime as dt +import json + +import pytest +import pytz +from aiohttp import FormData +from configmanager import Config + +from aleph.model import ScheduledDeletion +from aleph.model.hashes import get_value as read_gridfs_file +from aleph.web import create_app + + +async def check_scheduled_deletion( + config: Config, file_hash: str, post_datetime: dt.datetime +): + scheduled_deletion = await ScheduledDeletion.collection.find_one( + {"filename": file_hash} + ) + + assert scheduled_deletion is not None + + # Check that the file is scheduled for deletion at least after + # the expected interval. 
+ delete_interval = config.storage.delete_interval.value + delete_by = scheduled_deletion["delete_by"] + assert delete_by >= post_datetime + dt.timedelta(seconds=delete_interval) + + +@pytest.mark.asyncio +async def test_store_temporary_file(mock_config, test_db, aiohttp_client): + """ + Checks that the garbage collector schedules temporary files uploaded + with /storage/add_file for deletion. + """ + + app = create_app() + app["config"] = mock_config + client = await aiohttp_client(app) + + file_content = b"Some file I'd like to upload" + + data = FormData() + data.add_field("file", file_content) + + post_datetime = pytz.utc.localize(dt.datetime.utcnow()) + response = await client.post(f"/api/v0/storage/add_file", data=data) + assert response.status == 200, await response.text() + + data = await response.json() + assert data["status"] == "success" + file_hash = data["hash"] + + db_content = await read_gridfs_file(file_hash) + assert db_content == file_content + + await check_scheduled_deletion(mock_config, file_hash, post_datetime) + + +@pytest.mark.asyncio +async def test_store_temporary_json(mock_config, test_db, aiohttp_client): + """ + Checks that the garbage collector schedules temporary JSON files uploaded + with /storage/add_json for deletion. + """ + + app = create_app() + app["config"] = mock_config + client = await aiohttp_client(app) + + json_content = { + "title": "A garbage collector for CCNs", + "body": "Discover the new GC for Aleph CCNs. 
Deletes all the files, even useful ones!", + } + + post_datetime = pytz.utc.localize(dt.datetime.utcnow()) + response = await client.post(f"/api/v0/storage/add_json", json=json_content) + assert response.status == 200, await response.text() + + data = await response.json() + assert data["status"] == "success" + file_hash = data["hash"] + + db_content = await read_gridfs_file(file_hash) + assert json.loads(db_content) == json_content + + await check_scheduled_deletion(mock_config, file_hash, post_datetime) diff --git a/tests/garbage_collector/test_cancel_scheduled_deletion.py b/tests/garbage_collector/test_cancel_scheduled_deletion.py new file mode 100644 index 000000000..af13057cb --- /dev/null +++ b/tests/garbage_collector/test_cancel_scheduled_deletion.py @@ -0,0 +1,56 @@ +import datetime as dt +import json + +import pytest + +from aleph.chains.common import process_one_message +from aleph.model.hashes import ( + get_value as read_gridfs_file, + set_value as store_gridfs_file, +) +from aleph.model.scheduled_deletions import ScheduledDeletionInfo, ScheduledDeletion + + +@pytest.mark.asyncio +async def test_cancel_scheduled_deletion(test_db): + """ + Test that a file marked for deletion will be preserved once a message + stores that content. 
+ """ + + store_message = { + "chain": "ETH", + "channel": "unit-tests", + "sender": "0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106", + "type": "STORE", + "time": 1652126721.497669, + "item_content": '{"address":"0x696879aE4F6d8DaDD5b8F1cbb1e663B89b08f106","time":1652126721.4974446,"item_type":"storage","item_hash":"5ccdd7bccfbc5955e2e40166dd0cdea0b093154fd87bc2bea57e7c768cde2f21","mime_type":"text/plain"}', + "item_type": "inline", + "item_hash": "2953f0b52beb79fc0ed1bc455346fdcb530611605e16c636778a0d673d7184af", + "signature": "0xa10129dd561c1bc93e8655daf09520e9f1694989263e25f330b403ad33563f4b64c9ae18f6cbfb33e8a47a095be7a181b140a369e6205fd04eef55397624a7121b", + } + + content = json.loads(store_message["item_content"]) + file_hash = content["item_hash"] + file_content = b"Hello, Aleph.im!\n" + + # Store the file + await store_gridfs_file(file_hash, file_content) + await ScheduledDeletion.insert( + ScheduledDeletionInfo( + filename=file_hash, + delete_by=dt.datetime.utcnow() + dt.timedelta(seconds=3600), + ) + ) + + await process_one_message(store_message) + + # Check that the file is no longer marked for deletion + scheduled_deletion = await ScheduledDeletion.collection.find_one( + {"filename": file_hash} + ) + assert scheduled_deletion is None + + # Check that the file is unmodified + db_file_content = await read_gridfs_file(file_hash) + assert db_file_content == file_content diff --git a/tests/storage/conftest.py b/tests/storage/conftest.py deleted file mode 100644 index 70a2bb450..000000000 --- a/tests/storage/conftest.py +++ /dev/null @@ -1,16 +0,0 @@ -import aleph.config -import pytest -from configmanager import Config - - -@pytest.fixture -def mock_config(mocker): - config = Config(aleph.config.get_defaults()) - # To test handle_new_storage - config.storage.store_files.value = True - - # We set the global variable directly instead of patching it because of an issue - # with mocker.patch. 
mocker.patch uses hasattr to determine the properties of - # the mock, which does not work well with configmanager Config objects. - aleph.config.app_config = config - return mock_config