From 018cb59a55e52d48e4f221280f9bd638fc67fbe5 Mon Sep 17 00:00:00 2001 From: Rhys Sullivan <39114868+RhysSullivan@users.noreply.github.com> Date: Thu, 2 Jul 2026 11:52:50 -0700 Subject: [PATCH 1/6] Add execution balance check to billing service --- apps/cloud/src/extensions/billing/service.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/apps/cloud/src/extensions/billing/service.ts b/apps/cloud/src/extensions/billing/service.ts index 2d27248f4..1731fe4fa 100644 --- a/apps/cloud/src/extensions/billing/service.ts +++ b/apps/cloud/src/extensions/billing/service.ts @@ -22,6 +22,9 @@ export class AutumnError extends Data.TaggedError("AutumnError")<{ export type IAutumnService = Readonly<{ use: (fn: (client: Autumn) => Promise) => Effect.Effect; + checkExecutionBalance: ( + organizationId: string, + ) => Effect.Effect<{ readonly allowed: boolean }, AutumnError, never>; /** * Fire-and-forget-safe execution usage tracker. Errors are caught and * logged; the returned Effect never fails. 
Callers typically @@ -44,6 +47,7 @@ const make = Effect.sync(() => { ); return { use: () => notConfigured, + checkExecutionBalance: () => notConfigured, trackExecution: () => Effect.void, } satisfies IAutumnService; } @@ -80,7 +84,16 @@ const make = Effect.sync(() => { ); }).pipe(Effect.withSpan("autumn.trackExecution")); - return { use, trackExecution } satisfies IAutumnService; + const checkExecutionBalance = (organizationId: string) => + Effect.gen(function* () { + yield* Effect.annotateCurrentSpan({ "autumn.customer.id": organizationId }); + const check = yield* use((c) => + c.check({ customerId: organizationId, featureId: "executions" }), + ); + return { allowed: check.allowed }; + }).pipe(Effect.withSpan("autumn.checkExecutionBalance")); + + return { use, checkExecutionBalance, trackExecution } satisfies IAutumnService; }); export class AutumnService extends Context.Service()( From 45f9afa2e3475e0f8a386244e7804d4b65e16166 Mon Sep 17 00:00:00 2001 From: Rhys Sullivan <39114868+RhysSullivan@users.noreply.github.com> Date: Thu, 2 Jul 2026 12:56:07 -0700 Subject: [PATCH 2/6] Add pre-execution balance gate for org execution limits Check the org's Autumn execution balance before execute/executeWithPause (never resume, so paused executions can always complete). Blocked orgs get a descriptive ExecuteResult.error instead of running. Fails open on any billing error or a 2s timeout, and caches per-org outcomes for 60s. 
--- apps/cloud/src/engine/execution-gate.ts | 205 ++++++++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 apps/cloud/src/engine/execution-gate.ts diff --git a/apps/cloud/src/engine/execution-gate.ts b/apps/cloud/src/engine/execution-gate.ts new file mode 100644 index 000000000..d6e652dd1 --- /dev/null +++ b/apps/cloud/src/engine/execution-gate.ts @@ -0,0 +1,205 @@ +// --------------------------------------------------------------------------- +// Pre-execution balance gate — blocks new executions once an organization has +// used up its plan's included executions. +// +// Usage is tracked to Autumn after every execution (execution-usage.ts), but +// nothing ever CHECKED the balance before running, so a free-tier org could run +// unbounded executions past its quota. This gate consults +// `AutumnService.checkExecutionBalance` before `execute` / `executeWithPause`. +// `resume` is never gated: a paused execution already consumed its quota slot +// when it started, and blocking resume would strand approved work forever. +// +// FAIL OPEN is a hard requirement: any Autumn error, timeout, or missing +// customer/feature allows the execution (logged + Sentry, mirroring +// `trackExecution`'s reporting). Autumn being down must never block +// executions or add meaningful latency, so the check is bounded by a short +// timeout and its outcome is cached per organization. +// +// Error surfacing: the gate fails internally with the typed +// `ExecutionLimitReachedError`, and the engine seam folds it into the +// descriptive `ExecuteResult.error` channel. That is the codebase's mechanism +// for getting a domain failure's message to the MCP client verbatim — engine +// error-channel failures are deliberately rendered opaque by the MCP host +// ("Internal tool error [correlation id]"), exactly like the runtimes fold +// `CodeCompilationError` / `SandboxRuntimeError` into `ExecuteResult.error`. 
+// --------------------------------------------------------------------------- + +import * as Sentry from "@sentry/cloudflare"; +import { Data, Effect } from "effect"; +import type * as Cause from "effect/Cause"; + +import type { ExecutionEngine, ExecutionResult } from "@executor-js/execution"; + +// The engine's completed-result payload (`ExecuteResult` in codemode-core), +// derived from the public execution types so this app package doesn't need a +// direct dependency on the kernel package that declares it. +type EngineExecuteResult = Extract["result"]; + +// One check per org per minute is fresh enough for a monthly quota; both +// allowed AND blocked outcomes are cached (errors never are). +const BALANCE_CACHE_TTL_MS = 60_000; +// Autumn slower than this => fail open rather than stall a user-facing +// execution behind the billing provider. +const BALANCE_CHECK_TIMEOUT_MS = 2_000; +// Sweep guard so a long-lived worker isolate serving many orgs can't grow the +// cache map unbounded. +const BALANCE_CACHE_MAX_ENTRIES = 10_000; + +export const EXECUTION_LIMIT_BLOCKED_MESSAGE = + "Execution limit reached: your plan's included executions for this billing period are used up. Upgrade your plan or wait for the reset to continue."; + +export class ExecutionLimitReachedError extends Data.TaggedError("ExecutionLimitReachedError")<{ + readonly organizationId: string; + readonly message: string; +}> {} + +/** Internal sentinel for a balance check that exceeded its time budget. */ +class GateCheckTimeoutError extends Data.TaggedError("GateCheckTimeoutError")<{ + readonly timeoutMs: number; +}> {} + +// --------------------------------------------------------------------------- +// Shared gate seam — used by this gate and the rate-limit backstop. 
+// --------------------------------------------------------------------------- + +export type GateDecision = + | { readonly blocked: false } + | { readonly blocked: true; readonly error: { readonly message: string } }; + +/** + * Wrap an engine so `decide` runs before `execute` / `executeWithPause`. A + * blocked decision short-circuits to a descriptive `ExecuteResult.error` + * (which `formatExecuteResult` renders as a clean `isError` MCP tool result) + * WITHOUT invoking the inner engine — so a blocked execution is neither run + * nor usage-tracked. `resume` and all read-only members pass through + * untouched: paused executions must always be able to complete. + */ +export const withPreExecutionGate = ( + engine: ExecutionEngine, + decide: Effect.Effect, +): ExecutionEngine => ({ + execute: (code, options) => + Effect.flatMap( + decide, + (decision): Effect.Effect => + decision.blocked + ? Effect.succeed({ result: null, error: decision.error.message }) + : engine.execute(code, options), + ), + executeWithPause: (code, options) => + Effect.flatMap( + decide, + (decision): Effect.Effect => + decision.blocked + ? Effect.succeed({ + status: "completed", + result: { result: null, error: decision.error.message }, + }) + : engine.executeWithPause(code, options), + ), + // resume is never gated: paused executions must be able to complete. 
+ resume: (executionId, response) => engine.resume(executionId, response), + getPausedExecution: (executionId) => engine.getPausedExecution(executionId), + pausedExecutionCount: () => engine.pausedExecutionCount(), + hasPausedExecutions: () => engine.hasPausedExecutions(), + getDescription: engine.getDescription, +}); + +// --------------------------------------------------------------------------- +// Balance gate factory +// --------------------------------------------------------------------------- + +export type ExecutionBalanceCheck = ( + organizationId: string, +) => Effect.Effect<{ readonly allowed: boolean }, unknown>; + +/** + * Build a balance gate around `checkBalance` (in production: + * `AutumnService.checkExecutionBalance`). One gate instance holds one + * per-organization outcome cache; the metered decorator layer creates a single + * instance so all engines it decorates share it. + * + * `options.timeoutMs` exists for tests; production uses the default budget. + */ +export const makeExecutionLimitGate = ( + checkBalance: ExecutionBalanceCheck, + options?: { readonly timeoutMs?: number }, +) => { + const timeoutMs = options?.timeoutMs ?? BALANCE_CHECK_TIMEOUT_MS; + const cache = new Map(); + + const writeCache = (organizationId: string, allowed: boolean, nowMs: number): void => { + if (cache.size >= BALANCE_CACHE_MAX_ENTRIES) { + for (const [key, entry] of cache) { + if (entry.expiresAtMs <= nowMs) cache.delete(key); + } + // Still saturated after dropping expired entries: reset rather than grow. + if (cache.size >= BALANCE_CACHE_MAX_ENTRIES) cache.clear(); + } + cache.set(organizationId, { allowed, expiresAtMs: nowMs + BALANCE_CACHE_TTL_MS }); + }; + + const toDecision = (organizationId: string, allowed: boolean): GateDecision => + allowed + ? 
{ blocked: false } + : { + blocked: true, + error: new ExecutionLimitReachedError({ + organizationId, + message: EXECUTION_LIMIT_BLOCKED_MESSAGE, + }), + }; + + const decide = (organizationId: string): Effect.Effect => + Effect.suspend(() => { + const nowMs = Date.now(); + const cached = cache.get(organizationId); + if (cached && cached.expiresAtMs > nowMs) { + return Effect.succeed(toDecision(organizationId, cached.allowed)); + } + return checkBalance(organizationId).pipe( + Effect.timeoutOrElse({ + duration: `${timeoutMs} millis`, + orElse: () => Effect.fail(new GateCheckTimeoutError({ timeoutMs })), + }), + Effect.map(({ allowed }) => { + writeCache(organizationId, allowed, nowMs); + return toDecision(organizationId, allowed); + }), + // FAIL OPEN: Autumn errors, timeouts, and missing customers/features + // must never block executions. Reported like `trackExecution` so a + // billing outage still pages; the error outcome is never cached. + Effect.catch((error: unknown) => + Effect.sync((): GateDecision => { + console.warn("[billing] execution balance check failed open:", error); + Sentry.captureException(error); + return { blocked: false }; + }), + ), + ); + }); + + return { + /** + * Test/diagnostic surface: succeeds when the execution is allowed (or the + * check failed open), fails with the typed `ExecutionLimitReachedError` + * when the organization is out of executions. + */ + check: (organizationId: string): Effect.Effect => + Effect.flatMap(decide(organizationId), (decision) => + decision.blocked + ? Effect.fail( + new ExecutionLimitReachedError({ + organizationId, + message: EXECUTION_LIMIT_BLOCKED_MESSAGE, + }), + ) + : Effect.void, + ), + /** Wrap an engine so the balance gate runs before each new execution. 
*/ + decorate: ( + organizationId: string, + engine: ExecutionEngine, + ): ExecutionEngine => withPreExecutionGate(engine, decide(organizationId)), + }; +}; From 1d8d9d0bff55ed0f480c08cd857a621f94939e35 Mon Sep 17 00:00:00 2001 From: Rhys Sullivan <39114868+RhysSullivan@users.noreply.github.com> Date: Thu, 2 Jul 2026 12:56:16 -0700 Subject: [PATCH 3/6] Add per-org execution rate-limit backstop 1000 execute calls per org per hour, fixed window, counted in a minimal per-org Durable Object (EXECUTION_RATE_LIMITER, one instance per org via idFromName). Independent of billing so runaway automation is caught even when Autumn is down; fails open if the counter DO errors or is slow. The DO stores a single {windowId, count} record and purges itself by alarm after two idle windows. --- apps/cloud/src/engine/execution-rate-limit.ts | 212 ++++++++++++++++++ apps/cloud/src/env-augment.d.ts | 7 + apps/cloud/src/server.ts | 5 + apps/cloud/wrangler.jsonc | 11 + 4 files changed, 235 insertions(+) create mode 100644 apps/cloud/src/engine/execution-rate-limit.ts diff --git a/apps/cloud/src/engine/execution-rate-limit.ts b/apps/cloud/src/engine/execution-rate-limit.ts new file mode 100644 index 000000000..2f06e9bb9 --- /dev/null +++ b/apps/cloud/src/engine/execution-rate-limit.ts @@ -0,0 +1,212 @@ +// --------------------------------------------------------------------------- +// Per-org execution rate limit — an abuse backstop independent of billing. +// +// The balance gate (execution-gate.ts) depends on Autumn and fails open, so a +// billing outage plus runaway automation could still run unbounded executions. +// This limiter counts `execute` / `executeWithPause` calls per organization in a fixed hourly +// window, backed by a minimal counter Durable Object (cross-session state: +// each MCP session lives in its own DO instance, so an in-memory counter +// would be per-session and trivially bypassed by opening more sessions). 
+// +// Like the balance gate it FAILS OPEN: an unreachable counter DO, a missing +// binding, or a slow call allows the execution (warn + Sentry). The backstop +// must never take executions down with it. +// --------------------------------------------------------------------------- + +import { DurableObject, env } from "cloudflare:workers"; +import * as Sentry from "@sentry/cloudflare"; +import { Data, Effect } from "effect"; +import type * as Cause from "effect/Cause"; + +import type { ExecutionEngine } from "@executor-js/execution"; + +import { withPreExecutionGate, type GateDecision } from "./execution-gate"; + +// Fixed window: all executions in the same clock hour share one counter. +export const RATE_LIMIT_WINDOW_MS = 3_600_000; +// Calibration: the heaviest legitimate org runs ~1.1k executions per MONTH, +// so 1000 per HOUR is far above any human-driven usage and only trips on +// runaway automation (the incident this backstops: ~18k in 30 days would +// still pass, which is fine — that class of overrun is the balance gate's +// job; this catches tight loops). +export const EXECUTIONS_PER_ORG_PER_HOUR = 1000; +// Counter DO slower than this => fail open rather than stall executions. +const RATE_LIMIT_CHECK_TIMEOUT_MS = 2_000; +// The DO purges its storage this long after the last increment, so idle orgs +// cost nothing. Two windows: long enough that an active window never purges. +const COUNTER_PURGE_AFTER_MS = 2 * RATE_LIMIT_WINDOW_MS; + +export const RATE_LIMIT_BLOCKED_MESSAGE = + "Rate limit exceeded: too many executions this hour. This is an abuse backstop — contact support if you hit this legitimately."; + +export class ExecutionRateLimitExceededError extends Data.TaggedError( + "ExecutionRateLimitExceededError", +)<{ + readonly organizationId: string; + readonly message: string; +}> {} + +/** Internal sentinel for a counter call that exceeded its time budget. 
*/ +class RateLimitCheckTimeoutError extends Data.TaggedError("RateLimitCheckTimeoutError")<{ + readonly timeoutMs: number; +}> {} + +// --------------------------------------------------------------------------- +// Counter Durable Object — one instance per organization (idFromName(orgId)). +// Stores a single { windowId, count } record: an increment in a new window +// resets the count, so old windows never accumulate. An alarm purges storage +// after inactivity. +// --------------------------------------------------------------------------- + +const WINDOW_RECORD_KEY = "window"; + +type WindowRecord = { + readonly windowId: number; + readonly count: number; +}; + +export class ExecutionRateLimiterDO extends DurableObject { + private readonly counterStorage: DurableObjectState["storage"]; + + constructor(ctx: DurableObjectState, doEnv: Env) { + super(ctx, doEnv); + // Kept on an own field (not just inherited `this.ctx`) so tests can run + // the class against a fake storage under the `cloudflare:workers` stub. + this.counterStorage = ctx.storage; + } + + /** Add one execution to `windowId`'s counter and return the new count. */ + async increment(windowId: number): Promise { + const stored = await this.counterStorage.get(WINDOW_RECORD_KEY); + const count = stored && stored.windowId === windowId ? stored.count + 1 : 1; + await this.counterStorage.put(WINDOW_RECORD_KEY, { windowId, count }); + await this.counterStorage.setAlarm(Date.now() + COUNTER_PURGE_AFTER_MS); + return count; + } + + async alarm(): Promise { + await this.counterStorage.deleteAll(); + } +} + +// --------------------------------------------------------------------------- +// Client +// --------------------------------------------------------------------------- + +/** Count one execution for (organizationId, windowId); returns the new count. 
*/ +export type RateLimitIncrement = ( + organizationId: string, + windowId: number, +) => Effect.Effect; + +export type ExecutionRateLimiter = { + readonly check: (organizationId: string) => Effect.Effect; + readonly decorate: ( + organizationId: string, + engine: ExecutionEngine, + ) => ExecutionEngine; +}; + +/** + * Build a rate limiter around an increment function (in production: the + * counter DO; in tests: an in-memory fake). `options` exist for tests. + */ +export const makeExecutionRateLimiter = ( + increment: RateLimitIncrement, + options?: { + readonly limit?: number; + readonly windowMs?: number; + readonly timeoutMs?: number; + readonly now?: () => number; + }, +): ExecutionRateLimiter => { + const limit = options?.limit ?? EXECUTIONS_PER_ORG_PER_HOUR; + const windowMs = options?.windowMs ?? RATE_LIMIT_WINDOW_MS; + const timeoutMs = options?.timeoutMs ?? RATE_LIMIT_CHECK_TIMEOUT_MS; + const now = options?.now ?? Date.now; + + const decide = (organizationId: string): Effect.Effect => + Effect.suspend(() => { + const windowId = Math.floor(now() / windowMs); + return increment(organizationId, windowId).pipe( + Effect.timeoutOrElse({ + duration: `${timeoutMs} millis`, + orElse: () => Effect.fail(new RateLimitCheckTimeoutError({ timeoutMs })), + }), + Effect.map( + (count): GateDecision => + count > limit + ? { + blocked: true, + error: new ExecutionRateLimitExceededError({ + organizationId, + message: RATE_LIMIT_BLOCKED_MESSAGE, + }), + } + : { blocked: false }, + ), + // FAIL OPEN: the backstop must never block executions because its + // counter is unreachable or slow. + Effect.catch((error: unknown) => + Effect.sync((): GateDecision => { + console.warn("[rate-limit] execution rate limit check failed open:", error); + Sentry.captureException(error); + return { blocked: false }; + }), + ), + ); + }); + + return { + check: (organizationId) => + Effect.flatMap(decide(organizationId), (decision) => + decision.blocked + ? 
Effect.fail( + new ExecutionRateLimitExceededError({ + organizationId, + message: RATE_LIMIT_BLOCKED_MESSAGE, + }), + ) + : Effect.void, + ), + decorate: (organizationId, engine) => withPreExecutionGate(engine, decide(organizationId)), + }; +}; + +// --------------------------------------------------------------------------- +// Cloud wiring — reads the EXECUTION_RATE_LIMITER binding from the worker env. +// --------------------------------------------------------------------------- + +// The DO stub's RPC surface. The binding is declared untyped in +// env-augment.d.ts (matching the BLOBS precedent), so the call site narrows it +// to the one method the class exposes. +type ExecutionRateLimiterStub = { + readonly increment: (windowId: number) => Promise; +}; + +type RateLimiterNamespace = { + readonly idFromName: (name: string) => DurableObjectId; + readonly get: (id: DurableObjectId) => unknown; +}; + +/** + * Production rate limiter backed by the `EXECUTION_RATE_LIMITER` counter DO. + * When the binding is absent (unit-test workers, older local setups) the + * limiter is disabled: every check passes, logged once at construction. 
+ */ +export const makeCloudExecutionRateLimiter = (): ExecutionRateLimiter => { + const namespace = (env as { EXECUTION_RATE_LIMITER?: RateLimiterNamespace }) + .EXECUTION_RATE_LIMITER; + if (!namespace) { + console.warn( + "[rate-limit] EXECUTION_RATE_LIMITER binding missing; execution rate limiting disabled", + ); + return makeExecutionRateLimiter(() => Effect.succeed(0)); + } + return makeExecutionRateLimiter((organizationId, windowId) => + Effect.tryPromise(() => { + const stub = namespace.get(namespace.idFromName(organizationId)) as ExecutionRateLimiterStub; + return stub.increment(windowId); + }), + ); +}; diff --git a/apps/cloud/src/env-augment.d.ts b/apps/cloud/src/env-augment.d.ts index 8911a8303..5e860a0fa 100644 --- a/apps/cloud/src/env-augment.d.ts +++ b/apps/cloud/src/env-augment.d.ts @@ -32,6 +32,13 @@ declare global { // ON; the test workers set "true" so fixtures can reach localhost. ALLOW_LOCAL_NETWORK?: string; + // Per-org execution rate-limit counter DO (wrangler.jsonc + // `durable_objects`). Declared optional here (matching the BLOBS + // precedent) rather than regenerating worker-configuration.d.ts: test + // workers and older local setups run without the binding, and the + // limiter degrades to disabled when absent. + EXECUTION_RATE_LIMITER?: import("@cloudflare/workers-types").DurableObjectNamespace; + // Billing AUTUMN_SECRET_KEY?: string; /** Optional Autumn base-URL override (Autumn emulator in tests/dev). */ diff --git a/apps/cloud/src/server.ts b/apps/cloud/src/server.ts index eb1af1c1d..258e7ff34 100644 --- a/apps/cloud/src/server.ts +++ b/apps/cloud/src/server.ts @@ -55,6 +55,11 @@ export const McpSessionDOSqlite = Sentry.instrumentDurableObjectWithSentry( // (with a `deleted_classes: ["McpSessionDO"]` migration) now that nothing binds it. export class McpSessionDO extends DurableObject {} +// Per-org execution rate-limit counter DO (abuse backstop; migration v3, +// `EXECUTION_RATE_LIMITER` binding). 
Plain counter, no Sentry wrapper needed: +// its callers already fail open and report errors themselves. +export { ExecutionRateLimiterDO } from "./engine/execution-rate-limit"; + // --------------------------------------------------------------------------- // Worker fetch handler // diff --git a/apps/cloud/wrangler.jsonc b/apps/cloud/wrangler.jsonc index 68ca6297a..e12eb31f2 100644 --- a/apps/cloud/wrangler.jsonc +++ b/apps/cloud/wrangler.jsonc @@ -22,6 +22,13 @@ "name": "MCP_SESSION", "class_name": "McpSessionDOSqlite", }, + // Per-org execution rate-limit counter (abuse backstop). One instance + // per organization via idFromName(orgId); a single { windowId, count } + // record per instance, purged by alarm after inactivity. + { + "name": "EXECUTION_RATE_LIMITER", + "class_name": "ExecutionRateLimiterDO", + }, ], }, // The MCP session DO moved to the Cloudflare Agents (`McpAgent`) base, which @@ -42,6 +49,10 @@ "tag": "v2", "new_sqlite_classes": ["McpSessionDOSqlite"], }, + { + "tag": "v3", + "new_sqlite_classes": ["ExecutionRateLimiterDO"], + }, ], "services": [ { From bc95e26b9bf1774dccb00edbca16cae833cbdfdf Mon Sep 17 00:00:00 2001 From: Rhys Sullivan <39114868+RhysSullivan@users.noreply.github.com> Date: Thu, 2 Jul 2026 12:56:28 -0700 Subject: [PATCH 4/6] Wire execution guards into the metered engine decorator Order: rate-limit backstop first (cheap counter), then the balance gate, then usage tracking. Guards wrap outside the tracker so a blocked execution is neither run nor tracked. Covers both planes (HTTP executor plane and MCP session DO) since both build engines through this layer. 
--- .../src/engine/execution-stack-metered.ts | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/apps/cloud/src/engine/execution-stack-metered.ts b/apps/cloud/src/engine/execution-stack-metered.ts index d82632ca9..4ea70b6d4 100644 --- a/apps/cloud/src/engine/execution-stack-metered.ts +++ b/apps/cloud/src/engine/execution-stack-metered.ts @@ -28,19 +28,40 @@ import { import { AutumnService } from "../extensions/billing/service"; import type { DbService } from "../db/db"; import { CloudExecutionSeamsLayer } from "../engine/execution-stack"; +import { makeExecutionLimitGate } from "./execution-gate"; +import { makeCloudExecutionRateLimiter } from "./execution-rate-limit"; import { withExecutionUsageTracking } from "./execution-usage"; -// Usage-metering decorator bound to the billing service. `trackExecution` is -// fire-and-forget (`Effect.runFork`) so the billing call can't stall a -// user-facing execution. +// Usage-metering decorator bound to the billing service, plus the two +// pre-execution guards this layer owns, ordered cheapest first: +// 1. rate-limit backstop (counter DO, independent of billing) +// 2. execution balance gate (Autumn check, cached 60s, fails open) +// 3. usage tracking — fire-and-forget (`Effect.runFork`) so the billing +// call can't stall a user-facing execution. +// The guards wrap OUTSIDE the tracker so a blocked execution is neither run +// nor tracked. One gate/limiter instance per layer build: the balance cache is +// shared by every engine this decorator produces (in the MCP session DO that's +// the session's engines; on the HTTP plane the boot layer builds it once). 
export const CloudMeteringEngineDecorator: Layer.Layer = Layer.effect(EngineDecorator)( - Effect.map(AutumnService.asEffect(), (autumn): EngineDecorator["Service"] => ({ - decorate: (engine, identity: EngineStackIdentity) => - withExecutionUsageTracking(identity.organizationId, engine, (organizationId) => - Effect.runFork(autumn.trackExecution(organizationId)), - ), - })), + Effect.map(AutumnService.asEffect(), (autumn): EngineDecorator["Service"] => { + const balanceGate = makeExecutionLimitGate((organizationId) => + autumn.checkExecutionBalance(organizationId), + ); + const rateLimiter = makeCloudExecutionRateLimiter(); + return { + decorate: (engine, identity: EngineStackIdentity) => + rateLimiter.decorate( + identity.organizationId, + balanceGate.decorate( + identity.organizationId, + withExecutionUsageTracking(identity.organizationId, engine, (organizationId) => + Effect.runFork(autumn.trackExecution(organizationId)), + ), + ), + ), + }; + }), ); /** From 92e9520873e29c9c6164c21009ae19fecc715281 Mon Sep 17 00:00:00 2001 From: Rhys Sullivan <39114868+RhysSullivan@users.noreply.github.com> Date: Thu, 2 Jul 2026 12:58:42 -0700 Subject: [PATCH 5/6] Test the execution balance gate and rate-limit backstop Covers allow/block, the typed errors, fail-open on billing errors and timeouts (asserting the check was attempted AND the execution ran), the 60s per-org outcome cache (allowed and blocked cached, errors never), window rollover, per-org isolation, and that resume is never gated. 
--- apps/cloud/src/engine/execution-gate.test.ts | 234 ++++++++++++++++++ .../src/engine/execution-rate-limit.test.ts | 189 ++++++++++++++ 2 files changed, 423 insertions(+) create mode 100644 apps/cloud/src/engine/execution-gate.test.ts create mode 100644 apps/cloud/src/engine/execution-rate-limit.test.ts diff --git a/apps/cloud/src/engine/execution-gate.test.ts b/apps/cloud/src/engine/execution-gate.test.ts new file mode 100644 index 000000000..cb61304f8 --- /dev/null +++ b/apps/cloud/src/engine/execution-gate.test.ts @@ -0,0 +1,234 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Effect } from "effect"; + +import type { ExecutionEngine } from "@executor-js/execution"; + +import { EXECUTION_LIMIT_BLOCKED_MESSAGE, makeExecutionLimitGate } from "./execution-gate"; + +// Minimal engine fake: records calls, always completes successfully. The gate +// must never let a blocked execution reach it. +const makeFakeEngine = () => { + const calls = { execute: 0, executeWithPause: 0, resume: 0 }; + const engine: ExecutionEngine = { + execute: () => + Effect.sync(() => { + calls.execute += 1; + return { result: "ok" }; + }), + executeWithPause: () => + Effect.sync(() => { + calls.executeWithPause += 1; + return { status: "completed", result: { result: "ok" } } as const; + }), + resume: () => + Effect.sync(() => { + calls.resume += 1; + return { status: "completed", result: { result: "resumed" } } as const; + }), + getPausedExecution: () => Effect.succeed(null), + pausedExecutionCount: () => Effect.succeed(0), + hasPausedExecutions: () => Effect.succeed(false), + getDescription: Effect.succeed("fake"), + }; + return { engine, calls }; +}; + +const onElicitation = () => Effect.succeed({ action: "accept" as const }); + +// Balance-check fake with programmable outcomes and a call counter. 
+const makeBalanceCheck = (outcome: () => Effect.Effect<{ readonly allowed: boolean }, unknown>) => { + const state = { calls: 0 }; + const check = (_organizationId: string) => + Effect.suspend(() => { + state.calls += 1; + return outcome(); + }); + return { check, state }; +}; + +describe("execution balance gate", () => { + it.effect("allows execution when the balance check allows", () => + Effect.gen(function* () { + const { engine, calls } = makeFakeEngine(); + const balance = makeBalanceCheck(() => Effect.succeed({ allowed: true })); + const gate = makeExecutionLimitGate(balance.check); + const gated = gate.decorate("org_allowed", engine); + + const result = yield* gated.execute("code", { onElicitation }); + + expect(result).toEqual({ result: "ok" }); + expect(calls.execute).toBe(1); + expect(balance.state.calls).toBe(1); + }), + ); + + it.effect("blocks execute with the limit message and never runs the engine", () => + Effect.gen(function* () { + const { engine, calls } = makeFakeEngine(); + const balance = makeBalanceCheck(() => Effect.succeed({ allowed: false })); + const gate = makeExecutionLimitGate(balance.check); + const gated = gate.decorate("org_blocked", engine); + + const result = yield* gated.execute("code", { onElicitation }); + + expect(result).toEqual({ result: null, error: EXECUTION_LIMIT_BLOCKED_MESSAGE }); + expect(calls.execute).toBe(0); + }), + ); + + it.effect("blocks executeWithPause as a completed error result", () => + Effect.gen(function* () { + const { engine, calls } = makeFakeEngine(); + const gate = makeExecutionLimitGate( + makeBalanceCheck(() => Effect.succeed({ allowed: false })).check, + ); + const gated = gate.decorate("org_blocked", engine); + + const outcome = yield* gated.executeWithPause("code"); + + expect(outcome).toEqual({ + status: "completed", + result: { result: null, error: EXECUTION_LIMIT_BLOCKED_MESSAGE }, + }); + expect(calls.executeWithPause).toBe(0); + }), + ); + + it.effect("check fails with the typed 
ExecutionLimitReachedError when blocked", () => + Effect.gen(function* () { + const gate = makeExecutionLimitGate( + makeBalanceCheck(() => Effect.succeed({ allowed: false })).check, + ); + + const error = yield* Effect.flip(gate.check("org_blocked")); + + expect(error._tag).toBe("ExecutionLimitReachedError"); + expect(error.organizationId).toBe("org_blocked"); + expect(error.message).toBe(EXECUTION_LIMIT_BLOCKED_MESSAGE); + }), + ); + + it.effect("fails open when the billing service errors (check attempted, execution runs)", () => + Effect.gen(function* () { + const { engine, calls } = makeFakeEngine(); + const balance = makeBalanceCheck(() => Effect.fail(new Error("autumn down"))); + const gate = makeExecutionLimitGate(balance.check); + const gated = gate.decorate("org_erroring", engine); + + const result = yield* gated.execute("code", { onElicitation }); + + expect(balance.state.calls).toBe(1); // the check WAS attempted + expect(result).toEqual({ result: "ok" }); // and the execution still ran + expect(calls.execute).toBe(1); + }), + ); + + // Live clock: the timeout budget is a real timer here (10ms), so the test + // must actually wait for it rather than suspend on the virtual TestClock. 
+ it.live("fails open when the balance check exceeds its timeout", () => + Effect.gen(function* () { + const { engine, calls } = makeFakeEngine(); + const balance = makeBalanceCheck(() => Effect.never); + const gate = makeExecutionLimitGate(balance.check, { timeoutMs: 10 }); + const gated = gate.decorate("org_slow", engine); + + const result = yield* gated.execute("code", { onElicitation }); + + expect(balance.state.calls).toBe(1); // the check WAS attempted + expect(result).toEqual({ result: "ok" }); // timeout => allowed + expect(calls.execute).toBe(1); + }), + ); + + it.effect("caches the allowed outcome: one billing call across executes inside the TTL", () => + Effect.gen(function* () { + const { engine, calls } = makeFakeEngine(); + const balance = makeBalanceCheck(() => Effect.succeed({ allowed: true })); + const gate = makeExecutionLimitGate(balance.check); + const gated = gate.decorate("org_cached", engine); + + yield* gated.execute("code", { onElicitation }); + yield* gated.execute("code", { onElicitation }); + + expect(balance.state.calls).toBe(1); + expect(calls.execute).toBe(2); + }), + ); + + it.effect("caches the blocked outcome too", () => + Effect.gen(function* () { + const balance = makeBalanceCheck(() => Effect.succeed({ allowed: false })); + const gate = makeExecutionLimitGate(balance.check); + const { engine, calls } = makeFakeEngine(); + const gated = gate.decorate("org_blocked_cached", engine); + + const first = yield* gated.execute("code", { onElicitation }); + const second = yield* gated.execute("code", { onElicitation }); + + expect(first).toEqual({ result: null, error: EXECUTION_LIMIT_BLOCKED_MESSAGE }); + expect(second).toEqual({ result: null, error: EXECUTION_LIMIT_BLOCKED_MESSAGE }); + expect(balance.state.calls).toBe(1); + expect(calls.execute).toBe(0); + }), + ); + + it.effect("never caches errors: the next execute re-checks the balance", () => + Effect.gen(function* () { + const { engine } = makeFakeEngine(); + let failNext = true; + 
const balance = makeBalanceCheck(() => { + if (failNext) { + failNext = false; + return Effect.fail(new Error("transient")); + } + return Effect.succeed({ allowed: false }); + }); + const gate = makeExecutionLimitGate(balance.check); + const gated = gate.decorate("org_transient", engine); + + const first = yield* gated.execute("code", { onElicitation }); + const second = yield* gated.execute("code", { onElicitation }); + + expect(first).toEqual({ result: "ok" }); // failed open, not cached + expect(second).toEqual({ result: null, error: EXECUTION_LIMIT_BLOCKED_MESSAGE }); + expect(balance.state.calls).toBe(2); + }), + ); + + it.effect("caches per organization, not globally", () => + Effect.gen(function* () { + const allowedByOrg: Record = { org_a: true, org_b: false }; + const state = { calls: 0 }; + const gate = makeExecutionLimitGate((organizationId) => + Effect.sync(() => { + state.calls += 1; + return { allowed: allowedByOrg[organizationId] ?? true }; + }), + ); + const a = makeFakeEngine(); + const b = makeFakeEngine(); + + const resultA = yield* gate.decorate("org_a", a.engine).execute("code", { onElicitation }); + const resultB = yield* gate.decorate("org_b", b.engine).execute("code", { onElicitation }); + + expect(resultA).toEqual({ result: "ok" }); + expect(resultB).toEqual({ result: null, error: EXECUTION_LIMIT_BLOCKED_MESSAGE }); + expect(state.calls).toBe(2); + }), + ); + + it.effect("never gates resume, even for a blocked organization", () => + Effect.gen(function* () { + const { engine, calls } = makeFakeEngine(); + const balance = makeBalanceCheck(() => Effect.succeed({ allowed: false })); + const gate = makeExecutionLimitGate(balance.check); + const gated = gate.decorate("org_blocked", engine); + + const outcome = yield* gated.resume("exec_1", { action: "accept" }); + + expect(outcome).toEqual({ status: "completed", result: { result: "resumed" } }); + expect(calls.resume).toBe(1); + expect(balance.state.calls).toBe(0); // resume consults billing not 
at all + }), + ); +}); diff --git a/apps/cloud/src/engine/execution-rate-limit.test.ts b/apps/cloud/src/engine/execution-rate-limit.test.ts new file mode 100644 index 000000000..3a88cd3a9 --- /dev/null +++ b/apps/cloud/src/engine/execution-rate-limit.test.ts @@ -0,0 +1,189 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Effect } from "effect"; + +import type { ExecutionEngine } from "@executor-js/execution"; + +import { RATE_LIMIT_BLOCKED_MESSAGE, makeExecutionRateLimiter } from "./execution-rate-limit"; + +// In-memory stand-in for the counter DO: same fixed-window semantics, one +// { windowId, count } record per org. +const makeFakeCounter = () => { + const windows = new Map(); + const state = { calls: 0 }; + const increment = (organizationId: string, windowId: number) => + Effect.sync(() => { + state.calls += 1; + const stored = windows.get(organizationId); + const count = stored && stored.windowId === windowId ? stored.count + 1 : 1; + windows.set(organizationId, { windowId, count }); + return count; + }); + return { increment, state }; +}; + +const makeFakeEngine = () => { + const calls = { execute: 0, resume: 0 }; + const engine: ExecutionEngine = { + execute: () => + Effect.sync(() => { + calls.execute += 1; + return { result: "ok" }; + }), + executeWithPause: () => + Effect.succeed({ status: "completed", result: { result: "ok" } } as const), + resume: () => + Effect.sync(() => { + calls.resume += 1; + return { status: "completed", result: { result: "resumed" } } as const; + }), + getPausedExecution: () => Effect.succeed(null), + pausedExecutionCount: () => Effect.succeed(0), + hasPausedExecutions: () => Effect.succeed(false), + getDescription: Effect.succeed("fake"), + }; + return { engine, calls }; +}; + +const onElicitation = () => Effect.succeed({ action: "accept" as const }); + +describe("execution rate limiter", () => { + it.effect("allows executions under the limit", () => + Effect.gen(function* () { + const counter = 
makeFakeCounter(); + const limiter = makeExecutionRateLimiter(counter.increment, { limit: 3 }); + const { engine, calls } = makeFakeEngine(); + const limited = limiter.decorate("org_ok", engine); + + for (let i = 0; i < 3; i++) { + const result = yield* limited.execute("code", { onElicitation }); + expect(result).toEqual({ result: "ok" }); + } + + expect(calls.execute).toBe(3); + expect(counter.state.calls).toBe(3); + }), + ); + + it.effect("blocks the call after the limit with the backstop message", () => + Effect.gen(function* () { + const counter = makeFakeCounter(); + const limiter = makeExecutionRateLimiter(counter.increment, { limit: 2 }); + const { engine, calls } = makeFakeEngine(); + const limited = limiter.decorate("org_hot", engine); + + yield* limited.execute("code", { onElicitation }); + yield* limited.execute("code", { onElicitation }); + const blocked = yield* limited.execute("code", { onElicitation }); + + expect(blocked).toEqual({ result: null, error: RATE_LIMIT_BLOCKED_MESSAGE }); + expect(calls.execute).toBe(2); // the third never reached the engine + }), + ); + + it.effect("check fails with the typed ExecutionRateLimitExceededError over the limit", () => + Effect.gen(function* () { + const counter = makeFakeCounter(); + const limiter = makeExecutionRateLimiter(counter.increment, { limit: 1 }); + + yield* limiter.check("org_hot"); + const error = yield* Effect.flip(limiter.check("org_hot")); + + expect(error._tag).toBe("ExecutionRateLimitExceededError"); + expect(error.organizationId).toBe("org_hot"); + expect(error.message).toBe(RATE_LIMIT_BLOCKED_MESSAGE); + }), + ); + + it.effect("resets when the fixed window rolls over", () => + Effect.gen(function* () { + const counter = makeFakeCounter(); + let nowMs = 0; + const limiter = makeExecutionRateLimiter(counter.increment, { + limit: 1, + windowMs: 1_000, + now: () => nowMs, + }); + const { engine, calls } = makeFakeEngine(); + const limited = limiter.decorate("org_windowed", engine); + + yield* 
limited.execute("code", { onElicitation }); + const blocked = yield* limited.execute("code", { onElicitation }); + expect(blocked).toEqual({ result: null, error: RATE_LIMIT_BLOCKED_MESSAGE }); + + nowMs = 1_000; // next fixed window + const afterReset = yield* limited.execute("code", { onElicitation }); + + expect(afterReset).toEqual({ result: "ok" }); + expect(calls.execute).toBe(2); + }), + ); + + it.effect("fails open when the counter errors (increment attempted, execution runs)", () => + Effect.gen(function* () { + const state = { calls: 0 }; + const limiter = makeExecutionRateLimiter(() => + Effect.suspend(() => { + state.calls += 1; + return Effect.fail(new Error("counter DO unreachable")); + }), + ); + const { engine, calls } = makeFakeEngine(); + const limited = limiter.decorate("org_do_down", engine); + + const result = yield* limited.execute("code", { onElicitation }); + + expect(state.calls).toBe(1); // the increment WAS attempted + expect(result).toEqual({ result: "ok" }); // and the execution still ran + expect(calls.execute).toBe(1); + }), + ); + + // Live clock: the timeout budget is a real timer here (10ms). 
+ it.live("fails open when the counter exceeds its timeout", () => + Effect.gen(function* () { + const limiter = makeExecutionRateLimiter(() => Effect.never, { timeoutMs: 10 }); + const { engine, calls } = makeFakeEngine(); + const limited = limiter.decorate("org_do_slow", engine); + + const result = yield* limited.execute("code", { onElicitation }); + + expect(result).toEqual({ result: "ok" }); + expect(calls.execute).toBe(1); + }), + ); + + it.effect("never gates resume, even over the limit", () => + Effect.gen(function* () { + const counter = makeFakeCounter(); + const limiter = makeExecutionRateLimiter(counter.increment, { limit: 1 }); + const { engine, calls } = makeFakeEngine(); + const limited = limiter.decorate("org_hot", engine); + + yield* limited.execute("code", { onElicitation }); + yield* limited.execute("code", { onElicitation }); // now over the limit + + const outcome = yield* limited.resume("exec_1", { action: "accept" }); + + expect(outcome).toEqual({ status: "completed", result: { result: "resumed" } }); + expect(calls.resume).toBe(1); + }), + ); + + it.effect("counts organizations independently", () => + Effect.gen(function* () { + const counter = makeFakeCounter(); + const limiter = makeExecutionRateLimiter(counter.increment, { limit: 1 }); + const a = makeFakeEngine(); + const b = makeFakeEngine(); + const limitedA = limiter.decorate("org_a", a.engine); + const limitedB = limiter.decorate("org_b", b.engine); + + yield* limitedA.execute("code", { onElicitation }); + const blockedA = yield* limitedA.execute("code", { onElicitation }); + const freshB = yield* limitedB.execute("code", { onElicitation }); + + expect(blockedA).toEqual({ result: null, error: RATE_LIMIT_BLOCKED_MESSAGE }); + expect(freshB).toEqual({ result: "ok" }); + }), + ); +}); From 3b4b9191e91e90c4be23f25ba39185ef6c1bd3ee Mon Sep 17 00:00:00 2001 From: Rhys Sullivan <39114868+RhysSullivan@users.noreply.github.com> Date: Thu, 2 Jul 2026 13:04:38 -0700 Subject: [PATCH 6/6] Use 
typed test errors and instanceof assertions in gate tests --- apps/cloud/src/engine/execution-gate.test.ts | 18 ++++++++++++------ .../src/engine/execution-rate-limit.test.ts | 16 +++++++++++----- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/apps/cloud/src/engine/execution-gate.test.ts b/apps/cloud/src/engine/execution-gate.test.ts index cb61304f8..4b8309b11 100644 --- a/apps/cloud/src/engine/execution-gate.test.ts +++ b/apps/cloud/src/engine/execution-gate.test.ts @@ -1,9 +1,16 @@ import { describe, expect, it } from "@effect/vitest"; -import { Effect } from "effect"; +import { Data, Effect } from "effect"; import type { ExecutionEngine } from "@executor-js/execution"; -import { EXECUTION_LIMIT_BLOCKED_MESSAGE, makeExecutionLimitGate } from "./execution-gate"; +import { + EXECUTION_LIMIT_BLOCKED_MESSAGE, + ExecutionLimitReachedError, + makeExecutionLimitGate, +} from "./execution-gate"; + +// Stand-in for an upstream billing failure (Autumn down, network error). +class FakeUpstreamError extends Data.TaggedError("FakeUpstreamError")<{}> {} // Minimal engine fake: records calls, always completes successfully. The gate // must never let a blocked execution reach it. 
@@ -102,16 +109,15 @@ describe("execution balance gate", () => { const error = yield* Effect.flip(gate.check("org_blocked")); - expect(error._tag).toBe("ExecutionLimitReachedError"); + expect(error).toBeInstanceOf(ExecutionLimitReachedError); expect(error.organizationId).toBe("org_blocked"); - expect(error.message).toBe(EXECUTION_LIMIT_BLOCKED_MESSAGE); }), ); it.effect("fails open when the billing service errors (check attempted, execution runs)", () => Effect.gen(function* () { const { engine, calls } = makeFakeEngine(); - const balance = makeBalanceCheck(() => Effect.fail(new Error("autumn down"))); + const balance = makeBalanceCheck(() => Effect.fail(new FakeUpstreamError())); const gate = makeExecutionLimitGate(balance.check); const gated = gate.decorate("org_erroring", engine); @@ -179,7 +185,7 @@ describe("execution balance gate", () => { const balance = makeBalanceCheck(() => { if (failNext) { failNext = false; - return Effect.fail(new Error("transient")); + return Effect.fail(new FakeUpstreamError()); } return Effect.succeed({ allowed: false }); }); diff --git a/apps/cloud/src/engine/execution-rate-limit.test.ts b/apps/cloud/src/engine/execution-rate-limit.test.ts index 3a88cd3a9..260ccc150 100644 --- a/apps/cloud/src/engine/execution-rate-limit.test.ts +++ b/apps/cloud/src/engine/execution-rate-limit.test.ts @@ -1,9 +1,16 @@ import { describe, expect, it } from "@effect/vitest"; -import { Effect } from "effect"; +import { Data, Effect } from "effect"; import type { ExecutionEngine } from "@executor-js/execution"; -import { RATE_LIMIT_BLOCKED_MESSAGE, makeExecutionRateLimiter } from "./execution-rate-limit"; +import { + RATE_LIMIT_BLOCKED_MESSAGE, + ExecutionRateLimitExceededError, + makeExecutionRateLimiter, +} from "./execution-rate-limit"; + +// Stand-in for a counter-DO failure (unreachable, storage error). 
+class FakeCounterError extends Data.TaggedError("FakeCounterError")<{}> {} // In-memory stand-in for the counter DO: same fixed-window semantics, one // { windowId, count } record per org. @@ -88,9 +95,8 @@ describe("execution rate limiter", () => { yield* limiter.check("org_hot"); const error = yield* Effect.flip(limiter.check("org_hot")); - expect(error._tag).toBe("ExecutionRateLimitExceededError"); + expect(error).toBeInstanceOf(ExecutionRateLimitExceededError); expect(error.organizationId).toBe("org_hot"); - expect(error.message).toBe(RATE_LIMIT_BLOCKED_MESSAGE); }), ); @@ -124,7 +130,7 @@ describe("execution rate limiter", () => { const limiter = makeExecutionRateLimiter(() => Effect.suspend(() => { state.calls += 1; - return Effect.fail(new Error("counter DO unreachable")); + return Effect.fail(new FakeCounterError()); }), ); const { engine, calls } = makeFakeEngine();