From b71c47f49d9fc4ea2f035ec7efbaa31b3f094b32 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 18 Nov 2024 17:32:14 +0000 Subject: [PATCH 01/10] Stream logs for finished jobs --- sky/jobs/controller.py | 17 ++++++++++++----- sky/jobs/state.py | 26 +++++++++++++++++++++++++- sky/jobs/utils.py | 13 +++++++++++-- sky/utils/controller_utils.py | 6 ++++-- 4 files changed, 52 insertions(+), 10 deletions(-) diff --git a/sky/jobs/controller.py b/sky/jobs/controller.py index 5219c564500..6fafcb05104 100644 --- a/sky/jobs/controller.py +++ b/sky/jobs/controller.py @@ -6,7 +6,7 @@ import time import traceback import typing -from typing import Tuple +from typing import Optional, Tuple import filelock @@ -87,7 +87,7 @@ def __init__(self, job_id: int, dag_yaml: str, task.update_envs(task_envs) def _download_log_and_stream( - self, + self, task_id: Optional[int], handle: cloud_vm_ray_backend.CloudVmRayResourceHandle) -> None: """Downloads and streams the logs of the latest job. @@ -97,8 +97,11 @@ def _download_log_and_stream( """ managed_job_logs_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, 'managed_jobs') - controller_utils.download_and_stream_latest_job_log( + log_file = controller_utils.download_and_stream_latest_job_log( self._backend, handle, managed_job_logs_dir) + if log_file is not None: + managed_job_state.set_local_log_file(self._job_id, task_id, + log_file) logger.info(f'\n== End of logs (ID: {self._job_id}) ==') def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: @@ -213,6 +216,10 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: if job_status == job_lib.JobStatus.SUCCEEDED: end_time = managed_job_utils.get_job_timestamp( self._backend, cluster_name, get_end_time=True) + _, handle = backend_utils.refresh_cluster_status_handle( + cluster_name) + if handle is not None: + self._download_log_and_stream(task_id, handle) # The job is done. managed_job_state.set_succeeded(self._job_id, task_id, @@ -226,7 +233,7 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: recovery_strategy.terminate_cluster(cluster_name=cluster_name) return True - # For single-node jobs, nonterminated job_status indicates a + # For single-node jobs, non-terminated job_status indicates a # healthy cluster. We can safely continue monitoring. # For multi-node jobs, since the job may not be set to FAILED # immediately (depending on user program) when only some of the @@ -278,7 +285,7 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: 'The user job failed. Please check the logs below.\n' f'== Logs of the user job (ID: {self._job_id}) ==\n') - self._download_log_and_stream(handle) + self._download_log_and_stream(task_id, handle) managed_job_status = ( managed_job_state.ManagedJobStatus.FAILED) if job_status == job_lib.JobStatus.FAILED_SETUP: diff --git a/sky/jobs/state.py b/sky/jobs/state.py index 6a0e3caeda3..7a006bf1619 100644 --- a/sky/jobs/state.py +++ b/sky/jobs/state.py @@ -66,7 +66,8 @@ def create_table(cursor, conn): spot_job_id INTEGER, task_id INTEGER DEFAULT 0, task_name TEXT, - specs TEXT)""") + specs TEXT, + local_log_file TEXT DEFAULT NULL)""") conn.commit() db_utils.add_column_to_table(cursor, conn, 'spot', 'failure_reason', 'TEXT') @@ -103,6 +104,8 @@ def create_table(cursor, conn): value_to_replace_existing_entries=json.dumps({ 'max_restarts_on_errors': 0, })) + db_utils.add_column_to_table(cursor, conn, 'spot', 'local_log_file', + 'TEXT DEFAULT NULL') # `job_info` contains the mapping from job_id to the job_name. 
# In the future, it may contain more information about each job. @@ -157,6 +160,7 @@ def _get_db_path() -> str: 'task_id', 'task_name', 'specs', + 'local_log_file', # columns from the job_info table '_job_info_job_id', # This should be the same as job_id 'job_name', @@ -512,6 +516,16 @@ def set_cancelled(job_id: int, callback_func: CallbackType): callback_func('CANCELLED') +def set_local_log_file(job_id: int, task_id: Optional[int], + local_log_file: str): + """Set the local log file for a job.""" + task_str = '' if task_id is None else f' AND task_id={task_id}' + with db_utils.safe_cursor(_DB_PATH) as cursor: + cursor.execute( + 'UPDATE spot SET local_log_file=(?) ' + f'WHERE spot_job_id=(?){task_str}', (local_log_file, job_id)) + + # ======== utility functions ======== def get_nonterminal_job_ids_by_name(name: Optional[str]) -> List[int]: """Get non-terminal job ids by name.""" @@ -662,3 +676,13 @@ def get_task_specs(job_id: int, task_id: int) -> Dict[str, Any]: WHERE spot_job_id=(?) AND task_id=(?)""", (job_id, task_id)).fetchone() return json.loads(task_specs[0]) + + +def get_local_log_file(job_id: int, task_id: Optional[int]) -> Optional[str]: + """Get the local log directory for a job.""" + task_str = '' if task_id is None else f' AND task_id={task_id}' + with db_utils.safe_cursor(_DB_PATH) as cursor: + local_log_file = cursor.execute( + f'SELECT local_log_file FROM spot ' + f'WHERE spot_job_id=(?){task_str}', (job_id,)).fetchone() + return local_log_file[-1] if local_log_file else None diff --git a/sky/jobs/utils.py b/sky/jobs/utils.py index f82e1132678..629297abaa8 100644 --- a/sky/jobs/utils.py +++ b/sky/jobs/utils.py @@ -327,10 +327,19 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str: if managed_job_status.is_failed(): job_msg = ('\nFailure reason: ' f'{managed_job_state.get_failure_reason(job_id)}') + log_file = managed_job_state.get_local_log_file(job_id, None) + if log_file is not None: + with open(log_file, 'r', encoding='utf-8') as f: + # Stream the logs to the console without reading the whole + # file into memory. + for line in f: + print(line, end='', flush=True) + return '' return (f'{colorama.Fore.YELLOW}' f'Job {job_id} is already in terminal state ' - f'{managed_job_status.value}. Logs will not be shown.' - f'{colorama.Style.RESET_ALL}{job_msg}') + f'{managed_job_status.value}. For more details, run: ' + f'sky jobs logs {job_id}{colorama.Style.RESET_ALL}' + f'{job_msg}') backend = backends.CloudVmRayBackend() task_id, managed_job_status = ( managed_job_state.get_latest_task_id_status(job_id)) diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index a6657df960d..c1e032f5c83 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -380,11 +380,13 @@ def download_and_stream_latest_job_log( else: log_dir = list(log_dirs.values())[0] log_file = os.path.join(log_dir, 'run.log') - # Print the logs to the console. try: with open(log_file, 'r', encoding='utf-8') as f: - print(f.read()) + # Stream the logs to the console without reading the whole + # file into memory. 
+ for line in f: + print(line, end='', flush=True) except FileNotFoundError: logger.error('Failed to find the logs for the user ' f'program at {log_file}.') From 4f5dc77e6c1bfff6e00badef35f36456ca9da795 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 18 Nov 2024 17:52:46 +0000 Subject: [PATCH 02/10] Allow stream logs for finished jobs --- sky/jobs/controller.py | 24 +++++++++++++++++------- sky/skylet/log_lib.py | 4 +++- sky/skylet/log_lib.pyi | 3 +++ sky/utils/controller_utils.py | 7 ++++++- 4 files changed, 29 insertions(+), 9 deletions(-) diff --git a/sky/jobs/controller.py b/sky/jobs/controller.py index 6fafcb05104..9ee2f4daff8 100644 --- a/sky/jobs/controller.py +++ b/sky/jobs/controller.py @@ -87,14 +87,19 @@ def __init__(self, job_id: int, dag_yaml: str, task.update_envs(task_envs) def _download_log_and_stream( - self, task_id: Optional[int], - handle: cloud_vm_ray_backend.CloudVmRayResourceHandle) -> None: + self, task_id: Optional[int], + handle: Optional[cloud_vm_ray_backend.CloudVmRayResourceHandle] + ) -> None: """Downloads and streams the logs of the latest job. We do not stream the logs from the cluster directly, as the donwload and stream should be faster, and more robust against preemptions or ssh disconnection during the streaming. """ + if handle is None: + logger.info(f'Cluster for job {self._job_id} is not found. ' + 'Skipping downloading and streaming the logs.') + return managed_job_logs_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, 'managed_jobs') log_file = controller_utils.download_and_stream_latest_job_log( @@ -216,11 +221,8 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: if job_status == job_lib.JobStatus.SUCCEEDED: end_time = managed_job_utils.get_job_timestamp( self._backend, cluster_name, get_end_time=True) - _, handle = backend_utils.refresh_cluster_status_handle( - cluster_name) - if handle is not None: - self._download_log_and_stream(task_id, handle) - # The job is done. + # The job is done. Set the job to SUCCEEDED first before start + # downloading and streaming the logs to make it more responsive. managed_job_state.set_succeeded(self._job_id, task_id, end_time=end_time, @@ -228,6 +230,14 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: logger.info( f'Managed job {self._job_id} (task: {task_id}) SUCCEEDED. ' f'Cleaning up the cluster {cluster_name}.') + clusters = backend_utils.get_clusters( + cluster_names=[cluster_name], + refresh=False, + include_controller=False) + if clusters: + handle = clusters[0].get('handle') + # Best effort to download and stream the logs. + self._download_log_and_stream(task_id, handle) # Only clean up the cluster, not the storages, because tasks may # share storages. recovery_strategy.terminate_cluster(cluster_name=cluster_name) diff --git a/sky/skylet/log_lib.py b/sky/skylet/log_lib.py index fa3f7f9f3fc..8a40982972a 100644 --- a/sky/skylet/log_lib.py +++ b/sky/skylet/log_lib.py @@ -34,6 +34,8 @@ logger = sky_logging.init_logger(__name__) +LOG_FILE_START_STREAMING_AT = 'Waiting for task resources on ' + class _ProcessingArgs: """Arguments for processing logs.""" @@ -435,7 +437,7 @@ def tail_logs(job_id: Optional[int], time.sleep(_SKY_LOG_WAITING_GAP_SECONDS) status = job_lib.update_job_status([job_id], silent=True)[0] - start_stream_at = 'Waiting for task resources on ' + start_stream_at = LOG_FILE_START_STREAMING_AT # Explicitly declare the type to avoid mypy warning. 
lines: Iterable[str] = [] if follow and status in [ diff --git a/sky/skylet/log_lib.pyi b/sky/skylet/log_lib.pyi index 01b08b6444f..89d1628ec11 100644 --- a/sky/skylet/log_lib.pyi +++ b/sky/skylet/log_lib.pyi @@ -13,6 +13,9 @@ from sky.skylet import constants as constants from sky.skylet import job_lib as job_lib from sky.utils import log_utils as log_utils +LOG_FILE_START_STREAMING_AT: str = ... + + class _ProcessingArgs: log_path: str stream_logs: bool diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index c1e032f5c83..f05451f0820 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -26,6 +26,7 @@ from sky.serve import constants as serve_constants from sky.serve import serve_utils from sky.skylet import constants +from sky.skylet import log_lib from sky.utils import common_utils from sky.utils import env_options from sky.utils import rich_utils @@ -385,8 +386,12 @@ def download_and_stream_latest_job_log( with open(log_file, 'r', encoding='utf-8') as f: # Stream the logs to the console without reading the whole # file into memory. + start_streaming = False for line in f: - print(line, end='', flush=True) + if log_lib.LOG_FILE_START_STREAMING_AT in line: + start_streaming = True + if start_streaming: + print(line, end='', flush=True) except FileNotFoundError: logger.error('Failed to find the logs for the user ' f'program at {log_file}.') From 109fe279f729165bedeacc1a7deebb32921079d3 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 18 Nov 2024 19:54:50 +0000 Subject: [PATCH 03/10] Read files after the indicator lines --- sky/jobs/utils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sky/jobs/utils.py b/sky/jobs/utils.py index 629297abaa8..992aaaabae4 100644 --- a/sky/jobs/utils.py +++ b/sky/jobs/utils.py @@ -332,8 +332,12 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str: with open(log_file, 'r', encoding='utf-8') as f: # Stream the logs to the console without reading the whole # file into memory. + start_streaming = False for line in f: - print(line, end='', flush=True) + if log_lib.LOG_FILE_START_STREAMING_AT in line: + start_streaming = True + if start_streaming: + print(line, end='', flush=True) return '' return (f'{colorama.Fore.YELLOW}' f'Job {job_id} is already in terminal state ' From 6e67cd1ba1c849d679cd941ab1c29c059abf9a2b Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 18 Nov 2024 20:11:07 +0000 Subject: [PATCH 04/10] Add refresh for `sky jobs logs` --- sky/cli.py | 13 ++++++- sky/jobs/core.py | 95 ++++++++++++++++++++++++++++++------------------ 2 files changed, 71 insertions(+), 37 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 490749d1231..2f5f49927ff 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3896,16 +3896,25 @@ def jobs_cancel(name: Optional[str], job_ids: Tuple[int], all: bool, yes: bool): default=False, help=('Show the controller logs of this job; useful for debugging ' 'launching/recoveries, etc.')) +@click.option( + '--refresh', + '-r', + default=False, + is_flag=True, + required=False, + help='Query the latest job logs, restarting the jobs controller if stopped.' 
+) @click.argument('job_id', required=False, type=int) @usage_lib.entrypoint def jobs_logs(name: Optional[str], job_id: Optional[int], follow: bool, - controller: bool): + controller: bool, refresh: bool): """Tail the log of a managed job.""" try: managed_jobs.tail_logs(name=name, job_id=job_id, follow=follow, - controller=controller) + controller=controller, + refresh=refresh) except exceptions.ClusterNotUpError: with ux_utils.print_exception_no_traceback(): raise diff --git a/sky/jobs/core.py b/sky/jobs/core.py index f11a556f2d4..1396ed84d98 100644 --- a/sky/jobs/core.py +++ b/sky/jobs/core.py @@ -1,6 +1,7 @@ """SDK functions for managed jobs.""" import os import tempfile +import typing from typing import Any, Dict, List, Optional, Union import uuid @@ -29,6 +30,9 @@ from sky.utils import timeline from sky.utils import ux_utils +if typing.TYPE_CHECKING: + from sky.backends import cloud_vm_ray_backend + @timeline.event @usage_lib.entrypoint @@ -225,6 +229,40 @@ def queue_from_kubernetes_pod( return jobs +def _maybe_restart_controller( + refresh: bool, stopped_message: str, spinner_message: str +) -> 'cloud_vm_ray_backend.CloudVmRayResourceHandle': + """Restart controller if refresh is True and it is stopped.""" + jobs_controller_type = controller_utils.Controllers.JOBS_CONTROLLER + if refresh: + stopped_message = '' + try: + handle = backend_utils.is_controller_accessible( + controller=jobs_controller_type, stopped_message=stopped_message) + except exceptions.ClusterNotUpError as e: + if not refresh: + raise + handle = None + controller_status = e.cluster_status + + if handle is not None: + return handle + + sky_logging.print(f'{colorama.Fore.YELLOW}' + f'Restarting {jobs_controller_type.value.name}...' + f'{colorama.Style.RESET_ALL}') + + rich_utils.force_update_status( + ux_utils.spinner_message(f'{spinner_message} - restarting ' + 'controller')) + handle = sky.start(jobs_controller_type.value.cluster_name) + controller_status = status_lib.ClusterStatus.UP + rich_utils.force_update_status(ux_utils.spinner_message(spinner_message)) + + assert handle is not None, (controller_status, refresh) + return handle + + @usage_lib.entrypoint def queue(refresh: bool, skip_finished: bool = False) -> List[Dict[str, Any]]: # NOTE(dev): Keep the docstring consistent between the Python API and CLI. @@ -252,34 +290,11 @@ def queue(refresh: bool, skip_finished: bool = False) -> List[Dict[str, Any]]: does not exist. RuntimeError: if failed to get the managed jobs with ssh. """ - jobs_controller_type = controller_utils.Controllers.JOBS_CONTROLLER - stopped_message = '' - if not refresh: - stopped_message = 'No in-progress managed jobs.' - try: - handle = backend_utils.is_controller_accessible( - controller=jobs_controller_type, stopped_message=stopped_message) - except exceptions.ClusterNotUpError as e: - if not refresh: - raise - handle = None - controller_status = e.cluster_status - - if refresh and handle is None: - sky_logging.print(f'{colorama.Fore.YELLOW}' - 'Restarting controller for latest status...' 
- f'{colorama.Style.RESET_ALL}') - - rich_utils.force_update_status( - ux_utils.spinner_message('Checking managed jobs - restarting ' - 'controller')) - handle = sky.start(jobs_controller_type.value.cluster_name) - controller_status = status_lib.ClusterStatus.UP - rich_utils.force_update_status( - ux_utils.spinner_message('Checking managed jobs')) - - assert handle is not None, (controller_status, refresh) - + handle = _maybe_restart_controller(refresh, + stopped_message='No in-progress ' + 'managed jobs.', + spinner_message='Checking ' + 'managed jobs') backend = backend_utils.get_backend_from_handle(handle) assert isinstance(backend, backends.CloudVmRayBackend) @@ -371,7 +386,7 @@ def cancel(name: Optional[str] = None, @usage_lib.entrypoint def tail_logs(name: Optional[str], job_id: Optional[int], follow: bool, - controller: bool) -> None: + controller: bool, refresh: bool) -> None: # NOTE(dev): Keep the docstring consistent between the Python API and CLI. """Tail logs of managed jobs. @@ -382,15 +397,25 @@ def tail_logs(name: Optional[str], job_id: Optional[int], follow: bool, sky.exceptions.ClusterNotUpError: the jobs controller is not up. """ # TODO(zhwu): Automatically restart the jobs controller + if name is not None and job_id is not None: + raise ValueError('Cannot specify both name and job_id.') + jobs_controller_type = controller_utils.Controllers.JOBS_CONTROLLER - handle = backend_utils.is_controller_accessible( - controller=jobs_controller_type, + job_name_or_id_str = '' + if job_id is not None: + job_name_or_id_str = str(job_id) + elif name is not None: + job_name_or_id_str = f'-n {name}' + else: + job_name_or_id_str = '' + handle = _maybe_restart_controller( + refresh, stopped_message=( - 'Please restart the jobs controller with ' - f'`sky start {jobs_controller_type.value.cluster_name}`.')) + f'{jobs_controller_type.value.name.capitalize()} is stopped. To ' + f'get the logs, run: {colorama.Style.BRIGHT}sky jobs logs ' + f'-r {job_name_or_id_str}{colorama.Style.RESET_ALL}'), + spinner_message='Retrieving job logs') - if name is not None and job_id is not None: - raise ValueError('Cannot specify both name and job_id.') backend = backend_utils.get_backend_from_handle(handle) assert isinstance(backend, backends.CloudVmRayBackend), backend From fb3e678637c63e6895079338884bc61c47987d6d Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 18 Nov 2024 20:20:02 +0000 Subject: [PATCH 05/10] fix log message --- sky/jobs/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sky/jobs/utils.py b/sky/jobs/utils.py index 992aaaabae4..267c205285b 100644 --- a/sky/jobs/utils.py +++ b/sky/jobs/utils.py @@ -342,7 +342,8 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str: return (f'{colorama.Fore.YELLOW}' f'Job {job_id} is already in terminal state ' f'{managed_job_status.value}. 
For more details, run: ' - f'sky jobs logs {job_id}{colorama.Style.RESET_ALL}' + f'sky jobs logs --controller {job_id}' + f'{colorama.Style.RESET_ALL}' f'{job_msg}') backend = backends.CloudVmRayBackend() task_id, managed_job_status = ( From 1afffa51f972e4f7f6a0b02aaa0356aebd6bcddb Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 25 Nov 2024 19:41:03 +0000 Subject: [PATCH 06/10] address comments --- sky/jobs/controller.py | 5 ++++- sky/jobs/core.py | 3 ++- sky/jobs/state.py | 16 ++++++++++++---- sky/utils/controller_utils.py | 2 ++ 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/sky/jobs/controller.py b/sky/jobs/controller.py index 9ee2f4daff8..72dce3e50d7 100644 --- a/sky/jobs/controller.py +++ b/sky/jobs/controller.py @@ -90,7 +90,7 @@ def _download_log_and_stream( self, task_id: Optional[int], handle: Optional[cloud_vm_ray_backend.CloudVmRayResourceHandle] ) -> None: - """Downloads and streams the logs of the latest job. + """Downloads and streams the logs of the current job with given task ID. We do not stream the logs from the cluster directly, as the donwload and stream should be faster, and more robust against @@ -105,6 +105,8 @@ def _download_log_and_stream( log_file = controller_utils.download_and_stream_latest_job_log( self._backend, handle, managed_job_logs_dir) if log_file is not None: + # Set the path of the log file for the current task, so it can be + # accessed even after the job is finished managed_job_state.set_local_log_file(self._job_id, task_id, log_file) logger.info(f'\n== End of logs (ID: {self._job_id}) ==') @@ -235,6 +237,7 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: refresh=False, include_controller=False) if clusters: + assert len(clusters) == 1, (clusters, cluster_name) handle = clusters[0].get('handle') # Best effort to download and stream the logs. self._download_log_and_stream(task_id, handle) diff --git a/sky/jobs/core.py b/sky/jobs/core.py index 1396ed84d98..9cde3443816 100644 --- a/sky/jobs/core.py +++ b/sky/jobs/core.py @@ -398,7 +398,8 @@ def tail_logs(name: Optional[str], job_id: Optional[int], follow: bool, """ # TODO(zhwu): Automatically restart the jobs controller if name is not None and job_id is not None: - raise ValueError('Cannot specify both name and job_id.') + with ux_utils.print_exception_no_traceback(): + raise ValueError('Cannot specify both name and job_id.') jobs_controller_type = controller_utils.Controllers.JOBS_CONTROLLER job_name_or_id_str = '' diff --git a/sky/jobs/state.py b/sky/jobs/state.py index 7a006bf1619..9a5ab4b3cad 100644 --- a/sky/jobs/state.py +++ b/sky/jobs/state.py @@ -519,11 +519,15 @@ def set_cancelled(job_id: int, callback_func: CallbackType): def set_local_log_file(job_id: int, task_id: Optional[int], local_log_file: str): """Set the local log file for a job.""" - task_str = '' if task_id is None else f' AND task_id={task_id}' + filter_str = 'spot_job_id=(?)' + filter_args = [local_log_file, job_id] + if task_id is not None: + filter_str += ' AND task_id=(?)' + filter_args.append(task_id) with db_utils.safe_cursor(_DB_PATH) as cursor: cursor.execute( 'UPDATE spot SET local_log_file=(?) 
' - f'WHERE spot_job_id=(?){task_str}', (local_log_file, job_id)) + f'WHERE {filter_str}', filter_args) # ======== utility functions ======== @@ -680,9 +684,13 @@ def get_task_specs(job_id: int, task_id: int) -> Dict[str, Any]: def get_local_log_file(job_id: int, task_id: Optional[int]) -> Optional[str]: """Get the local log directory for a job.""" - task_str = '' if task_id is None else f' AND task_id={task_id}' + filter_str = 'spot_job_id=(?)' + filter_args = [job_id] + if task_id is not None: + filter_str += ' AND task_id=(?)' + filter_args.append(task_id) with db_utils.safe_cursor(_DB_PATH) as cursor: local_log_file = cursor.execute( f'SELECT local_log_file FROM spot ' - f'WHERE spot_job_id=(?){task_str}', (job_id,)).fetchone() + f'WHERE {filter_str}', filter_args).fetchone() return local_log_file[-1] if local_log_file else None diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index f05451f0820..e157b6416a5 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -382,6 +382,8 @@ def download_and_stream_latest_job_log( log_dir = list(log_dirs.values())[0] log_file = os.path.join(log_dir, 'run.log') # Print the logs to the console. + # TODO(zhwu): refactor this into log_utils, along with the + # refactoring for the log_lib.tail_logs. try: with open(log_file, 'r', encoding='utf-8') as f: # Stream the logs to the console without reading the whole From 08891df4c43d2049ce69299efdb11dea5841fbf9 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 25 Nov 2024 20:04:38 +0000 Subject: [PATCH 07/10] Add smoke test --- tests/test_smoke.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 6ba81ce68f0..04f93babd9d 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -3537,9 +3537,14 @@ def test_managed_jobs_inline_env(generic_cloud: str): test = Test( 'test-managed-jobs-inline-env', [ - f'sky jobs launch -n {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"', + f'sky jobs launch -n {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "echo "\\$TEST_ENV"; ([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"', 'sleep 20', f'{_GET_JOB_QUEUE} | grep {name} | grep SUCCEEDED', + f'JOB_ID=$(sky jobs list -n {name} | grep {name} | head -n1 | awk \'{{print $1}}\') && ' + # Test that logs are still available after the job finishes. + 's=$(sky jobs logs $JOB_ID) && echo "$s" && echo "$s" | grep "hello world" && ' + # Make sure we skip the unnecessary logs. + 'echo "$s" | head -n1 | grep "Waiting for"', ], f'sky jobs cancel -y -n {name}', # Increase timeout since sky jobs queue -r can be blocked by other spot tests. 
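
The streaming behavior the patches above converge on can be summarized outside the diffs: after a task finishes, the controller downloads run.log from the cluster, records its path with managed_job_state.set_local_log_file, and a later `sky jobs logs` call replays that file while skipping everything before the provisioning marker. A minimal sketch of the replay step is below; only the LOG_FILE_START_STREAMING_AT value comes from sky/skylet/log_lib.py as added in PATCH 02, and replay_local_log is an illustrative helper name, not part of the patches.

    # Sketch only: replay a downloaded run.log, skipping pre-provisioning noise.
    # LOG_FILE_START_STREAMING_AT mirrors the constant added in PATCH 02;
    # replay_local_log itself is hypothetical.
    LOG_FILE_START_STREAMING_AT = 'Waiting for task resources on '

    def replay_local_log(log_file: str) -> None:
        start_streaming = False
        # Iterate line by line so large log files are never read into memory.
        with open(log_file, 'r', encoding='utf-8') as f:
            for line in f:
                if LOG_FILE_START_STREAMING_AT in line:
                    start_streaming = True
                if start_streaming:
                    print(line, end='', flush=True)

The same loop, wrapped in SkyPilot's logging and error handling, is what PATCH 02 and PATCH 03 add to controller_utils.download_and_stream_latest_job_log and jobs/utils.stream_logs_by_id, and it is what the PATCH 07 smoke test asserts with `head -n1 | grep "Waiting for"`.
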
From 10c5f21a619a6c4f8d89f788824627888008195a Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 25 Nov 2024 20:06:50 +0000 Subject: [PATCH 08/10] fix smoke --- tests/test_smoke.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 04f93babd9d..3d166777add 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -3540,7 +3540,7 @@ def test_managed_jobs_inline_env(generic_cloud: str): f'sky jobs launch -n {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "echo "\\$TEST_ENV"; ([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"', 'sleep 20', f'{_GET_JOB_QUEUE} | grep {name} | grep SUCCEEDED', - f'JOB_ID=$(sky jobs list -n {name} | grep {name} | head -n1 | awk \'{{print $1}}\') && ' + f'JOB_ID=$(sky jobs queue -n {name} | grep {name} | head -n1 | awk \'{{print $1}}\') && ' # Test that logs are still available after the job finishes. 's=$(sky jobs logs $JOB_ID) && echo "$s" && echo "$s" | grep "hello world" && ' # Make sure we skip the unnecessary logs. From 1b3c2775a393d63c9a1d76c1acb956013cba22ad Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Tue, 26 Nov 2024 00:43:58 +0000 Subject: [PATCH 09/10] fix jobs queue smoke test --- tests/test_smoke.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 3d166777add..f95b5f04c8f 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -3539,10 +3539,12 @@ def test_managed_jobs_inline_env(generic_cloud: str): [ f'sky jobs launch -n {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "echo "\\$TEST_ENV"; ([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"', 'sleep 20', - f'{_GET_JOB_QUEUE} | grep {name} | grep SUCCEEDED', - f'JOB_ID=$(sky jobs queue -n {name} | grep {name} | head -n1 | awk \'{{print $1}}\') && ' + f'JOB_ROW=$(sky jobs queue | grep {name} | head -n1) && ' + f'echo "$JOB_ROW" && echo "$JOB_ROW" | grep "SUCCEEDED" && ' + f'JOB_ID=$(echo "$JOB_ROW" | awk \'{{print $1}}\') && ' + f'echo "JOB_ID=$JOB_ID" && ' # Test that logs are still available after the job finishes. - 's=$(sky jobs logs $JOB_ID) && echo "$s" && echo "$s" | grep "hello world" && ' + 'unset SKYPILOT_DEBUG; s=$(sky jobs logs $JOB_ID --refresh) && echo "$s" && echo "$s" | grep "hello world" && ' # Make sure we skip the unnecessary logs. 'echo "$s" | head -n1 | grep "Waiting for"', ], From a949ec130b998a5d1abd1cdad70a692dc7d348ec Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Tue, 3 Dec 2024 21:42:52 +0000 Subject: [PATCH 10/10] fix storage --- tests/test_smoke.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 440516bc2ee..78c87e0e60e 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -3651,6 +3651,8 @@ def test_managed_jobs_storage(generic_cloud: str): job_name=name, job_status=[sky.ManagedJobStatus.SUCCEEDED], timeout=60 + _BUMP_UP_SECONDS), + # Wait for the job to be cleaned up. + 'sleep 20', f'[ $(aws s3api list-buckets --query "Buckets[?contains(Name, \'{storage_name}\')].Name" --output text | wc -l) -eq 0 ]', # Check if file was written to the mounted output bucket output_check_cmd
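
For the `--refresh` flag wired into `sky jobs logs` in PATCH 04 and exercised by the PATCH 09 smoke test (`sky jobs logs $JOB_ID --refresh`), the control flow of _maybe_restart_controller reduces to the sketch below. Here probe() and restart() are assumed stand-ins for backend_utils.is_controller_accessible and sky.start(<jobs controller cluster>), and the RuntimeError is a simplification of the ClusterNotUpError the real code propagates.

    from typing import Callable, Optional, TypeVar

    T = TypeVar('T')

    def maybe_restart_controller(refresh: bool,
                                 probe: Callable[[], Optional[T]],
                                 restart: Callable[[], T]) -> T:
        # probe() returns a controller handle, or None when the controller
        # is stopped or unreachable.
        handle = probe()
        if handle is not None:
            return handle
        if not refresh:
            raise RuntimeError('Jobs controller is stopped. Rerun with '
                               '--refresh/-r, e.g.: sky jobs logs -r <job_id>')
        # With refresh enabled, start the controller and return its handle.
        return restart()

This keeps `sky jobs queue -r` and `sky jobs logs -r` on one code path: an UP controller's handle is reused as-is, and a stopped controller is only restarted when the caller explicitly opted in with refresh.
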