-
Notifications
You must be signed in to change notification settings - Fork 154
Fix cloud dev-stack degradation under sustained MCP load #1280
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
df86ec5
57513ff
74e934f
56421d5
6834eb2
4d07bf3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| // Contract: the postgres pool finalizer must AWAIT the connection teardown. | ||
| // | ||
| // The cloud MCP auth seam (`makeMcpOrganizationAuthServices`) builds a fresh | ||
| // postgres pool on EVERY `/mcp` request and closes it in its `acquireRelease` | ||
| // finalizer. That finalizer used to be fire-and-forget (`Effect.runFork( | ||
| // sql.end({ timeout: 0 }))`): it returned before the socket was torn down, so | ||
| // under sustained MCP load closed-but-unreaped sockets piled up against the | ||
| // dev PGlite server (effectively single-connection) faster than it reaped | ||
| // them. New connects queued behind the backlog, request latency climbed into | ||
| // the tens of seconds, and the e2e cloud dev stack hung after a few minutes: | ||
| // the CI cascade flake. | ||
| // | ||
| // These tests characterize the contract the old fire-and-forget close violated | ||
| // (`closePostgres` itself is new alongside them): it must (a) call `sql.end` | ||
| // with a NON-zero drain window (a clean Terminate, not an abandon) and (b) | ||
| // return an Effect that does not complete until `sql.end` has resolved, even | ||
| // when the teardown takes real wall-clock time. All asserted with a fake `sql` | ||
| // whose `end()` completion is observable, so the tests are fast and need no | ||
| // live database. | ||
|
|
||
| import { describe, expect, it } from "@effect/vitest"; | ||
| import { Effect, Exit } from "effect"; | ||
|
|
||
| import { POSTGRES_END_TIMEOUT_SECONDS, closePostgres } from "./db"; | ||
|
|
||
| describe("closePostgres", () => { | ||
| it.effect("passes a non-zero drain window to sql.end (clean Terminate, not abandon)", () => | ||
| Effect.gen(function* () { | ||
| let received: { timeout?: number } | undefined; | ||
| const fakeSql = { | ||
| end: (options?: { timeout?: number }) => { | ||
| received = options; | ||
| return Promise.resolve(); | ||
| }, | ||
| }; | ||
|
|
||
| yield* closePostgres(fakeSql); | ||
|
|
||
| expect(received?.timeout).toBe(POSTGRES_END_TIMEOUT_SECONDS); | ||
| expect(POSTGRES_END_TIMEOUT_SECONDS).toBeGreaterThan(0); | ||
| }), | ||
| ); | ||
|
|
||
| it.effect("does not complete until sql.end resolves (awaits the teardown)", () => | ||
| Effect.gen(function* () { | ||
| // `end` records an ordering marker only after an async tick. If | ||
| // `closePostgres` awaits it, the "close completed" marker lands AFTER the | ||
| // "end resolved" marker. The old fire-and-forget close returned before | ||
| // `end` ran, so "close completed" would land FIRST. | ||
| const order: string[] = []; | ||
| const fakeSql = { | ||
| end: () => | ||
| // Defer resolution across a microtask so a non-awaiting close would | ||
| // observably finish before this runs. | ||
| Promise.resolve() | ||
| .then(() => Promise.resolve()) | ||
| .then(() => { | ||
| order.push("end-resolved"); | ||
| }), | ||
| }; | ||
|
|
||
| yield* closePostgres(fakeSql); | ||
| order.push("close-completed"); | ||
|
|
||
| // Awaiting the teardown means end resolved strictly before close returned. | ||
| expect(order).toEqual(["end-resolved", "close-completed"]); | ||
| }), | ||
| ); | ||
|
|
||
| it.effect("awaits a teardown that takes real wall-clock time (bounded by the ceiling)", () => | ||
| Effect.gen(function* () { | ||
| // `end` resolves only after a real timer delay, not just a microtask. | ||
| // This pins the "we await to completion, the timeout is only a ceiling" | ||
| // contract: closePostgres must stay suspended across the delay rather | ||
| // than resolving early. | ||
| const order: string[] = []; | ||
| const fakeSql = { | ||
| end: () => | ||
| new Promise<void>((resolve) => { | ||
| setTimeout(() => { | ||
| order.push("end-resolved"); | ||
| resolve(); | ||
| }, 75); | ||
| }), | ||
| }; | ||
|
|
||
| const startedAt = Date.now(); | ||
| yield* closePostgres(fakeSql); | ||
| order.push("close-completed"); | ||
|
|
||
| expect(order).toEqual(["end-resolved", "close-completed"]); | ||
| // Slack of a few ms: platform timers may fire marginally early. | ||
| expect(Date.now() - startedAt).toBeGreaterThanOrEqual(70); | ||
| }), | ||
| ); | ||
|
|
||
| it.effect("swallows sql.end failures (a teardown error must not fail the request scope)", () => | ||
| Effect.gen(function* () { | ||
| const fakeSql = { | ||
| // A rejected teardown (connection already gone) must not surface as a | ||
| // scope failure. | ||
| // oxlint-disable-next-line executor/no-promise-reject -- test fake: model `sql.end` (a raw postgres.js promise) rejecting | ||
| end: () => new Promise<void>((_resolve, reject) => reject("connection already gone")), | ||
| }; | ||
| const exit = yield* Effect.exit(closePostgres(fakeSql)); | ||
| expect(Exit.isSuccess(exit)).toBe(true); | ||
| }), | ||
| ); | ||
| }); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| // Regression for the dev-db PGlite socket protocol-interleaving bug (patched | ||
| // in patches/@electric-sql%2Fpglite-socket@0.1.4.patch). | ||
| // | ||
| // PGLiteSocketServer's QueryQueueManager used to enqueue each postgres wire | ||
| // FRAME (Parse, Bind, Execute, Sync) as its own queue entry against the one | ||
| // shared PGlite session. With more than one connection (the dev-db now allows | ||
| // many — see scripts/dev-db.ts maxConnections), two clients' extended-protocol | ||
| // pipelines interleaved: client A's Parse (5 params) ... client B's Parse | ||
| // (1 param) ... A's Bind now hits B's unnamed statement: | ||
| // | ||
| // PostgresError: bind message supplies 5 parameters, but prepared | ||
| // statement "" requires 1 | ||
| // | ||
| // which surfaced in e2e as random 500s ("Failed to load tools", StorageError) | ||
| // on whichever request lost the race — the residual per-spec CI flakes after | ||
| // the connection-storm fix. The patch batches all frames of one socket data | ||
| // event into a single queue entry and adds handler affinity while a pipeline | ||
| // is open, so one client's Parse..Sync executes atomically. | ||
| // | ||
| // This test drives concurrent clients issuing unprepared parameterized queries | ||
| // with DIFFERENT parameter counts (the exact drizzle/postgres-js shape) through | ||
| // one PGLiteSocketServer and asserts zero protocol corruption. | ||
|
|
||
| import { describe, expect, it } from "@effect/vitest"; | ||
| import { PGlite } from "@electric-sql/pglite"; | ||
| import { PGLiteSocketServer } from "@electric-sql/pglite-socket"; | ||
| import postgres from "postgres"; | ||
|
|
||
| const PORT = 45998; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| const CLIENTS = 6; | ||
| const QUERIES_PER_CLIENT = 40; | ||
|
|
||
| describe("dev-db PGlite socket under concurrent connections", () => { | ||
| it( | ||
| "serves interleaved multi-connection pipelines without protocol corruption", | ||
| { timeout: 60_000 }, | ||
| async () => { | ||
| const db = await PGlite.create(); | ||
| const server = new PGLiteSocketServer({ | ||
| db, | ||
| port: PORT, | ||
| host: "127.0.0.1", | ||
| maxConnections: 100, | ||
| }); | ||
| await server.start(); | ||
|
|
||
| let ok = 0; | ||
| const errors: string[] = []; | ||
|
|
||
| const worker = async (id: number) => { | ||
| const sql = postgres(`postgres://postgres:postgres@127.0.0.1:${PORT}/postgres`, { | ||
| max: 1, | ||
| idle_timeout: 0, | ||
| connect_timeout: 10, | ||
| fetch_types: false, | ||
| prepare: true, | ||
| onnotice: () => undefined, | ||
| }); | ||
| // oxlint-disable-next-line executor/no-try-catch-or-throw -- test boundary: postgres.js is promise-native and the socket must be closed on every path | ||
| try { | ||
| for (let q = 0; q < QUERIES_PER_CLIENT; q++) { | ||
| // Alternate 1-param and 5-param unprepared queries: maximally | ||
| // collision-prone unnamed-statement shapes across connections. | ||
| if ((id + q) % 2 === 0) { | ||
| await sql.unsafe(`select $1::int as one`, [1]); | ||
| } else { | ||
| await sql.unsafe(`select $1::int, $2::text, $3::text, $4::text, $5::text`, [ | ||
| 1, | ||
| "b", | ||
| "c", | ||
| "d", | ||
| "e", | ||
| ]); | ||
| } | ||
| ok++; | ||
| } | ||
| } catch (cause) { | ||
| // oxlint-disable-next-line executor/no-unknown-error-message -- test boundary: the raw PostgresError message IS the assertion payload | ||
| errors.push(String(cause)); | ||
| } finally { | ||
| // oxlint-disable-next-line executor/no-promise-catch -- test boundary: postgres.js is promise-native; a failed teardown must not mask the assertion | ||
| await sql.end({ timeout: 5 }).catch(() => {}); | ||
| } | ||
| }; | ||
|
|
||
| await Promise.all(Array.from({ length: CLIENTS }, (_, i) => worker(i))); | ||
| await server.stop(); | ||
| await db.close(); | ||
|
|
||
| expect(errors, `protocol corruption under concurrency:\n${errors.join("\n")}`).toEqual([]); | ||
| expect(ok).toBe(CLIENTS * QUERIES_PER_CLIENT); | ||
| }, | ||
| ); | ||
| }); | ||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Number()returnsNaNfor a non-numeric string, which causesthis.handlers.size >= NaNto always befalse, silently removing the connection cap. UsingNumber.isFinitewith a fallback gives a clearly bounded result.