[improve](streaming-job) Bind the incremental phase to a fixed BE and reuse the cdc reader #64423
JNSimba wants to merge 5 commits into
/review
Pull request overview
This PR improves streaming insert (CDC) binlog-phase stability and reader reuse by pinning binlog execution to a bound BE, preventing displaced tasks from interfering with a successor task’s reader/streamload, and adding an idle-reader reaper on the cdc_client side to release unused readers while keeping upstream slots.
Changes:
- Add BE binding/preference for binlog phase (FE selects preferred BE; offset provider routes RPCs to the bound BE).
- Add cdc_client-side reader ownership + liveness tracking (keepAlive) and an idle-reader cleanup scheduler.
- Prevent displaced tasks from closing shared resources and ensure commit/cleanup uses the owning taskId.
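To make the liveness-tracking item in the list above more concrete, here is a minimal sketch, not the code added to Env.java. The class name `IdleReaderTracker`, its methods, and the explicit clock parameter are hypothetical. It only illustrates the idea that every write request or FE heartbeat refreshes a timestamp, and that a reaper would release a reader only once that timestamp is older than a timeout.

```java
/** Hypothetical sketch of keepAlive-based idle detection; not the PR's implementation. */
final class IdleReaderTracker {
    private final long idleTimeoutMs;
    private long lastActiveMs;

    IdleReaderTracker(long idleTimeoutMs, long nowMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastActiveMs = nowMs;
    }

    /** Called on every write request or FE heartbeat; never moves the timestamp backwards. */
    synchronized void keepAlive(long nowMs) {
        lastActiveMs = Math.max(lastActiveMs, nowMs);
    }

    /** True once no activity has been seen for longer than the timeout. */
    synchronized boolean isIdle(long nowMs) {
        return nowMs - lastActiveMs > idleTimeoutMs;
    }
}
```

Passing the clock value in explicitly keeps the sketch deterministic. A real reaper would more likely read the system clock from a scheduled task.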
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java | Add offset-based guard to reuse/rebuild the live stream reader. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java | Ownership checks to avoid displaced-task cleanup/commit; keepAlive during polling; adjust finishSplitRecords behavior for binlog reuse. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java | keepAlive heartbeat when fetching end offset. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java | Add ownership, keepAlive, FE-requested rebuild, and scheduled idle-reader release. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java | Add idle reader cleanup timing constants. |
| fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java | Add selectBackend overload to prefer a bound backendId. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java | Add bound-backend binding hook for providers. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java | Route fetch/compare/clean RPCs to bound BE; store boundBackendId transiently. |
| fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java | Route manual status alter handling to StreamingInsertJob.onManualStatusAltered. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java | Bind BE selection in binlog phase; mark rebuildReader on RPC errors/timeouts; set rebuildReader on requests; adjust release semantics. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Persist boundBackendId; track needRebuildReader; release reader on manual pause; avoid overwriting manual pause failure reason. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java | Add default releaseRemoteReader hook for tasks without CDC readers. |
| fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java | Add rebuildReader flag for FE→cdc_client coordination. |
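The offset-based guard listed for JdbcIncrementalSourceReader.java can be pictured roughly as follows. This is a sketch under stated assumptions, not the PR's code: the class `OffsetGuard`, the method `canReuse`, and the representation of an offset as a `Map<String, String>` are all hypothetical. The point is that a live reader is reused only when the requested start offset matches the position the reader has actually consumed, and rebuilt in every other case.

```java
import java.util.Map;

/** Hypothetical sketch of a reuse-or-rebuild decision; offsets are modeled as plain maps. */
final class OffsetGuard {
    /**
     * @param readerAlive  whether a live stream reader currently exists
     * @param requestStart start offset carried by the incoming request
     * @param consumed     position the live reader has actually consumed up to
     * @return true if the existing reader can be reused, false if it must be rebuilt
     */
    static boolean canReuse(boolean readerAlive,
                            Map<String, String> requestStart,
                            Map<String, String> consumed) {
        return readerAlive && requestStart != null && requestStart.equals(consumed);
    }
}
```

Any mismatch, including a missing offset, falls through to a rebuild, which is the conservative choice when the reader's position cannot be proven equal to the request.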
run buildall
I reviewed the actual 13-file PR diff from GitHub. The goal is to pin incremental/binlog streaming work to a BE for reader reuse and rebuild/release readers when needed. The implementation is focused, but I found two lifecycle/data-correctness issues that need to be fixed before approval.
Critical checkpoint conclusions:
- Goal/test proof: the goal is only partially met; there are no PR tests covering BE rebind, displaced tasks, manual pause, or reader reuse failure paths.
- Concurrency/lifecycle: not safe yet. Reader ownership can move while job-scoped stream-load state remains, and BE rebinding can leave the old live reader running.
- Data writes/transactionality: blocking issue. Rows buffered by a displaced task can be flushed by a later task without the old offset being committed.
- Persistence/compatibility: boundBackendId is persisted, and I did not find a thrift/storage-format change. The remaining concern is operational lifecycle during rebind/failover rather than serialization compatibility.
- Configuration/observability: no new config items; logs are present, but they do not compensate for the lifecycle gaps.
- Parallel paths: snapshot vs binlog behavior was considered. The issues are specific to the new binlog reader reuse/rebind path and displaced async writes.
User focus: no additional user-provided focus points were present.
TPC-H: Total hot run time: 29024 ms
TPC-DS: Total hot run time: 168887 ms
/review
run buildall
I found two additional blocking issues beyond the existing inline review threads: the persisted bound BE is not replayed into the transient offset provider, and the CDC client still treats a point-in-time ownership check as a lease while it continues to touch the shared reader/loader state.
Critical checkpoint conclusions:
- Goal/test: the PR goal is clear, but I did not run tests in this review-only pass; lifecycle/concurrency tests around FE failover and task takeover would be needed.
- Focused change: the change is focused on CDC reader reuse and bound-BE routing.
- Concurrency/locks/lifecycle: not satisfied. Existing threads already cover several races/leaks, and the added positive-owner race below is another blocker.
- Persistence/replay/data consistency: not satisfied because journal replay can leave provider routing stale after boundBackendId changes.
- Config/compatibility/FE-BE variables: no additional compatibility issue found beyond the bound-BE routing sync problem.
- Tests/observability/performance: no additional test evidence was provided in this review context; no separate performance regression identified.
User focus: no additional user-provided review focus was present, so I performed the full PR review without a special focus area.
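The difference between a point-in-time ownership check and a lease, raised at the top of this comment, can be illustrated with a small sketch. It is an illustration only, not the cdc_client code: `ReaderOwnership`, `claim`, and `runIfOwner` are hypothetical names. Here the ownership check and the guarded action run under the same lock, so a takeover cannot slip in between them.

```java
/** Hypothetical sketch: check ownership and act under one lock, so the check cannot go stale. */
final class ReaderOwnership {
    private final Object lock = new Object();
    private long ownerTaskId = -1L; // -1 means "no owner yet" (assumed convention)

    /** A newly dispatched task takes over ownership. */
    void claim(long taskId) {
        synchronized (lock) {
            ownerTaskId = taskId;
        }
    }

    /**
     * Runs the action only if taskId still owns the reader.
     * Because claim() needs the same lock, ownership cannot change while the action runs.
     */
    boolean runIfOwner(long taskId, Runnable action) {
        synchronized (lock) {
            if (ownerTaskId != taskId) {
                return false; // displaced task: must not touch shared reader/loader state
            }
            action.run();
            return true;
        }
    }
}
```

The trade-off is that a slow action blocks a takeover for its whole duration, so in practice the guarded section would need to stay short or be replaced by a different fencing scheme.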
TPC-H: Total hot run time: 28958 ms
TPC-H: Total hot run time: 28970 ms
TPC-DS: Total hot run time: 168893 ms
TPC-DS: Total hot run time: 169481 ms
FE Regression Coverage Report
Increment line coverage
/review
run buildall
Automated review result: request changes.
I found one additional regression in the common pause cleanup path. Existing review threads already cover several blocking reader ownership, bound-BE replay/rebind, idle reaper, and stream-load reuse races, so I did not duplicate those inline.
Critical checkpoint conclusions:
- Goal/test coverage: The PR targets fixed BE affinity and binlog reader reuse. The added flow is not fully proven for pause/cancel lifecycle, and existing threads cover unresolved correctness races.
- Scope/focus: A CDC-specific cleanup change leaks into the generic streaming task pause path; see the inline comment.
- Concurrency/lifecycle: Existing threads already identify unresolved owner-lease races, idle reaper races, blocking release under lock, and stale loader reuse. No duplicate comments added.
- Config/compatibility: No new config item or incompatible storage/wire-format issue found beyond optional request/persisted fields with safe defaults.
- Parallel paths: The TVF-backed StreamingInsertTask path must preserve local cleanup when CDC reader release is separated.
- Testing/results: I did not run tests in this review. I did not find coverage for the pause cleanup regression.
- Observability: Existing logging is sufficient for the newly reviewed control flow.
- Transaction/persistence/data writes: No additional issue found beyond the existing bound-backend replay and ownership comments.
- User focus: No additional user-provided review focus was supplied.
TPC-H: Total hot run time: 28675 ms
TPC-DS: Total hot run time: 168819 ms
/review
run buildall
Review result: request changes.
I read the repository code-review skill and the existing inline review threads before reviewing this diff. I avoided reposting the already-known CDC ownership, cleanup, binding replay, and idle-reaper comments; the inline comments below are distinct additional blockers.
Critical checkpoint conclusions:
- Goal: fixed-BE binlog reader reuse is only partially safe. The current patch can still reuse a reader without the required single-owner/single-BE guarantee.
- Scope/focus: the code changes are focused on streaming CDC, but they cross the FE-to-cdc_client protocol and long-lived reader lifecycle, so compatibility and concurrency need stronger handling.
- Concurrency: this path has FE scheduler threads, user PAUSE/RESUME commands, asynchronous cdc_client write threads, releaseReader RPCs, and the idle reaper. The new manual pause path can race with immediate resume and reuse the same live reader.
- Lifecycle: long-lived readers now survive task boundaries; detach/rebuild must be deterministic at manual pause/resume and mixed-version boundaries.
- Configuration: no new config items.
- Compatibility: the new primitive request flag is not safe for rolling upgrades because old FE payloads deserialize as reuse-enabled on a new cdc_client.
- Parallel paths: both MySQL and PostgreSQL binlog paths are affected by the protocol/default and manual pause issues. TVF local cleanup appears restored by the latest patch.
- Tests: this PR does not add visible automated coverage for the new lifecycle. Please cover rolling-upgrade/default-request behavior and pause-then-immediate-resume around an in-flight task.
- Persistence/failover: boundBackendId is persisted and resynced in replayOffsetProviderIfNeed in the current patch; existing review threads still cover other replay/binding concerns.
- Data writes: the remaining issues can duplicate or mix CDC records before offset commit, so they are correctness blockers.
- Observability: the added logs help diagnosis but do not prevent the unsafe interleavings.
User focus: no additional user-provided focus points were supplied.
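The rolling-upgrade concern about the primitive request flag can be made concrete with a short sketch. It shows one possible mitigation, not something the review prescribes: model the flag as a boxed `Boolean`, so that an absent field (for example from an older FE) stays distinguishable from an explicit `false`. The class `RebuildFlag` and the policy "absent means rebuild" are assumptions made for illustration; only the field name rebuildReader comes from the PR.

```java
/** Hypothetical sketch: a tri-state flag where "absent" is resolved conservatively. */
final class RebuildFlag {
    /**
     * @param rebuildReader null when the sender did not set the field (assumed: older FE),
     *                      otherwise the sender's explicit choice
     * @return true if the reader should be rebuilt rather than reused
     */
    static boolean shouldRebuild(Boolean rebuildReader) {
        // With a primitive boolean, a missing field would silently read as false (reuse).
        return rebuildReader == null || rebuildReader;
    }
}
```

Under this policy an old sender costs one extra rebuild per request instead of opting into reader reuse without knowing about it.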
TPC-H: Total hot run time: 29339 ms
TPC-DS: Total hot run time: 168864 ms
…id sharing a still-polling reader
/review
run buildall
Automated review result: request changes.
I found three additional reader-lifecycle issues beyond the already-known inline threads. I did not duplicate the existing comments about the primitive rebuildReader compatibility issue, owner lease gaps around commit/flush, manual pause release races, keepAlive synchronization, or boundBackend replay; those still need to be resolved or explicitly closed with evidence.
Critical checkpoint conclusions:
- Goal: fixed-BE incremental CDC reader reuse is the right goal, but the current cleanup/rebuild paths can still leave duplicate or unclosed readers.
- Scope: the diff is focused, but it changes a high-risk lifecycle shared by FE scheduling, cdc_client ownership, and source replication slots.
- Concurrency: yes; task execution, async cdc_client work, release RPCs, idle reaper, and FE status changes interact. The new comments cover cases where ownership bookkeeping changes before the old reader is actually stopped.
- Lifecycle: not safe yet. Rebuild, idle timeout, and drop cleanup can lose track of a live reader or close the wrong BE.
- Compatibility: existing rolling-upgrade comment for the new primitive boolean remains applicable.
- Persistence/failover: existing boundBackend/provider replay thread should be verified after the latest changes.
- Parallel paths: dispatch rebind, manual release, idle reaper, rebuild, and drop cleanup need consistent deterministic release semantics.
- Tests: no new test files are in the PR diff for these failure modes; please add targeted coverage or provide concrete manual validation.
- User focus: no additional user-provided review focus was supplied.
TPC-H: Total hot run time: 28326 ms
FE UT Coverage Report
Increment line coverage
TPC-DS: Total hot run time: 169298 ms
What problem does this PR solve?
Problem Summary:
For from-to (MySQL/PG CDC) streaming jobs, once a job enters the incremental (binlog) phase, two issues hurt throughput:
max_interval= 10s) re-selects a BE via global round-robin, so the task drifts across BEs with no job→BE affinity.
As a result, every round rebuilds the reader. For PG this means reconnecting the replication slot and re-locating the WAL position (~15s each round), which, together with large-transaction buffering, is a major cause of idle / low-throughput stalls in the incremental phase.
What's changed
This PR binds the incremental phase to a fixed BE and reuses the cdc reader, so that in steady state each round only polls — zero re-location — and only restart / pause / BE-change boundaries trigger a rebuild. Scope is from-to (MySQL/PG) incremental phase only; the snapshot phase is unchanged, and TVF mode is out of scope.
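The binding rule summarized above can be sketched as follows. This is a simplified illustration, not the `selectBackend` overload in StreamingJobUtils.java: the class `BoundBackendSelector`, its round-robin cursor, and the list-of-ids input are hypothetical. It assumes that a non-positive bound id means "unbound" and that a bound BE which is no longer alive falls back to round-robin.

```java
import java.util.List;

/** Hypothetical sketch: prefer the bound BE, otherwise fall back to round-robin. */
final class BoundBackendSelector {
    private int cursor = 0; // round-robin position over the alive backends

    /**
     * @param aliveBackendIds ids of backends that are currently alive
     * @param boundBackendId  persisted binding; a value <= 0 is treated as "unbound"
     * @return the backend id to use for this round
     */
    long select(List<Long> aliveBackendIds, long boundBackendId) {
        if (aliveBackendIds.isEmpty()) {
            throw new IllegalStateException("no alive backend");
        }
        // Keep the affinity as long as the bound BE is still available.
        if (boundBackendId > 0 && aliveBackendIds.contains(boundBackendId)) {
            return boundBackendId;
        }
        // Unbound, or the bound BE is gone: pick the next one in rotation.
        long picked = aliveBackendIds.get(cursor % aliveBackendIds.size());
        cursor = (cursor + 1) % aliveBackendIds.size();
        return picked;
    }
}
```

In this sketch the caller would persist the returned id as the new binding whenever the fallback branch is taken, which is where a reader rebuild would be expected.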
FE: job→BE affinity. Persist a boundBackendId on the job. In the incremental phase the job is bound to a fixed BE; the snapshot phase keeps the original per-round selection. The heartbeat RPCs (fetchEndOffset/compareOffset) are routed to the bound BE so they double as a reader keepalive. Replay-safe: boundBackendId <= 0 means "unbound" and round-robin re-selects, so old payloads and FE failover degrade to one extra rebuild, never to incorrectness.
cdc_client: reader reuse with an offset guard. Reuse the live stream reader across tasks when the request start offset equals the reader's real consumed position; otherwise rebuild (close, re-locate WAL, re-read from the committed offset). The binlog path no longer tears down the reader on each round.
Reader lifecycle. A background reaper releases an idle reader (stop engine, keep slot) only when there is neither a writeRecords request nor an FE heartbeat beyond a timeout. A RUNNING-but-upstream-idle reader stays resident (large transactions pay no re-location), while a truly orphaned connection (FE crash / re-bind / partition) is reclaimed.
Zombie-task isolation. A rebuildReader flag swaps the reader instance, and an owner-task-id gate fences a still-running stale task from a newly claimed one at the dangerous actions (close stream load / cleanup reader / commit offset), so a slow task cannot corrupt the offset of its successor.
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?