feat: CDC-based mixed index synchronization (#4873)#4906
Draft
porunov wants to merge 1 commit into
Draft
Conversation
There was a problem hiding this comment.
Pull request overview
Adds an opt-in Change-Data-Capture (CDC) pipeline to keep mixed indexes (ElasticSearch/Solr/Lucene) eventually consistent with committed graph data by asynchronously reindexing affected elements from their current state, eliminating the “dual write” divergence risk.
Changes:
- Introduces per-index CDC configuration (
index.[X].cdc.*) and commit-path logic to skip synchronous mixed-index writes in cdc-only mode. - Adds
MixedIndexUpdateApplier(reindex-from-current-state) andCdcElementChangein core, plus Cassandrastorage.cql.cdctable option support. - Adds a new
janusgraph-cdcmodule implementing a Kafka consumer worker, Debezium Cassandra JSON decoder, and extensive unit/component/E2E tests + documentation.
Reviewed changes
Copilot reviewed 33 out of 33 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| pom.xml | Registers new janusgraph-cdc Maven module in the build. |
| mkdocs.yml | Adds CDC operator guide page to documentation nav. |
| janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/MixedIndexUpdateApplierTest.java | Validates reindex-from-current-state behavior over Lucene in cdc-only mode. |
| janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/CdcSkipMutationTest.java | Verifies synchronous mixed-index write is skipped in cdc-only and retained in dual mode. |
| janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLCdcTableOptionTest.java | Unit-tests that storage.cql.cdc toggles cdc=true on edgestore DDL only. |
| janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java | Refactors CREATE TABLE building and conditionally applies Cassandra cdc=true for edgestore. |
| janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java | Adds storage.cql.cdc configuration option. |
| janusgraph-core/src/test/java/org/janusgraph/graphdb/configuration/CdcIndexConfigTest.java | Tests defaults and per-index scoping of index.[X].cdc.*. |
| janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java | Computes cdc-only backing indexes and skips synchronous mixed-index writes for them. |
| janusgraph-core/src/main/java/org/janusgraph/graphdb/database/log/TransactionLogHeader.java | Makes TransactionLogHeader.Modification constructor public for reuse in decoding. |
| janusgraph-core/src/main/java/org/janusgraph/graphdb/database/index/MixedIndexUpdateApplier.java | Adds backend-agnostic reindex-from-current-state applier for CDC worker. |
| janusgraph-core/src/main/java/org/janusgraph/graphdb/database/index/CdcElementChange.java | Adds normalized “element changed” model consumed by the applier/worker. |
| janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java | Adds index.[X].cdc.enabled and index.[X].cdc.synchronous options. |
| janusgraph-cdc/src/test/resources/cassandra-cdc.yaml | Provides Cassandra config enabling CDC for full pipeline E2E test. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/DebeziumCassandraJsonDecoderTest.java | Tests Debezium JSON decoding against real JanusGraph-serialized bytes. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcWorkerConvergenceTest.java | Drives worker+decoder+applier via MockConsumer to validate convergence semantics. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcWorkerConfigurationTest.java | Tests worker configuration defaults and properties parsing. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcKafkaElasticsearchTest.java | Testcontainers E2E for Kafka → worker → ElasticSearch convergence. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcIndexUpdateWorkerMainTest.java | Tests runner wiring and config reflection of CDC-enabled backing indexes. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcIndexUpdateWorkerLoopTest.java | Unit-tests polling loop semantics (dedupe/retry/commit/rewind). |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcEventDecoderTest.java | Smoke test for decoder SPI and CdcElementChange interop. |
| janusgraph-cdc/src/test/java/org/janusgraph/cdc/CdcCassandraDebeziumElasticsearchTest.java | Full Cassandra CDC → Debezium → Kafka → ElasticSearch pipeline E2E (profile-gated). |
| janusgraph-cdc/src/test/java/io/debezium/connector/cassandra/JanusGraphCdcConnectorStarter.java | Test-only bridge to start Debezium Cassandra connector embedded. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/DebeziumCassandraJsonDecoder.java | Implements Debezium Cassandra JSON → CdcElementChange decoding. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcWorkerConfiguration.java | Defines immutable worker/Kafka configuration and consumer properties. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexUpdateWorkerMain.java | Adds standalone runner that opens graph, wires decoder+applier, starts workers. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexUpdateWorker.java | Implements Kafka consume/decode/dedupe/apply/retry/commit/rewind loop. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcIndexApplier.java | Functional interface to abstract index application for worker tests. |
| janusgraph-cdc/src/main/java/org/janusgraph/cdc/CdcEventDecoder.java | Decoder SPI for CDC record formats. |
| janusgraph-cdc/pom.xml | New module POM with Kafka clients + testcontainers/Debezium profile gating. |
| docs/configs/janusgraph-cfg.md | Regenerates config reference including new CDC options. |
| docs/changelog.md | Adds 1.2.0 upgrade note describing CDC mixed index synchronization. |
| docs/advanced-topics/cdc-mixed-index.md | Adds operator guide for Cassandra CDC + Debezium + Kafka + worker setup. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+114
to
+118
| if (relation.isEdge()) { | ||
| return new CdcElementChange(ElementCategory.EDGE, relation.id()); | ||
| } | ||
| // A property (or system) column changed -> reindex the owning vertex. | ||
| return new CdcElementChange(ElementCategory.VERTEX, vertexId); |
a7d91de to
5659caf
Compare
48c32ba to
3e6cc0e
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 35 out of 35 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/MixedIndexUpdateApplierTest.java:1
- The 10-second schema/index enablement timeout is likely to be flaky under loaded CI runners (especially with filesystem-backed Lucene + BerkeleyJE). Consider increasing these timeouts (e.g., 30–60 seconds) or using a shared constant aligned with other JanusGraph index-status tests. The same concern applies to other new tests using 10-second
awaitGraphIndexStatustimeouts.
Comment on lines
275
to
278
| public Modification(Change state, Object outVertexId, org.janusgraph.diskstorage.Entry relationEntry) { | ||
| this.state = state; | ||
| this.outVertexId = outVertexId; | ||
| this.relationEntry = relationEntry; |
Comment on lines
+54
to
+67
| private CdcWorkerConfiguration(Builder b) { | ||
| this.bootstrapServers = Objects.requireNonNull(b.bootstrapServers, BOOTSTRAP_SERVERS + " is required"); | ||
| if (b.topics == null || b.topics.isEmpty()) { | ||
| throw new IllegalArgumentException(TOPICS + " is required"); | ||
| } | ||
| this.topics = Collections.unmodifiableList(new ArrayList<>(b.topics)); | ||
| this.groupId = b.groupId; | ||
| this.maxPollRecords = b.maxPollRecords; | ||
| this.pollTimeout = b.pollTimeout; | ||
| this.workerThreads = b.workerThreads; | ||
| this.retryLimit = b.retryLimit; | ||
| this.retryInitialWait = b.retryInitialWait; | ||
| this.retryMaxWait = b.retryMaxWait; | ||
| } |
Comment on lines
+62
to
+69
| final StandardJanusGraph standardGraph = (StandardJanusGraph) graph; | ||
| final Set<String> cdcIndexes = cdcEnabledBackingIndexes(standardGraph); | ||
| final MixedIndexUpdateApplier applier = new MixedIndexUpdateApplier(standardGraph, cdcIndexes::contains); | ||
| final DebeziumCassandraJsonDecoder decoder = new DebeziumCassandraJsonDecoder(standardGraph); | ||
| final List<CdcIndexUpdateWorker> created = new ArrayList<>(); | ||
| for (int i = 0; i < config.getWorkerThreads(); i++) { | ||
| created.add(new CdcIndexUpdateWorker(consumerFactory.get(), decoder, applier::apply, config)); | ||
| } |
Comment on lines
+32
to
+46
| jobs: | ||
| tests: | ||
| runs-on: ubuntu-22.04 | ||
| strategy: | ||
| matrix: | ||
| include: | ||
| - name: cdc-java8 | ||
| java: 8 | ||
| - name: cdc-java11 | ||
| install-args: "-Pjava-11" | ||
| java: 11 | ||
| - name: cdc-e2e-java17 | ||
| java: 17 | ||
| steps: | ||
| - run: 'echo "No build required"' |
Keep mixed indexes (ElasticSearch/Solr/Lucene) eventually consistent with the graph by deriving their updates from a Change-Data-Capture stream of the committed graph data, instead of a synchronous second write during the transaction that can diverge on failure and leave a permanently stale index. Pipeline (Apache Cassandra): commit (graph data only) -> Cassandra edgestore(cdc=true) -> Debezium -> Kafka -> CdcIndexUpdateWorker consumer group -> reindex-from-current-state -> ES bulk. Key design points: - Reindex-from-current-state: the worker reads each changed element's current graph state and fully replaces its index document (reusing IndexSerializer, like transaction recovery). This is idempotent and order-independent, so out-of-order or duplicate events still converge to the current state -- no strict ordering required, and a stale event can never overwrite a fresh value. - No dual write: the only synchronous write is to storage; the index is updated downstream from the committed change stream, so it cannot diverge. - Element-keyed Kafka partitioning gives per-element ordering and horizontal scaling via a consumer group; batches are de-duplicated and applied as one ElasticSearch _bulk per index. - At-least-once: offsets are committed only after a batch is durably applied; on failure the batch is reprocessed (rewind) rather than skipped, so the index eventually catches up. Configuration (opt-in, disabled by default): - storage.cql.cdc: emit the Cassandra cdc=true table option on the edgestore table. - index.[X].cdc.enabled / index.[X].cdc.synchronous: per-index dual mode (write synchronously AND via CDC) or cdc-only mode (skip the synchronous write; ES updated solely via CDC). Components: - janusgraph-core: per-index CDC config options, the commit-side skip hook in StandardJanusGraph, and MixedIndexUpdateApplier (the backend-agnostic reindex-from-current-state engine, covering vertex, edge and property-element mixed indexes). - janusgraph-cql: the storage.cql.cdc table option (no Kafka dependency in production code). - janusgraph-cdc (new module; core + kafka-clients): the CdcEventDecoder SPI, DebeziumCassandraJsonDecoder, CdcWorkerConfiguration, CdcIndexUpdateWorker, and the standalone CdcIndexUpdateWorkerMain runner. Testing: 43 tests, including unit/component coverage (decoder vs real serialized bytes incl. poison-pill/invalid-Base64 skip, reindex engine over vertex/edge/property-element indexes, worker loop via Kafka MockConsumer incl. at-least-once rewind on decode/apply failure, fail-fast config and CDC-enablement validation, full-chain convergence over Lucene incl. vertex/edge add/update/remove and out-of-order delivery) and two real-container E2Es -- worker -> Kafka -> ElasticSearch, and the full Cassandra-CDC -> Debezium -> Kafka -> ElasticSearch pipeline. The real-container tests are gated behind the cassandra-cdc-e2e Maven profile (auto-activated on Java 17+, required by Debezium 3.x and Testcontainers 2.x); the default Java 8/11 build excludes them and stays green. CI: a dedicated workflow (.github/workflows/ci-cdc.yml) runs the cdc unit tests on Java 8 and 11 and the full real-container suite -- including the Cassandra-CDC -> Debezium -> Kafka -> ElasticSearch pipeline -- on Java 17 with Docker, so the integration is exercised on every change and guards against regression. Docs: advanced-topics/cdc-mixed-index.md operator guide, a 1.2.0 changelog upgrade note, and the regenerated configuration reference. Fixes JanusGraph#4873 Replaces JanusGraph#4874 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Oleksandr Porunov <alexandr.porunov@gmail.com>
3e6cc0e to
0ee776b
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Keep mixed indexes (ElasticSearch/Solr/Lucene) eventually consistent with the graph by deriving their updates from a Change-Data-Capture stream of the committed graph data, instead of a synchronous second write during the transaction that can diverge on failure and leave a permanently stale index.
Pipeline (Apache Cassandra): commit (graph data only) -> Cassandra edgestore(cdc=true) ->
Debezium -> Kafka -> CdcIndexUpdateWorker consumer group -> reindex-from-current-state -> ES bulk.
Key design points:
Configuration (opt-in, disabled by default):
Components:
Testing: 38 tests, including unit/component coverage (decoder vs real serialized bytes, reindex engine, worker loop via Kafka MockConsumer, full-chain convergence over Lucene incl. vertex/edge add/update/remove and out-of-order delivery) and two real-container E2Es -- worker -> Kafka -> ElasticSearch, and the full Cassandra-CDC -> Debezium -> Kafka -> ElasticSearch pipeline. The full Debezium E2E is gated behind the cassandra-cdc-e2e Maven profile (auto-activated on Java 17+, required by Debezium 3.x and Testcontainers 2.x); the default Java 8/11 build excludes it and stays green.
Docs: advanced-topics/cdc-mixed-index.md operator guide, a 1.2.0 changelog upgrade note, and the regenerated configuration reference.
Fixes #4873
Replaces #4874
Thank you for contributing to JanusGraph!
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
master)?For code changes:
For documentation related changes: