diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py index 7287b937c3062..9693c5728f0f3 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py @@ -30,6 +30,7 @@ from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader from dagster._record import IHaveNew, record_custom from dagster._utils.cached_method import cached_method +from dagster._utils.log import get_dagster_logger from pydantic import Field, PrivateAttr from sqlglot import exp, parse_one @@ -43,6 +44,8 @@ SIGMA_PARTNER_ID_TAG = {"X-Sigma-Partner-Id": "dagster"} +logger = get_dagster_logger("dagster_sigma") + @record_custom class SigmaFilter(IHaveNew): @@ -153,8 +156,16 @@ async def _fetch_json_async_paginated_entries( **(query_params or {}), "limit": limit, } - result = await self._fetch_json_async(endpoint, query_params=query_params) + + result = await self._fetch_json_async(endpoint, query_params=query_params_with_limit) entries.extend(result["entries"]) + logger.debug( + "Fetched %s\n Query params %s\n Received %s entries%s", + endpoint, + query_params_with_limit, + len(entries), + ", fetching additional results" if result.get("hasMore") else "", + ) while result.get("hasMore"): next_page = result["nextPage"] @@ -166,7 +177,13 @@ async def _fetch_json_async_paginated_entries( endpoint, query_params=query_params_with_limit_and_page ) entries.extend(result["entries"]) - + logger.debug( + "Fetched %s\n Query params %s\n Received %s entries%s", + endpoint, + query_params_with_limit_and_page, + len(result["entries"]), + ", fetching additional results" if result.get("hasMore") else "", + ) return entries @cached_method @@ -225,9 +242,12 @@ async def _fetch_dataset_upstreams_by_inode(self) -> Mapping[str, AbstractSet[st """ deps_by_dataset_inode = defaultdict(set) + logger.debug("Fetching dataset dependencies") + raw_workbooks = await self._fetch_workbooks() async def process_workbook(workbook: Dict[str, Any]) -> None: + logger.info("Inferring dataset dependencies for workbook %s", workbook["workbookId"]) queries = await self._fetch_queries_for_workbook(workbook["workbookId"]) queries_by_element_id = defaultdict(list) for query in queries: @@ -299,6 +319,7 @@ async def _fetch_dataset_columns_by_inode(self) -> Mapping[str, AbstractSet[str] workbooks = await self._fetch_workbooks() async def process_workbook(workbook: Dict[str, Any]) -> None: + logger.info("Fetching column data from workbook %s", workbook["workbookId"]) pages = await self._fetch_pages_for_workbook(workbook["workbookId"]) elements = [ element @@ -343,6 +364,8 @@ async def build_member_id_to_email_mapping(self) -> Mapping[str, str]: async def load_workbook_data(self, raw_workbook_data: Dict[str, Any]) -> SigmaWorkbook: workbook_deps = set() + logger.info("Fetching data for workbook %s", raw_workbook_data["workbookId"]) + pages = await self._fetch_pages_for_workbook(raw_workbook_data["workbookId"]) elements = [ element @@ -394,6 +417,7 @@ async def build_organization_data( """ _sigma_filter = sigma_filter or SigmaFilter() + logger.info("Beginning Sigma organization data fetch") raw_workbooks = await self._fetch_workbooks() workbooks_to_fetch = [] if _sigma_filter.workbook_folders: @@ -420,6 +444,7 @@ async def build_organization_data( await self._fetch_dataset_columns_by_inode() if fetch_column_data else {} ) + logger.info("Fetching dataset data") for dataset in await self._fetch_datasets(): inode = _inode_from_url(dataset["url"]) datasets.append(