[IcebergIO] Use GroupByKey for bounded inputs to avoid OOM#39156
[IcebergIO] Use GroupByKey for bounded inputs to avoid OOM#39156atognolas wants to merge 1 commit into
Conversation
WriteToPartitions uses GroupIntoBatches for both bounded and unbounded inputs. GroupIntoBatches materializes all values for a key into a Java List in memory. For nested schemas, RowCoder.getEncodedElementByteSize() reports wire size (~500 bytes/row) but the actual heap cost is ~10x higher (~4-5KB/row). This causes OOM on bounded pipelines with large partitions — observed 277GB heap usage and 99.79% GC thrashing on n4-highmem-48 (384GB) machines. Switch to GroupByKey for bounded inputs. GBK returns a lazy Iterable backed by the shuffle service, consuming constant ~100MB memory regardless of partition size. Keep GroupIntoBatches for unbounded inputs where triggering semantics and optional autoSharding (via withShardedKey) are needed. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request optimizes memory usage in the IcebergIO connector by switching from GroupIntoBatches to GroupByKey for batch processing. By leveraging the shuffle service's lazy iteration, the connector now maintains a constant memory footprint regardless of partition size, effectively resolving OOM issues observed in production pipelines with complex nested schemas. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request modifies the partition grouping logic in WriteToPartitions.java to use GroupByKey for bounded inputs while keeping GroupIntoBatches for unbounded inputs. The reviewer noted that ignoring autoSharding for bounded inputs could cause performance regressions, such as hot key bottlenecks, and suggested preserving the GroupIntoBatches path with sharding when autoSharding is explicitly enabled.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| if (IcebergUtils.isUnbounded(input)) { | ||
| GroupIntoBatches<Row, Row> groupIntoPartitions = | ||
| GroupIntoBatches.ofByteSize(DEFAULT_BYTES_PER_FILE); | ||
| if (triggeringFrequency != null) { | ||
| groupIntoPartitions = groupIntoPartitions.withMaxBufferingDuration(triggeringFrequency); | ||
| } | ||
|
|
||
| if (autoSharding) { | ||
| if (autoSharding) { | ||
| return input | ||
| .apply(groupIntoPartitions.withShardedKey()) | ||
| .setCoder( | ||
| KvCoder.of( | ||
| org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder), | ||
| IterableCoder.of(dataCoder))) | ||
| .apply( | ||
| "DropShardId", | ||
| MapElements.into(kvs(rows(), iterables(rows()))) | ||
| .via(kv -> KV.of(kv.getKey().getKey(), kv.getValue()))) | ||
| .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder))); | ||
| } | ||
| return input | ||
| .apply(groupIntoPartitions.withShardedKey()) | ||
| .setCoder( | ||
| KvCoder.of( | ||
| org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder), | ||
| IterableCoder.of(dataCoder))) | ||
| .apply( | ||
| "DropShardId", | ||
| MapElements.into(kvs(rows(), iterables(rows()))) | ||
| .via(kv -> KV.of(kv.getKey().getKey(), kv.getValue()))) | ||
| .apply(groupIntoPartitions) | ||
| .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder))); | ||
| } else { | ||
| return input | ||
| .apply(groupIntoPartitions) | ||
| .apply(GroupByKey.create()) | ||
| .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder))); | ||
| } |
There was a problem hiding this comment.
Silently ignoring autoSharding for bounded inputs can lead to severe performance regressions (such as hot key bottlenecks) for batch pipelines that explicitly enabled this option to parallelize writes to large partitions.
We should preserve the GroupIntoBatches path with sharding when autoSharding is explicitly enabled, even for bounded inputs, while defaulting to the memory-efficient GroupByKey when autoSharding is disabled.
if (IcebergUtils.isUnbounded(input) || autoSharding) {
GroupIntoBatches<Row, Row> groupIntoPartitions =
GroupIntoBatches.ofByteSize(DEFAULT_BYTES_PER_FILE);
if (IcebergUtils.isUnbounded(input) && triggeringFrequency != null) {
groupIntoPartitions = groupIntoPartitions.withMaxBufferingDuration(triggeringFrequency);
}
if (autoSharding) {
return input
.apply(groupIntoPartitions.withShardedKey())
.setCoder(
KvCoder.of(
org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
IterableCoder.of(dataCoder)))
.apply(
"DropShardId",
MapElements.into(kvs(rows(), iterables(rows())))
.via(kv -> KV.of(kv.getKey().getKey(), kv.getValue())))
.setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder)));
}
return input
.apply(groupIntoPartitions)
.setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder)));
} else {
return input
.apply(GroupByKey.create())
.setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder)));
}There was a problem hiding this comment.
@atognolas I think this is a valid point.
Were you seeing OOMs even with autosharding enabled? AFAIK autosharding should distribute to different keys, so the state for any one key shouldn't get too large.
If it makes sense, we can keep the GroupIntoBatches.withShardedKey() path for autosharding batch writes.
Otherwise maybe we can add a num_shards config to do fixed sharding per partition, and stick with GBK
|
@ahmedabu98 — Would you mind reviewing this? These are a set of IcebergIO fixes validated on production-scale benchmarks (39M users, 400 partitions, 99-column schema). The four PRs are independent and can be reviewed/merged in any order. |
|
Assigning reviewers: R: @ahmedabu98 for label java. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
Summary
GroupIntoBatcheswithGroupByKeyfor bounded (batch) inputs inWriteToPartitionsGroupIntoBatchesonly for unbounded (streaming) inputs where triggering semantics and optionalautoSharding(viawithShardedKey) are neededGroupIntoBatches.ofByteSize()materializes all values for a key into a JavaListin memory —RowCoderreports wire size (~500 bytes/row) but actual heap is ~10× higher for nested schemasMotivation
Observed OOM on production pipelines with nested schemas (99 columns, 3 protobuf structs):
n4-standard-16(64GB): GC thrashing at 99.79%, SDK harness killedn4-highmem-48(384GB): 277GB/377GB heap used for a single batchGroupByKeyreturns a lazyIterablebacked by the shuffle service, consuming constant ~100MB memory regardless of partition size.Impact
GroupIntoBatcheswith triggering + optionalautoShardingautoShardingonly meaningful for streaming (Dataflow dynamic key splitting)Test plan
WriteToPartitionsTestpasses🤖 Generated with Claude Code