Skip to content

Commit

Permalink
feat: adds method to exec on multiple devices
Browse files Browse the repository at this point in the history
  • Loading branch information
pallabpain committed Dec 16, 2024
1 parent 6749589 commit ac9e051
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 3 deletions.
74 changes: 71 additions & 3 deletions rapyuta_io/clients/device_manager.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
# encoding: utf-8
from __future__ import absolute_import

import time
import typing
from enum import Enum

import requests

from rapyuta_io.clients.device import Device, DeviceStatus
from rapyuta_io.clients.model import Command
from rapyuta_io.utils import RestClient
from rapyuta_io.utils.error import ParameterMissingException
from rapyuta_io.utils.rest_client import HttpMethod
from rapyuta_io.utils.settings import DEVICE_API_PATH, DEVICE_SELECTION_API_PATH, PARAMETERS_API_PATH, \
DEVICE_API_ADD_DEVICE_PATH, DAEMONS_PATH
from rapyuta_io.utils.utils import create_auth_header, prepend_bearer_to_auth_token, get_api_response_data, \
from rapyuta_io.utils.settings import DAEMONS_PATH, DEVICE_API_ADD_DEVICE_PATH, DEVICE_API_PATH, \
DEVICE_COMMAND_API_PATH, DEVICE_SELECTION_API_PATH, PARAMETERS_API_PATH
from rapyuta_io.utils.utils import create_auth_header, get_api_response_data, get_error, prepend_bearer_to_auth_token, \
validate_list_of_strings


Expand Down Expand Up @@ -131,3 +137,65 @@ def patch_daemons(self, device_id, payload):
headers = create_auth_header(self._auth_token, self._project)
response = RestClient(url).method(HttpMethod.PATCH).headers(headers).execute(payload=payload)
return get_api_response_data(response, parse_full=True)

def execute_command(
self,
device_ids: typing.List[str],
command: Command,
retry_limit: int = 0,
retry_interval: int = 10,
timeout: int = 300,
):
"""Execute a command on the specified devices.
Args:
device_ids: List of device IDs on which the command should be executed.
command: Command object to be executed.
retry_limit: Number of retries in case of API failure.
retry_interval: Interval between retries.
timeout: Maximum time to wait for the background command to finish.
Returns:
dict: Output of the command execution.
Raises:
ValueError: If device_ids is empty.
TimeoutError: If command execution takes longer than the specified timeout.
ParameterMissingException: If the command is missing required parameters.
"""
if not device_ids:
raise ValueError("device_ids cannot be empty")

command.validate()
command.device_ids = device_ids

url = self._device_api_host + DEVICE_COMMAND_API_PATH
rc = RestClient(url).method(HttpMethod.POST).headers(
create_auth_header(self._auth_token, self._project))
response = rc.retry(retry_limit).execute(payload=command.to_json())
if response.status_code == requests.codes.BAD_REQUEST:
raise ParameterMissingException(get_error(response.text))

execution_result = get_api_response_data(response)

if not command.bg:
return execution_result

jid = execution_result.get('jid')
if not jid:
raise ValueError("job id not found in the response")

url = self._device_api_host + DEVICE_COMMAND_API_PATH + jid
query = {"jid": jid, "device_id": device_ids}
time_elapsed = 0
wait_interval = retry_interval
while time_elapsed < timeout:
response = RestClient(url).method(HttpMethod.GET).headers(
create_auth_header(self._auth_token, self._project)).query_param(query_param=query).execute()
if response.status_code == requests.codes.OK:
result = get_api_response_data(response)
return result
time.sleep(wait_interval)
time_elapsed += wait_interval

raise TimeoutError(f"command result not available after {timeout} seconds")
34 changes: 34 additions & 0 deletions rapyuta_io/rio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import json
import os
import typing

import six

Expand All @@ -12,6 +13,7 @@
from rapyuta_io.clients.device import Device
from rapyuta_io.clients.metrics import ListMetricsRequest, ListTagKeysRequest, ListTagValuesRequest, Metric, \
MetricFunction, MetricOperation, QueryMetricsRequest, QueryMetricsResponse, Tags
from rapyuta_io.clients.model import Command
from rapyuta_io.clients.rip_client import AuthTokenLevel, RIPClient
from rapyuta_io.clients.rosbag import ROSBagBlob, ROSBagBlobStatus, ROSBagJob, ROSBagJobStatus
from rapyuta_io.clients.user_group import UserGroup
Expand Down Expand Up @@ -231,6 +233,38 @@ def delete_device(self, device_id):
raise InvalidParameterException('device_id needs to be a non empty string')
return self._dmClient.delete_device(device_id)

def execute_command(
self,
device_ids: typing.List[str],
command: Command,
retry_limit: int = 0,
retry_interval: int = 10,
timeout: int = 300,
):
"""Execute a command on the specified devices.
:param device_ids: List of device IDs on which the command should be executed.
:type device_ids: list[str]
:param command: Command object to be executed.
:type command: Command
:param retry_limit: Number of retries in case of API failure.
:type retry_limit: int
:param retry_interval: Interval between retries.
:type retry_interval: int
:param timeout: Timeout for the command execution.
:type timeout: int
Following example demonstrates how to execute a command on a device.
>>> from rapyuta_io import Client
>>> from rapyuta_io.clients.model import Command
>>> client = Client(auth_token='auth_token', project='project_guid')
>>> command = Command('echo "Hello World!"')
>>> client.execute_command(['device-id'], command)
"""
return self._dmClient.execute_command(device_ids, command, retry_limit, retry_interval, timeout)

def toggle_features(self, device_id, features, config=None):
"""
Patch a device on rapyuta.io platform.
Expand Down

0 comments on commit ac9e051

Please sign in to comment.