Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config.example.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ module.exports = {
callbackURL: "http://localhost:" + port + "/auth/github/callback",
// An API token for the backend to make API requests with.
token: "oauth api token for server-side api calls",
// How much of the token's hourly GitHub API quota to reserve for live
// webhook/socket traffic. Bulk backfills (bin/refresh-*) pace themselves to
// stay above this floor and pause when remaining quota hits it, so a large
// backfill never starves normal operation. Defaults to 1000 if omitted.
bulkReserve: 1000,
// This will need to be the same secret you use on the Webhooks page for
// the repo Pulldasher is going to monitor.
hook_secret:
Expand Down
36 changes: 30 additions & 6 deletions lib/git-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import Label from "../models/label.js";
import Status from "../models/status.js";
import Signature from "../models/signature.js";
import getLogin from "./get-user-login.js";
import pacer from "./pacer.js";

const MyOctokit = Octokit.plugin(throttling, retry);
const gitDebug = debug("pulldasher:github");
Expand Down Expand Up @@ -61,6 +62,8 @@ const github = new MyOctokit({
// (5xx / network) — the throttle plugin already logs 4xx rate limits — and
// rethrow untouched so plugin-retry's retry logic is unaffected.
github.hook.error("request", (error, options) => {
// A failed response still reports the current quota; feed it to the pacer.
pacer.observe(error.response && error.response.headers);
const status = error.status;
if (status === undefined || status >= 500) {
gitDebug(
Expand All @@ -74,6 +77,13 @@ github.hook.error("request", (error, options) => {
throw error;
});

// Feed the x-ratelimit-* headers of every successful response to the pacer.
// Only bulk requests are gated by the pacer, but live webhook/socket responses
// pass through this hook as well, so the pacer's quota view reflects their
// spending too — that is what lets bulk work yield to live traffic.
github.hook.after("request", function recordQuota(response) {
  const { headers } = response;
  pacer.observe(headers);
});

const githubRest = github.rest;

export default {
Expand All @@ -100,7 +110,7 @@ export default {
*/
getOpenPulls: function (repo) {
return logErrors(
github.paginate(githubRest.pulls.list, params({ state: "open" }, repo)),
pacedPaginate(githubRest.pulls.list, params({ state: "open" }, repo)),
"Getting open pulls in repo %s",
repo
);
Expand All @@ -113,7 +123,7 @@ export default {
*/
getAllPulls: function (repo) {
return logErrors(
github.paginate(githubRest.pulls.list, params({ state: "all" }, repo)),
pacedPaginate(githubRest.pulls.list, params({ state: "all" }, repo)),
"Getting all pulls in repo %s",
repo
);
Expand All @@ -134,8 +144,7 @@ export default {
getOpenIssues: function (repo) {
const searchParams = params({ state: "open" }, repo);
return logErrors(
github
.paginate(githubRest.issues.listForRepo, searchParams)
pacedPaginate(githubRest.issues.listForRepo, searchParams)
.then(filterOutPulls)
.then(addRepo(searchParams)),
"Getting open issues in repo %s",
Expand All @@ -151,8 +160,7 @@ export default {
getAllIssues: function (repo) {
const searchParams = params({ state: "all" }, repo);
return logErrors(
github
.paginate(githubRest.issues.listForRepo, searchParams)
pacedPaginate(githubRest.issues.listForRepo, searchParams)
.then(filterOutPulls)
.then(addRepo(searchParams)),
"Getting all issues in repo %s",
Expand Down Expand Up @@ -645,3 +653,19 @@ function logErrors(promise, ...messageAndArgs) {
throw err;
});
}

/**
 * Paginate a bulk listing, pacing between pages so a full-repo list can't burst
 * through the quota. Page one fetches un-paced and seeds the pacer's quota view
 * (via the after-hook observe); each `gate()` then spaces the fetch of the next
 * page. Bulk-only: the live webhook/socket path never lists whole repos, so
 * gating every page is safe.
 *
 * The gate is awaited only when the response's `Link` header advertises a
 * `rel="next"` page. Gating after the final page would space nothing, and
 * while quota is at or below the reserve it would hold the finished listing
 * back until the window resets.
 *
 * @param {Function} route - Octokit REST endpoint method to paginate.
 * @param {Object} parameters - Request parameters passed to the endpoint.
 * @returns {Promise<Array>} Every item from every page, in page order.
 */
async function pacedPaginate(route, parameters) {
  const items = [];
  for await (const response of github.paginate.iterator(route, parameters)) {
    items.push(...response.data);
    // No `rel="next"` link means this was the last page: nothing left to pace.
    const link = (response.headers && response.headers.link) || "";
    if (/<[^<>]+>;\s*rel="next"/.test(link)) {
      await pacer.gate();
    }
  }
  return items;
}
138 changes: 138 additions & 0 deletions lib/pacer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import config from "./config-loader.js";
import debug from "./debug.js";
import Promise from "bluebird";

const pacerDebug = debug("pulldasher:pacer");

// Reserve a slice of the hourly quota for live (webhook/socket) traffic; bulk
// backfills pause rather than spend below it. See config.github.bulkReserve.
const reserve = config.github.bulkReserve ?? 1000;

// Only log a paced delay once it's long enough to explain a visibly slow
// backfill — short spacing isn't worth the noise.
const LOG_DELAY_THRESHOLD_MS = 5000;

/**
 * Process-wide pacer for bulk GitHub requests. Rate-limit state belongs to the
 * token rather than to any one caller, so a single shared object holds the
 * latest quota view and every bulk producer gates against it.
 *
 * `observe(headers)` — store whichever x-ratelimit-* values a response carries.
 *   Called for live and bulk responses alike; it never waits.
 * `gate()` — await before doing the next piece of *bulk* work. Returns a
 *   promise that resolves after the delay chosen by `computePace`.
 *
 * State:
 *   remaining / reset / used — last observed header values, as numbers.
 *   lastUsed — the `used` baseline recorded by the previous gate.
 *   nextSlot — the token-bucket cursor (ms timestamp) from the previous gate.
 *
 * Because the cursor advances by the `x-ratelimit-used` delta between gates and
 * not by the number of gates, an item that fans out into many calls, or a burst
 * of live traffic, pushes the next bulk slot further out.
 */
const pacer = {
  remaining: null,
  reset: null,
  used: null,
  lastUsed: null,
  nextSlot: 0,

  observe(headers) {
    if (!headers) {
      return;
    }
    // Header name → field it updates. Headers that are absent leave the
    // previously observed value untouched.
    const tracked = [
      ["x-ratelimit-remaining", "remaining"],
      ["x-ratelimit-reset", "reset"],
      ["x-ratelimit-used", "used"],
    ];
    for (const [header, field] of tracked) {
      const raw = headers[header];
      if (raw !== undefined) {
        this[field] = Number(raw);
      }
    }
  },

  gate() {
    const pace = computePace({
      remaining: this.remaining,
      reset: this.reset,
      used: this.used,
      reserve: reserve,
      now: Date.now(),
      nextSlot: this.nextSlot,
      lastUsed: this.lastUsed,
    });
    this.nextSlot = pace.nextSlot;
    this.lastUsed = pace.lastUsed;

    const waitMs = pace.delayMs;
    // A positive wait while at/below the reserve is the floor pause; any other
    // positive wait is ordinary even-spread pacing.
    const atFloor = this.remaining != null && this.remaining <= reserve;
    const floorPaused = atFloor && waitMs > 0;

    if (floorPaused) {
      pacerDebug(
        "bulk paused: remaining %s ≤ reserve %s, waiting %ss for quota reset",
        this.remaining,
        reserve,
        Math.round(waitMs / 1000)
      );
    } else if (waitMs >= LOG_DELAY_THRESHOLD_MS) {
      // Short spacing is routine; only log waits long enough to matter.
      pacerDebug(
        "pacing bulk: waiting %sms (remaining %s)",
        Math.round(waitMs),
        this.remaining
      );
    }

    return Promise.delay(waitMs);
  },
};

export default pacer;

/**
 * Pure pacing decision. Given the latest quota view and a token-bucket cursor,
 * returns how long the next bulk request should wait, the advanced cursor, and
 * the `used` baseline to diff against next time.
 *
 * - unknown (null/undefined/non-numeric) quota, or a `reset` already in the
 *   past (stale window) → allow immediately; the next response refreshes the
 *   view.
 * - `remaining ≤ reserve` → pause until `reset` (let the window refill).
 * - otherwise → even-spread: the spendable budget (`remaining − reserve`)
 *   divided across the time left in the window gives the per-request interval.
 *   Advance the cursor by `spent × interval`, where `spent` is the requests
 *   consumed since the last gate (`used − lastUsed`), so the spacing tracks
 *   real consumption — a multi-call item or a live spike pushes the next slot
 *   further out. A shrinking `remaining` also widens the interval, so bulk
 *   yields on both signals. The slot never lands past `reset`: once the window
 *   rolls over the quota refills, so waiting longer than the floor pause would
 *   is never useful.
 *
 * @param {Object} state
 * @param {?number} state.remaining - Last observed `x-ratelimit-remaining`.
 * @param {?number} state.reset - Last observed `x-ratelimit-reset` (epoch seconds).
 * @param {?number} state.used - Last observed `x-ratelimit-used`.
 * @param {number} state.reserve - Quota floor bulk work must stay above.
 * @param {number} state.now - Current time in ms.
 * @param {number} state.nextSlot - Cursor returned by the previous call (ms).
 * @param {?number} state.lastUsed - `used` baseline returned by the previous call.
 * @returns {{delayMs: number, nextSlot: number, lastUsed: ?number}}
 */
export function computePace({
  remaining,
  reset,
  used,
  reserve,
  now,
  nextSlot,
  lastUsed,
}) {
  // A value is unusable when it is absent or does not coerce to a finite
  // number (e.g. NaN from a malformed header). Letting NaN through would yield
  // a NaN delay and a NaN cursor that persists across gates.
  const isUnknown = (value) => value == null || !Number.isFinite(Number(value));

  const baseline = isUnknown(used) ? lastUsed : used;
  // No quota view, or a window that already rolled over (our `remaining` is
  // stale): proceed now and let the next response refresh the view.
  const allowNow = { delayMs: 0, nextSlot: now, lastUsed: baseline };
  if (isUnknown(remaining) || isUnknown(reset)) {
    return allowNow;
  }
  const resetMs = reset * 1000;
  if (resetMs <= now) {
    return allowNow;
  }
  if (remaining <= reserve) {
    return { delayMs: resetMs - now, nextSlot: resetMs, lastUsed: baseline };
  }
  const interval = (resetMs - now) / (remaining - reserve);
  // Clamp at 0: across a window rollover the server resets `used` to ~0 while
  // our `lastUsed` still holds the pre-reset high, so the delta can go negative.
  const spent =
    isUnknown(used) || isUnknown(lastUsed) ? 0 : Math.max(0, used - lastUsed);
  const cursor = Number.isFinite(nextSlot) ? nextSlot : 0;
  // Bound the slot to [now, resetMs]: never in the past, never beyond the
  // moment the quota refills.
  const slot = Math.min(resetMs, Math.max(now, cursor + spent * interval));
  return { delayMs: slot - now, nextSlot: slot, lastUsed: baseline };
}
23 changes: 17 additions & 6 deletions lib/refresh.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import utils from "./utils.js";
import NotifyQueue from "notify-queue";
import debug from "./debug.js";
import Promise from "bluebird";
import pacer from "./pacer.js";

// Queues for making all refreshes be synchronous, one at a time.
var issueQueue = new NotifyQueue();
Expand Down Expand Up @@ -61,7 +62,8 @@ export default {
* and return a promise that is fulfilled when the item is fully processed.
*
* Single-item refreshes (webhooks, socket) have no end-of-run report, so no
* failure collector is attached.
* failure collector is attached — and they aren't quota-paced, so live traffic
* is never delayed.
*/
function pushOnQueue(queue) {
return function (githubResponse) {
Expand All @@ -81,13 +83,20 @@ function pushOnQueue(queue) {
* Each item carries the run's `onFailure` collector (null for single-item
* webhook/socket refreshes), so a failed parse is recorded for that run's
* end-of-run report.
*
* This is the bulk path, so it paces enqueueing against the GitHub quota:
* `pacer.gate()` waits for each item's slot *before* pushing it, keeping the
* shared queue shallow. The wait happens here, off the queue — so a paced or
* floor-paused backfill never blocks the single consumer, and live
* webhook/socket refreshes keep draining promptly.
Comment on lines +86 to +91

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand..

A backfill is going to happen from the CLI, not the running app. i.e. they are in separate processes. They share the API key of course, but not the queue.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the idea that pushOnQueue() is given higher priority than pushAllOnQueue()?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A backfill is going to happen from the CLI, not the running app. i.e. they are in separate processes.

True except for refresh.openPulls(), which gets called by the running app on startup. It pushes to the same queue as web hooks and uses pushAllOnQueue() to do its work.

Is the idea that pushOnQueue() is given higher priority than pushAllOnQueue()?

Via a crude mechanism that sits in front of adding to the queue. pushAllOnQueue() waits for the rate limit before adding to the queue, so its waiting happens async independent of the queue processing, which itself happens async of the queue pushes. While it's waiting, it isn't adding things to the queue while web hooks are free to add to the queue during that time. If the bulk producer coroutine isn't rate-limited at all, it will add many items to the queue, the processing of which will be in direct competition with the web hook items.

*/
function pushAllOnQueue(queue, onFailure) {
return function (githubResponses) {
return async function (githubResponses) {
for (const githubResponse of githubResponses) {
await pacer.gate();
queue.push({ response: githubResponse, onFailure: onFailure });

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It still feels quite weird to be rate-limiting pushing things onto the queue.

i.e. pulling from the queue and executing the api call is what updates the knowledge of the world (the headers) and pushing to the queue is about future commitment.

I have a hard time imagining how it would work out to have a queue that you're throttling additions to, when draining the queue is actually what gives you the feedback on allowable rates.

Maybe instead we throttle the actual thing that needs throttling, but have a different bulkReserve depending on the source of the action (a smaller reserve for things triggered by the web app and a bigger reserve for CLI-triggered actions). The refreshOpenPulls on load doesn't need special treatment, as it lasts less than a minute and only happens on startup.

}
return new Promise(function (resolve) {
githubResponses.forEach(function (githubResponse) {
queue.push({ response: githubResponse, onFailure: onFailure });
});
queue.push(resolve);
});
};
Expand Down Expand Up @@ -212,7 +221,9 @@ export function processPullItem(
// queue or the fixed pop deps, so it stays scoped to the run that enqueued it —
// a bulk run collects its own failures, while webhook/socket refreshes (which
// push onFailure null) don't accumulate. Unwrap so the process functions see
// one response plus their injected collaborators.
// one response plus their injected collaborators. (Quota pacing happens at
// enqueue time, in pushAllOnQueue, not here — so a paused backfill can't block
// the single consumer that webhook refreshes also depend on.)
issueQueue.pop(function (item, next) {
if (typeof item === "function") {
item();
Expand Down
94 changes: 94 additions & 0 deletions test/pacer.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { test } from "node:test";
import assert from "node:assert/strict";
import { computePace } from "../lib/pacer.js";

const NOW = 1_700_000_000_000; // fixed clock (ms); every reset below is relative to it
const RESERVE = 1000;

/** Epoch-seconds `reset` value lying `seconds` away from the fixed clock. */
function resetIn(seconds) {
  return NOW / 1000 + seconds;
}

/**
 * Call computePace against the fixed clock and reserve. The cursor and usage
 * baseline start fresh (nextSlot 0, lastUsed null) unless a case overrides them.
 */
function paceWith(overrides) {
  return computePace({
    reserve: RESERVE,
    now: NOW,
    nextSlot: 0,
    lastUsed: null,
    ...overrides,
  });
}

// With no response observed yet there is no quota view at all; a bulk request
// has to go ahead at once instead of waiting on data that will never arrive.
test("computePace allows the request when quota is unknown", () => {
  const outcome = paceWith({ remaining: null, reset: null, used: null });
  assert.equal(outcome.delayMs, 0);
});

// A reset time that has already passed means the window rolled over and the
// stored remaining is stale: let the request through so its response can
// refresh the view.
test("computePace allows the request when the reset window is stale", () => {
  const outcome = paceWith({
    remaining: 0,
    reset: resetIn(-10),
    used: 5000,
    lastUsed: 4000,
  });
  assert.equal(outcome.delayMs, 0);
});

// Once remaining is at or under the reserve floor, bulk work waits for the
// refill so it stops competing with live traffic.
test("computePace pauses until reset when remaining is at or below the reserve", () => {
  const outcome = paceWith({
    remaining: 500,
    reset: resetIn(600),
    used: 4500,
    lastUsed: 4490,
  });
  assert.equal(outcome.delayMs, 600_000);
  // The cursor lands on the reset, so pacing after the refill starts from the
  // edge of the new window.
  assert.equal(outcome.nextSlot, NOW + 600_000);
});

// State at the very first gate: page one went out un-paced, so the after-hook
// `observe` has filled in remaining/reset/used, but no earlier gate has stored
// a `used` baseline. There is nothing to diff against, so the gate lets the
// request through and only records the baseline (returned as lastUsed).
test("computePace does not delay the first gate, but records the usage baseline", () => {
  const outcome = paceWith({
    remaining: 5000,
    reset: resetIn(3600),
    used: 100,
  });
  assert.equal(outcome.delayMs, 0);
  assert.equal(outcome.nextSlot, NOW);
  assert.equal(outcome.lastUsed, 100);
});

// The cursor moves by the number of requests actually spent since the previous
// gate — not by one per gate — so an item that made several calls waits
// proportionally longer.
test("computePace advances the cursor by the requests spent since the last gate", () => {
  // Budget 100 (1100 − reserve) over a 100_000ms window → 1000ms per request.
  const delayAfterUsing = (used) =>
    paceWith({
      remaining: 1100,
      reset: resetIn(100),
      nextSlot: NOW,
      lastUsed: 1000,
      used,
    }).delayMs;

  assert.equal(delayAfterUsing(1001), 1000);
  assert.equal(delayAfterUsing(1008), 8000);
});