Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/executor-cache-primitive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"executor": patch
---

Add `executor.cache`, a host-pluggable key-value cache on the SDK surface: `ExecutorConfig.cache` accepts an Effect KeyValueStore (Cloudflare KV adapter included via `@executor-js/cloudflare/key-value-store`), with a bounded in-memory TTL fallback otherwise.
12 changes: 12 additions & 0 deletions apps/cloud/src/engine/execution-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
PluginsProvider,
collectTables,
} from "@executor-js/api/server";
import { layerCloudflareKeyValueStore } from "@executor-js/cloudflare/key-value-store";
import { makeDynamicWorkerExecutor } from "@executor-js/runtime-dynamic-worker";

import executorConfig from "../../executor.config";
Expand Down Expand Up @@ -104,6 +105,16 @@ export const CloudCodeExecutorProvider: Layer.Layer<CodeExecutorProvider> = Laye
* exported so that overlay builds over the SAME four seams. There is no neutral
* no-op-decorator variant anymore: every cloud execution meters.
*/
// Durable `executor.cache` backend (wrangler.jsonc `kv_namespaces`). Typed as
// `Layer<never>` because the binding is optional: test workers / older local
// setups run without it, and `makeScopedExecutor` reads the service with
// `Effect.serviceOption` — absent, the executor falls back to its in-memory
// cache (which on per-request cloud executors means effectively no caching:
// correct, just cold).
export const CloudCacheLayer: Layer.Layer<never> = env.CACHE
? layerCloudflareKeyValueStore(env.CACHE)
: Layer.empty;

export const CloudExecutionSeamsLayer: Layer.Layer<
DbProvider | PluginsProvider | HostConfig | CodeExecutorProvider,
never,
Expand All @@ -113,4 +124,5 @@ export const CloudExecutionSeamsLayer: Layer.Layer<
CloudPluginsProvider,
CloudHostConfig,
CloudCodeExecutorProvider,
CloudCacheLayer,
);
6 changes: 6 additions & 0 deletions apps/cloud/src/env-augment.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ declare global {
// what `@executor-js/cloudflare/blob-store` accepts.
BLOBS?: import("@cloudflare/workers-types").R2Bucket;

// `executor.cache` backend (wrangler.jsonc `kv_namespaces`). Optional for
// the same reason as BLOBS: absent, the executor uses its in-memory
// fallback. Typed via @cloudflare/workers-types to match what
// `@executor-js/cloudflare/key-value-store` accepts.
CACHE?: import("@cloudflare/workers-types").KVNamespace;

// SSRF / private-network egress guard. Unset in production -> the guard is
// ON; the test workers set "true" so fixtures can reach localhost.
ALLOW_LOCAL_NETWORK?: string;
Expand Down
11 changes: 11 additions & 0 deletions apps/cloud/wrangler.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@
"bucket_name": "executor-cloud-blobs",
},
],
// `executor.cache` backend (derived-data cache; eventually consistent). The
// code path is guarded (engine/execution-stack.ts reads `env.CACHE`
// optionally), so shipping without the binding just means the in-memory
// fallback. To enable: `wrangler kv namespace create executor-cloud-cache`,
// then uncomment with the returned id.
// "kv_namespaces": [
// {
// "binding": "CACHE",
// "id": "<id from wrangler kv namespace create>",
// },
// ],
"placement": {
"region": "aws:us-east-1",
},
Expand Down
10 changes: 10 additions & 0 deletions packages/core/api/src/server/scoped-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
// ---------------------------------------------------------------------------

import { Context, Effect, Option } from "effect";
import * as KeyValueStore from "effect/unstable/persistence/KeyValueStore";

import type { McpResource } from "@executor-js/host-mcp";
import {
Expand Down Expand Up @@ -213,6 +214,11 @@ export const makeScopedExecutor = <
const { db, blobs } = yield* DbProvider;
const { plugins: pluginsFactory } = yield* PluginsProvider;
const config = yield* HostConfig;
// Optional durable cache seam: a host boot layer that provides an Effect
// KeyValueStore (e.g. Cloudflare KV) makes it `executor.cache`; absent one
// the executor's bounded in-memory fallback applies. Read optionally so it
// never enters the `R` channel for hosts without a KV tier.
const cache = yield* Effect.serviceOption(KeyValueStore.KeyValueStore);
// Explicit config wins; otherwise fall back to the request origin if a host
// provided one (HTTP middleware / MCP session DO). Stays `undefined` for
// non-request callers — `coreTools.webBaseUrl` is optional and only the
Expand Down Expand Up @@ -263,6 +269,10 @@ export const makeScopedExecutor = <
subject: Subject.make(accountId),
db,
blobs,
...Option.match(cache, {
onNone: () => ({}),
onSome: (store) => ({ cache: store }),
}),
plugins,
httpClientLayer,
fetch: hostedFetch,
Expand Down
91 changes: 91 additions & 0 deletions packages/core/sdk/src/executor-cache.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// ---------------------------------------------------------------------------
// `executor.cache` — the SDK cache seam. A host-provided KeyValueStore wins;
// absent one the executor falls back to a bounded in-memory store with TTL
// expiry and LRU eviction. The fallback is Clock-based, so TTL is pinned with
// the virtual TestClock instead of patching Date.now.
// ---------------------------------------------------------------------------

import { describe, expect, it } from "@effect/vitest";
import { Effect } from "effect";
import { TestClock } from "effect/testing";
import * as KeyValueStore from "effect/unstable/persistence/KeyValueStore";

import { createExecutor } from "./executor";
import { makeTestConfig } from "./test-config";

const makeExecutor = (cache?: KeyValueStore.KeyValueStore) =>
createExecutor(makeTestConfig(cache !== undefined ? { cache } : {}));

describe("executor cache", () => {
it.effect("falls back to an in-memory store when no cache is configured", () =>
Effect.scoped(
Effect.gen(function* () {
const executor = yield* makeExecutor();

yield* executor.cache.set("a", "value");
expect(yield* executor.cache.get("a")).toBe("value");

yield* executor.cache.remove("a");
expect(yield* executor.cache.get("a")).toBeUndefined();
}),
),
);

it.effect("prefers a host-provided store", () =>
Effect.scoped(
Effect.gen(function* () {
const backing = new Map<string, string>();
const hostStore = KeyValueStore.makeStringOnly({
get: (key) => Effect.sync(() => backing.get(key)),
set: (key, value) => Effect.sync(() => void backing.set(key, value)),
remove: (key) => Effect.sync(() => void backing.delete(key)),
clear: Effect.sync(() => backing.clear()),
size: Effect.sync(() => backing.size),
});
const executor = yield* makeExecutor(hostStore);

yield* executor.cache.set("a", "value");
expect(backing.get("a")).toBe("value");
}),
),
);

it.effect("expires fallback entries by TTL", () =>
Effect.scoped(
Effect.gen(function* () {
const executor = yield* makeExecutor();

yield* executor.cache.set("a", "value");
expect(yield* executor.cache.size).toBe(1);

yield* TestClock.adjust("10 minutes");

expect(yield* executor.cache.get("a")).toBeUndefined();
expect(yield* executor.cache.size).toBe(0);
}),
),
);

it.effect("evicts the least recently used key at capacity", () =>
Effect.scoped(
Effect.gen(function* () {
const executor = yield* makeExecutor();
const capacity = 2_048;

yield* executor.cache.set("a", "old");
for (let index = 0; index < capacity - 1; index += 1) {
yield* executor.cache.set(`key-${index}`, String(index));
}

// Re-writing "a" refreshes its LRU position, so the overflow evicts
// the oldest untouched key instead.
yield* executor.cache.set("a", "new");
yield* executor.cache.set("overflow", "value");

expect(yield* executor.cache.get("a")).toBe("new");
expect(yield* executor.cache.get("key-0")).toBeUndefined();
expect(yield* executor.cache.get("key-1")).toBe("1");
}),
),
);
});
80 changes: 79 additions & 1 deletion packages/core/sdk/src/executor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Effect, Inspectable, Layer, Option, Predicate, Schema } from "effect";
import { Clock, Effect, Inspectable, Layer, Option, Predicate, Schema } from "effect";
import { FetchHttpClient, type HttpClient } from "effect/unstable/http";
import * as KeyValueStore from "effect/unstable/persistence/KeyValueStore";
import { fumadb } from "@executor-js/fumadb";
import { memoryAdapter } from "@executor-js/fumadb/adapters/memory";
import { withQueryContext, type Condition, type ConditionBuilder } from "@executor-js/fumadb/query";
Expand Down Expand Up @@ -326,6 +327,14 @@ export type Executor<TPlugins extends readonly AnyPlugin[] = readonly []> = {
) => Effect.Effect<unknown, ExecuteError>;

readonly close: () => Effect.Effect<void, StorageFailure>;

/** Ephemeral key-value cache for derived data. Host-provided when durable
* storage exists (e.g. Cloudflare KV via `ExecutorConfig.cache`); otherwise
* a bounded in-memory TTL store scoped to this executor. Consumers must
* treat entries as best-effort and re-derivable — a cold cache is always
* correct — and must NOT key anything tenant-sensitive without prefixing,
* since a host-provided namespace may outlive and span executors. */
readonly cache: KeyValueStore.KeyValueStore;
} & PluginExtensions<TPlugins>;

export interface ExecutorDb {
Expand Down Expand Up @@ -353,6 +362,13 @@ export interface ExecutorConfig<TPlugins extends readonly AnyPlugin[] = readonly
* values stay out of the relational tier.
*/
readonly blobs?: BlobStore;
/**
* Backend for `executor.cache`. Hosts with a durable KV tier hand one in
* (e.g. `makeCloudflareKeyValueStore` from `@executor-js/cloudflare/key-value-store`);
* without one the executor falls back to a bounded in-memory TTL store,
* which only helps within a single long-lived process.
*/
readonly cache?: KeyValueStore.KeyValueStore;
readonly plugins?: TPlugins;
/** Config-level credential providers, merged with every
* `plugin.credentialProviders`. Config providers register first, so the
Expand Down Expand Up @@ -427,6 +443,66 @@ const validateExecutorDbTables = (required: FumaTables, actual: FumaTables): voi
const storageFailureFromUnknown = (message: string, cause: unknown): StorageFailure =>
isStorageFailure(cause) ? cause : new StorageError({ message, cause });

// ---------------------------------------------------------------------------
// Default `executor.cache` backend — a bounded in-memory string store with TTL
// expiry and LRU eviction (Map iteration order is insertion order; touched
// keys are re-inserted). Deliberately modest: it exists so cache consumers can
// be written against one interface everywhere, while hosts with real KV
// (Cloudflare) swap in a durable backend via `ExecutorConfig.cache`.
// ---------------------------------------------------------------------------

const MEMORY_CACHE_CAPACITY = 2_048;
const MEMORY_CACHE_TTL_MS = 10 * 60 * 1000;

const makeMemoryCacheStore = (): KeyValueStore.KeyValueStore => {
const rows = new Map<string, { readonly value: string; readonly expiresAt: number }>();
const evictExpired = (now: number): void => {
for (const [key, entry] of rows) {
if (entry.expiresAt <= now) rows.delete(key);
}
};
const evictOverCapacity = (): void => {
while (rows.size > MEMORY_CACHE_CAPACITY) {
const oldest = rows.keys().next().value;
if (oldest === undefined) break;
rows.delete(oldest);
}
};
return KeyValueStore.makeStringOnly({
get: (key) =>
Effect.map(Clock.currentTimeMillis, (now) => {
const entry = rows.get(key);
if (entry === undefined) return undefined;
if (entry.expiresAt <= now) {
rows.delete(key);
return undefined;
}
// LRU touch: move to the back of the insertion order.
rows.delete(key);
rows.set(key, entry);
return entry.value;
}),
set: (key, value) =>
Effect.map(Clock.currentTimeMillis, (now) => {
evictExpired(now);
rows.delete(key);
rows.set(key, { value, expiresAt: now + MEMORY_CACHE_TTL_MS });
evictOverCapacity();
}),
remove: (key) =>
Effect.sync(() => {
rows.delete(key);
}),
clear: Effect.sync(() => {
rows.clear();
}),
size: Effect.map(Clock.currentTimeMillis, (now) => {
evictExpired(now);
return rows.size;
}),
});
};

const pluginStorageFailure = (pluginId: string, hook: string, cause: unknown): StorageFailure =>
storageFailureFromUnknown(`${hook} failed for plugin ${pluginId}`, cause);

Expand Down Expand Up @@ -1286,6 +1362,7 @@ export const createExecutor = <const TPlugins extends readonly AnyPlugin[] = rea
const fuma = makeFumaClient(rootDb);
const core = makeCoreDb(fuma);
const blobs = config.blobs ?? makeFumaBlobStore(fuma);
const cacheStore = config.cache ?? makeMemoryCacheStore();
const transaction = <A, E>(effect: Effect.Effect<A, E>) => fuma.transaction(effect);

// Populated once, never mutated after startup.
Expand Down Expand Up @@ -3479,6 +3556,7 @@ export const createExecutor = <const TPlugins extends readonly AnyPlugin[] = rea
},
execute,
close,
cache: cacheStore,
};

const toExecutor = (value: unknown): Executor<TPlugins> => value as Executor<TPlugins>;
Expand Down
4 changes: 4 additions & 0 deletions packages/core/sdk/src/test-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ export type TestConfigOptions<TPlugins extends readonly AnyPlugin[] = readonly [
readonly backend?: TestDatabaseBackend;
readonly dataDir?: string;
readonly coreTools?: ExecutorConfig<TPlugins>["coreTools"];
/** Host-style cache backend override; omitted tests get the executor's
* in-memory fallback. */
readonly cache?: ExecutorConfig<TPlugins>["cache"];
/** Override the OAuth callback URL. Pass `null` to construct an executor with
* no OAuth callback (exercises the fail-loud redirect path). */
readonly redirectUri?: string | null;
Expand Down Expand Up @@ -156,6 +159,7 @@ export const makeTestConfig = <const TPlugins extends readonly AnyPlugin[] = rea
db,
plugins: options?.plugins,
coreTools: options?.coreTools,
...(options?.cache !== undefined ? { cache: options.cache } : {}),
testDb,
// Tests default to auto-accepting elicitation prompts.
onElicitation: "accept-all",
Expand Down
4 changes: 4 additions & 0 deletions packages/hosts/cloudflare/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
"types": "./src/blob-store.ts",
"default": "./src/blob-store.ts"
},
"./key-value-store": {
"types": "./src/key-value-store.ts",
"default": "./src/key-value-store.ts"
},
"./mcp/do-headers": {
"types": "./src/mcp/do-headers.ts",
"default": "./src/mcp/do-headers.ts"
Expand Down
Loading
Loading