-
Notifications
You must be signed in to change notification settings - Fork 10
Pace bulk refresh against the GitHub API quota #473
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,138 @@ | ||
| import config from "./config-loader.js"; | ||
| import debug from "./debug.js"; | ||
| import Promise from "bluebird"; | ||
|
|
||
| const pacerDebug = debug("pulldasher:pacer"); | ||
|
|
||
| // Reserve a slice of the hourly quota for live (webhook/socket) traffic; bulk | ||
| // backfills pause rather than spend below it. See config.github.bulkReserve. | ||
| const reserve = config.github.bulkReserve ?? 1000; | ||
|
|
||
| // Only log a paced delay once it's long enough to explain a visibly slow | ||
| // backfill — short spacing isn't worth the noise. | ||
| const LOG_DELAY_THRESHOLD_MS = 5000; | ||
|
|
||
/**
 * Parse one rate-limit header value into a finite number.
 *
 * Returns null for anything that is not a usable number: a missing header,
 * null, an empty/blank string, or text that does not parse. Plain `Number()`
 * would turn null and "" into 0 (indistinguishable from "quota exhausted") and
 * garbage into NaN, so both are filtered out here.
 *
 * @param {*} value - raw header value (normally a numeric string).
 * @returns {number|null} the finite number, or null when unusable.
 */
function parseQuotaHeader(value) {
  if (value == null || String(value).trim() === "") {
    return null;
  }
  const parsed = Number(value);
  return Number.isFinite(parsed) ? parsed : null;
}

/**
 * Proactively paces bulk GitHub requests so a backfill yields to live traffic
 * instead of draining the shared token's quota to zero. A single module-level
 * instance holds the latest quota view and the pacing cursor.
 *
 * `observe(headers)` — record the latest quota from a response's headers.
 * `gate()` — await this before pushing a *bulk* item; it spreads bulk work
 *   across the reset window and blocks entirely while at/below the reserve
 *   floor.
 *
 * Pacing is calibrated by actual consumption, not by gate count: each gate
 * advances a cursor by the number of requests spent since the last gate
 * (`x-ratelimit-used` delta), so an item whose processing fans out to a dozen
 * calls waits a dozen intervals — and any other traffic that bumps `used`
 * pushes the cursor out and makes bulk yield.
 *
 * State fields:
 *   remaining — last seen `x-ratelimit-remaining`, or null if never seen.
 *   reset     — last seen `x-ratelimit-reset` (multiplied by 1000 in
 *               computePace, i.e. treated as seconds), or null.
 *   used      — last seen `x-ratelimit-used`, or null.
 *   lastUsed  — the `used` baseline recorded by the previous gate.
 *   nextSlot  — cursor (ms timestamp) of the next bulk slot.
 */
const pacer = {
  remaining: null,
  reset: null,
  used: null,
  lastUsed: null,
  nextSlot: 0,

  /**
   * Record the quota reported by a response.
   *
   * Each field is updated independently, and only when its header carries a
   * finite number; a missing or malformed header leaves the previous value in
   * place rather than overwriting it with NaN or a spurious 0.
   *
   * @param {Object<string, *>} [headers] - response headers keyed by
   *   lower-case name; a falsy value is ignored.
   */
  observe: function (headers) {
    if (!headers) {
      return;
    }
    const remaining = parseQuotaHeader(headers["x-ratelimit-remaining"]);
    const reset = parseQuotaHeader(headers["x-ratelimit-reset"]);
    const used = parseQuotaHeader(headers["x-ratelimit-used"]);
    if (remaining !== null) {
      this.remaining = remaining;
    }
    if (reset !== null) {
      this.reset = reset;
    }
    if (used !== null) {
      this.used = used;
    }
  },

  /**
   * Wait for the next bulk slot.
   *
   * Delegates the decision to `computePace`, stores the advanced cursor and
   * usage baseline, logs notable waits, and returns a promise that resolves
   * after the computed delay.
   *
   * @returns {Promise} resolves once the caller may push its next bulk item.
   */
  gate: function () {
    const now = Date.now();
    const { delayMs, nextSlot, lastUsed } = computePace({
      remaining: this.remaining,
      reset: this.reset,
      used: this.used,
      reserve: reserve,
      now: now,
      nextSlot: this.nextSlot,
      lastUsed: this.lastUsed,
    });
    this.nextSlot = nextSlot;
    this.lastUsed = lastUsed;

    // The floor-pause is the only positive delay that isn't even-spread pacing.
    if (delayMs > 0 && this.remaining != null && this.remaining <= reserve) {
      pacerDebug(
        "bulk paused: remaining %s ≤ reserve %s, waiting %ss for quota reset",
        this.remaining,
        reserve,
        Math.round(delayMs / 1000)
      );
    } else if (delayMs >= LOG_DELAY_THRESHOLD_MS) {
      // Short spacing is routine; only long waits are worth a log line.
      pacerDebug(
        "pacing bulk: waiting %sms (remaining %s)",
        Math.round(delayMs),
        this.remaining
      );
    }

    return Promise.delay(delayMs);
  },
};
|
|
||
| export default pacer; | ||
|
|
||
/**
 * Pure pacing decision. Given the latest quota view and a pacing cursor,
 * returns how long the next bulk request should wait, the advanced cursor, and
 * the `used` baseline to diff against next time.
 *
 * - unknown quota (null/undefined, or not a finite number), or a `reset`
 *   already in the past (stale window) → allow immediately; the next response
 *   refreshes the view.
 * - `remaining ≤ reserve` → pause until `reset` (let the window refill).
 * - otherwise → even-spread: the spendable budget (`remaining − reserve`)
 *   divided across the time left in the window gives the per-request interval.
 *   Advance the cursor by `spent × interval`, where `spent` is the requests
 *   consumed since the last gate (`used − lastUsed`), so the spacing tracks
 *   real consumption. A shrinking `remaining` also widens the interval.
 *
 * Non-finite inputs never leak into the result: `delayMs` and `nextSlot` are
 * always finite when `now` is, so one bad reading cannot poison the cursor for
 * every later gate.
 *
 * @param {Object} params
 * @param {?number} params.remaining - requests left in the current window.
 * @param {?number} params.reset - window reset time, in seconds.
 * @param {?number} params.used - requests used in the current window.
 * @param {number} params.reserve - floor below which bulk work pauses.
 * @param {number} params.now - current time, in milliseconds.
 * @param {number} params.nextSlot - cursor returned by the previous call (ms).
 * @param {?number} params.lastUsed - `used` baseline from the previous call.
 * @returns {{delayMs: number, nextSlot: number, lastUsed: ?number}}
 */
export function computePace({
  remaining,
  reset,
  used,
  reserve,
  now,
  nextSlot,
  lastUsed,
}) {
  // Keep the previous baseline when this reading of `used` is absent or NaN.
  const baseline = used == null || Number.isNaN(used) ? lastUsed : used;
  // No quota view, or a window that already rolled over (our `remaining` is
  // stale): proceed now and let the next response refresh the view.
  const allowNow = { delayMs: 0, nextSlot: now, lastUsed: baseline };
  if (remaining == null || reset == null) {
    return allowNow;
  }
  const resetMs = reset * 1000;
  const budget = remaining - reserve;
  // A NaN/Infinity here would make every later comparison false and turn the
  // cursor into NaN; treat it exactly like an unknown quota.
  if (!Number.isFinite(resetMs) || !Number.isFinite(budget)) {
    return allowNow;
  }
  if (resetMs <= now) {
    return allowNow;
  }
  if (budget <= 0) {
    return { delayMs: resetMs - now, nextSlot: resetMs, lastUsed: baseline };
  }
  const interval = (resetMs - now) / budget;
  // Clamp at 0: across a window rollover the server resets `used` to ~0 while
  // our `lastUsed` still holds the pre-reset high, so the delta can go negative.
  // A non-finite delta (bad reading) counts as nothing spent.
  const delta = used == null || lastUsed == null ? 0 : used - lastUsed;
  const spent = Number.isFinite(delta) ? Math.max(0, delta) : 0;
  // Math.max(now, NaN) is NaN, so a bad cursor must be replaced, not compared.
  const cursor = Number.isFinite(nextSlot) ? nextSlot : now;
  const slot = Math.max(now, cursor + spent * interval);
  return { delayMs: slot - now, nextSlot: slot, lastUsed: baseline };
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ import utils from "./utils.js"; | |
| import NotifyQueue from "notify-queue"; | ||
| import debug from "./debug.js"; | ||
| import Promise from "bluebird"; | ||
| import pacer from "./pacer.js"; | ||
|
|
||
| // Queues for making all refreshes be synchronous, one at a time. | ||
| var issueQueue = new NotifyQueue(); | ||
|
|
@@ -61,7 +62,8 @@ export default { | |
| * and return a promise that is fulfilled when the item is fully processed. | ||
| * | ||
| * Single-item refreshes (webhooks, socket) have no end-of-run report, so no | ||
| * failure collector is attached. | ||
| * failure collector is attached — and they aren't quota-paced, so live traffic | ||
| * is never delayed. | ||
| */ | ||
| function pushOnQueue(queue) { | ||
| return function (githubResponse) { | ||
|
|
@@ -81,13 +83,20 @@ function pushOnQueue(queue) { | |
| * Each item carries the run's `onFailure` collector (null for single-item | ||
| * webhook/socket refreshes), so a failed parse is recorded for that run's | ||
| * end-of-run report. | ||
| * | ||
| * This is the bulk path, so it paces enqueueing against the GitHub quota: | ||
| * `pacer.gate()` waits for each item's slot *before* pushing it, keeping the | ||
| * shared queue shallow. The wait happens here, off the queue — so a paced or | ||
| * floor-paused backfill never blocks the single consumer, and live | ||
| * webhook/socket refreshes keep draining promptly. | ||
| */ | ||
| function pushAllOnQueue(queue, onFailure) { | ||
| return function (githubResponses) { | ||
| return async function (githubResponses) { | ||
| for (const githubResponse of githubResponses) { | ||
| await pacer.gate(); | ||
| queue.push({ response: githubResponse, onFailure: onFailure }); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It still feels quite weird to be rate-limiting pushing things onto the queue, i.e. pulling from the queue and executing the API call is what updates the knowledge of the world (the headers), and pushing to the queue is about future commitment. I have a hard time imagining how it would work out to have a queue that you're throttling additions to, when draining the queue is actually what gives you the feedback on allowable rates. Maybe instead we throttle the actual thing that needs throttling, but have a different bulkReserve depending on the source of the action (a smaller reserve for things triggered by the web app and a big reserve for CLI-triggered actions). The refreshOpenPulls on load doesn't need special treatment, as it lasts less than a minute and only happens on startup. |
||
| } | ||
| return new Promise(function (resolve) { | ||
| githubResponses.forEach(function (githubResponse) { | ||
| queue.push({ response: githubResponse, onFailure: onFailure }); | ||
| }); | ||
| queue.push(resolve); | ||
| }); | ||
| }; | ||
|
|
@@ -212,7 +221,9 @@ export function processPullItem( | |
| // queue or the fixed pop deps, so it stays scoped to the run that enqueued it — | ||
| // a bulk run collects its own failures, while webhook/socket refreshes (which | ||
| // push onFailure null) don't accumulate. Unwrap so the process functions see | ||
| // one response plus their injected collaborators. | ||
| // one response plus their injected collaborators. (Quota pacing happens at | ||
| // enqueue time, in pushAllOnQueue, not here — so a paused backfill can't block | ||
| // the single consumer that webhook refreshes also depend on.) | ||
| issueQueue.pop(function (item, next) { | ||
| if (typeof item === "function") { | ||
| item(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| import { test } from "node:test"; | ||
| import assert from "node:assert/strict"; | ||
| import { computePace } from "../lib/pacer.js"; | ||
|
|
||
| const NOW = 1_700_000_000_000; // fixed clock (ms); reset values are NOW-relative | ||
| const RESERVE = 1000; | ||
| const resetIn = (seconds) => NOW / 1000 + seconds; | ||
|
|
||
// With no response observed yet, every quota field is still null. The pacer
// has nothing to pace against, so it must let the request through at once
// instead of stalling forever.
test("computePace allows the request when quota is unknown", () => {
  const noQuotaView = {
    remaining: null,
    reset: null,
    used: null,
    lastUsed: null,
    nextSlot: 0,
    reserve: RESERVE,
    now: NOW,
  };

  const decision = computePace(noQuotaView);

  assert.strictEqual(decision.delayMs, 0);
});
|
|
||
// When `reset` lies in the past the window has already rolled over, so the
// recorded `remaining` is stale. The request is allowed and the next response
// refreshes the view.
test("computePace allows the request when the reset window is stale", () => {
  const staleView = {
    remaining: 0,
    reset: resetIn(-10),
    used: 5000,
    lastUsed: 4000,
    nextSlot: 0,
    reserve: RESERVE,
    now: NOW,
  };

  const decision = computePace(staleView);

  assert.strictEqual(decision.delayMs, 0);
});
|
|
||
// Once `remaining` is at or under the reserve floor, bulk work stops competing
// with live traffic: it waits for the quota window to reset.
test("computePace pauses until reset when remaining is at or below the reserve", () => {
  const secondsUntilReset = 600;
  const decision = computePace({
    remaining: 500,
    reset: resetIn(secondsUntilReset),
    used: 4500,
    lastUsed: 4490,
    nextSlot: 0,
    reserve: RESERVE,
    now: NOW,
  });

  assert.strictEqual(decision.delayMs, 600_000);
  // The cursor lands on the reset so post-refill pacing starts at the window edge.
  assert.strictEqual(decision.nextSlot, NOW + 600_000);
});
|
|
||
// State at the very first gate: a quota view (remaining/reset/used) is already
// present, but no earlier gate has recorded a `used` baseline. Without a
// baseline there is no delta to measure, so the gate proceeds immediately and
// hands back the current `used` as `lastUsed` for the next gate to diff against.
test("computePace does not delay the first gate, but records the usage baseline", () => {
  const firstGate = computePace({
    remaining: 5000,
    reset: resetIn(3600),
    used: 100,
    lastUsed: null,
    nextSlot: 0,
    reserve: RESERVE,
    now: NOW,
  });

  assert.strictEqual(firstGate.delayMs, 0);
  assert.strictEqual(firstGate.nextSlot, NOW);
  assert.strictEqual(firstGate.lastUsed, 100);
});
|
|
||
// Spacing follows real consumption rather than gate count: the cursor moves by
// one interval per request spent since the previous gate, so an item that cost
// several calls waits proportionally longer.
test("computePace advances the cursor by the requests spent since the last gate", () => {
  // Budget is 100 requests (1100 − reserve) over a 100_000ms window, i.e. one
  // request per 1000ms. `lastUsed` is 1000, so `used` encodes the calls spent.
  const paceAfterSpending = (calls) =>
    computePace({
      remaining: 1100,
      reset: resetIn(100),
      used: 1000 + calls,
      lastUsed: 1000,
      nextSlot: NOW,
      reserve: RESERVE,
      now: NOW,
    });

  assert.strictEqual(paceAfterSpending(1).delayMs, 1000);
  assert.strictEqual(paceAfterSpending(8).delayMs, 8000);
});
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand..
A backfill is going to happen from the CLI, not the running app. i.e. they are in separate processes. They share the API key of course, but not the queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the idea that
pushOnQueue() is given higher priority than pushAllOnQueue()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True except for
refresh.openPulls(), which gets called by the running app on startup. It pushes to the same queue as webhooks and uses pushAllOnQueue() to do its work.
Via a crude mechanism that sits in front of adding to the queue.
pushAllOnQueue() waits for the rate limit before adding to the queue, so its waiting happens asynchronously, independent of the queue processing, which itself happens asynchronously from the queue pushes. While it's waiting, it isn't adding things to the queue, and webhooks are free to add to the queue during that time. If the bulk producer coroutine isn't rate-limited at all, it will add many items to the queue, the processing of which will be in direct competition with the webhook items.