Skip to content

fix(stream): remove completed/errored streams from registry after buffer TTL#438

Open
MarioCadenas wants to merge 3 commits into
mainfrom
fix/stream-registry-cleanup
Open

fix(stream): remove completed/errored streams from registry after buffer TTL#438
MarioCadenas wants to merge 3 commits into
mainfrom
fix/stream-registry-cleanup

Conversation

@MarioCadenas

Copy link
Copy Markdown
Collaborator

Problem (memory leak, High)

Every SSE stream that finished while its original client was still connected — the normal case — stayed in the StreamRegistry forever, retaining its EventRingBuffer (up to 100 events x 1MB each), generator, abort controller, and trace context. Since every analytics query is one SSE stream whose final event is the full result set, a busy server accumulated up to maxActiveStreams (1,000) full query results in heap.

Leak mechanics

  • Success path: _processGeneratorInBackground set isCompleted = true, called _closeAllClients() (which only calls res.end() — the close event fires asynchronously), then immediately called _cleanupStream. At that point clients.size > 0, so no removal was ever scheduled.
  • Close handler: the handler installed in _createNewStream deleted the client from the set but had no cleanup-scheduling branch for completed streams — only _attachToExistingStream's close handler (the reconnect path) had one. So the leak hit every stream whose client never reconnected.
  • Error path: never scheduled cleanup at all.
  • streamDefaults.cleanupInterval and maxPersistentBuffers were defined but referenced nowhere — there was no periodic sweeper to catch any of this.

Fix

  • Factored the TTL-based removal scheduling into a single helper, _scheduleRemovalAfterTTL, and call it from every termination path: both close handlers (_createNewStream and _attachToExistingStream), the success path, the client-abort path, and the error path. The helper no-ops unless the stream is finished (isCompleted) and has no connected clients, so whichever of completion / last-disconnect happens second does the scheduling.
  • Reconnect replay is preserved: the event buffer stays in the registry for bufferTTL (default 10 min) after the last client disconnects. A reconnect during the TTL window refreshes lastAccess, which makes any pending timer a no-op; a fresh TTL starts when that client disconnects again.
  • Cleanup timers are unref()'d so they never keep the process alive.
  • Decision on the dead config keys: removed cleanupInterval and maxPersistentBuffers from streamDefaults rather than wiring a periodic sweeper. The per-stream timers now cover every termination path (and abortAll() clears the registry on shutdown), and a sweeper would require adding iteration APIs to stream-registry.ts, which a sibling PR is modifying (capacity-eviction fix) — kept out of this change to avoid merge conflicts. The optional fields on the public StreamConfig type in shared are left as-is (they were already no-ops); they can be dropped in a follow-up.

Tests

New registry cleanup after stream termination suite (fake timers):

  • completed stream + client disconnect → still present at TTL - 1ms, removed at TTL
  • errored stream + client disconnect → likewise
  • errored stream whose client disconnected mid-stream → removed after TTL
  • reconnect within the TTL window still replays buffered events, the stale timer from the first disconnect no-ops, and removal happens a full TTL after the second disconnect
  • a completed stream with a still-connected client is never removed

Verified: pnpm install, pnpm build, pnpm -r typecheck, pnpm check:fix all pass; vitest run packages/appkit/src/stream/tests/ → 91/91 passing (33 in stream.test.ts, including 5 new).

This pull request and its description were written by Isaac.

…fer TTL

Every SSE stream that finished (completed or errored) while its original
client was still connected stayed in the StreamRegistry forever: the
success path called _cleanupStream synchronously before the client's
close event fired (so clients.size was still > 0 and nothing was
scheduled), the close handler installed by _createNewStream had no
cleanup-scheduling branch, and the error path never scheduled cleanup at
all. Since every analytics query is one SSE stream whose final event is
the full result, a busy server retained up to maxActiveStreams (1000)
event buffers, generators, abort controllers, and trace contexts in heap.

Factor the TTL-based removal scheduling into _scheduleRemovalAfterTTL and
call it from every termination path: both close handlers, the success
path, the abort path, and the error path. The event buffer remains
available for reconnect replay for bufferTTL after the last client
disconnects; a reconnect during the TTL window refreshes lastAccess so
the pending timer no-ops and a fresh TTL starts when that client
disconnects. Cleanup timers are unref()'d so they don't keep the process
alive.

Also drop the never-referenced cleanupInterval and maxPersistentBuffers
keys from streamDefaults; the per-stream timers now cover every
termination path, so no periodic sweeper is wired.

Co-authored-by: Isaac
Signed-off-by: MarioCadenas <MarioCadenas@users.noreply.github.com>
@MarioCadenas MarioCadenas requested a review from a team as a code owner June 11, 2026 16:27
…cting completed streams

Follow-up to the stream registry leak fix, addressing review findings:

- _closeAllClients now removes the clients it deliberately ends from the
  stream entry, so registry removal is scheduled on every terminal path
  (complete/error/abort) instead of hinging on the transport emitting
  'close'. The close handlers' later deletes remain safe no-ops.
- At most one removal timer per stream: the timer handle is stored on the
  StreamEntry, rescheduling clears the prior timer, and a reconnect-attach
  cancels any pending removal. The fire-time lastAccess re-check stays as
  a safety net.
- Registry eviction now prefers the oldest completed stream (waiting out
  its buffer TTL) and only falls back to LRU when every stream is live, so
  a full registry no longer evicts live streams while dead ones survive.
  StreamRegistry is backed by a Map instead of RingBuffer: ring slot
  overwrites clobbered an unrelated live entry whenever the evicted stream
  wasn't in the oldest insertion slot.
- Remove unused StreamConfig keys cleanupInterval and maxPersistentBuffers
  (never read; their only definition site was already removed).
- Tests: mock response end() now marks the response ended and fires close
  handlers asynchronously like Node; replaced the test that encoded the
  leaky behavior with active-stream-retained and completed-stream-removed-
  without-close coverage; added end()-fires-close cleanup, eviction
  preference, and replay boundary (newest id, evicted id) tests.
- knip: set ignoreExportsUsedInFile so RingBuffer (still used inside
  buffers.ts by EventRingBuffer) isn't flagged once the registry stops
  importing it.

Co-authored-by: Isaac
Signed-off-by: MarioCadenas <MarioCadenas@users.noreply.github.com>
…on edge cases

- revert repo-wide ignoreExportsUsedInFile in knip.json; replace with a
  file-scoped ignore entry for packages/appkit/src/stream/buffers.ts only
- StreamRegistry.add(): only evict when inserting a genuinely new key, so
  re-adding an existing streamId at capacity no longer destroys an
  unrelated live stream for a net-zero insert
- eviction now aborts with DOMException("Stream evicted", "AbortError")
  to match the manager's terminal abort paths so the error categorizer
  routes eviction as an abort rather than a stream failure
- update stale RingBuffer comments in stream-registry tests to Map
  semantics and adjust eviction/abort-reason test expectations

Co-authored-by: Isaac
Signed-off-by: MarioCadenas <MarioCadenas@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant