diff --git a/core/cluster.py b/core/cluster.py
index 61f7f52..207d820 100644
--- a/core/cluster.py
+++ b/core/cluster.py
@@ -3,11 +3,12 @@
 import datetime
 import hmac
 import io
+import tempfile
 import json
 from pathlib import Path
 import sys
 import time
-from typing import Any, Optional
+from typing import Any, Optional, BinaryIO
 import aiohttp
 import anyio
 import anyio.abc
@@ -227,11 +228,8 @@ async def _download_file(
         for _ in range(10):
             size = 0
             hash = utils.get_hash_obj(file.hash)
-            tmp_file = io.BytesIO()
             try:
-                async with session.get(
-                    file.path
-                ) as resp:
+                async with session.get(file.path) as resp, tempfile.TemporaryFile() as tmp_file:
                     while (data := await resp.content.read(1024 * 1024 * 16)):
                         tmp_file.write(data)
                         hash.update(data)
@@ -241,17 +239,18 @@
                         pbar.update(inc)
                     if hash.hexdigest() != file.hash or size != file.size:
                         await anyio.sleep(50)
-                        raise Exception(f"hash mismatch, got {hash.hexdigest()} expected {file.hash}")
-                    await self.upload_storage(file, tmp_file, size)
-                    self.update_success()
+                        raise Exception(
+                            f"hash mismatch, got {hash.hexdigest()} expected {file.hash}"
+                        )
+                    tmp_file.seek(0)
+                    await self.upload_storage(file, tmp_file, size)
+                    self.update_success()
             except Exception as e:
                 last_error = e
                 self._pbar.update(-size)
                 pbar.update(-size)
                 self.update_failed()
                 continue
-            finally:
-                tmp_file.close()
             return None
         if last_error is not None:
             raise last_error
@@ -259,7 +258,7 @@ async def _download_file(
     async def upload_storage(
         self,
         file: BMCLAPIFile,
-        data: io.BytesIO,
+        data: BinaryIO,
         size: int
     ):
         missing_storage = [
@@ -598,14 +597,21 @@ async def get_files(self) -> list[BMCLAPIFile]:
                if resp.status == 204: # no new files
                    #logger.tdebug("cluster.get_files.no_new_files", id=self.id)
                    return results
-               reader = utils.AvroParser(zstd.decompress(await resp.read()))
-               for _ in range(reader.read_long()):
-                   results.append(BMCLAPIFile(
-                       reader.read_string(),
-                       reader.read_string(),
-                       reader.read_long(),
-                       reader.read_long() / 1000.0,
-                   ))
+               with tempfile.TemporaryFile() as tmp:
+                   async for chunk in resp.content.iter_chunked(1024 * 1024 * 4):
+                       tmp.write(chunk)
+                   tmp.seek(0)
+                   with zstd.open(tmp, 'rb') as zf:
+                       reader = utils.AvroParser(zf)
+                       for _ in range(reader.read_long()):
+                           results.append(
+                               BMCLAPIFile(
+                                   reader.read_string(),
+                                   reader.read_string(),
+                                   reader.read_long(),
+                                   reader.read_long() / 1000.0,
+                               )
+                           )
                self._last_modified = max(results, key=lambda x: x.mtime).mtime
                logger.tdebug("cluster.get_files", id=self.id, name=self.display_name, count=len(results), size=units.format_bytes(sum([f.size for f in results])), last_modified=units.format_datetime_from_timestamp(self._last_modified))
        except:
diff --git a/core/dashboard.py b/core/dashboard.py
index 993da8a..0c81ee5 100644
--- a/core/dashboard.py
+++ b/core/dashboard.py
@@ -68,12 +68,12 @@ def stop(
     ):
         self._running = 0
 
-    def update(
+    async def update(
         self,
     ):
         while self._running:
             try:
-                self.cpus.append(self.process.cpu_percent(interval=1))
+                self.cpus.append(self.process.cpu_percent(interval=None))
 
                 memory = self.process.memory_full_info()
 
@@ -83,6 +83,7 @@ def update(
                 ))
             except:
                 break
+            await anyio.sleep(1)
 
     def get_info(self) -> dict[str, Any]:
         return {
diff --git a/core/database/memory.py b/core/database/memory.py
index 2c4e83a..ed17612 100644
--- a/core/database/memory.py
+++ b/core/database/memory.py
@@ -1,4 +1,4 @@
-from collections import defaultdict
+from collections import defaultdict, deque
 from datetime import datetime
 from typing import Any
 from .abc import DataBase, ClusterCounterInfo
@@ -11,7 +11,7 @@ def __init__(
     ):
         super().__init__(database_name)
 
-        self._clusters_logs = []
+        self._clusters_logs = deque(maxlen=10000)
         self._clusters_counters: defaultdict[int, defaultdict[str, defaultdict[str, int]]] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
 
     async def insert_cluster_info(self, cluster_id: str, type: str, event: str, data: Any | None = None):
diff --git a/core/storage/abc.py b/core/storage/abc.py
index ac15844..1b00512 100644
--- a/core/storage/abc.py
+++ b/core/storage/abc.py
@@ -2,6 +2,8 @@
 import anyio.abc
 import io
+import tempfile
+from typing import BinaryIO
 
 from core import utils
 from core.abc import BMCLAPIFile, ResponseFile, ResponseFileNotFound, ResponseFileMemory, ResponseFileLocal, ResponseFileRemote
 
@@ -151,15 +153,15 @@ async def works(root_ids: list[int]):
     async def upload(
         self,
         path: str,
-        data: io.BytesIO,
+        data: BinaryIO,
         size: int
     ):
         raise NotImplementedError
 
-    async def upload_download_file(self, path: str, data: io.BytesIO, size: int):
+    async def upload_download_file(self, path: str, data: BinaryIO, size: int):
         if self.download_dir:
             path = f"download/{path}"
-        await self.upload(f"download/{path}", data, size)
+        await self.upload(path, data, size)
 
     async def get_response_file(
         self,
@@ -196,12 +198,19 @@ def path(self) -> 'CPath':
     async def write_measure(self, size: int):
         path = f"measure/{size}"
         size = size * 1024 * 1024
-
-        await self.upload(
-            path,
-            io.BytesIO(b"\x00" * size),
-            size
-        )
+        chunk = b"\x00" * (1024 * 1024)
+        with tempfile.TemporaryFile() as tmp:
+            remain = size
+            while remain > 0:
+                w = min(remain, len(chunk))
+                tmp.write(chunk[:w])
+                remain -= w
+            tmp.seek(0)
+            await self.upload(
+                path,
+                tmp,
+                size
+            )
 
         logger.tsuccess("storage.write_measure", size=int(size / (1024 * 1024)), name=self.name, type=self.type)
 
diff --git a/core/storage/alist.py b/core/storage/alist.py
index d3ae2ab..575d850 100644
--- a/core/storage/alist.py
+++ b/core/storage/alist.py
@@ -1,6 +1,6 @@
 import io
 import time
-from typing import Any
+from typing import Any, BinaryIO
 import urllib.parse as urlparse
 import aiohttp
 import anyio.abc
@@ -174,7 +174,7 @@ async def list_files(self, path: str) -> list[abc.FileInfo]:
             ))
         return res
 
-    async def upload(self, path: str, data: io.BytesIO, size: int):
+    async def upload(self, path: str, data: BinaryIO, size: int):
         async with aiohttp.ClientSession(
             base_url=self._endpoint,
             headers={
@@ -187,7 +187,7 @@ async def upload(self, path: str, data: io.BytesIO, size: int):
                 headers={
                     "File-Path": urlparse.quote(str(self._path / path)),
                 },
-                data=data.getbuffer()
+                data=data
             ) as resp:
                 alist_resp = AlistResponse(await resp.json())
                 alist_resp.raise_for_status()
diff --git a/core/storage/local.py b/core/storage/local.py
index 73dabda..ca7a27f 100644
--- a/core/storage/local.py
+++ b/core/storage/local.py
@@ -1,6 +1,8 @@
 from pathlib import Path
 import io
 import time
+import shutil
+from typing import BinaryIO
 import anyio.abc
 
 from core.abc import ResponseFile, ResponseFileLocal, ResponseFileNotFound
@@ -53,13 +55,14 @@ async def list_files(
     async def upload(
         self,
         path: str,
-        data: io.BytesIO,
+        data: BinaryIO,
         size: int
     ):
         root = Path(str(self.path)) / path
         root.parent.mkdir(parents=True, exist_ok=True)
         with open(root, "wb") as f:
-            f.write(data.getbuffer())
+            data.seek(0)
+            shutil.copyfileobj(data, f)
         return True
 
     async def _check(
diff --git a/core/storage/minio.py b/core/storage/minio.py
index a710bc2..70bed0b 100644
--- a/core/storage/minio.py
+++ b/core/storage/minio.py
@@ -1,6 +1,7 @@
 from datetime import timedelta
 import datetime
 import io
+from typing import BinaryIO
 import time
 from typing import Optional
 import urllib.parse as urlparse
@@ -92,15 +93,15 @@ async def list_files(
     async def upload(
         self,
         path: str,
-        data: io.BytesIO,
+        data: BinaryIO,
         size: int
     ):
         root = self.path / path
-        
+
         await self.minio.put_object(
             self.bucket,
             str(root)[1:],
-            data.getbuffer(),
+            data,
             size
         )
         return True
diff --git a/core/storage/s3.py b/core/storage/s3.py
index f1cd0e4..aa5a678 100644
--- a/core/storage/s3.py
+++ b/core/storage/s3.py
@@ -1,5 +1,6 @@
 from io import BytesIO
 import io
+from typing import BinaryIO
 import time
 import aioboto3.session
 import anyio.abc
@@ -136,7 +137,7 @@ async def list_files(
     async def upload(
         self,
         path: str,
-        data: io.BytesIO,
+        data: BinaryIO,
         size: int
     ):
         async with self.session.resource(
diff --git a/core/storage/webdav.py b/core/storage/webdav.py
index a251ef1..e884781 100644
--- a/core/storage/webdav.py
+++ b/core/storage/webdav.py
@@ -1,5 +1,6 @@
 import io
 import time
+from typing import BinaryIO
 import aiohttp
 import aiowebdav.client
 import anyio
@@ -128,10 +129,10 @@ async def _mkdir(self, parent: abc.CPath):
         for parent in parent.parents:
             await self.client.mkdir(str(parent))
 
-    async def upload(self, path: str, data: io.BytesIO, size: int):
+    async def upload(self, path: str, data: BinaryIO, size: int):
         # check dir
         await self._mkdir((self._path / path).parent)
-        await self.client.upload_to(io.BytesIO(data.getbuffer()), str(self._path / path))
+        await self.client.upload_to(data, str(self._path / path))
         return True
 
 
diff --git a/core/utils.py b/core/utils.py
index 08e9c9f..272974b 100644
--- a/core/utils.py
+++ b/core/utils.py
@@ -30,12 +30,17 @@
 V = TypeVar("V")
 T = TypeVar("T")
 
+from typing import BinaryIO
+
 class AvroParser:
     def __init__(
         self,
-        data: bytes
+        data: bytes | BinaryIO
    ):
-        self.data = io.BytesIO(data)
+        if isinstance(data, (bytes, bytearray)):
+            self.data = io.BytesIO(data)
+        else:
+            self.data = data
 
     def read_long(self):
         result, shift = 0, 0