Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: BI-6025 revisionId in us_manager #789

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions lib/dl_api_lib/dl_api_lib/app/control_api/resources/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import logging
from typing import (
TYPE_CHECKING,
Any,
NoReturn,
Optional,
)

from flask import request
Expand Down Expand Up @@ -37,7 +39,10 @@
DatabaseUnavailable,
USPermissionRequired,
)
from dl_core.us_connection_base import ConnectionBase
from dl_core.us_connection_base import (
ConnectionBase,
DataSourceTemplate,
)


if TYPE_CHECKING:
Expand Down Expand Up @@ -205,28 +210,28 @@ def put(self, connection_id): # type: ignore # TODO: fix
us_manager.save(conn)


def _dump_source_templates(tpls) -> dict: # type: ignore # TODO: fix
def _dump_source_templates(tpls: Optional[list[DataSourceTemplate]]) -> Optional[list[dict[str, Any]]]:
if tpls is None:
return None # type: ignore # TODO: fix
return [dict(tpl._asdict(), parameter_hash=tpl.get_param_hash()) for tpl in tpls] # type: ignore # TODO: fix
return None
return [dict(tpl._asdict(), parameter_hash=tpl.get_param_hash()) for tpl in tpls]


@ns.route("/<connection_id>/info/metadata_sources")
class ConnectionInfoMetadataSources(BIResource):
@schematic_request(ns=ns, responses={200: ("Success", ConnectionSourceTemplatesResponseSchema())})
def get(self, connection_id): # type: ignore # TODO: fix
connection = self.get_us_manager().get_by_id(connection_id, expected_type=ConnectionBase)
def get(self, connection_id: str) -> dict[str, Optional[list[dict[str, Any]]]]:
connection: ConnectionBase = self.get_us_manager().get_by_id(connection_id, expected_type=ConnectionBase)

localizer = self.get_service_registry().get_localizer()
source_template_templates = connection.get_data_source_template_templates(localizer=localizer) # type: ignore # 2024-01-24 # TODO: "USEntry" has no attribute "get_data_source_template_templates" [attr-defined]
source_template_templates = connection.get_data_source_template_templates(localizer=localizer)

source_templates = []
source_templates: Optional[list[DataSourceTemplate]] = []
try:
need_permission_on_entry(connection, USPermissionKind.read)
except USPermissionRequired:
pass
else:
source_templates = connection.get_data_source_local_templates() # type: ignore # 2024-01-24 # TODO: "USEntry" has no attribute "get_data_source_local_templates" [attr-defined]
source_templates = connection.get_data_source_local_templates()

return {
"sources": _dump_source_templates(source_templates),
Expand Down
22 changes: 16 additions & 6 deletions lib/dl_core/dl_core/united_storage_client_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,18 @@ async def _request(self, request_data: UStorageClientBase.RequestData) -> dict:
)
return self._get_us_json_from_response(response_adapter)

async def get_entry(self, entry_id: str) -> dict:
return await self._request(self._req_data_get_entry(entry_id=entry_id))
async def get_entry(
self,
entry_id: str,
params: Optional[dict[str, str]] = None,
include_permissions: bool = True,
include_links: bool = True,
) -> dict:
return await self._request(
self._req_data_get_entry(
entry_id=entry_id, params=params, include_permissions=include_permissions, include_links=include_links
)
)

async def create_entry(
self,
Expand Down Expand Up @@ -304,10 +314,10 @@ async def entries_iterator(
done = True

# 3. Yield results
for entr in page_entries:
if entr["entryId"] not in previous_page_entry_ids:
unseen_entry_ids.add(entr["entryId"])
yield entr
for entry in page_entries:
if entry["entryId"] not in previous_page_entry_ids:
unseen_entry_ids.add(entry["entryId"])
yield entry

# 4. Stop if got no nextPageToken or unseen entries
previous_page_entry_ids = unseen_entry_ids.copy()
Expand Down
16 changes: 13 additions & 3 deletions lib/dl_core/dl_core/us_manager/schema_migration/factory.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from dl_core.us_connection import get_schema_migration_cls
from dl_core.us_manager.schema_migration.base import BaseEntrySchemaMigration
from dl_core.us_manager.schema_migration.dataset import DatasetSchemaMigration
from dl_core.us_manager.schema_migration.factory_base import EntrySchemaMigrationFactoryBase


if TYPE_CHECKING:
from dl_core.services_registry import ServicesRegistry


class DummyEntrySchemaMigrationFactory(EntrySchemaMigrationFactoryBase):
def get_schema_migration(
self,
entry_scope: str,
entry_type: str,
service_registry: ServicesRegistry | None = None,
) -> BaseEntrySchemaMigration:
return BaseEntrySchemaMigration()

Expand All @@ -18,11 +27,12 @@ def get_schema_migration(
self,
entry_scope: str,
entry_type: str,
service_registry: ServicesRegistry | None = None,
) -> BaseEntrySchemaMigration:
if entry_scope == "dataset":
return DatasetSchemaMigration()
return DatasetSchemaMigration(services_registry=service_registry)
elif entry_scope == "connection":
schema_migration_cls = get_schema_migration_cls(conn_type_name=entry_type)
return schema_migration_cls()
return schema_migration_cls(services_registry=service_registry)
else:
return BaseEntrySchemaMigration()
return BaseEntrySchemaMigration(services_registry=service_registry)
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
from __future__ import annotations

import abc
from typing import TYPE_CHECKING

from dl_core.us_manager.schema_migration.base import BaseEntrySchemaMigration


if TYPE_CHECKING:
from dl_core.services_registry import ServicesRegistry


class EntrySchemaMigrationFactoryBase(abc.ABC):
@abc.abstractmethod
def get_schema_migration(
self,
entry_scope: str,
entry_type: str,
service_registry: ServicesRegistry | None = None,
) -> BaseEntrySchemaMigration:
pass
10 changes: 9 additions & 1 deletion lib/dl_core/dl_core/us_manager/us_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
)
from dl_core.us_manager.crypto.main import CryptoController
from dl_core.us_manager.local_cache import USEntryBuffer
from dl_core.us_manager.schema_migration.base import BaseEntrySchemaMigration
from dl_core.us_manager.schema_migration.factory import DefaultEntrySchemaMigrationFactory
from dl_core.us_manager.schema_migration.factory_base import EntrySchemaMigrationFactoryBase
from dl_core.us_manager.storage_schemas.connection_schema_registry import MAP_TYPE_TO_SCHEMA_MAP_TYPE_TO_SCHEMA
Expand Down Expand Up @@ -172,6 +173,13 @@ def get_lifecycle_manager(
entry=entry, us_manager=self, service_registry=service_registry
)

def get_schema_migration(
self, entry_scope: str, entry_type: str, service_registry: Optional[ServicesRegistry] = None
) -> BaseEntrySchemaMigration:
if service_registry is None:
service_registry = self.get_services_registry()
return self._schema_migration_factory.get_schema_migration(entry_scope, entry_type, service_registry)

# TODO FIX: Prevent saving entries with tenant ID that doesn't match current tenant ID
def set_tenant_override(self, tenant: TenantDef) -> None:
if not self._us_auth_context.is_tenant_id_mutable():
Expand Down Expand Up @@ -523,7 +531,7 @@ def get_loaded_us_connection(self, identity: Union[str, ConnectionRef]) -> Conne

return entry

def _get_entry_links(self, entry: USEntry) -> Set[ConnectionRef]:
def _get_entry_links(self, entry: Optional[USEntry]) -> Set[ConnectionRef]:
if isinstance(entry, Dataset):
lifecycle_manager = self.get_lifecycle_manager(entry=entry)
linked_entries_refs: Set[ConnectionRef] = {
Expand Down
43 changes: 37 additions & 6 deletions lib/dl_core/dl_core/us_manager/us_manager_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,22 +114,43 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
LOGGER.warning("Error during closing AsyncUSManager", exc_info=True)

@overload
async def get_by_id(self, entry_id: str, expected_type: type(None) = None) -> USEntry: # type: ignore # TODO: fix
async def get_by_id(
self,
entry_id: str,
expected_type: None = None,
params: Optional[dict[str, str]] = None,
) -> USEntry:
pass

@overload # noqa
async def get_by_id(self, entry_id: str, expected_type: Type[_ENTRY_TV] = None) -> _ENTRY_TV: # type: ignore # TODO: fix
@overload
async def get_by_id(
self,
entry_id: str,
expected_type: Optional[Type[_ENTRY_TV]] = None,
params: Optional[dict[str, str]] = None,
) -> _ENTRY_TV:
pass

@generic_profiler_async("us-fetch-entity") # type: ignore # TODO: fix
async def get_by_id(self, entry_id: str, expected_type: Type[_ENTRY_TV] = None) -> _ENTRY_TV: # type: ignore # TODO: fix
async def get_by_id(
self,
entry_id: str,
expected_type: Optional[Type[USEntry]] = None,
params: Optional[dict[str, str]] = None,
) -> USEntry:
with self._enrich_us_exception(
entry_id=entry_id,
entry_scope=expected_type.scope if expected_type is not None else None,
):
us_resp = await self._us_client.get_entry(entry_id)
us_resp = await self._us_client.get_entry(entry_id, params=params)

obj: _ENTRY_TV = self._entry_dict_to_obj(us_resp, expected_type) # type: ignore # TODO: fix
schema_migration = self.get_schema_migration(
entry_scope=us_resp["scope"],
entry_type=us_resp["type"],
)
us_resp = await schema_migration.migrate_async(us_resp)

obj = self._entry_dict_to_obj(us_resp, expected_type)
await self.get_lifecycle_manager(entry=obj).post_init_async_hook()

return obj
Expand Down Expand Up @@ -184,6 +205,11 @@ async def delete(self, entry: USEntry) -> None:
async def reload_data(self, entry: USEntry) -> None:
assert entry.uuid is not None
us_resp = await self._us_client.get_entry(entry.uuid)
schema_migration = self.get_schema_migration(
entry_scope=us_resp["scope"],
entry_type=us_resp["type"],
)
us_resp = await schema_migration.migrate_async(us_resp)
reloaded_entry = self._entry_dict_to_obj(us_resp, expected_type=type(entry))
entry.data = reloaded_entry.data
entry._us_resp = us_resp
Expand Down Expand Up @@ -341,6 +367,11 @@ async def get_collection(
if us_resp:
# noinspection PyBroadException
try:
schema_migration = self.get_schema_migration(
entry_scope=us_resp["scope"],
entry_type=us_resp["type"],
)
us_resp = await schema_migration.migrate_async(us_resp)
obj: USEntry = self._entry_dict_to_obj(us_resp, entry_cls)
yield obj
except Exception:
Expand Down
Loading
Loading