diff --git a/rapyuta_io/clients/device_manager.py b/rapyuta_io/clients/device_manager.py index 59a69bf..b9609a8 100644 --- a/rapyuta_io/clients/device_manager.py +++ b/rapyuta_io/clients/device_manager.py @@ -1,14 +1,20 @@ # encoding: utf-8 from __future__ import absolute_import +import time +import typing from enum import Enum +import requests + from rapyuta_io.clients.device import Device, DeviceStatus +from rapyuta_io.clients.model import Command from rapyuta_io.utils import RestClient +from rapyuta_io.utils.error import ParameterMissingException from rapyuta_io.utils.rest_client import HttpMethod -from rapyuta_io.utils.settings import DEVICE_API_PATH, DEVICE_SELECTION_API_PATH, PARAMETERS_API_PATH, \ - DEVICE_API_ADD_DEVICE_PATH, DAEMONS_PATH -from rapyuta_io.utils.utils import create_auth_header, prepend_bearer_to_auth_token, get_api_response_data, \ +from rapyuta_io.utils.settings import DAEMONS_PATH, DEVICE_API_ADD_DEVICE_PATH, DEVICE_API_PATH, \ + DEVICE_COMMAND_API_PATH, DEVICE_SELECTION_API_PATH, PARAMETERS_API_PATH +from rapyuta_io.utils.utils import create_auth_header, get_api_response_data, get_error, prepend_bearer_to_auth_token, \ validate_list_of_strings @@ -131,3 +137,65 @@ def patch_daemons(self, device_id, payload): headers = create_auth_header(self._auth_token, self._project) response = RestClient(url).method(HttpMethod.PATCH).headers(headers).execute(payload=payload) return get_api_response_data(response, parse_full=True) + + def execute_command( + self, + device_ids: typing.List[str], + command: Command, + retry_limit: int = 0, + retry_interval: int = 10, + timeout: int = 300, + ): + """Execute a command on the specified devices. + + Args: + device_ids: List of device IDs on which the command should be executed. + command: Command object to be executed. + retry_limit: Number of retries in case of API failure. + retry_interval: Interval between retries. + timeout: Maximum time to wait for the background command to finish. + + Returns: + dict: Output of the command execution. + + Raises: + ValueError: If device_ids is empty. + TimeoutError: If command execution takes longer than the specified timeout. + ParameterMissingException: If the command is missing required parameters. + """ + if not device_ids: + raise ValueError("device_ids cannot be empty") + + command.validate() + command.device_ids = device_ids + + url = self._device_api_host + DEVICE_COMMAND_API_PATH + rc = RestClient(url).method(HttpMethod.POST).headers( + create_auth_header(self._auth_token, self._project)) + response = rc.retry(retry_limit).execute(payload=command.to_json()) + if response.status_code == requests.codes.BAD_REQUEST: + raise ParameterMissingException(get_error(response.text)) + + execution_result = get_api_response_data(response) + + if not command.bg: + return execution_result + + jid = execution_result.get('jid') + if not jid: + raise ValueError("job id not found in the response") + + url = self._device_api_host + DEVICE_COMMAND_API_PATH + jid + query = {"jid": jid, "device_id": device_ids} + time_elapsed = 0 + wait_interval = retry_interval + while time_elapsed < timeout: + response = RestClient(url).method(HttpMethod.GET).headers( + create_auth_header(self._auth_token, self._project)).query_param(query_param=query).execute() + if response.status_code == requests.codes.OK: + result = get_api_response_data(response) + return result + time.sleep(wait_interval) + time_elapsed += wait_interval + + raise TimeoutError(f"command result not available after {timeout} seconds") diff --git a/rapyuta_io/rio_client.py b/rapyuta_io/rio_client.py index e4b5bf7..3354afa 100644 --- a/rapyuta_io/rio_client.py +++ b/rapyuta_io/rio_client.py @@ -3,6 +3,7 @@ import json import os +import typing import six @@ -12,6 +13,7 @@ from rapyuta_io.clients.device import Device from rapyuta_io.clients.metrics import ListMetricsRequest, ListTagKeysRequest, ListTagValuesRequest, Metric, \ MetricFunction, MetricOperation, QueryMetricsRequest, QueryMetricsResponse, Tags +from rapyuta_io.clients.model import Command from rapyuta_io.clients.rip_client import AuthTokenLevel, RIPClient from rapyuta_io.clients.rosbag import ROSBagBlob, ROSBagBlobStatus, ROSBagJob, ROSBagJobStatus from rapyuta_io.clients.user_group import UserGroup @@ -231,6 +233,38 @@ def delete_device(self, device_id): raise InvalidParameterException('device_id needs to be a non empty string') return self._dmClient.delete_device(device_id) + def execute_command( + self, + device_ids: typing.List[str], + command: Command, + retry_limit: int = 0, + retry_interval: int = 10, + timeout: int = 300, + ): + """Execute a command on the specified devices. + + :param device_ids: List of device IDs on which the command should be executed. + :type device_ids: list[str] + :param command: Command object to be executed. + :type command: Command + :param retry_limit: Number of retries in case of API failure. + :type retry_limit: int + :param retry_interval: Interval between retries. + :type retry_interval: int + :param timeout: Timeout for the command execution. + :type timeout: int + + Following example demonstrates how to execute a command on a device. + + >>> from rapyuta_io import Client + >>> from rapyuta_io.clients.model import Command + >>> client = Client(auth_token='auth_token', project='project_guid') + >>> command = Command('echo "Hello World!"') + >>> client.execute_command(['device-id'], command) + + """ + return self._dmClient.execute_command(device_ids, command, retry_limit, retry_interval, timeout) + def toggle_features(self, device_id, features, config=None): """ Patch a device on rapyuta.io platform.