diff --git a/can/bridge.py b/can/bridge.py new file mode 100644 index 000000000..bbae0a3e1 --- /dev/null +++ b/can/bridge.py @@ -0,0 +1,169 @@ +""" +Creates a bridge between two CAN buses. + +This will connect to two CAN buses. Messages received on one +bus will be sent to the other bus and vice versa. +""" + +import argparse +import errno +import logging +import sys +import time +from datetime import datetime +from typing import ( + Iterator, + List, + Tuple, +) + +import can + +from .logger import _create_base_argument_parser, _create_bus, _parse_additional_config + +USAGE = """ +usage: can_bridge [{general config} --] {can A config} -- {can B config} + +Bridge two CAN buses. + +Both can buses will be connected so that messages from bus A will be sent on +bus B and messages on bus B will be sent to bus A. The buses are separated by a `--` + +positional arguments: + {general config} The configuration for this program excluding + the config for each bus. Can be omitted + {can A config} The configuration for the first bus + {can B config} The configuration for the second bus + +Example usage: + can_bridge -i socketcan -c can0 -- -i socketcan can1 + can_bridge -vvv -- -i socketcan -c can0 -- -i socketcan can1 + +Type `can_bridge help_bus` for information about single bus configuration. +""" + +LOG = logging.getLogger(__name__) + + +class UserError(Exception): + pass + + +def get_config_list(it: Iterator[str], separator: str, conf: list) -> None: + while True: + el = next(it) + if el == separator: + break + + conf.append(el) + + +def split_configurations( + arg_list: List[str], separator: str = "--" +) -> Tuple[list, list, list]: + general = [] + conf_a: List[str] = [] + conf_b: List[str] = [] + + found_sep = False + it = iter(arg_list) + try: + get_config_list(it, separator, conf_a) + found_sep = True + get_config_list(it, separator, conf_b) + + # When we reached this point we found two separators so we have + # a general config. We will treate the first config as general + general = conf_a + conf_a = conf_b + get_config_list(it, separator, conf_b) + + # When we reached this point we found three separators so this is + # an error. + raise UserError("To many configurations") + except StopIteration: + LOG.debug("All configurations were split") + if not found_sep: + raise UserError("Missing separator") from None + + return general, conf_a, conf_b + + +def main() -> None: + general_parser = argparse.ArgumentParser() + general_parser.add_argument( + "-v", + action="count", + dest="verbosity", + help="""How much information do you want to see at the command line? + You can add several of these e.g., -vv is DEBUG""", + default=2, + ) + + bus_parser = argparse.ArgumentParser(description="Bridge two CAN buses.") + + _create_base_argument_parser(bus_parser) + + parser = argparse.ArgumentParser(description="Bridge two CAN buses.") + parser.add_argument("configs", nargs=argparse.REMAINDER) + + # print help message when no arguments were given + if len(sys.argv) < 2: + print(USAGE, file=sys.stderr) + raise SystemExit(errno.EINVAL) + + args = sys.argv[1:] + try: + general, conf_a, conf_b = split_configurations(args) + except UserError as exc: + if len(args) >= 1: + if args[0] == "-h" or args[0] == "--help" or args[0] == "help": + print(USAGE) + raise SystemExit() from None + elif args[0] == "help_bus": + bus_parser.print_help(sys.stderr) + else: + print(f"Error while processing arguments: {exc}", file=sys.stderr) + raise SystemExit(errno.EINVAL) from exc + + LOG.debug("General configuration: %s", general) + LOG.debug("Bus A configuration: %s", conf_a) + LOG.debug("Bus B configuration: %s", conf_b) + g_results = general_parser.parse_args(general) + verbosity = g_results.verbosity + + a_results, a_unknown_args = bus_parser.parse_known_args(conf_a) + a_additional_config = _parse_additional_config( + [*a_results.extra_args, *a_unknown_args] + ) + a_results.__dict__["verbosity"] = verbosity + + b_results, b_unknown_args = bus_parser.parse_known_args(conf_b) + b_additional_config = _parse_additional_config( + [*b_results.extra_args, *b_unknown_args] + ) + b_results.__dict__["verbosity"] = verbosity + + LOG.debug("General configuration results: %s", g_results) + LOG.debug("Bus A configuration results: %s", a_results) + LOG.debug("Bus A additional configuration results: %s", a_additional_config) + LOG.debug("Bus B configuration results: %s", b_results) + LOG.debug("Bus B additional configuration results: %s", b_additional_config) + with _create_bus(a_results, **a_additional_config) as bus_a: + with _create_bus(b_results, **b_additional_config) as bus_b: + reader_a = can.RedirectReader(bus_b) + reader_b = can.RedirectReader(bus_a) + can.Notifier(bus_a, [reader_a]) + can.Notifier(bus_b, [reader_b]) + print(f"CAN Bridge (Started on {datetime.now()})") + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + pass + + print(f"CAN Bridge (Stopped on {datetime.now()})") + + +if __name__ == "__main__": + main() diff --git a/doc/scripts.rst b/doc/scripts.rst index 2d59b7528..47555bf87 100644 --- a/doc/scripts.rst +++ b/doc/scripts.rst @@ -57,6 +57,21 @@ The full usage page can be seen below: :shell: +can.bridge +---------- + +A small application that can be used to connect two can buses: + +.. command-output:: python -m can.bridge -h + :shell: + + +Example call: +:: + + python -m can.bridge -i socketcan -c can0 -- -i socketcan -c can1 + + can.logconvert -------------- diff --git a/pyproject.toml b/pyproject.toml index d41bf6e22..2f434cb09 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ can_logconvert = "can.logconvert:main" can_logger = "can.logger:main" can_player = "can.player:main" can_viewer = "can.viewer:main" +can_bridge = "can.bridge:main" [project.urls] homepage = "https://github.com/hardbyte/python-can" @@ -166,6 +167,7 @@ ignore = [ ] "can/logger.py" = ["T20"] # flake8-print "can/player.py" = ["T20"] # flake8-print +"can/bridge.py" = ["T20"] # flake8-print [tool.ruff.lint.isort] known-first-party = ["can"] diff --git a/test/test_bridge.py b/test/test_bridge.py new file mode 100644 index 000000000..11e645ede --- /dev/null +++ b/test/test_bridge.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python + +""" +This module tests the functions inside of bridge.py +""" + +import sys +import unittest +from unittest import mock +from unittest.mock import Mock + +import can +import can.bridge + + +class TestBridgeScriptModule(unittest.TestCase): + def setUp(self) -> None: + # Patch VirtualBus object + patcher_virtual_bus = mock.patch("can.interfaces.virtual.VirtualBus", spec=True) + self.MockVirtualBus = patcher_virtual_bus.start() + self.addCleanup(patcher_virtual_bus.stop) + self.mock_virtual_bus = self.MockVirtualBus.return_value + self.mock_virtual_bus.__enter__ = Mock(return_value=self.mock_virtual_bus) + + # Patch time sleep object + patcher_sleep = mock.patch("can.bridge.time.sleep", spec=True) + self.MockSleep = patcher_sleep.start() + self.addCleanup(patcher_sleep.stop) + + self.testmsg = can.Message( + arbitration_id=0xC0FFEE, data=[0, 25, 0, 1, 3, 1, 4, 1], is_extended_id=True + ) + + self.busargs = ["-i", "virtual"] + + def assertSuccessfullCleanup(self): + self.MockVirtualBus.assert_called() + self.assertEqual(2, len(self.mock_virtual_bus.__exit__.mock_calls)) + + def test_bridge_no_config(self): + self.MockSleep.side_effect = KeyboardInterrupt + sys.argv = [ + sys.argv[0], + *self.busargs, + "-c", + "can_a", + "--", + *self.busargs, + "-c", + "can_b", + ] + can.bridge.main() + + self.assertSuccessfullCleanup() + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_scripts.py b/test/test_scripts.py index 9d8c059cf..c1a6c082d 100644 --- a/test/test_scripts.py +++ b/test/test_scripts.py @@ -98,6 +98,20 @@ def _import(self): return module +class TestBridgeScript(CanScriptTest): + def _commands(self): + commands = [ + "python -m can.bridge --help", + "can_bridge --help", + ] + return commands + + def _import(self): + import can.bridge as module + + return module + + class TestLogconvertScript(CanScriptTest): def _commands(self): commands = [