Skip to content

Use TACL cluster in test_all_grant_types and wait for ANONYMOUS FILE grant #3800

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
8 changes: 7 additions & 1 deletion src/databricks/labs/ucx/contexts/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,13 @@ def group_manager(self) -> GroupManager:

@cached_property
def grants_crawler(self) -> GrantsCrawler:
return GrantsCrawler(self.tables_crawler, self.udfs_crawler, self.config.include_databases)
return GrantsCrawler(
self.sql_backend,
self.inventory_database,
self.tables_crawler,
self.udfs_crawler,
include_databases=self.config.include_databases,
)

@cached_property
def grant_ownership(self) -> GrantOwnership:
Expand Down
19 changes: 12 additions & 7 deletions src/databricks/labs/ucx/hive_metastore/grants.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,18 @@ def uc_grant_sql(self, object_type: str | None = None, object_key: str | None =
class GrantsCrawler(CrawlerBase[Grant]):
"""Crawler that captures access controls that relate to data and other securable objects."""

def __init__(self, tables_crawler: TablesCrawler, udf: UdfsCrawler, include_databases: list[str] | None = None):
assert tables_crawler._sql_backend == udf._sql_backend
assert tables_crawler._catalog == udf._catalog
assert tables_crawler._schema == udf._schema
super().__init__(tables_crawler._sql_backend, tables_crawler._catalog, tables_crawler._schema, "grants", Grant)
def __init__(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decoupled the grantscrawler from the tables crawler for two reasons:

  1. IMO it is confusing because AFAIK this is the only crawler which does it. It also bears the question: why not tightly couple it with the UDFs crawler?
  2. The GrantsCrawler is expected to run against a different SqlBackend in the assessment workflow, namely the TACL cluster.

self,
sql_backend: SqlBackend,
schema: str,
tables_crawler: TablesCrawler,
udfs_crawler: UdfsCrawler,
*,
include_databases: list[str] | None = None,
):
super().__init__(sql_backend, "hive_metastore", schema, "grants", Grant)
self._tables_crawler = tables_crawler
self._udf = udf
self._udfs_crawler = udfs_crawler
self._include_databases = include_databases

def snapshot(self, *, force_refresh: bool = False) -> Iterable[Grant]:
Expand Down Expand Up @@ -286,7 +291,7 @@ def _crawl(self) -> Iterable[Grant]:
else:
task = partial(fn, view=table.name)
tasks.append(task)
for udf in self._udf.snapshot():
for udf in self._udfs_crawler.snapshot():
fn = partial(self.grants, catalog=catalog, database=udf.database)
tasks.append(partial(fn, udf=udf.name))
catalog_grants, errors = Threads.gather(f"listing grants for {catalog}", tasks)
Expand Down
83 changes: 53 additions & 30 deletions tests/integration/hive_metastore/test_grant_detail.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
from collections.abc import Callable, Iterable

import pytest
from databricks.labs.lsql.backends import StatementExecutionBackend
from databricks.labs.lsql.backends import CommandExecutionBackend, SqlBackend
from databricks.sdk.errors import NotFound
from databricks.sdk.retries import retried

from databricks.labs.ucx.hive_metastore.grants import Grant, GrantsCrawler
from databricks.labs.ucx.hive_metastore.grants import Grant
from databricks.labs.ucx.install import deploy_schema

from ..conftest import MockRuntimeContext
Expand All @@ -17,6 +17,16 @@
logger = logging.getLogger(__name__)


@pytest.fixture
def sql_backend_tacl(ws, env_or_skip) -> SqlBackend:
"""Ensure the SQL backend used during fixture setup is using the TACL cluster.

The TACL cluster is used for grants.
"""
cluster_id = env_or_skip("TEST_LEGACY_TABLE_ACL_CLUSTER_ID")
return CommandExecutionBackend(ws, cluster_id)


@pytest.fixture()
def _deployed_schema(runtime_ctx) -> None:
"""Ensure that the schemas (and views) are initialized."""
Expand All @@ -25,7 +35,7 @@ def _deployed_schema(runtime_ctx) -> None:


@retried(on=[NotFound, TimeoutError], timeout=dt.timedelta(minutes=3))
def test_all_grant_types(runtime_ctx: MockRuntimeContext, _deployed_schema: None):
def test_all_grant_types(runtime_ctx: MockRuntimeContext, sql_backend_tacl: SqlBackend, _deployed_schema: None):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am mimicking the assessment workflow here:

  1. TablesCrawler and UdfsCrawler run against the "regular" backend
  2. GrantsCrawler run against the "tacl" backend

"""All types of grants should be reported by the grant_detail view."""

# Fixture: a group and schema to hold all the objects, the objects themselves and a grant on each to the group.
Expand All @@ -34,40 +44,48 @@ def test_all_grant_types(runtime_ctx: MockRuntimeContext, _deployed_schema: None
table = runtime_ctx.make_table(schema_name=schema.name)
view = runtime_ctx.make_table(schema_name=schema.name, view=True, ctas="select 1")
udf = runtime_ctx.make_udf(schema_name=schema.name)
runtime_ctx.sql_backend.execute(f"GRANT SELECT ON CATALOG {schema.catalog_name} TO `{group.display_name}`")
runtime_ctx.sql_backend.execute(f"GRANT SELECT ON SCHEMA {schema.full_name} TO `{group.display_name}`")
runtime_ctx.sql_backend.execute(f"GRANT SELECT ON TABLE {table.full_name} TO `{group.display_name}`")
runtime_ctx.sql_backend.execute(f"GRANT SELECT ON VIEW {view.full_name} TO `{group.display_name}`")
runtime_ctx.sql_backend.execute(f"GRANT SELECT ON FUNCTION {udf.full_name} TO `{group.display_name}`")
runtime_ctx.sql_backend.execute(f"GRANT SELECT ON ANY FILE TO `{group.display_name}`")
runtime_ctx.sql_backend.execute(f"GRANT SELECT ON ANONYMOUS FUNCTION TO `{group.display_name}`")

# Snapshotting tables and udfs to avoid snapshot on TACL cluster during grants crawler
runtime_ctx.tables_crawler.snapshot()
runtime_ctx.udfs_crawler.snapshot()

# Grants require TACL cluster
ctx = runtime_ctx.replace(sql_backend=sql_backend_tacl)
ctx.sql_backend.execute(f"GRANT SELECT ON CATALOG {schema.catalog_name} TO `{group.display_name}`")
ctx.sql_backend.execute(f"GRANT SELECT ON SCHEMA {schema.full_name} TO `{group.display_name}`")
ctx.sql_backend.execute(f"GRANT SELECT ON TABLE {table.full_name} TO `{group.display_name}`")
ctx.sql_backend.execute(f"GRANT SELECT ON VIEW {view.full_name} TO `{group.display_name}`")
ctx.sql_backend.execute(f"GRANT SELECT ON FUNCTION {udf.full_name} TO `{group.display_name}`")
ctx.sql_backend.execute(f"GRANT SELECT ON ANY FILE TO `{group.display_name}`")
ctx.sql_backend.execute(f"GRANT SELECT ON ANONYMOUS FUNCTION TO `{group.display_name}`")

@retried(on=[ValueError], timeout=dt.timedelta(minutes=2))
def wait_for_grants(condition: Callable[[Iterable[Grant]], bool], **kwargs) -> None:
"""Wait for grants to meet the condition.

The method retries the condition check to account for eventual consistency of the permission API.
"""
grants = runtime_ctx.grants_crawler.grants(**kwargs)
grants = ctx.grants_crawler.grants(**kwargs)
if not condition(grants):
raise ValueError("Grants do not meet condition")

def contains_select_on_any_file(grants: Iterable[Grant]) -> bool:
def grants_contain_select_action(grants: Iterable[Grant]) -> bool:
"""Check if the SELECT permission on ANY FILE is present in the grants."""
return any(g.principal == group.display_name and g.action_type == "SELECT" for g in grants)

# Wait for the grants to be available so that we can snapshot them.
# Only verifying the SELECT permission on ANY FILE as it takes a while to propagate.
wait_for_grants(contains_select_on_any_file, any_file=True)
# Only verifying the SELECT permission on ANY FILE and ANONYMOUS FUNCTION as those take a while to propagate.
wait_for_grants(grants_contain_select_action, any_file=True)
wait_for_grants(grants_contain_select_action, anonymous_function=True)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fails with the following error:

DatabricksServiceHttpClientException: TEMPORARILY_UNAVAILABLE: The service at /api/2.0/sql-acl/get-permissions is taking too long to process your request. Please try again later or try a faster operation. [TraceId: 00-afa5633f98d2b7cbc47937e9233436b5-c5249ad122b4daa2-00]

This endpoint API docs suggest to use the new API endpoint. However, the choice for API endpoint is not up to us as it we use SHOW GRANTS ... - not the API endpoints directly. (Note we might implicitly chose the endpoint with the runtime.)

@gueniai : Could you ask the endpoint team on guidance here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried manually last night and I could make the /api/2.0/sql-acl/get-permissions via a python api call. If the endpoint team can give us the revoke permissions or anything that can help us clear the dangling permissions it would help us a lot.


runtime_ctx.grants_crawler.snapshot()
ctx.grants_crawler.snapshot()

grants_detail_query = f"""
SELECT object_type, object_id
FROM {runtime_ctx.inventory_database}.grant_detail
WHERE principal_type='group' AND principal='{group.display_name}' and action_type='SELECT'
"""
grants = {(row.object_type, row.object_id) for row in runtime_ctx.sql_backend.fetch(grants_detail_query)}
grants = {(row.object_type, row.object_id) for row in ctx.sql_backend.fetch(grants_detail_query)}

# TODO: The types of objects targeted by grants is missclassified; this needs to be fixed.

Expand All @@ -84,30 +102,35 @@ def contains_select_on_any_file(grants: Iterable[Grant]) -> bool:


@retried(on=[NotFound, TimeoutError], timeout=dt.timedelta(minutes=3))
def test_grant_findings(
runtime_ctx: MockRuntimeContext, sql_backend: StatementExecutionBackend, _deployed_schema: None
) -> None:
def test_grant_findings(runtime_ctx: MockRuntimeContext, sql_backend_tacl: SqlBackend, _deployed_schema: None) -> None:
"""Test that findings are reported for a grant."""

# Fixture: two objects, one with a grant that is okay and the other with a grant that is not okay.
group = runtime_ctx.make_group()
schema = runtime_ctx.make_schema()
# The UDF is not used by the test, but avoids re-crawling UDFs during grants crawling
runtime_ctx.make_udf(schema_name=schema.name)
table_a = runtime_ctx.make_table(schema_name=schema.name)
table_b = runtime_ctx.make_table(schema_name=schema.name)
sql_backend.execute(f"GRANT SELECT ON TABLE {table_a.full_name} TO `{group.display_name}`")
sql_backend.execute(f"DENY SELECT ON TABLE {table_b.full_name} TO `{group.display_name}`")

# Snapshotting tables and udfs to avoid snapshot on TACL cluster during grants crawler
runtime_ctx.tables_crawler.snapshot()
runtime_ctx.udfs_crawler.snapshot()

ctx = runtime_ctx.replace(sql_backend=sql_backend_tacl)
ctx.sql_backend.execute(f"GRANT SELECT ON TABLE {table_a.full_name} TO `{group.display_name}`")
ctx.sql_backend.execute(f"DENY SELECT ON TABLE {table_b.full_name} TO `{group.display_name}`")

# Ensure the view is populated (it's based on the crawled grants) and fetch the content.
GrantsCrawler(runtime_ctx.tables_crawler, runtime_ctx.udfs_crawler).snapshot()
ctx.grants_crawler.snapshot()

rows = sql_backend.fetch(
f"""
SELECT object_type, object_id, success, failures
FROM {runtime_ctx.inventory_database}.grant_detail
WHERE catalog='{schema.catalog_name}' AND database='{schema.name}'
AND principal_type='group' AND principal='{group.display_name}'
"""
)
grants_detail_query = f"""
SELECT object_type, object_id, success, failures
FROM {runtime_ctx.inventory_database}.grant_detail
WHERE catalog='{schema.catalog_name}' AND database='{schema.name}'
AND principal_type='group' AND principal='{group.display_name}'
"""
rows = ctx.sql_backend.fetch(grants_detail_query)
grants = {
(row.object_type, row.object_id): (row.success, json.loads(row.failures) if row.failures is not None else None)
for row in rows
Expand Down
16 changes: 6 additions & 10 deletions tests/integration/hive_metastore/test_grants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
from databricks.labs.lsql.backends import StatementExecutionBackend

from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler, GrantOwnership
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler
from databricks.labs.ucx.hive_metastore.grants import GrantOwnership
from ..conftest import MockRuntimeContext

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -123,22 +121,20 @@ def test_all_grants_for_other_objects(
assert {"DENIED_SELECT"} == found_anonymous_function_grants[group_d.display_name]


def test_grant_ownership(ws, runtime_ctx, inventory_schema, sql_backend, make_random, make_acc_group) -> None:
def test_grant_ownership(ws, runtime_ctx, make_random, make_acc_group) -> None:
"""Verify the ownership can be determined for crawled grants.
This currently isn't very useful: we can't locate specific owners for grants"""

schema = runtime_ctx.make_schema()
this_user = ws.current_user.me()
sql_backend.execute(f"GRANT SELECT ON SCHEMA {escape_sql_identifier(schema.full_name)} TO `{this_user.user_name}`")
table_crawler = TablesCrawler(sql_backend, schema=inventory_schema, include_databases=[schema.name])
udf_crawler = UdfsCrawler(sql_backend, schema=inventory_schema, include_databases=[schema.name])
runtime_ctx.sql_backend.execute(
f"GRANT SELECT ON SCHEMA {escape_sql_identifier(schema.full_name)} TO `{this_user.user_name}`"
)
current_user = ws.current_user.me()
admin_group_name = f"admin_group_{make_random()}"
make_acc_group(display_name=admin_group_name, members=[current_user.id], wait_for_provisioning=True)

# Produce the crawled records.
crawler = GrantsCrawler(table_crawler, udf_crawler, include_databases=[schema.name])
records = crawler.snapshot(force_refresh=True)
records = runtime_ctx.grants_crawler.snapshot(force_refresh=True)

# Find the crawled record for the grant we made.
grant_record = next(record for record in records if record.this_type_and_key() == ("DATABASE", schema.full_name))
Expand Down
44 changes: 22 additions & 22 deletions tests/unit/hive_metastore/test_grants.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def test_crawler_no_data() -> None:
sql_backend = MockBackend()
table = TablesCrawler(sql_backend, "schema")
udf = UdfsCrawler(sql_backend, "schema")
crawler = GrantsCrawler(table, udf)
crawler = GrantsCrawler(sql_backend, "test", table, udf)
grants = list(crawler.snapshot())
assert len(grants) == 0

Expand Down Expand Up @@ -252,7 +252,7 @@ def test_crawler_crawl() -> None:
}
table = TablesCrawler(sql_backend, "schema")
udf = UdfsCrawler(sql_backend, "schema")
crawler = GrantsCrawler(table, udf)
crawler = GrantsCrawler(sql_backend, "test", table, udf)
grants = list(crawler.snapshot())
assert len(grants) == len(expected_grants) and set(grants) == expected_grants

Expand Down Expand Up @@ -301,7 +301,7 @@ def test_crawler_udf_crawl() -> None:

table = TablesCrawler(sql_backend, "schema")
udf = UdfsCrawler(sql_backend, "schema")
crawler = GrantsCrawler(table, udf)
crawler = GrantsCrawler(sql_backend, "test", table, udf)
grants = list(crawler.snapshot())

assert len(grants) == len(expected_grants) and set(grants) == expected_grants
Expand All @@ -311,7 +311,7 @@ def test_crawler_snapshot_when_no_data() -> None:
sql_backend = MockBackend()
table = TablesCrawler(sql_backend, "schema")
udf = UdfsCrawler(sql_backend, "schema")
crawler = GrantsCrawler(table, udf)
crawler = GrantsCrawler(sql_backend, "test", table, udf)
snapshot = list(crawler.snapshot())
assert len(snapshot) == 0

Expand All @@ -320,7 +320,7 @@ def test_crawler_snapshot_with_data() -> None:
sql_backend = MockBackend(rows=ROWS)
table = TablesCrawler(sql_backend, "schema")
udf = UdfsCrawler(sql_backend, "schema")
crawler = GrantsCrawler(table, udf)
crawler = GrantsCrawler(sql_backend, "test", table, udf)
snapshot = list(crawler.snapshot())
assert len(snapshot) == 3

Expand All @@ -345,10 +345,10 @@ def test_grants_returning_error_when_showing_grants() -> None:
],
}

backend = MockBackend(fails_on_first=errors, rows=rows)
table_crawler = TablesCrawler(backend, "default")
udf = UdfsCrawler(backend, "default")
crawler = GrantsCrawler(table_crawler, udf)
sql_backend = MockBackend(fails_on_first=errors, rows=rows)
table_crawler = TablesCrawler(sql_backend, "default")
udf = UdfsCrawler(sql_backend, "default")
crawler = GrantsCrawler(sql_backend, "test", table_crawler, udf)

results = list(crawler.snapshot())
assert results == [
Expand Down Expand Up @@ -381,10 +381,10 @@ def test_grants_returning_error_when_describing() -> None:
],
}

backend = MockBackend(fails_on_first=errors, rows=rows)
table_crawler = TablesCrawler(backend, "default")
udf = UdfsCrawler(backend, "default")
crawler = GrantsCrawler(table_crawler, udf)
sql_backend = MockBackend(fails_on_first=errors, rows=rows)
table_crawler = TablesCrawler(sql_backend, "default")
udf = UdfsCrawler(sql_backend, "default")
crawler = GrantsCrawler(sql_backend, "test", table_crawler, udf)

results = list(crawler.snapshot())
assert results == [
Expand Down Expand Up @@ -420,10 +420,10 @@ def test_udf_grants_returning_error_when_showing_grants() -> None:
],
}

backend = MockBackend(fails_on_first=errors, rows=rows)
table_crawler = TablesCrawler(backend, "default")
udf = UdfsCrawler(backend, "default")
crawler = GrantsCrawler(table_crawler, udf)
sql_backend = MockBackend(fails_on_first=errors, rows=rows)
table_crawler = TablesCrawler(sql_backend, "default")
udf = UdfsCrawler(sql_backend, "default")
crawler = GrantsCrawler(sql_backend, "test", table_crawler, udf)

results = list(crawler.snapshot())
assert results == [
Expand Down Expand Up @@ -456,10 +456,10 @@ def test_udf_grants_returning_error_when_describing() -> None:
],
}

backend = MockBackend(fails_on_first=errors, rows=rows)
table_crawler = TablesCrawler(backend, "default")
udf = UdfsCrawler(backend, "default")
crawler = GrantsCrawler(table_crawler, udf)
sql_backend = MockBackend(fails_on_first=errors, rows=rows)
table_crawler = TablesCrawler(sql_backend, "default")
udf = UdfsCrawler(sql_backend, "default")
crawler = GrantsCrawler(sql_backend, "test", table_crawler, udf)

results = list(crawler.snapshot())
assert results == [
Expand Down Expand Up @@ -504,7 +504,7 @@ def test_crawler_should_filter_databases() -> None:

table = TablesCrawler(sql_backend, "schema", include_databases=["database_one"])
udf = UdfsCrawler(sql_backend, "schema", include_databases=["database_one"])
crawler = GrantsCrawler(table, udf, include_databases=["database_one"])
crawler = GrantsCrawler(sql_backend, "test", table, udf, include_databases=["database_one"])
grants = list(crawler.snapshot())

assert "SHOW DATABASES" not in sql_backend.queries
Expand Down
Loading
Loading