Commit e55df22

Changes for reporting files that have not been uploaded (#394)

* Changes for reporting files that have not been uploaded

* formatting changes

* Changes requested on PR

* Changes suggested on the PR

* Changes suggested on the PR

* removed datetime as a dependency

* Added test case for file upload failure

* change the method being called in the test case

* Undo a minor change

* parameter fix

* validation changes

* minor change to error message

* path fix for file

* Added file with no permission

* Changes to path for failure log

* Flush failure logger

* Changes to check in-memory

* Added sleep for the error logs to get stored

* Added print statements

* stringify the path on joinpath

* add assert

* raise manual exception

* Added random printing stuff

* changes to assert statements

* Handle the exception in upload queue and check for value in failure logger

* read jsonlines file and validate it

* remove incorrect validation with len

* Delete file with no permission

* remove print statements

---------

Co-authored-by: Nithin Bodanapu <[email protected]>
nithinb and Nithin Bodanapu authored Jan 3, 2025
1 parent 78cccf5 commit e55df22
Showing 4 changed files with 224 additions and 29 deletions.
53 changes: 43 additions & 10 deletions cognite/extractorutils/uploader/files.py

@@ -23,7 +23,10 @@
     BinaryIO,
     Callable,
     Iterator,
+    List,
     Optional,
+    Type,
+    Union,
 )
 from urllib.parse import ParseResult, urlparse

@@ -50,6 +53,7 @@
     FILES_UPLOADER_QUEUED,
     FILES_UPLOADER_WRITTEN,
 )
+from cognite.extractorutils.uploader.upload_failure_handler import FileFailureManager
 from cognite.extractorutils.util import cognite_exceptions, retry

 _QUEUES: int = 0
@@ -62,7 +66,8 @@

 _CDF_ALPHA_VERSION_HEADER = {"cdf-version": "alpha"}

-FileMetadataOrCogniteExtractorFile = FileMetadata | CogniteExtractorFileApply
+
+FileMetadataOrCogniteExtractorFile = Union[FileMetadata, CogniteExtractorFileApply]


 class ChunkedStream(RawIOBase, BinaryIO):
@@ -202,8 +207,9 @@ def __init__(
         trigger_log_level: str = "DEBUG",
         thread_name: str | None = None,
         overwrite_existing: bool = False,
-        cancellation_token: CancellationToken | None = None,
-        max_parallelism: int | None = None,
+        cancellation_token: Optional[CancellationToken] = None,
+        max_parallelism: Optional[int] = None,
+        failure_logging_path: None | str = None,
     ):
         # Super sets post_upload and threshold
         super().__init__(
@@ -219,8 +225,11 @@ def __init__(
         if self.threshold <= 0:
             raise ValueError("Max queue size must be positive for file upload queues")

-        self.upload_queue: list[Future] = []
-        self.errors: list[Exception] = []
+        self.failure_logging_path = failure_logging_path or None
+        self.initialize_failure_logging()
+
+        self.upload_queue: List[Future] = []
+        self.errors: List[Exception] = []

         self.overwrite_existing = overwrite_existing

@@ -251,6 +260,26 @@ def __init__(
         )
         _QUEUES += 1

+    def initialize_failure_logging(self) -> None:
+        self._file_failure_manager: FileFailureManager | None = (
+            FileFailureManager(path_to_file=self.failure_logging_path)
+            if self.failure_logging_path is not None
+            else None
+        )
+
+    def get_failure_logger(self) -> FileFailureManager | None:
+        return self._file_failure_manager
+
+    def add_entry_failure_logger(self, file_name: str, error: Exception) -> None:
+        if self._file_failure_manager is not None:
+            error_reason = str(error)
+            self._file_failure_manager.add(file_name=file_name, error_reason=error_reason)
+
+    def flush_failure_logger(self) -> None:
+        if self._file_failure_manager is not None:
+            self.logger.info("Flushing failure logs")
+            self._file_failure_manager.write_to_file()
+
     def _remove_done_from_queue(self) -> None:
         while not self.cancellation_token.is_cancelled:
             with self.lock:
@@ -451,7 +480,10 @@ def wrapped_upload(
                 upload_file(read_file, file_meta)

             except Exception as e:
-                self.logger.exception(f"Unexpected error while uploading file: {file_meta.external_id}")
+                self.logger.exception(
+                    f"Unexpected error while uploading file: {file_meta.external_id} {file_meta.name}"
+                )
+                self.add_entry_failure_logger(file_name=str(file_meta.name), error=e)
                 self.errors.append(e)

             finally:
@@ -528,6 +560,7 @@ def upload(self, fail_on_errors: bool = True, timeout: float | None = None) -> None:
         self.queue_size.set(self.upload_queue_size)
         if fail_on_errors and self.errors:
             # There might be more errors, but we can only have one as the cause, so pick the first
+            self.flush_failure_logger()
             raise RuntimeError(f"{len(self.errors)} upload(s) finished with errors") from self.errors[0]

     def __enter__(self) -> "IOFileUploadQueue":
@@ -544,9 +577,9 @@ def __enter__(self) -> "IOFileUploadQueue":

     def __exit__(
         self,
-        exc_type: Type[BaseException] | None,
-        exc_val: BaseException | None,
-        exc_tb: TracebackType | None,
+        exc_type: Optional[Type[BaseException]],
+        exc_val: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
     ) -> None:
         """
         Wraps around stop method, for use as context manager
@@ -607,7 +640,7 @@ def __init__(
     def add_to_upload_queue(
         self,
         file_meta: FileMetadataOrCogniteExtractorFile,
-        file_name: str | PathLike,
+        file_name: Union[str, PathLike],
     ) -> None:
         """
         Add file to upload queue. The queue will be uploaded if the queue size is larger than the threshold
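
For context, a minimal sketch of how the new failure logging could be wired up from calling code. The client setup, file names, and paths are illustrative, and it is assumed here that FileUploadQueue forwards failure_logging_path to IOFileUploadQueue:

from cognite.client import CogniteClient
from cognite.client.data_classes import FileMetadata
from cognite.extractorutils.uploader.files import FileUploadQueue

client = CogniteClient()  # assumes credentials are configured elsewhere

# Failures are appended to failures.jsonl; FileFailureManager adds the
# .jsonl suffix automatically if the path does not already end with it.
queue = FileUploadQueue(
    cdf_client=client,
    max_queue_size=10,
    failure_logging_path="failures",
)

queue.add_to_upload_queue(
    file_meta=FileMetadata(external_id="doc-1", name="doc-1.pdf"),
    file_name="/data/doc-1.pdf",
)

# upload() flushes the failure log before raising if any upload failed.
queue.upload()
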
64 changes: 64 additions & 0 deletions cognite/extractorutils/uploader/upload_failure_handler.py

@@ -0,0 +1,64 @@
+from datetime import datetime
+from typing import Iterator, List
+
+import jsonlines
+
+
+class FileErrorMapping:
+    def __init__(self, file_name: str, error_reason: str) -> None:
+        self.file_name = file_name
+        self.error_reason = error_reason
+
+    def __iter__(self) -> Iterator[List[str]]:
+        return iter([[self.file_name, self.error_reason]])
+
+
+class FileFailureManager:
+    MAX_QUEUE_SIZE = 500
+    START_TIME_KEY = "start_time"
+    FILE_REASON_MAP_KEY = "file_error_reason_map"
+
+    def __init__(self, start_time: str | None = None, path_to_file: str | None = None) -> None:
+        self.failure_logs: dict[str, str] = {}
+
+        self.path_to_failure_log: str = self._pre_process_file_extension(path_to_file)
+        self.start_time = start_time or str(datetime.now())
+        self._initialize_failure_logs()
+
+    def _pre_process_file_extension(self, path_to_file: str | None) -> str:
+        if path_to_file and not path_to_file.endswith(".jsonl"):
+            return path_to_file + ".jsonl"
+        return str(path_to_file)
+
+    def _initialize_failure_logs(self) -> None:
+        self.failure_logs = {}
+
+    def __len__(self) -> int:
+        return len(self.failure_logs)
+
+    def clear(self) -> None:
+        self.failure_logs.clear()
+        self._initialize_failure_logs()
+
+    def add(self, file_name: str, error_reason: str) -> None:
+        error_file_object = FileErrorMapping(file_name=file_name, error_reason=error_reason)
+        error_file_dict = dict(error_file_object)
+
+        self.failure_logs.update(error_file_dict)
+
+        if len(self) >= self.MAX_QUEUE_SIZE:
+            self.write_to_file()
+
+    def write_to_file(self) -> None:
+        if len(self) == 0:
+            return
+
+        dict_to_write = {
+            self.START_TIME_KEY: self.start_time,
+            self.FILE_REASON_MAP_KEY: self.failure_logs,
+        }
+
+        with jsonlines.open(self.path_to_failure_log, mode="a") as writer:
+            writer.write(dict_to_write)
+
+        self.clear()
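
The manager buffers failures in memory and writes them out once MAX_QUEUE_SIZE entries accumulate, or when flushed explicitly. A minimal standalone sketch (file names and error reasons are made up for illustration):

from cognite.extractorutils.uploader.upload_failure_handler import FileFailureManager

manager = FileFailureManager(path_to_file="failures")  # becomes failures.jsonl

# add() folds each FileErrorMapping into the in-memory dict; dict(mapping)
# works because __iter__ yields a single [file_name, error_reason] pair.
manager.add(file_name="doc-1.pdf", error_reason="403: insufficient permissions")
manager.add(file_name="doc-2.pdf", error_reason="upload timed out")

manager.write_to_file()  # appends one JSON line, then clears the buffer
assert len(manager) == 0
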
1 change: 1 addition & 0 deletions pyproject.toml

@@ -75,6 +75,7 @@ httpx = "^0.27.0"
 pydantic = "^2.8.2"
 pyhumps = "^3.8.0"
 croniter = "^5.0.0"
+jsonlines = "^4.0.0"

 [tool.poetry.extras]
 experimental = ["cognite-sdk-experimental"]
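
jsonlines is pulled in for the append-only failure log: each flush becomes one JSON object per line, keyed by start_time and file_error_reason_map, so the log can be replayed with the same library. A sketch, assuming a failures.jsonl produced as above:

import jsonlines

# Each line in the log is one flush from a FileFailureManager.
with jsonlines.open("failures.jsonl", mode="r") as reader:
    for entry in reader:
        started = entry["start_time"]
        for file_name, reason in entry["file_error_reason_map"].items():
            print(f"{file_name}: {reason} (run started {started})")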
