[Storage] Garbage collector #250

Closed
wants to merge 2 commits into from
4 changes: 2 additions & 2 deletions deployment/migrations/config_updater.py
@@ -159,6 +159,6 @@ async def main(args: argparse.Namespace):
if __name__ == "__main__":
    try:
        asyncio.run(main(cli_parse()))
    except Exception as e:
        LOGGER.error("%s", str(e))
    except Exception:
        LOGGER.exception("Failed to run migration scripts")
        sys.exit(1)
99 changes: 99 additions & 0 deletions deployment/migrations/scripts/0003-clean-local-storage.py
@@ -0,0 +1,99 @@
"""
This migration checks all the files stored in local storage (=GridFS) and compares them to the list
of messages already on the node. The files that are not linked to any message are scheduled for
deletion.
"""

import asyncio
import datetime as dt
import logging
from dataclasses import asdict
from typing import Optional, FrozenSet, Any, List

from aleph_message.models import MessageType
from configmanager import Config

import aleph.model
from aleph.config import get_defaults
from aleph.model import init_db_globals
from aleph.model.messages import Message
from aleph.model.scheduled_deletions import ScheduledDeletion, ScheduledDeletionInfo

logger = logging.getLogger()


async def async_upgrade(config_file: Optional[str], **kwargs):
    config = Config(schema=get_defaults())
    if config_file is not None:
        config.yaml.load(config_file)

    init_db_globals(config=config)
    collections = await aleph.model.db.list_collection_names()
    if ScheduledDeletion.COLLECTION in collections:
        logger.info(
            "%s collection is already present. Skipping migration.",
            ScheduledDeletion.COLLECTION,
        )
        return

    # Get a set of all the files currently in GridFS
    gridfs_files = frozenset(
        [
            file["filename"]
            async for file in aleph.model.db["fs.files"].find(
                projection={"filename": 1}, batch_size=1000
            )
        ]
    )

    logger.info("Found %d files in local storage (GridFS)", len(gridfs_files))

    # Get all the messages that potentially store data in local storage:
    # * AGGREGATEs with item_type=="storage"
    # * POSTs with item_type=="storage"
    # * STOREs with content.item_type=="storage"
    async def get_hashes(
        msg_type: MessageType, item_type_field: str, item_hash_field: str
    ) -> FrozenSet[str]:
        def rgetitem(dictionary: Any, fields: List[str]) -> Any:
            value = dictionary[fields[0]]
            if len(fields) > 1:
                return rgetitem(value, fields[1:])
            return value

        return frozenset(
            [
                rgetitem(msg, item_hash_field.split("."))
                async for msg in Message.collection.find(
                    {"type": msg_type, item_type_field: "storage"},
                    {item_hash_field: 1},
                    batch_size=1000,
                )
            ]
        )

    aggregates = await get_hashes(MessageType.aggregate, "item_type", "item_hash")
    posts = await get_hashes(MessageType.post, "item_type", "item_hash")
    stores = await get_hashes(
        MessageType.store, "content.item_type", "content.item_hash"
    )

    files_to_preserve = aggregates | posts | stores
    files_to_delete = gridfs_files - files_to_preserve
    delete_by = dt.datetime.utcnow()

    await ScheduledDeletion.collection.insert_many(
        [
            asdict(ScheduledDeletionInfo(filename=file_to_delete, delete_by=delete_by))
            for file_to_delete in files_to_delete
        ]
    )


def upgrade(config_file: str, **kwargs):
    asyncio.run(async_upgrade(config_file=config_file, **kwargs))


def downgrade(**kwargs):
    # Nothing to do: leaving the scheduled deletions collection in place is harmless.
    pass
6 changes: 0 additions & 6 deletions docs/architecture.rst

This file was deleted.

33 changes: 33 additions & 0 deletions docs/architecture/garbage_collector.rst
@@ -0,0 +1,33 @@
*****************
Garbage collector
*****************

Core Channel Nodes dispose of unneeded files through a process called garbage collection.
Two kinds of garbage collection are in place, one for the local storage system and
one for the IPFS service.

Local storage
=============

CCNs have a dedicated process to dispose of files, the garbage collector.
This process monitors the files in local storage and deletes them once
their scheduled deletion time has passed.

Files can be scheduled for deletion for a number of reasons:

- They were temporary files that ended up unused by the user who pushed them.
- The user decided to delete them.
- The user's payment plan no longer covers them.

In any of these situations, a date and time of deletion is assigned to the file.
The garbage collector runs periodically and deletes every file whose deletion
time has passed, as sketched below.
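
Under the hood, a scheduled deletion is simply a document in a dedicated MongoDB
collection. The snippet below is a minimal sketch of how a file could be scheduled
for deletion, assuming the ``ScheduledDeletionInfo`` dataclass and the asynchronous
``ScheduledDeletion.collection`` introduced in this change; the ``schedule_deletion``
helper itself is hypothetical:

.. code-block:: python

    import datetime as dt
    from dataclasses import asdict

    from aleph.model.scheduled_deletions import ScheduledDeletion, ScheduledDeletionInfo

    async def schedule_deletion(filename: str, lifetime_seconds: int = 3600) -> None:
        # The garbage collector deletes the file once delete_by has passed.
        delete_by = dt.datetime.utcnow() + dt.timedelta(seconds=lifetime_seconds)
        await ScheduledDeletion.collection.insert_one(
            asdict(ScheduledDeletionInfo(filename=filename, delete_by=delete_by))
        )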

By default, the garbage collector runs once every hour. Temporary files uploaded
using the /storage/add_[json|file] endpoints are given a lifetime of one hour
before deletion.
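
The period is controlled by the ``storage.garbage_collector.period`` setting,
expressed in seconds. For reference, the ``storage`` section of the defaults in
``src/aleph/config.py`` introduced by this change reads as follows; a node
operator can override the value in the node configuration file:

.. code-block:: python

    "storage": {
        "folder": "./data/",
        "store_files": False,
        "engine": "mongodb",
        "delete_interval": 3600,
        "garbage_collector": {"period": 3600},
    },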

IPFS
====

The IPFS daemon has its own garbage collector process. You can read more about it
in their `official documentation <https://docs.ipfs.io/concepts/persistence/#garbage-collection>`_.
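
For reference, a collection cycle can also be triggered manually against the node's
IPFS daemon using the standard CLI:

.. code-block:: console

   $ ipfs repo gc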
11 changes: 11 additions & 0 deletions docs/architecture/index.rst
@@ -0,0 +1,11 @@
============
Architecture
============

.. image:: ../figures/architecture-stack.*
   :width: 100%

.. toctree::
   :maxdepth: 2

   garbage_collector
2 changes: 1 addition & 1 deletion docs/index.rst
@@ -16,7 +16,7 @@ Contents
.. toctree::
   :maxdepth: 2

   architecture
   architecture/index
   guides/index
   node-synchronisation
   protocol/index
10 changes: 8 additions & 2 deletions src/aleph/config.py
@@ -30,7 +30,13 @@ def get_defaults():
"/ip4/62.210.93.220/tcp/4025/p2p/QmXdci5feFmA2pxTg8p3FCyWmSKnWYAAmr7Uys1YCTFD8U",
],
},
"storage": {"folder": "./data/", "store_files": False, "engine": "mongodb"},
"storage": {
"folder": "./data/",
"store_files": False,
"engine": "mongodb",
"delete_interval": 3600,
"garbage_collector": {"period": 3600},
},
"nuls": {
"chain_id": 8964,
"enabled": False,
@@ -80,7 +86,7 @@ def get_defaults():
"peers": [
"/dnsaddr/api1.aleph.im/ipfs/12D3KooWNgogVS6o8fVsPdzh2FJpCdJJLVSgJT38XGE1BJoCerHx",
"/ip4/51.159.57.71/tcp/4001/p2p/12D3KooWBH3JVSBwHLNzxv7EzniBP3tDmjJaoa3EJBF9wyhZtHt2",
"/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF"
"/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF",
],
},
"sentry": {
14 changes: 11 additions & 3 deletions src/aleph/handlers/storage.py
@@ -14,13 +14,15 @@

import aioipfs
from aioipfs import InvalidCIDError
from aleph_message.models import StoreMessage
from pydantic import ValidationError

from aleph.config import get_config
from aleph.exceptions import AlephStorageException, UnknownHashError
from aleph.model.scheduled_deletions import ScheduledDeletion
from aleph.services.ipfs.common import get_ipfs_api
from aleph.storage import get_hash_content
from aleph.storage import get_hash_content, ContentSource
from aleph.types import ItemType
from aleph_message.models import StoreMessage
from pydantic import ValidationError

LOGGER = logging.getLogger("HANDLERS.STORAGE")

@@ -111,6 +113,12 @@ async def handle_new_storage(message: Dict, content: Dict):
    except AlephStorageException:
        return None

    # If the file was found locally, it might be marked for deletion.
    # Ensure that we keep the content in the DB by removing the scheduled
    # deletion entry.
    if file_content.source == ContentSource.DB:
        await ScheduledDeletion.collection.delete_one({"filename": item_hash})

    size = len(file_content)

    content["size"] = size
19 changes: 14 additions & 5 deletions src/aleph/jobs/__init__.py
@@ -2,7 +2,11 @@
from multiprocessing import Process
from typing import Dict, List, Coroutine

from aleph.jobs.process_pending_messages import pending_messages_subprocess, retry_messages_task
from aleph.jobs.garbage_collector import garbage_collector_subprocess
from aleph.jobs.process_pending_messages import (
pending_messages_subprocess,
retry_messages_task,
)
from aleph.jobs.process_pending_txs import pending_txs_subprocess, handle_txs_task
from aleph.jobs.reconnect_ipfs import reconnect_ipfs_job

@@ -20,20 +24,25 @@ def start_jobs(

    if use_processes:
        config_values = config.dump_values()
        p1 = Process(
        pending_messages_job = Process(
            target=pending_messages_subprocess,
            args=(
                config_values,
                shared_stats,
                api_servers,
            ),
        )
        p2 = Process(
        pending_txs_job = Process(
            target=pending_txs_subprocess,
            args=(config_values, api_servers),
        )
        p1.start()
        p2.start()

        garbage_collector_job = Process(
            target=garbage_collector_subprocess, args=(config_values,)
        )
        pending_messages_job.start()
        pending_txs_job.start()
        garbage_collector_job.start()
    else:
        tasks.append(retry_messages_task(shared_stats=shared_stats))
        tasks.append(handle_txs_task())
64 changes: 64 additions & 0 deletions src/aleph/jobs/garbage_collector.py
@@ -0,0 +1,64 @@
"""
Job in charge of deleting files from IPFS and local storage when they are scheduled for deletion.
"""

import asyncio
import datetime as dt
import logging
from typing import Dict

import sentry_sdk
from setproctitle import setproctitle

from aleph.logging import setup_logging
from aleph.model.hashes import delete_value as delete_gridfs_file
from aleph.model.scheduled_deletions import ScheduledDeletion, ScheduledDeletionInfo
from .job_utils import prepare_loop

LOGGER = logging.getLogger("jobs.garbage_collector")


async def delete_file(file_to_delete: ScheduledDeletionInfo) -> None:
    await delete_gridfs_file(key=file_to_delete.filename)
    LOGGER.info("Deleted '%s' from local storage", file_to_delete.filename)


async def garbage_collector_task(job_period: int):
    while True:
        try:
            async for file_to_delete in ScheduledDeletion.files_to_delete(
                delete_by=dt.datetime.utcnow()
            ):
                try:
                    await delete_file(file_to_delete)
                finally:
                    await ScheduledDeletion.collection.delete_one(
                        {"_id": file_to_delete.object_id}
                    )

        except Exception:
            LOGGER.exception("Error in garbage collector job")
            # Sleep to avoid overloading the logs in case of a repeating error
            await asyncio.sleep(5)

        await asyncio.sleep(job_period)


def garbage_collector_subprocess(config_values: Dict):
    setproctitle("aleph.jobs.garbage_collector")
    loop, config = prepare_loop(config_values)

    sentry_sdk.init(
        dsn=config.sentry.dsn.value,
        traces_sample_rate=config.sentry.traces_sample_rate.value,
        ignore_errors=[KeyboardInterrupt],
    )
    setup_logging(
        loglevel=config.logging.level.value,
        filename="/tmp/aleph_ccn_garbage_collector.log",
        max_log_file_size=config.logging.max_log_file_size.value,
    )

    loop.run_until_complete(
        garbage_collector_task(job_period=config.storage.garbage_collector.period.value)
    )
5 changes: 2 additions & 3 deletions src/aleph/model/__init__.py
@@ -1,9 +1,9 @@
import asyncio
from logging import getLogger

from configmanager import Config

from aleph.model.filepin import PermanentPin
from aleph.model.scheduled_deletions import ScheduledDeletion

try:
    from pymongo import MongoClient
@@ -56,8 +56,7 @@ def init_db(config: Config, ensure_indexes: bool = True):
        Peer.ensure_indexes(sync_db)

        PermanentPin.ensure_indexes(sync_db)
        # from aleph.model.hashes import Hash
        # Hash.ensure_indexes(sync_db)
        ScheduledDeletion.ensure_indexes(sync_db)

        from aleph.model.messages import Message
