diff --git a/iperf3/csclient.py b/iperf3/csclient.py new file mode 100644 index 0000000..2fb8fea --- /dev/null +++ b/iperf3/csclient.py @@ -0,0 +1,530 @@ +""" +NCOS communication module for SDK applications. + +Copyright (c) 2018 Cradlepoint, Inc. . All rights reserved. + +This file contains confidential information of CradlePoint, Inc. and your use of +this file is subject to the CradlePoint Software License Agreement distributed with +this file. Unauthorized reproduction or distribution of this file is subject to civil and +criminal penalties. +""" + + +import json +import os +import re +import select +import socket +import threading +import logging.handlers +import signal +import sys + +try: + import traceback +except ImportError: + traceback = None + + +class SdkCSException(Exception): + pass + + +class CSClient(object): + """ + The CSClient class is the NCOS SDK mechanism for communication between apps and the router tree/config store. + Instances of this class communicate with the router using either an explicit socket or with http method calls. + + Apps running locally on the router use a socket on the router to send commands from the app to the router tree + and to receive data (JSON) from the router tree. + + Apps running remotely use the requests library to send HTTP method calls to the router and to receive data from + the router tree. This allows one to use an IDE to run and debug the application on a the computer. Although, + there are limitations with respect to the device hardware access (i.e. serial, USB, etc.). + """ + END_OF_HEADER = b"\r\n\r\n" + STATUS_HEADER_RE = re.compile(b"status: \w*") + CONTENT_LENGTH_HEADER_RE = re.compile(b"content-length: \w*") + MAX_PACKET_SIZE = 8192 + RECV_TIMEOUT = 2.0 + + _instances = {} + + @classmethod + def is_initialized(cls): + return cls in cls._instances + + def __new__(cls, *na, **kwna): + """ Singleton factory (with subclassing support) """ + if not cls.is_initialized(): + cls._instances[cls] = super().__new__(cls) + return cls._instances[cls] + + def __init__(self, app_name, init=False): + self.app_name = app_name + handlers = [logging.StreamHandler()] + if sys.platform == 'linux2': + handlers.append(logging.handlers.SysLogHandler(address='/dev/log')) + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s: %(message)s', datefmt='%b %d %H:%M:%S', + handlers=handlers) + self.logger = logging.getLogger(app_name) + if not init: + return + + def get(self, base, query='', tree=0): + """ + Constructs and sends a get request to retrieve specified data from a device. + + The behavior of this method is contextual: + - If the app is installed on (and executed from) a device, it directly queries the router tree to retrieve the + specified data. + - If the app running remotely from a computer it calls the HTTP GET method to retrieve the specified data. + + Args: + base: String representing a path to a resource on a router tree, + (i.e. '/config/system/logging/level'). + value: Not required. + query: Not required. + tree: Not required. + + Returns: + A dictionary containing the response (i.e. {"success": True, "data:": {}} + + """ + if sys.platform == 'linux2': + cmd = "get\n{}\n{}\n{}\n".format(base, query, tree) + return self._dispatch(cmd).get('data') + else: + # Running in a computer so use http to send the get to the device. + import requests + device_ip, username, password = self._get_device_access_info() + device_api = 'http://{}/api/{}/{}'.format(device_ip, base, query) + + try: + response = requests.get(device_api, auth=self._get_auth(device_ip, username, password)) + + except (requests.exceptions.Timeout, + requests.exceptions.ConnectionError): + print("Timeout: device at {} did not respond.".format(device_ip)) + return None + + return json.loads(response.text).get('data') + + def put(self, base, value='', query='', tree=0): + """ + Constructs and sends a put request to update or add specified data to the device router tree. + + The behavior of this method is contextual: + - If the app is installed on(and executed from) a device, it directly updates or adds the specified data to + the router tree. + - If the app running remotely from a computer it calls the HTTP PUT method to update or add the specified + data. + + + Args: + base: String representing a path to a resource on a router tree, + (i.e. '/config/system/logging/level'). + value: Not required. + query: Not required. + tree: Not required. + + Returns: + A dictionary containing the response (i.e. {"success": True, "data:": {}} + """ + value = json.dumps(value) + if sys.platform == 'linux2': + cmd = "put\n{}\n{}\n{}\n{}\n".format(base, query, tree, value) + return self._dispatch(cmd) + else: + # Running in a computer so use http to send the put to the device. + import requests + device_ip, username, password = self._get_device_access_info() + device_api = 'http://{}/api/{}/{}'.format(device_ip, base, query) + + try: + response = requests.put(device_api, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + auth=self._get_auth(device_ip, username, password), + data={"data": '{}'.format(value)}) + except (requests.exceptions.Timeout, + requests.exceptions.ConnectionError): + print("Timeout: device at {} did not respond.".format(device_ip)) + return None + + return json.loads(response.text) + + def post(self, base, value='', query=''): + """ + Constructs and sends a post request to update or add specified data to the device router tree. + + The behavior of this method is contextual: + - If the app is installed on(and executed from) a device, it directly updates or adds the specified data to + the router tree. + - If the app running remotely from a computer it calls the HTTP POST method to update or add the specified + data. + + + Args: + base: String representing a path to a resource on a router tree, + (i.e. '/config/system/logging/level'). + value: Not required. + query: Not required. + + Returns: + A dictionary containing the response (i.e. {"success": True, "data:": {}} + """ + value = json.dumps(value) + if sys.platform == 'linux2': + cmd = f"post\n{base}\n{query}\n{value}\n" + return self._dispatch(cmd) + else: + # Running in a computer so use http to send the post to the device. + import requests + device_ip, username, password = self._get_device_access_info() + device_api = 'http://{}/api/{}/{}'.format(device_ip, base, query) + + try: + response = requests.post(device_api, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + auth=self._get_auth(device_ip, username, password), + data={"data": '{}'.format(value)}) + except (requests.exceptions.Timeout, + requests.exceptions.ConnectionError): + print("Timeout: device at {} did not respond.".format(device_ip)) + return None + + return json.loads(response.text) + + def delete(self, base, query=''): + """ + Constructs and sends a delete request to delete specified data to the device router tree. + + The behavior of this method is contextual: + - If the app is installed on(and executed from) a device, it directly deletes the specified data to + the router tree. + - If the app running remotely from a computer it calls the HTTP DELETE method to update or add the specified + data. + + + Args: + base: String representing a path to a resource on a router tree, + (i.e. '/config/system/logging/level'). + query: Not required. + + Returns: + A dictionary containing the response (i.e. {"success": True, "data:": {}} + """ + if sys.platform == 'linux2': + cmd = "delete\n{}\n{}\n".format(base, query) + return self._dispatch(cmd) + else: + # Running in a computer so use http to send the delete to the device. + import requests + device_ip, username, password = self._get_device_access_info() + device_api = 'http://{}/api/{}/{}'.format(device_ip, base, query) + + try: + response = requests.delete(device_api, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + auth=self._get_auth(device_ip, username, password), + data={"data": '{}'.format(value)}) + except (requests.exceptions.Timeout, + requests.exceptions.ConnectionError): + print("Timeout: device at {} did not respond.".format(device_ip)) + return None + + return json.loads(response.text) + + def alert(self, value=''): + """ + Constructs and sends a custom alert to NCM for the device. Apps calling this method must be running + on the target device to send the alert. If invoked while running on a computer, then only a log is output. + + Args: + + app_name: String name of your application. + value: String to displayed for the alert. + + Returns: + Success: None + Failure: An error + """ + if sys.platform == 'linux2': + cmd = "alert\n{}\n{}\n".format(self.app_name, value) + return self._dispatch(cmd) + else: + # Running in a computer and can't actually send the alert. + print('Alert is only available when running the app in NCOS.') + print('Alert Text: {}'.format(value)) + + def log(self, value=''): + """ + Adds an INFO log to the device SYSLOG. + + Args: + value: String text for the log. + + Returns: + None + """ + if sys.platform == 'linux2': + self.logger.info(value) + else: + # Running in a computer so just use print for the log. + print(value) + + def _get_auth(self, device_ip, username, password): + # This is only needed when the app is running in a computer. + # Returns the proper HTTP Auth for the global username and password. + # Digest Auth is used for NCOS 6.4 and below while Basic Auth is + # used for NCOS 6.5 and up. + import requests + from http import HTTPStatus + + use_basic = False + device_api = 'http://{}/api/status/product_info'.format(device_ip) + + try: + response = requests.get(device_api, auth=requests.auth.HTTPBasicAuth(username, password)) + if response.status_code == HTTPStatus.OK: + use_basic = True + + except: + use_basic = False + + if use_basic: + return requests.auth.HTTPBasicAuth(username, password) + else: + return requests.auth.HTTPDigestAuth(username, password) + + @staticmethod + def _get_device_access_info(): + # Should only be called when running in a computer. It will return the + # dev_client_ip, dev_client_username, and dev_client_password as defined in + # the sdk section of the sdk_settings.ini file. + device_ip = '' + device_username = '' + device_password = '' + + if sys.platform != 'linux2': + import os + import configparser + + settings_file = os.path.join(os.path.dirname(os.getcwd()), 'sdk_settings.ini') + config = configparser.ConfigParser() + config.read(settings_file) + + # Keys in sdk_settings.ini + sdk_key = 'sdk' + ip_key = 'dev_client_ip' + username_key = 'dev_client_username' + password_key = 'dev_client_password' + + if sdk_key in config: + if ip_key in config[sdk_key]: + device_ip = config[sdk_key][ip_key] + else: + print('ERROR 1: The {} key does not exist in {}'.format(ip_key, settings_file)) + + if username_key in config[sdk_key]: + device_username = config[sdk_key][username_key] + else: + print('ERROR 2: The {} key does not exist in {}'.format(username_key, settings_file)) + + if password_key in config[sdk_key]: + device_password = config[sdk_key][password_key] + else: + print('ERROR 3: The {} key does not exist in {}'.format(password_key, settings_file)) + else: + print('ERROR 4: The {} section does not exist in {}'.format(sdk_key, settings_file)) + + return device_ip, device_username, device_password + + def _safe_dispatch(self, cmd): + """Send the command and return the response.""" + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: + sock.connect('/var/tmp/cs.sock') + sock.sendall(bytes(cmd, 'ascii')) + return self._receive(sock) + + def _dispatch(self, cmd): + errmsg = None + result = "" + try: + result = self._safe_dispatch(cmd) + except Exception as err: + # ignore the command error, continue on to next command + errmsg = "dispatch failed with exception={} err={}".format(type(err), str(err)) + if errmsg is not None: + self.log(self.app_name, errmsg) + pass + return result + + def _safe_receive(self, sock): + sock.settimeout(self.RECV_TIMEOUT) + data = b"" + eoh = -1 + while eoh < 0: + # In the event that the config store times out in returning data, lib returns + # an empty result. Then again, if the config store hangs for 2+ seconds, + # the app's behavior is the least of our worries. + try: + buf = sock.recv(self.MAX_PACKET_SIZE) + except socket.timeout: + return {"status": "timeout", "data": None} + if len(buf) == 0: + break + data += buf + eoh = data.find(self.END_OF_HEADER) + + status_hdr = self.STATUS_HEADER_RE.search(data).group(0)[8:] + content_len = self.CONTENT_LENGTH_HEADER_RE.search(data).group(0)[16:] + remaining = int(content_len) - (len(data) - eoh - len(self.END_OF_HEADER)) + + # body sent from csevent_xxx.sock will have id, action, path, & cfg + while remaining > 0: + buf = sock.recv(self.MAX_PACKET_SIZE) # TODO: This will hang things as well. + if len(buf) == 0: + break + data += buf + remaining -= len(buf) + body = data[eoh:].decode() + try: + result = json.loads(body) + except json.JSONDecodeError as e: + # config store receiver doesn't give back + # proper json for 'put' ops, body + # contains verbose error message + # so putting the error msg in result + result = body.strip() + return {"status": status_hdr.decode(), "data": result} + + def _receive(self, sock): + errmsg = None + result = "" + try: + result = self._safe_receive(sock) + except Exception as err: + # ignore the command error, continue on to next command + errmsg = "_receive failed with exception={} err={}".format(type(err), str(err)) + if errmsg is not None: + self.log(self.app_name, errmsg) + return result + + +class EventingCSClient(CSClient): + running = False + registry = {} + eids = 1 + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.on = self.register + self.un = self.unregister + + def start(self): + if self.running: + self.log(f"Eventing Config Store {self.pid} already running") + return + self.running = True + self.pid = os.getpid() + self.f = '/var/tmp/csevent_%d.sock' % self.pid + try: + os.unlink(self.f) + except FileNotFoundError: + pass + self.event_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.event_sock.bind(self.f) + self.event_sock.listen() # backlog is optional. already set on value found in /proc + self.event_sock.setblocking(False) + self.eloop = threading.Thread(target=self._handle_events) + self.eloop.start() + + def stop(self): + if not self.running: + return + self.log(f"Stopping {self.app_name}") + for k in list(self.registry.keys()): + self.unregister(k) + self.event_sock.close() + os.unlink(self.f) + self.running = False + + def _handle_events(self): + poller = select.poll() + poller.register(self.event_sock, + select.POLLIN | select.POLLERR | select.POLLHUP) # I don't unregsiter this in cleaning up! + while self.running: + try: + events = poller.poll(1000) + for f, ev in events: + if ev & (select.POLLERR | select.POLLHUP): + self.log("Hangup/error received. Stopping") + self.stop() # TODO: restart w/ cached registrations. Will no longer be an error case + + if ev & select.POLLIN: + conn, addr = self.event_sock.accept() + result = self._receive(conn) + eid = int(result['data']['id']) + try: + cb = self.registry[eid]['cb'] + args = self.registry[eid]['args'] + try: + # PUTting just a string to config store results in a json encoded string returned. + # e.g. set /config/system/logging/level "debug", result['data']['cfg'] is '"debug"' + cfg = json.loads(result['data']['cfg']) + except TypeError as e: + # Non-string path + cfg = result['data']['cfg'] + try: + cb_return = cb(result['data']['path'], cfg, args) + except: + if traceback: + traceback.print_exc() + self.log(f"Exception during callback for {str(self.registry[eid])}") + if result['data']['action'] == 'get': # We've something to send back. + # config_store_receiver expects json + cb_return = json.JSONEncoder().encode(cb_return) + conn.sendall( + cb_return.encode()) # No dispatch. Config store receiver will put to config store. + except (NameError, ValueError) as e: + self.log(f"Could not find register data for eid {eid}") + except OSError as e: + self.log(f"OSError: {e}") + raise + + def register(self, action: object, path: object, callback: object, *args: object) -> object: + if not self.running: + self.start() + # what about multiple registration? + eid = self.eids + self.eids += 1 + self.registry[eid] = {'cb': callback, 'action': action, 'path': path, 'args': args} + cmd = "register\n{}\n{}\n{}\n{}\n".format(self.pid, eid, action, path) + return self._dispatch(cmd) + + def unregister(self, eid): + ret = "" + try: + e = self.registry[eid] + except KeyError: + pass + else: + if self.running: + cmd = "unregister\n{}\n{}\n{}\n{}\n".format(self.pid, eid, e['action'], e['path']) + ret = self._dispatch(cmd) + del self.registry[eid] + return ret + + +def clean_up_reg(signal, frame): + """ + When 'cppython remote_port_forward.py' gets a SIGTERM, config_store_receiver.py doesn't + clean up registrations. Even if it did, the comm module can't rely on an external service + to clean up. + """ + EventingCSClient('CSClient').stop() + sys.exit(0) + + +signal.signal(signal.SIGTERM, clean_up_reg) diff --git a/iperf3/iperf3.py b/iperf3/iperf3.py new file mode 100644 index 0000000..8aa26c9 --- /dev/null +++ b/iperf3/iperf3.py @@ -0,0 +1,119 @@ +# iperf3 - run iperf3 to user defined server and update asset_id + +from csclient import EventingCSClient +import subprocess +import json +import time +import requests +import os +import sys + +sdk_data = 'iperf3_server' +default_server = '' + +def process_results(download, upload): + # put results in asset_id + msg = f'{download}Mbps Download {upload}Mbps Upload' + cp.put('config/system/asset_id', msg) + # Log + cp.log(msg) + # Alert + cp.alert(msg) + +def run_iperf3(server): + try: + args = f'-c {server} -J' + command = f'./iperf3-arm64v8 {args}'.split(' ') + results = '' + with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc: + try: + proc.wait(timeout=60) # Set timeout to 60 seconds + for line in proc.stdout: + results += line.decode('utf-8') + except subprocess.TimeoutExpired: + proc.kill() + cp.logger.exception('iperf3 command timed out after 60 seconds') + return None + try: + json_results = json.loads(results) + except json.JSONDecodeError: + cp.logger.exception('JSONDecodeError: ' + results) + return None + return json_results + except subprocess.SubprocessError as e: + cp.logger.exception(f'SubprocessError running iperf3: {e}') + return None + except Exception as e: + cp.logger.exception(f'Unexpected error running iperf3: {e}') + return None + +def get_server(): + appdata = cp.get('config/system/sdk/appdata') + try: + server = [x["value"] for x in appdata if x["name"] == sdk_data][0] + if not server: + cp.log('Set iperf3 server in System > SDK Appdata.') + except IndexError: + cp.log('Set iperf3 server in System > SDK Appdata.') + cp.post('config/system/sdk/appdata', {'name': sdk_data, 'value': default_server}) + server = default_server + return server + +def main(path, value, *args): + if not value: # asset_id has been cleared, run iperf3 + server = get_server() + if not server: + cp.log('No iPerf3 server set in System > SDK Appdata.') + else: + cp.log(f'Running iperf3 upload to {server}') + upload = run_iperf3(server) + cp.log(f'Running iperf3 download from {server}') + download = run_iperf3(server + ' -R') + + if upload and download: + upload = round(upload["end"]["sum_sent"]["bits_per_second"] / 1000000, 2) + download = round(download["end"]["sum_received"]["bits_per_second"] / 1000000, 2) + process_results(download, upload) + else: + cp.log('No results from iperf3') + +def download_iperf3(): + try: + # Use the first argument as the URL if provided, otherwise use the default URL + url = sys.argv[1] if len(sys.argv) > 1 else "https://github.com/userdocs/iperf3-static/releases/download/3.17.1%2B/iperf3-arm64v8" + + # If file doesn't exist, download it + filename = url.split("/")[-1] + if not os.path.exists(filename): + # Download the tar.gz file + cp.log(f"Downloading {url}...") + response = requests.get(url) + if response.status_code != 200: + raise Exception(f"Failed to download the file from {url}. Status code: {response.status_code}") + + # Save the file in the target folder + filename = os.path.join("./", list(filter(None, url.split("/")))[-1]) + with open(filename, "wb") as file: + file.write(response.content) + + # Make the file executable + os.chmod(filename, 0o755) + + cp.log(f"Downloaded and made {filename} executable") + else: + cp.log(f"{filename} found") + return True + except Exception as e: + cp.log(f"An error occurred: {e}") + return False + +cp = EventingCSClient('iPerf3') +cp.log('Starting...') +if download_iperf3(): + cp.log('Set iPerf3 server in System > SDK Appdata. Clear asset_id to run iPerf3.') + if not cp.get('config/system/asset_id'): + cp.put('config/system/asset_id', 'Clear this to run iPerf3.') + cp.on('put', 'config/system/asset_id', main) + main(None, None, None) + while True: + time.sleep(1) diff --git a/iperf3/package.ini b/iperf3/package.ini new file mode 100644 index 0000000..69b5ee8 --- /dev/null +++ b/iperf3/package.ini @@ -0,0 +1,12 @@ +[iperf3] +uuid = 1f7054d2-9250-4742-8414-2505ee4109d7 +vendor = Cradlepoint +notes = Downloads and runs iPerf3 to user defined server and puts results in asset_id +version_major = 1 +version_minor = 0 +version_patch = 0 +auto_start = true +restart = true +reboot = true +firmware_major = 7 +firmware_minor = 24 diff --git a/iperf3/readme.md b/iperf3/readme.md new file mode 100644 index 0000000..aff3774 --- /dev/null +++ b/iperf3/readme.md @@ -0,0 +1,31 @@ +# User Manual for iPerf3 Application + +## Overview +The iPerf3 application is designed to test network performance by measuring the bandwidth between your device and a specified server. This application automates the process of downloading the iPerf3 tool, configuring the server, and running tests. Results are logged, alerted, and stored in the asset identifier for easy access. + +## Initial Setup +1. **Download iPerf3**: When the application is first executed, it will automatically download the iPerf3 tool if it is not already present on your device. +2. **Configure Server**: The application will create an entry in the SDK Appdata named `iperf3_server`. You need to enter the server hostname or IP address in this entry to specify where the tests should be directed. + +## Running the Application +- **Automatic Testing**: If a server is configured in the SDK Appdata, the application will automatically run a test to that server upon startup. +- **Results Handling**: The results of the test, including download and upload speeds, are: + - Stored in the `asset_id` field. + - Written to the router logs. + - Sent as an alert. + + You can modify how results are processed by editing the `process_results()` function at the top of the application code. + +## Accessing Results +- **NCM Device Grid**: Test results can be viewed in the NCM (Network Configuration Manager) under the devices grid, specifically in the asset identifier column. +- **NCM API**: Results can also be retrieved programmatically from the `asset_id` field using the NCM API v2 `/routers/` endpoint. + +## Triggering a New Test +- To initiate a new test, you can clear the `asset_id` in two ways: + - **NCM Device Grid**: Clear the `asset_id` directly in the devices grid within NCM. + - **NCM API**: Set the `asset_id` to an empty string (`""`) using the `/routers/` endpoint in the NCM API v2. + +## Customization +- You can customize the behavior of the application, such as how results are processed or where they are stored, by modifying the relevant sections in the code, particularly the `process_results()` function. + +This manual provides a basic understanding of how to set up and use the iPerf3 application for network performance testing. For further customization or troubleshooting, refer to the application code and comments. \ No newline at end of file diff --git a/iperf3/start.sh b/iperf3/start.sh new file mode 100644 index 0000000..e20774a --- /dev/null +++ b/iperf3/start.sh @@ -0,0 +1,2 @@ +#!/bin/bash +cppython iperf3.py