[dagster-sigma][ez] Add info/debug logging (#25906)
## Summary

Adds info/debug logging, which a user can manually enable, to track the
Sigma organization data load process.
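
For context, `get_dagster_logger("dagster_sigma")` hands back a standard Python logger wired into Dagster's logging setup, so these messages can be surfaced with ordinary stdlib logging configuration. A minimal sketch of one way a user might enable them during a local definitions load (the enabling mechanism below is an assumption, not part of this diff):

```python
import logging

# Assumption, not part of this change: get_dagster_logger returns a stdlib
# logger namespaced under "dagster", so raising the root and "dagster"
# logger levels to DEBUG surfaces the new logger.debug(...) messages.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("dagster").setLevel(logging.DEBUG)
```

In a deployed instance, the `python_logs` settings in `dagster.yaml` (e.g. `python_log_level: DEBUG`) should offer the same control.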

## How I Tested These Changes

Tested locally.
benpankow authored Nov 13, 2024
1 parent fe90aee · commit 86591fb
Showing 1 changed file with 27 additions and 2 deletions.
python_modules/libraries/dagster-sigma/dagster_sigma/resource.py
```diff
@@ -30,6 +30,7 @@
 from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
 from dagster._record import IHaveNew, record_custom
 from dagster._utils.cached_method import cached_method
+from dagster._utils.log import get_dagster_logger
 from pydantic import Field, PrivateAttr
 from sqlglot import exp, parse_one
@@ -43,6 +44,8 @@

 SIGMA_PARTNER_ID_TAG = {"X-Sigma-Partner-Id": "dagster"}

+logger = get_dagster_logger("dagster_sigma")
+

 @record_custom
 class SigmaFilter(IHaveNew):
@@ -153,8 +156,16 @@ async def _fetch_json_async_paginated_entries(
             **(query_params or {}),
             "limit": limit,
         }
-        result = await self._fetch_json_async(endpoint, query_params=query_params)
+
+        result = await self._fetch_json_async(endpoint, query_params=query_params_with_limit)
         entries.extend(result["entries"])
+        logger.debug(
+            "Fetched %s\n Query params %s\n Received %s entries%s",
+            endpoint,
+            query_params_with_limit,
+            len(entries),
+            ", fetching additional results" if result.get("hasMore") else "",
+        )

         while result.get("hasMore"):
             next_page = result["nextPage"]
@@ -166,7 +177,13 @@
                 endpoint, query_params=query_params_with_limit_and_page
             )
             entries.extend(result["entries"])
-
+            logger.debug(
+                "Fetched %s\n Query params %s\n Received %s entries%s",
+                endpoint,
+                query_params_with_limit_and_page,
+                len(result["entries"]),
+                ", fetching additional results" if result.get("hasMore") else "",
+            )
         return entries

     @cached_method
@@ -225,9 +242,12 @@ async def _fetch_dataset_upstreams_by_inode(self) -> Mapping[str, AbstractSet[st
         """
         deps_by_dataset_inode = defaultdict(set)

+        logger.debug("Fetching dataset dependencies")
+
         raw_workbooks = await self._fetch_workbooks()

         async def process_workbook(workbook: Dict[str, Any]) -> None:
+            logger.info("Inferring dataset dependencies for workbook %s", workbook["workbookId"])
             queries = await self._fetch_queries_for_workbook(workbook["workbookId"])
             queries_by_element_id = defaultdict(list)
             for query in queries:
@@ -299,6 +319,7 @@ async def _fetch_dataset_columns_by_inode(self) -> Mapping[str, AbstractSet[str]
         workbooks = await self._fetch_workbooks()

         async def process_workbook(workbook: Dict[str, Any]) -> None:
+            logger.info("Fetching column data from workbook %s", workbook["workbookId"])
             pages = await self._fetch_pages_for_workbook(workbook["workbookId"])
             elements = [
                 element
@@ -343,6 +364,8 @@ async def build_member_id_to_email_mapping(self) -> Mapping[str, str]:
     async def load_workbook_data(self, raw_workbook_data: Dict[str, Any]) -> SigmaWorkbook:
         workbook_deps = set()

+        logger.info("Fetching data for workbook %s", raw_workbook_data["workbookId"])
+
         pages = await self._fetch_pages_for_workbook(raw_workbook_data["workbookId"])
         elements = [
             element
@@ -394,6 +417,7 @@ async def build_organization_data(
         """
         _sigma_filter = sigma_filter or SigmaFilter()

+        logger.info("Beginning Sigma organization data fetch")
         raw_workbooks = await self._fetch_workbooks()
         workbooks_to_fetch = []
         if _sigma_filter.workbook_folders:
@@ -420,6 +444,7 @@
             await self._fetch_dataset_columns_by_inode() if fetch_column_data else {}
         )

+        logger.info("Fetching dataset data")
         for dataset in await self._fetch_datasets():
             inode = _inode_from_url(dataset["url"])
             datasets.append(
```
