Fully configure frame processors when they are used directly on an audio stream #679
1egoman wants to merge 25 commits into
3e5a9ab to f62c247
564b2c7 to 8d3f4fe
num_channels: int = 1,
frame_size_ms: int | None = None,
noise_cancellation: Optional[NoiseCancellationOptions | FrameProcessor[AudioFrame]] = None,
noise_cancellation_leave_open: bool = False,
Can we move that inside NoiseCancellationOptions?
Unfortunately, no - this is important to the FrameProcessor[AudioFrame] side of that noise_cancellation union. Open to putting it somewhere else but it needs to be settable in the FrameProcessor path.
hmm, not sure if it's a good idea, but could it be a field on the FrameProcessor interface instead?
Then we could add it to NoiseCancellationOptions and new FrameProcessors would be able to set it on the processor itself
It's not a setting that a frame processor would always want to have set or not have set, so I'm not sure that would really make sense either.
For context, the reason this is here is so the agents sdk can reuse a single FrameProcessor across multiple underlying tracks. Previously this wasn't a problem, because the agents sdk had the responsibility of closing the FrameProcessor, so it could easily do it at room disconnection time. But in order to support using FrameProcessors directly on an AudioStream, the close call needs to be pushed down deeper than the agents sdk layer. This flag allows the caller to explicitly tell AudioStream that they will manage cleaning up the FrameProcessor so that both use cases can continue to work.
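The ownership rule described above can be sketched with stand-in classes. This is only an illustration under stated assumptions, not the SDK's code: `SketchProcessor`, `SketchStream`, and the `leave_open` argument are hypothetical names that mirror the behavior of `noise_cancellation_leave_open` as described in this thread.

```python
class SketchProcessor:
    """Hypothetical stand-in for a FrameProcessor; counts close() calls."""

    def __init__(self) -> None:
        self.close_calls = 0

    def close(self) -> None:
        self.close_calls += 1


class SketchStream:
    """Hypothetical stand-in for AudioStream, reduced to the ownership rule."""

    def __init__(self, processor: SketchProcessor, leave_open: bool = False) -> None:
        self._processor = processor
        self._leave_open = leave_open

    def close(self) -> None:
        # By default the stream owns the processor and closes it.
        # With leave_open=True the caller has promised to close it later.
        if not self._leave_open:
            self._processor.close()


# Direct use: the stream cleans up its own processor.
owned = SketchProcessor()
SketchStream(owned).close()

# Agents-style use: one processor shared across several tracks.
shared = SketchProcessor()
for _ in range(3):
    SketchStream(shared, leave_open=True).close()
closes_before_session_end = shared.close_calls
shared.close()  # the caller closes it once, e.g. at room disconnection
```

In this sketch the shared processor survives every stream close and is closed exactly once by its owner, which is the case the flag exists to support.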
I think this flag is not really configuring the noise suppression behavior, but how AudioStream deals with its own noise suppression. Maybe the name noise_cancellation_leave_open is a bit confusing?
How about close_noise_cancellation_on_stream_close or manage_noise_cancellation_processor?
> It's not a setting that a frame processor would always want to have set or not have set
it could stay undefined by default? 🤷
I understand however that it feels a bit weird for it to live on the processor if the processor itself doesn't really use the field.
We also briefly discussed the option of introducing a restart method on the processor. I think this could still be a viable alternative?
> We also briefly discussed the option of introducing a restart method on the processor. I think this could still be a viable alternative?
It could, but the con there is it's a breaking api change to FrameProcessor.
Just generally, I want to understand what folks' concerns are in more detail. Is it just the noise_cancellation_ prefix naming like shijing suggested (out of the two suggestions, I like manage_noise_cancellation_processor better)? Or is there something deeper behavior-wise that is concerning?
FWIW, two fairly similar patterns I found:
Talked to @lukasIO in a 1:1 and he confirmed his concern was mostly with the naming, not with the broad approach, which is helpful.
A few other name ideas, in addition to shijing's suggestions (close_noise_cancellation_on_stream_close / manage_noise_cancellation_processor) - some of these would involve flipping the flag:
- shared_noise_cancellation
- noise_cancellation_externally_managed
- auto_close_noise_cancellation
- owns_noise_cancellation
Out of the above, I think I like auto_close_noise_cancellation the best:
# Usage within agents sdk:
AudioStream.from_track(
    # ...
    noise_cancellation=frame_processor,
    auto_close_noise_cancellation=False,
)
I'm going to update the pull request to use it for now in 8d5e656.
Another possible idea: maybe something like the below could be a different way to package the same data which could better contain it. In a world like this, noise_cancellation would be of type Union[NoiseCancellationOptions, FrameProcessorOptions, FrameProcessor]:
AudioStream.from_track(
    # ...
    noise_cancellation=FrameProcessorOptions(frame_processor=self, leave_open=True)
)
Do any of these ideas look better than the current state?
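To make the wrapper idea concrete, here is a rough sketch of how a three-way union could be normalized into "which object is attached, and does the stream close it". Everything here is an assumption for illustration: `NoiseCancellationOptionsSketch`, `FrameProcessorSketch`, and `resolve` are hypothetical, and only the `FrameProcessorOptions(frame_processor=..., leave_open=...)` shape comes from the example above. Treating plain options as having nothing for the stream to close is also an assumption.

```python
from dataclasses import dataclass
from typing import Any, Tuple, Union


@dataclass
class NoiseCancellationOptionsSketch:
    """Hypothetical stand-in for NoiseCancellationOptions."""

    module_id: str


class FrameProcessorSketch:
    """Hypothetical stand-in for a FrameProcessor."""


@dataclass
class FrameProcessorOptions:
    """Wrapper from the comment above; field names follow that example."""

    frame_processor: FrameProcessorSketch
    leave_open: bool = False


NoiseCancellation = Union[
    NoiseCancellationOptionsSketch, FrameProcessorOptions, FrameProcessorSketch
]


def resolve(noise_cancellation: NoiseCancellation) -> Tuple[Any, bool]:
    """Return (target, stream_should_close_it) for each member of the union."""
    if isinstance(noise_cancellation, FrameProcessorOptions):
        return noise_cancellation.frame_processor, not noise_cancellation.leave_open
    if isinstance(noise_cancellation, FrameProcessorSketch):
        # A bare processor keeps the default: the stream closes it.
        return noise_cancellation, True
    # Assumption: plain options carry no processor for the stream to close.
    return noise_cancellation, False
```

The appeal of this shape is that the ownership flag travels with the processor it applies to, rather than sitting next to it as a separate keyword argument.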
@theomonnom @xianshijing-lk Lukas gave a general 👍 to the rename to auto_close_noise_cancellation addressing his concern. Do either of you have further concerns beyond what this rename could accomplish with this approach?
If not / I don't hear anything in the next few days I think I am good to merge this.
…io stream And extracting metadata from that room that can be fed into the frame processor.
…oStream This makes it less complex.
The agents sdk can pass this opt-out flag so that it can reuse the frame processor across many audio tracks
Need to think about this a bit more, this pattern as written won't work, since the FrameProcessor today can't have a set of no-op credentials pushed.
…Processor methods, and use them when moving a track out of a room
These tests exercise all the frame processor track reparenting under room / etc paths.
8d5e656 to 8e3a461
…ll reconnect When the room does a full reconnect, make sure the audiostream metadata gets a new push with the updated track sid
if republished.track is not None:
    # Keep the local-track invariant (track.sid == publication.sid,
    # set at publish_track) intact across republish, then re-push
    # metadata so any attached FrameProcessor learns the new
    # publication SID / credentials. _set_room with the same room
    # is a no-op for the token_refreshed listener but re-fans the
    # metadata to every registered AudioStream.
    republished.track._info.sid = republished.sid
    republished.track._set_room(self)
Note to any reviewers: the Devin bot surfaced a case where, when a full reconnect happens, tracks would not have their metadata kept up to date. I'm not sure if republished.track._info.sid = republished.sid is a good pattern, but I wanted to give folks an opportunity to weigh in on these final late-breaking changes before merging.
def _set_room(self, room: Optional[Room]) -> None:
    old_room = self._resolve_room()
    if old_room is not room:
        if old_room is not None:
            old_room.off("token_refreshed", self._on_room_token_refreshed)
        if room is not None:
            room.on("token_refreshed", self._on_room_token_refreshed)

    self._room_ref = weakref.ref(room) if room is not None else None

    for stream in self._audio_streams:
        self._push_processor_metadata_to_stream(stream, room)
🟡 _set_room(None) is not idempotent: double-fires _on_stream_info_cleared/_on_credentials_cleared in unpublish race
The comment in livekit-rtc/livekit/rtc/participant.py:860 claims _set_room(None) is idempotent, but it isn't. When the local_track_unpublished room event wins the race against unpublish_track, both paths call _set_room(None) on the same track. On the second call, old_room resolves to None (already cleared by the first call), so the if old_room is not room check at track.py:43 is False (None is not None → False) and listener management is correctly skipped. However, the metadata push loop at track.py:51-52 still runs unconditionally, calling _push_processor_metadata_to_stream(stream, None) which re-invokes _on_stream_info_cleared() and _on_credentials_cleared() on every registered stream's processor (track.py:69-70). A concrete FrameProcessor that performs side-effectful work in these callbacks (closing a websocket, decrementing a reference count, etc.) will see the effect applied twice.
Race flow when room event wins
- Room handler calls unpublished.track._set_room(None) → first clear, detaches listener, fires cleared callbacks
- Room handler does NOT null publication._track
- unpublish_track resumes, finds publication._track is not None (line 862), calls _set_room(None) again → second clear, fires cleared callbacks again
def _set_room(self, room: Optional[Room]) -> None:
    old_room = self._resolve_room()
    if old_room is not room:
        if old_room is not None:
            old_room.off("token_refreshed", self._on_room_token_refreshed)
        if room is not None:
            room.on("token_refreshed", self._on_room_token_refreshed)
    self._room_ref = weakref.ref(room) if room is not None else None
    for stream in self._audio_streams:
        self._push_processor_metadata_to_stream(stream, room)

def _set_room(self, room: Optional[Room]) -> None:
    old_room = self._resolve_room()
    if old_room is None and room is None:
        return  # already cleared, nothing to do
    if old_room is not room:
        if old_room is not None:
            old_room.off("token_refreshed", self._on_room_token_refreshed)
        if room is not None:
            room.on("token_refreshed", self._on_room_token_refreshed)
    self._room_ref = weakref.ref(room) if room is not None else None
    for stream in self._audio_streams:
        self._push_processor_metadata_to_stream(stream, room)
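The effect of the early-return guard can be checked in isolation with stand-ins. This is a sketch under assumptions, not the SDK's classes: `RoomSketch` and `TrackSketch` are hypothetical, listener management is left out, and the per-stream metadata push is replaced by appending to a list so the number of pushes is observable.

```python
import weakref
from typing import List, Optional


class RoomSketch:
    """Hypothetical stand-in for Room."""


class TrackSketch:
    """Hypothetical stand-in for a track; records pushes instead of calling processors."""

    def __init__(self) -> None:
        self._room_ref: Optional[weakref.ref] = None
        self.pushes: List[Optional[RoomSketch]] = []

    def _resolve_room(self) -> Optional[RoomSketch]:
        return self._room_ref() if self._room_ref is not None else None

    def _set_room(self, room: Optional[RoomSketch]) -> None:
        old_room = self._resolve_room()
        if old_room is None and room is None:
            return  # already cleared, nothing to do
        self._room_ref = weakref.ref(room) if room is not None else None
        # Stands in for the per-stream metadata push in the real method.
        self.pushes.append(room)


room = RoomSketch()
track = TrackSketch()
track._set_room(room)  # publish
track._set_room(None)  # room event clears the track
track._set_room(None)  # unpublish_track clears it again: no second push
```

One consequence of the guard worth noting: a track that was never attached to a room also skips the push when `_set_room(None)` is called on it.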
if unpublished is not None:
    del self.local_participant._track_publications[sid]
    if unpublished.track is not None:
        unpublished.track._set_room(None)
    self.emit("local_track_unpublished", unpublished)
🚩 local_track_unpublished room handler does not null publication._track
In the local_track_unpublished handler at room.py:760-764, the publication is removed from the dict and _set_room(None) is called on the track, but unpublished._track is never set to None. This contrasts with track_unsubscribed at room.py:830 which does set rpublication._track = None. The unpublish_track method at participant.py:864 eventually nulls it, but only if it runs. If only the room event fires (e.g., during server-side removal without an explicit unpublish_track call), the publication object retains a stale reference to the track. This may be intentional (preserving track for the emitted event's consumers) but creates an asymmetry between local and remote unpublish handlers.
Updates the python sdk so that FrameProcessor-based noise cancellation providers can be used directly on AudioStream, without having to go through the agent's RoomIO to be able to initialize itself with credentials.
For example, with this change, something like the below becomes possible:
The way this works - Tracks now keep track of which room they are part of (holding a weakref value). When the room a track is in changes, it computes new frame processor options and sends these to any AudioStreams which are associated with the track.
The noise_cancellation_leave_open parameter allows the agents sdk to call this from_track method with a frame processor which remains open across the whole session, and won't be auto-closed when the track is closed.
This goes along with livekit/agents#5867, which removes the relevant event handling logic in the agents sdk. I will follow up with a node version of this once the python one is in a good state.
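The mechanism in the description (a track holding its room weakly and fanning new options out to its streams when the room changes) might look roughly like the following. All names here are hypothetical stand-ins chosen for the sketch, and the shape of the options dictionary is an assumption rather than what the SDK actually sends.

```python
import gc
import weakref
from typing import Dict, List, Optional


class RoomSketch:
    """Hypothetical stand-in for Room, holding only what the sketch needs."""

    def __init__(self, name: str, token: str) -> None:
        self.name = name
        self.token = token


class StreamSketch:
    """Hypothetical stand-in for AudioStream; stores the last options pushed to it."""

    def __init__(self) -> None:
        self.options: Optional[Dict[str, str]] = None


class TrackSketch:
    """Hypothetical stand-in for a track that remembers its room weakly."""

    def __init__(self) -> None:
        self._room_ref: Optional[weakref.ref] = None
        self._streams: List[StreamSketch] = []

    def add_stream(self, stream: StreamSketch) -> None:
        self._streams.append(stream)

    def room(self) -> Optional[RoomSketch]:
        return self._room_ref() if self._room_ref is not None else None

    def set_room(self, room: Optional[RoomSketch]) -> None:
        self._room_ref = weakref.ref(room) if room is not None else None
        # Recompute the options and send them to every associated stream.
        options = {"room": room.name, "token": room.token} if room is not None else None
        for stream in self._streams:
            stream.options = options


track = TrackSketch()
first, second = StreamSketch(), StreamSketch()
track.add_stream(first)
track.add_stream(second)

room = RoomSketch("demo", "token-1")
track.set_room(room)
options_after_join = (first.options, second.options)

del room
gc.collect()
room_after_del = track.room()  # the weak reference did not keep the room alive
```

Holding the room through a weak reference means a track that outlives its room does not prevent the room from being garbage collected.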
Todo