Aggregation is a blocking stage, so GroupBy must hold per-group state until all input is consumed. Today that footprint is unbounded in two places:
GroupBy::result() materializes the entire result as a single Rows (one Row/Entry/Definition/Type graph per distinct group).
Even with the configurable spill storage from #1390 (Storage for Aggregating Functions), the MEMORY_FALLBACK_EXTERNAL backend does not bound peak memory: ExternalAggregationStorage::all() reloads every spilled bucket into one in-memory $merged map, and result() then builds a second full Rows. At high cardinality the "spill" backend therefore uses more peak memory than the in-memory one (measured with groupBy('email')->aggregate(count(), sum()): filesystem ≈ 432 MB vs memory ≈ 305 MB at ~98k groups; ~1.1 GB at 300k). Spilling bounds memory during ingestion, but all()/result() then undo that bound.
Goal: make GroupBy aggregation bounded end-to-end on the external path — O(batchSize + bucketCount) instead of O(distinctGroups) — and drop the redundant full-result copy for every backend. This is what makes MEMORY_FALLBACK_EXTERNAL actually deliver scaling beyond RAM. It also aligns GroupBy with Flow's streaming contract: it is currently the only operator that emits one giant batch instead of a stream of Rows.
Two coupled changes — neither alone fixes the filesystem path (one removes $merged, the other removes the result Rows; both are required):
Streaming k-way merge (storage): spill buckets sorted by group hash (one record per entry), then merge them with a min-heap, yielding one fully-merged group at a time — no $merged map.
Streaming result (core): GroupBy emits the result in batches instead of one Rows; the downstream pipeline already consumes per-Rows incrementally, so this flows through to file/stream loaders unchanged.
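A minimal sketch of the streaming k-way merge, assuming each spilled bucket is exposed as a Generator keyed by group hash and sorted in byte order. The names HashOrderedHeap and mergeSortedBuckets are hypothetical, and a plain integer count stands in for the real per-group aggregator state. This is not the actual ExternalAggregationStorage code, only an illustration of why the heap holds at most one pending record per bucket:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: min-heap of [hash, bucketIndex] pairs ordered by hash.
 * strcmp() is used so hashes are compared as bytes, never as numeric strings.
 */
final class HashOrderedHeap extends SplMinHeap
{
    protected function compare(mixed $value1, mixed $value2): int
    {
        // SplMinHeap expects a positive result when $value1 is the lower one.
        return strcmp($value2[0], $value1[0]);
    }
}

/**
 * Merge buckets that are each sorted by group hash into one stream of
 * fully merged groups. Only one record per bucket is held at a time.
 *
 * @param list<Generator<string, int>> $buckets hash => partial count
 * @return Generator<string, int> hash => merged count, ascending by hash
 */
function mergeSortedBuckets(array $buckets): Generator
{
    $heap = new HashOrderedHeap();

    // Seed the heap with the first record of every non-empty bucket.
    foreach ($buckets as $index => $bucket) {
        if ($bucket->valid()) {
            $heap->insert([$bucket->key(), $index]);
        }
    }

    $currentHash = null;
    $currentValue = 0;

    while (!$heap->isEmpty()) {
        [$hash, $index] = $heap->extract();
        $bucket = $buckets[$index];

        // A new hash means the previous group is complete and can be emitted.
        if ($currentHash !== null && $hash !== $currentHash) {
            yield $currentHash => $currentValue;
            $currentValue = 0;
        }

        $currentHash = $hash;
        $currentValue += $bucket->current();

        // Advance the bucket we just read from and re-queue its next record.
        $bucket->next();
        if ($bucket->valid()) {
            $heap->insert([$bucket->key(), $index]);
        }
    }

    if ($currentHash !== null) {
        yield $currentHash => $currentValue;
    }
}
```

In the real storage the `+=` would be replaced by merging aggregator state, and the buckets would be read lazily from the filesystem rather than from in-memory generators.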
API Adjustments
No user-facing DataFrame/DSL API change — internal behavior only; existing groupBy() / aggregate() usage is unchanged.
Internal:
GroupBucketsCache: get(): array → stream(string): Generator<string, GroupEntry>; spill format becomes one sorted record per entry.
ExternalAggregationStorage::all(): streaming k-way merge (reuses the SplMinHeap pattern from ExternalSort).
GroupBy: add resultStream(FlowContext, int $batchSize): Generator<Rows>; GroupByProcessor delegates to it with yield from. result() is kept for the pivot path, fetch(), and backward compatibility. The batch size comes from existing config.
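The batching side of resultStream() could look roughly like the sketch below. batchGroups is a hypothetical name, and plain arrays stand in for Flow's Row and Rows objects, so this only shows the shape of the generator, not the proposed implementation:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch: turn a stream of merged groups into fixed-size batches.
 * At most $batchSize result rows are held in memory at once.
 *
 * @param iterable<string, array<string, mixed>> $groups hash => result row
 * @return Generator<int, list<array<string, mixed>>>
 */
function batchGroups(iterable $groups, int $batchSize): Generator
{
    if ($batchSize < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1.');
    }

    $batch = [];

    foreach ($groups as $row) {
        $batch[] = $row;

        // Emit a full batch and start a new one.
        if (count($batch) === $batchSize) {
            yield $batch;
            $batch = [];
        }
    }

    // Emit the final, possibly smaller, batch.
    if ($batch !== []) {
        yield $batch;
    }
}
```

A processor would then forward batches with `yield from batchGroups($groups, $batchSize);`, which is how the per-batch output reaches downstream loaders without a full-result copy.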
Semantics decisions needed (the reason this touches core):
Output ordering: the memory backend yields groups in insertion order; the external merge yields them sorted by group hash. GroupBy does not currently guarantee an order — needs a contract decision.
to_output table mode: StreamLoader closes the output stream after the first batch and AsciiTableFormatter prints a header per batch, so multi-batch groupBy output breaks interactive table rendering. Requires fixing StreamLoader (write the header once, don't close mid-stream) or special-casing. Main cross-cutting change.
fetch() unaffected (materializes by design); pivot stays materialized (small, separate path).
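For the to_output point, the "write the header once, don't close mid-stream" behavior can be illustrated with a small stand-alone writer. SingleHeaderTableWriter is a hypothetical class, not Flow's StreamLoader or AsciiTableFormatter, and it ignores column alignment entirely; it only shows the state that has to survive across batches:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch: writes a header on the first non-empty batch only
 * and keeps the stream open until close() is called explicitly.
 */
final class SingleHeaderTableWriter
{
    private bool $headerWritten = false;

    /** @param resource $stream an open, writable stream */
    public function __construct(private $stream)
    {
    }

    /** @param list<array<string, scalar>> $rows one batch of result rows */
    public function write(array $rows): void
    {
        if ($rows === []) {
            return;
        }

        // The header comes from the first row's keys and is emitted once.
        if (!$this->headerWritten) {
            fwrite($this->stream, implode(' | ', array_keys($rows[0])) . "\n");
            $this->headerWritten = true;
        }

        foreach ($rows as $row) {
            fwrite($this->stream, implode(' | ', array_map('strval', $row)) . "\n");
        }
    }

    /** Called once, after the last batch. */
    public function close(): void
    {
        fclose($this->stream);
    }
}
```

Whether this state belongs in the loader or the formatter is part of the decision this issue asks for; the sketch does not take a position on that.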
Are you intending to also work on proposed change?
Yes
Are you interested in sponsoring this change?
No
Integration & Dependencies
No new dependencies. Reuses existing primitives (SplMinHeap as in ExternalSort, the configured Serializer, Filesystem).