Skip to content

Add can bridge #1923

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 34 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
435d86d
Add basic implementation for can bridge
pkess Feb 22, 2025
7ec1f52
Implement basic configuration parsing
pkess Feb 22, 2025
441c4da
Add implementation for bridge
pkess Feb 22, 2025
e599f0e
Improve exit handling
pkess Feb 22, 2025
7b18bc6
Add debug logging
pkess Feb 22, 2025
9e3c666
Add error handling for wrong arguments
pkess Feb 22, 2025
8e4bd93
Use stderr
pkess Feb 22, 2025
52703d7
Add custom usage
pkess Feb 22, 2025
0435e17
Add bus configuration info
pkess Feb 22, 2025
8da0ab3
Code format
pkess Feb 23, 2025
9fa9395
Add exception for prints for can_bridge
pkess Feb 23, 2025
3e3dd9e
Add from to exception in exception
pkess Feb 23, 2025
1f2de7d
Remove assignment to unused variable
pkess Feb 23, 2025
ac6e4db
Shorten line length
pkess Feb 23, 2025
0fcea64
Organize imports
pkess Feb 23, 2025
d37af87
Remove unnecessary else
pkess Feb 23, 2025
6b0378e
Add documentation for new script
pkess Feb 23, 2025
28c80bd
Add handling for -h and help sub command
pkess Feb 23, 2025
c780e8a
Add from none to exception
pkess Feb 23, 2025
de20baf
Fix typo busses to bus
pkess Feb 23, 2025
5a17ef6
Add type annotations
pkess Feb 23, 2025
d81dbab
Fix type annotations
pkess Feb 23, 2025
2e8652f
Fix type annotations again
pkess Feb 23, 2025
3d70761
Add --help to get help
pkess Feb 23, 2025
e883a4e
Add basic print help test
pkess Feb 23, 2025
e32b5d6
Add basic test file for bridge script
pkess Feb 23, 2025
74556a1
Add very basic test
pkess Feb 23, 2025
8a04304
Add different channels for virtual bus
pkess Feb 23, 2025
05f3f03
Add assert for call to exit
pkess Feb 23, 2025
6bfcae9
Patch correct function
pkess Feb 23, 2025
6ffc7d4
test
pkess Feb 24, 2025
036e25c
fjkdf
pkess Feb 24, 2025
e8f03ed
once again -.-
pkess Feb 24, 2025
8df35aa
Try snakecase
pkess Feb 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions can/bridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
"""
Creates a bridge between two CAN buses.

This will connect to two CAN buses. Messages received on one
bus will be sent to the other bus and vice versa.
"""

import argparse
import errno
import logging
import sys
import time
from datetime import datetime
from typing import (
Iterator,
List,
Tuple,
)

import can

from .logger import _create_base_argument_parser, _create_bus, _parse_additional_config

USAGE = """
usage: can_bridge [{general config} --] {can A config} -- {can B config}

Bridge two CAN buses.

Both can buses will be connected so that messages from bus A will be sent on
bus B and messages on bus B will be sent to bus A. The buses are separated by a `--`

positional arguments:
{general config} The configuration for this program excluding
the config for each bus. Can be omitted
{can A config} The configuration for the first bus
{can B config} The configuration for the second bus

Example usage:
can_bridge -i socketcan -c can0 -- -i socketcan can1
can_bridge -vvv -- -i socketcan -c can0 -- -i socketcan can1

Type `can_bridge help_bus` for information about single bus configuration.
"""

LOG = logging.getLogger(__name__)


class UserError(Exception):
pass


def get_config_list(it: Iterator[str], separator: str, conf: list) -> None:
while True:
el = next(it)
if el == separator:
break

conf.append(el)


def split_configurations(
arg_list: List[str], separator: str = "--"
) -> Tuple[list, list, list]:
general = []
conf_a: List[str] = []
conf_b: List[str] = []

found_sep = False
it = iter(arg_list)
try:
get_config_list(it, separator, conf_a)
found_sep = True
get_config_list(it, separator, conf_b)

# When we reached this point we found two separators so we have
# a general config. We will treate the first config as general
general = conf_a
conf_a = conf_b
get_config_list(it, separator, conf_b)

# When we reached this point we found three separators so this is
# an error.
raise UserError("To many configurations")
except StopIteration:
LOG.debug("All configurations were split")
if not found_sep:
raise UserError("Missing separator") from None

return general, conf_a, conf_b


def main() -> None:
general_parser = argparse.ArgumentParser()
general_parser.add_argument(
"-v",
action="count",
dest="verbosity",
help="""How much information do you want to see at the command line?
You can add several of these e.g., -vv is DEBUG""",
default=2,
)

bus_parser = argparse.ArgumentParser(description="Bridge two CAN buses.")

_create_base_argument_parser(bus_parser)

parser = argparse.ArgumentParser(description="Bridge two CAN buses.")
parser.add_argument("configs", nargs=argparse.REMAINDER)

# print help message when no arguments were given
if len(sys.argv) < 2:
print(USAGE, file=sys.stderr)
raise SystemExit(errno.EINVAL)

args = sys.argv[1:]
try:
general, conf_a, conf_b = split_configurations(args)
except UserError as exc:
if len(args) >= 1:
if args[0] == "-h" or args[0] == "--help" or args[0] == "help":
print(USAGE)
raise SystemExit() from None
elif args[0] == "help_bus":
bus_parser.print_help(sys.stderr)
else:
print(f"Error while processing arguments: {exc}", file=sys.stderr)
raise SystemExit(errno.EINVAL) from exc

LOG.debug("General configuration: %s", general)
LOG.debug("Bus A configuration: %s", conf_a)
LOG.debug("Bus B configuration: %s", conf_b)
g_results = general_parser.parse_args(general)
verbosity = g_results.verbosity

a_results, a_unknown_args = bus_parser.parse_known_args(conf_a)
a_additional_config = _parse_additional_config(
[*a_results.extra_args, *a_unknown_args]
)
a_results.__dict__["verbosity"] = verbosity

b_results, b_unknown_args = bus_parser.parse_known_args(conf_b)
b_additional_config = _parse_additional_config(
[*b_results.extra_args, *b_unknown_args]
)
b_results.__dict__["verbosity"] = verbosity

LOG.debug("General configuration results: %s", g_results)
LOG.debug("Bus A configuration results: %s", a_results)
LOG.debug("Bus A additional configuration results: %s", a_additional_config)
LOG.debug("Bus B configuration results: %s", b_results)
LOG.debug("Bus B additional configuration results: %s", b_additional_config)
with _create_bus(a_results, **a_additional_config) as bus_a:
with _create_bus(b_results, **b_additional_config) as bus_b:
reader_a = can.RedirectReader(bus_b)
reader_b = can.RedirectReader(bus_a)
can.Notifier(bus_a, [reader_a])
can.Notifier(bus_b, [reader_b])
print(f"CAN Bridge (Started on {datetime.now()})")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass

print(f"CAN Bridge (Stopped on {datetime.now()})")


if __name__ == "__main__":
main()
15 changes: 15 additions & 0 deletions doc/scripts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ The full usage page can be seen below:
:shell:


can.bridge
----------

A small application that can be used to connect two can buses:

.. command-output:: python -m can.bridge -h
:shell:


Example call:
::

python -m can.bridge -i socketcan -c can0 -- -i socketcan -c can1


can.logconvert
--------------

Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ can_logconvert = "can.logconvert:main"
can_logger = "can.logger:main"
can_player = "can.player:main"
can_viewer = "can.viewer:main"
can_bridge = "can.bridge:main"

[project.urls]
homepage = "https://github.com/hardbyte/python-can"
Expand Down Expand Up @@ -166,6 +167,7 @@ ignore = [
]
"can/logger.py" = ["T20"] # flake8-print
"can/player.py" = ["T20"] # flake8-print
"can/bridge.py" = ["T20"] # flake8-print

[tool.ruff.lint.isort]
known-first-party = ["can"]
Expand Down
58 changes: 58 additions & 0 deletions test/test_bridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/usr/bin/env python

"""
This module tests the functions inside of bridge.py
"""

import sys
import unittest
from unittest import mock
from unittest.mock import Mock

import can
import can.bridge


class TestBridgeScriptModule(unittest.TestCase):
def setUp(self) -> None:
# Patch VirtualBus object
patcher_virtual_bus = mock.patch("can.interfaces.virtual.VirtualBus", spec=True)
self.MockVirtualBus = patcher_virtual_bus.start()
self.addCleanup(patcher_virtual_bus.stop)
self.mock_virtual_bus = self.MockVirtualBus.return_value
self.mock_virtual_bus.__enter__ = Mock(return_value=self.mock_virtual_bus)

# Patch time sleep object
patcher_sleep = mock.patch("can.bridge.time.sleep", spec=True)
self.MockSleep = patcher_sleep.start()
self.addCleanup(patcher_sleep.stop)

self.testmsg = can.Message(
arbitration_id=0xC0FFEE, data=[0, 25, 0, 1, 3, 1, 4, 1], is_extended_id=True
)

self.busargs = ["-i", "virtual"]

def assertSuccessfullCleanup(self):
self.MockVirtualBus.assert_called()
self.assertEqual(2, len(self.mock_virtual_bus.__exit__.mock_calls))

def test_bridge_no_config(self):
self.MockSleep.side_effect = KeyboardInterrupt
sys.argv = [
sys.argv[0],
*self.busargs,
"-c",
"can_a",
"--",
*self.busargs,
"-c",
"can_b",
]
can.bridge.main()

self.assertSuccessfullCleanup()


if __name__ == "__main__":
unittest.main()
14 changes: 14 additions & 0 deletions test/test_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ def _import(self):
return module


class TestBridgeScript(CanScriptTest):
def _commands(self):
commands = [
"python -m can.bridge --help",
"can_bridge --help",
]
return commands

def _import(self):
import can.bridge as module

return module


class TestLogconvertScript(CanScriptTest):
def _commands(self):
commands = [
Expand Down
Loading