diff --git a/.changeset/spec-fetch-cache.md b/.changeset/spec-fetch-cache.md
new file mode 100644
index 000000000..923d636f2
--- /dev/null
+++ b/.changeset/spec-fetch-cache.md
@@ -0,0 +1,5 @@
+---
+"executor": patch
+---
+
+Stop re-downloading OpenAPI specs the app already has. Spec URLs now resolve through a tenant-shared cache (URL → content hash + ETag/Last-Modified over the existing content-addressed blob store): the add flow's detect → preview → add sequence downloads a spec once instead of per step, and refreshing an integration revalidates with a conditional request — an unchanged upstream costs a bodyless 304 instead of a multi-MB download.
diff --git a/e2e/scenarios/openapi-spec-fetch-cache-ui.test.ts b/e2e/scenarios/openapi-spec-fetch-cache-ui.test.ts
new file mode 100644
index 000000000..f5162553a
--- /dev/null
+++ b/e2e/scenarios/openapi-spec-fetch-cache-ui.test.ts
@@ -0,0 +1,144 @@
+// Cross-target (browser): the spec-fetch cache, driven through the real UI.
+// The API-surface twin (openapi-spec-fetch-cache.test.ts) pins the same
+// contract headlessly; this one exists so the journey is watchable — the
+// session video + trace show the add form analyzing a pasted URL, the
+// integration landing, and an Edit → re-fetch-on-save refresh — while the
+// counting spec server proves what the network actually did underneath:
+//   - the whole add journey (the form's debounced analyze + the submit)
+//     downloads the spec ONCE,
+//   - the refresh consults the server but 304s instead of re-downloading.
+// Skips on targets without a browser surface (selfhost today).
+import { createHash, randomBytes } from "node:crypto";
+import { createServer } from "node:http";
+
+import { expect } from "@effect/vitest";
+import { Effect } from "effect";
+
+import { scenario } from "../src/scenario";
+import { Browser, Target } from "../src/services";
+
+/** The spec the counting server hosts: a single `GET /ping` operation. */
+const pingSpec = JSON.stringify({
+  openapi: "3.0.3",
+  info: { title: "Cached Ping API", version: "1.0.0" },
+  servers: [{ url: "https://api.example.com", description: "Production" }],
+  paths: {
+    "/ping": {
+      get: {
+        operationId: "ping",
+        summary: "Return a pong",
+        responses: { "200": { description: "pong" } },
+      },
+    },
+  },
+});
+
+/** A real 127.0.0.1 spec host with a strong ETag and a request ledger —
+ * the ground truth the video's UI journey is asserted against. Non-spec
+ * paths (the add flow's OAuth discovery probes) 404 outside the count. */
+const serveCountingSpec = (body: string) =>
+  Effect.acquireRelease(
+    Effect.callback<{
+      readonly url: string;
+      readonly downloads: () => number;
+      readonly notModified: () => number;
+      readonly close: () => void;
+    }>((resume) => {
+      let downloads = 0;
+      let notModified = 0;
+      const etag = `"${createHash("sha256").update(body).digest("hex")}"`;
+      const server = createServer((request, response) => {
+        if (!request.url?.startsWith("/spec.json")) {
+          response.writeHead(404);
+          response.end();
+          return;
+        }
+        if (request.headers["if-none-match"] === etag) {
+          notModified += 1;
+          response.writeHead(304, { etag });
+          response.end();
+          return;
+        }
+        downloads += 1;
+        response.writeHead(200, { "content-type": "application/json", etag });
+        response.end(body);
+      });
+      // Surface a failed bind as a defect instead of an unhandled 'error' event.
+      server.once("error", (cause) => resume(Effect.die(cause)));
+      server.listen(0, "127.0.0.1", () => {
+        const address = server.address();
+        const port = typeof address === "object" && address ? address.port : 0;
+        resume(
+          Effect.succeed({
+            url: `http://127.0.0.1:${port}/spec.json`,
+            downloads: () => downloads,
+            notModified: () => notModified,
+            close: () => {
+              server.close();
+              server.closeAllConnections();
+            },
+          }),
+        );
+      });
+    }),
+    (server) => Effect.sync(server.close),
+  );
+
+scenario(
+  "OpenAPI · adding a spec by URL in the UI downloads it once and Edit → re-fetch 304s",
+  {},
+  Effect.scoped(
+    Effect.gen(function* () {
+      const target = yield* Target;
+      const browser = yield* Browser;
+      const identity = yield* target.newIdentity();
+      const specServer = yield* serveCountingSpec(pingSpec);
+      const name = `Cache Demo ${randomBytes(3).toString("hex")}`;
+
+      yield* browser.session(identity, async ({ page, step }) => {
+        await step("Open the Add OpenAPI source form", async () => {
+          await page.goto("/integrations/add/openapi", { waitUntil: "networkidle" });
+          await page.getByPlaceholder("https://api.example.com/openapi.json").waitFor();
+        });
+
+        await step("Paste the spec URL — the form analyzes it (first download)", async () => {
+          await page.getByPlaceholder("https://api.example.com/openapi.json").fill(specServer.url);
+          // The debounced analyze fetches and previews the spec.
+          await page.getByRole("button", { name: "Add integration" }).waitFor({ timeout: 20_000 });
+        });
+
+        await step("Name it and add the integration", async () => {
+          const nameInput = page.getByLabel("Name", { exact: true });
+          if (await nameInput.isVisible().catch(() => false)) {
+            await nameInput.fill(name);
+          }
+          await page.getByRole("button", { name: "Add integration" }).click();
+          await page.waitForURL(/\/integrations\/(?!add\b)[^/?]+$/, { timeout: 30_000 });
+          await page.getByText("Connections").first().waitFor();
+        });
+
+        await step("The whole add journey cost exactly one spec download", async () => {
+          // The form's analyze already fetched it; addSpec on the server reused
+          // the cached copy instead of re-downloading.
+          expect(specServer.downloads(), "one download across analyze + add").toBe(1);
+        });
+
+        await step("Open Edit and stage a re-fetch of the spec", async () => {
+          await page.getByRole("button", { name: "Edit" }).click();
+          await page.getByText("Update spec").waitFor({ timeout: 10_000 });
+          await page.getByText("Re-fetch the spec on save").click();
+        });
+
+        await step("Save — the refresh revalidates (304) instead of re-downloading", async () => {
+          await page.getByRole("button", { name: "Save", exact: true }).click();
+          await page.getByText("Update spec").waitFor({ state: "hidden", timeout: 30_000 });
+        });
+
+        await step("The server saw a conditional request, not a second download", async () => {
+          expect(specServer.notModified(), "the refresh got a bodyless 304").toBe(1);
+          expect(specServer.downloads(), "still exactly one full download").toBe(1);
+        });
+      });
+    }),
+  ),
+);
diff --git a/e2e/scenarios/openapi-spec-fetch-cache.test.ts b/e2e/scenarios/openapi-spec-fetch-cache.test.ts
new file mode 100644
index 000000000..aac7a4564
--- /dev/null
+++ b/e2e/scenarios/openapi-spec-fetch-cache.test.ts
@@ -0,0 +1,185 @@
+// Cross-target: the spec-fetch cache — "we don't re-download a spec we already
+// have". A real 127.0.0.1 server serves the spec with a strong ETag and honors
+// If-None-Match, counting every request it sees. The observable contract at
+// the product boundary is that server's request log:
+//   - the add flow (preview, preview again, addSpec) downloads the spec ONCE —
+//     the URL-index cache serves the repeats, across separate HTTP requests
+//     (on cloud each request is its own executor, so this also proves the
+//     index is durable, not in-memory),
+//   - updateSpec on an unchanged upstream still hits the server (an explicit
+//     refresh must revalidate) but gets a bodyless 304, not a re-download,
+//   - updateSpec on a CHANGED upstream busts the cache through the validators
+//     and the new tool catalog lands.
+import { createHash, randomBytes } from "node:crypto";
+import { createServer } from "node:http";
+
+import { expect } from "@effect/vitest";
+import { Effect } from "effect";
+import { composePluginApi } from "@executor-js/api/server";
+import { openApiHttpPlugin } from "@executor-js/plugin-openapi/api";
+import { IntegrationSlug } from "@executor-js/sdk/shared";
+
+import { scenario } from "../src/scenario";
+import { Api, Target } from "../src/services";
+
+const api = composePluginApi([openApiHttpPlugin()] as const);
+
+const specV1 = JSON.stringify({
+  openapi: "3.0.3",
+  info: { title: "Cached API", version: "1.0.0" },
+  paths: {
+    "/ping": {
+      get: {
+        operationId: "ping",
+        summary: "Return a pong",
+        responses: { "200": { description: "pong" } },
+      },
+    },
+  },
+});
+
+const specV2 = JSON.stringify({
+  openapi: "3.0.3",
+  info: { title: "Cached API", version: "2.0.0" },
+  paths: {
+    "/ping": {
+      get: {
+        operationId: "ping",
+        summary: "Return a pong",
+        responses: { "200": { description: "pong" } },
+      },
+    },
+    "/widgets": {
+      get: {
+        operationId: "listWidgets",
+        summary: "List widgets",
+        responses: { "200": { description: "widgets" } },
+      },
+    },
+  },
+});
+
+/** A real 127.0.0.1 spec host that serves a strong ETag (the body's SHA-256),
+ * honors `If-None-Match` with a 304, and counts what it saw — the request
+ * ledger the assertions run against. A failed bind surfaces as a defect of this effect. */
+const serveCountingSpec = (initial: string) =>
+  Effect.acquireRelease(
+    Effect.callback<{
+      readonly url: string;
+      readonly setBody: (body: string) => void;
+      readonly downloads: () => number;
+      readonly notModified: () => number;
+      readonly close: () => void;
+    }>((resume) => {
+      let body = initial;
+      let downloads = 0;
+      let notModified = 0;
+      const server = createServer((request, response) => {
+        // Only /spec.json is the spec. Everything else (the add flow's OAuth
+        // discovery probes .well-known paths on this host) 404s and stays out
+        // of the download count — the assertions are about spec transfers.
+        if (!request.url?.startsWith("/spec.json")) {
+          response.writeHead(404);
+          response.end();
+          return;
+        }
+        const etag = `"${createHash("sha256").update(body).digest("hex")}"`;
+        if (request.headers["if-none-match"] === etag) {
+          notModified += 1;
+          response.writeHead(304, { etag });
+          response.end();
+          return;
+        }
+        downloads += 1;
+        response.writeHead(200, { "content-type": "application/json", etag });
+        response.end(body);
+      });
+      server.once("error", (cause) => resume(Effect.die(cause)));
+      server.listen(0, "127.0.0.1", () => {
+        const address = server.address();
+        const port = typeof address === "object" && address ? address.port : 0;
+        resume(
+          Effect.succeed({
+            url: `http://127.0.0.1:${port}/spec.json`,
+            setBody: (next: string) => {
+              body = next;
+            },
+            downloads: () => downloads,
+            notModified: () => notModified,
+            close: () => {
+              server.close();
+              server.closeAllConnections();
+            },
+          }),
+        );
+      });
+    }),
+    (server) => Effect.sync(server.close),
+  );
+
+scenario(
+  "OpenAPI · the add flow downloads a spec once and refresh revalidates instead of re-downloading",
+  {},
+  Effect.scoped(
+    Effect.gen(function* () {
+      const target = yield* Target;
+      const { client } = yield* Api;
+      const identity = yield* target.newIdentity();
+      const apiClient = yield* client(api, identity);
+
+      const slug = `spec-cache-${randomBytes(4).toString("hex")}`;
+      const specServer = yield* serveCountingSpec(specV1);
+
+      yield* Effect.ensuring(
+        Effect.gen(function* () {
+          // The add flow as the UI drives it: analyze (debounce can fire it
+          // more than once), then add. Three API calls, one download.
+          const firstPreview = yield* apiClient.openapi.previewSpec({
+            payload: { spec: specServer.url },
+          });
+          expect(firstPreview.operationCount, "preview sees the v1 spec").toBe(1);
+          const secondPreview = yield* apiClient.openapi.previewSpec({
+            payload: { spec: specServer.url },
+          });
+          expect(secondPreview.operationCount, "re-preview still sees v1").toBe(1);
+          const added = yield* apiClient.openapi.addSpec({
+            payload: {
+              spec: { kind: "url", url: specServer.url },
+              slug,
+              baseUrl: "http://127.0.0.1:59999", // tools are never invoked here
+            },
+          });
+          expect(added.toolCount, "v1 spec has one operation").toBe(1);
+          expect(
+            specServer.downloads(),
+            "preview → preview → add downloaded the spec exactly once",
+          ).toBe(1);
+
+          // Explicit refresh with the upstream unchanged: the server must be
+          // consulted (a refresh is a freshness demand), but with the stored
+          // ETag it answers 304 and nothing is re-downloaded.
+          const unchanged = yield* apiClient.openapi.updateSpec({
+            params: { slug },
+            payload: {},
+          });
+          expect(unchanged.addedTools, "no tools appeared").toEqual([]);
+          expect(unchanged.removedTools, "no tools vanished").toEqual([]);
+          expect(specServer.notModified(), "the refresh revalidated and got a 304").toBe(1);
+          expect(specServer.downloads(), "an unchanged spec is not re-downloaded").toBe(1);
+
+          // The upstream ships v2: the validators no longer match, the cache
+          // busts, and the refreshed catalog lands.
+          specServer.setBody(specV2);
+          const updated = yield* apiClient.openapi.updateSpec({
+            params: { slug },
+            payload: {},
+          });
+          expect(updated.addedTools, "the new operation arrived").toEqual(["widgets.listWidgets"]);
+          expect(specServer.downloads(), "the changed spec was downloaded").toBe(2);
+          expect(specServer.notModified(), "no spurious 304 for a changed body").toBe(1);
+        }),
+        apiClient.openapi.removeSpec({ params: { slug } }).pipe(Effect.ignore),
+      );
+    }),
+  ),
+);
diff --git a/packages/plugins/openapi/src/sdk/index.ts b/packages/plugins/openapi/src/sdk/index.ts
index f2657a10d..eb4b101c7 100644
--- a/packages/plugins/openapi/src/sdk/index.ts
+++ b/packages/plugins/openapi/src/sdk/index.ts
@@ -1,4 +1,17 @@
-export { parse, resolveSpecText, fetchSpecText } from "./parse";
+export {
+  parse,
+  resolveSpecText,
+  fetchSpecText,
+  fetchSpecDocument,
+  type SpecDocumentResult,
+  type SpecFetchValidators,
+} from "./parse";
+export {
+  fetchSpecTextCached,
+  SPEC_FETCH_TTL,
+  type ResolvedSpecDocument,
+  type SpecFetchFreshness,
+} from "./spec-cache";
 export { extract, streamOperationBindingsFromStructure } from "./extract";
 export {
   structuralSplit,
@@ -42,7 +55,12 @@ export {
   type OpenApiPluginExtension,
   type OpenApiPluginOptions,
 } from "./plugin";
-export { type OpenapiStore, type StoredOperation, makeDefaultOpenapiStore } from "./store";
+export {
+  type OpenapiStore,
+  type StoredOperation,
+  type SpecSourceEntry,
+  makeDefaultOpenapiStore,
+} from "./store";
 export {
   decodeOpenApiIntegrationConfig,
   renderAuthTemplate,
diff --git a/packages/plugins/openapi/src/sdk/parse.ts b/packages/plugins/openapi/src/sdk/parse.ts
index 556e38dff..0dbbe3601 100644
--- a/packages/plugins/openapi/src/sdk/parse.ts
+++ b/packages/plugins/openapi/src/sdk/parse.ts
@@ -1,40 +1,58 @@
 import type { OpenAPI, OpenAPIV3, OpenAPIV3_1 } from "openapi-types";
-import { Duration, Effect, Schema } from "effect";
-import { HttpClient, HttpClientRequest } from "effect/unstable/http";
+import { Duration, Effect, Option, Predicate, Schema } from "effect"; +import { Headers, HttpClient, HttpClientRequest } from "effect/unstable/http"; import { JSON_SCHEMA, load as parseYamlDocument } from "js-yaml"; import { OpenApiExtractionError, OpenApiParseError } from "./errors"; export type ParsedDocument = OpenAPIV3.Document | OpenAPIV3_1.Document; -export interface SpecFetchCredentials { - readonly headers?: Record; - readonly queryParams?: Record; -} - // ExtractionError subclass raised from parse() for non-3.x specs class OpenApiExtractionErrorFromParse extends OpenApiExtractionError {} +/** A previous response's cache validators, replayed as a conditional GET. */ +export interface SpecFetchValidators { + readonly etag?: string; + readonly lastModified?: string; +} + +export type SpecDocumentResult = + | { + readonly _tag: "Fetched"; + readonly text: string; + readonly etag?: string; + readonly lastModified?: string; + } + /** Only possible when `validators` were sent and the server returned 304. */ + | { readonly _tag: "NotModified" }; + /** - * Fetch an OpenAPI spec URL and return its body text. Uses the Effect - * HttpClient so the caller chooses the transport via layer — in Cloudflare - * Workers, `FetchHttpClient.layer` binds to the Workers-native `fetch`. - * Bounded by a 60s timeout. + * Fetch an OpenAPI spec URL. Uses the Effect HttpClient so the caller chooses + * the transport via layer (in Cloudflare Workers, `FetchHttpClient.layer` + * binds to the Workers-native `fetch`). Bounded by a 60s timeout. + * + * When `validators` (a previous response's ETag / Last-Modified) are provided + * the request is conditional, and a 304 comes back as `NotModified` instead of + * a body: the caller reuses its stored copy. + * + * Spec-document fetches are deliberately UNAUTHENTICATED: the fetched text is + * cached and shared per tenant (content-addressed blob + `spec_source` index), + * so connection credentials must never be threaded into this request. 
A future + * authed-spec feature has to bypass that cache, not extend this function. */ -export const fetchSpecText = Effect.fn("OpenApi.fetchSpecText")(function* ( +export const fetchSpecDocument = Effect.fn("OpenApi.fetchSpecDocument")(function* ( url: string, - credentials?: SpecFetchCredentials, + validators?: SpecFetchValidators, ) { const client = yield* HttpClient.HttpClient; - const requestUrl = new URL(url); - for (const [name, value] of Object.entries(credentials?.queryParams ?? {})) { - requestUrl.searchParams.set(name, value); - } - let request = HttpClientRequest.get(requestUrl.toString()).pipe( + let request = HttpClientRequest.get(url).pipe( HttpClientRequest.setHeader("Accept", "application/json, application/yaml, text/yaml, */*"), ); - for (const [name, value] of Object.entries(credentials?.headers ?? {})) { - request = HttpClientRequest.setHeader(request, name, value); + if (validators?.etag !== undefined) { + request = HttpClientRequest.setHeader(request, "If-None-Match", validators.etag); + } + if (validators?.lastModified !== undefined) { + request = HttpClientRequest.setHeader(request, "If-Modified-Since", validators.lastModified); } const response = yield* client.execute(request).pipe( Effect.timeout(Duration.seconds(60)), @@ -45,12 +63,18 @@ export const fetchSpecText = Effect.fn("OpenApi.fetchSpecText")(function* ( }), ), ); + if ( + response.status === 304 && + (validators?.etag !== undefined || validators?.lastModified !== undefined) + ) { + return { _tag: "NotModified" } as const satisfies SpecDocumentResult; + } if (response.status < 200 || response.status >= 300) { return yield* new OpenApiParseError({ message: `Failed to fetch OpenAPI document: HTTP ${response.status}`, }); } - const specText = yield* response.text.pipe( + const text = yield* response.text.pipe( Effect.mapError( (_cause) => new OpenApiParseError({ @@ -58,16 +82,33 @@ export const fetchSpecText = Effect.fn("OpenApi.fetchSpecText")(function* ( }), ), ); - return 
specText; + const etag = Option.getOrUndefined(Headers.get(response.headers, "etag")); + const lastModified = Option.getOrUndefined(Headers.get(response.headers, "last-modified")); + return { + _tag: "Fetched", + text, + ...(etag !== undefined ? { etag } : {}), + ...(lastModified !== undefined ? { lastModified } : {}), + } as const satisfies SpecDocumentResult; +}); + +/** Fetch an OpenAPI spec URL and return its body text (unconditional GET). */ +export const fetchSpecText = Effect.fn("OpenApi.fetchSpecText")(function* (url: string) { + const result = yield* fetchSpecDocument(url); + if (Predicate.isTagged(result, "Fetched")) return result.text; + // Unreachable without validators; a server 304-ing an unconditional GET is broken. + return yield* new OpenApiParseError({ + message: "Failed to fetch OpenAPI document: unexpected 304 response", + }); }); /** * Resolve an input string to spec text — if it's a URL, fetch it via * HttpClient; otherwise return it as-is. */ -export const resolveSpecText = (input: string, credentials?: SpecFetchCredentials) => +export const resolveSpecText = (input: string) => input.startsWith("http://") || input.startsWith("https://") - ? fetchSpecText(input, credentials) + ? 
fetchSpecText(input) : Effect.succeed(input); /** diff --git a/packages/plugins/openapi/src/sdk/plugin.ts b/packages/plugins/openapi/src/sdk/plugin.ts index d5bd42e09..7fb4c06f4 100644 --- a/packages/plugins/openapi/src/sdk/plugin.ts +++ b/packages/plugins/openapi/src/sdk/plugin.ts @@ -22,7 +22,8 @@ import { import { decodeOpenApiIntegrationConfig, type OpenApiIntegrationConfig } from "./config"; import { OpenApiExtractionError, OpenApiOAuthError, OpenApiParseError } from "./errors"; -import { parse, resolveSpecText } from "./parse"; +import { parse } from "./parse"; +import { fetchSpecTextCached, type SpecFetchFreshness } from "./spec-cache"; import { extract } from "./extract"; import { OAuth2AuthorizationCodeFlow, @@ -384,6 +385,11 @@ const maybeUrl = (value: string): URL | null => { } }; +// Same URL-vs-inline-text split resolveSpecText applies, kept as a predicate so +// URL inputs can route through the spec cache while blobs pass straight through. +const isSpecUrlInput = (value: string): boolean => + value.startsWith("http://") || value.startsWith("https://"); + const addProbeCandidate = (candidates: string[], value: string | undefined): void => { const trimmed = value?.trim(); if (!trimmed) return; @@ -564,21 +570,34 @@ export interface OpenApiPluginOptions { } export const openApiPlugin = definePlugin((options?: OpenApiPluginOptions) => { + // URL inputs go through the tenant's spec cache (URL index over the + // content-addressed blob store): within-TTL repeats are served from the + // store, and refreshes revalidate with the stored ETag/Last-Modified so an + // unchanged upstream costs a bodyless 304. Inline blobs pass through. const resolveSpecForInput = ( spec: OpenApiSpecInput, + storage: OpenapiStore, httpClientLayer: Layer.Layer, + freshness: SpecFetchFreshness, ): Effect.Effect< { readonly specText: string; + /** True when the spec blob is already persisted under its hash. 
*/ + readonly persisted: boolean; }, - OpenApiParseError | OpenApiExtractionError | OpenApiOAuthError + OpenApiParseError | OpenApiExtractionError | OpenApiOAuthError | StorageFailure > => Effect.gen(function* () { if (spec.kind === "url") { - const specText = yield* resolveSpecText(spec.url).pipe(Effect.provide(httpClientLayer)); - return { specText }; + const resolved = yield* fetchSpecTextCached({ + url: spec.url, + storage, + httpClientLayer, + freshness, + }); + return { specText: resolved.specText, persisted: resolved.persisted }; } - return { specText: spec.value }; + return { specText: spec.value, persisted: false }; }); return { @@ -646,7 +665,14 @@ export const openApiPlugin = definePlugin((options?: OpenApiPluginOptions) => { Effect.gen(function* () { // Resolve URL → text and parse BEFORE opening a transaction. Holding // `BEGIN` across a network fetch is the Hyperdrive deadlock path. - const resolved = yield* resolveSpecForInput(config.spec, httpClientLayer); + // The cache makes the UI's preview → add sequence a single download: + // preview already fetched and persisted this URL moments ago. + const resolved = yield* resolveSpecForInput( + config.spec, + ctx.storage, + httpClientLayer, + "prefer-cache", + ); const compiled = yield* compileOpenApiSpec(resolved.specText); // Defaults the add page derives from its preview, applied here so @@ -724,8 +750,11 @@ export const openApiPlugin = definePlugin((options?: OpenApiPluginOptions) => { // The spec blob is written OUTSIDE the transaction: it's // content-addressed (re-puts are idempotent) and an aborted register // leaves only an unreferenced blob behind - while blob backends like - // R2 couldn't roll back with the transaction anyway. - yield* ctx.storage.putSpec(specHash, resolved.specText); + // R2 couldn't roll back with the transaction anyway. URL inputs were + // already persisted by the cached fetch; skip re-putting multi-MB text. 
+ if (!resolved.persisted) { + yield* ctx.storage.putSpec(specHash, resolved.specText); + } // The content-addressed defs blob lets the serve path resolve the // shared `definitions` without re-parsing the spec. Same idempotent, // outside-the-transaction rationale as the spec blob. @@ -779,8 +808,16 @@ export const openApiPlugin = definePlugin((options?: OpenApiPluginOptions) => { } // Resolve + compile BEFORE the transaction (same Hyperdrive-deadlock - // rule as addSpec: never hold BEGIN across a network fetch). - const resolved = yield* resolveSpecForInput(specInput, httpClientLayer); + // rule as addSpec: never hold BEGIN across a network fetch). Refresh + // is an explicit "check upstream" — revalidate instead of trusting + // the TTL, so it always sees a changed spec; an unchanged one costs + // a bodyless 304 instead of a multi-MB download. + const resolved = yield* resolveSpecForInput( + specInput, + ctx.storage, + httpClientLayer, + "revalidate", + ); const compiled = yield* compileOpenApiSpec(resolved.specText); const previousOperations = yield* ctx.storage.listOperations(rawSlug); @@ -790,9 +827,12 @@ export const openApiPlugin = definePlugin((options?: OpenApiPluginOptions) => { // The resolved spec text lives in the plugin blob store keyed by its // content hash (`spec/`); the config carries only the hash. Put // the blob outside the transaction - re-puts are idempotent and an - // aborted config update just leaves an unreferenced blob. + // aborted config update just leaves an unreferenced blob. URL inputs + // were already persisted by the cached fetch. 
const specHash = yield* sha256Hex(resolved.specText); - yield* ctx.storage.putSpec(specHash, resolved.specText); + if (!resolved.persisted) { + yield* ctx.storage.putSpec(specHash, resolved.specText); + } yield* ctx.storage.putDefs(specHash, JSON.stringify(compiled.hoistedDefs)); const nextConfig: OpenApiIntegrationConfig = { @@ -853,9 +893,17 @@ export const openApiPlugin = definePlugin((options?: OpenApiPluginOptions) => { previewSpec: (input: string | OpenApiPreviewInput) => Effect.gen(function* () { const previewInput = typeof input === "string" ? { spec: input } : input; - const specText = yield* resolveSpecText(previewInput.spec).pipe( - Effect.provide(httpClientLayer), - ); + // The add form re-previews on a debounce while the user types, and + // addSpec re-resolves the same URL right after — the cache turns + // that into one download per distinct spec URL. + const specText = isSpecUrlInput(previewInput.spec) + ? (yield* fetchSpecTextCached({ + url: previewInput.spec, + storage: ctx.storage, + httpClientLayer, + freshness: "prefer-cache", + })).specText + : previewInput.spec; const preview = yield* previewSpecText(specText); return yield* enrichPreviewWithDiscoveredOAuth({ specText, @@ -1060,10 +1108,19 @@ export const openApiPlugin = definePlugin((options?: OpenApiPluginOptions) => { catch: (error) => error, }).pipe(Effect.option); if (Option.isNone(parsed)) return null; - const specText = yield* resolveSpecText(trimmed).pipe( - Effect.provide(httpClientLayer), - Effect.catch(() => Effect.succeed(null)), - ); + // Detect probes the same URL the user is about to preview/add — go + // through the cache so the probe download is reused by those steps. + const specText = isSpecUrlInput(trimmed) + ? 
yield* fetchSpecTextCached({ + url: trimmed, + storage: detectCtx.storage, + httpClientLayer, + freshness: "prefer-cache", + }).pipe( + Effect.map((resolved) => resolved.specText), + Effect.catch(() => Effect.succeed(null)), + ) + : trimmed; if (specText === null) return null; const doc = yield* parse(specText).pipe(Effect.catch(() => Effect.succeed(null))); if (!doc) return null; diff --git a/packages/plugins/openapi/src/sdk/spec-blob.test.ts b/packages/plugins/openapi/src/sdk/spec-blob.test.ts index 186050633..3d49572a7 100644 --- a/packages/plugins/openapi/src/sdk/spec-blob.test.ts +++ b/packages/plugins/openapi/src/sdk/spec-blob.test.ts @@ -140,6 +140,8 @@ describe("OpenAPI plugin — spec blob storage", () => { getSpec: (specHash) => Effect.succeed(specHash === hash ? text : null), putDefs: () => Effect.void, getDefs: () => Effect.succeed(null), + getSpecSource: () => Effect.succeed(null), + putSpecSource: () => Effect.void, }; const resolve = (config: IntegrationConfig) => diff --git a/packages/plugins/openapi/src/sdk/spec-cache.test.ts b/packages/plugins/openapi/src/sdk/spec-cache.test.ts new file mode 100644 index 000000000..b8def773e --- /dev/null +++ b/packages/plugins/openapi/src/sdk/spec-cache.test.ts @@ -0,0 +1,216 @@ +// --------------------------------------------------------------------------- +// Spec-fetch cache coverage. +// +// The `spec_source` index over the content-addressed blob store is what turns +// the add flow's detect → preview → addSpec sequence into ONE download, and +// updateSpec's re-fetch into a bodyless 304 when the upstream is unchanged. 
+// These tests pin the observable contract at the HTTP boundary (request and +// 304 counts against a real local server), not the index internals: +// - within-TTL repeats of previewSpec/addSpec hit the network once, +// - updateSpec always revalidates (a TTL-fresh entry must not mask an +// upstream change) and downloads nothing when the server 304s, +// - a changed upstream busts the cache through the validators, +// - inline blob specs never touch the network or the URL index. +// --------------------------------------------------------------------------- + +import { describe, expect, it } from "@effect/vitest"; +import { Effect, Ref, Scope } from "effect"; +import { + FetchHttpClient, + HttpRouter, + HttpServerRequest, + HttpServerResponse, +} from "effect/unstable/http"; + +import { createExecutor, sha256Hex } from "@executor-js/sdk"; +import { + makeTestConfig, + memoryCredentialsPlugin, + serveTestHttpRoutes, +} from "@executor-js/sdk/testing"; + +import { openApiPlugin } from "./plugin"; + +const specV1 = JSON.stringify({ + openapi: "3.0.3", + info: { title: "Cached API", version: "1.0.0" }, + paths: { + "/ping": { + get: { + operationId: "ping", + summary: "Return a pong", + responses: { "200": { description: "pong" } }, + }, + }, + }, +}); + +const specV2 = JSON.stringify({ + openapi: "3.0.3", + info: { title: "Cached API", version: "2.0.0" }, + paths: { + "/ping": { + get: { + operationId: "ping", + summary: "Return a pong", + responses: { "200": { description: "pong" } }, + }, + }, + "/widgets": { + get: { + operationId: "listWidgets", + summary: "List widgets", + responses: { "200": { description: "widgets" } }, + }, + }, + }, +}); + +interface SpecServer { + readonly specUrl: string; + readonly setSpec: (body: string) => Effect.Effect; + /** Requests that reached the server (200s AND 304s). */ + readonly requestCount: Effect.Effect; + /** Of those, how many were answered 304 Not Modified. 
*/ + readonly notModifiedCount: Effect.Effect; +} + +/** A local spec host that serves a strong ETag (the body's SHA-256) and + * honors `If-None-Match` with a 304 — the upstream shape the conditional + * refresh path is written against. */ +const serveEtagSpecServer = (initial: string): Effect.Effect => + Effect.gen(function* () { + const body = yield* Ref.make(initial); + const requests = yield* Ref.make(0); + const notModified = yield* Ref.make(0); + const server = yield* serveTestHttpRoutes([ + HttpRouter.route( + "GET", + "/spec.json", + Effect.gen(function* () { + yield* Ref.update(requests, (count) => count + 1); + const current = yield* Ref.get(body); + const etag = `"${yield* sha256Hex(current)}"`; + const request = yield* HttpServerRequest.HttpServerRequest; + if (request.headers["if-none-match"] === etag) { + yield* Ref.update(notModified, (count) => count + 1); + return HttpServerResponse.empty({ status: 304, headers: { etag } }); + } + return HttpServerResponse.text(current, { + status: 200, + contentType: "application/json", + headers: { etag }, + }); + }), + ), + ]); + return { + specUrl: server.url("/spec.json"), + setSpec: (next) => Ref.set(body, next), + requestCount: Ref.get(requests), + notModifiedCount: Ref.get(notModified), + }; + }); + +// The spec server is a real 127.0.0.1 listener — reach it over the default +// fetch-based client, like production would. +const makeCacheTestExecutor = () => + createExecutor( + makeTestConfig({ + plugins: [ + openApiPlugin({ httpClientLayer: FetchHttpClient.layer }), + memoryCredentialsPlugin(), + ] as const, + }), + ); + +describe("OpenAPI spec-fetch cache", () => { + it.effect("preview → preview → addSpec downloads the spec once", () => + Effect.scoped( + Effect.gen(function* () { + const server = yield* serveEtagSpecServer(specV1); + const executor = yield* makeCacheTestExecutor(); + + // The add form's debounced analyze fires previewSpec more than once. 
+ yield* executor.openapi.previewSpec(server.specUrl); + yield* executor.openapi.previewSpec(server.specUrl); + const added = yield* executor.openapi.addSpec({ + spec: { kind: "url", url: server.specUrl }, + slug: "cached_api", + }); + + expect(added.toolCount).toBe(1); + expect(yield* server.requestCount).toBe(1); + }), + ), + ); + + it.effect("updateSpec revalidates with the stored ETag and skips the download on 304", () => + Effect.scoped( + Effect.gen(function* () { + const server = yield* serveEtagSpecServer(specV1); + const executor = yield* makeCacheTestExecutor(); + + yield* executor.openapi.addSpec({ + spec: { kind: "url", url: server.specUrl }, + slug: "cached_api", + }); + expect(yield* server.requestCount).toBe(1); + + // Unchanged upstream: refresh must still hit the network (revalidate, + // not trust the TTL) but get a bodyless 304, and the catalog stays. + const unchanged = yield* executor.openapi.updateSpec("cached_api"); + expect(unchanged.toolCount).toBe(1); + expect(unchanged.addedTools).toEqual([]); + expect(unchanged.removedTools).toEqual([]); + expect(yield* server.requestCount).toBe(2); + expect(yield* server.notModifiedCount).toBe(1); + }), + ), + ); + + it.effect("updateSpec picks up a changed upstream through the validators", () => + Effect.scoped( + Effect.gen(function* () { + const server = yield* serveEtagSpecServer(specV1); + const executor = yield* makeCacheTestExecutor(); + + yield* executor.openapi.addSpec({ + spec: { kind: "url", url: server.specUrl }, + slug: "cached_api", + }); + yield* server.setSpec(specV2); + + // Immediately after the add (well within the TTL): the changed spec + // must still land because refresh revalidates unconditionally. + const updated = yield* executor.openapi.updateSpec("cached_api"); + expect(updated.toolCount).toBe(2); + expect(updated.addedTools).toEqual(["widgets.listWidgets"]); + expect(yield* server.notModifiedCount).toBe(0); + + // And the refreshed entry serves the NEW spec from cache. 
+ const preview = yield* executor.openapi.previewSpec(server.specUrl); + expect(preview.operationCount).toBe(2); + expect(yield* server.requestCount).toBe(2); + }), + ), + ); + + it.effect("inline blob specs never touch the network", () => + Effect.scoped( + Effect.gen(function* () { + const server = yield* serveEtagSpecServer(specV1); + const executor = yield* makeCacheTestExecutor(); + + yield* executor.openapi.previewSpec(specV1); + yield* executor.openapi.addSpec({ + spec: { kind: "blob", value: specV1 }, + slug: "inline_api", + baseUrl: "https://api.example.test", + }); + + expect(yield* server.requestCount).toBe(0); + }), + ), + ); +}); diff --git a/packages/plugins/openapi/src/sdk/spec-cache.ts b/packages/plugins/openapi/src/sdk/spec-cache.ts new file mode 100644 index 000000000..4a571dc3d --- /dev/null +++ b/packages/plugins/openapi/src/sdk/spec-cache.ts @@ -0,0 +1,124 @@ +import { Clock, Duration, Effect, Predicate } from "effect"; +import type { Layer } from "effect"; +import type { HttpClient } from "effect/unstable/http"; + +import { sha256Hex, type StorageFailure } from "@executor-js/sdk/core"; + +import type { OpenApiParseError } from "./errors"; +import { fetchSpecDocument, fetchSpecText, type SpecFetchValidators } from "./parse"; +import type { OpenapiStore, SpecSourceEntry } from "./store"; + +// --------------------------------------------------------------------------- +// Cached spec fetch — the URL-keyed index over the content-addressed blob +// store. 
The blob store alone can't skip a download (its key is the SHA-256 of +// the body, unknowable before fetching), so the `spec_source` index remembers +// what each URL last resolved to plus the response's cache validators: +// +// fresh entry (within TTL) -> serve the spec blob, no network +// stale entry w/ validators -> conditional GET; 304 revalidates the blob +// miss / changed upstream -> full download, blob + index updated +// +// Spec fetches are unauthenticated by design (see fetchSpecDocument), so one +// org-shared entry per tenant is safe. Preview persists through this path too, +// which is what makes the add flow's preview -> addSpec sequence a single +// download instead of two: an abandoned preview leaves only an unreferenced +// content-addressed blob, the same accepted cost as an aborted addSpec. +// --------------------------------------------------------------------------- + +/** How long a cached fetch keeps serving without touching the network. Sized + * for the add flow (debounced previews, preview -> add) — not a freshness + * contract for refresh, which revalidates unconditionally. */ +export const SPEC_FETCH_TTL = Duration.minutes(5); + +export type SpecFetchFreshness = + /** Serve a within-TTL cache entry without any network round trip. */ + | "prefer-cache" + /** Always hit the network; validators still turn an unchanged spec into a + * bodyless 304. The explicit-refresh (`updateSpec`) mode. */ + | "revalidate"; + +export interface ResolvedSpecDocument { + readonly specText: string; + readonly specHash: string; + /** True when the spec blob is known to already be in the store (cache hit or + * this call wrote it) — lets callers skip a redundant multi-MB re-put. */ + readonly persisted: boolean; +} + +/** + * Fetch a spec URL through the tenant's `spec_source` cache. Storage failures + * on the write-back are surfaced (a broken store should fail loudly, not + * silently double-fetch forever). 
+ */ +export const fetchSpecTextCached = (input: { + readonly url: string; + readonly storage: OpenapiStore; + readonly httpClientLayer: Layer.Layer; + readonly freshness: SpecFetchFreshness; +}): Effect.Effect => + Effect.gen(function* () { + const now = yield* Clock.currentTimeMillis; + const entry = yield* input.storage.getSpecSource(input.url); + + if ( + entry !== null && + input.freshness === "prefer-cache" && + now - entry.fetchedAt < Duration.toMillis(SPEC_FETCH_TTL) + ) { + const cached = yield* input.storage.getSpec(entry.specHash); + // A missing blob (pruned out-of-band) falls through to a full fetch. + if (cached !== null) { + return { specText: cached, specHash: entry.specHash, persisted: true }; + } + } + + const validators: SpecFetchValidators | undefined = + entry !== null && (entry.etag !== undefined || entry.lastModified !== undefined) + ? { + ...(entry.etag !== undefined ? { etag: entry.etag } : {}), + ...(entry.lastModified !== undefined ? { lastModified: entry.lastModified } : {}), + } + : undefined; + + const conditional = yield* fetchSpecDocument(input.url, validators).pipe( + Effect.provide(input.httpClientLayer), + ); + + if (Predicate.isTagged(conditional, "NotModified") && entry !== null) { + const cached = yield* input.storage.getSpec(entry.specHash); + if (cached !== null) { + yield* putSpecSourceRaceSafe(input.storage, { ...entry, fetchedAt: now }); + return { specText: cached, specHash: entry.specHash, persisted: true }; + } + } + + // 304 with the blob gone is a broken-index corner: refetch for real. The + // unconditional fetchSpecText fails typed on a (protocol-violating) bare 304. + const fetched: { + readonly text: string; + readonly etag?: string; + readonly lastModified?: string; + } = Predicate.isTagged(conditional, "Fetched") + ? 
conditional + : { text: yield* fetchSpecText(input.url).pipe(Effect.provide(input.httpClientLayer)) }; + + const specHash = yield* sha256Hex(fetched.text); + yield* input.storage.putSpec(specHash, fetched.text); + yield* putSpecSourceRaceSafe(input.storage, { + url: input.url, + specHash, + ...(fetched.etag !== undefined ? { etag: fetched.etag } : {}), + ...(fetched.lastModified !== undefined ? { lastModified: fetched.lastModified } : {}), + fetchedAt: now, + }); + return { specText: fetched.text, specHash, persisted: true }; + }).pipe( + Effect.withSpan("openapi.spec_cache.fetch", { + attributes: { "openapi.spec.url": input.url }, + }), + ); + +// Two debounced previews of the same URL can race the first insert; the loser's +// unique violation is harmless (the winner just wrote an equivalent entry). +const putSpecSourceRaceSafe = (storage: OpenapiStore, entry: SpecSourceEntry) => + storage.putSpecSource(entry).pipe(Effect.catchTag("UniqueViolationError", () => Effect.void)); diff --git a/packages/plugins/openapi/src/sdk/store.ts b/packages/plugins/openapi/src/sdk/store.ts index 548a52103..2058f82f5 100644 --- a/packages/plugins/openapi/src/sdk/store.ts +++ b/packages/plugins/openapi/src/sdk/store.ts @@ -25,8 +25,10 @@ import { OperationBinding } from "./types"; // --------------------------------------------------------------------------- const OPERATION_COLLECTION = "operation"; +const SPEC_SOURCE_COLLECTION = "spec_source"; const STORE_OWNER = "org" as const; const OPERATION_KEY_VERSION = "op"; +const SPEC_SOURCE_KEY_VERSION = "src"; const encodeBinding = Schema.encodeSync(OperationBinding); const decodeBinding = Schema.decodeUnknownSync(OperationBinding); @@ -87,6 +89,34 @@ const stableKeyHash = (value: string): string => { const operationKey = (integration: string, toolName: string): string => `${OPERATION_KEY_VERSION}.${stableKeyHash(integration)}.${stableKeyHash(toolName)}`; +const specSourceKey = (url: string): string => 
`${SPEC_SOURCE_KEY_VERSION}.${stableKeyHash(url)}`; + +// What we remember about the last successful fetch of a spec URL: the content +// hash it resolved to (the blob address) plus the response's cache validators. +// `url` is stored in the row too so a key-hash collision can't serve another +// URL's spec. +const SpecSourceStorage = Schema.Struct({ + url: Schema.String, + specHash: Schema.String, + etag: Schema.optional(Schema.String), + lastModified: Schema.optional(Schema.String), + fetchedAt: Schema.Number, +}); +const decodeSpecSourceStorage = Schema.decodeUnknownOption(SpecSourceStorage); + +export interface SpecSourceEntry { + /** The spec URL this entry describes. */ + readonly url: string; + /** Content hash the URL last resolved to — the `spec/` blob address. */ + readonly specHash: string; + /** Response `ETag`, replayed as `If-None-Match` on refresh. */ + readonly etag?: string; + /** Response `Last-Modified`, replayed as `If-Modified-Since` on refresh. */ + readonly lastModified?: string; + /** Epoch millis of the last fetch that produced (or revalidated) the entry. */ + readonly fetchedAt: number; +} + const legacyOperationKey = (integration: string, toolName: string): string => `${integration}.${toolName}`; @@ -137,6 +167,13 @@ export interface OpenapiStore { /** Load the compiled `#/$defs/*` JSON by content hash; null when no blob * exists (legacy rows added before the defs blob). */ readonly getDefs: (specHash: string) => Effect.Effect; + /** Look up what a spec URL last resolved to (content hash + cache + * validators). Null when the URL was never fetched (or the row predates the + * index). Org-owned like the spec blob: the fetch is unauthenticated, so the + * result is shareable tenant-wide. */ + readonly getSpecSource: (url: string) => Effect.Effect; + /** Record a successful fetch (or revalidation) of a spec URL. 
*/ + readonly putSpecSource: (entry: SpecSourceEntry) => Effect.Effect; } export const makeDefaultOpenapiStore = ({ pluginStorage, blobs }: StorageDeps): OpenapiStore => { @@ -216,5 +253,35 @@ export const makeDefaultOpenapiStore = ({ pluginStorage, blobs }: StorageDeps): blobs.put(defsBlobKey(specHash), defsJson, { owner: STORE_OWNER }), getDefs: (specHash) => blobs.get(defsBlobKey(specHash)), + + getSpecSource: (url) => + Effect.gen(function* () { + const row = yield* pluginStorage.get({ + collection: SPEC_SOURCE_COLLECTION, + key: specSourceKey(url), + }); + if (!row) return null; + const decoded = decodeSpecSourceStorage(row.data); + if (Option.isNone(decoded)) return null; + // Guard against a stableKeyHash collision serving another URL's spec. + if (decoded.value.url !== url) return null; + return decoded.value; + }), + + putSpecSource: (entry) => + pluginStorage + .put({ + owner: STORE_OWNER, + collection: SPEC_SOURCE_COLLECTION, + key: specSourceKey(entry.url), + data: { + url: entry.url, + specHash: entry.specHash, + ...(entry.etag !== undefined ? { etag: entry.etag } : {}), + ...(entry.lastModified !== undefined ? { lastModified: entry.lastModified } : {}), + fetchedAt: entry.fetchedAt, + }, + }) + .pipe(Effect.asVoid), }; };