From 018cb59a55e52d48e4f221280f9bd638fc67fbe5 Mon Sep 17 00:00:00 2001
From: Rhys Sullivan <39114868+RhysSullivan@users.noreply.github.com>
Date: Thu, 2 Jul 2026 11:52:50 -0700
Subject: [PATCH 1/6] Add execution balance check to billing service
---
apps/cloud/src/extensions/billing/service.ts | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/apps/cloud/src/extensions/billing/service.ts b/apps/cloud/src/extensions/billing/service.ts
index 2d27248f4..1731fe4fa 100644
--- a/apps/cloud/src/extensions/billing/service.ts
+++ b/apps/cloud/src/extensions/billing/service.ts
@@ -22,6 +22,9 @@ export class AutumnError extends Data.TaggedError("AutumnError")<{
export type IAutumnService = Readonly<{
use: (fn: (client: Autumn) => Promise) => Effect.Effect;
+ checkExecutionBalance: (
+ organizationId: string,
+ ) => Effect.Effect<{ readonly allowed: boolean }, AutumnError, never>;
/**
* Fire-and-forget-safe execution usage tracker. Errors are caught and
* logged; the returned Effect never fails. Callers typically
@@ -44,6 +47,7 @@ const make = Effect.sync(() => {
);
return {
use: () => notConfigured,
+ checkExecutionBalance: () => notConfigured,
trackExecution: () => Effect.void,
} satisfies IAutumnService;
}
@@ -80,7 +84,16 @@ const make = Effect.sync(() => {
);
}).pipe(Effect.withSpan("autumn.trackExecution"));
- return { use, trackExecution } satisfies IAutumnService;
+ const checkExecutionBalance = (organizationId: string) =>
+ Effect.gen(function* () {
+ yield* Effect.annotateCurrentSpan({ "autumn.customer.id": organizationId });
+ const check = yield* use((c) =>
+ c.check({ customerId: organizationId, featureId: "executions" }),
+ );
+ return { allowed: check.allowed };
+ }).pipe(Effect.withSpan("autumn.checkExecutionBalance"));
+
+ return { use, checkExecutionBalance, trackExecution } satisfies IAutumnService;
});
export class AutumnService extends Context.Service()(
From 45f9afa2e3475e0f8a386244e7804d4b65e16166 Mon Sep 17 00:00:00 2001
From: Rhys Sullivan <39114868+RhysSullivan@users.noreply.github.com>
Date: Thu, 2 Jul 2026 12:56:07 -0700
Subject: [PATCH 2/6] Add pre-execution balance gate for org execution limits
Check the org's Autumn execution balance before execute/executeWithPause
(never resume, so paused executions can always complete). Blocked orgs get
a descriptive ExecuteResult.error instead of running. Fails open on any
billing error or a 2s timeout, and caches per-org outcomes for 60s.
---
apps/cloud/src/engine/execution-gate.ts | 205 ++++++++++++++++++++++++
1 file changed, 205 insertions(+)
create mode 100644 apps/cloud/src/engine/execution-gate.ts
diff --git a/apps/cloud/src/engine/execution-gate.ts b/apps/cloud/src/engine/execution-gate.ts
new file mode 100644
index 000000000..d6e652dd1
--- /dev/null
+++ b/apps/cloud/src/engine/execution-gate.ts
@@ -0,0 +1,205 @@
+// ---------------------------------------------------------------------------
+// Pre-execution balance gate — blocks new executions once an organization has
+// used up its plan's included executions.
+//
+// Usage is tracked to Autumn after every execution (execution-usage.ts), but
+// nothing ever CHECKED the balance before running, so a free-tier org could run
+// unbounded executions past its quota. This gate consults
+// `AutumnService.checkExecutionBalance` before `execute` / `executeWithPause`.
+// `resume` is never gated: a paused execution already consumed its quota slot
+// when it started, and blocking resume would strand approved work forever.
+//
+// FAIL OPEN is a hard requirement: any Autumn error, timeout, or missing
+// customer/feature allows the execution (logged + Sentry, mirroring
+// `trackExecution`'s reporting). Autumn being down must never block
+// executions or add meaningful latency, so the check is bounded by a short
+// timeout and its outcome is cached per organization.
+//
+// Error surfacing: the gate fails internally with the typed
+// `ExecutionLimitReachedError`, and the engine seam folds it into the
+// descriptive `ExecuteResult.error` channel. That is the codebase's mechanism
+// for getting a domain failure's message to the MCP client verbatim — engine
+// error-channel failures are deliberately rendered opaque by the MCP host
+// ("Internal tool error [correlation id]"), exactly like the runtimes fold
+// `CodeCompilationError` / `SandboxRuntimeError` into `ExecuteResult.error`.
+// ---------------------------------------------------------------------------
+
+import * as Sentry from "@sentry/cloudflare";
+import { Data, Effect } from "effect";
+import type * as Cause from "effect/Cause";
+
+import type { ExecutionEngine, ExecutionResult } from "@executor-js/execution";
+
+// The engine's completed-result payload (`ExecuteResult` in codemode-core),
+// derived from the public execution types so this app package doesn't need a
+// direct dependency on the kernel package that declares it.
+type EngineExecuteResult = Extract["result"];
+
+// One check per org per minute is fresh enough for a monthly quota; both
+// allowed AND blocked outcomes are cached (errors never are).
+const BALANCE_CACHE_TTL_MS = 60_000;
+// Autumn slower than this => fail open rather than stall a user-facing
+// execution behind the billing provider.
+const BALANCE_CHECK_TIMEOUT_MS = 2_000;
+// Sweep guard so a long-lived worker isolate serving many orgs can't grow the
+// cache map unbounded.
+const BALANCE_CACHE_MAX_ENTRIES = 10_000;
+
+export const EXECUTION_LIMIT_BLOCKED_MESSAGE =
+ "Execution limit reached: your plan's included executions for this billing period are used up. Upgrade your plan or wait for the reset to continue.";
+
+export class ExecutionLimitReachedError extends Data.TaggedError("ExecutionLimitReachedError")<{
+ readonly organizationId: string;
+ readonly message: string;
+}> {}
+
+/** Internal sentinel for a balance check that exceeded its time budget. */
+class GateCheckTimeoutError extends Data.TaggedError("GateCheckTimeoutError")<{
+ readonly timeoutMs: number;
+}> {}
+
+// ---------------------------------------------------------------------------
+// Shared gate seam — used by this gate and the rate-limit backstop.
+// ---------------------------------------------------------------------------
+
+export type GateDecision =
+ | { readonly blocked: false }
+ | { readonly blocked: true; readonly error: { readonly message: string } };
+
+/**
+ * Wrap an engine so `decide` runs before `execute` / `executeWithPause`. A
+ * blocked decision short-circuits to a descriptive `ExecuteResult.error`
+ * (which `formatExecuteResult` renders as a clean `isError` MCP tool result)
+ * WITHOUT invoking the inner engine — so a blocked execution is neither run
+ * nor usage-tracked. `resume` and all read-only members pass through
+ * untouched: paused executions must always be able to complete.
+ */
+export const withPreExecutionGate = (
+ engine: ExecutionEngine,
+ decide: Effect.Effect,
+): ExecutionEngine => ({
+ execute: (code, options) =>
+ Effect.flatMap(
+ decide,
+ (decision): Effect.Effect =>
+ decision.blocked
+ ? Effect.succeed({ result: null, error: decision.error.message })
+ : engine.execute(code, options),
+ ),
+ executeWithPause: (code, options) =>
+ Effect.flatMap(
+ decide,
+ (decision): Effect.Effect =>
+ decision.blocked
+ ? Effect.succeed({
+ status: "completed",
+ result: { result: null, error: decision.error.message },
+ })
+ : engine.executeWithPause(code, options),
+ ),
+ // resume is never gated: paused executions must be able to complete.
+ resume: (executionId, response) => engine.resume(executionId, response),
+ getPausedExecution: (executionId) => engine.getPausedExecution(executionId),
+ pausedExecutionCount: () => engine.pausedExecutionCount(),
+ hasPausedExecutions: () => engine.hasPausedExecutions(),
+ getDescription: engine.getDescription,
+});
+
+// ---------------------------------------------------------------------------
+// Balance gate factory
+// ---------------------------------------------------------------------------
+
+export type ExecutionBalanceCheck = (
+ organizationId: string,
+) => Effect.Effect<{ readonly allowed: boolean }, unknown>;
+
+/**
+ * Build a balance gate around `checkBalance` (in production:
+ * `AutumnService.checkExecutionBalance`). One gate instance holds one
+ * per-organization outcome cache; the metered decorator layer creates a single
+ * instance so all engines it decorates share it.
+ *
+ * `options.timeoutMs` exists for tests; production uses the default budget.
+ */
+export const makeExecutionLimitGate = (
+ checkBalance: ExecutionBalanceCheck,
+ options?: { readonly timeoutMs?: number },
+) => {
+ const timeoutMs = options?.timeoutMs ?? BALANCE_CHECK_TIMEOUT_MS;
+ const cache = new Map();
+
+ const writeCache = (organizationId: string, allowed: boolean, nowMs: number): void => {
+ if (cache.size >= BALANCE_CACHE_MAX_ENTRIES) {
+ for (const [key, entry] of cache) {
+ if (entry.expiresAtMs <= nowMs) cache.delete(key);
+ }
+ // Still saturated after dropping expired entries: reset rather than grow.
+ if (cache.size >= BALANCE_CACHE_MAX_ENTRIES) cache.clear();
+ }
+ cache.set(organizationId, { allowed, expiresAtMs: nowMs + BALANCE_CACHE_TTL_MS });
+ };
+
+ const toDecision = (organizationId: string, allowed: boolean): GateDecision =>
+ allowed
+ ? { blocked: false }
+ : {
+ blocked: true,
+ error: new ExecutionLimitReachedError({
+ organizationId,
+ message: EXECUTION_LIMIT_BLOCKED_MESSAGE,
+ }),
+ };
+
+ const decide = (organizationId: string): Effect.Effect =>
+ Effect.suspend(() => {
+ const nowMs = Date.now();
+ const cached = cache.get(organizationId);
+ if (cached && cached.expiresAtMs > nowMs) {
+ return Effect.succeed(toDecision(organizationId, cached.allowed));
+ }
+ return checkBalance(organizationId).pipe(
+ Effect.timeoutOrElse({
+ duration: `${timeoutMs} millis`,
+ orElse: () => Effect.fail(new GateCheckTimeoutError({ timeoutMs })),
+ }),
+ Effect.map(({ allowed }) => {
+ writeCache(organizationId, allowed, nowMs);
+ return toDecision(organizationId, allowed);
+ }),
+ // FAIL OPEN: Autumn errors, timeouts, and missing customers/features
+ // must never block executions. Reported like `trackExecution` so a
+ // billing outage still pages; the error outcome is never cached.
+ Effect.catch((error: unknown) =>
+ Effect.sync((): GateDecision => {
+ console.warn("[billing] execution balance check failed open:", error);
+ Sentry.captureException(error);
+ return { blocked: false };
+ }),
+ ),
+ );
+ });
+
+ return {
+ /**
+ * Test/diagnostic surface: succeeds when the execution is allowed (or the
+ * check failed open), fails with the typed `ExecutionLimitReachedError`
+ * when the organization is out of executions.
+ */
+ check: (organizationId: string): Effect.Effect =>
+ Effect.flatMap(decide(organizationId), (decision) =>
+ decision.blocked
+ ? Effect.fail(
+ new ExecutionLimitReachedError({
+ organizationId,
+ message: EXECUTION_LIMIT_BLOCKED_MESSAGE,
+ }),
+ )
+ : Effect.void,
+ ),
+ /** Wrap an engine so the balance gate runs before each new execution. */
+ decorate: (
+ organizationId: string,
+ engine: ExecutionEngine,
+ ): ExecutionEngine => withPreExecutionGate(engine, decide(organizationId)),
+ };
+};
From 1d8d9d0bff55ed0f480c08cd857a621f94939e35 Mon Sep 17 00:00:00 2001
From: Rhys Sullivan <39114868+RhysSullivan@users.noreply.github.com>
Date: Thu, 2 Jul 2026 12:56:16 -0700
Subject: [PATCH 3/6] Add per-org execution rate-limit backstop
1000 execute calls per org per hour, fixed window, counted in a minimal
per-org Durable Object (EXECUTION_RATE_LIMITER, one instance per org via
idFromName). Independent of billing so runaway automation is caught even
when Autumn is down; fails open if the counter DO errors or is slow. The
DO stores a single {windowId, count} record and purges itself by alarm
after two idle windows.
---
apps/cloud/src/engine/execution-rate-limit.ts | 212 ++++++++++++++++++
apps/cloud/src/env-augment.d.ts | 7 +
apps/cloud/src/server.ts | 5 +
apps/cloud/wrangler.jsonc | 11 +
4 files changed, 235 insertions(+)
create mode 100644 apps/cloud/src/engine/execution-rate-limit.ts
diff --git a/apps/cloud/src/engine/execution-rate-limit.ts b/apps/cloud/src/engine/execution-rate-limit.ts
new file mode 100644
index 000000000..2f06e9bb9
--- /dev/null
+++ b/apps/cloud/src/engine/execution-rate-limit.ts
@@ -0,0 +1,212 @@
+// ---------------------------------------------------------------------------
+// Per-org execution rate limit — an abuse backstop independent of billing.
+//
+// The balance gate (execution-gate.ts) depends on Autumn and fails open, so a
+// billing outage plus runaway automation could still run unbounded executions.
+// This limiter counts `execute` calls per organization in a fixed hourly
+// window, backed by a minimal counter Durable Object (cross-session state:
+// each MCP session lives in its own DO instance, so an in-memory counter
+// would be per-session and trivially bypassed by opening more sessions).
+//
+// Like the balance gate it FAILS OPEN: an unreachable counter DO, a missing
+// binding, or a slow call allows the execution (warn + Sentry). The backstop
+// must never take executions down with it.
+// ---------------------------------------------------------------------------
+
+import { DurableObject, env } from "cloudflare:workers";
+import * as Sentry from "@sentry/cloudflare";
+import { Data, Effect } from "effect";
+import type * as Cause from "effect/Cause";
+
+import type { ExecutionEngine } from "@executor-js/execution";
+
+import { withPreExecutionGate, type GateDecision } from "./execution-gate";
+
+// Fixed window: all executions in the same clock hour share one counter.
+export const RATE_LIMIT_WINDOW_MS = 3_600_000;
+// Calibration: the heaviest legitimate org runs ~1.1k executions per MONTH,
+// so 1000 per HOUR is far above any human-driven usage and only trips on
+// runaway automation (the incident this backstops: ~18k in 30 days would
+// still pass, which is fine — that class of overrun is the balance gate's
+// job; this catches tight loops).
+export const EXECUTIONS_PER_ORG_PER_HOUR = 1000;
+// Counter DO slower than this => fail open rather than stall executions.
+const RATE_LIMIT_CHECK_TIMEOUT_MS = 2_000;
+// The DO purges its storage this long after the last increment, so idle orgs
+// cost nothing. Two windows: long enough that an active window never purges.
+const COUNTER_PURGE_AFTER_MS = 2 * RATE_LIMIT_WINDOW_MS;
+
+export const RATE_LIMIT_BLOCKED_MESSAGE =
+ "Rate limit exceeded: too many executions this hour. This is an abuse backstop — contact support if you hit this legitimately.";
+
+export class ExecutionRateLimitExceededError extends Data.TaggedError(
+ "ExecutionRateLimitExceededError",
+)<{
+ readonly organizationId: string;
+ readonly message: string;
+}> {}
+
+/** Internal sentinel for a counter call that exceeded its time budget. */
+class RateLimitCheckTimeoutError extends Data.TaggedError("RateLimitCheckTimeoutError")<{
+ readonly timeoutMs: number;
+}> {}
+
+// ---------------------------------------------------------------------------
+// Counter Durable Object — one instance per organization (idFromName(orgId)).
+// Stores a single { windowId, count } record: an increment in a new window
+// resets the count, so old windows never accumulate. An alarm purges storage
+// after inactivity.
+// ---------------------------------------------------------------------------
+
+const WINDOW_RECORD_KEY = "window";
+
+type WindowRecord = {
+ readonly windowId: number;
+ readonly count: number;
+};
+
+export class ExecutionRateLimiterDO extends DurableObject {
+ private readonly counterStorage: DurableObjectState["storage"];
+
+ constructor(ctx: DurableObjectState, doEnv: Env) {
+ super(ctx, doEnv);
+ // Kept on an own field (not just inherited `this.ctx`) so tests can run
+ // the class against a fake storage under the `cloudflare:workers` stub.
+ this.counterStorage = ctx.storage;
+ }
+
+ /** Add one execution to `windowId`'s counter and return the new count. */
+ async increment(windowId: number): Promise {
+ const stored = await this.counterStorage.get(WINDOW_RECORD_KEY);
+ const count = stored && stored.windowId === windowId ? stored.count + 1 : 1;
+ await this.counterStorage.put(WINDOW_RECORD_KEY, { windowId, count });
+ await this.counterStorage.setAlarm(Date.now() + COUNTER_PURGE_AFTER_MS);
+ return count;
+ }
+
+ async alarm(): Promise {
+ await this.counterStorage.deleteAll();
+ }
+}
+
+// ---------------------------------------------------------------------------
+// Client
+// ---------------------------------------------------------------------------
+
+/** Count one execution for (organizationId, windowId); returns the new count. */
+export type RateLimitIncrement = (
+ organizationId: string,
+ windowId: number,
+) => Effect.Effect;
+
+export type ExecutionRateLimiter = {
+ readonly check: (organizationId: string) => Effect.Effect;
+ readonly decorate: (
+ organizationId: string,
+ engine: ExecutionEngine,
+ ) => ExecutionEngine;
+};
+
+/**
+ * Build a rate limiter around an increment function (in production: the
+ * counter DO; in tests: an in-memory fake). `options` exist for tests.
+ */
+export const makeExecutionRateLimiter = (
+ increment: RateLimitIncrement,
+ options?: {
+ readonly limit?: number;
+ readonly windowMs?: number;
+ readonly timeoutMs?: number;
+ readonly now?: () => number;
+ },
+): ExecutionRateLimiter => {
+ const limit = options?.limit ?? EXECUTIONS_PER_ORG_PER_HOUR;
+ const windowMs = options?.windowMs ?? RATE_LIMIT_WINDOW_MS;
+ const timeoutMs = options?.timeoutMs ?? RATE_LIMIT_CHECK_TIMEOUT_MS;
+ const now = options?.now ?? Date.now;
+
+ const decide = (organizationId: string): Effect.Effect =>
+ Effect.suspend(() => {
+ const windowId = Math.floor(now() / windowMs);
+ return increment(organizationId, windowId).pipe(
+ Effect.timeoutOrElse({
+ duration: `${timeoutMs} millis`,
+ orElse: () => Effect.fail(new RateLimitCheckTimeoutError({ timeoutMs })),
+ }),
+ Effect.map(
+ (count): GateDecision =>
+ count > limit
+ ? {
+ blocked: true,
+ error: new ExecutionRateLimitExceededError({
+ organizationId,
+ message: RATE_LIMIT_BLOCKED_MESSAGE,
+ }),
+ }
+ : { blocked: false },
+ ),
+ // FAIL OPEN: the backstop must never block executions because its
+ // counter is unreachable or slow.
+ Effect.catch((error: unknown) =>
+ Effect.sync((): GateDecision => {
+ console.warn("[rate-limit] execution rate limit check failed open:", error);
+ Sentry.captureException(error);
+ return { blocked: false };
+ }),
+ ),
+ );
+ });
+
+ return {
+ check: (organizationId) =>
+ Effect.flatMap(decide(organizationId), (decision) =>
+ decision.blocked
+ ? Effect.fail(
+ new ExecutionRateLimitExceededError({
+ organizationId,
+ message: RATE_LIMIT_BLOCKED_MESSAGE,
+ }),
+ )
+ : Effect.void,
+ ),
+ decorate: (organizationId, engine) => withPreExecutionGate(engine, decide(organizationId)),
+ };
+};
+
+// ---------------------------------------------------------------------------
+// Cloud wiring — reads the EXECUTION_RATE_LIMITER binding from the worker env.
+// ---------------------------------------------------------------------------
+
+// The DO stub's RPC surface. The binding is declared untyped in
+// env-augment.d.ts (matching the BLOBS precedent), so the call site narrows it
+// to the one method the class exposes.
+type ExecutionRateLimiterStub = {
+ readonly increment: (windowId: number) => Promise;
+};
+
+type RateLimiterNamespace = {
+ readonly idFromName: (name: string) => DurableObjectId;
+ readonly get: (id: DurableObjectId) => unknown;
+};
+
+/**
+ * Production rate limiter backed by the `EXECUTION_RATE_LIMITER` counter DO.
+ * When the binding is absent (unit-test workers, older local setups) the
+ * limiter is disabled: every check passes, logged once at construction.
+ */
+export const makeCloudExecutionRateLimiter = (): ExecutionRateLimiter => {
+ const namespace = (env as { EXECUTION_RATE_LIMITER?: RateLimiterNamespace })
+ .EXECUTION_RATE_LIMITER;
+ if (!namespace) {
+ console.warn(
+ "[rate-limit] EXECUTION_RATE_LIMITER binding missing; execution rate limiting disabled",
+ );
+ return makeExecutionRateLimiter(() => Effect.succeed(0));
+ }
+ return makeExecutionRateLimiter((organizationId, windowId) =>
+ Effect.tryPromise(() => {
+ const stub = namespace.get(namespace.idFromName(organizationId)) as ExecutionRateLimiterStub;
+ return stub.increment(windowId);
+ }),
+ );
+};
diff --git a/apps/cloud/src/env-augment.d.ts b/apps/cloud/src/env-augment.d.ts
index 8911a8303..5e860a0fa 100644
--- a/apps/cloud/src/env-augment.d.ts
+++ b/apps/cloud/src/env-augment.d.ts
@@ -32,6 +32,13 @@ declare global {
// ON; the test workers set "true" so fixtures can reach localhost.
ALLOW_LOCAL_NETWORK?: string;
+ // Per-org execution rate-limit counter DO (wrangler.jsonc
+ // `durable_objects`). Declared optional here (matching the BLOBS
+ // precedent) rather than regenerating worker-configuration.d.ts: test
+ // workers and older local setups run without the binding, and the
+ // limiter degrades to disabled when absent.
+ EXECUTION_RATE_LIMITER?: import("@cloudflare/workers-types").DurableObjectNamespace;
+
// Billing
AUTUMN_SECRET_KEY?: string;
/** Optional Autumn base-URL override (Autumn emulator in tests/dev). */
diff --git a/apps/cloud/src/server.ts b/apps/cloud/src/server.ts
index eb1af1c1d..258e7ff34 100644
--- a/apps/cloud/src/server.ts
+++ b/apps/cloud/src/server.ts
@@ -55,6 +55,11 @@ export const McpSessionDOSqlite = Sentry.instrumentDurableObjectWithSentry(
// (with a `deleted_classes: ["McpSessionDO"]` migration) now that nothing binds it.
export class McpSessionDO extends DurableObject {}
+// Per-org execution rate-limit counter DO (abuse backstop; migration v3,
+// `EXECUTION_RATE_LIMITER` binding). Plain counter, no Sentry wrapper needed:
+// its callers already fail open and report errors themselves.
+export { ExecutionRateLimiterDO } from "./engine/execution-rate-limit";
+
// ---------------------------------------------------------------------------
// Worker fetch handler
//
diff --git a/apps/cloud/wrangler.jsonc b/apps/cloud/wrangler.jsonc
index 68ca6297a..e12eb31f2 100644
--- a/apps/cloud/wrangler.jsonc
+++ b/apps/cloud/wrangler.jsonc
@@ -22,6 +22,13 @@
"name": "MCP_SESSION",
"class_name": "McpSessionDOSqlite",
},
+ // Per-org execution rate-limit counter (abuse backstop). One instance
+ // per organization via idFromName(orgId); a single { windowId, count }
+ // record per instance, purged by alarm after inactivity.
+ {
+ "name": "EXECUTION_RATE_LIMITER",
+ "class_name": "ExecutionRateLimiterDO",
+ },
],
},
// The MCP session DO moved to the Cloudflare Agents (`McpAgent`) base, which
@@ -42,6 +49,10 @@
"tag": "v2",
"new_sqlite_classes": ["McpSessionDOSqlite"],
},
+ {
+ "tag": "v3",
+ "new_sqlite_classes": ["ExecutionRateLimiterDO"],
+ },
],
"services": [
{
From bc95e26b9bf1774dccb00edbca16cae833cbdfdf Mon Sep 17 00:00:00 2001
From: Rhys Sullivan <39114868+RhysSullivan@users.noreply.github.com>
Date: Thu, 2 Jul 2026 12:56:28 -0700
Subject: [PATCH 4/6] Wire execution guards into the metered engine decorator
Order: rate-limit backstop first (cheap counter), then the balance gate,
then usage tracking. Guards wrap outside the tracker so a blocked
execution is neither run nor tracked. Covers both planes (HTTP executor
plane and MCP session DO) since both build engines through this layer.
---
.../src/engine/execution-stack-metered.ts | 39 ++++++++++++++-----
1 file changed, 30 insertions(+), 9 deletions(-)
diff --git a/apps/cloud/src/engine/execution-stack-metered.ts b/apps/cloud/src/engine/execution-stack-metered.ts
index d82632ca9..4ea70b6d4 100644
--- a/apps/cloud/src/engine/execution-stack-metered.ts
+++ b/apps/cloud/src/engine/execution-stack-metered.ts
@@ -28,19 +28,40 @@ import {
import { AutumnService } from "../extensions/billing/service";
import type { DbService } from "../db/db";
import { CloudExecutionSeamsLayer } from "../engine/execution-stack";
+import { makeExecutionLimitGate } from "./execution-gate";
+import { makeCloudExecutionRateLimiter } from "./execution-rate-limit";
import { withExecutionUsageTracking } from "./execution-usage";
-// Usage-metering decorator bound to the billing service. `trackExecution` is
-// fire-and-forget (`Effect.runFork`) so the billing call can't stall a
-// user-facing execution.
+// Usage-metering decorator bound to the billing service, plus the two
+// pre-execution guards this layer owns, ordered cheapest first:
+// 1. rate-limit backstop (counter DO, independent of billing)
+// 2. execution balance gate (Autumn check, cached 60s, fails open)
+// 3. usage tracking — fire-and-forget (`Effect.runFork`) so the billing
+// call can't stall a user-facing execution.
+// The guards wrap OUTSIDE the tracker so a blocked execution is neither run
+// nor tracked. One gate/limiter instance per layer build: the balance cache is
+// shared by every engine this decorator produces (in the MCP session DO that's
+// the session's engines; on the HTTP plane the boot layer builds it once).
export const CloudMeteringEngineDecorator: Layer.Layer =
Layer.effect(EngineDecorator)(
- Effect.map(AutumnService.asEffect(), (autumn): EngineDecorator["Service"] => ({
- decorate: (engine, identity: EngineStackIdentity) =>
- withExecutionUsageTracking(identity.organizationId, engine, (organizationId) =>
- Effect.runFork(autumn.trackExecution(organizationId)),
- ),
- })),
+ Effect.map(AutumnService.asEffect(), (autumn): EngineDecorator["Service"] => {
+ const balanceGate = makeExecutionLimitGate((organizationId) =>
+ autumn.checkExecutionBalance(organizationId),
+ );
+ const rateLimiter = makeCloudExecutionRateLimiter();
+ return {
+ decorate: (engine, identity: EngineStackIdentity) =>
+ rateLimiter.decorate(
+ identity.organizationId,
+ balanceGate.decorate(
+ identity.organizationId,
+ withExecutionUsageTracking(identity.organizationId, engine, (organizationId) =>
+ Effect.runFork(autumn.trackExecution(organizationId)),
+ ),
+ ),
+ ),
+ };
+ }),
);
/**
From 92e9520873e29c9c6164c21009ae19fecc715281 Mon Sep 17 00:00:00 2001
From: Rhys Sullivan <39114868+RhysSullivan@users.noreply.github.com>
Date: Thu, 2 Jul 2026 12:58:42 -0700
Subject: [PATCH 5/6] Test the execution balance gate and rate-limit backstop
Covers allow/block, the typed errors, fail-open on billing errors and
timeouts (asserting the check was attempted AND the execution ran), the
60s per-org outcome cache (allowed and blocked cached, errors never),
window rollover, per-org isolation, and that resume is never gated.
---
apps/cloud/src/engine/execution-gate.test.ts | 234 ++++++++++++++++++
.../src/engine/execution-rate-limit.test.ts | 189 ++++++++++++++
2 files changed, 423 insertions(+)
create mode 100644 apps/cloud/src/engine/execution-gate.test.ts
create mode 100644 apps/cloud/src/engine/execution-rate-limit.test.ts
diff --git a/apps/cloud/src/engine/execution-gate.test.ts b/apps/cloud/src/engine/execution-gate.test.ts
new file mode 100644
index 000000000..cb61304f8
--- /dev/null
+++ b/apps/cloud/src/engine/execution-gate.test.ts
@@ -0,0 +1,234 @@
+import { describe, expect, it } from "@effect/vitest";
+import { Effect } from "effect";
+
+import type { ExecutionEngine } from "@executor-js/execution";
+
+import { EXECUTION_LIMIT_BLOCKED_MESSAGE, makeExecutionLimitGate } from "./execution-gate";
+
+// Minimal engine fake: records calls, always completes successfully. The gate
+// must never let a blocked execution reach it.
+const makeFakeEngine = () => {
+ const calls = { execute: 0, executeWithPause: 0, resume: 0 };
+ const engine: ExecutionEngine = {
+ execute: () =>
+ Effect.sync(() => {
+ calls.execute += 1;
+ return { result: "ok" };
+ }),
+ executeWithPause: () =>
+ Effect.sync(() => {
+ calls.executeWithPause += 1;
+ return { status: "completed", result: { result: "ok" } } as const;
+ }),
+ resume: () =>
+ Effect.sync(() => {
+ calls.resume += 1;
+ return { status: "completed", result: { result: "resumed" } } as const;
+ }),
+ getPausedExecution: () => Effect.succeed(null),
+ pausedExecutionCount: () => Effect.succeed(0),
+ hasPausedExecutions: () => Effect.succeed(false),
+ getDescription: Effect.succeed("fake"),
+ };
+ return { engine, calls };
+};
+
+const onElicitation = () => Effect.succeed({ action: "accept" as const });
+
+// Balance-check fake with programmable outcomes and a call counter.
+const makeBalanceCheck = (outcome: () => Effect.Effect<{ readonly allowed: boolean }, unknown>) => {
+ const state = { calls: 0 };
+ const check = (_organizationId: string) =>
+ Effect.suspend(() => {
+ state.calls += 1;
+ return outcome();
+ });
+ return { check, state };
+};
+
+describe("execution balance gate", () => {
+ it.effect("allows execution when the balance check allows", () =>
+ Effect.gen(function* () {
+ const { engine, calls } = makeFakeEngine();
+ const balance = makeBalanceCheck(() => Effect.succeed({ allowed: true }));
+ const gate = makeExecutionLimitGate(balance.check);
+ const gated = gate.decorate("org_allowed", engine);
+
+ const result = yield* gated.execute("code", { onElicitation });
+
+ expect(result).toEqual({ result: "ok" });
+ expect(calls.execute).toBe(1);
+ expect(balance.state.calls).toBe(1);
+ }),
+ );
+
+ it.effect("blocks execute with the limit message and never runs the engine", () =>
+ Effect.gen(function* () {
+ const { engine, calls } = makeFakeEngine();
+ const balance = makeBalanceCheck(() => Effect.succeed({ allowed: false }));
+ const gate = makeExecutionLimitGate(balance.check);
+ const gated = gate.decorate("org_blocked", engine);
+
+ const result = yield* gated.execute("code", { onElicitation });
+
+ expect(result).toEqual({ result: null, error: EXECUTION_LIMIT_BLOCKED_MESSAGE });
+ expect(calls.execute).toBe(0);
+ }),
+ );
+
+ it.effect("blocks executeWithPause as a completed error result", () =>
+ Effect.gen(function* () {
+ const { engine, calls } = makeFakeEngine();
+ const gate = makeExecutionLimitGate(
+ makeBalanceCheck(() => Effect.succeed({ allowed: false })).check,
+ );
+ const gated = gate.decorate("org_blocked", engine);
+
+ const outcome = yield* gated.executeWithPause("code");
+
+ expect(outcome).toEqual({
+ status: "completed",
+ result: { result: null, error: EXECUTION_LIMIT_BLOCKED_MESSAGE },
+ });
+ expect(calls.executeWithPause).toBe(0);
+ }),
+ );
+
+ it.effect("check fails with the typed ExecutionLimitReachedError when blocked", () =>
+ Effect.gen(function* () {
+ const gate = makeExecutionLimitGate(
+ makeBalanceCheck(() => Effect.succeed({ allowed: false })).check,
+ );
+
+ const error = yield* Effect.flip(gate.check("org_blocked"));
+
+ expect(error._tag).toBe("ExecutionLimitReachedError");
+ expect(error.organizationId).toBe("org_blocked");
+ expect(error.message).toBe(EXECUTION_LIMIT_BLOCKED_MESSAGE);
+ }),
+ );
+
+ it.effect("fails open when the billing service errors (check attempted, execution runs)", () =>
+ Effect.gen(function* () {
+ const { engine, calls } = makeFakeEngine();
+ const balance = makeBalanceCheck(() => Effect.fail(new Error("autumn down")));
+ const gate = makeExecutionLimitGate(balance.check);
+ const gated = gate.decorate("org_erroring", engine);
+
+ const result = yield* gated.execute("code", { onElicitation });
+
+ expect(balance.state.calls).toBe(1); // the check WAS attempted
+ expect(result).toEqual({ result: "ok" }); // and the execution still ran
+ expect(calls.execute).toBe(1);
+ }),
+ );
+
+ // Live clock: the timeout budget is a real timer here (10ms), so the test
+ // must actually wait for it rather than suspend on the virtual TestClock.
+ it.live("fails open when the balance check exceeds its timeout", () =>
+ Effect.gen(function* () {
+ const { engine, calls } = makeFakeEngine();
+ const balance = makeBalanceCheck(() => Effect.never);
+ const gate = makeExecutionLimitGate(balance.check, { timeoutMs: 10 });
+ const gated = gate.decorate("org_slow", engine);
+
+ const result = yield* gated.execute("code", { onElicitation });
+
+ expect(balance.state.calls).toBe(1); // the check WAS attempted
+ expect(result).toEqual({ result: "ok" }); // timeout => allowed
+ expect(calls.execute).toBe(1);
+ }),
+ );
+
+ it.effect("caches the allowed outcome: one billing call across executes inside the TTL", () =>
+ Effect.gen(function* () {
+ const { engine, calls } = makeFakeEngine();
+ const balance = makeBalanceCheck(() => Effect.succeed({ allowed: true }));
+ const gate = makeExecutionLimitGate(balance.check);
+ const gated = gate.decorate("org_cached", engine);
+
+ yield* gated.execute("code", { onElicitation });
+ yield* gated.execute("code", { onElicitation });
+
+ expect(balance.state.calls).toBe(1);
+ expect(calls.execute).toBe(2);
+ }),
+ );
+
+ it.effect("caches the blocked outcome too", () =>
+ Effect.gen(function* () {
+ const balance = makeBalanceCheck(() => Effect.succeed({ allowed: false }));
+ const gate = makeExecutionLimitGate(balance.check);
+ const { engine, calls } = makeFakeEngine();
+ const gated = gate.decorate("org_blocked_cached", engine);
+
+ const first = yield* gated.execute("code", { onElicitation });
+ const second = yield* gated.execute("code", { onElicitation });
+
+ expect(first).toEqual({ result: null, error: EXECUTION_LIMIT_BLOCKED_MESSAGE });
+ expect(second).toEqual({ result: null, error: EXECUTION_LIMIT_BLOCKED_MESSAGE });
+ expect(balance.state.calls).toBe(1);
+ expect(calls.execute).toBe(0);
+ }),
+ );
+
+ it.effect("never caches errors: the next execute re-checks the balance", () =>
+ Effect.gen(function* () {
+ const { engine } = makeFakeEngine();
+ let failNext = true;
+ const balance = makeBalanceCheck(() => {
+ if (failNext) {
+ failNext = false;
+ return Effect.fail(new Error("transient"));
+ }
+ return Effect.succeed({ allowed: false });
+ });
+ const gate = makeExecutionLimitGate(balance.check);
+ const gated = gate.decorate("org_transient", engine);
+
+ const first = yield* gated.execute("code", { onElicitation });
+ const second = yield* gated.execute("code", { onElicitation });
+
+ expect(first).toEqual({ result: "ok" }); // failed open, not cached
+ expect(second).toEqual({ result: null, error: EXECUTION_LIMIT_BLOCKED_MESSAGE });
+ expect(balance.state.calls).toBe(2);
+ }),
+ );
+
+ it.effect("caches per organization, not globally", () =>
+ Effect.gen(function* () {
+ const allowedByOrg: Record = { org_a: true, org_b: false };
+ const state = { calls: 0 };
+ const gate = makeExecutionLimitGate((organizationId) =>
+ Effect.sync(() => {
+ state.calls += 1;
+ return { allowed: allowedByOrg[organizationId] ?? true };
+ }),
+ );
+ const a = makeFakeEngine();
+ const b = makeFakeEngine();
+
+ const resultA = yield* gate.decorate("org_a", a.engine).execute("code", { onElicitation });
+ const resultB = yield* gate.decorate("org_b", b.engine).execute("code", { onElicitation });
+
+ expect(resultA).toEqual({ result: "ok" });
+ expect(resultB).toEqual({ result: null, error: EXECUTION_LIMIT_BLOCKED_MESSAGE });
+ expect(state.calls).toBe(2);
+ }),
+ );
+
+ it.effect("never gates resume, even for a blocked organization", () =>
+ Effect.gen(function* () {
+ const { engine, calls } = makeFakeEngine();
+ const balance = makeBalanceCheck(() => Effect.succeed({ allowed: false }));
+ const gate = makeExecutionLimitGate(balance.check);
+ const gated = gate.decorate("org_blocked", engine);
+
+ const outcome = yield* gated.resume("exec_1", { action: "accept" });
+
+ expect(outcome).toEqual({ status: "completed", result: { result: "resumed" } });
+ expect(calls.resume).toBe(1);
+ expect(balance.state.calls).toBe(0); // resume consults billing not at all
+ }),
+ );
+});
diff --git a/apps/cloud/src/engine/execution-rate-limit.test.ts b/apps/cloud/src/engine/execution-rate-limit.test.ts
new file mode 100644
index 000000000..3a88cd3a9
--- /dev/null
+++ b/apps/cloud/src/engine/execution-rate-limit.test.ts
@@ -0,0 +1,189 @@
+import { describe, expect, it } from "@effect/vitest";
+import { Effect } from "effect";
+
+import type { ExecutionEngine } from "@executor-js/execution";
+
+import { RATE_LIMIT_BLOCKED_MESSAGE, makeExecutionRateLimiter } from "./execution-rate-limit";
+
+// In-memory stand-in for the counter DO: same fixed-window semantics, one
+// { windowId, count } record per org.
+const makeFakeCounter = () => {
+ const windows = new Map();
+ const state = { calls: 0 };
+ const increment = (organizationId: string, windowId: number) =>
+ Effect.sync(() => {
+ state.calls += 1;
+ const stored = windows.get(organizationId);
+ const count = stored && stored.windowId === windowId ? stored.count + 1 : 1;
+ windows.set(organizationId, { windowId, count });
+ return count;
+ });
+ return { increment, state };
+};
+
+const makeFakeEngine = () => {
+ const calls = { execute: 0, resume: 0 };
+ const engine: ExecutionEngine = {
+ execute: () =>
+ Effect.sync(() => {
+ calls.execute += 1;
+ return { result: "ok" };
+ }),
+ executeWithPause: () =>
+ Effect.succeed({ status: "completed", result: { result: "ok" } } as const),
+ resume: () =>
+ Effect.sync(() => {
+ calls.resume += 1;
+ return { status: "completed", result: { result: "resumed" } } as const;
+ }),
+ getPausedExecution: () => Effect.succeed(null),
+ pausedExecutionCount: () => Effect.succeed(0),
+ hasPausedExecutions: () => Effect.succeed(false),
+ getDescription: Effect.succeed("fake"),
+ };
+ return { engine, calls };
+};
+
+const onElicitation = () => Effect.succeed({ action: "accept" as const });
+
+describe("execution rate limiter", () => {
+ it.effect("allows executions under the limit", () =>
+ Effect.gen(function* () {
+ const counter = makeFakeCounter();
+ const limiter = makeExecutionRateLimiter(counter.increment, { limit: 3 });
+ const { engine, calls } = makeFakeEngine();
+ const limited = limiter.decorate("org_ok", engine);
+
+ for (let i = 0; i < 3; i++) {
+ const result = yield* limited.execute("code", { onElicitation });
+ expect(result).toEqual({ result: "ok" });
+ }
+
+ expect(calls.execute).toBe(3);
+ expect(counter.state.calls).toBe(3);
+ }),
+ );
+
+ it.effect("blocks the call after the limit with the backstop message", () =>
+ Effect.gen(function* () {
+ const counter = makeFakeCounter();
+ const limiter = makeExecutionRateLimiter(counter.increment, { limit: 2 });
+ const { engine, calls } = makeFakeEngine();
+ const limited = limiter.decorate("org_hot", engine);
+
+ yield* limited.execute("code", { onElicitation });
+ yield* limited.execute("code", { onElicitation });
+ const blocked = yield* limited.execute("code", { onElicitation });
+
+ expect(blocked).toEqual({ result: null, error: RATE_LIMIT_BLOCKED_MESSAGE });
+ expect(calls.execute).toBe(2); // the third never reached the engine
+ }),
+ );
+
+ it.effect("check fails with the typed ExecutionRateLimitExceededError over the limit", () =>
+ Effect.gen(function* () {
+ const counter = makeFakeCounter();
+ const limiter = makeExecutionRateLimiter(counter.increment, { limit: 1 });
+
+ yield* limiter.check("org_hot");
+ const error = yield* Effect.flip(limiter.check("org_hot"));
+
+ expect(error._tag).toBe("ExecutionRateLimitExceededError");
+ expect(error.organizationId).toBe("org_hot");
+ expect(error.message).toBe(RATE_LIMIT_BLOCKED_MESSAGE);
+ }),
+ );
+
+ it.effect("resets when the fixed window rolls over", () =>
+ Effect.gen(function* () {
+ const counter = makeFakeCounter();
+ let nowMs = 0;
+ const limiter = makeExecutionRateLimiter(counter.increment, {
+ limit: 1,
+ windowMs: 1_000,
+ now: () => nowMs,
+ });
+ const { engine, calls } = makeFakeEngine();
+ const limited = limiter.decorate("org_windowed", engine);
+
+ yield* limited.execute("code", { onElicitation });
+ const blocked = yield* limited.execute("code", { onElicitation });
+ expect(blocked).toEqual({ result: null, error: RATE_LIMIT_BLOCKED_MESSAGE });
+
+ nowMs = 1_000; // next fixed window
+ const afterReset = yield* limited.execute("code", { onElicitation });
+
+ expect(afterReset).toEqual({ result: "ok" });
+ expect(calls.execute).toBe(2);
+ }),
+ );
+
+ it.effect("fails open when the counter errors (increment attempted, execution runs)", () =>
+ Effect.gen(function* () {
+ const state = { calls: 0 };
+ const limiter = makeExecutionRateLimiter(() =>
+ Effect.suspend(() => {
+ state.calls += 1;
+ return Effect.fail(new Error("counter DO unreachable"));
+ }),
+ );
+ const { engine, calls } = makeFakeEngine();
+ const limited = limiter.decorate("org_do_down", engine);
+
+ const result = yield* limited.execute("code", { onElicitation });
+
+ expect(state.calls).toBe(1); // the increment WAS attempted
+ expect(result).toEqual({ result: "ok" }); // and the execution still ran
+ expect(calls.execute).toBe(1);
+ }),
+ );
+
+ // Live clock: the timeout budget is a real timer here (10ms).
+ it.live("fails open when the counter exceeds its timeout", () =>
+ Effect.gen(function* () {
+ const limiter = makeExecutionRateLimiter(() => Effect.never, { timeoutMs: 10 });
+ const { engine, calls } = makeFakeEngine();
+ const limited = limiter.decorate("org_do_slow", engine);
+
+ const result = yield* limited.execute("code", { onElicitation });
+
+ expect(result).toEqual({ result: "ok" });
+ expect(calls.execute).toBe(1);
+ }),
+ );
+
+ it.effect("never gates resume, even over the limit", () =>
+ Effect.gen(function* () {
+ const counter = makeFakeCounter();
+ const limiter = makeExecutionRateLimiter(counter.increment, { limit: 1 });
+ const { engine, calls } = makeFakeEngine();
+ const limited = limiter.decorate("org_hot", engine);
+
+ yield* limited.execute("code", { onElicitation });
+ yield* limited.execute("code", { onElicitation }); // now over the limit
+
+ const outcome = yield* limited.resume("exec_1", { action: "accept" });
+
+ expect(outcome).toEqual({ status: "completed", result: { result: "resumed" } });
+ expect(calls.resume).toBe(1);
+ }),
+ );
+
+ it.effect("counts organizations independently", () =>
+ Effect.gen(function* () {
+ const counter = makeFakeCounter();
+ const limiter = makeExecutionRateLimiter(counter.increment, { limit: 1 });
+ const a = makeFakeEngine();
+ const b = makeFakeEngine();
+ const limitedA = limiter.decorate("org_a", a.engine);
+ const limitedB = limiter.decorate("org_b", b.engine);
+
+ yield* limitedA.execute("code", { onElicitation });
+ const blockedA = yield* limitedA.execute("code", { onElicitation });
+ const freshB = yield* limitedB.execute("code", { onElicitation });
+
+ expect(blockedA).toEqual({ result: null, error: RATE_LIMIT_BLOCKED_MESSAGE });
+ expect(freshB).toEqual({ result: "ok" });
+ }),
+ );
+});
From 3b4b9191e91e90c4be23f25ba39185ef6c1bd3ee Mon Sep 17 00:00:00 2001
From: Rhys Sullivan <39114868+RhysSullivan@users.noreply.github.com>
Date: Thu, 2 Jul 2026 13:04:38 -0700
Subject: [PATCH 6/6] Use typed test errors and instanceof assertions in gate
tests
---
apps/cloud/src/engine/execution-gate.test.ts | 18 ++++++++++++------
.../src/engine/execution-rate-limit.test.ts | 16 +++++++++++-----
2 files changed, 23 insertions(+), 11 deletions(-)
diff --git a/apps/cloud/src/engine/execution-gate.test.ts b/apps/cloud/src/engine/execution-gate.test.ts
index cb61304f8..4b8309b11 100644
--- a/apps/cloud/src/engine/execution-gate.test.ts
+++ b/apps/cloud/src/engine/execution-gate.test.ts
@@ -1,9 +1,16 @@
import { describe, expect, it } from "@effect/vitest";
-import { Effect } from "effect";
+import { Data, Effect } from "effect";
import type { ExecutionEngine } from "@executor-js/execution";
-import { EXECUTION_LIMIT_BLOCKED_MESSAGE, makeExecutionLimitGate } from "./execution-gate";
+import {
+ EXECUTION_LIMIT_BLOCKED_MESSAGE,
+ ExecutionLimitReachedError,
+ makeExecutionLimitGate,
+} from "./execution-gate";
+
+// Stand-in for an upstream billing failure (Autumn down, network error).
+class FakeUpstreamError extends Data.TaggedError("FakeUpstreamError")<{}> {}
// Minimal engine fake: records calls, always completes successfully. The gate
// must never let a blocked execution reach it.
@@ -102,16 +109,15 @@ describe("execution balance gate", () => {
const error = yield* Effect.flip(gate.check("org_blocked"));
- expect(error._tag).toBe("ExecutionLimitReachedError");
+ expect(error).toBeInstanceOf(ExecutionLimitReachedError);
expect(error.organizationId).toBe("org_blocked");
- expect(error.message).toBe(EXECUTION_LIMIT_BLOCKED_MESSAGE);
}),
);
it.effect("fails open when the billing service errors (check attempted, execution runs)", () =>
Effect.gen(function* () {
const { engine, calls } = makeFakeEngine();
- const balance = makeBalanceCheck(() => Effect.fail(new Error("autumn down")));
+ const balance = makeBalanceCheck(() => Effect.fail(new FakeUpstreamError()));
const gate = makeExecutionLimitGate(balance.check);
const gated = gate.decorate("org_erroring", engine);
@@ -179,7 +185,7 @@ describe("execution balance gate", () => {
const balance = makeBalanceCheck(() => {
if (failNext) {
failNext = false;
- return Effect.fail(new Error("transient"));
+ return Effect.fail(new FakeUpstreamError());
}
return Effect.succeed({ allowed: false });
});
diff --git a/apps/cloud/src/engine/execution-rate-limit.test.ts b/apps/cloud/src/engine/execution-rate-limit.test.ts
index 3a88cd3a9..260ccc150 100644
--- a/apps/cloud/src/engine/execution-rate-limit.test.ts
+++ b/apps/cloud/src/engine/execution-rate-limit.test.ts
@@ -1,9 +1,16 @@
import { describe, expect, it } from "@effect/vitest";
-import { Effect } from "effect";
+import { Data, Effect } from "effect";
import type { ExecutionEngine } from "@executor-js/execution";
-import { RATE_LIMIT_BLOCKED_MESSAGE, makeExecutionRateLimiter } from "./execution-rate-limit";
+import {
+ RATE_LIMIT_BLOCKED_MESSAGE,
+ ExecutionRateLimitExceededError,
+ makeExecutionRateLimiter,
+} from "./execution-rate-limit";
+
+// Stand-in for a counter-DO failure (unreachable, storage error).
+class FakeCounterError extends Data.TaggedError("FakeCounterError")<{}> {}
// In-memory stand-in for the counter DO: same fixed-window semantics, one
// { windowId, count } record per org.
@@ -88,9 +95,8 @@ describe("execution rate limiter", () => {
yield* limiter.check("org_hot");
const error = yield* Effect.flip(limiter.check("org_hot"));
- expect(error._tag).toBe("ExecutionRateLimitExceededError");
+ expect(error).toBeInstanceOf(ExecutionRateLimitExceededError);
expect(error.organizationId).toBe("org_hot");
- expect(error.message).toBe(RATE_LIMIT_BLOCKED_MESSAGE);
}),
);
@@ -124,7 +130,7 @@ describe("execution rate limiter", () => {
const limiter = makeExecutionRateLimiter(() =>
Effect.suspend(() => {
state.calls += 1;
- return Effect.fail(new Error("counter DO unreachable"));
+ return Effect.fail(new FakeCounterError());
}),
);
const { engine, calls } = makeFakeEngine();