From d40755969901be2b3fd245b8bde69d4f5121c0f0 Mon Sep 17 00:00:00 2001
From: Ben Epstein
Date: Mon, 25 Sep 2023 10:41:26 -0400
Subject: [PATCH 1/2] support csv and fix jsonl support in cli

---
Review notes (this section is ignored by `git am`):
  - UploadHandler._get_docs now hands the full `file` path to
    pd.read_json / pd.read_csv. `file.name` is only the final path
    component, so .jsonl/.csv files outside the current working directory
    could not be opened (the .txt branch already used the full path).
  - The missing-column error hints now read "in command" instead of
    "in comment" when pointing at --doc-name / --doc-text.
  - Only the content of added lines changed; hunk sizes and the diffstat
    below are unchanged.

 arcee/__init__.py    |  2 +-
 arcee/cli.py         | 14 ++++++++--
 arcee/cli_handler.py | 63 +++++++++++++++++++++++++++++++++-----------
 arcee/schemas/doc.py | 10 +++++++
 pyproject.toml       |  5 ++++
 5 files changed, 75 insertions(+), 19 deletions(-)
 create mode 100644 arcee/schemas/doc.py

diff --git a/arcee/__init__.py b/arcee/__init__.py
index b38c600..180d1ab 100644
--- a/arcee/__init__.py
+++ b/arcee/__init__.py
@@ -1,4 +1,4 @@
-__version__ = "0.0.15"
+__version__ = "0.0.16"
 
 import os
 
diff --git a/arcee/cli.py b/arcee/cli.py
index a265e18..bdd9e76 100644
--- a/arcee/cli.py
+++ b/arcee/cli.py
@@ -93,6 +93,14 @@ def context(
         Optional[list[Path]],
         typer.Option(help="Path to a document", exists=True, file_okay=True, dir_okay=False, readable=True),
     ] = None,
+    doc_name: Annotated[
+        str,
+        typer.Option(help="Column/key representing the doc name. Used if file is jsonl or csv", exists=True),
+    ] = "name",
+    doc_text: Annotated[
+        str,
+        typer.Option(help="Column/key representing the doc text. Used if file is jsonl or csv", exists=True),
+    ] = "text",
     directory: Annotated[
         Optional[list[Path]],
         typer.Option(
@@ -114,9 +122,11 @@ def context(
         file (Path): Path to the file.
         directory (Path): Path to the directory.
         chunk_size (int): The chunk size in megabytes (MB) to limit memory usage during file uploads.
+        doc_name (str): The name of the column/key representing the doc name. Used for csv/jsonl
+        doc_text (str): The name of the column/key representing the doc text/content. Used for csv/jsonl
     """
     if not file and not directory:
-        raise typer.BadParameter("Atleast one file or directory must be provided")
+        raise typer.BadParameter("At least one file or directory must be provided")
 
     if file is None:
         file = []
@@ -127,7 +137,7 @@ def context(
         file.extend(directory)
 
     try:
-        resp = UploadHandler.handle_doc_upload(name, file, chunk_size)
+        resp = UploadHandler.handle_doc_upload(name, file, chunk_size, doc_name, doc_text)
         typer.secho(resp)
     except Exception as e:
         raise ArceeException(message=f"Error uploading document(s): {e}") from e
diff --git a/arcee/cli_handler.py b/arcee/cli_handler.py
index 2ad5117..c1681fc 100644
--- a/arcee/cli_handler.py
+++ b/arcee/cli_handler.py
@@ -1,16 +1,23 @@
+from importlib.util import find_spec
 from pathlib import Path
 
 import typer
 from click import ClickException as ArceeException
 from rich.progress import Progress, SpinnerColumn, TextColumn
 
-from arcee import upload_doc, upload_docs
+from arcee import upload_docs
+from arcee.schemas.doc import Doc
+
+if not find_spec("pandas"):
+    raise ModuleNotFoundError("Cannot find pandas. Please run `pip install 'arcee[cli]'` for cli support")
+
+import pandas as pd
 
 
 class UploadHandler:
     """Upload data to Arcee platform"""
 
-    valid_context_file_extensions = set([".txt", ".jsonl"])
+    valid_context_file_extensions = {".txt", ".jsonl", ".csv"}
 
     one_kb = 1024
     one_mb = 1024 * one_kb
@@ -64,39 +71,61 @@ def _handle_paths(cls, paths: list[Path]) -> list[Path]:
         return list(set(all_paths))
 
     @classmethod
-    def _handle_upload(cls, name: str, files: list[Path], max_chunk_size: int) -> dict[str, str]:
+    def _get_docs(cls, file: Path, doc_name: str, doc_text: str) -> list[Doc]:
+        if file.suffix == ".txt":
+            return [Doc(doc_name=file.name, doc_text=file.read_text())]
+        if file.suffix == ".jsonl":
+            df = pd.read_json(file, lines=True)
+        elif file.suffix == ".csv":
+            df = pd.read_csv(file)
+        else:
+            raise ValueError(f"File type not valid. Must be one of {cls.valid_context_file_extensions}")
+        if doc_name not in df.columns:
+            raise ValueError(
+                f"{doc_name} not found in data column/key. Rename column/key or use "
+                f"--doc-name in command to specify your own"
+            )
+        if doc_text not in df.columns:
+            raise ValueError(
+                f"{doc_text} not found in data column/key. Rename column/key or use "
+                f"--doc-text in command to specify your own"
+            )
+        return [Doc(doc_name=row[doc_name], doc_text=row[doc_text]) for _, row in df.iterrows()]
+
+    @classmethod
+    def _handle_upload(
+        cls, name: str, files: list[Path], max_chunk_size: int, doc_name: str, doc_text: str
+    ) -> dict[str, str]:
         """Upload document file(s) to context
 
         Args:
             name str: Name of the context
             files list[Path]: tuple of paths to valid file(s).
             max_chunk_size int: Maximum memory, in bytes to use for uploading
         """
-
-        # if only one file is passed, upload it
-        if len(files) == 1:
-            file = files[0]
-            return upload_doc(context=name, doc_name=file.name, doc_text=file.read_text())
-
         docs: list[dict[str, str]] = []
         chunk: int = 0
         for file in files:
-            if chunk + file.stat().st_size > max_chunk_size:
+            if chunk + file.stat().st_size >= max_chunk_size:
                 if len(docs) == 0:
                     raise ArceeException(
-                        message=f"Memory Limit Exceeded."
-                        f" When uploading {file.name} ({file.stat().st_size/cls.one_mb} MB)."
-                        " Try increasing chunk size."
+                        message=f"Memory Limit Exceeded. "
+                        f"When uploading {file.name} ({file.stat().st_size/cls.one_mb} MB). "
+                        "Try increasing chunk size."
                     )
                 upload_docs(context=name, docs=docs)
                 chunk = 0
                 docs.clear()
             chunk += file.stat().st_size
-            docs.append({"doc_name": file.name, "doc_text": file.read_text()})
+            file_docs = cls._get_docs(file, doc_name, doc_text)
+            file_docs_json = [doc.dict() for doc in file_docs]
+            docs.extend(file_docs_json)
         return upload_docs(context=name, docs=docs)
 
     @classmethod
-    def handle_doc_upload(cls, name: str, paths: list[Path], chunk_size: int) -> dict[str, str]:
+    def handle_doc_upload(
+        cls, name: str, paths: list[Path], chunk_size: int, doc_name: str, doc_text: str
+    ) -> dict[str, str]:
         """Handle document upload from valid paths to files and directories
 
         Args:
@@ -126,6 +155,8 @@ def handle_doc_upload(cls, name: str, paths: list[Path], chunk_size: int) -> dic
 
             # upload documents
             uploading = progress.add_task(description=f"Uploading {len(paths)} document(s)...", total=len(files))
-            resp = doc_uploader(name=name, files=files, max_chunk_size=chunk_size * ONE_MB)
+            resp = doc_uploader(
+                name=name, files=files, max_chunk_size=chunk_size * ONE_MB, doc_name=doc_name, doc_text=doc_text
+            )
             progress.update(uploading, description=f"✅ Uploaded {len(paths)} document(s) to context {name}")
         return resp
diff --git a/arcee/schemas/doc.py b/arcee/schemas/doc.py
new file mode 100644
index 0000000..8e795fc
--- /dev/null
+++ b/arcee/schemas/doc.py
@@ -0,0 +1,10 @@
+from dataclasses import asdict, dataclass
+
+
+@dataclass
+class Doc:
+    doc_name: str
+    doc_text: str
+
+    def dict(self) -> dict[str, str]:
+        return asdict(self)
diff --git a/pyproject.toml b/pyproject.toml
index f963ccf..a889a79 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -39,6 +39,11 @@ dev = [
     "pytest-env",
     "ruff",
     "types-requests",
+    "pandas",
+    "pandas-stubs"
+]
+cli = [
+    "pandas"
 ]
 
 [project.urls]

From a9ccd2a1895954298926383eeceab6ae5aa31c42 Mon Sep 17 00:00:00 2001
From: Ben Epstein
Date: Mon, 25 Sep 2023 10:48:11 -0400
Subject: [PATCH 2/2] err message

---
 arcee/cli_handler.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/arcee/cli_handler.py b/arcee/cli_handler.py
index c1681fc..e7e9b1d 100644
--- a/arcee/cli_handler.py
+++ b/arcee/cli_handler.py
@@ -9,7 +9,7 @@
 from arcee.schemas.doc import Doc
 
 if not find_spec("pandas"):
-    raise ModuleNotFoundError("Cannot find pandas. Please run `pip install 'arcee[cli]'` for cli support")
+    raise ModuleNotFoundError("Cannot find pandas. Please run `pip install 'arcee-py[cli]'` for cli support")
 
 import pandas as pd
 