Skip to content

Commit

Permalink
V4 changes, Indra adaptors (#2733)
Browse files Browse the repository at this point in the history
* Rename DeeplakeQuery to Indra.
* Added indra storage provider.
* Reimplement rename with deecopy+delete.
* Switch to batch request for indra tensor bytes.
* Handle index in non-linear views.
* Added indra view load test with optimize=True
* Added indra flag to ingest api.
* Added ingest dataframe with indra.

---------

Co-authored-by: zaaram <[email protected]>
Co-authored-by: Sasun Hambardzumyan <[email protected]>
  • Loading branch information
3 people authored Apr 4, 2024
1 parent ec84842 commit e75c56c
Show file tree
Hide file tree
Showing 19 changed files with 436 additions and 223 deletions.
7 changes: 7 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@

deeplake.client.config.USE_STAGING_ENVIRONMENT = True

try:
from indra import api # type: ignore

api.backend.set_endpoint("https://app-staging.activeloop.dev")
except ImportError:
pass

from deeplake.constants import *
from deeplake.tests.common import SESSION_ID

Expand Down
77 changes: 68 additions & 9 deletions deeplake/api/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from deeplake.auto.unstructured.yolo.yolo import YoloDataset
from deeplake.client.log import logger
from deeplake.core.dataset import Dataset, dataset_factory
from deeplake.core.dataset.indra_dataset_view import IndraDatasetView
from deeplake.core.tensor import Tensor
from deeplake.core.meta.dataset_meta import DatasetMeta
from deeplake.util.connect_dataset import connect_dataset_entry
Expand Down Expand Up @@ -43,6 +44,7 @@
DEFAULT_READONLY,
DATASET_META_FILENAME,
DATASET_LOCK_FILENAME,
USE_INDRA,
)
from deeplake.util.access_method import (
check_access_method,
Expand Down Expand Up @@ -101,6 +103,7 @@ def init(
lock_enabled: Optional[bool] = True,
lock_timeout: Optional[int] = 0,
index_params: Optional[Dict[str, Union[int, str]]] = None,
indra: bool = USE_INDRA,
):
"""Returns a :class:`~deeplake.core.dataset.Dataset` object referencing either a new or existing dataset.
Expand Down Expand Up @@ -173,6 +176,7 @@ def init(
lock_timeout (int): Number of seconds to wait before throwing a LockException. If None, wait indefinitely
lock_enabled (bool): If true, the dataset manages a write lock. NOTE: Only set to False if you are managing concurrent access externally
index_params: Optional[Dict[str, Union[int, str]]] = None : The index parameters used while creating vector store is passed down to dataset.
indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false
..
# noqa: DAR101
Expand Down Expand Up @@ -225,6 +229,7 @@ def init(
token=token,
memory_cache_size=memory_cache_size,
local_cache_size=local_cache_size,
indra=indra,
)

feature_report_path(path, "dataset", {"Overwrite": overwrite}, token=token)
Expand Down Expand Up @@ -378,6 +383,7 @@ def empty(
lock_timeout: Optional[int] = 0,
verbose: bool = True,
index_params: Optional[Dict[str, Union[int, str]]] = None,
indra: bool = USE_INDRA,
) -> Dataset:
"""Creates an empty dataset
Expand All @@ -402,6 +408,7 @@ def empty(
lock_timeout (int): Number of seconds to wait before throwing a LockException. If None, wait indefinitely
lock_enabled (bool): If true, the dataset manages a write lock. NOTE: Only set to False if you are managing concurrent access externally.
index_params: Optional[Dict[str, Union[int, str]]]: Index parameters used while creating vector store, passed down to dataset.
indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false
Returns:
Dataset: Dataset created using the arguments provided.
Expand Down Expand Up @@ -441,6 +448,7 @@ def empty(
token=token,
memory_cache_size=memory_cache_size,
local_cache_size=local_cache_size,
indra=indra,
)

feature_report_path(
Expand Down Expand Up @@ -508,6 +516,7 @@ def load(
access_method: str = "stream",
unlink: bool = False,
reset: bool = False,
indra: bool = USE_INDRA,
check_integrity: Optional[bool] = None,
lock_timeout: Optional[int] = 0,
lock_enabled: Optional[bool] = True,
Expand Down Expand Up @@ -578,6 +587,7 @@ def load(
setting ``reset=True`` will reset HEAD changes and load the previous version.
check_integrity (bool, Optional): Performs an integrity check by default (None) if the dataset has 20 or fewer tensors.
Set to ``True`` to force integrity check, ``False`` to skip integrity check.
indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false
..
# noqa: DAR101
Expand Down Expand Up @@ -624,6 +634,7 @@ def load(
token=token,
memory_cache_size=memory_cache_size,
local_cache_size=local_cache_size,
indra=indra,
)
feature_report_path(
path,
Expand All @@ -644,6 +655,12 @@ def load(
f"A Deep Lake dataset does not exist at the given path ({path}). Check the path provided or in case you want to create a new dataset, use deeplake.empty()."
)

if indra and read_only:
from indra import api # type: ignore

ids = api.load_from_storage(storage.core)
return IndraDatasetView(indra_ds=ids)

dataset_kwargs: Dict[str, Union[None, str, bool, int, Dict]] = {
"path": path,
"read_only": read_only,
Expand Down Expand Up @@ -812,10 +829,10 @@ def rename(

feature_report_path(old_path, "rename", {}, token=token)

ds = deeplake.load(old_path, verbose=False, token=token, creds=creds)
ds.rename(new_path)
deeplake.deepcopy(old_path, new_path, verbose=False, token=token, creds=creds)
deeplake.delete(old_path, token=token, creds=creds)

return ds # type: ignore
return deeplake.load(new_path, verbose=False, token=token, creds=creds)

@staticmethod
@spinner
Expand Down Expand Up @@ -1491,6 +1508,7 @@ def ingest_coco(
num_workers: int = 0,
token: Optional[str] = None,
connect_kwargs: Optional[Dict] = None,
indra: bool = USE_INDRA,
**dataset_kwargs,
) -> Dataset:
"""Ingest images and annotations in COCO format to a Deep Lake Dataset. The source data can be stored locally or in the cloud.
Expand Down Expand Up @@ -1544,6 +1562,7 @@ def ingest_coco(
num_workers (int): The number of workers to use for ingestion. Set to ``0`` by default.
token (Optional[str]): The token to use for accessing the dataset and/or connecting it to Deep Lake.
connect_kwargs (Optional[Dict]): If specified, the dataset will be connected to Deep Lake, and connect_kwargs will be passed to :meth:`Dataset.connect <deeplake.core.dataset.Dataset.connect>`.
indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false
**dataset_kwargs: Any arguments passed here will be forwarded to the dataset creator function. See :func:`deeplake.empty`.
Returns:
Expand Down Expand Up @@ -1582,7 +1601,12 @@ def ingest_coco(
structure = unstructured.prepare_structure(inspect_limit)

ds = deeplake.empty(
dest, creds=dest_creds, verbose=False, token=token, **dataset_kwargs
dest,
creds=dest_creds,
verbose=False,
token=token,
indra=indra,
**dataset_kwargs,
)
if connect_kwargs is not None:
connect_kwargs["token"] = token or connect_kwargs.get("token")
Expand Down Expand Up @@ -1613,6 +1637,7 @@ def ingest_yolo(
num_workers: int = 0,
token: Optional[str] = None,
connect_kwargs: Optional[Dict] = None,
indra: bool = USE_INDRA,
**dataset_kwargs,
) -> Dataset:
"""Ingest images and annotations (bounding boxes or polygons) in YOLO format to a Deep Lake Dataset. The source data can be stored locally or in the cloud.
Expand Down Expand Up @@ -1661,6 +1686,7 @@ def ingest_yolo(
num_workers (int): The number of workers to use for ingestion. Set to ``0`` by default.
token (Optional[str]): The token to use for accessing the dataset and/or connecting it to Deep Lake.
connect_kwargs (Optional[Dict]): If specified, the dataset will be connected to Deep Lake, and connect_kwargs will be passed to :meth:`Dataset.connect <deeplake.core.dataset.Dataset.connect>`.
indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false
**dataset_kwargs: Any arguments passed here will be forwarded to the dataset creator function. See :func:`deeplake.empty`.
Returns:
Expand Down Expand Up @@ -1708,7 +1734,12 @@ def ingest_yolo(
structure = unstructured.prepare_structure()

ds = deeplake.empty(
dest, creds=dest_creds, verbose=False, token=token, **dataset_kwargs
dest,
creds=dest_creds,
verbose=False,
token=token,
indra=indra,
**dataset_kwargs,
)
if connect_kwargs is not None:
connect_kwargs["token"] = token or connect_kwargs.get("token")
Expand Down Expand Up @@ -1738,6 +1769,7 @@ def ingest_classification(
shuffle: bool = True,
token: Optional[str] = None,
connect_kwargs: Optional[Dict] = None,
indra: bool = USE_INDRA,
**dataset_kwargs,
) -> Dataset:
"""Ingest a dataset of images from a local folder to a Deep Lake Dataset. Images should be stored in subfolders by class name.
Expand All @@ -1758,6 +1790,7 @@ def ingest_classification(
shuffle (bool): Shuffles the input data prior to ingestion. Since data arranged in folders by class is highly non-random, shuffling is important in order to produce optimal results when training. Defaults to ``True``.
token (Optional[str]): The token to use for accessing the dataset.
connect_kwargs (Optional[Dict]): If specified, the dataset will be connected to Deep Lake, and connect_kwargs will be passed to :meth:`Dataset.connect <deeplake.core.dataset.Dataset.connect>`.
indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false
**dataset_kwargs: Any arguments passed here will be forwarded to the dataset creator function see :func:`deeplake.empty`.
Returns:
Expand Down Expand Up @@ -1839,6 +1872,7 @@ def ingest_classification(
dest_creds=dest_creds,
progressbar=progressbar,
token=token,
indra=indra,
**dataset_kwargs,
)
return ds
Expand All @@ -1861,7 +1895,12 @@ def ingest_classification(
unstructured = ImageClassification(source=src)

ds = deeplake.empty(
dest, creds=dest_creds, token=token, verbose=False, **dataset_kwargs
dest,
creds=dest_creds,
token=token,
verbose=False,
indra=indra,
**dataset_kwargs,
)
if connect_kwargs is not None:
connect_kwargs["token"] = token or connect_kwargs.get("token")
Expand Down Expand Up @@ -1892,6 +1931,7 @@ def ingest_kaggle(
progressbar: bool = True,
summary: bool = True,
shuffle: bool = True,
indra: bool = USE_INDRA,
**dataset_kwargs,
) -> Dataset:
"""Download and ingest a kaggle dataset and store it as a structured dataset to destination.
Expand All @@ -1911,6 +1951,7 @@ def ingest_kaggle(
progressbar (bool): Enables or disables ingestion progress bar. Set to ``True`` by default.
summary (bool): Generates ingestion summary. Set to ``True`` by default.
shuffle (bool): Shuffles the input data prior to ingestion. Since data arranged in folders by class is highly non-random, shuffling is important in order to produce optimal results when training. Defaults to ``True``.
indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false
**dataset_kwargs: Any arguments passed here will be forwarded to the dataset creator function. See :func:`deeplake.dataset`.
Returns:
Expand Down Expand Up @@ -1956,6 +1997,7 @@ def ingest_kaggle(
progressbar=progressbar,
summary=summary,
shuffle=shuffle,
indra=indra,
**dataset_kwargs,
)

Expand All @@ -1972,6 +2014,7 @@ def ingest_dataframe(
progressbar: bool = True,
token: Optional[str] = None,
connect_kwargs: Optional[Dict] = None,
indra: bool = USE_INDRA,
**dataset_kwargs,
):
"""Convert pandas dataframe to a Deep Lake Dataset. The contents of the dataframe can be parsed literally, or can be treated as links to local or cloud files.
Expand Down Expand Up @@ -2021,6 +2064,7 @@ def ingest_dataframe(
progressbar (bool): Enables or disables ingestion progress bar. Set to ``True`` by default.
token (Optional[str]): The token to use for accessing the dataset.
connect_kwargs (Optional[Dict]): A dictionary containing arguments to be passed to the dataset connect method. See :meth:`Dataset.connect`.
indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false
**dataset_kwargs: Any arguments passed here will be forwarded to the dataset creator function. See :func:`deeplake.empty`.
Returns:
Expand All @@ -2045,15 +2089,30 @@ def ingest_dataframe(
structured = DataFrame(src, column_params, src_creds, creds_key)

dest = convert_pathlib_to_string_if_needed(dest)
ds = deeplake.empty(
dest, creds=dest_creds, token=token, verbose=False, **dataset_kwargs
)
if indra:
from indra import api

ds = api.dataset_writer(
dest, creds=dest_creds, token=token, **dataset_kwargs
)
else:
ds = deeplake.empty(
dest,
creds=dest_creds,
token=token,
verbose=False,
**dataset_kwargs,
)
if connect_kwargs is not None:
connect_kwargs["token"] = token or connect_kwargs.get("token")
ds.connect(**connect_kwargs)

structured.fill_dataset(ds, progressbar) # type: ignore

if indra:
ids = api.load_from_storage(ds.storage)
return IndraDatasetView(indra_ds=ids)

return ds # type: ignore

@staticmethod
Expand Down
6 changes: 3 additions & 3 deletions deeplake/auto/tests/test_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def test_csv(memory_ds: Dataset, dataframe_ingestion_data: dict):
assert ds[tensors_names[2]].htype == "text"
assert ds[tensors_names[2]].dtype == str
np.testing.assert_array_equal(
ds[tensors_names[2]].numpy().reshape(-1), df[df_keys[2]].values
np.array(ds[tensors_names[2]].numpy()).reshape(-1), df[df_keys[2]].values
)


Expand Down Expand Up @@ -273,7 +273,7 @@ def test_dataframe_basic(
assert ds[df_keys[2]].htype == "text"
assert ds[df_keys[2]].dtype == str
np.testing.assert_array_equal(
ds[df_keys[2]].numpy().reshape(-1), df[df_keys[2]].values
np.array(ds[df_keys[2]].numpy()).reshape(-1), df[df_keys[2]].values
)


Expand Down Expand Up @@ -342,7 +342,7 @@ def test_dataframe_array(memory_ds: Dataset):
)

np.testing.assert_array_equal(
ds[df_keys[2]][0:3].numpy().reshape(-1), df[df_keys[2]].values[0:3]
np.array(ds[df_keys[2]][0:3].numpy()).reshape(-1), df[df_keys[2]].values[0:3]
)
assert ds[df_keys[2]].dtype == df[df_keys[2]].dtype

Expand Down
2 changes: 2 additions & 0 deletions deeplake/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,3 +352,5 @@

# Size of dataset view to expose as indra dataset wrapper.
INDRA_DATASET_SAMPLES_THRESHOLD = 10000000

USE_INDRA = os.environ.get("DEEPLAKE_USE_INDRA", "false").strip().lower() == "true"
Loading

0 comments on commit e75c56c

Please sign in to comment.