fix(stream): remove completed/errored streams from registry after buffer TTL#438
Open
MarioCadenas wants to merge 3 commits into
Open
fix(stream): remove completed/errored streams from registry after buffer TTL#438MarioCadenas wants to merge 3 commits into
MarioCadenas wants to merge 3 commits into
Conversation
…fer TTL Every SSE stream that finished (completed or errored) while its original client was still connected stayed in the StreamRegistry forever: the success path called _cleanupStream synchronously before the client's close event fired (so clients.size was still > 0 and nothing was scheduled), the close handler installed by _createNewStream had no cleanup-scheduling branch, and the error path never scheduled cleanup at all. Since every analytics query is one SSE stream whose final event is the full result, a busy server retained up to maxActiveStreams (1000) event buffers, generators, abort controllers, and trace contexts in heap. Factor the TTL-based removal scheduling into _scheduleRemovalAfterTTL and call it from every termination path: both close handlers, the success path, the abort path, and the error path. The event buffer remains available for reconnect replay for bufferTTL after the last client disconnects; a reconnect during the TTL window refreshes lastAccess so the pending timer no-ops and a fresh TTL starts when that client disconnects. Cleanup timers are unref()'d so they don't keep the process alive. Also drop the never-referenced cleanupInterval and maxPersistentBuffers keys from streamDefaults; the per-stream timers now cover every termination path, so no periodic sweeper is wired. Co-authored-by: Isaac Signed-off-by: MarioCadenas <MarioCadenas@users.noreply.github.com>
…cting completed streams Follow-up to the stream registry leak fix, addressing review findings: - _closeAllClients now removes the clients it deliberately ends from the stream entry, so registry removal is scheduled on every terminal path (complete/error/abort) instead of hinging on the transport emitting 'close'. The close handlers' later deletes remain safe no-ops. - At most one removal timer per stream: the timer handle is stored on the StreamEntry, rescheduling clears the prior timer, and a reconnect-attach cancels any pending removal. The fire-time lastAccess re-check stays as a safety net. - Registry eviction now prefers the oldest completed stream (waiting out its buffer TTL) and only falls back to LRU when every stream is live, so a full registry no longer evicts live streams while dead ones survive. StreamRegistry is backed by a Map instead of RingBuffer: ring slot overwrites clobbered an unrelated live entry whenever the evicted stream wasn't in the oldest insertion slot. - Remove unused StreamConfig keys cleanupInterval and maxPersistentBuffers (never read; their only definition site was already removed). - Tests: mock response end() now marks the response ended and fires close handlers asynchronously like Node; replaced the test that encoded the leaky behavior with active-stream-retained and completed-stream-removed- without-close coverage; added end()-fires-close cleanup, eviction preference, and replay boundary (newest id, evicted id) tests. - knip: set ignoreExportsUsedInFile so RingBuffer (still used inside buffers.ts by EventRingBuffer) isn't flagged once the registry stops importing it. Co-authored-by: Isaac Signed-off-by: MarioCadenas <MarioCadenas@users.noreply.github.com>
…on edge cases
- revert repo-wide ignoreExportsUsedInFile in knip.json; replace with a
file-scoped ignore entry for packages/appkit/src/stream/buffers.ts only
- StreamRegistry.add(): only evict when inserting a genuinely new key, so
re-adding an existing streamId at capacity no longer destroys an
unrelated live stream for a net-zero insert
- eviction now aborts with DOMException("Stream evicted", "AbortError")
to match the manager's terminal abort paths so the error categorizer
routes eviction as an abort rather than a stream failure
- update stale RingBuffer comments in stream-registry tests to Map
semantics and adjust eviction/abort-reason test expectations
Co-authored-by: Isaac
Signed-off-by: MarioCadenas <MarioCadenas@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem (memory leak, High)
Every SSE stream that finished while its original client was still connected — the normal case — stayed in the
StreamRegistryforever, retaining itsEventRingBuffer(up to 100 events x 1MB each), generator, abort controller, and trace context. Since every analytics query is one SSE stream whose final event is the full result set, a busy server accumulated up tomaxActiveStreams(1,000) full query results in heap.Leak mechanics
_processGeneratorInBackgroundsetisCompleted = true, called_closeAllClients()(which only callsres.end()— thecloseevent fires asynchronously), then immediately called_cleanupStream. At that pointclients.size > 0, so no removal was ever scheduled._createNewStreamdeleted the client from the set but had no cleanup-scheduling branch for completed streams — only_attachToExistingStream's close handler (the reconnect path) had one. So the leak hit every stream whose client never reconnected.streamDefaults.cleanupIntervalandmaxPersistentBufferswere defined but referenced nowhere — there was no periodic sweeper to catch any of this.Fix
_scheduleRemovalAfterTTL, and call it from every termination path: both close handlers (_createNewStreamand_attachToExistingStream), the success path, the client-abort path, and the error path. The helper no-ops unless the stream is finished (isCompleted) and has no connected clients, so whichever of completion / last-disconnect happens second does the scheduling.bufferTTL(default 10 min) after the last client disconnects. A reconnect during the TTL window refresheslastAccess, which makes any pending timer a no-op; a fresh TTL starts when that client disconnects again.unref()'d so they never keep the process alive.cleanupIntervalandmaxPersistentBuffersfromstreamDefaultsrather than wiring a periodic sweeper. The per-stream timers now cover every termination path (andabortAll()clears the registry on shutdown), and a sweeper would require adding iteration APIs tostream-registry.ts, which a sibling PR is modifying (capacity-eviction fix) — kept out of this change to avoid merge conflicts. The optional fields on the publicStreamConfigtype insharedare left as-is (they were already no-ops); they can be dropped in a follow-up.Tests
New
registry cleanup after stream terminationsuite (fake timers):TTL - 1ms, removed atTTLVerified:
pnpm install,pnpm build,pnpm -r typecheck,pnpm check:fixall pass;vitest run packages/appkit/src/stream/tests/→ 91/91 passing (33 in stream.test.ts, including 5 new).This pull request and its description were written by Isaac.