diff --git a/.github/workflows/test-ros1.yml b/.github/workflows/test-ros1.yml index e8768c3..0e053b6 100644 --- a/.github/workflows/test-ros1.yml +++ b/.github/workflows/test-ros1.yml @@ -19,6 +19,9 @@ jobs: "ubuntu-py39", "ubuntu-py310", "ubuntu-py311", + "ubuntu-py312", + "ubuntu-py313", + "ubuntu-py314", ] include: - name: "ubuntu-py39" @@ -30,6 +33,15 @@ jobs: - name: "ubuntu-py311" os: ubuntu-latest python-version: "3.11" + - name: "ubuntu-py312" + os: ubuntu-latest + python-version: "3.12" + - name: "ubuntu-py313" + os: ubuntu-latest + python-version: "3.13" + - name: "ubuntu-py314" + os: ubuntu-latest + python-version: "3.14" steps: - uses: actions/checkout@v7 - name: Set up Python ${{ matrix.python-version }} @@ -51,9 +63,16 @@ jobs: - name: Run linter run: | invoke check - - name: Run tests + # The twisted and asyncio transports both build on Autobahn, whose txaio + # layer binds a single async framework per process, so each transport is + # exercised in its own pytest process. + - name: Run tests (twisted transport) + run: | + ROSLIBPY_TRANSPORT=twisted pytest tests/ros1 + - name: Run tests (asyncio transport) run: | - pytest tests/ros1 + ROSLIBPY_TRANSPORT=asyncio pytest tests/ros1 - name: Tear down docker containers + if: always() run: | docker rm -f rosbridge diff --git a/.github/workflows/test-ros2.yml b/.github/workflows/test-ros2.yml index 9b1fd71..0b129d2 100644 --- a/.github/workflows/test-ros2.yml +++ b/.github/workflows/test-ros2.yml @@ -20,6 +20,8 @@ jobs: "ubuntu-py310", "ubuntu-py311", "ubuntu-py312", + "ubuntu-py313", + "ubuntu-py314", ] include: - name: "ubuntu-py39" @@ -34,6 +36,12 @@ jobs: - name: "ubuntu-py312" os: ubuntu-latest python-version: "3.12" + - name: "ubuntu-py313" + os: ubuntu-latest + python-version: "3.13" + - name: "ubuntu-py314" + os: ubuntu-latest + python-version: "3.14" steps: - uses: actions/checkout@v7 - name: Set up Python ${{ matrix.python-version }} @@ -55,9 +63,15 @@ jobs: - name: Run linter run: | invoke check - - name: Run tests + # The twisted and asyncio transports both build on Autobahn, whose txaio + # layer binds a single async framework per process, so each transport is + # exercised in its own pytest process. + - name: Run tests (twisted transport) + run: | + ROSLIBPY_TRANSPORT=twisted pytest tests/ros2 + - name: Run tests (asyncio transport) run: | - pytest tests/ros2 + ROSLIBPY_TRANSPORT=asyncio pytest tests/ros2 - name: Tear down docker containers run: | docker rm -f rosbridge diff --git a/.github/workflows/transport-benchmark.yml b/.github/workflows/transport-benchmark.yml new file mode 100644 index 0000000..fe09862 --- /dev/null +++ b/.github/workflows/transport-benchmark.yml @@ -0,0 +1,59 @@ +name: Transport benchmark + +on: + push: + branches: + - main + tags: + - 'v*' + pull_request: + branches: + - main + workflow_dispatch: + +jobs: + transport-benchmark: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - name: Set up Python 3.11 + uses: actions/setup-python@v6 + with: + python-version: "3.11" + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install wheel + - name: Install + run: | + python -m pip install --no-cache-dir -r requirements-dev.txt + python -m pip install --no-cache-dir uvloop + - name: Set up docker containers + run: | + docker build -t gramaziokohler/rosbridge:integration_tests_ros1 ./docker/ros1 + docker run -d -p 9090:9090 --name rosbridge gramaziokohler/rosbridge:integration_tests_ros1 /bin/bash -c "roslaunch /integration-tests.launch" + docker ps -a + - name: Run transport benchmark + continue-on-error: true + run: | + python benchmarks/transport.py \ + --host 127.0.0.1 \ + --port 9090 \ + --cases twisted asyncio asyncio-uvloop asyncio-no-compression asyncio-uvloop-no-compression \ + --warmup 100 \ + --service-count 1500 \ + --topic-count 3000 \ + --markdown transport-benchmark.md + cat transport-benchmark.md >> "$GITHUB_STEP_SUMMARY" + - name: Upload transport benchmark + if: always() + continue-on-error: true + uses: actions/upload-artifact@v4 + with: + name: transport-benchmark + path: transport-benchmark.md + if-no-files-found: ignore + - name: Tear down docker containers + if: always() + run: | + docker rm -f rosbridge diff --git a/.readthedocs.yaml b/.readthedocs.yaml new file mode 100644 index 0000000..6d67285 --- /dev/null +++ b/.readthedocs.yaml @@ -0,0 +1,13 @@ +version: 2 + +build: + os: ubuntu-24.04 + tools: + python: "3.14" + +sphinx: + configuration: docs/conf.py + +python: + install: + - requirements: docs/requirements.txt diff --git a/CHANGELOG.rst b/CHANGELOG.rst index def1cde..c4ed692 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -12,8 +12,16 @@ Unreleased **Added** +* Added an asyncio transport backend (Autobahn running on an asyncio event + loop instead of the Twisted reactor) selectable with ``ROSLIBPY_TRANSPORT``, + ``set_default_transport()``, or the ``Ros(..., transport=...)`` argument. +* Added transport-parametrized ROS integration tests for CPython, while + keeping IronPython on the ``cli`` transport. + **Changed** +* Updated the package classifiers to 3.9-3.14 (removes most EOL python versions, 3.9 is still supported). + **Fixed** **Deprecated** diff --git a/MANIFEST.in b/MANIFEST.in index 1d9c40c..cc7865b 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,11 +1,13 @@ graft docs graft src graft tests +graft benchmarks prune docker include .bumpversion.cfg include .editorconfig +include .readthedocs.yaml include AUTHORS.rst include CHANGELOG.rst diff --git a/README.rst b/README.rst index edb3122..3ccc7ab 100644 --- a/README.rst +++ b/README.rst @@ -68,6 +68,11 @@ To install **roslibpy**, simply use ``pip``:: pip install roslibpy +The default WebSocket transport on CPython uses Autobahn on the Twisted +reactor. An alternative transport that runs the same Autobahn WebSocket stack +on an asyncio event loop is also available (no extra dependencies required); +see `Transport selection`_ below. + For IronPython, the ``pip`` command is slightly different:: ipy -X:Frames -m pip install --user roslibpy @@ -75,6 +80,18 @@ For IronPython, the ``pip`` command is slightly different:: Remember that you will need a working ROS setup including the **rosbridge server** and **TF2 web republisher** accessible within your network. +Transport selection +------------------- + +CPython uses the ``twisted`` transport by default; IronPython uses ``cli``. The +``asyncio`` transport runs the same Autobahn WebSocket stack on an asyncio +event loop. To select it, either pass it per client, set the process-wide +default, or set the environment variable:: + + roslibpy.Ros(host='localhost', port=9090, transport='asyncio') + roslibpy.set_default_transport('asyncio') + ROSLIBPY_TRANSPORT=asyncio python my_script.py + Documentation ------------- diff --git a/benchmarks/transport.py b/benchmarks/transport.py new file mode 100644 index 0000000..a9d2b7d --- /dev/null +++ b/benchmarks/transport.py @@ -0,0 +1,361 @@ +"""Benchmark roslibpy WebSocket transports against a running rosbridge. + +This is an opt-in development helper, not a pytest test. Start a rosbridge on +the chosen host/port, then run for example: + + python benchmarks/transport.py --host 127.0.0.1 --port 9090 + +The benchmark compares connection setup, blocking rosapi service calls, and +topic publish/subscribe round-trip latency for the ``twisted`` and ``asyncio`` +transports. + +The GitHub Actions benchmark runs on shared CI infrastructure, so occasional +topic latency spikes are expected. Prefer medians and throughput for quick +comparisons, and compare p95/max values across several runs before drawing +conclusions about tail latency. +""" + +from __future__ import print_function + +import argparse +import json +import os +import statistics +import subprocess +import sys +import tempfile +import threading +import time + +from roslibpy import Message, Ros, Service, ServiceRequest, Topic + + +def service_ping(ros): + """Distro-agnostic rosapi round-trip used to measure service latency. + + Calls ``/rosapi/get_time`` but, unlike ``Ros.get_time()``, does not parse + the response (whose shape differs between ROS 1 ``secs``/``nsecs`` and ROS 2 + ``sec``/``nanosec``), so the benchmark runs unchanged against either.""" + service = Service(ros, "/rosapi/get_time", "rosapi/GetTime") + return service.call(ServiceRequest(), timeout=5) + + +CASES = { + "twisted": { + "transport": "twisted", + "event_loop": None, + "compression": None, + }, + "asyncio": { + "transport": "asyncio", + "event_loop": "default", + "compression": "deflate", + }, + "asyncio-uvloop": { + "transport": "asyncio", + "event_loop": "uvloop", + "compression": "deflate", + }, + "asyncio-no-compression": { + "transport": "asyncio", + "event_loop": "default", + "compression": None, + }, + "asyncio-uvloop-no-compression": { + "transport": "asyncio", + "event_loop": "uvloop", + "compression": None, + }, +} + + +def percentile(values, pct): + ordered = sorted(values) + return ordered[int(round((len(ordered) - 1) * pct / 100.0))] + + +def summary(values): + return { + "mean": statistics.mean(values) * 1000.0, + "median": statistics.median(values) * 1000.0, + "p95": percentile(values, 95) * 1000.0, + "max": max(values) * 1000.0, + } + + +def format_summary_line(transport, label, values): + data = summary(values) + return ( + "{:<30} {:<16} mean={mean:7.3f} ms median={median:7.3f} ms " + "p95={p95:7.3f} ms max={max:7.3f} ms".format(transport, label, **data) + ) + + +def print_summary(transport, label, values): + print(format_summary_line(transport, label, values)) + + +def markdown_row(transport, metric, values=None, value=None): + if values is not None: + data = summary(values) + return ( + "| {transport} | {metric} | {mean:.3f} ms | {median:.3f} ms | " + "{p95:.3f} ms | {max:.3f} ms | |".format( + transport=transport, metric=metric, **data + ) + ) + return "| {} | {} | | | | | {} |".format(transport, metric, value) + + +def wait_connected(ros, timeout): + deadline = time.time() + timeout + while time.time() < deadline: + if ros.is_connected: + return + time.sleep(0.005) + raise RuntimeError("Timed out waiting for connection") + + +def wait_rosbridge_ready(transport, args): + deadline = time.time() + args.ready_timeout + last_error = None + while time.time() < deadline: + ros = None + try: + ros = Ros(args.host, args.port, transport=transport) + ros.run() + wait_connected(ros, args.connect_timeout) + service_ping(ros) + return + except Exception as error: + last_error = error + time.sleep(args.ready_interval) + finally: + if ros is not None: + try: + ros.close() + except Exception: + pass + raise RuntimeError( + "Timed out waiting for rosbridge readiness: {}".format(last_error) + ) + + +def service_latency(ros, count, warmup): + for _ in range(warmup): + service_ping(ros) + timings = [] + for _ in range(count): + start = time.perf_counter() + service_ping(ros) + timings.append(time.perf_counter() - start) + return timings + + +def topic_latency(ros, case_name, count, warmup, delay): + topic_name = "/roslibpy_transport_benchmark_{}".format(case_name.replace("-", "_")) + listener = Topic(ros, topic_name, "std_msgs/String") + publisher = Topic(ros, topic_name, "std_msgs/String") + + expected = count + warmup + sent = {} + timings = [] + done = threading.Event() + + def receive(message): + payload = json.loads(message["data"]) + seq = payload["seq"] + start = sent.get(seq) + if start is None: + return + if seq >= warmup: + timings.append(time.perf_counter() - start) + if seq == expected - 1: + done.set() + + listener.subscribe(receive) + time.sleep(0.5) + + start_total = time.perf_counter() + for seq in range(expected): + sent[seq] = time.perf_counter() + publisher.publish(Message({"data": json.dumps({"seq": seq})})) + if delay: + time.sleep(delay) + + if not done.wait(15): + raise RuntimeError( + "Timed out after receiving {} of {} messages".format(len(timings), count) + ) + + total = time.perf_counter() - start_total + listener.unsubscribe() + publisher.unadvertise() + return timings, len(timings) / total + + +def configure_case(case_name): + case = CASES[case_name] + if case["event_loop"] == "uvloop": + import uvloop + + uvloop.install() + + if case["transport"] == "asyncio": + from roslibpy.comm.comm_asyncio import AsyncioRosBridgeClientFactory + + AsyncioRosBridgeClientFactory.compression = case["compression"] + + return case + + +def run_case(case_name, args): + case = configure_case(case_name) + transport = case["transport"] + + wait_rosbridge_ready(transport, args) + + ros = Ros(args.host, args.port, transport=transport) + start = time.perf_counter() + ros.run() + wait_connected(ros, args.connect_timeout) + connect_time = time.perf_counter() - start + + services = service_latency(ros, args.service_count, args.warmup) + topics, topic_rate = topic_latency( + ros, case_name, args.topic_count, args.warmup, args.topic_delay + ) + + print( + "{:<30} {:<16} {:7.3f} ms".format( + case_name, "initial connect", connect_time * 1000.0 + ) + ) + print_summary(case_name, "get_time", services) + print_summary(case_name, "topic rtt", topics) + print("{:<30} {:<16} {:7.1f} msg/s".format(case_name, "topic rate", topic_rate)) + + ros.close() + time.sleep(0.5) + return { + "connect": [connect_time], + "services": services, + "topics": topics, + "topic_rate": topic_rate, + } + + +def run_case_subprocess(case_name, args): + with tempfile.NamedTemporaryFile(delete=False) as result_file: + result_path = result_file.name + + command = [ + sys.executable, + os.path.abspath(__file__), + "--case", + case_name, + "--host", + args.host, + "--port", + str(args.port), + "--service-count", + str(args.service_count), + "--topic-count", + str(args.topic_count), + "--warmup", + str(args.warmup), + "--topic-delay", + str(args.topic_delay), + "--connect-timeout", + str(args.connect_timeout), + "--ready-timeout", + str(args.ready_timeout), + "--ready-interval", + str(args.ready_interval), + "--json-result", + result_path, + ] + try: + subprocess.check_call(command) + with open(result_path) as fh: + return json.load(fh) + finally: + try: + os.remove(result_path) + except OSError: + pass + + +def write_markdown(path, results): + lines = [ + "# roslibpy transport benchmark", + "", + "These numbers are sampled on shared CI infrastructure. Topic p95 and max", + "latencies can be noisy because of runner scheduling, Docker networking,", + "and rosbridge timing. Prefer medians and throughput for quick comparisons;", + "interpret tail latency across several runs.", + "", + "| Transport | Metric | Mean | Median | P95 | Max | Value |", + "| --- | --- | ---: | ---: | ---: | ---: | ---: |", + ] + for transport, result in results: + lines.append(markdown_row(transport, "initial connect", result["connect"])) + lines.append(markdown_row(transport, "get_time service", result["services"])) + lines.append(markdown_row(transport, "topic round trip", result["topics"])) + lines.append( + markdown_row( + transport, + "topic throughput", + value="{:.1f} msg/s".format(result["topic_rate"]), + ) + ) + lines.append("") + with open(path, "w") as fh: + fh.write("\n".join(lines)) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--host", default="127.0.0.1") + parser.add_argument("--port", type=int, default=9090) + parser.add_argument("--cases", nargs="+") + parser.add_argument("--transports", nargs="+", help=argparse.SUPPRESS) + parser.add_argument("--case", choices=sorted(CASES), help=argparse.SUPPRESS) + parser.add_argument("--json-result", help=argparse.SUPPRESS) + parser.add_argument("--service-count", type=int, default=1000) + parser.add_argument("--topic-count", type=int, default=1000) + parser.add_argument("--warmup", type=int, default=50) + parser.add_argument("--topic-delay", type=float, default=0.00005) + parser.add_argument("--connect-timeout", type=float, default=5.0) + parser.add_argument("--ready-timeout", type=float, default=30.0) + parser.add_argument("--ready-interval", type=float, default=0.5) + parser.add_argument( + "--markdown", help="Write a Markdown summary table to this path" + ) + args = parser.parse_args() + + if args.case: + result = run_case(args.case, args) + if args.json_result: + with open(args.json_result, "w") as fh: + json.dump(result, fh) + return + + cases = args.cases or args.transports or ["twisted", "asyncio"] + + results = [] + for case_name in cases: + if case_name not in CASES: + raise ValueError( + "Unknown benchmark case {!r}; expected one of {}".format( + case_name, sorted(CASES) + ) + ) + results.append((case_name, run_case_subprocess(case_name, args))) + + if args.markdown: + write_markdown(args.markdown, results) + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index 82ea93d..e61d91b 100644 --- a/setup.py +++ b/setup.py @@ -54,17 +54,18 @@ def read(*names, **kwargs): "Operating System :: Microsoft :: Windows", "Programming Language :: Python", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.3", - "Programming Language :: Python :: 3.4", - "Programming Language :: Python :: 3.5", - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: IronPython", "Topic :: Scientific/Engineering", ], keywords=["ros", "ros-bridge", "robotics", "websockets"], + python_requires=">=3.9", install_requires=requirements, extras_require={}, entry_points={"console_scripts": ["roslibpy=roslibpy.__main__:main"]}, diff --git a/src/roslibpy/__init__.py b/src/roslibpy/__init__.py index 4ccf076..ae3c469 100644 --- a/src/roslibpy/__init__.py +++ b/src/roslibpy/__init__.py @@ -34,9 +34,21 @@ Other classes that need an active connection with ROS receive this instance as an argument to their constructors. +Transport selection +------------------- + +CPython uses the ``twisted`` transport by default; IronPython uses ``cli``. The +``asyncio`` transport can be selected per client, process-wide, or via the +``ROSLIBPY_TRANSPORT`` environment variable:: + + ros = roslibpy.Ros(host='localhost', port=9090, transport='asyncio') + roslibpy.set_default_transport('asyncio') + ROSLIBPY_TRANSPORT=asyncio python my_script.py + .. autoclass:: Ros :members: .. autofunction:: set_rosapi_timeout +.. autofunction:: set_default_transport Main ROS concepts ================= @@ -129,6 +141,7 @@ class and are passed around via :class:`Topics ` using a **publish/subscr __url__, __version__, ) +from .comm import set_default_transport from .core import ( ActionClient, Feedback, @@ -167,6 +180,7 @@ class and are passed around via :class:`Topics ` using a **publish/subscr "Service", "ServiceRequest", "ServiceResponse", + "set_default_transport", "set_rosapi_timeout", "Time", "Topic", diff --git a/src/roslibpy/comm/__init__.py b/src/roslibpy/comm/__init__.py index 0e88ce7..759489f 100644 --- a/src/roslibpy/comm/__init__.py +++ b/src/roslibpy/comm/__init__.py @@ -1,10 +1,145 @@ +"""Transport selection for the ROS bridge connection. + +Three transports are available: + +* ``twisted``: default. Built on autobahn + twisted. The historical + implementation. +* ``asyncio``: opt-in (2.1+). Built on Autobahn's asyncio integration (the + same Autobahn WebSocket stack as ``twisted``, running on an asyncio event + loop). Cleaner per-test loop isolation; no extra dependencies. +* ``cli``: IronPython only (auto-selected on ``sys.platform == "cli"``). + +Selection precedence (highest → lowest): + +1. ``transport=`` kwarg on ``Ros`` (per-instance). +2. ``ROSLIBPY_TRANSPORT`` environment variable. +3. Module-level default set via ``roslibpy.set_default_transport(...)``. +4. Platform default: ``cli`` on IronPython, ``twisted`` elsewhere. +""" + +import os import sys from .comm import RosBridgeException, RosBridgeProtocol -if sys.platform == "cli": - from .comm_cli import CliRosBridgeClientFactory as RosBridgeClientFactory -else: - from .comm_autobahn import AutobahnRosBridgeClientFactory as RosBridgeClientFactory +__all__ = [ + "RosBridgeException", + "RosBridgeProtocol", + "RosBridgeClientFactory", + "select_factory", + "set_default_transport", + "TRANSPORT_TWISTED", + "TRANSPORT_ASYNCIO", + "TRANSPORT_CLI", +] + +TRANSPORT_TWISTED = "twisted" +TRANSPORT_ASYNCIO = "asyncio" +TRANSPORT_CLI = "cli" + +_VALID_TRANSPORTS = (TRANSPORT_TWISTED, TRANSPORT_ASYNCIO, TRANSPORT_CLI) +_PLATFORM_DEFAULT = TRANSPORT_CLI if sys.platform == "cli" else TRANSPORT_TWISTED +_DEFAULT_TRANSPORT = _PLATFORM_DEFAULT + + +def set_default_transport(name): + """Set the process-wide default transport. + + Args: + name (str): One of ``"twisted"``, ``"asyncio"``, ``"cli"``. + + Raises: + ValueError: If ``name`` is not a known transport. + """ + global _DEFAULT_TRANSPORT + if name not in _VALID_TRANSPORTS: + raise ValueError("Unknown transport %r; expected one of %s" % (name, _VALID_TRANSPORTS)) + _DEFAULT_TRANSPORT = name + + +def _resolve_transport(explicit=None): + """Apply the precedence rules to land on a single transport name.""" + if explicit is not None: + if explicit not in _VALID_TRANSPORTS: + raise ValueError("Unknown transport %r; expected one of %s" % (explicit, _VALID_TRANSPORTS)) + return explicit + env_choice = os.environ.get("ROSLIBPY_TRANSPORT") + if env_choice: + if env_choice not in _VALID_TRANSPORTS: + raise ValueError("Unknown ROSLIBPY_TRANSPORT=%r; expected one of %s" % (env_choice, _VALID_TRANSPORTS)) + return env_choice + return _DEFAULT_TRANSPORT + + +def _transport_conflict_error(requested, cause): + """Build a clear error for the txaio single-framework-per-process limit. + + The ``twisted`` and ``asyncio`` transports both build on Autobahn, whose + ``txaio`` layer binds a single async framework (Twisted *or* asyncio) per + Python process. Selecting one after the other has already been activated + surfaces as a ``RuntimeError`` from deep inside ``txaio``; we translate it + into actionable guidance. + """ + other = TRANSPORT_ASYNCIO if requested == TRANSPORT_TWISTED else TRANSPORT_TWISTED + return RuntimeError( + "Cannot activate the %r transport: the %r transport is already in use " + "in this process. Both build on Autobahn, whose txaio layer binds a " + "single async framework per process, so they are mutually exclusive. " + "Select one transport per process (e.g. via the ROSLIBPY_TRANSPORT " + "environment variable or roslibpy.set_default_transport()), and use " + "separate processes if you need both. Original error: %s" % (requested, other, cause) + ) + + +def select_factory(transport=None): + """Return the factory class for the requested (or resolved) transport. + + The transport modules are imported lazily so a process that never uses the + asyncio transport never imports ``autobahn.asyncio``, and a process that + never uses the twisted transport never imports ``twisted``. This laziness + also matters because the two Autobahn-based transports cannot coexist in a + single process (see :func:`_transport_conflict_error`). + + Args: + transport (str, optional): One of ``"twisted"``, ``"asyncio"``, + ``"cli"``. If ``None``, applies the precedence rules described in + the module docstring. + + Returns: + The factory class to use for new ``Ros`` instances. + + Raises: + RuntimeError: If the requested Autobahn-based transport conflicts with + one already activated in this process. + """ + name = _resolve_transport(transport) + if name == TRANSPORT_CLI: + from .comm_cli import CliRosBridgeClientFactory + + return CliRosBridgeClientFactory + if name == TRANSPORT_ASYNCIO: + try: + from .comm_asyncio import AsyncioRosBridgeClientFactory + except RuntimeError as cause: # txaio already bound to twisted + raise _transport_conflict_error(TRANSPORT_ASYNCIO, cause) + + return AsyncioRosBridgeClientFactory + + # Fallback to default + try: + from .comm_autobahn import AutobahnRosBridgeClientFactory + except RuntimeError as cause: # txaio already bound to asyncio + raise _transport_conflict_error(TRANSPORT_TWISTED, cause) + + return AutobahnRosBridgeClientFactory + -__all__ = ["RosBridgeException", "RosBridgeProtocol", "RosBridgeClientFactory"] +def __getattr__(name): + # ``RosBridgeClientFactory`` remains a module-level binding for + # back-compatibility, but is resolved lazily via PEP 562 so that merely + # importing roslibpy does not import a transport — and, through Autobahn's + # txaio, irreversibly lock the process to a single async framework before + # the user has had a chance to choose one. + if name == "RosBridgeClientFactory": + return select_factory() + raise AttributeError("module %r has no attribute %r" % (__name__, name)) diff --git a/src/roslibpy/comm/comm_asyncio.py b/src/roslibpy/comm/comm_asyncio.py new file mode 100644 index 0000000..9e403dd --- /dev/null +++ b/src/roslibpy/comm/comm_asyncio.py @@ -0,0 +1,503 @@ +"""Asyncio-based transport for roslibpy. + +Opt-in alternative to the default Twisted-based transport. Selected via: + +* env var ``ROSLIBPY_TRANSPORT=asyncio`` +* module-level ``roslibpy.set_default_transport("asyncio")`` +* per-instance ``Ros(host, port, transport="asyncio")`` + +Why a separate transport +------------------------ + +Twisted's reactor is a process-wide singleton that cannot be restarted after +``reactor.stop()``. Long-running test sessions that create many ``Ros`` +instances accumulate state on it. asyncio loops, by contrast, are first-class +objects that can be started, stopped, and discarded independently; that +gives us clean per-process (or per-test, when needed) isolation. + +The public ``Ros`` / ``Topic`` / ``Service`` / ``ActionClient`` / ``Param`` API +is unaffected — only the transport layer changes. + +Dependencies +------------ + +This transport reuses the very same Autobahn WebSocket stack as the default +transport; the only difference is that it runs on an asyncio event loop +(``autobahn.asyncio``) rather than the Twisted reactor. No extra dependencies +are required beyond Autobahn, which roslibpy already depends on. +""" + +from __future__ import annotations + +import asyncio +import logging +import random +import threading +from typing import Any, Callable, Optional + +from autobahn.asyncio.websocket import WebSocketClientFactory, WebSocketClientProtocol +from autobahn.websocket.compress import ( + PerMessageDeflateOffer, + PerMessageDeflateResponse, + PerMessageDeflateResponseAccept, +) +from autobahn.websocket.util import create_url + +from ..core import RosTimeoutError +from ..event_emitter import EventEmitterMixin +from . import RosBridgeProtocol + +LOGGER = logging.getLogger("roslibpy") + +# Defaults matched to ReconnectingClientFactory's behaviour so users moving +# between transports see the same retry cadence. Autobahn's asyncio integration +# does not ship a reconnecting factory (unlike the Twisted side), so this +# transport reimplements the same exponential backoff. +DEFAULT_INITIAL_RECONNECT_DELAY = 1.0 +DEFAULT_MAX_RECONNECT_DELAY = 3600.0 +DEFAULT_RECONNECT_FACTOR = 2.7 +DEFAULT_RECONNECT_JITTER = 0.119 +DEFAULT_MAX_RECONNECT_RETRIES = None # None = unbounded, matching twisted + +# Single shared event loop manager, owned by the module so all factories in +# this process share one background thread + loop — same singleton semantics +# as the twisted reactor, but with proper teardown via terminate(). +_MANAGER_SINGLETON: "Optional[AsyncioEventLoopManager]" = None +_MANAGER_SINGLETON_LOCK = threading.Lock() + + +def _get_shared_manager() -> "AsyncioEventLoopManager": + global _MANAGER_SINGLETON + if _MANAGER_SINGLETON is not None: + return _MANAGER_SINGLETON + with _MANAGER_SINGLETON_LOCK: + if _MANAGER_SINGLETON is None: + _MANAGER_SINGLETON = AsyncioEventLoopManager() + return _MANAGER_SINGLETON + + +class AsyncioRosBridgeProtocol(RosBridgeProtocol, WebSocketClientProtocol): + """ROS Bridge protocol over an Autobahn asyncio WebSocket connection. + + Mirrors :class:`AutobahnRosBridgeProtocol` almost exactly — the protocol + callbacks (``onConnect`` / ``onOpen`` / ``onMessage`` / ``onClose``) are + identical. The only difference is that ``send_message`` / ``send_close`` + schedule the IO onto the background asyncio loop (via + ``call_soon_threadsafe``) instead of the Twisted reactor. + """ + + def __init__(self, *args, **kwargs): + super(AsyncioRosBridgeProtocol, self).__init__(*args, **kwargs) + self._manual_disconnect = False + + def onConnect(self, response): + LOGGER.debug("Server connected: %s", response.peer) + + def onOpen(self): + LOGGER.info("Connection to ROS ready.") + self._manual_disconnect = False + self.factory.ready(self) + + def onMessage(self, payload, isBinary): + if isBinary: + raise NotImplementedError("Add support for binary messages") + + try: + self.on_message(payload) + except Exception: + LOGGER.exception( + "Exception on start_listening while trying to handle message received." + + "It could indicate a bug in user code on message handlers. Message skipped." + ) + + def onClose(self, wasClean, code, reason): + LOGGER.info("WebSocket connection closed: Code=%s, Reason=%s", str(code), reason) + # Notify the factory so it can clear its protocol reference and, unless + # this was a manual disconnect, schedule a reconnect attempt. + if self.factory is not None: + self.factory.connection_lost(self) + + def send_message(self, payload): + """Send an already-encoded ROS bridge message frame. + + Safe to call from any thread; the actual send is scheduled onto the + background loop. + """ + loop = self.factory.manager.loop + if loop is None or loop.is_closed(): + return + loop.call_soon_threadsafe(self._send, payload) + + def _send(self, payload): + try: + # ROS bridge frames are JSON text; send them as (non-binary) text + # frames, matching the autobahn/twisted transport. + self.sendMessage(payload, isBinary=False) + except Exception: + LOGGER.exception("Failed to send ROS bridge frame; connection likely dropped.") + + def send_close(self): + """Initiate a clean WebSocket close. + + Sets the manual-disconnect flag so the factory's reconnect logic knows + to stand down, then asks the loop to close the socket. + """ + self._manual_disconnect = True + loop = self.factory.manager.loop + if loop is None or loop.is_closed(): + return + loop.call_soon_threadsafe(self.sendClose) + + +class AsyncioRosBridgeClientFactory(EventEmitterMixin, WebSocketClientFactory): + """ROS Bridge client factory built on Autobahn's asyncio integration. + + Mirrors the public surface of :class:`AutobahnRosBridgeClientFactory` so + that callers (``Ros`` and friends) don't care which transport is selected. + Because the asyncio side of Autobahn has no ``ReconnectingClientFactory`` + equivalent, this factory implements its own exponential-backoff reconnect. + """ + + protocol = AsyncioRosBridgeProtocol + + # Class-level reconnect tuning, kept as class attributes so the + # `set_initial_delay` / `set_max_delay` / `set_max_retries` classmethods + # behave like their autobahn/twisted counterparts. + initialDelay = DEFAULT_INITIAL_RECONNECT_DELAY + maxDelay = DEFAULT_MAX_RECONNECT_DELAY + factor = DEFAULT_RECONNECT_FACTOR + jitter = DEFAULT_RECONNECT_JITTER + maxRetries = DEFAULT_MAX_RECONNECT_RETRIES + compression = "deflate" + + def __init__(self, *args, **kwargs): + # Autobahn's asyncio factory binds to an event loop at construction + # time, so make sure our background loop is up and hand it over. + self._manager = _get_shared_manager() + self._manager.run() + kwargs.setdefault("loop", self._manager.loop) + + super(AsyncioRosBridgeClientFactory, self).__init__(*args, **kwargs) + + self._proto: Optional[AsyncioRosBridgeProtocol] = None + # Lock guarding `_proto` reads/writes in on_ready / ready. Closes the + # TOCTOU race between checking ``_proto`` and registering a one-shot + # "ready" listener. + self._proto_lock = threading.Lock() + # Reconnect bookkeeping, mutated only from the loop thread. + self._stop = False + self._retry_count = 0 + self._reconnect_delay = self.initialDelay + + self.setProtocolOptions(closeHandshakeTimeout=5) + self._configure_compression() + + # ------------------------------------------------------------------ + # Public surface mirroring AutobahnRosBridgeClientFactory + # ------------------------------------------------------------------ + + def connect(self): + """Schedule the initial WebSocket connection on the background loop.""" + self._stop = False + self._retry_count = 0 + self._reconnect_delay = self.initialDelay + + manager = self.manager + manager.run() # ensure background loop is up + loop = manager.loop + assert loop is not None + loop.call_soon_threadsafe(self._open_connection) + + @property + def is_connected(self): + """Indicate if the WebSocket connection is open or not. + + Returns: + bool: True if WebSocket is connected, False otherwise. + """ + proto = self._proto + return proto is not None and not proto._manual_disconnect + + def on_ready(self, callback): + """Register a callback to fire as soon as the connection is established. + + If the connection is already established, fires synchronously. Otherwise + registers a one-shot listener for the next "ready" event. + """ + proto_to_fire: Optional[AsyncioRosBridgeProtocol] = None + with self._proto_lock: + if self._proto is not None: + proto_to_fire = self._proto + else: + self.once("ready", callback) + if proto_to_fire is not None: + callback(proto_to_fire) + + def ready(self, proto): + """Mark the connection as ready and notify any pending listeners.""" + with self._proto_lock: + self._proto = proto + # Reset backoff on every successful connect. + self._retry_count = 0 + self._reconnect_delay = self.initialDelay + self.emit("ready", proto) + + def connection_lost(self, proto): + """Handle a dropped connection (called from the protocol's ``onClose``). + + Clears the protocol reference, emits ``close`` (mirroring the autobahn + factory's ``clientConnectionLost`` emit), and unless the disconnect was + manual, schedules a reconnect with exponential backoff. + """ + with self._proto_lock: + self._proto = None + try: + self.emit("close", proto) + except Exception: + LOGGER.exception("Error in user 'close' listener.") + + if proto._manual_disconnect or self._stop: + self._stop = True + return + self._schedule_reconnect() + + @classmethod + def create_url(cls, host, port=None, is_secure=False): + url = host if port is None else create_url(host, port, is_secure) + return url + + @classmethod + def set_max_delay(cls, max_delay): + """Set the maximum reconnect backoff delay in seconds (3600 by default).""" + LOGGER.debug("Updating max delay to {} seconds".format(max_delay)) + cls.maxDelay = max_delay + + @classmethod + def set_initial_delay(cls, initial_delay): + """Set the initial reconnect backoff delay in seconds (1 by default).""" + LOGGER.debug("Updating initial delay to {} seconds".format(initial_delay)) + cls.initialDelay = initial_delay + + @classmethod + def set_max_retries(cls, max_retries): + """Set the max reconnect attempts when the connection is lost (unbounded by default).""" + LOGGER.debug("Updating max retries to {}".format(max_retries)) + cls.maxRetries = max_retries + + @property + def manager(self) -> "AsyncioEventLoopManager": + """Get an instance of the event loop manager for this factory.""" + if self._manager is None: + self._manager = _get_shared_manager() + return self._manager + + # ------------------------------------------------------------------ + # Connect / reconnect machinery (runs on the loop thread) + # ------------------------------------------------------------------ + + def _configure_compression(self): + """Enable per-message deflate negotiation unless compression is disabled.""" + if not self.compression or self.compression == "none": + return + + offers = [PerMessageDeflateOffer()] + self.setProtocolOptions(perMessageCompressionOffers=offers) + + def accept(response): + if isinstance(response, PerMessageDeflateResponse): + return PerMessageDeflateResponseAccept(response) + + self.setProtocolOptions(perMessageCompressionAccept=accept) + + def _open_connection(self): + # Scheduled onto the loop thread, where there is a running loop. + asyncio.ensure_future(self._connect_async()) + + async def _connect_async(self): + loop = self.manager.loop + if loop is None or loop.is_closed(): + return + + ssl = None + if self.isSecure: + import ssl as ssl_module + + ssl = ssl_module.create_default_context() + + try: + LOGGER.debug("Connecting to %s...", self.url) + # The factory itself is the (callable) protocol factory expected by + # ``create_connection``; autobahn drives the WebSocket handshake + # from the protocol's ``connection_made``. + await loop.create_connection(self, self.host, self.port, ssl=ssl) + except asyncio.CancelledError: + raise + except Exception as exc: # noqa: BLE001 — any connect error triggers a retry + LOGGER.debug("Connection attempt failed: %s", exc) + self._schedule_reconnect() + + def _schedule_reconnect(self): + """Schedule the next reconnect attempt with jittered exponential backoff. + + Always invoked from the loop thread, so it can use ``call_later`` + directly rather than ``call_soon_threadsafe``. + """ + if self._stop: + return + + self._retry_count += 1 + if self.maxRetries is not None and self._retry_count > self.maxRetries: + LOGGER.warning("Exceeded max reconnect retries (%s); giving up.", self.maxRetries) + return + + delay = self._reconnect_delay + jittered = delay + (random.random() * 2 - 1) * delay * self.jitter + sleep_for = max(0.0, min(jittered, self.maxDelay)) + # Advance the backoff for the following attempt. + self._reconnect_delay = min(delay * self.factor, self.maxDelay) + + LOGGER.debug("Will retry connection in %.2fs (attempt %d).", sleep_for, self._retry_count) + loop = self.manager.loop + if loop is None or loop.is_closed(): + return + loop.call_later(sleep_for, self._open_connection) + + +class AsyncioEventLoopManager(object): + """Manage the asyncio event loop on a background thread. + + Mirrors :class:`TwistedEventLoopManager`'s surface so ``Ros`` doesn't need + transport-specific branches. There is only ever one of these per process + (held by ``_MANAGER_SINGLETON``), and the same loop is shared by every + ``Ros`` instance using the asyncio transport. + """ + + def __init__(self) -> None: + self.loop: Optional[asyncio.AbstractEventLoop] = None + self._thread: Optional[threading.Thread] = None + self._started = threading.Event() + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + def run(self) -> None: + """Spin up the background loop thread if it isn't running yet.""" + if self._thread is not None and self._thread.is_alive(): + return + self._started.clear() + self._thread = threading.Thread(target=self._run_thread, daemon=True, name="roslibpy-asyncio") + self._thread.start() + # Wait until the loop is actually running before returning so callers + # can safely use call_soon_threadsafe immediately after. + self._started.wait(timeout=5) + + def _run_thread(self) -> None: + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self._started.set() + try: + self.loop.run_forever() + finally: + self.loop.close() + + def run_forever(self) -> None: + """Run the loop on the calling thread (rarely used; matches twisted's run_forever).""" + if self.loop is None: + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self._started.set() + self.loop.run_forever() + + def terminate(self) -> None: + """Stop the background loop and join the thread. After this, the + manager is unusable in this process (matching twisted's one-shot + ``reactor.stop()`` semantics — though asyncio could in principle + be re-run, we keep parity). + """ + if self.loop is not None and self.loop.is_running(): + self.loop.call_soon_threadsafe(self.loop.stop) + if self._thread is not None and self._thread.is_alive(): + self._thread.join(timeout=5) + + # ------------------------------------------------------------------ + # Scheduling primitives + # ------------------------------------------------------------------ + + def call_later(self, delay: float, callback: Callable[[], None]) -> None: + """Run ``callback`` on the loop after ``delay`` seconds.""" + loop = self.loop + if loop is None or loop.is_closed(): + return + loop.call_soon_threadsafe(loop.call_later, delay, callback) + + def call_in_thread(self, callback: Callable[[], None]) -> None: + """Run ``callback`` on a worker thread (off the loop).""" + loop = self.loop + if loop is None or loop.is_closed(): + return + loop.call_soon_threadsafe(lambda: loop.run_in_executor(None, callback)) + + def blocking_call_from_thread(self, callback: Callable[[Any], Any], timeout: Optional[float]) -> Any: + """Run ``callback(result_placeholder)`` on the loop, block until result is set. + + ``callback`` is a function that accepts an ``asyncio.Future``; the + callback is expected to register handlers that eventually resolve the + future. We then block on a stdlib ``threading.Event`` set by a done- + callback, so the caller doesn't have to know anything about asyncio. + + Mirrors ``TwistedEventLoopManager.blocking_call_from_thread``; the + ``result_placeholder`` argument it passes to ``callback`` is an + :class:`asyncio.Future` rather than a twisted ``Deferred``, but + ``get_inner_callback`` / ``get_inner_errback`` keep the shape uniform + for ``Ros`` consumers. + """ + loop = self.loop + if loop is None or loop.is_closed(): + raise RuntimeError("asyncio loop is not running") + + result_box: dict = {} + done = threading.Event() + + def _on_loop() -> None: + future = loop.create_future() + + def _on_future_done(fut: asyncio.Future) -> None: + try: + result_box["result"] = fut.result() + except Exception as exc: # noqa: BLE001 + result_box["error"] = exc + finally: + done.set() + + future.add_done_callback(_on_future_done) + try: + callback(future) + except Exception as exc: # noqa: BLE001 + result_box["error"] = exc + done.set() + + loop.call_soon_threadsafe(_on_loop) + if not done.wait(timeout=timeout if timeout else None): + raise RosTimeoutError("No service response received") + if "error" in result_box: + raise result_box["error"] + return result_box["result"] + + def get_inner_callback(self, result_placeholder: asyncio.Future) -> Callable[[Any], None]: + """Return a callback that resolves ``result_placeholder`` with success.""" + + def inner_callback(result: Any) -> None: + if not result_placeholder.done(): + result_placeholder.set_result({"result": result}) + + return inner_callback + + def get_inner_errback(self, result_placeholder: asyncio.Future) -> Callable[[Any], None]: + """Return an errback that resolves ``result_placeholder`` with an error.""" + + def inner_errback(error: Any) -> None: + if not result_placeholder.done(): + result_placeholder.set_result({"exception": error}) + + return inner_errback diff --git a/src/roslibpy/ros.py b/src/roslibpy/ros.py index 78a29c8..d473586 100644 --- a/src/roslibpy/ros.py +++ b/src/roslibpy/ros.py @@ -4,7 +4,7 @@ import threading from . import Message, Param, Service, ServiceRequest, Time -from .comm import RosBridgeClientFactory +from .comm import select_factory from .core import RosTimeoutError __all__ = ["Ros", "set_rosapi_timeout"] @@ -35,10 +35,25 @@ class Ros(object): headers (:obj:`dict`): Additional headers to include in the WebSocket connection. """ - def __init__(self, host, port=None, is_secure=False, headers=None): + def __init__(self, host, port=None, is_secure=False, headers=None, transport=None): + """Create a new connection manager. + + Args: + host (:obj:`str`): Name or IP address of the ROS bridge host, e.g. ``127.0.0.1``. + port (:obj:`int`): ROS bridge port, e.g. ``9090``. + is_secure (:obj:`bool`): ``True`` to use a secure web sockets connection. + headers (:obj:`dict`): Additional headers to include in the WebSocket connection. + transport (:obj:`str`, optional): Transport backend to use. One of + ``"twisted"``, ``"asyncio"``, ``"cli"``. If ``None`` (default), + resolves via the ``ROSLIBPY_TRANSPORT`` env var, + :func:`roslibpy.set_default_transport`, or the platform default + (``cli`` on IronPython, ``twisted`` elsewhere). See + ``roslibpy.comm.__init__`` for the full precedence rules. + """ self._id_counter = 0 - url = RosBridgeClientFactory.create_url(host, port, is_secure) - self.factory = RosBridgeClientFactory(url, headers=headers) + factory_cls = select_factory(transport) + url = factory_cls.create_url(host, port, is_secure) + self.factory = factory_cls(url, headers=headers) self.is_connecting = False self.connect() diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..f96f7e5 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,21 @@ +import os +import sys + +import pytest + +# The ``twisted`` and ``asyncio`` transports both run on Autobahn, whose txaio +# layer binds a single async framework per process. They are therefore mutually +# exclusive within one interpreter, so each test process exercises exactly one +# transport, selected via ``ROSLIBPY_TRANSPORT`` (default: ``twisted``). CI runs +# the suite once per transport in separate processes. +if sys.platform == "cli": + ROS_TRANSPORTS = ("cli",) +else: + ROS_TRANSPORTS = (os.environ.get("ROSLIBPY_TRANSPORT", "twisted"),) + + +@pytest.fixture(params=ROS_TRANSPORTS) +def ros_transport(request): + if request.param == "asyncio": + pytest.importorskip("autobahn.asyncio.websocket") + return request.param diff --git a/tests/ros1/test_actionlib.py b/tests/ros1/test_actionlib.py index 54a88af..a9b8ca3 100644 --- a/tests/ros1/test_actionlib.py +++ b/tests/ros1/test_actionlib.py @@ -6,8 +6,8 @@ from roslibpy.ros1.actionlib import ActionClient, Goal, GoalStatus, SimpleActionServer -def test_action_success(): - ros = Ros("127.0.0.1", 9090) +def test_action_success(ros_transport): + ros = Ros("127.0.0.1", 9090, transport=ros_transport) ros.run() server = SimpleActionServer(ros, "/test_action", "actionlib/TestAction") @@ -31,8 +31,8 @@ def execute(goal): ros.close() -def test_action_preemt(): - ros = Ros("127.0.0.1", 9090) +def test_action_preemt(ros_transport): + ros = Ros("127.0.0.1", 9090, transport=ros_transport) ros.run() server = SimpleActionServer(ros, "/test_action", "actionlib/TestAction") diff --git a/tests/ros1/test_param.py b/tests/ros1/test_param.py index 32352eb..08a2d89 100644 --- a/tests/ros1/test_param.py +++ b/tests/ros1/test_param.py @@ -1,8 +1,8 @@ from roslibpy import Param, Ros -def test_param_manipulation(): - ros = Ros("127.0.0.1", 9090) +def test_param_manipulation(ros_transport): + ros = Ros("127.0.0.1", 9090, transport=ros_transport) ros.run() param = Param(ros, "test_param") diff --git a/tests/ros1/test_ros.py b/tests/ros1/test_ros.py index aef72c5..1721085 100644 --- a/tests/ros1/test_ros.py +++ b/tests/ros1/test_ros.py @@ -10,8 +10,8 @@ url = "ws://%s:%d" % (host, port) -def test_reconnect_does_not_trigger_on_client_close(): - ros = Ros(host, port) +def test_reconnect_does_not_trigger_on_client_close(ros_transport): + ros = Ros(host, port, transport=ros_transport) ros.run() assert ros.is_connected, "ROS initially connected" @@ -25,22 +25,22 @@ def test_reconnect_does_not_trigger_on_client_close(): assert not ros.is_connecting, "Not trying to re-connect" -def test_connection(): - ros = Ros(host, port) +def test_connection(ros_transport): + ros = Ros(host, port, transport=ros_transport) ros.run() assert ros.is_connected ros.close() -def test_url_connection(): - ros = Ros(url) +def test_url_connection(ros_transport): + ros = Ros(url, transport=ros_transport) ros.run() assert ros.is_connected ros.close() -def test_closing_event(): - ros = Ros(url) +def test_closing_event(ros_transport): + ros = Ros(url, transport=ros_transport) ros.run() ctx = dict(closing_event_called=False, was_still_connected=False) @@ -60,12 +60,12 @@ def handle_closing(): assert closing_was_handled_synchronously_before_close -def test_multithreaded_connect_disconnect(): +def test_multithreaded_connect_disconnect(ros_transport): CONNECTIONS = 30 clients = [] def connect(clients): - ros = Ros(url) + ros = Ros(url, transport=ros_transport) ros.run() clients.append(ros) diff --git a/tests/ros1/test_rosapi.py b/tests/ros1/test_rosapi.py index 2f37ac1..a11b42d 100644 --- a/tests/ros1/test_rosapi.py +++ b/tests/ros1/test_rosapi.py @@ -9,9 +9,9 @@ url = "ws://%s:%d" % (host, port) -def test_rosapi_topics(): +def test_rosapi_topics(ros_transport): context = dict(wait=threading.Event(), result=None) - ros = Ros(host, port) + ros = Ros(host, port, transport=ros_transport) ros.run() def callback(topic_list): @@ -26,8 +26,8 @@ def callback(topic_list): ros.close() -def test_rosapi_topics_blocking(): - ros = Ros(host, port) +def test_rosapi_topics_blocking(ros_transport): + ros = Ros(host, port, transport=ros_transport) ros.run() topic_list = ros.get_topics() @@ -37,11 +37,11 @@ def test_rosapi_topics_blocking(): ros.close() -def test_connection_fails_when_missing_port(): +def test_connection_fails_when_missing_port(ros_transport): with pytest.raises(Exception): - Ros(host) + Ros(host, transport=ros_transport) -def test_connection_fails_when_schema_not_ws(): +def test_connection_fails_when_schema_not_ws(ros_transport): with pytest.raises(Exception): - Ros("http://%s:%d" % (host, port)) + Ros("http://%s:%d" % (host, port), transport=ros_transport) diff --git a/tests/ros1/test_service.py b/tests/ros1/test_service.py index 64bd165..d4b47b0 100644 --- a/tests/ros1/test_service.py +++ b/tests/ros1/test_service.py @@ -5,8 +5,8 @@ from roslibpy import Ros, Service, ServiceRequest -def test_add_two_ints_service(): - ros = Ros("127.0.0.1", 9090) +def test_add_two_ints_service(ros_transport): + ros = Ros("127.0.0.1", 9090, transport=ros_transport) ros.run() def add_two_ints(request, response): @@ -29,8 +29,8 @@ def add_two_ints(request, response): ros.close() -def test_empty_service(): - ros = Ros("127.0.0.1", 9090) +def test_empty_service(ros_transport): + ros = Ros("127.0.0.1", 9090, transport=ros_transport) ros.run() service = Service(ros, "/test_empty_service", "std_srvs/Empty") diff --git a/tests/ros1/test_tf.py b/tests/ros1/test_tf.py index bbb922d..40e8f1c 100644 --- a/tests/ros1/test_tf.py +++ b/tests/ros1/test_tf.py @@ -4,9 +4,9 @@ from roslibpy.tf import TFClient -def test_tf_test(): +def test_tf_test(ros_transport): context = dict(wait=threading.Event(), counter=0) - ros = Ros("127.0.0.1", 9090) + ros = Ros("127.0.0.1", 9090, transport=ros_transport) ros.run() tf_client = TFClient(ros, fixed_frame="world") diff --git a/tests/ros1/test_topic.py b/tests/ros1/test_topic.py index 97ba430..9248d93 100644 --- a/tests/ros1/test_topic.py +++ b/tests/ros1/test_topic.py @@ -6,10 +6,10 @@ from roslibpy import Header, Message, Ros, Time, Topic -def test_topic_pubsub(): +def test_topic_pubsub(ros_transport): context = dict(wait=threading.Event(), counter=0) - ros = Ros("127.0.0.1", 9090) + ros = Ros("127.0.0.1", 9090, transport=ros_transport) ros.run() listener = Topic(ros, "/chatter", "std_msgs/String") @@ -50,10 +50,10 @@ def start_receiving(): ros.close() -def test_topic_with_header(): +def test_topic_with_header(ros_transport): context = dict(wait=threading.Event()) - ros = Ros("127.0.0.1", 9090) + ros = Ros("127.0.0.1", 9090, transport=ros_transport) ros.run() listener = Topic(ros, "/points", "geometry_msgs/PointStamped") diff --git a/tests/ros2/test_actions.py b/tests/ros2/test_actions.py index 6f02f08..115dba6 100644 --- a/tests/ros2/test_actions.py +++ b/tests/ros2/test_actions.py @@ -5,8 +5,8 @@ from roslibpy import ActionClient, Goal, GoalStatus, Ros -def test_fibonacci(): - ros = Ros("127.0.0.1", 9090) +def test_fibonacci(ros_transport): + ros = Ros("127.0.0.1", 9090, transport=ros_transport) ros.run() action = ActionClient(ros, "/fibonacci", "example_interfaces/action/Fibonacci") @@ -34,8 +34,8 @@ def on_error(error): ros.close() -def test_cancel(): - ros = Ros("127.0.0.1", 9090) +def test_cancel(ros_transport): + ros = Ros("127.0.0.1", 9090, transport=ros_transport) ros.run() action = ActionClient(ros, "/fibonacci", "example_interfaces/action/Fibonacci") diff --git a/tests/ros2/test_ros.py b/tests/ros2/test_ros.py index aef72c5..1721085 100644 --- a/tests/ros2/test_ros.py +++ b/tests/ros2/test_ros.py @@ -10,8 +10,8 @@ url = "ws://%s:%d" % (host, port) -def test_reconnect_does_not_trigger_on_client_close(): - ros = Ros(host, port) +def test_reconnect_does_not_trigger_on_client_close(ros_transport): + ros = Ros(host, port, transport=ros_transport) ros.run() assert ros.is_connected, "ROS initially connected" @@ -25,22 +25,22 @@ def test_reconnect_does_not_trigger_on_client_close(): assert not ros.is_connecting, "Not trying to re-connect" -def test_connection(): - ros = Ros(host, port) +def test_connection(ros_transport): + ros = Ros(host, port, transport=ros_transport) ros.run() assert ros.is_connected ros.close() -def test_url_connection(): - ros = Ros(url) +def test_url_connection(ros_transport): + ros = Ros(url, transport=ros_transport) ros.run() assert ros.is_connected ros.close() -def test_closing_event(): - ros = Ros(url) +def test_closing_event(ros_transport): + ros = Ros(url, transport=ros_transport) ros.run() ctx = dict(closing_event_called=False, was_still_connected=False) @@ -60,12 +60,12 @@ def handle_closing(): assert closing_was_handled_synchronously_before_close -def test_multithreaded_connect_disconnect(): +def test_multithreaded_connect_disconnect(ros_transport): CONNECTIONS = 30 clients = [] def connect(clients): - ros = Ros(url) + ros = Ros(url, transport=ros_transport) ros.run() clients.append(ros) diff --git a/tests/ros2/test_rosapi.py b/tests/ros2/test_rosapi.py index 2f37ac1..a11b42d 100644 --- a/tests/ros2/test_rosapi.py +++ b/tests/ros2/test_rosapi.py @@ -9,9 +9,9 @@ url = "ws://%s:%d" % (host, port) -def test_rosapi_topics(): +def test_rosapi_topics(ros_transport): context = dict(wait=threading.Event(), result=None) - ros = Ros(host, port) + ros = Ros(host, port, transport=ros_transport) ros.run() def callback(topic_list): @@ -26,8 +26,8 @@ def callback(topic_list): ros.close() -def test_rosapi_topics_blocking(): - ros = Ros(host, port) +def test_rosapi_topics_blocking(ros_transport): + ros = Ros(host, port, transport=ros_transport) ros.run() topic_list = ros.get_topics() @@ -37,11 +37,11 @@ def test_rosapi_topics_blocking(): ros.close() -def test_connection_fails_when_missing_port(): +def test_connection_fails_when_missing_port(ros_transport): with pytest.raises(Exception): - Ros(host) + Ros(host, transport=ros_transport) -def test_connection_fails_when_schema_not_ws(): +def test_connection_fails_when_schema_not_ws(ros_transport): with pytest.raises(Exception): - Ros("http://%s:%d" % (host, port)) + Ros("http://%s:%d" % (host, port), transport=ros_transport) diff --git a/tests/ros2/test_topic.py b/tests/ros2/test_topic.py index b899176..1cd2879 100644 --- a/tests/ros2/test_topic.py +++ b/tests/ros2/test_topic.py @@ -7,10 +7,10 @@ from roslibpy.ros2 import Header -def test_topic_pubsub(): +def test_topic_pubsub(ros_transport): context = dict(wait=threading.Event(), counter=0) - ros = Ros("127.0.0.1", 9090) + ros = Ros("127.0.0.1", 9090, transport=ros_transport) ros.run() listener = Topic(ros, "/chatter", "std_msgs/String") @@ -51,10 +51,10 @@ def start_receiving(): ros.close() -def test_topic_with_header(): +def test_topic_with_header(ros_transport): context = dict(wait=threading.Event()) - ros = Ros("127.0.0.1", 9090) + ros = Ros("127.0.0.1", 9090, transport=ros_transport) ros.run() listener = Topic(ros, "/points", "geometry_msgs/PointStamped") diff --git a/tests/test_transport.py b/tests/test_transport.py new file mode 100644 index 0000000..3e909e3 --- /dev/null +++ b/tests/test_transport.py @@ -0,0 +1,136 @@ +import os +import sys + +import pytest + +from roslibpy import Ros, set_default_transport +from roslibpy.comm import ( + TRANSPORT_ASYNCIO, + TRANSPORT_CLI, + TRANSPORT_TWISTED, + _resolve_transport, +) + +PLATFORM_DEFAULT = TRANSPORT_CLI if sys.platform == "cli" else TRANSPORT_TWISTED + +# The transport this process is dedicated to. Autobahn's txaio binds a single +# async framework per process, so importing ``comm_asyncio`` (which loads +# ``autobahn.asyncio``) must be avoided unless this is the asyncio process — +# otherwise it would poison a twisted-designated process. CI runs the suite +# once per transport. +ACTIVE_TRANSPORT = TRANSPORT_CLI if sys.platform == "cli" else os.environ.get("ROSLIBPY_TRANSPORT", TRANSPORT_TWISTED) + + +@pytest.fixture(autouse=True) +def reset_transport(monkeypatch): + monkeypatch.delenv("ROSLIBPY_TRANSPORT", raising=False) + set_default_transport(PLATFORM_DEFAULT) + yield + set_default_transport(PLATFORM_DEFAULT) + + +def test_transport_can_be_selected_from_environment(monkeypatch): + monkeypatch.setenv("ROSLIBPY_TRANSPORT", TRANSPORT_ASYNCIO) + + assert _resolve_transport() == TRANSPORT_ASYNCIO + + +def test_transport_can_be_selected_as_process_default(): + set_default_transport(TRANSPORT_ASYNCIO) + + assert _resolve_transport() == TRANSPORT_ASYNCIO + + +def test_explicit_transport_takes_precedence(monkeypatch): + monkeypatch.setenv("ROSLIBPY_TRANSPORT", TRANSPORT_TWISTED) + set_default_transport(TRANSPORT_TWISTED) + + assert _resolve_transport(TRANSPORT_ASYNCIO) == TRANSPORT_ASYNCIO + + +def test_ros_passes_explicit_transport_to_factory_selector(monkeypatch): + selected = [] + + class Factory(object): + @classmethod + def create_url(cls, host, port=None, is_secure=False): + return "ws://127.0.0.1:9090" + + def __init__(self, url, headers=None): + self.is_connected = False + + def connect(self): + pass + + def on_ready(self, callback): + pass + + def select_factory(transport): + selected.append(transport) + return Factory + + monkeypatch.setattr("roslibpy.ros.select_factory", select_factory) + + Ros("127.0.0.1", 9090, transport=TRANSPORT_ASYNCIO) + + assert selected == [TRANSPORT_ASYNCIO] + + +def _import_asyncio_protocol(): + """Import the asyncio protocol, skipping unless this process is dedicated to + the asyncio transport. + + Importing ``comm_asyncio`` loads ``autobahn.asyncio`` and locks this + process's txaio framework to asyncio, so we must not do it in a + twisted-designated process.""" + if ACTIVE_TRANSPORT != TRANSPORT_ASYNCIO: + pytest.skip("asyncio protocol tests run only when ROSLIBPY_TRANSPORT=asyncio") + try: + from roslibpy.comm.comm_asyncio import AsyncioRosBridgeProtocol + except (ImportError, RuntimeError) as exc: + pytest.skip("asyncio transport unavailable in this process: %s" % exc) + return AsyncioRosBridgeProtocol + + +def test_asyncio_protocol_sends_text_frames(): + AsyncioRosBridgeProtocol = _import_asyncio_protocol() + + # Build the protocol without going through autobahn's connection setup; we + # only exercise the ROS-bridge send path here. + protocol = AsyncioRosBridgeProtocol.__new__(AsyncioRosBridgeProtocol) + + sent = [] + protocol.sendMessage = lambda payload, isBinary=False: sent.append((payload, isBinary)) + + protocol._send(b'{"op": "call_service"}') + + assert sent == [(b'{"op": "call_service"}', False)] + + +def test_asyncio_protocol_send_message_is_thread_safe(): + AsyncioRosBridgeProtocol = _import_asyncio_protocol() + + scheduled = [] + + class FakeLoop(object): + def is_closed(self): + return False + + def call_soon_threadsafe(self, fn, *args): + scheduled.append((fn, args)) + + class FakeManager(object): + loop = FakeLoop() + + class FakeFactory(object): + manager = FakeManager() + + protocol = AsyncioRosBridgeProtocol.__new__(AsyncioRosBridgeProtocol) + protocol.factory = FakeFactory() + + protocol.send_message(b'{"op": "call_service"}') + + assert len(scheduled) == 1 + fn, args = scheduled[0] + assert fn == protocol._send + assert args == (b'{"op": "call_service"}',) diff --git a/tests/test_ws_headers.py b/tests/test_ws_headers.py index 5e3d0a1..263893e 100644 --- a/tests/test_ws_headers.py +++ b/tests/test_ws_headers.py @@ -1,17 +1,14 @@ -from __future__ import print_function - -import asyncio import threading import time -import websockets +import pytest from roslibpy import Ros -headers = { - 'cookie': 'token=rosbridge', - 'authorization': 'Some auth' -} +asyncio = pytest.importorskip("asyncio") +websockets = pytest.importorskip("websockets") + +headers = {"cookie": "token=rosbridge", "authorization": "Some auth"} async def websocket_handler(websocket, path): @@ -22,7 +19,7 @@ async def websocket_handler(websocket, path): async def start_server(stop_event): - server = await websockets.serve(websocket_handler, '127.0.0.1', 9000) + server = await websockets.serve(websocket_handler, "127.0.0.1", 9000) await stop_event.wait() server.close() await server.wait_closed() @@ -32,13 +29,13 @@ def run_server(stop_event): asyncio.run(start_server(stop_event)) -def run_client(): - client = Ros('127.0.0.1', 9000, headers=headers) +def run_client(ros_transport): + client = Ros("127.0.0.1", 9000, headers=headers, transport=ros_transport) client.run() client.close() -def test_websocket_headers(): +def test_websocket_headers(ros_transport): server_stop_event = asyncio.Event() stop_event = threading.Event() @@ -47,7 +44,7 @@ def test_websocket_headers(): time.sleep(1) # Give the server time to start - client_thread = threading.Thread(target=run_client) + client_thread = threading.Thread(target=run_client, args=(ros_transport,)) client_thread.start() # Wait for the client thread to finish or timeout after 10 seconds