Skip to content

Optimize resource usage #113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 25 additions & 19 deletions core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import datetime
import hmac
import io
import tempfile
import json
from pathlib import Path
import sys
import time
from typing import Any, Optional
from typing import Any, Optional, BinaryIO
import aiohttp
import anyio
import anyio.abc
Expand Down Expand Up @@ -227,11 +228,8 @@ async def _download_file(
for _ in range(10):
size = 0
hash = utils.get_hash_obj(file.hash)
tmp_file = io.BytesIO()
try:
async with session.get(
file.path
) as resp:
async with session.get(file.path) as resp, tempfile.TemporaryFile() as tmp_file:
while (data := await resp.content.read(1024 * 1024 * 16)):
tmp_file.write(data)
hash.update(data)
Expand All @@ -241,25 +239,26 @@ async def _download_file(
pbar.update(inc)
if hash.hexdigest() != file.hash or size != file.size:
await anyio.sleep(50)
raise Exception(f"hash mismatch, got {hash.hexdigest()} expected {file.hash}")
await self.upload_storage(file, tmp_file, size)
self.update_success()
raise Exception(
f"hash mismatch, got {hash.hexdigest()} expected {file.hash}"
)
tmp_file.seek(0)
await self.upload_storage(file, tmp_file, size)
self.update_success()
except Exception as e:
last_error = e
self._pbar.update(-size)
pbar.update(-size)
self.update_failed()
continue
finally:
tmp_file.close()
return None
if last_error is not None:
raise last_error

async def upload_storage(
self,
file: BMCLAPIFile,
data: io.BytesIO,
data: BinaryIO,
size: int
):
missing_storage = [
Expand Down Expand Up @@ -598,14 +597,21 @@ async def get_files(self) -> list[BMCLAPIFile]:
if resp.status == 204: # no new files
#logger.tdebug("cluster.get_files.no_new_files", id=self.id)
return results
reader = utils.AvroParser(zstd.decompress(await resp.read()))
for _ in range(reader.read_long()):
results.append(BMCLAPIFile(
reader.read_string(),
reader.read_string(),
reader.read_long(),
reader.read_long() / 1000.0,
))
with tempfile.TemporaryFile() as tmp:
async for chunk in resp.content.iter_chunked(1024 * 1024 * 4):
tmp.write(chunk)
tmp.seek(0)
with zstd.open(tmp, 'rb') as zf:
reader = utils.AvroParser(zf)
for _ in range(reader.read_long()):
results.append(
BMCLAPIFile(
reader.read_string(),
reader.read_string(),
reader.read_long(),
reader.read_long() / 1000.0,
)
)
self._last_modified = max(results, key=lambda x: x.mtime).mtime
logger.tdebug("cluster.get_files", id=self.id, name=self.display_name, count=len(results), size=units.format_bytes(sum([f.size for f in results])), last_modified=units.format_datetime_from_timestamp(self._last_modified))
except:
Expand Down
5 changes: 3 additions & 2 deletions core/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ def stop(
):
self._running = 0

def update(
async def update(
self,
):
while self._running:
try:
self.cpus.append(self.process.cpu_percent(interval=1))
self.cpus.append(self.process.cpu_percent(interval=None))

memory = self.process.memory_full_info()

Expand All @@ -83,6 +83,7 @@ def update(
))
except:
break
await anyio.sleep(1)

def get_info(self) -> dict[str, Any]:
return {
Expand Down
4 changes: 2 additions & 2 deletions core/database/memory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections import defaultdict
from collections import defaultdict, deque
from datetime import datetime
from typing import Any
from .abc import DataBase, ClusterCounterInfo
Expand All @@ -11,7 +11,7 @@ def __init__(
):
super().__init__(database_name)

self._clusters_logs = []
self._clusters_logs = deque(maxlen=10000)
self._clusters_counters: defaultdict[int, defaultdict[str, defaultdict[str, int]]] = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))

async def insert_cluster_info(self, cluster_id: str, type: str, event: str, data: Any | None = None):
Expand Down
27 changes: 18 additions & 9 deletions core/storage/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import anyio.abc
import io
import tempfile
from typing import BinaryIO

from core import utils
from core.abc import BMCLAPIFile, ResponseFile, ResponseFileNotFound, ResponseFileMemory, ResponseFileLocal, ResponseFileRemote
Expand Down Expand Up @@ -151,15 +153,15 @@ async def works(root_ids: list[int]):
async def upload(
self,
path: str,
data: io.BytesIO,
data: BinaryIO,
size: int
):
raise NotImplementedError

async def upload_download_file(self, path: str, data: io.BytesIO, size: int):
async def upload_download_file(self, path: str, data: BinaryIO, size: int):
if self.download_dir:
path = f"download/{path}"
await self.upload(f"download/{path}", data, size)
await self.upload(path, data, size)

async def get_response_file(
self,
Expand Down Expand Up @@ -196,12 +198,19 @@ def path(self) -> 'CPath':
async def write_measure(self, size: int):
path = f"measure/{size}"
size = size * 1024 * 1024

await self.upload(
path,
io.BytesIO(b"\x00" * size),
size
)
chunk = b"\x00" * (1024 * 1024)
with tempfile.TemporaryFile() as tmp:
remain = size
while remain > 0:
w = min(remain, len(chunk))
tmp.write(chunk[:w])
remain -= w
tmp.seek(0)
await self.upload(
path,
tmp,
size
)
logger.tsuccess("storage.write_measure", size=int(size / (1024 * 1024)), name=self.name, type=self.type)


Expand Down
6 changes: 3 additions & 3 deletions core/storage/alist.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import io
import time
from typing import Any
from typing import Any, BinaryIO
import urllib.parse as urlparse
import aiohttp
import anyio.abc
Expand Down Expand Up @@ -174,7 +174,7 @@ async def list_files(self, path: str) -> list[abc.FileInfo]:
))
return res

async def upload(self, path: str, data: io.BytesIO, size: int):
async def upload(self, path: str, data: BinaryIO, size: int):
async with aiohttp.ClientSession(
base_url=self._endpoint,
headers={
Expand All @@ -187,7 +187,7 @@ async def upload(self, path: str, data: io.BytesIO, size: int):
headers={
"File-Path": urlparse.quote(str(self._path / path)),
},
data=data.getbuffer()
data=data
) as resp:
alist_resp = AlistResponse(await resp.json())
alist_resp.raise_for_status()
Expand Down
7 changes: 5 additions & 2 deletions core/storage/local.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from pathlib import Path
import io
import time
import shutil
from typing import BinaryIO
import anyio.abc

from core.abc import ResponseFile, ResponseFileLocal, ResponseFileNotFound
Expand Down Expand Up @@ -53,13 +55,14 @@ async def list_files(
async def upload(
self,
path: str,
data: io.BytesIO,
data: BinaryIO,
size: int
):
root = Path(str(self.path)) / path
root.parent.mkdir(parents=True, exist_ok=True)
with open(root, "wb") as f:
f.write(data.getbuffer())
data.seek(0)
shutil.copyfileobj(data, f)
return True

async def _check(
Expand Down
7 changes: 4 additions & 3 deletions core/storage/minio.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import timedelta
import datetime
import io
from typing import BinaryIO
import time
from typing import Optional
import urllib.parse as urlparse
Expand Down Expand Up @@ -92,15 +93,15 @@ async def list_files(
async def upload(
self,
path: str,
data: io.BytesIO,
data: BinaryIO,
size: int
):
root = self.path / path

await self.minio.put_object(
self.bucket,
str(root)[1:],
data.getbuffer(),
data,
size
)
return True
Expand Down
3 changes: 2 additions & 1 deletion core/storage/s3.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from io import BytesIO
import io
from typing import BinaryIO
import time
import aioboto3.session
import anyio.abc
Expand Down Expand Up @@ -136,7 +137,7 @@ async def list_files(
async def upload(
self,
path: str,
data: io.BytesIO,
data: BinaryIO,
size: int
):
async with self.session.resource(
Expand Down
5 changes: 3 additions & 2 deletions core/storage/webdav.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import io
import time
from typing import BinaryIO
import aiohttp
import aiowebdav.client
import anyio
Expand Down Expand Up @@ -128,10 +129,10 @@ async def _mkdir(self, parent: abc.CPath):
for parent in parent.parents:
await self.client.mkdir(str(parent))

async def upload(self, path: str, data: io.BytesIO, size: int):
async def upload(self, path: str, data: BinaryIO, size: int):
# check dir
await self._mkdir((self._path / path).parent)
await self.client.upload_to(io.BytesIO(data.getbuffer()), str(self._path / path))
await self.client.upload_to(data, str(self._path / path))
return True


Expand Down
9 changes: 7 additions & 2 deletions core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@
V = TypeVar("V")
T = TypeVar("T")

from typing import BinaryIO

class AvroParser:
def __init__(
self,
data: bytes
data: bytes | BinaryIO
):
self.data = io.BytesIO(data)
if isinstance(data, (bytes, bytearray)):
self.data = io.BytesIO(data)
else:
self.data = data

def read_long(self):
result, shift = 0, 0
Expand Down