Skip to content

[IcebergIO] Cache destination Row keys in AssignDestinationsAndPartitions#39155

Open
atognolas wants to merge 1 commit into
apache:masterfrom
atognolag:u1-row-key-cache
Open

[IcebergIO] Cache destination Row keys in AssignDestinationsAndPartitions#39155
atognolas wants to merge 1 commit into
apache:masterfrom
atognolag:u1-row-key-cache

Conversation

@atognolas

Copy link
Copy Markdown
Contributor

Summary

  • Add a Map<String, Row> keyCache in AssignDestinationsAndPartitions.AssignDoFn to reuse Row key objects across millions of input rows
  • Only N distinct (destination, partition) combinations exist (e.g. 400 for a 400-bucket table), but every row currently allocates a new Row with schema validation overhead
  • After the first N rows, all subsequent lookups are cache hits

Motivation

Observed during production benchmarks at scale (39M users, 400 partitions, 99-column schema). Row allocation + schema validation dominated bundle processing time unnecessarily.

Test plan

  • Existing AssignDestinationsAndPartitionsTest passes
  • Run IcebergIO integration tests
  • Verify via profiling that Row allocation drops after initial cache warm-up

🤖 Generated with Claude Code

…ions

AssignDestinationsAndPartitions creates a new Row key object for every
input record. In practice, only N distinct (destination, partition)
combinations exist (e.g. 400 for a 400-bucket table), but millions of
rows each allocate a new Row with schema validation overhead.

Add a Map<String, Row> keyCache that caches Row keys by their
composite string key. After the first N rows, all subsequent lookups
are cache hits, eliminating millions of redundant Row allocations.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@gemini-code-assist

Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request optimizes the IcebergIO connector by caching destination and partition Row keys. By reusing these objects instead of creating new ones for every input row, the change reduces memory pressure and CPU usage during bundle processing, particularly in large-scale scenarios with many partitions.

Highlights

  • Performance Optimization: Introduced a key cache in AssignDoFn to reuse Row objects for destination and partition combinations, significantly reducing object allocation overhead.
  • Resource Management: Implemented a Map-based cache that stores Row keys, preventing redundant schema validation and object creation for recurring partition paths.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a cache (keyCache) in AssignDestinationsAndPartitions to avoid recreating Row objects for every element. The reviewer suggests using a nested map structure (Map<String, Map<String, Row>>) instead of a flat map with a concatenated string key to prevent unnecessary string allocations on cache hits, which can significantly improve performance when processing millions of rows.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

private transient @MonotonicNonNull Map<String, PartitionKey> partitionKeys;
private transient @MonotonicNonNull Map<String, BeamRowWrapper> wrappers;
private transient @MonotonicNonNull Map<String, Instant> lastRefreshTimes;
private transient @MonotonicNonNull Map<String, Row> keyCache;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using a single flat map with a concatenated string key (tableIdentifier + "|" + partitionPath) requires allocating a new String object for every single input element, even on cache hits. Since this DoFn processes millions of rows, this overhead can be avoided by using a nested map structure: Map<String, Map<String, Row>>.

Suggested change
private transient @MonotonicNonNull Map<String, Row> keyCache;
private transient @MonotonicNonNull Map<String, Map<String, Row>> keyCache;

Comment on lines +180 to +186
String cacheKey = tableIdentifier + "|" + partitionPath;
Row destAndPartition = checkStateNotNull(keyCache).get(cacheKey);
if (destAndPartition == null) {
destAndPartition =
Row.withSchema(OUTPUT_SCHEMA).addValues(tableIdentifier, partitionPath).build();
keyCache.put(cacheKey, destAndPartition);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Instead of concatenating tableIdentifier and partitionPath into a new String on every element, use a nested map lookup. This avoids any object allocation on cache hits, which is the common case.

      Map<String, Map<String, Row>> cache = checkStateNotNull(keyCache);
      Map<String, Row> tableCache = cache.get(tableIdentifier);
      if (tableCache == null) {
        tableCache = new HashMap<>();
        cache.put(tableIdentifier, tableCache);
      }
      Row destAndPartition = tableCache.get(partitionPath);
      if (destAndPartition == null) {
        destAndPartition =
            Row.withSchema(OUTPUT_SCHEMA).addValues(tableIdentifier, partitionPath).build();
        tableCache.put(partitionPath, destAndPartition);
      }

@atognolas

Copy link
Copy Markdown
Contributor Author

@ahmedabu98 — Would you mind reviewing this? These are a set of IcebergIO fixes validated on production-scale benchmarks (39M users, 400 partitions, 99-column schema). The four PRs are independent and can be reviewed/merged in any order.

@ahmedabu98 ahmedabu98 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think we're okay without this PR. Creating Rows is cheap and we already have done all the work to get partitionPath at this point, so there's not much to optimize

@github-actions

Copy link
Copy Markdown
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants