diff --git a/apps/realtime/src/database/preflight.test.ts b/apps/realtime/src/database/preflight.test.ts new file mode 100644 index 00000000000..f290c2e35bb --- /dev/null +++ b/apps/realtime/src/database/preflight.test.ts @@ -0,0 +1,106 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockLimit } = vi.hoisted(() => ({ + mockLimit: vi.fn(), +})) + +vi.mock('@sim/db', () => ({ + db: { + select: () => ({ + from: () => ({ + limit: mockLimit, + }), + }), + }, +})) + +vi.mock('@sim/db/schema', () => ({ + workflow: {}, +})) + +vi.mock('@sim/logger', () => ({ + createLogger: () => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }), +})) + +vi.mock('@sim/utils/helpers', () => ({ + sleep: vi.fn().mockResolvedValue(undefined), +})) + +import { sleep } from '@sim/utils/helpers' +import { assertSchemaCompatibility } from '@/database/preflight' + +/** Builds a Postgres-shaped error carrying a SQLSTATE `code`, as postgres.js throws. */ +function pgError(code: string): Error & { code: string } { + return Object.assign(new Error(`pg error ${code}`), { code }) +} + +/** Mirrors how drizzle wraps the driver error: the SQLSTATE lives on `cause`, not the outer error. */ +function wrappedPgError(code: string): Error { + return new Error('Failed query', { cause: pgError(code) }) +} + +describe('assertSchemaCompatibility', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('resolves when the representative schema query succeeds', async () => { + mockLimit.mockResolvedValueOnce([]) + + await expect(assertSchemaCompatibility()).resolves.toBeUndefined() + + expect(mockLimit).toHaveBeenCalledTimes(1) + }) + + it('throws immediately on an undefined-column mismatch without retrying', async () => { + mockLimit.mockRejectedValue(pgError('42703')) + + await expect(assertSchemaCompatibility()).rejects.toThrow(/incompatible with the live database/) + + expect(mockLimit).toHaveBeenCalledTimes(1) + expect(sleep).not.toHaveBeenCalled() + }) + + it('throws immediately on an undefined-table mismatch', async () => { + mockLimit.mockRejectedValue(pgError('42P01')) + + await expect(assertSchemaCompatibility()).rejects.toThrow(/incompatible with the live database/) + + expect(mockLimit).toHaveBeenCalledTimes(1) + }) + + it('detects a schema mismatch wrapped in error.cause and fails fast', async () => { + mockLimit.mockRejectedValue(wrappedPgError('42703')) + + await expect(assertSchemaCompatibility()).rejects.toThrow(/incompatible with the live database/) + + expect(mockLimit).toHaveBeenCalledTimes(1) + expect(sleep).not.toHaveBeenCalled() + }) + + it('retries transient connection errors and resolves once reachable', async () => { + mockLimit + .mockRejectedValueOnce(pgError('ECONNREFUSED')) + .mockRejectedValueOnce(pgError('ECONNREFUSED')) + .mockResolvedValueOnce([]) + + await expect(assertSchemaCompatibility()).resolves.toBeUndefined() + + expect(mockLimit).toHaveBeenCalledTimes(3) + expect(sleep).toHaveBeenCalledTimes(2) + }) + + it('throws after exhausting retries when the database stays unreachable', async () => { + mockLimit.mockRejectedValue(pgError('ECONNREFUSED')) + + await expect(assertSchemaCompatibility()).rejects.toThrow(/database unreachable/) + + expect(mockLimit).toHaveBeenCalledTimes(5) + expect(sleep).toHaveBeenCalledTimes(4) + }) +}) diff --git a/apps/realtime/src/database/preflight.ts b/apps/realtime/src/database/preflight.ts new file mode 100644 index 00000000000..44eb2ab154e --- /dev/null +++ b/apps/realtime/src/database/preflight.ts @@ -0,0 +1,98 @@ +import { db } from '@sim/db' +import { workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { getErrorMessage } from '@sim/utils/errors' +import { sleep } from '@sim/utils/helpers' +import { backoffWithJitter } from '@sim/utils/retry' + +const logger = createLogger('SocketPreflight') + +/** + * Maximum attempts for the schema canary when the database is merely unreachable. + * Connection-class failures are retried; schema-class failures fail immediately. + */ +const MAX_CONNECT_ATTEMPTS = 5 + +/** + * Postgres SQLSTATE codes meaning the deployed image's compiled schema disagrees + * with the live database (undefined column, table, or function). These never + * self-heal, so retrying only delays an inevitable startup failure. + */ +const SCHEMA_MISMATCH_CODES = new Set(['42703', '42P01', '42883']) + +/** + * Walks the `cause` chain so a SQLSTATE code is found even when drizzle wraps the + * driver error (the code commonly lives on the inner `cause`, not the outer throw). + */ +function isSchemaMismatch(error: unknown): boolean { + const seen = new Set() + let current: unknown = error + while (current && typeof current === 'object' && !seen.has(current)) { + seen.add(current) + const code = (current as { code?: unknown }).code + if (typeof code === 'string' && SCHEMA_MISMATCH_CODES.has(code)) { + return true + } + current = (current as { cause?: unknown }).cause + } + return false +} + +/** + * Verifies, before the server accepts traffic, that the deployed image's schema + * is compatible with the live database — throwing if it is not. + * + * Every socket is authorized against the `workflow` table through a full-row + * drizzle projection. If the image's compiled schema is ahead of (or behind) the + * database — e.g. a column dropped by a migration the image predates — that query + * fails on every request and silently breaks persistence, yet the process stays + * up and the shallow `/health` probe keeps returning 200. The fleet looks healthy + * while serving nothing. + * + * Running one representative query at startup turns that latent, per-request + * failure into an immediate startup failure: the throw propagates to the server + * entrypoint, the task exits non-zero and never becomes healthy, and the deploy's + * health gate never flips — so CodeDeploy auto-rolls-back instead of shifting + * traffic onto broken tasks. + * + * Deliberately invoked once at startup and never from the per-probe load-balancer + * health check: a deep dependency check on every probe would let a transient + * database blip mass-terminate the whole fleet (cascading failure). + * + * @throws when the schema is incompatible, or the database stays unreachable + * across {@link MAX_CONNECT_ATTEMPTS} attempts. + */ +export async function assertSchemaCompatibility(): Promise { + let lastError: unknown + + for (let attempt = 1; attempt <= MAX_CONNECT_ATTEMPTS; attempt++) { + try { + await db.select().from(workflow).limit(1) + logger.info('Schema-compatibility check passed') + return + } catch (error) { + lastError = error + + if (isSchemaMismatch(error)) { + throw new Error( + `Deployed image is incompatible with the live database schema: ${getErrorMessage(error)}` + ) + } + + if (attempt === MAX_CONNECT_ATTEMPTS) { + break + } + + const delay = backoffWithJitter(attempt, null) + logger.warn( + `Schema-compatibility check could not reach the database (attempt ${attempt}/${MAX_CONNECT_ATTEMPTS}), retrying in ${Math.round(delay)}ms`, + getErrorMessage(error) + ) + await sleep(delay) + } + } + + throw new Error( + `Schema-compatibility check failed after ${MAX_CONNECT_ATTEMPTS} attempts — database unreachable: ${getErrorMessage(lastError)}` + ) +} diff --git a/apps/realtime/src/index.ts b/apps/realtime/src/index.ts index 7232eb36be5..e43184c1f02 100644 --- a/apps/realtime/src/index.ts +++ b/apps/realtime/src/index.ts @@ -2,6 +2,7 @@ import { createServer } from 'http' import { createLogger } from '@sim/logger' import type { Server as SocketIOServer } from 'socket.io' import { createSocketIOServer, shutdownSocketIOAdapter } from '@/config/socket' +import { assertSchemaCompatibility } from '@/database/preflight' import { env } from '@/env' import { setupAllHandlers } from '@/handlers' import { type AuthenticatedSocket, authenticateSocket } from '@/middleware/auth' @@ -93,6 +94,8 @@ async function main() { setupAllHandlers(socket, roomManager) }) + await assertSchemaCompatibility() + httpServer.listen(PORT, '0.0.0.0', () => { logger.info(`Socket.IO server running on port ${PORT}`) logger.info(`Health check available at: http://localhost:${PORT}/health`)