Skip to content

Implement migration sequencing (phase 4) #3076

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: migration-sequencing-phase-3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/databricks/labs/ucx/assessment/clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
)
from databricks.labs.ucx.assessment.init_scripts import CheckInitScriptMixin
from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.framework.owners import Ownership, AdministratorLocator
from databricks.labs.ucx.framework.utils import escape_sql_identifier

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -195,6 +195,9 @@ class ClusterOwnership(Ownership[ClusterInfo]):
This is the cluster creator (if known).
"""

def __init__(self, administrator_locator: AdministratorLocator):
super().__init__(administrator_locator, ClusterInfo)

def _maybe_direct_owner(self, record: ClusterInfo) -> str | None:
return record.creator

Expand Down Expand Up @@ -260,5 +263,8 @@ class ClusterPolicyOwnership(Ownership[PolicyInfo]):
This is the creator of the cluster policy (if known).
"""

def __init__(self, administrator_locator: AdministratorLocator):
super().__init__(administrator_locator, PolicyInfo)

def _maybe_direct_owner(self, record: PolicyInfo) -> str | None:
return record.creator
5 changes: 4 additions & 1 deletion src/databricks/labs/ucx/assessment/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from databricks.labs.ucx.assessment.clusters import CheckClusterMixin
from databricks.labs.ucx.assessment.crawlers import spark_version_compatibility
from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.framework.owners import Ownership, AdministratorLocator
from databricks.labs.ucx.framework.utils import escape_sql_identifier

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -160,6 +160,9 @@ class JobOwnership(Ownership[JobInfo]):
This is the job creator (if known).
"""

def __init__(self, administrator_locator: AdministratorLocator):
super().__init__(administrator_locator, JobInfo)

def _maybe_direct_owner(self, record: JobInfo) -> str | None:
return record.creator

Expand Down
5 changes: 4 additions & 1 deletion src/databricks/labs/ucx/assessment/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from databricks.labs.ucx.assessment.clusters import CheckClusterMixin
from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.framework.owners import Ownership, AdministratorLocator
from databricks.labs.ucx.framework.utils import escape_sql_identifier

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -86,5 +86,8 @@ class PipelineOwnership(Ownership[PipelineInfo]):
This is the pipeline creator (if known).
"""

def __init__(self, administrator_locator: AdministratorLocator):
super().__init__(administrator_locator, PipelineInfo)

def _maybe_direct_owner(self, record: PipelineInfo) -> str | None:
return record.creator_name
21 changes: 11 additions & 10 deletions src/databricks/labs/ucx/assessment/sequencing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from collections import defaultdict
from collections.abc import Iterable
from collections.abc import Iterable, Callable
from dataclasses import dataclass
from pathlib import Path

Expand All @@ -10,9 +10,10 @@

from databricks.labs.blueprint.paths import WorkspacePath

from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo
from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo
from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership
from databricks.labs.ucx.assessment.clusters import ClusterInfo
from databricks.labs.ucx.assessment.jobs import JobInfo
from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.source_code.graph import DependencyGraph
from databricks.labs.ucx.source_code.path_lookup import PathLookup
from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
Expand Down Expand Up @@ -63,12 +64,12 @@ def __init__(
self,
ws: WorkspaceClient,
path_lookup: PathLookup,
admin_locator: AdministratorLocator,
ownership_factory: Callable[[type], Ownership],
used_tables_crawler: UsedTablesCrawler,
):
self._ws = ws
self._path_lookup = path_lookup
self._admin_locator = admin_locator
self._ownership_factory = ownership_factory
self._used_tables_crawler = used_tables_crawler
self._last_node_id = 0
self._nodes: dict[tuple[str, str], MigrationNode] = {}
Expand Down Expand Up @@ -128,7 +129,7 @@ def _create_dependency_node(self, object_type: str, object_id: str) -> Migration
object_name = path.relative_to(library_root).as_posix()
break
ws_path = WorkspacePath(self._ws, object_id)
object_owner = WorkspacePathOwnership(self._admin_locator, self._ws).owner_of(ws_path)
object_owner = self._ownership_factory(WorkspacePath).owner_of(ws_path)
else:
raise ValueError(f"{object_type} not supported yet!")
self._last_node_id += 1
Expand All @@ -153,7 +154,7 @@ def _register_used_tables_for(self, parent_node: MigrationNode) -> Iterable[Migr
object_type="TABLE",
object_id=used_table.fullname,
object_name=used_table.fullname,
object_owner="", # TODO
object_owner=self._ownership_factory(Table).owner_of(Table.from_used_table(used_table)),
)
self._nodes[table_node.key] = table_node
self._outgoing[table_node.key].add(parent_node.key)
Expand All @@ -170,7 +171,7 @@ def register_workflow_job(self, job: jobs.Job) -> MigrationNode:
object_type="WORKFLOW",
object_id=str(job.job_id),
object_name=job_name,
object_owner=JobOwnership(self._admin_locator).owner_of(JobInfo.from_job(job)),
object_owner=self._ownership_factory(JobInfo).owner_of(JobInfo.from_job(job)),
)
self._nodes[job_node.key] = job_node
if job.settings and job.settings.job_clusters:
Expand All @@ -197,7 +198,7 @@ def register_cluster(self, cluster_id: str) -> MigrationNode:
object_type="CLUSTER",
object_id=cluster_id,
object_name=object_name,
object_owner=ClusterOwnership(self._admin_locator).owner_of(ClusterInfo.from_cluster_details(details)),
object_owner=self._ownership_factory(ClusterInfo).owner_of(ClusterInfo.from_cluster_details(details)),
)
self._nodes[cluster_node.key] = cluster_node
# TODO register warehouses and policies
Expand Down
25 changes: 24 additions & 1 deletion src/databricks/labs/ucx/contexts/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from databricks.labs.blueprint.wheels import ProductInfo, WheelsV2
from databricks.labs.lsql.backends import SqlBackend

from databricks.labs.ucx.assessment.jobs import JobOwnership
from databricks.labs.ucx.recon.data_comparator import StandardDataComparator
from databricks.labs.ucx.recon.data_profiler import StandardDataProfiler
from databricks.labs.ucx.recon.metadata_retriever import DatabricksTableMetadataRetriever
Expand All @@ -28,7 +29,12 @@
from databricks.labs.ucx.assessment.export import AssessmentExporter
from databricks.labs.ucx.aws.credentials import CredentialManager
from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership, LegacyQueryOwnership
from databricks.labs.ucx.framework.owners import (
AdministratorLocator,
WorkspacePathOwnership,
Ownership,
LegacyQueryOwnership,
)
from databricks.labs.ucx.hive_metastore import ExternalLocations, MountsCrawler, TablesCrawler
from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema
from databricks.labs.ucx.hive_metastore.grants import (
Expand Down Expand Up @@ -280,6 +286,10 @@ def workspace_path_ownership(self) -> WorkspacePathOwnership:
def legacy_query_ownership(self) -> LegacyQueryOwnership:
return LegacyQueryOwnership(self.administrator_locator, self.workspace_client)

@cached_property
def job_ownership(self) -> JobOwnership:
return JobOwnership(self.administrator_locator)

@cached_property
def directfs_access_ownership(self) -> DirectFsAccessOwnership:
return DirectFsAccessOwnership(
Expand Down Expand Up @@ -571,6 +581,19 @@ def migration_recon(self) -> MigrationRecon:
def administrator_locator(self) -> AdministratorLocator:
return AdministratorLocator(self.workspace_client)

@cached_property
def ownership_factory(self) -> Callable[[type], Ownership]:
# ensure registration of Ownerships
names_with_ownership = [name for name in dir(GlobalContext) if "ownership" in name]
for name in names_with_ownership:
if name == "ownership_factory":
continue
prop = getattr(GlobalContext, name)
if not isinstance(prop, cached_property):
continue
_ = getattr(self, name)
return Ownership.for_record_type


class CliContext(GlobalContext, abc.ABC):
@cached_property
Expand Down
16 changes: 12 additions & 4 deletions src/databricks/labs/ucx/framework/owners.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable, Sequence
from datetime import timedelta
from functools import cached_property
from typing import Generic, TypeVar, final
from typing import Generic, TypeVar, final, cast

from databricks.labs.blueprint.paths import WorkspacePath
from databricks.sdk import WorkspaceClient
Expand Down Expand Up @@ -169,8 +170,15 @@ def get_workspace_administrator(self) -> str:
class Ownership(ABC, Generic[Record]):
"""Determine an owner for a given type of object."""

def __init__(self, administrator_locator: AdministratorLocator) -> None:
_factories: dict[type, Ownership] = {}

@classmethod
def for_record_type(cls, record_type: type) -> Ownership[Record]:
return cast(Ownership[Record], cls._factories[record_type])

def __init__(self, administrator_locator: AdministratorLocator, record_type: type) -> None:
self._administrator_locator = administrator_locator
self._factories[record_type] = self

@final
def owner_of(self, record: Record) -> str:
Expand Down Expand Up @@ -198,7 +206,7 @@ def _maybe_direct_owner(self, record: Record) -> str | None:

class WorkspacePathOwnership(Ownership[WorkspacePath]):
def __init__(self, administrator_locator: AdministratorLocator, ws: WorkspaceClient) -> None:
super().__init__(administrator_locator)
super().__init__(administrator_locator, WorkspacePath)
self._ws = ws

def owner_of_path(self, path: str) -> str:
Expand Down Expand Up @@ -244,7 +252,7 @@ def _infer_from_first_can_manage(object_permissions):

class LegacyQueryOwnership(Ownership[str]):
def __init__(self, administrator_locator: AdministratorLocator, workspace_client: WorkspaceClient) -> None:
super().__init__(administrator_locator)
super().__init__(administrator_locator, str)
self._workspace_client = workspace_client

def _maybe_direct_owner(self, record: str) -> str | None:
Expand Down
5 changes: 4 additions & 1 deletion src/databricks/labs/ucx/hive_metastore/grants.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
StoragePermissionMapping,
)
from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.framework.owners import Ownership, AdministratorLocator
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore.locations import (
Expand Down Expand Up @@ -404,6 +404,9 @@ class GrantOwnership(Ownership[Grant]):
At the present we can't determine a specific owner for grants.
"""

def __init__(self, administrator_locator: AdministratorLocator):
super().__init__(administrator_locator, Grant)

def _maybe_direct_owner(self, record: Grant) -> None:
return None

Expand Down
4 changes: 2 additions & 2 deletions src/databricks/labs/ucx/hive_metastore/ownership.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(
legacy_query_ownership: LegacyQueryOwnership,
workspace_path_ownership: WorkspacePathOwnership,
) -> None:
super().__init__(administrator_locator)
super().__init__(administrator_locator, Table)
self._grants_crawler = grants_crawler
self._used_tables_in_paths = used_tables_in_paths
self._used_tables_in_queries = used_tables_in_queries
Expand Down Expand Up @@ -92,7 +92,7 @@ class TableMigrationOwnership(Ownership[TableMigrationStatus]):
"""

def __init__(self, tables_crawler: TablesCrawler, table_ownership: TableOwnership) -> None:
super().__init__(table_ownership._administrator_locator) # TODO: Fix this
super().__init__(table_ownership._administrator_locator, TableMigrationStatus) # TODO: Fix this
self._tables_crawler = tables_crawler
self._table_ownership = table_ownership
self._indexed_tables: dict[tuple[str, str], Table] | None = None
Expand Down
14 changes: 14 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from databricks.labs.ucx.source_code.base import UsedTable
from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.ucx.framework.owners import Ownership, AdministratorLocator
from databricks.labs.ucx.framework.utils import escape_sql_identifier

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -670,3 +671,16 @@ def _create_describe_tasks(self, catalog: str, database: str, table_names: list[
for table in table_names:
tasks.append(partial(self._describe, catalog, database, table))
return tasks


class TableOwnership(Ownership[Table]):
"""Determine ownership of tables in the inventory.

At the present we don't determine a specific owner for tables.
"""

def __init__(self, administrator_locator: AdministratorLocator):
super().__init__(administrator_locator, Table)

def _maybe_direct_owner(self, record: Table) -> None:
return None
5 changes: 4 additions & 1 deletion src/databricks/labs/ucx/hive_metastore/udfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from databricks.sdk.errors import Unknown, NotFound

from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.framework.owners import Ownership, AdministratorLocator
from databricks.labs.ucx.framework.utils import escape_sql_identifier

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -151,5 +151,8 @@ class UdfOwnership(Ownership[Udf]):
At the present we don't determine a specific owner for UDFs.
"""

def __init__(self, administrator_locator: AdministratorLocator):
super().__init__(administrator_locator, Udf)

def _maybe_direct_owner(self, record: Udf) -> None:
return None
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/source_code/directfs_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def __init__(
legacy_query_ownership: LegacyQueryOwnership,
workspace_client: WorkspaceClient,
) -> None:
super().__init__(administrator_locator)
super().__init__(administrator_locator, DirectFsAccess)
self._workspace_path_ownership = workspace_path_ownership
self._legacy_query_ownership = legacy_query_ownership
self._workspace_client = workspace_client
Expand Down
21 changes: 10 additions & 11 deletions tests/unit/assessment/test_sequencing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

from pathlib import Path

from databricks.sdk.service import iam, jobs
from databricks.sdk.service import jobs

from databricks.sdk.service.compute import ClusterDetails
from databricks.sdk.service.jobs import NotebookTask

from databricks.labs.ucx.assessment.sequencing import MigrationSequencer, MigrationStep
from databricks.labs.ucx.framework.owners import AdministratorLocator, AdministratorFinder
from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.mixins.cached_workspace_path import WorkspaceCache
from databricks.labs.ucx.source_code.base import CurrentSessionState, UsedTable, LineageAtom
from databricks.labs.ucx.source_code.graph import DependencyGraph, Dependency
Expand All @@ -19,11 +19,10 @@
from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler


def admin_locator(ws, user_name: str):
admin_finder = create_autospec(AdministratorFinder)
admin_user = iam.User(user_name=user_name, active=True, roles=[iam.ComplexValue(value="account_admin")])
admin_finder.find_admin_users.return_value = (admin_user,)
return AdministratorLocator(ws, finders=[lambda _ws: admin_finder])
def ownership_factory(user_name: str):
ownership = create_autospec(Ownership)
ownership.owner_of.return_value = user_name
return lambda record_type: ownership


def test_sequencer_builds_cluster_and_children_from_task(ws, simple_dependency_resolver, mock_path_lookup):
Expand All @@ -36,7 +35,7 @@ def test_sequencer_builds_cluster_and_children_from_task(ws, simple_dependency_r
graph = DependencyGraph(dependency, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState())
used_tables_crawler = create_autospec(UsedTablesCrawler)
used_tables_crawler.assert_not_called()
sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe"), used_tables_crawler)
sequencer = MigrationSequencer(ws, mock_path_lookup, ownership_factory("John Doe"), used_tables_crawler)
sequencer.register_workflow_task(task, job, graph)
steps = list(sequencer.generate_steps())
step = steps[-1]
Expand Down Expand Up @@ -75,7 +74,7 @@ def test_sequencer_builds_steps_from_dependency_graph(ws, simple_dependency_reso
assert not problems
used_tables_crawler = create_autospec(UsedTablesCrawler)
used_tables_crawler.assert_not_called()
sequencer = MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe"), used_tables_crawler)
sequencer = MigrationSequencer(ws, mock_path_lookup, ownership_factory("John Doe"), used_tables_crawler)
sequencer.register_workflow_task(task, job, graph)
all_steps = list(sequencer.generate_steps())
# ensure steps have a consistent step_number: TASK > grand-parent > parent > child
Expand Down Expand Up @@ -124,7 +123,7 @@ def test_sequencer_supports_cyclic_dependencies(ws, simple_dependency_resolver,
child_graph_b.add_dependency(child_graph_a)
used_tables_crawler = create_autospec(UsedTablesCrawler)
used_tables_crawler.assert_not_called()
sequencer = _MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe"), used_tables_crawler)
sequencer = _MigrationSequencer(ws, mock_path_lookup, ownership_factory("John Doe"), used_tables_crawler)
sequencer.register_dependency(None, root.lineage[-1].object_type, root.lineage[-1].object_id)
sequencer.visit_graph(root_graph)
steps = list(sequencer.generate_steps())
Expand All @@ -150,7 +149,7 @@ def test_sequencer_builds_steps_from_used_tables(ws, simple_dependency_resolver,
)
]
)
sequencer = _MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe"), used_tables_crawler)
sequencer = _MigrationSequencer(ws, mock_path_lookup, ownership_factory("John Doe"), used_tables_crawler)
sequencer.register_dependency(None, object_type="FILE", object_id="/some-folder/some-file")
all_steps = list(sequencer.generate_steps())
assert len(all_steps) == 1
Expand Down
Loading
Loading