From 532b9ea0eefcf4dc1bcb29f41a4539ed5a520e50 Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 9 Jun 2026 21:48:58 -0700 Subject: [PATCH 1/2] feat(realtime): preflight schema-compatibility check on startup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The socket service authorizes every connection with a full-row query against the workflow table. When a deploy ships a realtime image whose compiled schema is ahead of/behind the live DB (e.g. a column dropped by a migration the image predates), that query fails on every request and silently breaks persistence — yet the process stays up and the shallow /health probe keeps returning 200, so the deploy looks healthy while serving nothing. Run one representative workflow query before listen(): a schema mismatch throws, propagates to the entrypoint, and the task exits non-zero and never goes healthy, so CodeDeploy auto-rolls-back instead of shifting traffic onto broken tasks. Schema-class errors (undefined column/table/function) fail fast; connection-class errors retry with backoff so a cold DB at boot does not flap. Runs once at startup, never on the per-probe LB health check, to avoid a DB blip mass-terminating the fleet (cascading failure). 
--- apps/realtime/src/database/preflight.test.ts | 91 ++++++++++++++++++++ apps/realtime/src/database/preflight.ts | 81 +++++++++++++++++ apps/realtime/src/index.ts | 3 + 3 files changed, 175 insertions(+) create mode 100644 apps/realtime/src/database/preflight.test.ts create mode 100644 apps/realtime/src/database/preflight.ts diff --git a/apps/realtime/src/database/preflight.test.ts b/apps/realtime/src/database/preflight.test.ts new file mode 100644 index 00000000000..8d412d442dd --- /dev/null +++ b/apps/realtime/src/database/preflight.test.ts @@ -0,0 +1,91 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockLimit } = vi.hoisted(() => ({ + mockLimit: vi.fn(), +})) + +vi.mock('@sim/db', () => ({ + db: { + select: () => ({ + from: () => ({ + limit: mockLimit, + }), + }), + }, +})) + +vi.mock('@sim/db/schema', () => ({ + workflow: {}, +})) + +vi.mock('@sim/logger', () => ({ + createLogger: () => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }), +})) + +vi.mock('@sim/utils/helpers', () => ({ + sleep: vi.fn().mockResolvedValue(undefined), +})) + +import { sleep } from '@sim/utils/helpers' +import { assertSchemaCompatibility } from '@/database/preflight' + +/** Builds a Postgres-shaped error carrying a SQLSTATE `code`, as postgres.js throws. 
*/ +function pgError(code: string): Error & { code: string } { + return Object.assign(new Error(`pg error ${code}`), { code }) +} + +describe('assertSchemaCompatibility', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('resolves when the representative schema query succeeds', async () => { + mockLimit.mockResolvedValueOnce([]) + + await expect(assertSchemaCompatibility()).resolves.toBeUndefined() + + expect(mockLimit).toHaveBeenCalledTimes(1) + }) + + it('throws immediately on an undefined-column mismatch without retrying', async () => { + mockLimit.mockRejectedValue(pgError('42703')) + + await expect(assertSchemaCompatibility()).rejects.toThrow(/incompatible with the live database/) + + expect(mockLimit).toHaveBeenCalledTimes(1) + expect(sleep).not.toHaveBeenCalled() + }) + + it('throws immediately on an undefined-table mismatch', async () => { + mockLimit.mockRejectedValue(pgError('42P01')) + + await expect(assertSchemaCompatibility()).rejects.toThrow(/incompatible with the live database/) + + expect(mockLimit).toHaveBeenCalledTimes(1) + }) + + it('retries transient connection errors and resolves once reachable', async () => { + mockLimit + .mockRejectedValueOnce(pgError('ECONNREFUSED')) + .mockRejectedValueOnce(pgError('ECONNREFUSED')) + .mockResolvedValueOnce([]) + + await expect(assertSchemaCompatibility()).resolves.toBeUndefined() + + expect(mockLimit).toHaveBeenCalledTimes(3) + expect(sleep).toHaveBeenCalledTimes(2) + }) + + it('throws after exhausting retries when the database stays unreachable', async () => { + mockLimit.mockRejectedValue(pgError('ECONNREFUSED')) + + await expect(assertSchemaCompatibility()).rejects.toThrow(/database unreachable/) + + expect(mockLimit).toHaveBeenCalledTimes(5) + }) +}) diff --git a/apps/realtime/src/database/preflight.ts b/apps/realtime/src/database/preflight.ts new file mode 100644 index 00000000000..bc60d0d5d58 --- /dev/null +++ b/apps/realtime/src/database/preflight.ts @@ -0,0 +1,81 @@ +import { db } from 
'@sim/db' +import { workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { getErrorMessage } from '@sim/utils/errors' +import { sleep } from '@sim/utils/helpers' +import { backoffWithJitter } from '@sim/utils/retry' + +const logger = createLogger('SocketPreflight') + +/** + * Maximum attempts for the schema canary when the database is merely unreachable. + * Connection-class failures are retried; schema-class failures fail immediately. + */ +const MAX_CONNECT_ATTEMPTS = 5 + +/** + * Postgres SQLSTATE codes meaning the deployed image's compiled schema disagrees + * with the live database (undefined column, table, or function). These never + * self-heal, so retrying only delays an inevitable startup failure. + */ +const SCHEMA_MISMATCH_CODES = new Set(['42703', '42P01', '42883']) + +function isSchemaMismatch(error: unknown): boolean { + const code = (error as { code?: unknown })?.code + return typeof code === 'string' && SCHEMA_MISMATCH_CODES.has(code) +} + +/** + * Verifies, before the server accepts traffic, that the deployed image's schema + * is compatible with the live database — throwing if it is not. + * + * Every socket is authorized against the `workflow` table through a full-row + * drizzle projection. If the image's compiled schema is ahead of (or behind) the + * database — e.g. a column dropped by a migration the image predates — that query + * fails on every request and silently breaks persistence, yet the process stays + * up and the shallow `/health` probe keeps returning 200. The fleet looks healthy + * while serving nothing. + * + * Running one representative query at startup turns that latent, per-request + * failure into an immediate startup failure: the throw propagates to the server + * entrypoint, the task exits non-zero and never becomes healthy, and the deploy's + * health gate never flips — so CodeDeploy auto-rolls-back instead of shifting + * traffic onto broken tasks. 
+ * + * Deliberately invoked once at startup and never from the per-probe load-balancer + * health check: a deep dependency check on every probe would let a transient + * database blip mass-terminate the whole fleet (cascading failure). + * + * @throws when the schema is incompatible, or the database stays unreachable + * across {@link MAX_CONNECT_ATTEMPTS} attempts. + */ +export async function assertSchemaCompatibility(): Promise { + let lastError: unknown + + for (let attempt = 1; attempt <= MAX_CONNECT_ATTEMPTS; attempt++) { + try { + await db.select().from(workflow).limit(1) + logger.info('Schema-compatibility check passed') + return + } catch (error) { + lastError = error + + if (isSchemaMismatch(error)) { + throw new Error( + `Deployed image is incompatible with the live database schema: ${getErrorMessage(error)}` + ) + } + + const delay = backoffWithJitter(attempt, null) + logger.warn( + `Schema-compatibility check could not reach the database (attempt ${attempt}/${MAX_CONNECT_ATTEMPTS}), retrying in ${Math.round(delay)}ms`, + getErrorMessage(error) + ) + await sleep(delay) + } + } + + throw new Error( + `Schema-compatibility check failed after ${MAX_CONNECT_ATTEMPTS} attempts — database unreachable: ${getErrorMessage(lastError)}` + ) +} diff --git a/apps/realtime/src/index.ts b/apps/realtime/src/index.ts index 7232eb36be5..e43184c1f02 100644 --- a/apps/realtime/src/index.ts +++ b/apps/realtime/src/index.ts @@ -2,6 +2,7 @@ import { createServer } from 'http' import { createLogger } from '@sim/logger' import type { Server as SocketIOServer } from 'socket.io' import { createSocketIOServer, shutdownSocketIOAdapter } from '@/config/socket' +import { assertSchemaCompatibility } from '@/database/preflight' import { env } from '@/env' import { setupAllHandlers } from '@/handlers' import { type AuthenticatedSocket, authenticateSocket } from '@/middleware/auth' @@ -93,6 +94,8 @@ async function main() { setupAllHandlers(socket, roomManager) }) + await 
assertSchemaCompatibility() + httpServer.listen(PORT, '0.0.0.0', () => { logger.info(`Socket.IO server running on port ${PORT}`) logger.info(`Health check available at: http://localhost:${PORT}/health`) From 89676491e103917010c7af80d42b332d73d64352 Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 9 Jun 2026 21:56:00 -0700 Subject: [PATCH 2/2] fix(realtime): unwrap cause for schema codes, drop sleep after final attempt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - isSchemaMismatch now walks the error.cause chain — drizzle wraps the driver error, so the SQLSTATE often lives on the inner cause, not the outer throw. Without this a wrapped 42703/42P01 was retried 5x and mis-reported as "database unreachable" instead of failing fast. - No longer sleeps after the final failed attempt (~6-10s of dead wait that undermined the fail-fast contract); sleep now only happens between attempts. - Tests: assert sleep is called exactly 4 times on exhaustion, and add a wrapped-cause fail-fast case. --- apps/realtime/src/database/preflight.test.ts | 15 ++++++++++++++ apps/realtime/src/database/preflight.ts | 21 ++++++++++++++++++-- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/apps/realtime/src/database/preflight.test.ts b/apps/realtime/src/database/preflight.test.ts index 8d412d442dd..f290c2e35bb 100644 --- a/apps/realtime/src/database/preflight.test.ts +++ b/apps/realtime/src/database/preflight.test.ts @@ -39,6 +39,11 @@ function pgError(code: string): Error & { code: string } { return Object.assign(new Error(`pg error ${code}`), { code }) } +/** Mirrors how drizzle wraps the driver error: the SQLSTATE lives on `cause`, not the outer error. 
*/ +function wrappedPgError(code: string): Error { + return new Error('Failed query', { cause: pgError(code) }) +} + describe('assertSchemaCompatibility', () => { beforeEach(() => { vi.clearAllMocks() @@ -69,6 +74,15 @@ describe('assertSchemaCompatibility', () => { expect(mockLimit).toHaveBeenCalledTimes(1) }) + it('detects a schema mismatch wrapped in error.cause and fails fast', async () => { + mockLimit.mockRejectedValue(wrappedPgError('42703')) + + await expect(assertSchemaCompatibility()).rejects.toThrow(/incompatible with the live database/) + + expect(mockLimit).toHaveBeenCalledTimes(1) + expect(sleep).not.toHaveBeenCalled() + }) + it('retries transient connection errors and resolves once reachable', async () => { mockLimit .mockRejectedValueOnce(pgError('ECONNREFUSED')) @@ -87,5 +101,6 @@ describe('assertSchemaCompatibility', () => { await expect(assertSchemaCompatibility()).rejects.toThrow(/database unreachable/) expect(mockLimit).toHaveBeenCalledTimes(5) + expect(sleep).toHaveBeenCalledTimes(4) }) }) diff --git a/apps/realtime/src/database/preflight.ts b/apps/realtime/src/database/preflight.ts index bc60d0d5d58..44eb2ab154e 100644 --- a/apps/realtime/src/database/preflight.ts +++ b/apps/realtime/src/database/preflight.ts @@ -20,9 +20,22 @@ const MAX_CONNECT_ATTEMPTS = 5 */ const SCHEMA_MISMATCH_CODES = new Set(['42703', '42P01', '42883']) +/** + * Walks the `cause` chain so a SQLSTATE code is found even when drizzle wraps the + * driver error (the code commonly lives on the inner `cause`, not the outer throw). 
+ */ function isSchemaMismatch(error: unknown): boolean { - const code = (error as { code?: unknown })?.code - return typeof code === 'string' && SCHEMA_MISMATCH_CODES.has(code) + const seen = new Set() + let current: unknown = error + while (current && typeof current === 'object' && !seen.has(current)) { + seen.add(current) + const code = (current as { code?: unknown }).code + if (typeof code === 'string' && SCHEMA_MISMATCH_CODES.has(code)) { + return true + } + current = (current as { cause?: unknown }).cause + } + return false } /** @@ -66,6 +79,10 @@ export async function assertSchemaCompatibility(): Promise { ) } + if (attempt === MAX_CONNECT_ATTEMPTS) { + break + } + const delay = backoffWithJitter(attempt, null) logger.warn( `Schema-compatibility check could not reach the database (attempt ${attempt}/${MAX_CONNECT_ATTEMPTS}), retrying in ${Math.round(delay)}ms`,