Skip to content

Bound log buffers and ship logs async to prevent OOM#303

Open
jasonopslevel wants to merge 1 commit into
mainfrom
buffering-fix
Open

Bound log buffers and ship logs async to prevent OOM#303
jasonopslevel wants to merge 1 commit into
mainfrom
buffering-fix

Conversation

@jasonopslevel

@jasonopslevel jasonopslevel commented Jun 29, 2026

Copy link
Copy Markdown
Contributor

What

The runner was OOM-killing at high job-concurrency. Logs were shipped to OpsLevel synchronously inside the log drain loop, so a slow API call stalled draining while the pod's stdout/stderr buffers grew without bound. This makes log shipping async and bounds every buffer, turning peak memory into a predictable function of two knobs.

How it works

flowchart LR
    pod[Job pod\nstdout/stderr] -->|exec stream| buf[SafeBuffer\ncapped per stream]
    buf -->|drain every 50ms\nnever blocks| proc[Log processors]
    proc -->|batch| q[bounded channel\ndepth 2]
    q --> ship[shipper goroutine\nblocking API call]
    ship -->|RunnerAppendJobLog| api[OpsLevel API]
Loading
  • Drain loop never blocks on the network. submit() hands a batch to a bounded channel; a dedicated shipper goroutine makes the (blocking) API call. A slow API backs up into the queue instead of stalling the drain.
  • Every buffer is capped. SafeBuffer drops writes past its cap (bytes counted in the new opslevel_runner_log_bytes_dropped metric). The ship queue is bounded (depth 2). So memory can't grow unbounded no matter how chatty the job or how slow the API.
  • Final batch is never lost. At flush the pod has stopped producing, so the last batch is enqueued with a blocking send before the channel closes.

Sizing math

Peak memory per job ≈ 6 × job-pod-log-max-size:

term size
stdout + stderr buffers 2 × log-max-size
batch being assembled 1 × log-max-size
2 queued batches (base64) 2 × 1.34 × log-max-size
peak runner memory ≈ baseline + job-concurrency × 6 × job-pod-log-max-size

Default job-pod-log-max-size lowered 1 MB → 256 KB (≈1.5 MB/job).

job-concurrency log memory + ~100 MB baseline
3 ~4.5 MB ~105 MB
200 ~300 MB ~400 MB

Why this prevents OOMs

Before, the only thing limiting buffer growth was how fast the API responded — unbounded by design. Now there is a hard ceiling: capped buffers + bounded ship queue mean worst-case memory is fixed by config, so the pod limit can be sized from the formula instead of guessed. Tune memory with one lever (job-pod-log-max-size); the rest is internal.

@derek-etherton-opslevel

Copy link
Copy Markdown
Contributor

Buffer stuff is a bit dense for my humanity, so I'm going to cop out here and copy pasta what look like some valid claude code review comments:

  1. src/pkg/opslevelAppendLogProcessor.go:97 / src/pkg/faktoryRunnerAppendJobLogProcessor.go:87 — Flush deadlocks indefinitely on a slow API, defeating the whole point
    of the PR.
    The final batch is enqueued with an unconditional blocking send s.batches <- batch. If the shipper goroutine is stuck inside s.client.RunnerAppendJobLog (verified:
    MutateCTX uses context.Background(), and clientGQL.go never applies settings.timeout to the underlying retryablehttp client — no per-call timeout exists), and the
    depth-2 queue is full, Flush hangs forever. Same shape in the Faktory variant. The PR's stated premise is "the drain loop never blocks on the network" — Flush
    violates that. Fix: select { case s.batches <- batch: case <-time.After(timeout): s.droppedBatches++ }, or bound <-s.done similarly.

  2. src/pkg/buffer.go:46-50 — SafeBuffer fabricates log lines on mid-payload truncation.
    When a multi-line Write is truncated at the cap, the surviving bytes contain a partial line (no terminating \n) that gets concatenated with the next Write. Example:
    cap fills mid-line during "line1\nline2\nline3\n" → buffer holds "line1\nli"; next Write "more\n" produces a fabricated line "limore\n" that is base64-encoded and
    shipped to OpsLevel as if it were real pod output. Truncation is acceptable; silent fabrication is not. Fix: truncate at the last \n within p[:room] (or drop the
    whole chunk and count it).

  3. src/cmd/run.go:122 and src/cmd/faktory.go:165 — Shipper goroutines leak permanently if streamer.Flush is not reached.
    Each processor's constructor starts go s.ship(); the goroutine exits only when Flush() calls close(s.batches). Neither call site uses defer streamer.Flush(outcome),
    and there is no recover(). Any panic inside runner.Run (k8s client-go nil-deref, informer edge case) or tracer.Start strands the shipper goroutine + its held
    *opslevel.Client / faktory.Helper / batched slices for the runner's lifetime. In API mode the runner is long-running, so leaks accumulate. Fix: defer
    streamer.Flush(outcome) at all three call sites.

  4. src/pkg/logs.go:130 — Pre-existing s.quit <- true deadlock now also leaks shipper goroutines on SIGTERM.
    quit is unbuffered. On ctx.Done() (SIGTERM / shutdown), Run returns without draining quit; the subsequent s.quit <- true in Flush blocks forever, so processor Flushes
    never run, so the new shipper goroutines never close. In scope because Flush is a touched function (PR added recordDroppedBytes() to it). Fix: select { case s.quit
    <- true: default: }, or buffer quit.

  5. src/pkg/buffer.go:25 + src/cmd/test.go:42 — maxSize <= 0 magic-value sentinel for "unbounded" is a misconfig footgun.
    --job-pod-log-max-size is a plain Int with no floor validation; OPSLEVEL_JOB_POD_LOG_MAX_SIZE=0 silently disables the safety net documented as "protect the runner
    from OOM." Fix: validate > 0 at the flag boundary; pass math.MaxInt from tests instead of 0.

  6. src/pkg/opslevelAppendLogProcessor.go:11-16, 94-95, 119-122 (mirrored in faktoryRunner…) — Doc comments narrate the PR fix.
    CLAUDE.md: "Don't reference the current task, fix, or callers", "Default to writing no comments. Only add one when the WHY is non-obvious". The shipQueueDepth block,
    the Flush/submit/ship comments, and the SafeBuffer type comment all narrate the OOM fix and name callers (LogStreamer, client-go's exec stream). These read like
    commit messages and will rot when callers change. Move rationale to the PR description; keep code comments to non-obvious invariants.

  7. src/pkg/opslevelAppendLogProcessor.go:93-153 vs src/pkg/faktoryRunnerAppendJobLogProcessor.go:83-156 — Lifecycle plumbing is duplicated character-for-character.
    batches/done/droppedBatches, takeBatch(), the non-blocking submit() select, and the Flush() blocking-send + close + wait-done sequence are now identical across the
    two processors. Drift has already started (Faktory's ship() lacks the client==nil guard; warning wording differs: "shipping backpressure" vs "enqueue backpressure").
    Extract a batchShipper helper that takes a send func([]string) error callback.

  8. src/pkg/logs_test.go:62 — TestLogStreamerCapsOversizedLine relies on time.Sleep(150ms) to wait for the 50ms ticker.
    Real-time sleep tied to ticker cadence; flaky on contended CI. Fix: have the capture processor signal on a channel when a line arrives, or expose a synchronous drain
    hook.

@jasonopslevel

Copy link
Copy Markdown
Contributor Author

noted

Buffer stuff is a bit dense for my humanity, so I'm going to cop out here and copy pasta what look like some valid claude code review comments:

  1. src/pkg/opslevelAppendLogProcessor.go:97 / src/pkg/faktoryRunnerAppendJobLogProcessor.go:87 — Flush deadlocks indefinitely on a slow API, defeating the whole point
    of the PR.
    The final batch is enqueued with an unconditional blocking send s.batches <- batch. If the shipper goroutine is stuck inside s.client.RunnerAppendJobLog (verified:
    MutateCTX uses context.Background(), and clientGQL.go never applies settings.timeout to the underlying retryablehttp client — no per-call timeout exists), and the
    depth-2 queue is full, Flush hangs forever. Same shape in the Faktory variant. The PR's stated premise is "the drain loop never blocks on the network" — Flush
    violates that. Fix: select { case s.batches <- batch: case <-time.After(timeout): s.droppedBatches++ }, or bound <-s.done similarly.
  2. src/pkg/buffer.go:46-50 — SafeBuffer fabricates log lines on mid-payload truncation.
    When a multi-line Write is truncated at the cap, the surviving bytes contain a partial line (no terminating \n) that gets concatenated with the next Write. Example:
    cap fills mid-line during "line1\nline2\nline3\n" → buffer holds "line1\nli"; next Write "more\n" produces a fabricated line "limore\n" that is base64-encoded and
    shipped to OpsLevel as if it were real pod output. Truncation is acceptable; silent fabrication is not. Fix: truncate at the last \n within p[:room] (or drop the
    whole chunk and count it).
  3. src/cmd/run.go:122 and src/cmd/faktory.go:165 — Shipper goroutines leak permanently if streamer.Flush is not reached.
    Each processor's constructor starts go s.ship(); the goroutine exits only when Flush() calls close(s.batches). Neither call site uses defer streamer.Flush(outcome),
    and there is no recover(). Any panic inside runner.Run (k8s client-go nil-deref, informer edge case) or tracer.Start strands the shipper goroutine + its held
    *opslevel.Client / faktory.Helper / batched slices for the runner's lifetime. In API mode the runner is long-running, so leaks accumulate. Fix: defer
    streamer.Flush(outcome) at all three call sites.
  4. src/pkg/logs.go:130 — Pre-existing s.quit <- true deadlock now also leaks shipper goroutines on SIGTERM.
    quit is unbuffered. On ctx.Done() (SIGTERM / shutdown), Run returns without draining quit; the subsequent s.quit <- true in Flush blocks forever, so processor Flushes
    never run, so the new shipper goroutines never close. In scope because Flush is a touched function (PR added recordDroppedBytes() to it). Fix: select { case s.quit
    <- true: default: }, or buffer quit.
  5. src/pkg/buffer.go:25 + src/cmd/test.go:42 — maxSize <= 0 magic-value sentinel for "unbounded" is a misconfig footgun.
    --job-pod-log-max-size is a plain Int with no floor validation; OPSLEVEL_JOB_POD_LOG_MAX_SIZE=0 silently disables the safety net documented as "protect the runner
    from OOM." Fix: validate > 0 at the flag boundary; pass math.MaxInt from tests instead of 0.
  6. src/pkg/opslevelAppendLogProcessor.go:11-16, 94-95, 119-122 (mirrored in faktoryRunner…) — Doc comments narrate the PR fix.
    CLAUDE.md: "Don't reference the current task, fix, or callers", "Default to writing no comments. Only add one when the WHY is non-obvious". The shipQueueDepth block,
    the Flush/submit/ship comments, and the SafeBuffer type comment all narrate the OOM fix and name callers (LogStreamer, client-go's exec stream). These read like
    commit messages and will rot when callers change. Move rationale to the PR description; keep code comments to non-obvious invariants.
  7. src/pkg/opslevelAppendLogProcessor.go:93-153 vs src/pkg/faktoryRunnerAppendJobLogProcessor.go:83-156 — Lifecycle plumbing is duplicated character-for-character.
    batches/done/droppedBatches, takeBatch(), the non-blocking submit() select, and the Flush() blocking-send + close + wait-done sequence are now identical across the
    two processors. Drift has already started (Faktory's ship() lacks the client==nil guard; warning wording differs: "shipping backpressure" vs "enqueue backpressure").
    Extract a batchShipper helper that takes a send func([]string) error callback.
  8. src/pkg/logs_test.go:62 — TestLogStreamerCapsOversizedLine relies on time.Sleep(150ms) to wait for the 50ms ticker.
    Real-time sleep tied to ticker cadence; flaky on contended CI. Fix: have the capture processor signal on a channel when a line arrives, or expose a synchronous drain
    hook.

noted. Time to put my claude on a pip

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants