ADD: Fallback method to get cluster_id from task instance. (#16508)
## Summary & Motivation
Fixes: #16506
Sometimes `run.cluster_instance` is `None`, so we instead attempt to get it
from `run.tasks[0].cluster_instance`. Since this step launcher submits runs
with singleton tasks, this should be correct. To round it out, I've also
added a `check.inst` check to the method verifying that `cluster_id` is a string.
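
The fallback itself reduces to a one-line `or` expression over the run object. Here is a minimal standalone sketch of the idea; note that `Run`, `Task`, and `ClusterInstance` below are simplified stand-ins for the databricks-sdk types, not the real classes:
```py
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class ClusterInstance:
    cluster_id: Optional[str] = None

@dataclass
class Task:
    cluster_instance: Optional[ClusterInstance] = None

@dataclass
class Run:
    cluster_instance: Optional[ClusterInstance] = None
    tasks: List[Task] = field(default_factory=list)

def resolve_cluster_instance(run: Run) -> Optional[ClusterInstance]:
    # run.cluster_instance can be None; because the step launcher submits
    # single-task runs, the first task's cluster instance is equivalent.
    return run.cluster_instance or run.tasks[0].cluster_instance

# A run where only the task carries the cluster instance:
run = Run(tasks=[Task(cluster_instance=ClusterInstance("1234-123456-abcdefgh"))])
assert resolve_cluster_instance(run).cluster_id == "1234-123456-abcdefgh"
```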

## How I Tested These Changes

With the changes introduced in this PR, I ran:
```py
In [1]: import os
In [2]: import logging
In [3]: from dagster_databricks import DatabricksClient, DatabricksJobRunner
In [4]: logger = logging.getLogger()
In [5]: logger.addHandler(logging.StreamHandler())
In [6]: logger.setLevel("INFO")
In [7]: runner = DatabricksJobRunner(os.environ['DATABRICKS_HOST'], os.environ['DATABRICKS_TOKEN'])

# Actually ran this with a real cluster id that was failing
# under current methodology.
In [8]: runner.retrieve_logs_for_run_id(logger, 123456)
```
This resulted in the logs being pushed to the logger.

---------

Co-authored-by: Sean Mackesey <[email protected]>
PadenZach and smackesey authored Sep 27, 2023
1 parent 56219ae commit 0ca6823
Showing 1 changed file with 14 additions and 6 deletions.
@@ -432,13 +432,21 @@ def retrieve_logs_for_run_id(
     ) -> Optional[Tuple[Optional[str], Optional[str]]]:
         """Retrieve the stdout and stderr logs for a run."""
         run = self.client.workspace_client.jobs.get_run(databricks_run_id)
-        cluster = self.client.workspace_client.clusters.get(run.cluster_instance.cluster_id)
+
+        # Run.cluster_instance can be None. In that case, fall back to cluster instance on first
+        # task. Currently pyspark step launcher runs jobs with singleton tasks.
+        cluster_instance = run.cluster_instance or run.tasks[0].cluster_instance
+        cluster_id = check.inst(
+            cluster_instance.cluster_id,
+            str,
+            "cluster_id should be string like `1234-123456-abcdefgh` got:"
+            f" `{cluster_instance.cluster_id}`",
+        )
+        cluster = self.client.workspace_client.clusters.get(cluster_id)
         log_config = cluster.cluster_log_conf
         if log_config is None:
             log.warn(
-                "Logs not configured for cluster {cluster} used for run {run}".format(
-                    cluster=cluster.cluster_id, run=databricks_run_id
-                )
+                f"Logs not configured for cluster {cluster_id} used for run {databricks_run_id}"
             )
             return None
         if cast(Optional[compute.S3StorageInfo], log_config.s3) is not None:
@@ -447,8 +455,8 @@ def retrieve_logs_for_run_id(
             return None
         elif cast(Optional[compute.DbfsStorageInfo], log_config.dbfs) is not None:
             logs_prefix = log_config.dbfs.destination
-            stdout = self.wait_for_dbfs_logs(log, logs_prefix, cluster.cluster_id, "stdout")
-            stderr = self.wait_for_dbfs_logs(log, logs_prefix, cluster.cluster_id, "stderr")
+            stdout = self.wait_for_dbfs_logs(log, logs_prefix, cluster_id, "stdout")
+            stderr = self.wait_for_dbfs_logs(log, logs_prefix, cluster_id, "stderr")
             return stdout, stderr
 
     def wait_for_dbfs_logs(
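
For context on the new guard: `check.inst` from dagster's internal `dagster._check` module returns the value unchanged when it is an instance of the given type and raises a `CheckError` otherwise, so a malformed `cluster_id` now fails fast with a descriptive message instead of an opaque API error downstream. A minimal sketch of that behavior (assumes a dagster installation; `_check` is a private module, shown here only for illustration):
```py
import dagster._check as check  # dagster's internal parameter-checking module
from dagster._check import CheckError

# A well-formed cluster_id passes through unchanged.
cluster_id = check.inst("1234-123456-abcdefgh", str, "cluster_id should be a string")
assert cluster_id == "1234-123456-abcdefgh"

# A missing/None cluster_id fails fast with a descriptive CheckError.
try:
    check.inst(None, str, "cluster_id should be a string")
except CheckError as err:
    print(err)
```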
