Skip to content

Commit

Permalink
Merge pull request #13 from arcee-ai/feat/better-cli-support-contetx-…
Browse files Browse the repository at this point in the history
…upload

support csv and fix jsonl support in cli
  • Loading branch information
Ben-Epstein authored Sep 25, 2023
2 parents 36c8f39 + a9ccd2a commit 0d71f8b
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 19 deletions.
2 changes: 1 addition & 1 deletion arcee/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.0.15"
__version__ = "0.0.16"

import os

Expand Down
14 changes: 12 additions & 2 deletions arcee/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ def context(
Optional[list[Path]],
typer.Option(help="Path to a document", exists=True, file_okay=True, dir_okay=False, readable=True),
] = None,
doc_name: Annotated[
str,
typer.Option(help="Column/key representing the doc name. Used if file is jsonl or csv", exists=True),
] = "name",
doc_text: Annotated[
str,
typer.Option(help="Column/key representing the doc text. Used if file is jsonl or csv", exists=True),
] = "text",
directory: Annotated[
Optional[list[Path]],
typer.Option(
Expand All @@ -114,9 +122,11 @@ def context(
file (Path): Path to the file.
directory (Path): Path to the directory.
chunk_size (int): The chunk size in megabytes (MB) to limit memory usage during file uploads.
doc_name (str): The name of the column/key representing the doc name. Used for csv/jsonl
doc_text (str): The name of the column/key representing the doc text/content. Used for csv/jsonl
"""
if not file and not directory:
raise typer.BadParameter("Atleast one file or directory must be provided")
raise typer.BadParameter("At least one file or directory must be provided")

if file is None:
file = []
Expand All @@ -127,7 +137,7 @@ def context(
file.extend(directory)

try:
resp = UploadHandler.handle_doc_upload(name, file, chunk_size)
resp = UploadHandler.handle_doc_upload(name, file, chunk_size, doc_name, doc_text)
typer.secho(resp)
except Exception as e:
raise ArceeException(message=f"Error uploading document(s): {e}") from e
Expand Down
63 changes: 47 additions & 16 deletions arcee/cli_handler.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
from importlib.util import find_spec
from pathlib import Path

import typer
from click import ClickException as ArceeException
from rich.progress import Progress, SpinnerColumn, TextColumn

from arcee import upload_doc, upload_docs
from arcee import upload_docs
from arcee.schemas.doc import Doc

if not find_spec("pandas"):
raise ModuleNotFoundError("Cannot find pandas. Please run `pip install 'arcee-py[cli]'` for cli support")

import pandas as pd


class UploadHandler:
"""Upload data to Arcee platform"""

valid_context_file_extensions = set([".txt", ".jsonl"])
valid_context_file_extensions = {".txt", ".jsonl", ".csv"}

one_kb = 1024
one_mb = 1024 * one_kb
Expand Down Expand Up @@ -64,39 +71,61 @@ def _handle_paths(cls, paths: list[Path]) -> list[Path]:
return list(set(all_paths))

@classmethod
def _handle_upload(cls, name: str, files: list[Path], max_chunk_size: int) -> dict[str, str]:
def _get_docs(cls, file: Path, doc_name: str, doc_text: str) -> list[Doc]:
    """Read one file and convert its contents into ``Doc`` objects.

    A ``.txt`` file yields a single document named after the file. A ``.jsonl``
    or ``.csv`` file is loaded with pandas and yields one document per row.

    Args:
        file (Path): Path to the file to read.
        doc_name (str): Column/key holding each document's name. Used for csv/jsonl.
        doc_text (str): Column/key holding each document's text. Used for csv/jsonl.

    Returns:
        list[Doc]: The documents found in ``file``.

    Raises:
        ValueError: If the file extension is not supported, or if ``doc_name`` or
            ``doc_text`` is not a column/key of the loaded data.
    """
    if file.suffix == ".txt":
        return [Doc(doc_name=file.name, doc_text=file.read_text())]

    # Hand pandas the full path rather than ``file.name``: the bare file name
    # only resolves when the file lives in the current working directory.
    if file.suffix == ".jsonl":
        df = pd.read_json(file, lines=True)
    elif file.suffix == ".csv":
        df = pd.read_csv(file)
    else:
        raise ValueError(f"File type not valid. Must be one of {cls.valid_context_file_extensions}")

    if doc_name not in df.columns:
        raise ValueError(
            f"{doc_name} not found in data column/key. Rename column/key or use "
            f"--doc-name in command to specify your own"
        )
    if doc_text not in df.columns:
        raise ValueError(
            f"{doc_text} not found in data column/key. Rename column/key or use "
            f"--doc-text in command to specify your own"
        )

    # Walk the two columns in lockstep instead of building a Series per row.
    return [Doc(doc_name=name, doc_text=text) for name, text in zip(df[doc_name], df[doc_text])]

@classmethod
def _handle_upload(
cls, name: str, files: list[Path], max_chunk_size: int, doc_name: str, doc_text: str
) -> dict[str, str]:
"""Upload document file(s) to context
Args:
name str: Name of the context
files list[Path]: tuple of paths to valid file(s).
max_chunk_size int: Maximum memory, in bytes to use for uploading
"""

# if only one file is passed, upload it
if len(files) == 1:
file = files[0]
return upload_doc(context=name, doc_name=file.name, doc_text=file.read_text())

docs: list[dict[str, str]] = []
chunk: int = 0
for file in files:
if chunk + file.stat().st_size > max_chunk_size:
if chunk + file.stat().st_size >= max_chunk_size:
if len(docs) == 0:
raise ArceeException(
message=f"Memory Limit Exceeded."
f" When uploading {file.name} ({file.stat().st_size/cls.one_mb} MB)."
" Try increasing chunk size."
message=f"Memory Limit Exceeded. "
f"When uploading {file.name} ({file.stat().st_size/cls.one_mb} MB). "
"Try increasing chunk size."
)
upload_docs(context=name, docs=docs)
chunk = 0
docs.clear()
chunk += file.stat().st_size
docs.append({"doc_name": file.name, "doc_text": file.read_text()})
file_docs = cls._get_docs(file, doc_name, doc_text)
file_docs_json = [doc.dict() for doc in file_docs]
docs.extend(file_docs_json)

return upload_docs(context=name, docs=docs)

@classmethod
def handle_doc_upload(cls, name: str, paths: list[Path], chunk_size: int) -> dict[str, str]:
def handle_doc_upload(
cls, name: str, paths: list[Path], chunk_size: int, doc_name: str, doc_text: str
) -> dict[str, str]:
"""Handle document upload from valid paths to files and directories
Args:
Expand Down Expand Up @@ -126,6 +155,8 @@ def handle_doc_upload(cls, name: str, paths: list[Path], chunk_size: int) -> dic

# upload documents
uploading = progress.add_task(description=f"Uploading {len(paths)} document(s)...", total=len(files))
resp = doc_uploader(name=name, files=files, max_chunk_size=chunk_size * ONE_MB)
resp = doc_uploader(
name=name, files=files, max_chunk_size=chunk_size * ONE_MB, doc_name=doc_name, doc_text=doc_text
)
progress.update(uploading, description=f"✅ Uploaded {len(paths)} document(s) to context {name}")
return resp
10 changes: 10 additions & 0 deletions arcee/schemas/doc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from dataclasses import asdict, dataclass


@dataclass
class Doc:
    """A single document: a name paired with its text content."""

    # Name identifying the document.
    doc_name: str
    # Text content of the document.
    doc_text: str

    def dict(self) -> dict[str, str]:
        """Return this document's fields as a plain dictionary keyed by field name."""
        as_mapping = asdict(self)
        return as_mapping
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ dev = [
"pytest-env",
"ruff",
"types-requests",
"pandas",
"pandas-stubs"
]
cli = [
"pandas"
]

[project.urls]
Expand Down

0 comments on commit 0d71f8b

Please sign in to comment.