From e75c56ca7ab91599f79fdd7deaf63b94fdec888f Mon Sep 17 00:00:00 2001 From: Sasun Hambardzumyan <151129343+khustup2@users.noreply.github.com> Date: Fri, 5 Apr 2024 01:54:18 +0400 Subject: [PATCH] V4 changes, Indra adaptors (#2733) * Rename DeeplakeQuery to Indra. * Added indra storage provider. * Reimplement rename with deecopy+delete. * Switch to batch request for indra tensor bytes. * Handle index in non-linear views. * Added indra view load test with optimize=True * Added indra flag to ingest api. * Added ingest dataframe with indra. --------- Co-authored-by: zaaram Co-authored-by: Sasun Hambardzumyan --- conftest.py | 7 + deeplake/api/dataset.py | 77 +++++++- deeplake/auto/tests/test_ingestion.py | 6 +- deeplake/constants.py | 2 + deeplake/core/dataset/dataset.py | 61 ++++-- ...query_dataset.py => indra_dataset_view.py} | 177 ++++++++---------- ...e_query_tensor.py => indra_tensor_view.py} | 59 +++--- deeplake/core/dataset/view_entry.py | 4 +- deeplake/core/index/index.py | 4 + deeplake/core/storage/indra.py | 106 +++++++++++ ...indra_dataset.py => test_indra_dataset.py} | 37 ++-- deeplake/core/tests/test_query.py | 2 +- .../core/vectorstore/deeplake_vectorstore.py | 3 + .../vector_search/indra/search_algorithm.py | 6 +- deeplake/enterprise/convert_to_libdeeplake.py | 60 ++++-- deeplake/enterprise/dataloader.py | 10 +- deeplake/enterprise/libdeeplake_query.py | 10 +- deeplake/util/storage.py | 26 ++- setup.py | 2 +- 19 files changed, 436 insertions(+), 223 deletions(-) rename deeplake/core/dataset/{deeplake_query_dataset.py => indra_dataset_view.py} (70%) rename deeplake/core/dataset/{deeplake_query_tensor.py => indra_tensor_view.py} (78%) create mode 100644 deeplake/core/storage/indra.py rename deeplake/core/tests/{test_deeplake_indra_dataset.py => test_indra_dataset.py} (93%) diff --git a/conftest.py b/conftest.py index 3cb3db25f3..78880bfa3d 100644 --- a/conftest.py +++ b/conftest.py @@ -19,6 +19,13 @@ deeplake.client.config.USE_STAGING_ENVIRONMENT = True +try: + from indra import api # type: ignore + + api.backend.set_endpoint("https://app-staging.activeloop.dev") +except ImportError: + pass + from deeplake.constants import * from deeplake.tests.common import SESSION_ID diff --git a/deeplake/api/dataset.py b/deeplake/api/dataset.py index 48ba0836ab..5706ee2656 100644 --- a/deeplake/api/dataset.py +++ b/deeplake/api/dataset.py @@ -13,6 +13,7 @@ from deeplake.auto.unstructured.yolo.yolo import YoloDataset from deeplake.client.log import logger from deeplake.core.dataset import Dataset, dataset_factory +from deeplake.core.dataset.indra_dataset_view import IndraDatasetView from deeplake.core.tensor import Tensor from deeplake.core.meta.dataset_meta import DatasetMeta from deeplake.util.connect_dataset import connect_dataset_entry @@ -43,6 +44,7 @@ DEFAULT_READONLY, DATASET_META_FILENAME, DATASET_LOCK_FILENAME, + USE_INDRA, ) from deeplake.util.access_method import ( check_access_method, @@ -101,6 +103,7 @@ def init( lock_enabled: Optional[bool] = True, lock_timeout: Optional[int] = 0, index_params: Optional[Dict[str, Union[int, str]]] = None, + indra: bool = USE_INDRA, ): """Returns a :class:`~deeplake.core.dataset.Dataset` object referencing either a new or existing dataset. @@ -173,6 +176,7 @@ def init( lock_timeout (int): Number of seconds to wait before throwing a LockException. If None, wait indefinitely lock_enabled (bool): If true, the dataset manages a write lock. 
NOTE: Only set to False if you are managing concurrent access externally index_params: Optional[Dict[str, Union[int, str]]] = None : The index parameters used while creating vector store is passed down to dataset. + indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false .. # noqa: DAR101 @@ -225,6 +229,7 @@ def init( token=token, memory_cache_size=memory_cache_size, local_cache_size=local_cache_size, + indra=indra, ) feature_report_path(path, "dataset", {"Overwrite": overwrite}, token=token) @@ -378,6 +383,7 @@ def empty( lock_timeout: Optional[int] = 0, verbose: bool = True, index_params: Optional[Dict[str, Union[int, str]]] = None, + indra: bool = USE_INDRA, ) -> Dataset: """Creates an empty dataset @@ -402,6 +408,7 @@ def empty( lock_timeout (int): Number of seconds to wait before throwing a LockException. If None, wait indefinitely lock_enabled (bool): If true, the dataset manages a write lock. NOTE: Only set to False if you are managing concurrent access externally. index_params: Optional[Dict[str, Union[int, str]]]: Index parameters used while creating vector store, passed down to dataset. + indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false Returns: Dataset: Dataset created using the arguments provided. @@ -441,6 +448,7 @@ def empty( token=token, memory_cache_size=memory_cache_size, local_cache_size=local_cache_size, + indra=indra, ) feature_report_path( @@ -508,6 +516,7 @@ def load( access_method: str = "stream", unlink: bool = False, reset: bool = False, + indra: bool = USE_INDRA, check_integrity: Optional[bool] = None, lock_timeout: Optional[int] = 0, lock_enabled: Optional[bool] = True, @@ -578,6 +587,7 @@ def load( setting ``reset=True`` will reset HEAD changes and load the previous version. check_integrity (bool, Optional): Performs an integrity check by default (None) if the dataset has 20 or fewer tensors. Set to ``True`` to force integrity check, ``False`` to skip integrity check. + indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false .. # noqa: DAR101 @@ -624,6 +634,7 @@ def load( token=token, memory_cache_size=memory_cache_size, local_cache_size=local_cache_size, + indra=indra, ) feature_report_path( path, @@ -644,6 +655,12 @@ def load( f"A Deep Lake dataset does not exist at the given path ({path}). Check the path provided or in case you want to create a new dataset, use deeplake.empty()." ) + if indra and read_only: + from indra import api # type: ignore + + ids = api.load_from_storage(storage.core) + return IndraDatasetView(indra_ds=ids) + dataset_kwargs: Dict[str, Union[None, str, bool, int, Dict]] = { "path": path, "read_only": read_only, @@ -812,10 +829,10 @@ def rename( feature_report_path(old_path, "rename", {}, token=token) - ds = deeplake.load(old_path, verbose=False, token=token, creds=creds) - ds.rename(new_path) + deeplake.deepcopy(old_path, new_path, verbose=False, token=token, creds=creds) + deeplake.delete(old_path, token=token, creds=creds) - return ds # type: ignore + return deeplake.load(new_path, verbose=False, token=token, creds=creds) @staticmethod @spinner @@ -1491,6 +1508,7 @@ def ingest_coco( num_workers: int = 0, token: Optional[str] = None, connect_kwargs: Optional[Dict] = None, + indra: bool = USE_INDRA, **dataset_kwargs, ) -> Dataset: """Ingest images and annotations in COCO format to a Deep Lake Dataset. The source data can be stored locally or in the cloud. 
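For context, a minimal usage sketch of the indra flag threaded through deeplake/api/dataset.py above. The dataset path is a placeholder and the optional indra/libdeeplake extra is assumed to be installed; USE_INDRA itself just mirrors the DEEPLAKE_USE_INDRA environment variable introduced in deeplake/constants.py.

    import os

    # USE_INDRA is evaluated when deeplake.constants is imported, so flip the
    # default before importing deeplake if every call site should pick it up.
    os.environ["DEEPLAKE_USE_INDRA"] = "true"

    import deeplake

    # The flag can also be passed per call. For read-only loads the new code
    # path goes through indra's api.load_from_storage() and returns an
    # IndraDatasetView instead of a regular Dataset.
    ds = deeplake.load("hub://org/some-dataset", read_only=True, indra=True)
    print(type(ds).__name__)
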
@@ -1544,6 +1562,7 @@ def ingest_coco( num_workers (int): The number of workers to use for ingestion. Set to ``0`` by default. token (Optional[str]): The token to use for accessing the dataset and/or connecting it to Deep Lake. connect_kwargs (Optional[Dict]): If specified, the dataset will be connected to Deep Lake, and connect_kwargs will be passed to :meth:`Dataset.connect `. + indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false **dataset_kwargs: Any arguments passed here will be forwarded to the dataset creator function. See :func:`deeplake.empty`. Returns: @@ -1582,7 +1601,12 @@ def ingest_coco( structure = unstructured.prepare_structure(inspect_limit) ds = deeplake.empty( - dest, creds=dest_creds, verbose=False, token=token, **dataset_kwargs + dest, + creds=dest_creds, + verbose=False, + token=token, + indra=indra, + **dataset_kwargs, ) if connect_kwargs is not None: connect_kwargs["token"] = token or connect_kwargs.get("token") @@ -1613,6 +1637,7 @@ def ingest_yolo( num_workers: int = 0, token: Optional[str] = None, connect_kwargs: Optional[Dict] = None, + indra: bool = USE_INDRA, **dataset_kwargs, ) -> Dataset: """Ingest images and annotations (bounding boxes or polygons) in YOLO format to a Deep Lake Dataset. The source data can be stored locally or in the cloud. @@ -1661,6 +1686,7 @@ def ingest_yolo( num_workers (int): The number of workers to use for ingestion. Set to ``0`` by default. token (Optional[str]): The token to use for accessing the dataset and/or connecting it to Deep Lake. connect_kwargs (Optional[Dict]): If specified, the dataset will be connected to Deep Lake, and connect_kwargs will be passed to :meth:`Dataset.connect `. + indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false **dataset_kwargs: Any arguments passed here will be forwarded to the dataset creator function. See :func:`deeplake.empty`. Returns: @@ -1708,7 +1734,12 @@ def ingest_yolo( structure = unstructured.prepare_structure() ds = deeplake.empty( - dest, creds=dest_creds, verbose=False, token=token, **dataset_kwargs + dest, + creds=dest_creds, + verbose=False, + token=token, + indra=indra, + **dataset_kwargs, ) if connect_kwargs is not None: connect_kwargs["token"] = token or connect_kwargs.get("token") @@ -1738,6 +1769,7 @@ def ingest_classification( shuffle: bool = True, token: Optional[str] = None, connect_kwargs: Optional[Dict] = None, + indra: bool = USE_INDRA, **dataset_kwargs, ) -> Dataset: """Ingest a dataset of images from a local folder to a Deep Lake Dataset. Images should be stored in subfolders by class name. @@ -1758,6 +1790,7 @@ def ingest_classification( shuffle (bool): Shuffles the input data prior to ingestion. Since data arranged in folders by class is highly non-random, shuffling is important in order to produce optimal results when training. Defaults to ``True``. token (Optional[str]): The token to use for accessing the dataset. connect_kwargs (Optional[Dict]): If specified, the dataset will be connected to Deep Lake, and connect_kwargs will be passed to :meth:`Dataset.connect `. + indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false **dataset_kwargs: Any arguments passed here will be forwarded to the dataset creator function see :func:`deeplake.empty`. 
Returns: @@ -1839,6 +1872,7 @@ def ingest_classification( dest_creds=dest_creds, progressbar=progressbar, token=token, + indra=indra, **dataset_kwargs, ) return ds @@ -1861,7 +1895,12 @@ def ingest_classification( unstructured = ImageClassification(source=src) ds = deeplake.empty( - dest, creds=dest_creds, token=token, verbose=False, **dataset_kwargs + dest, + creds=dest_creds, + token=token, + verbose=False, + indra=indra, + **dataset_kwargs, ) if connect_kwargs is not None: connect_kwargs["token"] = token or connect_kwargs.get("token") @@ -1892,6 +1931,7 @@ def ingest_kaggle( progressbar: bool = True, summary: bool = True, shuffle: bool = True, + indra: bool = USE_INDRA, **dataset_kwargs, ) -> Dataset: """Download and ingest a kaggle dataset and store it as a structured dataset to destination. @@ -1911,6 +1951,7 @@ def ingest_kaggle( progressbar (bool): Enables or disables ingestion progress bar. Set to ``True`` by default. summary (bool): Generates ingestion summary. Set to ``True`` by default. shuffle (bool): Shuffles the input data prior to ingestion. Since data arranged in folders by class is highly non-random, shuffling is important in order to produce optimal results when training. Defaults to ``True``. + indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false **dataset_kwargs: Any arguments passed here will be forwarded to the dataset creator function. See :func:`deeplake.dataset`. Returns: @@ -1956,6 +1997,7 @@ def ingest_kaggle( progressbar=progressbar, summary=summary, shuffle=shuffle, + indra=indra, **dataset_kwargs, ) @@ -1972,6 +2014,7 @@ def ingest_dataframe( progressbar: bool = True, token: Optional[str] = None, connect_kwargs: Optional[Dict] = None, + indra: bool = USE_INDRA, **dataset_kwargs, ): """Convert pandas dataframe to a Deep Lake Dataset. The contents of the dataframe can be parsed literally, or can be treated as links to local or cloud files. @@ -2021,6 +2064,7 @@ def ingest_dataframe( progressbar (bool): Enables or disables ingestion progress bar. Set to ``True`` by default. token (Optional[str]): The token to use for accessing the dataset. connect_kwargs (Optional[Dict]): A dictionary containing arguments to be passed to the dataset connect method. See :meth:`Dataset.connect`. + indra (bool): Flag indicating whether indra api should be used to create the dataset. Defaults to false **dataset_kwargs: Any arguments passed here will be forwarded to the dataset creator function. See :func:`deeplake.empty`. 
Returns: @@ -2045,15 +2089,30 @@ def ingest_dataframe( structured = DataFrame(src, column_params, src_creds, creds_key) dest = convert_pathlib_to_string_if_needed(dest) - ds = deeplake.empty( - dest, creds=dest_creds, token=token, verbose=False, **dataset_kwargs - ) + if indra: + from indra import api + + ds = api.dataset_writer( + dest, creds=dest_creds, token=token, **dataset_kwargs + ) + else: + ds = deeplake.empty( + dest, + creds=dest_creds, + token=token, + verbose=False, + **dataset_kwargs, + ) if connect_kwargs is not None: connect_kwargs["token"] = token or connect_kwargs.get("token") ds.connect(**connect_kwargs) structured.fill_dataset(ds, progressbar) # type: ignore + if indra: + ids = api.load_from_storage(ds.storage) + return IndraDatasetView(indra_ds=ids) + return ds # type: ignore @staticmethod diff --git a/deeplake/auto/tests/test_ingestion.py b/deeplake/auto/tests/test_ingestion.py index b8432325e8..ff0177ada2 100644 --- a/deeplake/auto/tests/test_ingestion.py +++ b/deeplake/auto/tests/test_ingestion.py @@ -231,7 +231,7 @@ def test_csv(memory_ds: Dataset, dataframe_ingestion_data: dict): assert ds[tensors_names[2]].htype == "text" assert ds[tensors_names[2]].dtype == str np.testing.assert_array_equal( - ds[tensors_names[2]].numpy().reshape(-1), df[df_keys[2]].values + np.array(ds[tensors_names[2]].numpy()).reshape(-1), df[df_keys[2]].values ) @@ -273,7 +273,7 @@ def test_dataframe_basic( assert ds[df_keys[2]].htype == "text" assert ds[df_keys[2]].dtype == str np.testing.assert_array_equal( - ds[df_keys[2]].numpy().reshape(-1), df[df_keys[2]].values + np.array(ds[df_keys[2]].numpy()).reshape(-1), df[df_keys[2]].values ) @@ -342,7 +342,7 @@ def test_dataframe_array(memory_ds: Dataset): ) np.testing.assert_array_equal( - ds[df_keys[2]][0:3].numpy().reshape(-1), df[df_keys[2]].values[0:3] + np.array(ds[df_keys[2]][0:3].numpy()).reshape(-1), df[df_keys[2]].values[0:3] ) assert ds[df_keys[2]].dtype == df[df_keys[2]].dtype diff --git a/deeplake/constants.py b/deeplake/constants.py index edb1056f8e..e7c1f34612 100644 --- a/deeplake/constants.py +++ b/deeplake/constants.py @@ -352,3 +352,5 @@ # Size of dataset view to expose as indra dataset wrapper. INDRA_DATASET_SAMPLES_THRESHOLD = 10000000 + +USE_INDRA = os.environ.get("DEEPLAKE_USE_INDRA", "false").strip().lower() == "true" diff --git a/deeplake/core/dataset/dataset.py b/deeplake/core/dataset/dataset.py index 6fca825095..29bf358e09 100644 --- a/deeplake/core/dataset/dataset.py +++ b/deeplake/core/dataset/dataset.py @@ -295,6 +295,7 @@ def __init__( self._initial_autoflush: List[bool] = ( [] ) # This is a stack to support nested with contexts + self._indexing_history: List[int] = [] if not self.read_only: @@ -3800,7 +3801,7 @@ def _save_view( ) from e else: raise ReadOnlyModeError( - "Cannot save view in read only dataset. Speicify a path to save the view in a different location." + "Cannot save view in read only dataset. Specify a path to save the view in a different location." 
) else: vds = self._save_view_in_subdir( @@ -4604,6 +4605,40 @@ def visualize( def __contains__(self, tensor: str): return tensor in self.tensors + def _optimize_and_copy_view( + self, + info, + path: str, + tensors: Optional[List[str]] = None, + external=False, + unlink=True, + num_workers=0, + scheduler="threaded", + progressbar=True, + ): + tql_query = info.get("tql_query") + vds = self._sub_ds(".queries/" + path, verbose=False) + view = vds._get_view(not external) + new_path = path + "_OPTIMIZED" + if tql_query is not None: + view = view.query(tql_query) + view.indra_ds.materialize(new_path, tensors, True) + optimized = self._sub_ds(".queries/" + new_path, empty=False, verbose=False) + else: + optimized = self._sub_ds(".queries/" + new_path, empty=True, verbose=False) + view._copy( + optimized, + tensors=tensors, + overwrite=True, + unlink=unlink, + create_vds_index_tensor=True, + num_workers=num_workers, + scheduler=scheduler, + progressbar=progressbar, + ) + optimized.info.update(vds.info.__getstate__()) + return (vds, optimized, new_path) + def _optimize_saved_view( self, id: str, @@ -4630,32 +4665,24 @@ def _optimize_saved_view( # Already optimized return info path = info.get("path", info["id"]) - vds = self._sub_ds(".queries/" + path, verbose=False) - view = vds._get_view(not external) - new_path = path + "_OPTIMIZED" - optimized = self._sub_ds( - ".queries/" + new_path, empty=True, verbose=False - ) - view._copy( - optimized, + old, new, new_path = self._optimize_and_copy_view( + info, + path, tensors=tensors, - overwrite=True, unlink=unlink, - create_vds_index_tensor=True, num_workers=num_workers, scheduler=scheduler, progressbar=progressbar, ) - optimized.info.update(vds.info.__getstate__()) - optimized.info["virtual-datasource"] = False - optimized.info["path"] = new_path - optimized.flush() + new.info["virtual-datasource"] = False + new.info["path"] = new_path + new.flush() info["virtual-datasource"] = False info["path"] = new_path self._write_queries_json(qjson) - vds.base_storage.disable_readonly() + old.base_storage.disable_readonly() try: - vds.base_storage.clear() + old.base_storage.clear() except Exception as e: warnings.warn( f"Error while deleting old view after writing optimized version: {e}" diff --git a/deeplake/core/dataset/deeplake_query_dataset.py b/deeplake/core/dataset/indra_dataset_view.py similarity index 70% rename from deeplake/core/dataset/deeplake_query_dataset.py rename to deeplake/core/dataset/indra_dataset_view.py index 075f6b52f2..364d74d94c 100644 --- a/deeplake/core/dataset/deeplake_query_dataset.py +++ b/deeplake/core/dataset/indra_dataset_view.py @@ -30,54 +30,68 @@ import warnings -from deeplake.core.dataset.deeplake_query_tensor import DeepLakeQueryTensor +from deeplake.core.dataset.indra_tensor_view import IndraTensorView -class DeepLakeQueryDataset(Dataset): +class IndraDatasetView(Dataset): def __init__( self, - deeplake_ds, indra_ds, group_index="", enabled_tensors=None, - index: Optional[Index] = None, ): - if isinstance(deeplake_ds, DeepLakeQueryDataset): - deeplake_ds = deeplake_ds.deeplake_ds d: Dict[str, Any] = {} - d["deeplake_ds"] = deeplake_ds d["indra_ds"] = indra_ds - d["group_index"] = ( - group_index or deeplake_ds.group_index if deeplake_ds is not None else "" - ) - d["enabled_tensors"] = ( - enabled_tensors or deeplake_ds.enabled_tensors - if deeplake_ds is not None - else None - ) - d["version_state"] = ( - deeplake_ds.version_state if deeplake_ds is not None else {} - ) - d["_index"] = ( - index or deeplake_ds.index - if 
deeplake_ds is not None - else Index(item=slice(None)) - ) + d["group_index"] = "" + d["enabled_tensors"] = None + d["verbose"] = False self.__dict__.update(d) + self._view_base = None + self._view_entry = None + self._read_only = True + self._locked_out = False + self._query_string = None + try: + from deeplake.core.storage.indra import IndraProvider - @property - def read_only(self): - if self.deeplake_ds is not None: - return self.deeplake_ds.read_only - return True + self.storage = IndraProvider(indra_ds.storage) + self._read_only = self.storage.read_only + self._token = self.storage.token + except: + pass @property def meta(self): - return self.deeplake_ds.meta if self.deeplake_ds is not None else DatasetMeta() + return DatasetMeta() @property def path(self): - return self.deeplake_ds.path if self.deeplake_ds is not None else "" + try: + return self.storage.original_path + except: + return "" + + @property + def version_state(self) -> Dict: + try: + state = self.indra_ds.version_state + for k, v in state["full_tensors"].items(): + state["full_tensors"][k] = IndraTensorView(v) + return state + except: + return dict() + + @property + def branches(self): + return self.indra_ds.branches + + @property + def commits(self) -> List[Dict]: + return self.indra_ds.commits + + @property + def commit_id(self) -> str: + return self.indra_ds.commit_id @property def libdeeplake_dataset(self): @@ -89,23 +103,21 @@ def merge(self, *args, **kwargs): ) def checkout(self, address: str, create: bool = False): - raise InvalidOperationError( - "checkout", "checkout method cannot be called on a Dataset view." - ) + if create: + raise InvalidOperationError( + "checkout", "Cannot create new branch on Dataset View." + ) + self.indra_ds.checkout(address) + + def flush(self): + pass def _get_tensor_from_root(self, fullpath): tensors = self.indra_ds.tensors for tensor in tensors: if tensor.name == fullpath: - deeplake_tensor = None - try: - deeplake_tensor = self.deeplake_ds.__getattr__(fullpath) - except: - pass indra_tensor = tensor - return DeepLakeQueryTensor( - deeplake_tensor, indra_tensor, index=self.index - ) + return IndraTensorView(indra_tensor) def pytorch( self, @@ -151,18 +163,9 @@ def __getitem__( tensor = self._get_tensor_from_root(fullpath) if tensor is not None: return tensor - if self.deeplake_ds is not None and self.deeplake_ds._has_group_in_root( - fullpath - ): - ret = DeepLakeQueryDataset( - deeplake_ds=self.deeplake_ds, - indra_ds=self.indra_ds, - index=self.index, - group_index=posixpath.join(self.group_index, item), - ) elif "/" in item: splt = posixpath.split(item) - ret = self[splt[0]][splt[1]] + return self[splt[0]][splt[1]] else: raise TensorDoesNotExistError(item) elif isinstance(item, (int, slice, list, tuple, Index, type(Ellipsis))): @@ -185,11 +188,9 @@ def __getitem__( ) for x in item ] - ret = DeepLakeQueryDataset( - deeplake_ds=self.deeplake_ds, + return IndraDatasetView( indra_ds=self.indra_ds, enabled_tensors=enabled_tensors, - index=self.index, ) elif isinstance(item, tuple) and len(item) and isinstance(item[0], str): ret = self @@ -197,36 +198,20 @@ def __getitem__( ret = self[x] return ret else: - if not is_iteration and isinstance(item, int): - is_iteration = check_if_iteration(self._indexing_history, item) - if is_iteration and SHOW_ITERATION_WARNING: - warnings.warn( - "Indexing by integer in a for loop, like `for i in range(len(ds)): ... ds[i]` can be quite slow. Use `for i, sample in enumerate(ds)` instead." 
- ) - ret = DeepLakeQueryDataset( - deeplake_ds=self.deeplake_ds, + return IndraDatasetView( indra_ds=self.indra_ds[item], - index=self.index[item], ) else: raise InvalidKeyTypeError(item) - - if hasattr(self, "_view_entry"): - ret._view_entry = self._view_entry - return ret + raise AttributeError("Dataset has no attribute - {item}") def __getattr__(self, key): try: - return self.__getitem__(key) - except TensorDoesNotExistError as ke: - try: - return getattr(self.deeplake_ds, key) - except AttributeError: - raise AttributeError( - f"'{self.__class__}' object has no attribute '{key}'" - ) from ke + ret = self.__getitem__(key) except AttributeError: - return getattr(self.indra_ds, key) + ret = getattr(self.indra_ds, key) + ret._view_entry = self._view_entry + return ret def __len__(self): return len(self.indra_ds) @@ -314,9 +299,16 @@ def dataloader(self, ignore_errors: bool = False, verbose: bool = False): def no_view_dataset(self): return self + @property + def base_storage(self): + return self.storage + @property def index(self): - return self._index + try: + return Index(self.indra_ds.indexes) + except: + return Index(slice(0, len(self))) @property def sample_indices(self): @@ -330,39 +322,18 @@ def sample_indices(self): def _all_tensors_filtered( self, include_hidden: bool = True, include_disabled=True ) -> List[str]: - if self.deeplake_ds is not None: - return self.deeplake_ds._all_tensors_filtered( - include_hidden, include_disabled - ) - indra_tensors = self.indra_ds.tensors return list(t.name for t in indra_tensors) def _tensors( self, include_hidden: bool = True, include_disabled=True - ) -> Dict[str, Tensor]: + ) -> Dict[str, IndraTensorView]: """All tensors belonging to this group, including those within sub groups. Always returns the sliced tensors.""" - original_tensors = ( - self.deeplake_ds._tensors(include_hidden, include_disabled) - if self.deeplake_ds is not None - else {} - ) indra_tensors = self.indra_ds.tensors - indra_keys = set(t.name for t in indra_tensors) - original_tensors = { - k: v for k, v in original_tensors.items() if k in indra_keys or v.hidden - } - original_keys = set(original_tensors.keys()) + ret = {} for t in indra_tensors: - if t.name in original_keys: - original_tensors[t.name] = DeepLakeQueryTensor( - original_tensors[t.name], t, index=self.index - ) - else: - original_tensors[t.name] = DeepLakeQueryTensor( - None, t, index=self.index - ) - return original_tensors + ret[t.name] = IndraTensorView(t) + return ret def __str__(self): path_str = "" @@ -397,4 +368,4 @@ def random_split(self, lengths: Sequence[Union[int, float]]): lengths = calculate_absolute_lengths(lengths, len(self)) vs = self.indra_ds.random_split(lengths) - return [DeepLakeQueryDataset(self.deeplake_ds, v) for v in vs] + return [IndraDatasetView(indra_ds=v) for v in vs] diff --git a/deeplake/core/dataset/deeplake_query_tensor.py b/deeplake/core/dataset/indra_tensor_view.py similarity index 78% rename from deeplake/core/dataset/deeplake_query_tensor.py rename to deeplake/core/dataset/indra_tensor_view.py index 2ec182793d..63e748f1ff 100644 --- a/deeplake/core/dataset/deeplake_query_tensor.py +++ b/deeplake/core/dataset/indra_tensor_view.py @@ -11,38 +11,24 @@ import json -class DeepLakeQueryTensor(tensor.Tensor): +class IndraTensorView(tensor.Tensor): def __init__( self, - deeplake_tensor, indra_tensor, - index: Optional[Index] = None, is_iteration: bool = False, ): - self.deeplake_tensor = deeplake_tensor self.indra_tensor = indra_tensor self.is_iteration = is_iteration - self.key = ( 
- deeplake_tensor.key - if hasattr(deeplake_tensor, "key") - else indra_tensor.name - ) + self.key = indra_tensor.name self.first_dim = None - self._index = index or Index(self.indra_tensor.index) - def __getattr__(self, key): try: - return getattr(self.deeplake_tensor, key) + return getattr(self.indra_tensor, key) except AttributeError: - try: - return getattr(self.indra_tensor, key) - except AttributeError: - raise AttributeError( - f"'{self.__class__}' object has no attribute '{key}'" - ) + raise AttributeError(f"'{self.__class__}' object has no attribute '{key}'") def __getitem__( self, @@ -55,12 +41,8 @@ def __getitem__( if isinstance(item, tuple) or item is Ellipsis: item = replace_ellipsis_with_slices(item, self.ndim) - indra_tensor = self.indra_tensor[item] - - return DeepLakeQueryTensor( - self.deeplake_tensor, - indra_tensor, - index=self.index[item], + return IndraTensorView( + self.indra_tensor[item], is_iteration=is_iteration, ) @@ -72,6 +54,8 @@ def numpy( return r else: try: + if self.index.values[0].subscriptable(): + r = r[0] return np.array(r) except ValueError: raise DynamicTensorNumpyError(self.name, self.index, "shape") @@ -103,7 +87,7 @@ def htype(self): htype = self.indra_tensor.htype if self.indra_tensor.is_sequence: htype = f"sequence[{htype}]" - if self.deeplake_tensor.is_link: + if self.indra_tensor.is_link: htype = f"link[{htype}]" return htype @@ -162,9 +146,10 @@ def shape(self): @property def index(self): - if self._index is not None: - return self._index - return Index(self.indra_tensor.indexes) + try: + return Index(self.indra_tensor.indexes) + except: + return Index(slice(0, len(self))) @property def shape_interval(self): @@ -186,16 +171,14 @@ def ndim(self): @property def meta(self): """Metadata of the tensor.""" - if self.deeplake_tensor is None: - return TensorMeta( - htype=self.indra_tensor.htype, - dtype=self.indra_tensor.dtype, - sample_compression=self.indra_tensor.sample_compression, - chunk_compression=None, - is_sequence=self.indra_tensor.is_sequence, - is_link=False, - ) - return self.deeplake_tensor.chunk_engine.tensor_meta + return TensorMeta( + htype=self.indra_tensor.htype, + dtype=self.indra_tensor.dtype, + sample_compression=self.indra_tensor.sample_compression, + chunk_compression=None, + is_sequence=self.indra_tensor.is_sequence, + is_link=False, + ) @property def base_htype(self): diff --git a/deeplake/core/dataset/view_entry.py b/deeplake/core/dataset/view_entry.py index 4b65eb4c50..24b94e14ce 100644 --- a/deeplake/core/dataset/view_entry.py +++ b/deeplake/core/dataset/view_entry.py @@ -74,7 +74,7 @@ def load(self, verbose=True): if self.virtual: ds = ds._get_view(inherit_creds=not self._external) - if not self.tql_query is None: + if self.virtual and not self.tql_query is None: query_str = self.tql_query ds = ds.query(query_str) @@ -120,8 +120,6 @@ def optimize( Exception: When query view cannot be optimized. 
""" - if not self.tql_query is None: - raise Exception("Optimizing nonlinear query views is not supported") self.info = self._ds._optimize_saved_view( self.info["id"], tensors=tensors, diff --git a/deeplake/core/index/index.py b/deeplake/core/index/index.py index 688241a045..d39e3b6897 100644 --- a/deeplake/core/index/index.py +++ b/deeplake/core/index/index.py @@ -198,6 +198,10 @@ def __getitem__(self, item: IndexValue): def subscriptable(self): """Returns whether an IndexEntry can be further subscripted.""" + from indra import api # type: ignore + + if isinstance(self.value, api.core.IndexMappingInt64): + return self.value.subscriptable() return not isinstance(self.value, int) def indices(self, length: int): diff --git a/deeplake/core/storage/indra.py b/deeplake/core/storage/indra.py new file mode 100644 index 0000000000..c692fc880c --- /dev/null +++ b/deeplake/core/storage/indra.py @@ -0,0 +1,106 @@ +from deeplake.core.storage.provider import StorageProvider +from deeplake.core.partial_reader import PartialReader +from deeplake.core.storage.deeplake_memory_object import DeepLakeMemoryObject +from indra.api import storage # type: ignore +from typing import Optional, Union, Dict + + +class IndraProvider(StorageProvider): + """Provider class for using Indra storage provider.""" + + def __init__( + self, + root: Union[str, storage.provider], + read_only: Optional[bool] = False, + **kwargs, + ): + if isinstance(root, str): + self.core = storage.create(root, read_only, **kwargs) + else: + self.core = root + self.root = self.path + + @property + def path(self): + return self.core.path + + @property + def original_path(self): + return self.core.original_path + + @property + def token(self): + return self.core.token + + def copy(self): + return IndraProvider(self.core) + + def subdir(self, path: str, read_only: bool = False): + return IndraProvider(self.core.subdir(path, read_only)) + + def __setitem__(self, path, content): + self.check_readonly() + self.core.set(path, bytes(content)) + + def __getitem__(self, path): + return bytes(self.core.get(path)) + + def get_bytes( + self, path, start_byte: Optional[int] = None, end_byte: Optional[int] = None + ): + s = start_byte or 0 + e = end_byte or 0 + try: + return bytes(self.core.get(path, s, e)) + except RuntimeError as e: + raise KeyError(path) + + def get_deeplake_object( + self, + path: str, + expected_class, + meta: Optional[Dict] = None, + url=False, + partial_bytes: int = 0, + ): + if partial_bytes != 0: + buff = self.get_bytes(path, 0, partial_bytes) + obj = expected_class.frombuffer(buff, meta, partial=True) + obj.data_bytes = PartialReader(self, path, header_offset=obj.header_bytes) + return obj + + item = self[path] + if isinstance(item, DeepLakeMemoryObject): + if type(item) != expected_class: + raise ValueError( + f"'{path}' was expected to have the class '{expected_class.__name__}'. Instead, got: '{type(item)}'." 
+ ) + return item + + if isinstance(item, (bytes, memoryview)): + obj = ( + expected_class.frombuffer(item) + if meta is None + else expected_class.frombuffer(item, meta) + ) + return obj + + raise ValueError(f"Item at '{path}' got an invalid type: '{type(item)}'.") + + def get_object_size(self, path: str) -> int: + return self.core.length(path) + + def __delitem__(self, path): + return self.core.remove(path) + + def _all_keys(self): + return self.core.list("") + + def __len__(self): + return len(self.core.list("")) + + def __iter__(self): + return iter(self.core.list("")) + + def clear(self, prefix=""): + self.core.clear(prefix) diff --git a/deeplake/core/tests/test_deeplake_indra_dataset.py b/deeplake/core/tests/test_indra_dataset.py similarity index 93% rename from deeplake/core/tests/test_deeplake_indra_dataset.py rename to deeplake/core/tests/test_indra_dataset.py index 916eeff97b..2da1204be1 100644 --- a/deeplake/core/tests/test_deeplake_indra_dataset.py +++ b/deeplake/core/tests/test_indra_dataset.py @@ -6,7 +6,7 @@ EmptyTokenException, ) -from deeplake.core.dataset.deeplake_query_dataset import DeepLakeQueryDataset +from deeplake.core.dataset.indra_dataset_view import IndraDatasetView import random import math import pytest @@ -23,7 +23,7 @@ def test_indexing(local_auth_ds_generator): deeplake_ds.label.append(int(100 * random.uniform(0.0, 1.0))) indra_ds = dataset_to_libdeeplake(deeplake_ds) - deeplake_indra_ds = DeepLakeQueryDataset(deeplake_ds=deeplake_ds, indra_ds=indra_ds) + deeplake_indra_ds = IndraDatasetView(indra_ds=indra_ds) assert len(deeplake_indra_ds) == len(indra_ds) @@ -70,7 +70,7 @@ def test_save_view(local_auth_ds_generator): deeplake_ds.commit("First") indra_ds = dataset_to_libdeeplake(deeplake_ds) - deeplake_indra_ds = DeepLakeQueryDataset(deeplake_ds=deeplake_ds, indra_ds=indra_ds) + deeplake_indra_ds = IndraDatasetView(indra_ds=indra_ds) deeplake_indra_ds.save_view() assert ( deeplake_indra_ds.base_storage["queries.json"] @@ -108,7 +108,7 @@ def test_load_view(local_auth_ds_generator): deeplake_ds.commit("First") indra_ds = dataset_to_libdeeplake(deeplake_ds) - deeplake_indra_ds = DeepLakeQueryDataset(deeplake_ds=deeplake_ds, indra_ds=indra_ds) + deeplake_indra_ds = IndraDatasetView(indra_ds=indra_ds) with pytest.raises(Exception): dataloader = deeplake_indra_ds.pytorch() @@ -116,7 +116,7 @@ def test_load_view(local_auth_ds_generator): query_str = "select * group by label" view = deeplake_ds.query(query_str) view_path = view.save_view() - view_id = view_path.split("/")[-1] + view_id = view_path.split("/")[-2] view = deeplake_ds.load_view(view_id) dataloader = view[:3].dataloader().pytorch() @@ -130,7 +130,7 @@ def test_load_view(local_auth_ds_generator): view = deeplake_ds[0:50].query(query_str) view_path = view.save_view() - view_id = view_path.split("/")[-1] + view_id = view_path.split("/")[-2] view = deeplake_ds.load_view(view_id) dataloader = view[:3].dataloader().pytorch() @@ -142,6 +142,19 @@ def test_load_view(local_auth_ds_generator): assert iss == [0, 1, 2] assert np.all(indra_ds.image.numpy() == deeplake_indra_ds.image.numpy()) + query_str = "select label where label > 0" + view = deeplake_ds.query(query_str) + view_path = view.save_view() + view_id = view_path.split("/")[-2] + view = deeplake_ds.load_view(view_id, optimize=True) + + dataloader = view.dataloader().pytorch() + count = 0 + for i, batch in enumerate(dataloader): + assert batch["label"][0] > 0 + count += 1 + assert count == 90 + @requires_libdeeplake def 
test_query(local_auth_ds_generator): @@ -158,7 +171,7 @@ def test_query(local_auth_ds_generator): deeplake_ds.image.append(np.random.randint(0, 255, (100, 200, 3), np.uint8)) indra_ds = dataset_to_libdeeplake(deeplake_ds) - deeplake_indra_ds = DeepLakeQueryDataset(deeplake_ds=deeplake_ds, indra_ds=indra_ds) + deeplake_indra_ds = IndraDatasetView(indra_ds=indra_ds) view = deeplake_indra_ds.query("SELECT * GROUP BY label") assert len(view) == 10 @@ -193,7 +206,7 @@ def test_metadata(local_auth_ds_generator): ) indra_ds = dataset_to_libdeeplake(deeplake_ds) - deeplake_indra_ds = DeepLakeQueryDataset(deeplake_ds=deeplake_ds, indra_ds=indra_ds) + deeplake_indra_ds = IndraDatasetView(indra_ds=indra_ds) assert deeplake_indra_ds.label.htype == "generic" assert deeplake_indra_ds.label.dtype == np.int32 assert deeplake_indra_ds.label.sample_compression == None @@ -219,7 +232,7 @@ def test_accessing_data(local_auth_ds_generator): deeplake_ds.label.append(int(100 * random.uniform(0.0, 1.0))) indra_ds = dataset_to_libdeeplake(deeplake_ds) - deeplake_indra_ds = DeepLakeQueryDataset(deeplake_ds=deeplake_ds, indra_ds=indra_ds) + deeplake_indra_ds = IndraDatasetView(indra_ds=indra_ds) assert np.all( np.isclose(deeplake_indra_ds.label.numpy(), deeplake_indra_ds["label"].numpy()) @@ -244,11 +257,11 @@ def test_sequences_accessing_data(local_auth_ds_generator): assert len(deeplake_indra_ds) == 2 assert deeplake_indra_ds.image.shape == (2, None, None, 10, 3) assert deeplake_indra_ds[0].image.shape == (101, 10, 10, 3) - assert deeplake_indra_ds[0, 0].image.shape == (10, 10, 3) + assert deeplake_indra_ds[0, 0].image.shape == (1, 10, 10, 3) assert len(deeplake_indra_ds[0].image.numpy()) == 101 assert deeplake_indra_ds[1].image.shape == (99, None, 10, 3) - assert deeplake_indra_ds[1, 0].image.shape == (10, 10, 3) - assert deeplake_indra_ds[1, 98].image.shape == (20, 10, 3) + assert deeplake_indra_ds[1, 0].image.shape == (1, 10, 10, 3) + assert deeplake_indra_ds[1, 98].image.shape == (1, 20, 10, 3) assert len(deeplake_indra_ds[1].image.numpy()) == 99 assert deeplake_indra_ds[1].image.numpy()[0].shape == (10, 10, 3) assert deeplake_indra_ds[1].image.numpy()[98].shape == (20, 10, 3) diff --git a/deeplake/core/tests/test_query.py b/deeplake/core/tests/test_query.py index 854cd92a22..900cd5f305 100644 --- a/deeplake/core/tests/test_query.py +++ b/deeplake/core/tests/test_query.py @@ -1,6 +1,6 @@ import deeplake from deeplake.tests.common import requires_libdeeplake -from deeplake.core.dataset.deeplake_query_dataset import DeepLakeQueryDataset +from deeplake.core.dataset.indra_dataset_view import IndraDatasetView from deeplake.client.client import DeepLakeBackendClient import pytest import numpy as np diff --git a/deeplake/core/vectorstore/deeplake_vectorstore.py b/deeplake/core/vectorstore/deeplake_vectorstore.py index e7a6edc5d5..0378ca3d70 100644 --- a/deeplake/core/vectorstore/deeplake_vectorstore.py +++ b/deeplake/core/vectorstore/deeplake_vectorstore.py @@ -12,6 +12,7 @@ DEFAULT_VECTORSTORE_TENSORS, MAX_BYTES_PER_MINUTE, TARGET_BYTE_SIZE, + USE_INDRA, ) from deeplake.util.bugout_reporter import feature_report_path from deeplake.util.exceptions import DeepMemoryAccessError @@ -41,6 +42,7 @@ def __init__( org_id: Optional[str] = None, logger: logging.Logger = logger, branch: str = "main", + indra: bool = USE_INDRA, **kwargs: Any, ) -> None: """Creates an empty VectorStore or loads an existing one if it exists at the specified ``path``. 
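As a quick reference for the renamed wrapper classes, a sketch in the spirit of test_indra_dataset.py; it assumes an existing dataset with a "label" tensor, a valid token, and the indra extra installed (the path is a placeholder). Note that IndraDatasetView is now built from the indra dataset alone, without the deeplake_ds argument the old DeepLakeQueryDataset required.

    import deeplake
    from deeplake.enterprise.convert_to_libdeeplake import dataset_to_libdeeplake
    from deeplake.core.dataset.indra_dataset_view import IndraDatasetView

    ds = deeplake.load("hub://org/some-dataset", read_only=True)  # placeholder

    # The view wraps only the indra dataset; the old deeplake_ds reference is gone.
    indra_ds = dataset_to_libdeeplake(ds)
    view = IndraDatasetView(indra_ds=indra_ds)
    assert len(view) == len(ds)

    # TQL queries run against the indra dataset and hand back another view,
    # whose tensors are IndraTensorView objects.
    grouped = view.query("SELECT * GROUP BY label")
    print(len(grouped), grouped.label.htype)
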
@@ -103,6 +105,7 @@ def __init__( - If 'ENV' is passed, credentials are fetched from the environment variables. This is also the case when creds is not passed for cloud datasets. For datasets connected to hub cloud, specifying 'ENV' will override the credentials fetched from Activeloop and use local ones. runtime (Dict, optional): Parameters for creating the Vector Store in Deep Lake's Managed Tensor Database. Not applicable when loading an existing Vector Store. To create a Vector Store in the Managed Tensor Database, set `runtime = {"tensor_db": True}`. branch (str): Branch name to use for the Vector Store. Defaults to "main". + indra (bool): Flag indicating whether indra api should be used to create the underlying dataset. Defaults to false **kwargs (dict): Additional keyword arguments. .. diff --git a/deeplake/core/vectorstore/vector_search/indra/search_algorithm.py b/deeplake/core/vectorstore/vector_search/indra/search_algorithm.py index 0b3ad24bea..e36e411d14 100644 --- a/deeplake/core/vectorstore/vector_search/indra/search_algorithm.py +++ b/deeplake/core/vectorstore/vector_search/indra/search_algorithm.py @@ -5,7 +5,7 @@ from deeplake.core.vectorstore.vector_search.indra import query from deeplake.core.vectorstore.vector_search import utils from deeplake.core.dataset import Dataset as DeepLakeDataset -from deeplake.core.dataset.deeplake_query_dataset import DeepLakeQueryDataset +from deeplake.core.dataset.indra_dataset_view import IndraDatasetView class SearchBasic(ABC): @@ -105,9 +105,7 @@ class SearchIndra(SearchBasic): def _get_view(self, tql_query, runtime: Optional[Dict] = None): indra_dataset = self._get_indra_dataset() indra_view = indra_dataset.query(tql_query) - view = DeepLakeQueryDataset( - deeplake_ds=self.deeplake_dataset, indra_ds=indra_view - ) + view = IndraDatasetView(indra_ds=indra_view) view._tql_query = tql_query return view diff --git a/deeplake/enterprise/convert_to_libdeeplake.py b/deeplake/enterprise/convert_to_libdeeplake.py index 93a35ab9f0..be062dc458 100644 --- a/deeplake/enterprise/convert_to_libdeeplake.py +++ b/deeplake/enterprise/convert_to_libdeeplake.py @@ -4,6 +4,7 @@ from deeplake.core.storage.gcs import GCSProvider from deeplake.enterprise.util import raise_indra_installation_error # type: ignore from deeplake.core.storage import S3Provider +from deeplake.core.storage.indra import IndraProvider from deeplake.core.storage.azure import AzureProvider from deeplake.util.remove_cache import get_base_storage from deeplake.util.exceptions import EmptyTokenException @@ -45,6 +46,11 @@ def import_indra_api(): INDRA_INSTALLED = bool(importlib.util.find_spec("indra")) +def _get_indra_ds_from_native_provider(provider: IndraProvider): + api = import_indra_api() + return api.dataset(provider.core) + + def _get_indra_ds_from_azure_provider( path: str, token: str, @@ -59,15 +65,16 @@ def _get_indra_ds_from_azure_provider( sas_token = provider.get_sas_token() expiration = str(provider.expiration) if provider.expiration else None - return api.dataset( + storage = IndraProvider( path, - origin_path=provider.root, + read_only=provider.read_only, token=token, account_name=account_name, account_key=account_key, sas_token=sas_token, expiration=expiration, ) + return _get_indra_ds_from_native_provider(storage) def _get_indra_ds_from_gcp_provider( @@ -88,10 +95,11 @@ def _get_indra_ds_from_gcp_provider( scheme = creds.get("scheme", "") retry_limit_seconds = creds.get("retry_limit_seconds", "") - return api.dataset( + storage = IndraProvider( path, - 
origin_path=provider.root, + read_only=provider.read_only, token=token, + origin_path=provider.root, anon=anon, expiration=expiration, access_token=access_token, @@ -100,6 +108,7 @@ def _get_indra_ds_from_gcp_provider( scheme=scheme, retry_limit_seconds=retry_limit_seconds, ) + return _get_indra_ds_from_native_provider(storage) def _get_indra_ds_from_s3_provider( @@ -115,10 +124,11 @@ def _get_indra_ds_from_s3_provider( creds_used = provider.creds_used if creds_used == "PLATFORM": provider._check_update_creds() - return api.dataset( + storage = IndraProvider( path, - origin_path=provider.root, + read_only=provider.read_only, token=token, + origin_path=provider.root, aws_access_key_id=provider.aws_access_key_id, aws_secret_access_key=provider.aws_secret_access_key, aws_session_token=provider.aws_session_token, @@ -126,30 +136,36 @@ def _get_indra_ds_from_s3_provider( endpoint_url=provider.endpoint_url, expiration=str(provider.expiration), ) + return _get_indra_ds_from_native_provider(storage) elif creds_used == "ENV": - return api.dataset( + storage = IndraProvider( path, - origin_path=provider.root, + read_only=provider.read_only, token=token, + origin_path=provider.root, profile_name=provider.profile_name, ) + return _get_indra_ds_from_native_provider(storage) elif creds_used == "DICT": - return api.dataset( + storage = IndraProvider( path, - origin_path=provider.root, + read_only=provider.read_only, token=token, + origin_path=provider.root, aws_access_key_id=provider.aws_access_key_id, aws_secret_access_key=provider.aws_secret_access_key, aws_session_token=provider.aws_session_token, region_name=provider.aws_region, endpoint_url=provider.endpoint_url, ) + return _get_indra_ds_from_native_provider(storage) def dataset_to_libdeeplake(hub2_dataset: Dataset): """Convert a hub 2.x dataset object to a libdeeplake dataset object.""" try_flushing(hub2_dataset) api = import_indra_api() + path: str = hub2_dataset.path token = ( @@ -160,7 +176,11 @@ def dataset_to_libdeeplake(hub2_dataset: Dataset): ) if token is None or token == "": raise EmptyTokenException - if hub2_dataset.libdeeplake_dataset is None: + if hub2_dataset.libdeeplake_dataset is not None: + libdeeplake_dataset = hub2_dataset.libdeeplake_dataset + elif isinstance(hub2_dataset.storage.next_storage, IndraProvider): + libdeeplake_dataset = api.dataset(hub2_dataset.storage.next_storage.core) + else: libdeeplake_dataset = None if path.startswith("gdrive://"): raise ValueError("Gdrive datasets are not supported for libdeeplake") @@ -182,6 +202,10 @@ def dataset_to_libdeeplake(hub2_dataset: Dataset): libdeeplake_dataset = _get_indra_ds_from_azure_provider( path=path, token=token, provider=provider ) + elif isinstance(provider, IndraProvider): + libdeeplake_dataset = _get_indra_ds_from_native_provider( + provider=provider + ) else: raise ValueError("Unknown storage provider for hub:// dataset") @@ -208,16 +232,18 @@ def dataset_to_libdeeplake(hub2_dataset: Dataset): org_id = ( org_id or jwt.decode(token, options={"verify_signature": False})["id"] ) - libdeeplake_dataset = api.dataset(path, token=token, org_id=org_id) + storage = IndraProvider( + path, read_only=hub2_dataset.read_only, token=token, org_id=org_id + ) + libdeeplake_dataset = api.dataset(storage.core) hub2_dataset.libdeeplake_dataset = libdeeplake_dataset - else: - libdeeplake_dataset = hub2_dataset.libdeeplake_dataset assert libdeeplake_dataset is not None - libdeeplake_dataset._max_cache_size = max( - hub2_dataset.storage.cache_size, libdeeplake_dataset._max_cache_size - ) + 
if hasattr(hub2_dataset.storage, "cache_size"): + libdeeplake_dataset._max_cache_size = max( + hub2_dataset.storage.cache_size, libdeeplake_dataset._max_cache_size + ) commit_id = hub2_dataset.pending_commit_id libdeeplake_dataset.checkout(commit_id) slice_ = hub2_dataset.index.values[0].value diff --git a/deeplake/enterprise/dataloader.py b/deeplake/enterprise/dataloader.py index 0a1f1f0791..a62844b79f 100644 --- a/deeplake/enterprise/dataloader.py +++ b/deeplake/enterprise/dataloader.py @@ -105,7 +105,6 @@ def __init__( _return_index=None, _primary_tensor_name=None, _buffer_size=None, - _orig_dataset=None, _decode_method=None, _persistent_workers=None, _dataloader=None, @@ -117,7 +116,6 @@ def __init__( ): import_indra_loader() self.dataset = dataset - self._orig_dataset = _orig_dataset or dataset self._batch_size = _batch_size self._shuffle = _shuffle self._num_threads = _num_threads @@ -279,9 +277,9 @@ def collate_fn(self): def __len__(self): len_ds = ( - len(self._orig_dataset[self._tensors]) + len(self.dataset[self._tensors]) if self._tensors is not None - else len(self._orig_dataset) + else len(self.dataset) ) round_fn = math.floor if self._drop_last else math.ceil return round_fn(len_ds / ((self.batch_size) * self._world_size)) @@ -805,7 +803,7 @@ def __get_indra_dataloader( def __iter__(self): if self._dataloader is None: - dataset = self._orig_dataset + dataset = self.dataset tensors = self._tensors or map_tensor_keys(dataset, None) jpeg_png_compressed_tensors, json_tensors, list_tensors = check_tensors( @@ -857,7 +855,7 @@ def __iter__(self): tensor_info_dict=tensor_info_dict, ) - dataset_read(self._orig_dataset) + dataset_read(self.dataset) if self._iterator is not None: self._iterator = iter(self._dataloader) diff --git a/deeplake/enterprise/libdeeplake_query.py b/deeplake/enterprise/libdeeplake_query.py index 42c8c05b1f..3efcb50218 100644 --- a/deeplake/enterprise/libdeeplake_query.py +++ b/deeplake/enterprise/libdeeplake_query.py @@ -1,5 +1,5 @@ from deeplake.enterprise.convert_to_libdeeplake import dataset_to_libdeeplake -from deeplake.core.dataset.deeplake_query_dataset import DeepLakeQueryDataset +from deeplake.core.dataset.indra_dataset_view import IndraDatasetView from typing import Optional, Union from deeplake.constants import INDRA_DATASET_SAMPLES_THRESHOLD @@ -35,7 +35,7 @@ def query(dataset, query_string: str): >>> ds_train = deeplake.load('hub://activeloop/coco-train') >>> query_ds_train = query(ds_train, "(select * where contains(categories, 'car') limit 1000) union (select * where contains(categories, 'motorcycle') limit 1000)") """ - if isinstance(dataset, DeepLakeQueryDataset): + if isinstance(dataset, IndraDatasetView): ds = dataset.indra_ds elif dataset.libdeeplake_dataset is not None: ds = dataset.libdeeplake_dataset @@ -49,11 +49,11 @@ def query(dataset, query_string: str): dsv = ds.query(query_string) from deeplake.enterprise.convert_to_libdeeplake import INDRA_API - if not isinstance(dataset, DeepLakeQueryDataset) and INDRA_API.tql.parse(query_string).is_filter and len(dsv.indexes) < INDRA_DATASET_SAMPLES_THRESHOLD: # type: ignore + if not isinstance(dataset, IndraDatasetView) and INDRA_API.tql.parse(query_string).is_filter and len(dsv.indexes) < INDRA_DATASET_SAMPLES_THRESHOLD: # type: ignore indexes = list(dsv.indexes) return dataset.no_view_dataset[indexes] else: - view = DeepLakeQueryDataset(deeplake_ds=dataset, indra_ds=dsv) + view = IndraDatasetView(indra_ds=dsv) view._tql_query = query_string if hasattr(dataset, "is_actually_cloud"): 
view.is_actually_cloud = dataset.is_actually_cloud @@ -158,6 +158,6 @@ def universal_query(query_string: str, token: Optional[str]): api = import_indra_api() dsv = api.tql.query(query_string, token) - view = DeepLakeQueryDataset(deeplake_ds=None, indra_ds=dsv) + view = IndraDatasetView(indra_ds=dsv) view._tql_query = query_string return view diff --git a/deeplake/util/storage.py b/deeplake/util/storage.py index 830290231b..6c0acdd505 100644 --- a/deeplake/util/storage.py +++ b/deeplake/util/storage.py @@ -28,6 +28,7 @@ def storage_provider_from_path( token: Optional[str] = None, is_hub_path: bool = False, db_engine: bool = False, + indra: bool = False, ): """Construct a StorageProvider given a path. @@ -39,6 +40,7 @@ def storage_provider_from_path( token (str): token for authentication into activeloop. is_hub_path (bool): Whether the path points to a Deep Lake dataset. db_engine (bool): Whether to use Activeloop DB Engine. Only applicable for hub:// paths. + indra (bool): If true creates indra storage provider. Returns: If given a path starting with s3:// returns the S3Provider. @@ -53,8 +55,15 @@ def storage_provider_from_path( """ if creds is None: creds = {} - if path.startswith("hub://"): - storage: StorageProvider = storage_provider_from_hub_path( + + if indra: + from deeplake.core.storage.indra import IndraProvider + + storage: StorageProvider = IndraProvider( + path, read_only=read_only, token=token, creds=creds + ) + elif path.startswith("hub://"): + storage = storage_provider_from_hub_path( path, read_only, db_engine=db_engine, token=token, creds=creds ) else: @@ -125,7 +134,7 @@ def get_dataset_credentials( mode: Optional[str], db_engine: bool, ): - # this will give the proper url (s3, gcs, etc) and corresponding creds, depending on where the dataset is stored. + # this will give the proper url(s3, gcs, etc) and corresponding creds, depending on where the dataset is stored. try: url, final_creds, mode, expiration, repo = client.get_dataset_credentials( org_id, ds_name, mode=mode, db_engine={"enabled": db_engine} @@ -144,6 +153,7 @@ def storage_provider_from_hub_path( db_engine: bool = False, token: Optional[str] = None, creds: Optional[Union[dict, str]] = None, + indra: bool = False, ): path, org_id, ds_name, subdir = process_hub_path(path) client = DeepLakeBackendClient(token=token) @@ -187,7 +197,12 @@ def storage_provider_from_hub_path( print(msg) storage = storage_provider_from_path( - path=url, creds=final_creds, read_only=read_only, is_hub_path=True, token=token + path=url, + creds=final_creds, + read_only=read_only, + is_hub_path=True, + token=token, + indra=indra, ) storage.creds_used = creds_used if creds_used == "PLATFORM": @@ -204,6 +219,7 @@ def get_storage_and_cache_chain( memory_cache_size, local_cache_size, db_engine=False, + indra=False, ): """ Returns storage provider and cache chain for a given path, according to arguments passed. @@ -217,6 +233,7 @@ def get_storage_and_cache_chain( memory_cache_size (int): The size of the in-memory cache to use. local_cache_size (int): The size of the local cache to use. db_engine (bool): Whether to use Activeloop DB Engine, only applicable for hub:// paths. + indra (bool): If true creates indra storage provider. Returns: A tuple of the storage provider and the storage chain. 
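A short sketch of the IndraProvider plumbing that deeplake/util/storage.py now exposes; the local path and key names are placeholders, and the byte-level calls map onto the methods defined in deeplake/core/storage/indra.py earlier in this patch.

    from deeplake.util.storage import storage_provider_from_path
    from deeplake.core.storage.indra import IndraProvider

    # With indra=True the helper constructs an IndraProvider instead of the
    # usual S3/GCS/Azure/local providers.
    storage = storage_provider_from_path(
        path="./indra_ds", creds={}, read_only=False, token=None, indra=True
    )
    assert isinstance(storage, IndraProvider)

    # The provider keeps the mapping-style StorageProvider interface and
    # forwards to the indra core (set/get/list/remove/clear).
    storage["meta/blob"] = b"hello"
    print(storage.get_bytes("meta/blob", 0, 2))  # ranged read via core.get(path, s, e)
    print(list(storage))                         # keys come from core.list("")
    del storage["meta/blob"]
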
@@ -228,6 +245,7 @@ def get_storage_and_cache_chain( creds=creds, read_only=read_only, token=token, + indra=indra, ) memory_cache_size_bytes = memory_cache_size * MB local_cache_size_bytes = local_cache_size * MB diff --git a/setup.py b/setup.py index ea69cef28e..1b2e06755f 100644 --- a/setup.py +++ b/setup.py @@ -70,7 +70,7 @@ def libdeeplake_available(): extras_require["all"] = [req_map[r] for r in all_extras] if libdeeplake_available(): - libdeeplake = "libdeeplake==0.0.109" + libdeeplake = "libdeeplake==0.0.118" extras_require["enterprise"] = [libdeeplake, "pyjwt"] extras_require["all"].append(libdeeplake) install_requires.append(libdeeplake)
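
Finally, a sketch of the dataframe ingestion path from the "Added ingest dataframe with indra" bullet; the dataframe contents and destination below are placeholders. With indra=True the data is written through indra's api.dataset_writer() and the call hands back an IndraDatasetView.

    import pandas as pd
    import deeplake

    df = pd.DataFrame({"label": [0, 1, 2], "text": ["a", "b", "c"]})

    # indra=False (the default unless DEEPLAKE_USE_INDRA=true) keeps the old
    # deeplake.empty()-based writer; indra=True routes through api.dataset_writer()
    # and wraps the result in an IndraDatasetView.
    view = deeplake.ingest_dataframe(df, "./df_dataset", indra=True, progressbar=False)
    print(type(view).__name__, len(view))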