Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions apps/cloud/scripts/dev-db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,41 @@ const db = await PGlite.create(DB_PATH);
console.log(`[dev-db] Running migrations from ${MIGRATIONS_FOLDER}`);
await migrate(drizzle(db), { migrationsFolder: MIGRATIONS_FOLDER });

// `PGLiteSocketServer` defaults to `maxConnections: 1` and answers every extra
// concurrent connection with "Too many connections" + an immediate socket
// close. (pglite-socket 0.1.4's published index.d.ts documents "default: 100",
// but the shipped runtime JS is `maxConnections ?? 1`, verified in the shipped
// chunk, so the runtime default really is 1.) The cloud worker opens a fresh
// postgres pool per request (the MCP auth seam rebuilds one on EVERY `/mcp`
// request, see apps/cloud/src/mcp/auth.ts), so under concurrent load, exactly
// what the e2e suite generates against one shared dev stack, the
// second-and-later connects were rejected, and postgres.js reconnected in a
// tight loop against the closed socket. That reconnect storm piled up
// thousands of half-closed sockets, starved real queries, drove request
// latency into the tens of seconds, and eventually hung the stack: the CI e2e
// "cloud dev stack degrades after minutes of sustained load" cascade flake.
// PGlite runs queries serially (its internal QueryQueueManager executes each
// under `runExclusive`), so allowing many connections means they queue instead
// of being rejected. One caveat makes that safe: stock pglite-socket 0.1.4
// enqueues each wire FRAME separately, so two connections' extended-protocol
// pipelines (Parse/Bind/Execute/Sync) would interleave inside the one shared
// PGlite session and corrupt each other ("bind message supplies N parameters,
// but prepared statement requires M" -> random 500s on whichever request lost
// the race). The patch in patches/@electric-sql%2Fpglite-socket@0.1.4.patch
// batches each socket data event into one queue entry and holds handler
// affinity while a pipeline is open;
// src/db/dev-db-socket-concurrency.node.test.ts is the regression test.
const server = new PGLiteSocketServer({
db,
port: PORT,
host: "127.0.0.1",
maxConnections: Number(process.env.DEV_DB_MAX_CONNECTIONS ?? 1000),
// Backstop for pipeline affinity: a client that stalls mid-pipeline (Parse
// sent, no Sync) with its socket still OPEN would hold the queue's handler
// affinity forever and starve every other connection, since affinity only
// releases on detach and detach needs close/error/idle-timeout. In ms; the
// timer resets on every data event, so only a genuinely dead client trips it.
idleTimeout: Number(process.env.DEV_DB_IDLE_TIMEOUT_MS ?? 30_000),
Comment on lines +113 to +119

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Number() returns NaN for a non-numeric string, which causes this.handlers.size >= NaN to always be false, silently removing the connection cap. Using Number.isFinite with a fallback gives a clearly bounded result.

Suggested change
maxConnections: Number(process.env.DEV_DB_MAX_CONNECTIONS ?? 1000),
// Backstop for pipeline affinity: a client that stalls mid-pipeline (Parse
// sent, no Sync) with its socket still OPEN would hold the queue's handler
// affinity forever and starve every other connection, since affinity only
// releases on detach and detach needs close/error/idle-timeout. In ms; the
// timer resets on every data event, so only a genuinely dead client trips it.
idleTimeout: Number(process.env.DEV_DB_IDLE_TIMEOUT_MS ?? 30_000),
maxConnections: Number.isFinite(Number(process.env.DEV_DB_MAX_CONNECTIONS))
? Number(process.env.DEV_DB_MAX_CONNECTIONS)
: 1000,
// Backstop for pipeline affinity: a client that stalls mid-pipeline (Parse
// sent, no Sync) with its socket still OPEN would hold the queue's handler
// affinity forever and starve every other connection, since affinity only
// releases on detach and detach needs close/error/idle-timeout. In ms; the
// timer resets on every data event, so only a genuinely dead client trips it.
idleTimeout: Number.isFinite(Number(process.env.DEV_DB_IDLE_TIMEOUT_MS))
? Number(process.env.DEV_DB_IDLE_TIMEOUT_MS)
: 30_000,

});

await server.start();
Expand Down
109 changes: 109 additions & 0 deletions apps/cloud/src/db/db.close.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Contract: the postgres pool finalizer must AWAIT the connection teardown.
//
// The cloud MCP auth seam (`makeMcpOrganizationAuthServices`) builds a fresh
// postgres pool on EVERY `/mcp` request and closes it in its `acquireRelease`
// finalizer. That finalizer used to be fire-and-forget (`Effect.runFork(
// sql.end({ timeout: 0 }))`): it returned before the socket was torn down, so
// under sustained MCP load closed-but-unreaped sockets piled up against the
// dev PGlite server (effectively single-connection) faster than it reaped
// them. New connects queued behind the backlog, request latency climbed into
// the tens of seconds, and the e2e cloud dev stack hung after a few minutes:
// the CI cascade flake.
//
// These tests characterize the contract the old fire-and-forget close violated
// (`closePostgres` itself is new alongside them): it must (a) call `sql.end`
// with a NON-zero drain window (a clean Terminate, not an abandon) and (b)
// return an Effect that does not complete until `sql.end` has resolved, even
// when the teardown takes real wall-clock time. All asserted with a fake `sql`
// whose `end()` completion is observable, so the tests are fast and need no
// live database.

import { describe, expect, it } from "@effect/vitest";
import { Effect, Exit } from "effect";

import { POSTGRES_END_TIMEOUT_SECONDS, closePostgres } from "./db";

describe("closePostgres", () => {
it.effect("passes a non-zero drain window to sql.end (clean Terminate, not abandon)", () =>
Effect.gen(function* () {
let received: { timeout?: number } | undefined;
const fakeSql = {
end: (options?: { timeout?: number }) => {
received = options;
return Promise.resolve();
},
};

yield* closePostgres(fakeSql);

expect(received?.timeout).toBe(POSTGRES_END_TIMEOUT_SECONDS);
expect(POSTGRES_END_TIMEOUT_SECONDS).toBeGreaterThan(0);
}),
);

it.effect("does not complete until sql.end resolves (awaits the teardown)", () =>
Effect.gen(function* () {
// `end` records an ordering marker only after an async tick. If
// `closePostgres` awaits it, the "close completed" marker lands AFTER the
// "end resolved" marker. The old fire-and-forget close returned before
// `end` ran, so "close completed" would land FIRST.
const order: string[] = [];
const fakeSql = {
end: () =>
// Defer resolution across a microtask so a non-awaiting close would
// observably finish before this runs.
Promise.resolve()
.then(() => Promise.resolve())
.then(() => {
order.push("end-resolved");
}),
};

yield* closePostgres(fakeSql);
order.push("close-completed");

// Awaiting the teardown means end resolved strictly before close returned.
expect(order).toEqual(["end-resolved", "close-completed"]);
}),
);

it.effect("awaits a teardown that takes real wall-clock time (bounded by the ceiling)", () =>
Effect.gen(function* () {
// `end` resolves only after a real timer delay, not just a microtask.
// This pins the "we await to completion, the timeout is only a ceiling"
// contract: closePostgres must stay suspended across the delay rather
// than resolving early.
const order: string[] = [];
const fakeSql = {
end: () =>
new Promise<void>((resolve) => {
setTimeout(() => {
order.push("end-resolved");
resolve();
}, 75);
}),
};

const startedAt = Date.now();
yield* closePostgres(fakeSql);
order.push("close-completed");

expect(order).toEqual(["end-resolved", "close-completed"]);
// Slack of a few ms: platform timers may fire marginally early.
expect(Date.now() - startedAt).toBeGreaterThanOrEqual(70);
}),
);

it.effect("swallows sql.end failures (a teardown error must not fail the request scope)", () =>
Effect.gen(function* () {
const fakeSql = {
// A rejected teardown (connection already gone) must not surface as a
// scope failure.
// oxlint-disable-next-line executor/no-promise-reject -- test fake: model `sql.end` (a raw postgres.js promise) rejecting
end: () => new Promise<void>((_resolve, reject) => reject("connection already gone")),
};
const exit = yield* Effect.exit(closePostgres(fakeSql));
expect(Exit.isSuccess(exit)).toBe(true);
}),
);
});
49 changes: 42 additions & 7 deletions apps/cloud/src/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,53 @@ const makeSql = (): Sql =>
onnotice: () => undefined,
});

/**
* Graceful drain window (seconds) handed to `postgres`'s `sql.end`. Small but
* non-zero: `timeout: 0` closes immediately without waiting for a clean
* Terminate, which leaves the socket half-closed for the server to reap on its
* own schedule. A short drain lets postgres.js finish the wire teardown.
*
* This is a `Promise.race` CEILING, not a fixed wait: postgres.js races
* `end({ timeout })` against the actual teardown, and an idle connection's
* `end()` calls `terminate()` immediately and resolves as soon as the socket
* closes (sub-millisecond). Awaiting it in the request scope therefore adds no
* meaningful latency on the common path; the 5s only bounds how long a
* connection still mid-query can hold the scope open.
*/
export const POSTGRES_END_TIMEOUT_SECONDS = 5;

/**
* Close a postgres pool and AWAIT its teardown.
*
* The `DbService` layers ({@link DbService.Live}, {@link makeDbLayer}) run this
* as their `acquireRelease` finalizer. The MCP auth seam
* (`makeMcpOrganizationAuthServices`) builds a FRESH pool on EVERY `/mcp`
* request, so this finalizer runs per request under sustained load.
*
* It used to be fire-and-forget (`Effect.runFork(sql.end({ timeout: 0 }))`),
* which returned before the connection was actually torn down. The abandoned
* sockets piled up against the dev PGlite server (effectively single-connection)
* faster than it reaped them; new connects then queued behind the backlog, so
* request latency climbed into the tens of seconds and the stack eventually
* hung — the CI e2e "cloud dev stack degrades after minutes of sustained load"
* cascade. Awaiting the close bounds the number of live-plus-closing sockets to
* what is actually in flight. It runs inside the request's own Effect scope, so
* it respects workerd's per-request I/O rule.
*/
export const closePostgres = (sql: Pick<Sql, "end">): Effect.Effect<void> =>
Effect.ignore(
Effect.tryPromise({
try: () => sql.end({ timeout: POSTGRES_END_TIMEOUT_SECONDS }),
catch: (cause) => cause,
}),
);

const makePostgresResource = (): DbResource => {
const sql = makeSql();
return {
sql,
db: drizzle(sql, { schema: combinedSchema }) as DrizzleDb,
close: () =>
Effect.ignore(
Effect.tryPromise({
try: () => sql.end({ timeout: 0 }),
catch: (cause) => cause,
}),
),
close: () => closePostgres(sql),
};
};

Expand Down
94 changes: 94 additions & 0 deletions apps/cloud/src/db/dev-db-socket-concurrency.node.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Regression for the dev-db PGlite socket protocol-interleaving bug (patched
// in patches/@electric-sql%2Fpglite-socket@0.1.4.patch).
//
// PGLiteSocketServer's QueryQueueManager used to enqueue each postgres wire
// FRAME (Parse, Bind, Execute, Sync) as its own queue entry against the one
// shared PGlite session. With more than one connection (the dev-db now allows
// many — see scripts/dev-db.ts maxConnections), two clients' extended-protocol
// pipelines interleaved: client A's Parse (5 params) ... client B's Parse
// (1 param) ... A's Bind now hits B's unnamed statement:
//
// PostgresError: bind message supplies 5 parameters, but prepared
// statement "" requires 1
//
// which surfaced in e2e as random 500s ("Failed to load tools", StorageError)
// on whichever request lost the race — the residual per-spec CI flakes after
// the connection-storm fix. The patch batches all frames of one socket data
// event into a single queue entry and adds handler affinity while a pipeline
// is open, so one client's Parse..Sync executes atomically.
//
// This test drives concurrent clients issuing unprepared parameterized queries
// with DIFFERENT parameter counts (the exact drizzle/postgres-js shape) through
// one PGLiteSocketServer and asserts zero protocol corruption.

import { describe, expect, it } from "@effect/vitest";
import { PGlite } from "@electric-sql/pglite";
import { PGLiteSocketServer } from "@electric-sql/pglite-socket";
import postgres from "postgres";

const PORT = 45998;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Hardcoded port within e2e claiming range

PORT = 45998 maps to offset 8 of block 399 in the 42000–45999 e2e claiming range. If this test runs while the e2e globalsetup happens to walk to block 399 and claim that port, the server.listen() call on 127.0.0.1:45998 races with the e2e service binding. The new bindProbe would detect the unit-test server via the connect-probe and skip block 399, but the inverse — e2e claims the block first, releases the bind probes, and then the test tries to bind — would surface as a spurious test failure. Picking a port outside the claiming range (e.g. ≥46000 or <42000) removes the ambiguity entirely.

const CLIENTS = 6;
const QUERIES_PER_CLIENT = 40;

describe("dev-db PGlite socket under concurrent connections", () => {
it(
"serves interleaved multi-connection pipelines without protocol corruption",
{ timeout: 60_000 },
async () => {
const db = await PGlite.create();
const server = new PGLiteSocketServer({
db,
port: PORT,
host: "127.0.0.1",
maxConnections: 100,
});
await server.start();

let ok = 0;
const errors: string[] = [];

const worker = async (id: number) => {
const sql = postgres(`postgres://postgres:postgres@127.0.0.1:${PORT}/postgres`, {
max: 1,
idle_timeout: 0,
connect_timeout: 10,
fetch_types: false,
prepare: true,
onnotice: () => undefined,
});
// oxlint-disable-next-line executor/no-try-catch-or-throw -- test boundary: postgres.js is promise-native and the socket must be closed on every path
try {
for (let q = 0; q < QUERIES_PER_CLIENT; q++) {
// Alternate 1-param and 5-param unprepared queries: maximally
// collision-prone unnamed-statement shapes across connections.
if ((id + q) % 2 === 0) {
await sql.unsafe(`select $1::int as one`, [1]);
} else {
await sql.unsafe(`select $1::int, $2::text, $3::text, $4::text, $5::text`, [
1,
"b",
"c",
"d",
"e",
]);
}
ok++;
}
} catch (cause) {
// oxlint-disable-next-line executor/no-unknown-error-message -- test boundary: the raw PostgresError message IS the assertion payload
errors.push(String(cause));
} finally {
// oxlint-disable-next-line executor/no-promise-catch -- test boundary: postgres.js is promise-native; a failed teardown must not mask the assertion
await sql.end({ timeout: 5 }).catch(() => {});
}
};

await Promise.all(Array.from({ length: CLIENTS }, (_, i) => worker(i)));
await server.stop();
await db.close();

expect(errors, `protocol corruption under concurrency:\n${errors.join("\n")}`).toEqual([]);
expect(ok).toBe(CLIENTS * QUERIES_PER_CLIENT);
},
);
});
1 change: 1 addition & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions e2e/cloud/billing-trial-checkout-stale.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,17 @@ scenario(
await step("A fresh org is offered the Team free trial", async () => {
// Billing requests are org-scoped via the URL slug header, so reach the
// plans page through the org-scoped URL (a bare /billing/plans would fire
// the first fetch before the slug resolves and 401). Land on "/" to
// canonicalize, then open the slug-scoped plans page.
// the first fetch before the slug resolves and 401). Land on "/" and WAIT
// for the shell to canonicalize onto the slug before reading it:
// networkidle only proves the network went quiet, not that the client-side
// redirect ran, and reading too early navigates to the literal
// "/undefined/billing/plans" — the billing fetches fire under the bogus
// slug, fail, and are never refetched (autumn-js staleTime 60s), leaving
// the plans grid empty forever.
await page.goto("/", { waitUntil: "networkidle" });
await page.waitForURL((url) => /^\/[a-z0-9-]+\/?$/.test(url.pathname), {
timeout: 30_000,
});
const slug = new URL(page.url()).pathname.split("/").filter(Boolean)[0];
await page.goto(`/${slug}/billing/plans`, { waitUntil: "networkidle" });
await page.getByRole("heading", { name: "Choose a plan" }).waitFor();
Expand Down
31 changes: 30 additions & 1 deletion e2e/cloud/oauth-callback-org-scope.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,19 +223,48 @@ scenario(
});

await step("Start OAuth from the original organization's add-connection flow", async () => {
await page.goto(`/${orgA.slug}/integrations/${String(integration)}?addAccount=1`, {
// Land WITHOUT ?addAccount=1 and let the org scope settle first. The
// session cookie still points at org B, so this navigation bounces:
// first paint under the cookie's org B, OrgSlugGate canonicalizes,
// then the URL-scoped /account/me settles auth back on org A. A modal
// auto-opened by the deep link would fire its /api/oauth/clients fetch
// DURING that transient org-B scope, get org B's empty app list, never
// refetch, and leave "Connect with OAuth" disabled forever (the flake
// this step used to have). Waiting for org A's shell before opening
// the modal makes every modal fetch run under the settled org A scope.
await page.goto(`/${orgA.slug}/integrations/${String(integration)}`, {
waitUntil: "networkidle",
});
await page.getByRole("button", { name: new RegExp(escapeRegExp(orgA.name)) }).waitFor({
timeout: 30_000,
});
await page.waitForURL((url) => url.pathname.startsWith(`/${orgA.slug}/`), {
timeout: 30_000,
});
await page.getByRole("button", { name: "Add connection" }).click();
await page.getByRole("heading", { name: /Add connection/ }).waitFor({
timeout: 30_000,
});
// The footer button enables once the registered OAuth app has loaded
// and is selected; wait for that state instead of racing the click
// against the apps fetch.
await page
.getByRole("button", { name: "Connect with OAuth", disabled: false })
.waitFor({ timeout: 30_000 });
// Armed BEFORE the click so the navigation can never outrun the wait.
const authorizeRequest = page.waitForRequest(
(request) => {
const url = new URL(request.url());
return url.origin === new URL(oauth.issuerUrl).origin && url.pathname === "/authorize";
},
{ timeout: 30_000 },
);
// If the click throws first (e.g. the button never enables), the armed
// wait would otherwise time out later as an UNHANDLED rejection and
// pollute the run with phantom errors. Mark it handled here; the
// `await authorizeRequest` below still surfaces its failure on the
// success path.
authorizeRequest.catch(() => {});
await page.getByRole("button", { name: "Connect with OAuth" }).click();
providerAuthorizeUrl = new URL((await authorizeRequest).url());
await page.waitForURL(
Expand Down
Loading
Loading