You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
The runner was OOM-killing at high job-concurrency. Logs were shipped to OpsLevel synchronously inside the log drain loop, so a slow API call stalled draining while the pod's stdout/stderr buffers grew without bound. This makes log shipping async and bounds every buffer, turning peak memory into a predictable function of two knobs.
How it works
flowchart LR
pod[Job pod\nstdout/stderr] -->|exec stream| buf[SafeBuffer\ncapped per stream]
buf -->|drain every 50ms\nnever blocks| proc[Log processors]
proc -->|batch| q[bounded channel\ndepth 2]
q --> ship[shipper goroutine\nblocking API call]
ship -->|RunnerAppendJobLog| api[OpsLevel API]
Loading
Drain loop never blocks on the network.submit() hands a batch to a bounded channel; a dedicated shipper goroutine makes the (blocking) API call. A slow API backs up into the queue instead of stalling the drain.
Every buffer is capped.SafeBuffer drops writes past its cap (bytes counted in the new opslevel_runner_log_bytes_dropped metric). The ship queue is bounded (depth 2). So memory can't grow unbounded no matter how chatty the job or how slow the API.
Final batch is never lost. At flush the pod has stopped producing, so the last batch is enqueued with a blocking send before the channel closes.
Before, the only thing limiting buffer growth was how fast the API responded — unbounded by design. Now there is a hard ceiling: capped buffers + bounded ship queue mean worst-case memory is fixed by config, so the pod limit can be sized from the formula instead of guessed. Tune memory with one lever (job-pod-log-max-size); the rest is internal.
Buffer stuff is a bit dense for my humanity, so I'm going to cop out here and copy pasta what look like some valid claude code review comments:
src/pkg/opslevelAppendLogProcessor.go:97 / src/pkg/faktoryRunnerAppendJobLogProcessor.go:87 — Flush deadlocks indefinitely on a slow API, defeating the whole point
of the PR.
The final batch is enqueued with an unconditional blocking send s.batches <- batch. If the shipper goroutine is stuck inside s.client.RunnerAppendJobLog (verified:
MutateCTX uses context.Background(), and clientGQL.go never applies settings.timeout to the underlying retryablehttp client — no per-call timeout exists), and the
depth-2 queue is full, Flush hangs forever. Same shape in the Faktory variant. The PR's stated premise is "the drain loop never blocks on the network" — Flush
violates that. Fix: select { case s.batches <- batch: case <-time.After(timeout): s.droppedBatches++ }, or bound <-s.done similarly.
src/pkg/buffer.go:46-50 — SafeBuffer fabricates log lines on mid-payload truncation.
When a multi-line Write is truncated at the cap, the surviving bytes contain a partial line (no terminating \n) that gets concatenated with the next Write. Example:
cap fills mid-line during "line1\nline2\nline3\n" → buffer holds "line1\nli"; next Write "more\n" produces a fabricated line "limore\n" that is base64-encoded and
shipped to OpsLevel as if it were real pod output. Truncation is acceptable; silent fabrication is not. Fix: truncate at the last \n within p[:room] (or drop the
whole chunk and count it).
src/cmd/run.go:122 and src/cmd/faktory.go:165 — Shipper goroutines leak permanently if streamer.Flush is not reached.
Each processor's constructor starts go s.ship(); the goroutine exits only when Flush() calls close(s.batches). Neither call site uses defer streamer.Flush(outcome),
and there is no recover(). Any panic inside runner.Run (k8s client-go nil-deref, informer edge case) or tracer.Start strands the shipper goroutine + its held
*opslevel.Client / faktory.Helper / batched slices for the runner's lifetime. In API mode the runner is long-running, so leaks accumulate. Fix: defer
streamer.Flush(outcome) at all three call sites.
src/pkg/logs.go:130 — Pre-existing s.quit <- true deadlock now also leaks shipper goroutines on SIGTERM.
quit is unbuffered. On ctx.Done() (SIGTERM / shutdown), Run returns without draining quit; the subsequent s.quit <- true in Flush blocks forever, so processor Flushes
never run, so the new shipper goroutines never close. In scope because Flush is a touched function (PR added recordDroppedBytes() to it). Fix: select { case s.quit
<- true: default: }, or buffer quit.
src/pkg/buffer.go:25 + src/cmd/test.go:42 — maxSize <= 0 magic-value sentinel for "unbounded" is a misconfig footgun.
--job-pod-log-max-size is a plain Int with no floor validation; OPSLEVEL_JOB_POD_LOG_MAX_SIZE=0 silently disables the safety net documented as "protect the runner
from OOM." Fix: validate > 0 at the flag boundary; pass math.MaxInt from tests instead of 0.
src/pkg/opslevelAppendLogProcessor.go:11-16, 94-95, 119-122 (mirrored in faktoryRunner…) — Doc comments narrate the PR fix.
CLAUDE.md: "Don't reference the current task, fix, or callers", "Default to writing no comments. Only add one when the WHY is non-obvious". The shipQueueDepth block,
the Flush/submit/ship comments, and the SafeBuffer type comment all narrate the OOM fix and name callers (LogStreamer, client-go's exec stream). These read like
commit messages and will rot when callers change. Move rationale to the PR description; keep code comments to non-obvious invariants.
src/pkg/opslevelAppendLogProcessor.go:93-153 vs src/pkg/faktoryRunnerAppendJobLogProcessor.go:83-156 — Lifecycle plumbing is duplicated character-for-character.
batches/done/droppedBatches, takeBatch(), the non-blocking submit() select, and the Flush() blocking-send + close + wait-done sequence are now identical across the
two processors. Drift has already started (Faktory's ship() lacks the client==nil guard; warning wording differs: "shipping backpressure" vs "enqueue backpressure").
Extract a batchShipper helper that takes a send func([]string) error callback.
src/pkg/logs_test.go:62 — TestLogStreamerCapsOversizedLine relies on time.Sleep(150ms) to wait for the 50ms ticker.
Real-time sleep tied to ticker cadence; flaky on contended CI. Fix: have the capture processor signal on a channel when a line arrives, or expose a synchronous drain
hook.
Buffer stuff is a bit dense for my humanity, so I'm going to cop out here and copy pasta what look like some valid claude code review comments:
src/pkg/opslevelAppendLogProcessor.go:97 / src/pkg/faktoryRunnerAppendJobLogProcessor.go:87 — Flush deadlocks indefinitely on a slow API, defeating the whole point
of the PR.
The final batch is enqueued with an unconditional blocking send s.batches <- batch. If the shipper goroutine is stuck inside s.client.RunnerAppendJobLog (verified:
MutateCTX uses context.Background(), and clientGQL.go never applies settings.timeout to the underlying retryablehttp client — no per-call timeout exists), and the
depth-2 queue is full, Flush hangs forever. Same shape in the Faktory variant. The PR's stated premise is "the drain loop never blocks on the network" — Flush
violates that. Fix: select { case s.batches <- batch: case <-time.After(timeout): s.droppedBatches++ }, or bound <-s.done similarly.
src/pkg/buffer.go:46-50 — SafeBuffer fabricates log lines on mid-payload truncation.
When a multi-line Write is truncated at the cap, the surviving bytes contain a partial line (no terminating \n) that gets concatenated with the next Write. Example:
cap fills mid-line during "line1\nline2\nline3\n" → buffer holds "line1\nli"; next Write "more\n" produces a fabricated line "limore\n" that is base64-encoded and
shipped to OpsLevel as if it were real pod output. Truncation is acceptable; silent fabrication is not. Fix: truncate at the last \n within p[:room] (or drop the
whole chunk and count it).
src/cmd/run.go:122 and src/cmd/faktory.go:165 — Shipper goroutines leak permanently if streamer.Flush is not reached.
Each processor's constructor starts go s.ship(); the goroutine exits only when Flush() calls close(s.batches). Neither call site uses defer streamer.Flush(outcome),
and there is no recover(). Any panic inside runner.Run (k8s client-go nil-deref, informer edge case) or tracer.Start strands the shipper goroutine + its held
*opslevel.Client / faktory.Helper / batched slices for the runner's lifetime. In API mode the runner is long-running, so leaks accumulate. Fix: defer
streamer.Flush(outcome) at all three call sites.
src/pkg/logs.go:130 — Pre-existing s.quit <- true deadlock now also leaks shipper goroutines on SIGTERM.
quit is unbuffered. On ctx.Done() (SIGTERM / shutdown), Run returns without draining quit; the subsequent s.quit <- true in Flush blocks forever, so processor Flushes
never run, so the new shipper goroutines never close. In scope because Flush is a touched function (PR added recordDroppedBytes() to it). Fix: select { case s.quit
<- true: default: }, or buffer quit.
src/pkg/buffer.go:25 + src/cmd/test.go:42 — maxSize <= 0 magic-value sentinel for "unbounded" is a misconfig footgun.
--job-pod-log-max-size is a plain Int with no floor validation; OPSLEVEL_JOB_POD_LOG_MAX_SIZE=0 silently disables the safety net documented as "protect the runner
from OOM." Fix: validate > 0 at the flag boundary; pass math.MaxInt from tests instead of 0.
src/pkg/opslevelAppendLogProcessor.go:11-16, 94-95, 119-122 (mirrored in faktoryRunner…) — Doc comments narrate the PR fix.
CLAUDE.md: "Don't reference the current task, fix, or callers", "Default to writing no comments. Only add one when the WHY is non-obvious". The shipQueueDepth block,
the Flush/submit/ship comments, and the SafeBuffer type comment all narrate the OOM fix and name callers (LogStreamer, client-go's exec stream). These read like
commit messages and will rot when callers change. Move rationale to the PR description; keep code comments to non-obvious invariants.
src/pkg/opslevelAppendLogProcessor.go:93-153 vs src/pkg/faktoryRunnerAppendJobLogProcessor.go:83-156 — Lifecycle plumbing is duplicated character-for-character.
batches/done/droppedBatches, takeBatch(), the non-blocking submit() select, and the Flush() blocking-send + close + wait-done sequence are now identical across the
two processors. Drift has already started (Faktory's ship() lacks the client==nil guard; warning wording differs: "shipping backpressure" vs "enqueue backpressure").
Extract a batchShipper helper that takes a send func([]string) error callback.
src/pkg/logs_test.go:62 — TestLogStreamerCapsOversizedLine relies on time.Sleep(150ms) to wait for the 50ms ticker.
Real-time sleep tied to ticker cadence; flaky on contended CI. Fix: have the capture processor signal on a channel when a line arrives, or expose a synchronous drain
hook.
noted. Time to put my claude on a pip
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
The runner was OOM-killing at high
job-concurrency. Logs were shipped to OpsLevel synchronously inside the log drain loop, so a slow API call stalled draining while the pod's stdout/stderr buffers grew without bound. This makes log shipping async and bounds every buffer, turning peak memory into a predictable function of two knobs.How it works
flowchart LR pod[Job pod\nstdout/stderr] -->|exec stream| buf[SafeBuffer\ncapped per stream] buf -->|drain every 50ms\nnever blocks| proc[Log processors] proc -->|batch| q[bounded channel\ndepth 2] q --> ship[shipper goroutine\nblocking API call] ship -->|RunnerAppendJobLog| api[OpsLevel API]submit()hands a batch to a bounded channel; a dedicated shipper goroutine makes the (blocking) API call. A slow API backs up into the queue instead of stalling the drain.SafeBufferdrops writes past its cap (bytes counted in the newopslevel_runner_log_bytes_droppedmetric). The ship queue is bounded (depth 2). So memory can't grow unbounded no matter how chatty the job or how slow the API.Sizing math
Peak memory per job ≈
6 × job-pod-log-max-size:log-max-sizelog-max-sizelog-max-sizeDefault
job-pod-log-max-sizelowered 1 MB → 256 KB (≈1.5 MB/job).job-concurrencyWhy this prevents OOMs
Before, the only thing limiting buffer growth was how fast the API responded — unbounded by design. Now there is a hard ceiling: capped buffers + bounded ship queue mean worst-case memory is fixed by config, so the pod limit can be sized from the formula instead of guessed. Tune memory with one lever (
job-pod-log-max-size); the rest is internal.