Feature: Incremental Append Scan#3512
Conversation
Pure refactor of the table-scan hierarchy with no behavioural change, isolating the scan-architecture work from the upcoming incremental append scan feature (split out of apache#3364 per reviewer request). - Introduce BaseScan(ABC) as a superclass of TableScan holding the snapshot-independent surface (row filter, options, limit, chaining helpers, and the format-converter sinks built on to_arrow). TableScan keeps snapshot_id, catalog, table_identifier, snapshot(), use_ref() and abstract count(), so its existing surface is unchanged. - to_pandas/to_polars become concrete defaults on BaseScan and to_duckdb/to_ray move up too. This loosens, but does not break, TableScan's abstract contract. to_arrow_batch_reader stays concrete on DataScan (not abstract on BaseScan) so external TableScan subclasses that were valid before still instantiate. - Extract ManifestGroupPlanner from DataScan and route all of DataScan's local planning (scan_plan_helper and _plan_files_local) through it, so the partition/metrics/residual evaluators live in exactly one place. - Preserve all docstrings and explanatory comments. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Adds IncrementalAppendScan, which accumulates the data appended between two snapshots. Builds on the BaseScan / ManifestGroupPlanner refactor in sm/table-scan-refactor; split out of apache#3364 per reviewer request, reviving the work from apache#2235. - Table.incremental_append_scan(...) builds an IncrementalAppendScan over the (from_snapshot_id_exclusive, to_snapshot_id_inclusive] range; StagedTable overrides it to raise, mirroring scan(). - Walks the append-only ancestors in the range, dedups the data manifests whose added_snapshot_id is in range (Set semantics via ManifestFile __eq__/__hash__), and filters manifest entries to ADDED-in-range via a new manifest_entry_filter on ManifestGroupPlanner.plan_files. - Projects onto the table's current schema (not the snapshot schema). - from_snapshot_id_exclusive is validated with is_parent_ancestor_of, so an expired start cursor is accepted as long as the lineage still passes through it; equal from/to is rejected. Adds the snapshot helpers ancestors_between_ids and is_parent_ancestor_of. - Unit tests (validation, current-schema projection, type preservation, expired-from) and integration tests (append-only, non-append ignored, schema evolution within range, partition/metrics pruning, disconnected snapshots), plus the test_incremental_read provision fixture. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| table_identifier=self._identifier, | ||
| ) | ||
|
|
||
| def incremental_append_scan( |
There was a problem hiding this comment.
[AI reviewer aid] New convenience method mirroring Table.scan (naming thought). Args mirror scan minus snapshot_id plus the two snapshot-range args.
| ) -> DataScan: | ||
| raise ValueError("Cannot scan a staged table") | ||
|
|
||
| def incremental_append_scan( |
There was a problem hiding this comment.
[AI reviewer aid] Mirrors StagedTable.scan two lines up — staged tables have no committed metadata to scan against.
| def to_pandas(self, **kwargs: Any) -> pd.DataFrame: | ||
| """Read a Pandas DataFrame eagerly from this Iceberg table. | ||
|
|
||
| class IncrementalAppendScan(BaseScan): |
There was a problem hiding this comment.
[AI reviewer aid] Mirrors Java's IncrementalAppendScan interface and BaseIncrementalAppendScan implementation. Only the append variant of IncrementalScan — changelog scan is out of scope here.
|
|
||
| con = connection or duckdb.connect(database=":memory:") | ||
| con.register(table_name, self.to_arrow()) | ||
| def projection(self) -> Schema: |
There was a problem hiding this comment.
[AI reviewer aid] Always uses the table's current schema, unlike TableScan.projection() which uses the snapshot's schema when snapshot_id is set. Matches Java: BaseTable.newIncrementalAppendScan constructs the scan with schema(), which on BaseTable.schema() returns ops.current().schema() — the table's current schema, not snapshot-bound. C++ does the same: TableScanBuilder::ResolveSnapshotSchema falls through to metadata_->Schema() for incremental scans (no snapshot_id on the context). Older-schema rows in range get NULL for new columns — covered by test_incremental_append_scan_schema_evolution_within_range.
| return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) | ||
|
|
||
| return con | ||
| def plan_files(self) -> Iterable[FileScanTask]: |
There was a problem hiding this comment.
[AI reviewer aid] Mirrors Java's BaseIncrementalAppendScan.doPlanFiles and appendFilesFromSnapshots — walk ancestors, filter to append snapshots, dedup manifests whose added_snapshot_id is in range, then filter manifest entries by (snapshot_id in range, status == ADDED). Set semantics on the manifest dedup match the Java snippet and rely on ManifestFile.__eq__/__hash__ being defined (which they are on main since #2233).
| def plan_files( | ||
| self, | ||
| manifests: Iterable[ManifestFile], | ||
| manifest_entry_filter: Callable[[ManifestEntry], bool] = lambda _: True, |
There was a problem hiding this comment.
[AI reviewer aid] This manifest filter is new. Introducing that for append scan logic where some manifests are skipped
| yield from ancestors_of(to_snapshot, table_metadata) | ||
|
|
||
|
|
||
| def ancestors_between_ids( |
There was a problem hiding this comment.
[AI reviewer aid] Mirrors Java's SnapshotUtil.ancestorsBetween. Differs from the existing ancestors_between (snapshot-based, inclusive-inclusive) above by taking IDs and being exclusive-inclusive, to match the incremental-scan validation pattern. Raises if to_snapshot_id_inclusive is missing from metadata, mirroring Java.
| yield from ancestors_of(to_snapshot, table_metadata) | ||
|
|
||
|
|
||
| def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int, table_metadata: TableMetadata) -> bool: |
There was a problem hiding this comment.
[AI reviewer aid] Mirrors Java's SnapshotUtil.isParentAncestorOf, including the Cannot find snapshot raise on missing snapshot (Java throws one hop down, via ancestorsOf(long, lookup)).
| def test_incremental_append_scan_metrics_pruning(catalog: Catalog) -> None: | ||
| test_table = catalog.load_table("default.test_incremental_read") | ||
|
|
||
| # Non-partition predicate: the manifest/partition evaluators degenerate, leaving the per-file |
There was a problem hiding this comment.
[AI reviewer aid] Filters on a non-partition column (number), so the manifest and partition evaluators degenerate to ALWAYS_TRUE and it's the per-file metrics evaluator (column min/max/null stats) that must do all the pruning. Covers a layer of ManifestGroupPlanner that the existing DataScan integration coverage doesn't exercise end-to-end through a real scan.
…s faithful moves - TableScan(BaseScan, ABC) -> TableScan(BaseScan): ABC is redundant since BaseScan already extends it. No behavioural change (identical MRO, ABCMeta metaclass, __abstractmethods__; TableScan stays non-instantiable). - ManifestGroupPlanner.plan_manifest_entries / plan_files: restore the baseline statement order and blank lines so the extracted bodies read as verbatim moves of DataScan.scan_plan_helper / _plan_files_local. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- IncrementalAppendScan.__init__: order from/to before options/limit to match the Table.incremental_append_scan factory and the TableScan convention (snapshot-range args precede options/limit). Safe: update() reconstructs by keyword. - IncrementalAppendScan.plan_files: return [] (not iter([])) in the empty-range branch for a consistent, reusable result. - is_parent_ancestor_of: guard against a None parent before comparing. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Unit: StagedTable.incremental_append_scan raises (mirrors scan()); None round-trips through from_snapshot_exclusive/to_snapshot_inclusive. - Integration: empty-range result still carries the projected schema. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Per review: the test asserts behaviour (a table with no current snapshot plans zero files across every read path); the rationale for adding it belongs in the PR discussion, not an in-code comment tied to internals. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
# Conflicts: # pyiceberg/table/__init__.py
# Conflicts: # pyiceberg/table/__init__.py
# Rationale for this change Split out of #3364 at the reviewers' request (to @rambleraptor's and @kevinjqliu's point that the architecture refactor and the new feature are easier to review separately). This PR will be followed by #3512 that makes use of this refactor to implement append scans more cleanly. **This PR is a pure refactor with no behavioural change.** The diff looks large, but it is almost entirely code being moved, not changed. # Changes - Introduce `BaseScan(ABC)` as a superclass of `TableScan`, holding the snapshot-independent surface: `table_metadata`, `io`, `row_filter`, `selected_fields`, `case_sensitive`, `options`, `limit`, the chaining helpers (`select` / `filter` / `with_case_sensitive` / `update`), and the format-converter sinks (`to_pandas` / `to_polars` / `to_duckdb` / `to_ray`) built on an abstract `to_arrow()`. - `TableScan` keeps everything snapshot-specific — `snapshot_id`, `catalog`, `table_identifier`, `snapshot()`, the snapshot-aware `projection()`, `use_ref()`, and abstract `count()` — so its existing public surface is unchanged. - Extract `ManifestGroupPlanner` from `DataScan` (the `_build_*` evaluators, `_check_sequence_number`, and the manifest-entry / file-scan-task planning) and route all of `DataScan`'s local planning — `scan_plan_helper()` and `_plan_files_local()` — through it, so the partition / metrics / residual pipeline lives in exactly one place. # Back-compatibility - `to_arrow_batch_reader` stays concrete on `DataScan` and is **not** made abstract on `BaseScan`, so external `TableScan` subclasses that were valid before still instantiate. - `to_pandas` / `to_polars` become concrete defaults on `BaseScan`, and `to_duckdb` / `to_ray` move up to `BaseScan` too. This **loosens** `TableScan`'s abstract contract (they were abstract on `TableScan` before) without breaking existing subclasses. - All docstrings and explanatory comments are preserved. # Are these changes tested? The existing `DataScan` unit and integration tests exercise the refactored planning path unchanged. A regression test is added for the no-current-snapshot planning path, which now flows through `ManifestGroupPlanner`. # Are there any user-facing changes? No. Some previously-`DataScan`-only converters (`to_duckdb` / `to_ray`) are now inherited by `TableScan` as well, which is purely additive. --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Co-authored-by: Kevin Liu <kevinjqliu@users.noreply.github.com>
|
#3511 is now merged! |
The TableScan/ManifestGroupPlanner refactor (apache#3511) is now merged to main, so this resolves to the feature-only delta. Adopts the refactor exactly as merged, including the review changes in Kevin's 'Apply suggestions from code review' commit: - BaseScan no longer declares an abstract projection() (it is not part of the base contract; snapshot-aware projection lives on TableScan). - DataScan.partition_filters stays a @cached_property. To keep BaseScan projection-free while still sharing the Arrow materialization between DataScan and IncrementalAppendScan, the _to_arrow*_via_file_scan_tasks helpers take the projected schema as an explicit argument (each scan passes its own projection()).
|
Capturing @kevinjqliu's follow-ups: #3364 (comment). I'm happy to follow-up with the docs PR for this right after - my thinking was to keep this PR isolated given it's large enough as is 🙌. One more: |
| and manifest_entry.status == ManifestEntryStatus.ADDED, | ||
| ) | ||
|
|
||
| def to_arrow(self) -> pa.Table: |
There was a problem hiding this comment.
[AI reviewer aid] DataScan.to_arrow / to_arrow_batch_reader take a dictionary_columns argument (added in #3461), and the shared _to_arrow*_via_file_scan_tasks helpers already thread it. I've deliberately not exposed it on IncrementalAppendScan here, to keep this PR as isolated as possible. Follow-up will add dictionary_columns to the incremental scan's Arrow reads, with a dict-encoding test mirroring #3461's test_dictionary_columns_produces_dict_encoded_output.
rambleraptor
left a comment
There was a problem hiding this comment.
I want to make sure that we're supporting compaction properly. That seems to be the biggest missing piece we're missing
Can you write a test where we add some files, compact them, and then ensure that the scan doesn't pick up the newly compacted file (for a double count)?
rambleraptor
left a comment
There was a problem hiding this comment.
A couple nits and a question around adding a new test case. Thanks a lot!
| self.from_snapshot_id_exclusive = from_snapshot_id_exclusive | ||
| self.to_snapshot_id_inclusive = to_snapshot_id_inclusive | ||
|
|
||
| def from_snapshot_exclusive(self: IAS, from_snapshot_id_exclusive: int | None) -> IAS: |
There was a problem hiding this comment.
What do you think about changing this to update_from_snapshot_id_exclusive? (same for inclusive below)
The variables are public, so we have both a variable and a method and it can be unclear which to use.
| options=options, | ||
| limit=limit, | ||
| ) | ||
| self.from_snapshot_id_exclusive = from_snapshot_id_exclusive |
There was a problem hiding this comment.
We should make this a private variable and then expose it out with a getter.
The methods below are trying to make it explicit how to change these variables, so we should make it impossible to change them directly.
There was a problem hiding this comment.
Thanks for bringing this up - I realised I need to think more about the API shape here now and in the future (#3364 (comment)).
Thinking out loud, a couple of examples:
scan = test_table.incremental_append_scan(
selected_fields=("number",),
# This is *required*, diverging from Java but mirroring Spark
from_snapshot_id_exclusive=test_table.snapshots()[0].snapshot_id,
to_snapshot_id_inclusive=test_table.snapshots()[2].snapshot_id,
)
# No setters on the `IncrementalAppendScan` - similar to `snapshot_id` with normal scans.
# Matches Spark surface, that only supports `from` and `to` snapshots being set, and nothing else
# https://iceberg.apache.org/docs/latest/spark-queries/#incremental-readvs
scan = (
test_table.incremental_append_scan(
selected_fields=("number",)
# No snapshot IDs here maybe? But that differs from normal `.scan()` surface where snapshot_id is present there but not on the `DataScan`
)
.from_snapshot_id_exclusive(...) # *OR* inclusive *OR* omitted - diverging from Spark, doesn't matter because this is internal to the scan
.to_snapshot_id_inclusive(...)
.use_ref(...) # Support more beyond just snapshot IDs, permitting bad combinations at API layer (they would throw at runtime instead)
)
# Matches Java surface that supports far more than Spark - but is engine facing (not user-facing)I previously argued for matching Spark's semantics but I'm now thinking that we should keep the API surface here more open, similar to Java - which shouldn't be too hard to do. Let me have a proper think here (but happy if folks want to chime in with opinions).
There was a problem hiding this comment.
I've now moved to the following setup. Like the normal data scan, IncrementalAppendScan is a plain value object: its constructor parameters are exactly its public attributes — the invariant the inherited BaseScan.update() relies on to return refined copies, so the builders below are thin update() wrappers.
class IncrementalAppendScan(BaseScan):
# range state (new) — a single `from` slot + an inclusive flag, mirroring java/cpp:
from_snapshot_id: int | None
from_snapshot_inclusive: bool # False = exclusive (the default)
to_snapshot_id: int | None
branch: str | None
# + inherited BaseScan config: row_filter, selected_fields, case_sensitive, options, limitThe factory's Spark-style from_snapshot_id_exclusive / to_snapshot_id_inclusive kwargs map
onto those attributes (from_snapshot_id + from_snapshot_inclusive):
# Factory (front door) — mirrors `table.scan(...)`; both range bounds optional.
table.incremental_append_scan(
from_snapshot_id_exclusive: int | None = None, # start, exclusive; default = oldest ancestor of `to`
to_snapshot_id_inclusive: int | None = None, # end, inclusive; default = current snapshot
row_filter: str | BooleanExpression = ALWAYS_TRUE,
selected_fields: tuple[str, ...] = ("*",),
case_sensitive: bool = True,
options: Properties = EMPTY_DICT,
limit: int | None = None,
) -> IncrementalAppendScan
# Builders (power surface) — each returns a refined copy, like select()/filter().
IncrementalAppendScan.from_snapshot_id_exclusive(id) / .from_snapshot_id_inclusive(id)
.to_snapshot_id_inclusive(id)
.use_branch(name)
# .from_ref_* / to_ref_* — start/end by ref, follow-up
# + inherited select() / filter() / with_case_sensitive() / update()Choosing from_snapshot_id_exclusive / to_snapshot_id_inclusive instead of set/update_snapshot_id_exclusive matches the existing flow of filter() / select() naming for normal data scans and makes the chaining above read more smoothly, IMO.
(Note: I've left out branch / use_branch to be added as a follow-up together with appropriate tests, because that makes sense to me as a splitting point here)
…ntalAppendScan Address review feedback: remove the from_snapshot_exclusive/to_snapshot_inclusive fluent builders (which clashed with the public from/to attributes and left them mutable) in favour of setting the snapshot range at construction, mirroring DataScan.snapshot_id and Spark's required start-snapshot-id option surface. - from_snapshot_id_exclusive is now a required, keyword-only argument; the deferred plan-time 'start snapshot not set' error is gone (fail-fast). - Range-first, keyword-only signatures on Table.incremental_append_scan and IncrementalAppendScan.__init__. - StagedTable.incremental_append_scan keeps from optional so a no-arg call raises the staged-table ValueError, not a TypeError. - Simplify user-facing docstrings to Spark's option surface (the expired-cursor behaviour stays covered by tests and internal comments). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Rework the IncrementalAppendScan surface toward the Java/C++ engine-facing shape (agreed in the PR discussion), keeping all config as public attributes: - State is from_snapshot_id + from_snapshot_inclusive (False=exclusive) + to_snapshot_id -- a single start slot plus a flag, mirroring Java's TableScanContext. Builders from_snapshot_id_exclusive / from_snapshot_id_inclusive / to_snapshot_id_inclusive return refined copies via BaseScan.update(). - The start is now optional: an unset start scans from the oldest ancestor of the end snapshot (matching Java); an inclusive start resolves to its parent as the exclusive boundary; an exclusive start still admits an expired snapshot. The end still defaults to the current snapshot, and an empty table with no range set scans nothing. - Table.incremental_append_scan keeps the Spark-style surface (from_snapshot_id_exclusive / to_snapshot_id_inclusive), mapped onto the fields. - Add snapshots.is_ancestor_of (mirrors Java SnapshotUtil.isAncestorOf) to validate inclusive starts. Branch selection and ref-based start/end remain follow-ups. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address review feedback: verify an incremental append scan does not double-count files produced by compaction. Insert a compaction snapshot into the test_incremental_read fixture (system.rewrite_data_files rewrites the two letter='b' files into one, an operation=replace snapshot) and add an integration test asserting a scan spanning the compaction reads each appended row exactly once -- the rewritten file, added by the compaction rather than an append, is not picked up. Re-index the existing incremental integration tests for the new snapshot layout. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…elper - Restore covariant=True on the IAS TypeVar, matching the sibling scan TypeVars. - Narrow a builder test assertion to set equality (drop the dead disjunction). - Make is_parent_ancestor_of symmetric with is_ancestor_of (any(); the explicit None guard was redundant since the compared id is non-None). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| to_snapshot_id = current_snapshot.snapshot_id | ||
|
|
||
| # An unset start scans the whole lineage of the end snapshot (from its oldest ancestor). | ||
| if self.from_snapshot_id is None: |
There was a problem hiding this comment.
[AI reviewer aid] An unset start scans from the oldest ancestor of the end snapshot, matching Java's BaseIncrementalScan (a null exclusive boundary; ancestors_between_ids(None, to) then walks the whole lineage, cf. oldestAncestorOf). Earlier revisions required the start (Spark's start-snapshot-id); opened up to match Java's engine-facing surface.
| if self.from_snapshot_id is None: | ||
| return None, to_snapshot_id | ||
|
|
||
| if self.from_snapshot_inclusive: |
There was a problem hiding this comment.
[AI reviewer aid] An inclusive start resolves to its parent as the exclusive boundary (None at the root) and must be present and an ancestor of the end. Mirrors Java's BaseIncrementalScan.fromSnapshotIdExclusive (isAncestorOf check, then parentId()).
|
|
||
| # An exclusive start does not need to be present in the table metadata (it may have been | ||
| # expired). It is valid as long as it is the parent of some ancestor of the end snapshot. | ||
| if not is_parent_ancestor_of(to_snapshot_id, self.from_snapshot_id, self.table_metadata): |
There was a problem hiding this comment.
[AI reviewer aid] An exclusive start is validated with is_parent_ancestor_of (not is_ancestor_of), so an expired start cursor is accepted as long as the lineage still passes through it; equal from/to is rejected (a snapshot is never its own parent-ancestor). Mirrors Java's BaseIncrementalScan (see the expiry note there).
| self.from_snapshot_inclusive = from_snapshot_inclusive | ||
| self.to_snapshot_id = to_snapshot_id | ||
|
|
||
| def from_snapshot_id_exclusive(self: IAS, from_snapshot_id: int) -> IAS: |
There was a problem hiding this comment.
[AI reviewer aid] Builders map to Java's overloaded fromSnapshotExclusive(long) / fromSnapshotInclusive(long). Python has no overloading, so the variants get distinct names; ref-based start (from_ref_*) is a follow-up. Each returns a refined copy via BaseScan.update(), like select() / filter().
| yield from ancestors_of(to_snapshot, table_metadata) | ||
|
|
||
|
|
||
| def is_ancestor_of(snapshot_id: int, ancestor_snapshot_id: int, table_metadata: TableMetadata) -> bool: |
There was a problem hiding this comment.
[AI reviewer aid] Mirrors Java's SnapshotUtil.isAncestorOf (a snapshot is its own ancestor). Validates an inclusive start; sibling of is_parent_ancestor_of below.
|
|
||
| @pytest.mark.integration | ||
| @pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")]) | ||
| def test_incremental_append_scan_does_not_double_count_compacted_files(catalog: Catalog) -> None: |
There was a problem hiding this comment.
[AI reviewer aid] Compaction (rewrite_data_files, snapshots[3]) commits a replace snapshot whose rewritten file is added by that snapshot, not by an append. The append-only filter drops the replace snapshot and the manifest added_snapshot_id / status == ADDED entry filter drops its file, so the appended rows are read exactly once — never double-counted with the rewritten file.
rewrite_data_files commits no snapshot when there's nothing to compact, which would silently shift the fixture's snapshot indices and make the double-count test scan to the wrong (also non-append) snapshot while still passing. Assert snapshots[3] really is the compaction (operation=replace) so a no-op fails loudly. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| # Compact: letter='b' has two files (from the previous two appends); rewrite them into one. | ||
| # This commits a non-append (replace) snapshot whose rewritten file the append scan must not pick up. |
Closes #2634.
Rationale for this change
Adds
IncrementalAppendScan, which reads the data appended between two snapshots — the building block for incremental ingestion. Largely a revival of the work in #2235; see #2634 and the previous PRs for motivation.Split out of #3364 at the reviewers' request, and builds on the now-merged
BaseScan/ManifestGroupPlannerrefactor (#3511), so this PR's diff is the append-scan feature alone.The surface mirrors Iceberg-Java's engine-facing API (snapshot IDs, inclusive/exclusive start, optional start) rather than the narrower Spark read options, since PyIceberg is increasingly used by engines (e.g. Polars). See the API discussion on this PR.
References: https://github.com/apache/iceberg (Iceberg-Java and Spark) and apache/iceberg-cpp#590. Inline review-aid comments (prefixed
[AI reviewer aid]) point at the relevant reference code.API
Table.incremental_append_scan(...)returns anIncrementalAppendScan;StagedTableoverrides it to raise, mirroringscan(). The scan reads the rows added by append snapshots in(from, to], projected onto the table's current schema; delete / overwrite / replace snapshots in the range (e.g. compaction) are ignored.The range is set via the factory's Spark-style kwargs or the builder methods, each of which returns a refined copy (like
select()/filter()):The range is held as public attributes —
from_snapshot_id+from_snapshot_inclusive+to_snapshot_id— a single start slot plus an inclusive flag, mirroring Java'sTableScanContextand consistent with the other scans.Changes
BaseIncrementalScan: an unset start scans from the oldest ancestor of the end; an inclusive start resolves to its parent as the exclusive boundary; an exclusive start is validated withis_parent_ancestor_of, so an expired start cursor is accepted as long as the lineage still passes through it; the end defaults to the current snapshot; an empty table with no range set scans nothing.added_snapshot_idis in range (set semantics viaManifestFile.__eq__/__hash__), and filters manifest entries toADDED-in-range via a newmanifest_entry_filteronManifestGroupPlanner.plan_files. Compacted (rewrite_data_files) output is therefore not picked up — no double counting.NULLfor newer columns.ancestors_between_ids,is_ancestor_of, andis_parent_ancestor_of.to_arrow/to_arrow_batch_reader) is shared withDataScanvia small module-level helpers that take the projected schema explicitly, soBaseScanstays projection-free (per the Refactor: extractBaseScanandManifestGroupPlanner#3511 review).Out of scope (tracked follow-ups)
use_branch) and per-endpoint ref/tag start & end (from_ref_*/to_ref_*) — the rest of the engine-facing surface Java exposes.count(), REST server-side planning, and user-facing doc examples (mkdocs).dictionary_columnsonIncrementalAppendScan.to_arrow/to_arrow_batch_reader(added toDataScanin feat: add dictionary_columns to to_arrow() / to_arrow_batch_reader() for memory-efficient reads #3461; the shared helpers already thread it) — kept out to isolate this PR.Are these changes tested?
Yes — unit tests (range resolution including unset / inclusive / exclusive and expired start, current-schema projection, builder and
update()copies, empty table, staged-table guard) and integration tests (append-only, non-append snapshots ignored, compaction not double-counted, schema evolution within range, partition- and metrics-evaluator pruning, disconnected snapshots), plus thetest_incremental_readprovision fixture.Are there any user-facing changes?
Yes — the new
Table.incremental_append_scan(...)API andIncrementalAppendScanclass. No changes to existing public surface.