Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions engine/src/engine/journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,34 @@ pub enum Decision {
/// decision / reason / count / last-seen). Low-cardinality, no secret values.
record: crate::engine::policy_log::PolicyDecisionRecord,
},
/// A per-repository TOFU signing baseline (JEF-263, ADR-0020): the learned set of
/// identities/issuers that have signed images under one `registry/repo`, plus when the
/// repo was first seen signed and whether that history is `established` yet. Written as a
/// **compacted, full-state** line — the latest line for a repo supersedes every earlier
/// one on replay (last-write-wins), so re-appending it (on change / per pass) keeps a live
/// repo's baseline inside the rotation window instead of silently aging out and re-arming
/// cold-start trust. Every field is `#[serde(default)]` so a future field can be added
/// without breaking replay of older lines. The identities/issuers are UNTRUSTED Fulcio
/// cert text — a consumer MUST escape them at render (the zero-egress state never leaves
/// the cluster).
SigningBaseline {
/// The canonical `registry/repo` key (host-normalized, tag/digest stripped).
#[serde(default)]
repo: String,
/// Every signer identity observed signing an image under this repo (sorted, deduped).
#[serde(default)]
identities: Vec<String>,
/// Every OIDC issuer observed signing under this repo (sorted, deduped).
#[serde(default)]
issuers: Vec<String>,
/// When the repo was first observed with a verifying signature, Unix epoch millis.
#[serde(default)]
first_seen_ms: u64,
/// Whether the signed history is `established` (matured past the TOFU grace window) —
/// `false` is a freshly-learned baseline (weaker evidence).
#[serde(default)]
established: bool,
},
}

/// One journal line: a [`Decision`] stamped with when it was recorded. The timestamp is
Expand Down
5 changes: 5 additions & 0 deletions engine/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,11 @@ impl Engine {
// restore from the same journal, since it (not the engine) holds the shared
// decision ring.
journal::Decision::Admission { .. } => {}
// Per-repo signing baselines (JEF-263) restore into the dedicated
// `SigningBaselineStore`, not the engine's findings/reversion state —
// `run_watch` does that restore from the same journal, since it (not the engine
// core) owns the baseline store the sweep feeds each pass.
journal::Decision::SigningBaseline { .. } => {}
}
}
if latest_at > std::time::SystemTime::UNIX_EPOCH {
Expand Down
25 changes: 24 additions & 1 deletion engine/src/engine/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,22 @@ pub async fn run_watch(
// were already running when protector started (no admission event ever replays them).
let signing_observer = build_signing_observer();

// The durable per-repo TOFU signing baseline (JEF-263, ADR-0020): learned from the sweep's
// observed postures, persisted to (and, here on boot, replayed from) the SAME decision
// journal the engine already owns — so a repo's established signed history survives a
// restart instead of resetting to cold-start trust. Built once and mutated each pass;
// per-pass compaction inside the sweep keeps live baselines inside the journal's rotation
// window. A disabled journal ⇒ in-memory only (honest re-learn on restart).
let signing_journal = engine.journal();
let mut signing_baselines = state::SigningBaselineStore::new();
let restored_baselines = signing_baselines.restore(signing_journal.as_ref());
if restored_baselines > 0 {
tracing::info!(
restored_baselines,
"restored per-repo signing baselines from the durable journal"
);
}

// Runtime evidence (Falco alerts + the eBPF agent's behaviors) is a stream, not a
// an HTTP endpoint falcosidekick POSTs to, are held in a TTL'd store, and wake
// the loop so a "happening now" signal is acted on immediately (it flips a
Expand Down Expand Up @@ -433,7 +449,14 @@ pub async fn run_watch(
// shared admission-decision log (JEF-261). Bounded by the observer's cache + MAX_IMAGES;
// a no-op when no observer is configured. Run before `process` so the inventory reflects
// the same snapshot the engine just reasoned over.
super::signing_sweep::sweep(signing_observer.as_ref(), &snapshot, &policy_log).await;
super::signing_sweep::sweep(
signing_observer.as_ref(),
&snapshot,
&policy_log,
Some(&mut signing_baselines),
signing_journal.as_ref(),
)
.await;

engine.process(&snapshot).await;
}
Expand Down
129 changes: 124 additions & 5 deletions engine/src/engine/signing_sweep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
//! the bounded [`PolicyDecisionLog`] ring — no durable schema.

use std::sync::Arc;
use std::time::SystemTime;

use k8s_openapi::api::core::v1::Pod;

use super::journal::DecisionJournal;
use super::observe::Snapshot;
use super::policy_log::{PolicyDecisionLog, PolicyDecisionRecord};
use super::state::SigningBaselineStore;
use crate::policies::signature::{PostureMap, SigningObserver, SigningPosture};

/// Collect every distinct container image a running Pod references — regular, init, and
Expand Down Expand Up @@ -96,15 +99,42 @@ fn record_postures(log: &PolicyDecisionLog, map: &PostureMap) {
}
}

/// Fold this pass's observed postures into the durable per-repo signing baseline (JEF-263),
/// then compact the whole store back to the journal so a live repo's baseline stays inside
/// the rotation window (never aged out). Only `Signed` postures learn a baseline; the store
/// itself ignores the rest. Every step is a no-op on a disabled journal / cold store, so this
/// is safe to call unconditionally each pass.
fn learn_baselines(store: &mut SigningBaselineStore, journal: &DecisionJournal, map: &PostureMap) {
let now_ms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
for (image, posture) in map.entries() {
store.observe(image, posture, now_ms);
}
// Full-state compaction per pass: re-append every live repo so rotation can never drop an
// established baseline (the durability discipline that keeps cold-start trust from
// silently re-arming). Bounded by the store's repo cap; a handful of small lines for a
// real cluster.
store.compact(journal);
}

/// Run one signing-posture sweep over the snapshot's running pods and record the result.
/// A no-op (zero outbound calls, nothing recorded) when no observer is configured — so a
/// deploy without signature config behaves exactly as before. Bounded by the observer's
/// `max_images` cap + TTL cache, so a steady cluster re-sweeps for free and a churny one
/// can't amplify outbound verification.
///
/// The observed postures also feed the durable per-repo signing baseline (JEF-263) when a
/// `baseline` store + `journal` are wired: a signed image teaches the repo's TOFU baseline,
/// which is persisted to (and, on boot, replayed from) the SAME decision journal. This is
/// pure learning — never a gate (ADR-0016); drift/enforcement are later stages.
pub async fn sweep(
observer: Option<&SigningObserver>,
snapshot: &Snapshot,
log: &Arc<PolicyDecisionLog>,
baseline: Option<&mut SigningBaselineStore>,
journal: &DecisionJournal,
) {
let Some(observer) = observer else {
return;
Expand All @@ -115,6 +145,9 @@ pub async fn sweep(
}
let map = observer.sweep(images).await;
record_postures(log, &map);
if let Some(store) = baseline {
learn_baselines(store, journal, &map);
}
}

#[cfg(test)]
Expand All @@ -126,6 +159,7 @@ mod tests {
use async_trait::async_trait;

use super::*;
use crate::engine::state::SigningBaselineStore;
use crate::policies::signature::{SignatureObserver, Signer};

fn pod(images: &[&str], init: &[&str]) -> Pod {
Expand Down Expand Up @@ -211,7 +245,14 @@ mod tests {
..Default::default()
};
let log = Arc::new(PolicyDecisionLog::new());
sweep(Some(&obs), &snapshot, &log).await;
sweep(
Some(&obs),
&snapshot,
&log,
None,
&DecisionJournal::disabled(),
)
.await;
let rows = log.snapshot();
assert_eq!(rows.len(), 3, "one row per distinct running image");
let by_image: HashMap<_, _> = rows.iter().map(|r| (r.image.as_str(), r)).collect();
Expand Down Expand Up @@ -244,7 +285,14 @@ mod tests {
..Default::default()
};
let log = Arc::new(PolicyDecisionLog::new());
sweep(Some(&obs), &snapshot, &log).await;
sweep(
Some(&obs),
&snapshot,
&log,
None,
&DecisionJournal::disabled(),
)
.await;
let rows = log.snapshot();
assert_eq!(rows[0].signature, "checking");
assert_ne!(
Expand All @@ -261,7 +309,7 @@ mod tests {
..Default::default()
};
let log = Arc::new(PolicyDecisionLog::new());
sweep(None, &snapshot, &log).await;
sweep(None, &snapshot, &log, None, &DecisionJournal::disabled()).await;
assert!(log.snapshot().is_empty());
}

Expand All @@ -276,12 +324,83 @@ mod tests {
..Default::default()
};
let log = Arc::new(PolicyDecisionLog::new());
sweep(Some(&obs), &snapshot, &log).await;
sweep(Some(&obs), &snapshot, &log).await;
sweep(
Some(&obs),
&snapshot,
&log,
None,
&DecisionJournal::disabled(),
)
.await;
sweep(
Some(&obs),
&snapshot,
&log,
None,
&DecisionJournal::disabled(),
)
.await;
assert_eq!(
calls.load(Ordering::SeqCst),
1,
"the second sweep is served from the observer cache — zero new outbound calls"
);
}

#[tokio::test]
async fn sweep_teaches_the_repo_baseline_from_a_signed_image() {
// The JEF-263 wiring: a signed image observed by the sweep learns a per-repo baseline,
// keyed by registry/repo. Pure learning — the log still records `allow`.
let (obs, _calls) = observer(vec![(
"ghcr.io/org/app:1",
signed("https://github.com/org/app/.github/workflows/r.yaml@refs/tags/v1"),
)]);
let snapshot = Snapshot {
pods: vec![pod(&["ghcr.io/org/app:1"], &[])],
..Default::default()
};
let log = Arc::new(PolicyDecisionLog::new());
let mut baseline = SigningBaselineStore::new();
sweep(
Some(&obs),
&snapshot,
&log,
Some(&mut baseline),
&DecisionJournal::disabled(),
)
.await;
let learned = baseline
.get("ghcr.io/org/app")
.expect("the signed image taught a repo baseline");
assert!(
learned
.identities
.contains("https://github.com/org/app/.github/workflows/r.yaml@refs/tags/v1")
);
assert!(
!learned.established,
"first sight is a fresh, weak baseline"
);
}

#[tokio::test]
async fn sweep_does_not_learn_a_baseline_for_an_unsigned_image() {
// A not-signed posture must never create a baseline (that's JEF-264 drift territory).
let (obs, _calls) = observer(vec![]); // unknown image ⇒ FakeObserver returns NotSigned
let snapshot = Snapshot {
pods: vec![pod(&["docker.io/library/postgres:16"], &[])],
..Default::default()
};
let log = Arc::new(PolicyDecisionLog::new());
let mut baseline = SigningBaselineStore::new();
sweep(
Some(&obs),
&snapshot,
&log,
Some(&mut baseline),
&DecisionJournal::disabled(),
)
.await;
assert!(baseline.is_empty(), "an unsigned image learns no baseline");
}
}
2 changes: 2 additions & 0 deletions engine/src/engine/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod readiness;
mod recency;
mod report;
mod reversion;
mod signing_baseline;
mod verdict_store;

pub use evidence::{CveEvidence, EntryEvidence, FindingEvidence};
Expand All @@ -30,4 +31,5 @@ pub(crate) use readiness::derive_readiness;
pub use recency::{Delta, RecencyInfo, StoredPosture};
pub use report::{LeftAloneEntry, Report, WouldActEntry, default_window_report};
pub use reversion::{ReversionLog, ReversionRecord};
pub use signing_baseline::{DEFAULT_MAX_REPOS, SigningBaseline, SigningBaselineStore};
pub use verdict_store::{BakeStats, ModelHealth, ReadinessConfig, VerdictEntry, VerdictStore};
Loading
Loading