Skip to content

Commit 03193d7

Browse files
authored
fix: make V1Channel re-subscribable after a failed subscribe (#845)
* fix: make V1Channel re-subscribable after a failed subscribe

* fix: narrow subscribe/connect cleanup to Exception; use 16-byte test nonce

  Address review feedback on #845:
  - Narrow the cleanup guards in RoborockDevice.connect() and V1Channel.subscribe() from BaseException to Exception so KeyboardInterrupt/SystemExit/CancelledError propagate untouched while still releasing the channel on normal failures (incl. the ValueError and RoborockException retry paths the fix targets).
  - Use an actually-16-byte test nonce to match its name and avoid relying on absent length validation.

* test: verify v1 subscribe/connect behavior via public api, not internals

  Rewrite the V1Channel/RoborockDevice regression tests to assert observable behavior (the callback fires on message arrival; re-subscribe and re-connect succeed) instead of private attributes (_callback, _reconnect_task, _mqtt_unsub, _unsub), per review feedback.

  Add type: ignore for the test-only mock assignments and FakeChannel construction that were failing the mypy pre-commit hook.

* refactor: distinguish expected vs unexpected exceptions in subscribe/connect cleanup

  Split the leak-safety catch-all in V1Channel.subscribe() and RoborockDevice.connect() into an explicit `except RoborockException` (the expected failure path) and an `except Exception` that logs the unexpected, shouldn't-happen error before tearing down. Both branches still run teardown/unsub and re-raise, so behavior is unchanged; the broad catch is now a deliberate, signposted exception to the usual don't-catch-Exception rule, and unexpected errors become observable in logs.
1 parent b311a31 commit 03193d7

4 files changed

Lines changed: 246 additions & 41 deletions

File tree

roborock/devices/device.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,16 @@ async def connect(self) -> None:
203203
elif self.b01_q10_properties is not None:
204204
await self.b01_q10_properties.start()
205205
except RoborockException:
206+
# Expected: start() can fail transiently. Unsubscribe before propagating
207+
# so the retry by connect_loop() gets a clean channel.
208+
unsub()
209+
raise
210+
except Exception:
211+
# Not expected here. We normally avoid a broad ``except Exception`` in
212+
# this codebase, but a leaked subscription would stop connect_loop() from
213+
# ever reconnecting, so we deliberately catch broadly, log the unexpected
214+
# error, and release the channel before propagating.
215+
self._logger.exception("Unexpected error during connect; releasing channel to avoid a leak")
206216
unsub()
207217
raise
208218
self._logger.info("Connected to device")

roborock/devices/rpc/v1_channel.py

Lines changed: 67 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -295,45 +295,75 @@ async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callab
295295
if self._callback is not None:
296296
raise ValueError("Only one subscription allowed at a time")
297297

298-
# Make an initial, optimistic attempt to connect to local with the
299-
# cache. The cache information will be refreshed by the background task.
300-
try:
301-
await self._local_connect(prefer_cache=True)
302-
except RoborockException as err:
303-
self._logger.debug("First local connection attempt failed, will retry: %s", err)
304-
305-
# Start a background task to manage the local connection health. This
306-
# happens independent of whether we were able to connect locally now.
307-
if self._reconnect_task is None:
308-
loop = asyncio.get_running_loop()
309-
self._reconnect_task = loop.create_task(self._background_reconnect())
310-
311-
# We maintain an active MQTT subscription even when connected locally to receive
312-
# unsolicited status updates (DPS push messages) directly from the cloud.
298+
# Claim the subscription up front. Any failure in the setup below routes
299+
# through _teardown(), which clears this again so the channel is left in
300+
# a clean, re-subscribable state. Without this, a partially-completed
301+
# subscribe (e.g. a transient failure later in connect()) would leave a
302+
# stale callback and the next subscribe() would raise the guard above.
303+
self._callback = callback
313304
try:
314-
self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message)
315-
except RoborockException as err:
316-
if not self.is_local_connected:
317-
# Propagate error if both local and MQTT failed
318-
self._logger.debug("MQTT connection also failed: %s", err)
319-
raise
320-
self._logger.debug("MQTT subscription failed, continuing with local-only connection: %s", err)
321-
322-
def unsub() -> None:
323-
"""Unsubscribe from all messages."""
324-
if self._reconnect_task:
325-
self._reconnect_task.cancel()
326-
self._reconnect_task = None
327-
if self._mqtt_unsub:
328-
self._mqtt_unsub()
329-
self._mqtt_unsub = None
330-
if self._local_unsub:
331-
self._local_unsub()
332-
self._local_unsub = None
333-
self._logger.debug("Unsubscribed from device")
305+
# Make an initial, optimistic attempt to connect to local with the
306+
# cache. The cache information will be refreshed by the background task.
307+
try:
308+
await self._local_connect(prefer_cache=True)
309+
except RoborockException as err:
310+
self._logger.debug("First local connection attempt failed, will retry: %s", err)
334311

335-
self._callback = callback
336-
return unsub
312+
# Start a background task to manage the local connection health. This
313+
# happens independent of whether we were able to connect locally now.
314+
if self._reconnect_task is None:
315+
loop = asyncio.get_running_loop()
316+
self._reconnect_task = loop.create_task(self._background_reconnect())
317+
318+
# We maintain an active MQTT subscription even when connected locally to receive
319+
# unsolicited status updates (DPS push messages) directly from the cloud.
320+
try:
321+
self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message)
322+
except RoborockException as err:
323+
if not self.is_local_connected:
324+
# Propagate error if both local and MQTT failed
325+
self._logger.debug("MQTT connection also failed: %s", err)
326+
raise
327+
self._logger.debug("MQTT subscription failed, continuing with local-only connection: %s", err)
328+
except RoborockException:
329+
# Expected failure path (e.g. both local and MQTT transports down).
330+
# Release the channel so the next subscribe() starts clean.
331+
self._teardown()
332+
raise
333+
except Exception:
334+
# Not expected here. We normally avoid a broad ``except Exception`` in
335+
# this codebase, but leaving a partial subscription behind (reconnect
336+
# task, MQTT subscription, stale callback) would brick the device, so we
337+
# deliberately catch broadly, log the unexpected error, and tear down
338+
# before propagating.
339+
self._logger.exception("Unexpected error during subscribe; tearing down to avoid a leak")
340+
self._teardown()
341+
raise
342+
343+
self._logger.debug("Subscribed to device")
344+
return self._teardown
345+
346+
def _teardown(self) -> None:
347+
"""Tear down all subscriptions and reset the channel to a re-subscribable state.
348+
349+
Returned from subscribe() as the unsubscribe function and also invoked on
350+
any failure partway through subscribe(). Idempotent: each resource is
351+
guarded so repeat calls are no-ops.
352+
"""
353+
if self._reconnect_task:
354+
self._reconnect_task.cancel()
355+
self._reconnect_task = None
356+
if self._mqtt_unsub:
357+
self._mqtt_unsub()
358+
self._mqtt_unsub = None
359+
if self._local_unsub:
360+
self._local_unsub()
361+
self._local_unsub = None
362+
if self._local_channel:
363+
self._local_channel.close()
364+
self._local_channel = None
365+
self._callback = None
366+
self._logger.debug("Unsubscribed from device")
337367

338368
def add_dps_listener(self, listener: Callable[[dict[RoborockDataProtocol, Any]], None]) -> Callable[[], None]:
339369
"""Add a listener for DPS updates.

tests/devices/rpc/test_v1_channel.py

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
USER_DATA = UserData.from_dict(mock_data.USER_DATA)
3232
TEST_DEVICE_UID = "abc123"
3333
TEST_LOCAL_KEY = "local_key"
34-
TEST_SECURITY_DATA = SecurityData(endpoint="test_endpoint", nonce=b"test_nonce_16byte")
34+
TEST_SECURITY_DATA = SecurityData(endpoint="test_endpoint", nonce=b"test_nonce_16byt")
3535
TEST_HOST = mock_data.TEST_LOCAL_API_HOST
3636

3737

@@ -642,3 +642,69 @@ async def test_v1_channel_dps_listener_raises_exception(
642642

643643
unsub_dps1()
644644
unsub_dps2()
645+
646+
647+
async def test_v1_channel_resubscribe_after_unsub(
648+
v1_channel: V1Channel,
649+
mock_mqtt_channel: FakeChannel,
650+
) -> None:
651+
"""A subscribe -> unsub -> subscribe cycle must not raise, and the new callback works.
652+
653+
Regression: unsub() previously failed to reset the subscription, so the second
654+
subscribe() tripped the "Only one subscription allowed at a time" guard.
655+
This is the exact failure that bricked a second vacuum sharing an account.
656+
"""
657+
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
658+
callback = Mock()
659+
unsub = await v1_channel.subscribe(callback)
660+
661+
# The subscribed callback receives messages arriving on the channel.
662+
mock_mqtt_channel.notify_subscribers(TEST_RESPONSE)
663+
callback.assert_called_once_with(TEST_RESPONSE)
664+
665+
# After unsub, the old callback no longer receives messages.
666+
unsub()
667+
callback.reset_mock()
668+
mock_mqtt_channel.notify_subscribers(TEST_RESPONSE)
669+
callback.assert_not_called()
670+
671+
# Re-subscribing must succeed (network info is now cached, no MQTT needed) and
672+
# the new callback then receives messages.
673+
new_callback = Mock()
674+
unsub2 = await v1_channel.subscribe(new_callback)
675+
mock_mqtt_channel.notify_subscribers(TEST_RESPONSE)
676+
new_callback.assert_called_once_with(TEST_RESPONSE)
677+
unsub2()
678+
679+
680+
async def test_v1_channel_subscribe_failure_is_atomic(
681+
v1_channel: V1Channel,
682+
mock_mqtt_channel: FakeChannel,
683+
mock_local_channel: FakeChannel,
684+
) -> None:
685+
"""A failure partway through subscribe() leaves the channel re-subscribable.
686+
687+
Regression: a failed subscribe() previously leaked the background reconnect
688+
task and a partial subscription, so the next attempt could neither reuse nor
689+
cleanly recreate the channel.
690+
"""
691+
# Both transports down: local connect fails and the MQTT subscribe fails.
692+
mock_local_channel.connect.side_effect = RoborockException("local down")
693+
mock_mqtt_channel.subscribe.side_effect = RoborockException("mqtt down")
694+
695+
with pytest.raises(RoborockException):
696+
await v1_channel.subscribe(Mock())
697+
698+
# The failed subscribe left no dangling subscription on the channel.
699+
assert not mock_mqtt_channel.subscribers
700+
701+
# And the channel is re-subscribable once the transports recover: the new
702+
# subscription succeeds and its callback receives messages.
703+
mock_local_channel.connect.side_effect = None
704+
mock_mqtt_channel.subscribe.side_effect = mock_mqtt_channel._subscribe
705+
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
706+
callback = Mock()
707+
unsub = await v1_channel.subscribe(callback)
708+
mock_mqtt_channel.notify_subscribers(TEST_RESPONSE)
709+
callback.assert_called_once_with(TEST_RESPONSE)
710+
unsub()

tests/devices/test_v1_device.py

Lines changed: 102 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,18 @@
88
import pytest
99
from syrupy import SnapshotAssertion
1010

11-
from roborock.data import HomeData, S7MaxVStatus, UserData
12-
from roborock.devices.cache import DeviceCache, NoCache
11+
from roborock.data import HomeData, NetworkInfo, S7MaxVStatus, UserData
12+
from roborock.devices.cache import DeviceCache, DeviceCacheData, InMemoryCache, NoCache
1313
from roborock.devices.device import RoborockDevice
14+
from roborock.devices.rpc.v1_channel import V1Channel
1415
from roborock.devices.traits import v1
1516
from roborock.devices.traits.v1.common import V1TraitMixin
16-
from roborock.protocols.v1_protocol import decode_rpc_response
17+
from roborock.devices.transport.local_channel import LocalSession
18+
from roborock.exceptions import RoborockException
19+
from roborock.protocols.v1_protocol import SecurityData, decode_rpc_response
1720
from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol
1821
from tests import mock_data
22+
from tests.fixtures.channel_fixtures import FakeChannel
1923

2024
USER_DATA = UserData.from_dict(mock_data.USER_DATA)
2125
HOME_DATA = HomeData.from_dict(mock_data.HOME_DATA_RAW)
@@ -181,3 +185,98 @@ async def test_device_trait_command_parsing(
181185
assert device.v1_properties
182186
device_dict = device.diagnostic_data()
183187
assert device_dict == snapshot
188+
189+
190+
@pytest.mark.parametrize(
191+
"start_error",
192+
[RoborockException("transient status fetch failed"), ValueError("unexpected")],
193+
ids=["roborock-exception", "non-roborock-exception"],
194+
)
195+
async def test_connect_unsubscribes_when_start_fails(
196+
device: RoborockDevice,
197+
channel: AsyncMock,
198+
start_error: Exception,
199+
) -> None:
200+
"""connect() must release the channel when start() fails, for any exception.
201+
202+
Regression: the cleanup was scoped to ``except RoborockException``, so a
203+
non-Roborock failure in start() propagated without unsubscribing, leaving the
204+
channel subscribed and the next attempt unable to re-subscribe.
205+
"""
206+
unsub = Mock()
207+
channel.subscribe = AsyncMock(return_value=unsub)
208+
device.v1_properties.start = AsyncMock(side_effect=start_error) # type: ignore[method-assign, union-attr]
209+
210+
with pytest.raises(type(start_error)):
211+
await device.connect()
212+
213+
# The channel was released before the error propagated.
214+
channel.subscribe.assert_awaited_once()
215+
unsub.assert_called_once()
216+
217+
# The device is left re-connectable: once start() recovers, connect()
218+
# succeeds instead of raising "Already connected to the device".
219+
device.v1_properties.start = AsyncMock(return_value=None) # type: ignore[method-assign, union-attr]
220+
await device.connect()
221+
assert channel.subscribe.await_count == 2
222+
223+
224+
async def test_connect_retries_after_transient_start_failure() -> None:
225+
"""End-to-end regression for the Q5 multi-vacuum bug.
226+
227+
A device backed by a real V1Channel: the first connect() subscribes, then
228+
start() fails transiently. The retry must re-subscribe cleanly rather than
229+
raising "Only one subscription allowed at a time", and the device must end
230+
up connected.
231+
"""
232+
duid = HOME_DATA.devices[0].duid
233+
234+
mqtt_channel = FakeChannel()
235+
await mqtt_channel.connect()
236+
local_channel = FakeChannel()
237+
local_session = Mock(spec=LocalSession, return_value=local_channel)
238+
239+
# Cache the network info so local connect doesn't need an MQTT round-trip.
240+
cache = InMemoryCache()
241+
device_cache = DeviceCache(duid, cache)
242+
await device_cache.set(DeviceCacheData(network_info=NetworkInfo.from_dict(mock_data.NETWORK_INFO)))
243+
244+
v1_channel = V1Channel(
245+
device_uid=duid,
246+
security_data=SecurityData(endpoint="test_endpoint", nonce=b"test_nonce_16byt"),
247+
mqtt_channel=mqtt_channel, # type: ignore[arg-type]
248+
local_session=local_session,
249+
device_cache=device_cache,
250+
)
251+
device = RoborockDevice(
252+
device_info=HOME_DATA.devices[0],
253+
product=HOME_DATA.products[0],
254+
channel=v1_channel,
255+
trait=v1.create(
256+
duid,
257+
HOME_DATA.products[0],
258+
HOME_DATA,
259+
AsyncMock(),
260+
AsyncMock(),
261+
AsyncMock(),
262+
Mock(),
263+
AsyncMock(),
264+
device_cache=device_cache,
265+
region=USER_DATA.region,
266+
),
267+
)
268+
269+
# First connect() subscribes successfully, then start() fails transiently;
270+
# the second succeeds.
271+
device.v1_properties.start = AsyncMock(side_effect=[RoborockException("transient"), None]) # type: ignore[method-assign, union-attr]
272+
273+
with pytest.raises(RoborockException):
274+
await device.connect()
275+
276+
# The retry must NOT raise "Only one subscription allowed at a time"; the
277+
# clean release after the transient failure lets connect() re-subscribe and
278+
# the device ends up connected.
279+
await device.connect()
280+
assert device.is_connected
281+
282+
await device.close()

0 commit comments

Comments
 (0)