diff --git a/config.example.js b/config.example.js index 97789c12..de99381a 100644 --- a/config.example.js +++ b/config.example.js @@ -50,6 +50,11 @@ module.exports = { callbackURL: "http://localhost:" + port + "/auth/github/callback", // An API token for the backend to make API requests with. token: "oauth api token for server-side api calls", + // How much of the token's hourly GitHub API quota to reserve for live + // webhook/socket traffic. Bulk backfills (bin/refresh-*) pace themselves to + // stay above this floor and pause when remaining quota hits it, so a large + // backfill never starves normal operation. Defaults to 1000 if omitted. + bulkReserve: 1000, // This will need to be the same secret you use on the Webhooks page for // the repo Pulldasher is going to monitor. hook_secret: diff --git a/lib/git-manager.js b/lib/git-manager.js index b9ae5259..935f40c8 100644 --- a/lib/git-manager.js +++ b/lib/git-manager.js @@ -14,6 +14,7 @@ import Label from "../models/label.js"; import Status from "../models/status.js"; import Signature from "../models/signature.js"; import getLogin from "./get-user-login.js"; +import pacer from "./pacer.js"; const MyOctokit = Octokit.plugin(throttling, retry); const gitDebug = debug("pulldasher:github"); @@ -61,6 +62,8 @@ const github = new MyOctokit({ // (5xx / network) — the throttle plugin already logs 4xx rate limits — and // rethrow untouched so plugin-retry's retry logic is unaffected. github.hook.error("request", (error, options) => { + // A failed response still reports the current quota; feed it to the pacer. + pacer.observe(error.response && error.response.headers); const status = error.status; if (status === undefined || status >= 500) { gitDebug( @@ -74,6 +77,13 @@ github.hook.error("request", (error, options) => { throw error; }); +// Track the shared quota from every response's x-ratelimit-* headers so the +// pacer (which gates only bulk requests) always has a current view — live +// webhook/socket calls update it too, which is how bulk yields to live traffic. +github.hook.after("request", (response) => { + pacer.observe(response.headers); +}); + const githubRest = github.rest; export default { @@ -100,7 +110,7 @@ export default { */ getOpenPulls: function (repo) { return logErrors( - github.paginate(githubRest.pulls.list, params({ state: "open" }, repo)), + pacedPaginate(githubRest.pulls.list, params({ state: "open" }, repo)), "Getting open pulls in repo %s", repo ); @@ -113,7 +123,7 @@ export default { */ getAllPulls: function (repo) { return logErrors( - github.paginate(githubRest.pulls.list, params({ state: "all" }, repo)), + pacedPaginate(githubRest.pulls.list, params({ state: "all" }, repo)), "Getting all pulls in repo %s", repo ); @@ -134,8 +144,7 @@ export default { getOpenIssues: function (repo) { const searchParams = params({ state: "open" }, repo); return logErrors( - github - .paginate(githubRest.issues.listForRepo, searchParams) + pacedPaginate(githubRest.issues.listForRepo, searchParams) .then(filterOutPulls) .then(addRepo(searchParams)), "Getting open issues in repo %s", @@ -151,8 +160,7 @@ export default { getAllIssues: function (repo) { const searchParams = params({ state: "all" }, repo); return logErrors( - github - .paginate(githubRest.issues.listForRepo, searchParams) + pacedPaginate(githubRest.issues.listForRepo, searchParams) .then(filterOutPulls) .then(addRepo(searchParams)), "Getting all issues in repo %s", @@ -645,3 +653,19 @@ function logErrors(promise, ...messageAndArgs) { throw err; }); } + +/** + * Paginate a bulk listing, pacing between pages so a full-repo list can't burst + * through the quota. Page one fetches un-paced and seeds the pacer's quota view + * (via the after-hook observe); each `gate()` then spaces the fetch of the next + * page. Bulk-only: the live webhook/socket path never lists whole repos, so + * gating every page is safe. + */ +async function pacedPaginate(route, parameters) { + const items = []; + for await (const response of github.paginate.iterator(route, parameters)) { + items.push(...response.data); + await pacer.gate(); + } + return items; +} diff --git a/lib/pacer.js b/lib/pacer.js new file mode 100644 index 00000000..c8f25d13 --- /dev/null +++ b/lib/pacer.js @@ -0,0 +1,138 @@ +import config from "./config-loader.js"; +import debug from "./debug.js"; +import Promise from "bluebird"; + +const pacerDebug = debug("pulldasher:pacer"); + +// Reserve a slice of the hourly quota for live (webhook/socket) traffic; bulk +// backfills pause rather than spend below it. See config.github.bulkReserve. +const reserve = config.github.bulkReserve ?? 1000; + +// Only log a paced delay once it's long enough to explain a visibly slow +// backfill — short spacing isn't worth the noise. +const LOG_DELAY_THRESHOLD_MS = 5000; + +/** + * Proactively paces bulk GitHub requests so a backfill yields to live traffic + * instead of draining the shared token's quota to zero. A single process-wide + * instance: rate state is global to the token, so every bulk call gates against + * the same view, fed by `observe` on every response (live calls included). + * + * `observe(headers)` — record the latest quota from any response (free). + * `gate()` — await this before pushing a *bulk* item; it spreads bulk work + * across the reset window and blocks entirely while below the reserve floor. + * + * Pacing is calibrated by actual consumption, not by gate count: each gate + * advances a token-bucket cursor by the number of requests spent since the last + * gate (`x-ratelimit-used` delta), so a pull whose processing fans out to a + * dozen calls correctly waits a dozen intervals — and a live-traffic spike, + * which also bumps `used`, pushes the cursor out and makes bulk yield. + */ +const pacer = { + remaining: null, + reset: null, + used: null, + lastUsed: null, + nextSlot: 0, + + observe: function (headers) { + if (!headers) { + return; + } + const remaining = headers["x-ratelimit-remaining"]; + const reset = headers["x-ratelimit-reset"]; + const used = headers["x-ratelimit-used"]; + if (remaining !== undefined) { + this.remaining = Number(remaining); + } + if (reset !== undefined) { + this.reset = Number(reset); + } + if (used !== undefined) { + this.used = Number(used); + } + }, + + gate: function () { + const now = Date.now(); + const { delayMs, nextSlot, lastUsed } = computePace({ + remaining: this.remaining, + reset: this.reset, + used: this.used, + reserve: reserve, + now: now, + nextSlot: this.nextSlot, + lastUsed: this.lastUsed, + }); + this.nextSlot = nextSlot; + this.lastUsed = lastUsed; + + // The floor-pause is the only positive delay that isn't even-spread pacing. + if (delayMs > 0 && this.remaining != null && this.remaining <= reserve) { + pacerDebug( + "bulk paused: remaining %s ≤ reserve %s, waiting %ss for quota reset", + this.remaining, + reserve, + Math.round(delayMs / 1000) + ); + } else if (delayMs >= LOG_DELAY_THRESHOLD_MS) { + pacerDebug( + "pacing bulk: waiting %sms (remaining %s)", + Math.round(delayMs), + this.remaining + ); + } + + return Promise.delay(delayMs); + }, +}; + +export default pacer; + +/** + * Pure pacing decision. Given the latest quota view and a token-bucket cursor, + * returns how long the next bulk request should wait, the advanced cursor, and + * the `used` baseline to diff against next time. + * + * - unknown quota, or a `reset` already in the past (stale window) → allow + * immediately; the next response refreshes the view. + * - `remaining ≤ reserve` → pause until `reset` (let the window refill). + * - otherwise → even-spread: the spendable budget (`remaining − reserve`) + * divided across the time left in the window gives the per-request interval. + * Advance the cursor by `spent × interval`, where `spent` is the requests + * consumed since the last gate (`used − lastUsed`), so the spacing tracks + * real consumption — a multi-call item or a live spike pushes the next slot + * further out. A shrinking `remaining` also widens the interval, so bulk + * yields on both signals. + */ +export function computePace({ + remaining, + reset, + used, + reserve, + now, + nextSlot, + lastUsed, +}) { + const baseline = used ?? lastUsed; + // No quota view, or a window that already rolled over (our `remaining` is + // stale): proceed now and let the next response refresh the view. + const allowNow = { delayMs: 0, nextSlot: now, lastUsed: baseline }; + if (remaining == null || reset == null) { + return allowNow; + } + const resetMs = reset * 1000; + if (resetMs <= now) { + return allowNow; + } + if (remaining <= reserve) { + return { delayMs: resetMs - now, nextSlot: resetMs, lastUsed: baseline }; + } + const interval = (resetMs - now) / (remaining - reserve); + // Clamp at 0: across a window rollover the server resets `used` to ~0 while + // our `lastUsed` still holds the pre-reset high, so the delta can go negative. + const spent = + used == null || lastUsed == null ? 0 : Math.max(0, used - lastUsed); + const slot = Math.max(now, nextSlot + spent * interval); + return { delayMs: slot - now, nextSlot: slot, lastUsed: baseline }; +} diff --git a/lib/refresh.js b/lib/refresh.js index 16b64b90..9d19faee 100644 --- a/lib/refresh.js +++ b/lib/refresh.js @@ -4,6 +4,7 @@ import utils from "./utils.js"; import NotifyQueue from "notify-queue"; import debug from "./debug.js"; import Promise from "bluebird"; +import pacer from "./pacer.js"; // Queues for making all refreshes be synchronous, one at a time. var issueQueue = new NotifyQueue(); @@ -61,7 +62,8 @@ export default { * and return a promise that is fulfilled when the item is fully processed. * * Single-item refreshes (webhooks, socket) have no end-of-run report, so no - * failure collector is attached. + * failure collector is attached — and they aren't quota-paced, so live traffic + * is never delayed. */ function pushOnQueue(queue) { return function (githubResponse) { @@ -81,13 +83,20 @@ function pushOnQueue(queue) { * Each item carries the run's `onFailure` collector (null for single-item * webhook/socket refreshes), so a failed parse is recorded for that run's * end-of-run report. + * + * This is the bulk path, so it paces enqueueing against the GitHub quota: + * `pacer.gate()` waits for each item's slot *before* pushing it, keeping the + * shared queue shallow. The wait happens here, off the queue — so a paced or + * floor-paused backfill never blocks the single consumer, and live + * webhook/socket refreshes keep draining promptly. */ function pushAllOnQueue(queue, onFailure) { - return function (githubResponses) { + return async function (githubResponses) { + for (const githubResponse of githubResponses) { + await pacer.gate(); + queue.push({ response: githubResponse, onFailure: onFailure }); + } return new Promise(function (resolve) { - githubResponses.forEach(function (githubResponse) { - queue.push({ response: githubResponse, onFailure: onFailure }); - }); queue.push(resolve); }); }; @@ -212,7 +221,9 @@ export function processPullItem( // queue or the fixed pop deps, so it stays scoped to the run that enqueued it — // a bulk run collects its own failures, while webhook/socket refreshes (which // push onFailure null) don't accumulate. Unwrap so the process functions see -// one response plus their injected collaborators. +// one response plus their injected collaborators. (Quota pacing happens at +// enqueue time, in pushAllOnQueue, not here — so a paused backfill can't block +// the single consumer that webhook refreshes also depend on.) issueQueue.pop(function (item, next) { if (typeof item === "function") { item(); diff --git a/test/pacer.test.js b/test/pacer.test.js new file mode 100644 index 00000000..f9e449c2 --- /dev/null +++ b/test/pacer.test.js @@ -0,0 +1,94 @@ +import { test } from "node:test"; +import assert from "node:assert/strict"; +import { computePace } from "../lib/pacer.js"; + +const NOW = 1_700_000_000_000; // fixed clock (ms); reset values are NOW-relative +const RESERVE = 1000; +const resetIn = (seconds) => NOW / 1000 + seconds; + +// Before the first response is observed there's no quota view, so a bulk +// request must proceed immediately rather than stall forever. +test("computePace allows the request when quota is unknown", () => { + const { delayMs } = computePace({ + remaining: null, + reset: null, + used: null, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: null, + }); + assert.equal(delayMs, 0); +}); + +// A reset timestamp in the past means the window already rolled over and our +// remaining is stale — allow the request and let the next response refresh it. +test("computePace allows the request when the reset window is stale", () => { + const { delayMs } = computePace({ + remaining: 0, + reset: resetIn(-10), + used: 5000, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: 4000, + }); + assert.equal(delayMs, 0); +}); + +// At or below the reserve floor, bulk must pause until the quota refills so it +// stops competing with live traffic. +test("computePace pauses until reset when remaining is at or below the reserve", () => { + const { delayMs, nextSlot } = computePace({ + remaining: 500, + reset: resetIn(600), + used: 4500, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: 4490, + }); + assert.equal(delayMs, 600_000); + // The cursor jumps to the reset so post-refill pacing starts from the window edge. + assert.equal(nextSlot, NOW + 600_000); +}); + +// The real first-gate state: page one was fetched un-paced, so the after-hook +// `observe` has already populated the quota view (remaining/reset/used) — but no +// prior gate has set a `used` baseline. With nothing to diff against, the gate +// can't know how much was spent yet, so it proceeds immediately and just records +// the baseline (returned as lastUsed) for the next gate. +test("computePace does not delay the first gate, but records the usage baseline", () => { + const { delayMs, nextSlot, lastUsed } = computePace({ + remaining: 5000, + reset: resetIn(3600), + used: 100, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: null, + }); + assert.equal(delayMs, 0); + assert.equal(nextSlot, NOW); + assert.equal(lastUsed, 100); +}); + +// The cursor advances by the requests actually spent since the last gate, not +// by one per gate — so a multi-call item waits proportionally longer. +test("computePace advances the cursor by the requests spent since the last gate", () => { + const params = { + remaining: 1100, // budget 100 + reset: resetIn(100), // window 100_000ms → interval = 1000ms / request + used: undefined, // set per case + reserve: RESERVE, + now: NOW, + nextSlot: NOW, + lastUsed: 1000, + }; + + const oneCall = computePace({ ...params, used: 1001 }); + assert.equal(oneCall.delayMs, 1000); + + const eightCalls = computePace({ ...params, used: 1008 }); + assert.equal(eightCalls.delayMs, 8000); +});