diff --git a/.changeset/executor-cache-primitive.md b/.changeset/executor-cache-primitive.md new file mode 100644 index 000000000..f53672ef1 --- /dev/null +++ b/.changeset/executor-cache-primitive.md @@ -0,0 +1,5 @@ +--- +"executor": patch +--- + +Add `executor.cache`, a host-pluggable key-value cache on the SDK surface: `ExecutorConfig.cache` accepts an Effect KeyValueStore (Cloudflare KV adapter included via `@executor-js/cloudflare/key-value-store`), with a bounded in-memory TTL fallback otherwise. diff --git a/apps/cloud/src/engine/execution-stack.ts b/apps/cloud/src/engine/execution-stack.ts index 6a245788b..69b1a5dda 100644 --- a/apps/cloud/src/engine/execution-stack.ts +++ b/apps/cloud/src/engine/execution-stack.ts @@ -42,6 +42,7 @@ import { PluginsProvider, collectTables, } from "@executor-js/api/server"; +import { layerCloudflareKeyValueStore } from "@executor-js/cloudflare/key-value-store"; import { makeDynamicWorkerExecutor } from "@executor-js/runtime-dynamic-worker"; import executorConfig from "../../executor.config"; @@ -104,6 +105,16 @@ export const CloudCodeExecutorProvider: Layer.Layer = Laye * exported so that overlay builds over the SAME four seams. There is no neutral * no-op-decorator variant anymore: every cloud execution meters. */ +// Durable `executor.cache` backend (wrangler.jsonc `kv_namespaces`). Typed as +// `Layer` because the binding is optional: test workers / older local +// setups run without it, and `makeScopedExecutor` reads the service with +// `Effect.serviceOption` — absent, the executor falls back to its in-memory +// cache (which on per-request cloud executors means effectively no caching: +// correct, just cold). +export const CloudCacheLayer: Layer.Layer = env.CACHE + ? layerCloudflareKeyValueStore(env.CACHE) + : Layer.empty; + export const CloudExecutionSeamsLayer: Layer.Layer< DbProvider | PluginsProvider | HostConfig | CodeExecutorProvider, never, @@ -113,4 +124,5 @@ export const CloudExecutionSeamsLayer: Layer.Layer< CloudPluginsProvider, CloudHostConfig, CloudCodeExecutorProvider, + CloudCacheLayer, ); diff --git a/apps/cloud/src/env-augment.d.ts b/apps/cloud/src/env-augment.d.ts index 8911a8303..80245894c 100644 --- a/apps/cloud/src/env-augment.d.ts +++ b/apps/cloud/src/env-augment.d.ts @@ -28,6 +28,12 @@ declare global { // what `@executor-js/cloudflare/blob-store` accepts. BLOBS?: import("@cloudflare/workers-types").R2Bucket; + // `executor.cache` backend (wrangler.jsonc `kv_namespaces`). Optional for + // the same reason as BLOBS: absent, the executor uses its in-memory + // fallback. Typed via @cloudflare/workers-types to match what + // `@executor-js/cloudflare/key-value-store` accepts. + CACHE?: import("@cloudflare/workers-types").KVNamespace; + // SSRF / private-network egress guard. Unset in production -> the guard is // ON; the test workers set "true" so fixtures can reach localhost. ALLOW_LOCAL_NETWORK?: string; diff --git a/apps/cloud/wrangler.jsonc b/apps/cloud/wrangler.jsonc index 68ca6297a..b94749395 100644 --- a/apps/cloud/wrangler.jsonc +++ b/apps/cloud/wrangler.jsonc @@ -64,6 +64,17 @@ "bucket_name": "executor-cloud-blobs", }, ], + // `executor.cache` backend (derived-data cache; eventually consistent). The + // code path is guarded (engine/execution-stack.ts reads `env.CACHE` + // optionally), so shipping without the binding just means the in-memory + // fallback. To enable: `wrangler kv namespace create executor-cloud-cache`, + // then uncomment with the returned id. + // "kv_namespaces": [ + // { + // "binding": "CACHE", + // "id": "", + // }, + // ], "placement": { "region": "aws:us-east-1", }, diff --git a/packages/core/api/src/server/scoped-executor.ts b/packages/core/api/src/server/scoped-executor.ts index 4164bc7d3..c8484d596 100644 --- a/packages/core/api/src/server/scoped-executor.ts +++ b/packages/core/api/src/server/scoped-executor.ts @@ -32,6 +32,7 @@ // --------------------------------------------------------------------------- import { Context, Effect, Option } from "effect"; +import * as KeyValueStore from "effect/unstable/persistence/KeyValueStore"; import type { McpResource } from "@executor-js/host-mcp"; import { @@ -213,6 +214,11 @@ export const makeScopedExecutor = < const { db, blobs } = yield* DbProvider; const { plugins: pluginsFactory } = yield* PluginsProvider; const config = yield* HostConfig; + // Optional durable cache seam: a host boot layer that provides an Effect + // KeyValueStore (e.g. Cloudflare KV) makes it `executor.cache`; absent one + // the executor's bounded in-memory fallback applies. Read optionally so it + // never enters the `R` channel for hosts without a KV tier. + const cache = yield* Effect.serviceOption(KeyValueStore.KeyValueStore); // Explicit config wins; otherwise fall back to the request origin if a host // provided one (HTTP middleware / MCP session DO). Stays `undefined` for // non-request callers — `coreTools.webBaseUrl` is optional and only the @@ -263,6 +269,10 @@ export const makeScopedExecutor = < subject: Subject.make(accountId), db, blobs, + ...Option.match(cache, { + onNone: () => ({}), + onSome: (store) => ({ cache: store }), + }), plugins, httpClientLayer, fetch: hostedFetch, diff --git a/packages/core/sdk/src/executor-cache.test.ts b/packages/core/sdk/src/executor-cache.test.ts new file mode 100644 index 000000000..a0a16f830 --- /dev/null +++ b/packages/core/sdk/src/executor-cache.test.ts @@ -0,0 +1,91 @@ +// --------------------------------------------------------------------------- +// `executor.cache` — the SDK cache seam. A host-provided KeyValueStore wins; +// absent one the executor falls back to a bounded in-memory store with TTL +// expiry and LRU eviction. The fallback is Clock-based, so TTL is pinned with +// the virtual TestClock instead of patching Date.now. +// --------------------------------------------------------------------------- + +import { describe, expect, it } from "@effect/vitest"; +import { Effect } from "effect"; +import { TestClock } from "effect/testing"; +import * as KeyValueStore from "effect/unstable/persistence/KeyValueStore"; + +import { createExecutor } from "./executor"; +import { makeTestConfig } from "./test-config"; + +const makeExecutor = (cache?: KeyValueStore.KeyValueStore) => + createExecutor(makeTestConfig(cache !== undefined ? { cache } : {})); + +describe("executor cache", () => { + it.effect("falls back to an in-memory store when no cache is configured", () => + Effect.scoped( + Effect.gen(function* () { + const executor = yield* makeExecutor(); + + yield* executor.cache.set("a", "value"); + expect(yield* executor.cache.get("a")).toBe("value"); + + yield* executor.cache.remove("a"); + expect(yield* executor.cache.get("a")).toBeUndefined(); + }), + ), + ); + + it.effect("prefers a host-provided store", () => + Effect.scoped( + Effect.gen(function* () { + const backing = new Map(); + const hostStore = KeyValueStore.makeStringOnly({ + get: (key) => Effect.sync(() => backing.get(key)), + set: (key, value) => Effect.sync(() => void backing.set(key, value)), + remove: (key) => Effect.sync(() => void backing.delete(key)), + clear: Effect.sync(() => backing.clear()), + size: Effect.sync(() => backing.size), + }); + const executor = yield* makeExecutor(hostStore); + + yield* executor.cache.set("a", "value"); + expect(backing.get("a")).toBe("value"); + }), + ), + ); + + it.effect("expires fallback entries by TTL", () => + Effect.scoped( + Effect.gen(function* () { + const executor = yield* makeExecutor(); + + yield* executor.cache.set("a", "value"); + expect(yield* executor.cache.size).toBe(1); + + yield* TestClock.adjust("10 minutes"); + + expect(yield* executor.cache.get("a")).toBeUndefined(); + expect(yield* executor.cache.size).toBe(0); + }), + ), + ); + + it.effect("evicts the least recently used key at capacity", () => + Effect.scoped( + Effect.gen(function* () { + const executor = yield* makeExecutor(); + const capacity = 2_048; + + yield* executor.cache.set("a", "old"); + for (let index = 0; index < capacity - 1; index += 1) { + yield* executor.cache.set(`key-${index}`, String(index)); + } + + // Re-writing "a" refreshes its LRU position, so the overflow evicts + // the oldest untouched key instead. + yield* executor.cache.set("a", "new"); + yield* executor.cache.set("overflow", "value"); + + expect(yield* executor.cache.get("a")).toBe("new"); + expect(yield* executor.cache.get("key-0")).toBeUndefined(); + expect(yield* executor.cache.get("key-1")).toBe("1"); + }), + ), + ); +}); diff --git a/packages/core/sdk/src/executor.ts b/packages/core/sdk/src/executor.ts index 530462a8a..67d05f7e2 100644 --- a/packages/core/sdk/src/executor.ts +++ b/packages/core/sdk/src/executor.ts @@ -1,5 +1,6 @@ -import { Effect, Inspectable, Layer, Option, Predicate, Schema } from "effect"; +import { Clock, Effect, Inspectable, Layer, Option, Predicate, Schema } from "effect"; import { FetchHttpClient, type HttpClient } from "effect/unstable/http"; +import * as KeyValueStore from "effect/unstable/persistence/KeyValueStore"; import { fumadb } from "@executor-js/fumadb"; import { memoryAdapter } from "@executor-js/fumadb/adapters/memory"; import { withQueryContext, type Condition, type ConditionBuilder } from "@executor-js/fumadb/query"; @@ -326,6 +327,14 @@ export type Executor = { ) => Effect.Effect; readonly close: () => Effect.Effect; + + /** Ephemeral key-value cache for derived data. Host-provided when durable + * storage exists (e.g. Cloudflare KV via `ExecutorConfig.cache`); otherwise + * a bounded in-memory TTL store scoped to this executor. Consumers must + * treat entries as best-effort and re-derivable — a cold cache is always + * correct — and must NOT key anything tenant-sensitive without prefixing, + * since a host-provided namespace may outlive and span executors. */ + readonly cache: KeyValueStore.KeyValueStore; } & PluginExtensions; export interface ExecutorDb { @@ -353,6 +362,13 @@ export interface ExecutorConfig isStorageFailure(cause) ? cause : new StorageError({ message, cause }); +// --------------------------------------------------------------------------- +// Default `executor.cache` backend — a bounded in-memory string store with TTL +// expiry and LRU eviction (Map iteration order is insertion order; touched +// keys are re-inserted). Deliberately modest: it exists so cache consumers can +// be written against one interface everywhere, while hosts with real KV +// (Cloudflare) swap in a durable backend via `ExecutorConfig.cache`. +// --------------------------------------------------------------------------- + +const MEMORY_CACHE_CAPACITY = 2_048; +const MEMORY_CACHE_TTL_MS = 10 * 60 * 1000; + +const makeMemoryCacheStore = (): KeyValueStore.KeyValueStore => { + const rows = new Map(); + const evictExpired = (now: number): void => { + for (const [key, entry] of rows) { + if (entry.expiresAt <= now) rows.delete(key); + } + }; + const evictOverCapacity = (): void => { + while (rows.size > MEMORY_CACHE_CAPACITY) { + const oldest = rows.keys().next().value; + if (oldest === undefined) break; + rows.delete(oldest); + } + }; + return KeyValueStore.makeStringOnly({ + get: (key) => + Effect.map(Clock.currentTimeMillis, (now) => { + const entry = rows.get(key); + if (entry === undefined) return undefined; + if (entry.expiresAt <= now) { + rows.delete(key); + return undefined; + } + // LRU touch: move to the back of the insertion order. + rows.delete(key); + rows.set(key, entry); + return entry.value; + }), + set: (key, value) => + Effect.map(Clock.currentTimeMillis, (now) => { + evictExpired(now); + rows.delete(key); + rows.set(key, { value, expiresAt: now + MEMORY_CACHE_TTL_MS }); + evictOverCapacity(); + }), + remove: (key) => + Effect.sync(() => { + rows.delete(key); + }), + clear: Effect.sync(() => { + rows.clear(); + }), + size: Effect.map(Clock.currentTimeMillis, (now) => { + evictExpired(now); + return rows.size; + }), + }); +}; + const pluginStorageFailure = (pluginId: string, hook: string, cause: unknown): StorageFailure => storageFailureFromUnknown(`${hook} failed for plugin ${pluginId}`, cause); @@ -1286,6 +1362,7 @@ export const createExecutor = (effect: Effect.Effect) => fuma.transaction(effect); // Populated once, never mutated after startup. @@ -3479,6 +3556,7 @@ export const createExecutor = => value as Executor; diff --git a/packages/core/sdk/src/test-config.ts b/packages/core/sdk/src/test-config.ts index bce9392b4..a9b91421f 100644 --- a/packages/core/sdk/src/test-config.ts +++ b/packages/core/sdk/src/test-config.ts @@ -118,6 +118,9 @@ export type TestConfigOptions["coreTools"]; + /** Host-style cache backend override; omitted tests get the executor's + * in-memory fallback. */ + readonly cache?: ExecutorConfig["cache"]; /** Override the OAuth callback URL. Pass `null` to construct an executor with * no OAuth callback (exercises the fail-loud redirect path). */ readonly redirectUri?: string | null; @@ -156,6 +159,7 @@ export const makeTestConfig = ; + readonly maxConcurrentDeletes: () => number; +} => { + const values = new Map(); + let activeDeletes = 0; + let maxActiveDeletes = 0; + + // oxlint-disable-next-line executor/no-double-cast -- test double: only the KV slice the adapter calls is implemented + const kv = { + get: async (key: string) => values.get(key) ?? null, + put: async (key: string, value: string) => { + values.set(key, value); + }, + delete: async (key: string) => { + activeDeletes += 1; + maxActiveDeletes = Math.max(maxActiveDeletes, activeDeletes); + await new Promise((resolve) => setTimeout(resolve, 0)); + values.delete(key); + activeDeletes -= 1; + }, + list: async (options?: { readonly cursor?: string }) => { + const offset = options?.cursor === undefined ? 0 : Number(options.cursor); + const keys = [...values.keys()].sort().slice(offset, offset + pageSize); + const nextOffset = offset + pageSize; + const listComplete = nextOffset >= values.size; + return { + keys: keys.map((name) => ({ name })), + list_complete: listComplete, + cursor: listComplete ? "" : String(nextOffset), + }; + }, + } as unknown as KVNamespace; + + return { + kv, + values, + maxConcurrentDeletes: () => maxActiveDeletes, + }; +}; + +describe("makeCloudflareKeyValueStore", () => { + it.effect("round-trips string values", () => + Effect.gen(function* () { + const { kv } = makeFakeKv(10); + const store = makeCloudflareKeyValueStore(kv); + + yield* store.set("a", "value"); + expect(yield* store.get("a")).toBe("value"); + + yield* store.remove("a"); + expect(yield* store.get("a")).toBeUndefined(); + }), + ); + + it.effect("counts keys across list pages", () => + Effect.gen(function* () { + const { values, kv } = makeFakeKv(2); + values.set("a", "1"); + values.set("b", "2"); + values.set("c", "3"); + + const store = makeCloudflareKeyValueStore(kv); + expect(yield* store.size).toBe(3); + }), + ); + + it.effect("clears paginated keys in bounded parallel batches", () => + Effect.gen(function* () { + const { values, kv, maxConcurrentDeletes } = makeFakeKv(25); + for (let index = 0; index < 75; index += 1) { + values.set(`key-${index.toString().padStart(2, "0")}`, String(index)); + } + + const store = makeCloudflareKeyValueStore(kv); + yield* store.clear; + + expect(values.size).toBe(0); + expect(maxConcurrentDeletes()).toBeGreaterThan(1); + expect(maxConcurrentDeletes()).toBeLessThanOrEqual(50); + }), + ); +}); diff --git a/packages/hosts/cloudflare/src/key-value-store.ts b/packages/hosts/cloudflare/src/key-value-store.ts new file mode 100644 index 000000000..d4003e503 --- /dev/null +++ b/packages/hosts/cloudflare/src/key-value-store.ts @@ -0,0 +1,83 @@ +// --------------------------------------------------------------------------- +// Effect KeyValueStore over a Cloudflare KV namespace — the durable backend +// for `executor.cache` on the Cloudflare hosts. The SDK owns the cache seam +// (`ExecutorConfig.cache`, with an in-memory fallback); this binding lives +// here so the SDK stays platform-agnostic, mirroring blob-store.ts for R2. +// +// KV is eventually consistent across edge locations — fine for a cache +// (consumers must treat entries as best-effort), wrong for anything needing +// read-after-write. `size`/`clear` paginate the whole namespace and exist for +// interface completeness; production consumers should not call them on a +// namespace of any size. +// --------------------------------------------------------------------------- + +import { Effect, Layer } from "effect"; +import * as KeyValueStore from "effect/unstable/persistence/KeyValueStore"; +import type { KVNamespace } from "@cloudflare/workers-types"; + +const KV_DELETE_CONCURRENCY = 50; + +const storeError = (method: string, key: string | undefined) => (cause: unknown) => + new KeyValueStore.KeyValueStoreError({ + method, + ...(key !== undefined ? { key } : {}), + message: `Cloudflare KV ${method} failed`, + cause, + }); + +const listAllKeys = async (kv: KVNamespace): Promise => { + const keys: string[] = []; + let cursor: string | undefined; + do { + const page = await kv.list(cursor === undefined ? {} : { cursor }); + keys.push(...page.keys.map((key) => key.name)); + cursor = page.list_complete ? undefined : page.cursor; + } while (cursor !== undefined); + return keys; +}; + +const deleteAllKeys = async (kv: KVNamespace, keys: readonly string[]): Promise => { + for (let index = 0; index < keys.length; index += KV_DELETE_CONCURRENCY) { + await Promise.all( + keys.slice(index, index + KV_DELETE_CONCURRENCY).map((key) => kv.delete(key)), + ); + } +}; + +export const makeCloudflareKeyValueStore = (kv: KVNamespace): KeyValueStore.KeyValueStore => + KeyValueStore.makeStringOnly({ + get: (key) => + Effect.tryPromise({ + try: async () => (await kv.get(key)) ?? undefined, + catch: storeError("get", key), + }), + set: (key, value) => + Effect.tryPromise({ + try: () => kv.put(key, value), + catch: storeError("set", key), + }), + remove: (key) => + Effect.tryPromise({ + try: () => kv.delete(key), + catch: storeError("remove", key), + }), + clear: Effect.tryPromise({ + try: async (signal) => { + void signal; + await deleteAllKeys(kv, await listAllKeys(kv)); + }, + catch: storeError("clear", undefined), + }), + size: Effect.tryPromise({ + try: async () => (await listAllKeys(kv)).length, + catch: storeError("size", undefined), + }), + }); + +/** Boot-layer form: provide the KV namespace as the ambient + * `KeyValueStore.KeyValueStore` service `makeScopedExecutor` reads + * optionally and threads into `createExecutor({ cache })`. */ +export const layerCloudflareKeyValueStore = ( + kv: KVNamespace, +): Layer.Layer => + Layer.succeed(KeyValueStore.KeyValueStore, makeCloudflareKeyValueStore(kv));