[GSoC 2026] Kafka Streams runner — GroupByKey (GlobalWindow, fire at watermark) #39141
Merged: je-ik merged 4 commits into apache:feat/18479-kafka-streams-runner-skeleton from junaiddshaukat:feat/ks-group-by-key · Jun 30, 2026 · +667 −8
bd0f280 Add GroupByKey (GlobalWindow, fire at watermark) (junaiddshaukat)
e39e29b Address review: emit GroupByKey output at the global window's end (junaiddshaukat)
2344a94 Document GroupByKey scope: restart resilience and large-key-space limits (junaiddshaukat)
f0c889e Rename GroupByKeyRekeyProcessor to ShuffleByKeyProcessor (junaiddshaukat)
62 changes: 62 additions & 0 deletions
...ava/org/apache/beam/runners/kafka/streams/translation/GroupByKeyBroadcastPartitioner.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.runners.kafka.streams.translation; | ||
|
|
||
| import java.util.Collections; | ||
| import java.util.HashSet; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import org.apache.kafka.common.utils.Utils; | ||
| import org.apache.kafka.streams.processor.StreamPartitioner; | ||
|
|
||
| /** | ||
| * Partitions records on the GroupByKey repartition topic. | ||
| * | ||
| * <ul> | ||
| * <li><b>data</b> records go to the single partition selected by hashing the Kafka record key | ||
| * (the already encoded Beam key), the same scheme Kafka's default partitioner uses, so every | ||
| * value of a key lands together; | ||
| * <li><b>watermark</b> reports are broadcast to <i>every</i> partition, so each downstream | ||
| * GroupByKey task observes the terminal watermark and fires its keys. | ||
| * </ul> | ||
| * | ||
| * @param <T> the data element type carried by data payloads | ||
| */ | ||
| class GroupByKeyBroadcastPartitioner<T> implements StreamPartitioner<byte[], KStreamsPayload<T>> { | ||
|
|
||
| @Override | ||
| public Integer partition(String topic, byte[] key, KStreamsPayload<T> value, int numPartitions) { | ||
| // Required by the interface but unused: Kafka Streams calls partitions() (overridden below) | ||
| // when it is present. Kept consistent with the data-hash path for safety. | ||
| return key == null ? 0 : Utils.toPositive(Utils.murmur2(key)) % numPartitions; | ||
| } | ||
|
|
||
| @Override | ||
| public Optional<Set<Integer>> partitions( | ||
| String topic, byte[] key, KStreamsPayload<T> value, int numPartitions) { | ||
| if (value.isWatermark()) { | ||
| Set<Integer> all = new HashSet<>(); | ||
| for (int partition = 0; partition < numPartitions; partition++) { | ||
| all.add(partition); | ||
| } | ||
| return Optional.of(all); | ||
| } | ||
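| // Mirror partition(): a null key falls back to partition 0 instead of failing inside murmur2. | ||
| int partition = key == null ? 0 : Utils.toPositive(Utils.murmur2(key)) % numPartitions; | ||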
| return Optional.of(Collections.singleton(partition)); | ||
| } | ||
| } |
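The routing rule described in the class comment can be illustrated without a Kafka dependency. The following is a minimal sketch, not the runner's partitioner: the class name `BroadcastRoutingSketch` and the method `targetPartitions` are hypothetical, and `Arrays.hashCode` stands in for Kafka's murmur2 purely so the example stays self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

/** Standalone sketch of the routing rule; not the runner's actual partitioner. */
public class BroadcastRoutingSketch {

  /** Returns the target partitions for one record on the repartition topic. */
  static Set<Integer> targetPartitions(byte[] key, boolean isWatermark, int numPartitions) {
    if (isWatermark) {
      // Watermark reports fan out to every partition.
      Set<Integer> all = new TreeSet<>();
      for (int p = 0; p < numPartitions; p++) {
        all.add(p);
      }
      return all;
    }
    // Stand-in hash (assumption): the real code hashes the key with Kafka's murmur2.
    int hash = Arrays.hashCode(key) & 0x7fffffff; // clear the sign bit
    return Collections.singleton(hash % numPartitions);
  }

  public static void main(String[] args) {
    byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);
    System.out.println("data      -> " + targetPartitions(key, false, 4));
    System.out.println("watermark -> " + targetPartitions(null, true, 4));
  }
}
```

Equal keys always map to the same single partition, while a watermark report reaches all of them, which is what lets every downstream task observe the terminal watermark.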
196 changes: 196 additions & 0 deletions
.../src/main/java/org/apache/beam/runners/kafka/streams/translation/GroupByKeyProcessor.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,196 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.runners.kafka.streams.translation; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import org.apache.beam.sdk.coders.Coder; | ||
| import org.apache.beam.sdk.coders.CoderException; | ||
| import org.apache.beam.sdk.coders.IterableCoder; | ||
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; | ||
| import org.apache.beam.sdk.transforms.windowing.GlobalWindow; | ||
| import org.apache.beam.sdk.util.CoderUtils; | ||
| import org.apache.beam.sdk.values.KV; | ||
| import org.apache.beam.sdk.values.WindowedValue; | ||
| import org.apache.beam.sdk.values.WindowedValues; | ||
| import org.apache.kafka.streams.processor.api.Processor; | ||
| import org.apache.kafka.streams.processor.api.ProcessorContext; | ||
| import org.apache.kafka.streams.processor.api.Record; | ||
| import org.apache.kafka.streams.state.KeyValueIterator; | ||
| import org.apache.kafka.streams.state.KeyValueStore; | ||
| import org.checkerframework.checker.nullness.qual.Nullable; | ||
| import org.joda.time.Instant; | ||
|
|
||
| /** | ||
| * Executes a {@code GroupByKey} (GlobalWindow, default trigger, no allowed lateness). | ||
| * | ||
| * <p>Records arrive on the repartition topic keyed by the encoded Beam key, so every value of a key | ||
| * is co-located here. Each value is appended to a per-key buffer in a Kafka Streams state store. | ||
| * Watermark reports are fed to a {@link WatermarkManager}; when the input watermark reaches {@link | ||
| * BoundedWindow#TIMESTAMP_MAX_VALUE} (the end of the global window) every buffered key is emitted | ||
| * once as {@code KV<K, Iterable<V>>} and the buffer cleared, then the watermark is forwarded | ||
| * downstream. | ||
| * | ||
| * <p>Each append decodes and re-encodes the whole value list, so buffering n values costs O(n^2) | ||
| * per key. That is acceptable for this first GroupByKey, and it is replaced when this moves to | ||
| * runner-core {@code GroupAlsoByWindow}. | ||
| */ | ||
| class GroupByKeyProcessor | ||
| implements Processor<byte[], KStreamsPayload<?>, byte[], KStreamsPayload<?>> { | ||
|
|
||
| private final String stateStoreName; | ||
| private final Coder<Object> keyCoder; | ||
| private final IterableCoder<@Nullable Object> bufferCoder; | ||
|
|
||
| private final WatermarkManager watermarkManager = new WatermarkManager(); | ||
| private Instant lastForwardedWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; | ||
| // The global window fires exactly once, when the watermark first reaches its end. Later watermark | ||
| // reports (e.g. the same terminal watermark broadcast across repartition partitions) must not | ||
| // re-fire. This flag is in-memory only; restart correctness comes from the state store plus | ||
| // exactly-once-v2: the buffered values and consumer offsets are committed atomically, and the | ||
| // store is empty once a key has fired, so a restart cannot double-emit. Persisting watermark | ||
| // holds is part of the separate WatermarkManager persistence work, not this initial GroupByKey. | ||
| private boolean fired = false; | ||
|
|
||
| private @Nullable ProcessorContext<byte[], KStreamsPayload<?>> context; | ||
| private @Nullable KeyValueStore<byte[], byte[]> store; | ||
|
|
||
| GroupByKeyProcessor( | ||
| String stateStoreName, Coder<Object> keyCoder, Coder<@Nullable Object> valueCoder) { | ||
| this.stateStoreName = stateStoreName; | ||
| this.keyCoder = keyCoder; | ||
| this.bufferCoder = IterableCoder.of(valueCoder); | ||
| } | ||
|
|
||
| @Override | ||
| public void init(ProcessorContext<byte[], KStreamsPayload<?>> context) { | ||
| this.context = context; | ||
| this.store = context.getStateStore(stateStoreName); | ||
| } | ||
|
|
||
| @Override | ||
| public void process(Record<byte[], KStreamsPayload<?>> record) { | ||
| KStreamsPayload<?> payload = record.value(); | ||
| if (payload.isData()) { | ||
| byte[] encodedKey = record.key(); | ||
| Object element = payload.getData().getValue(); | ||
| if (encodedKey == null || element == null) { | ||
| throw new IllegalStateException("GroupByKey data record is missing its key or value"); | ||
| } | ||
| appendValue(encodedKey, element); | ||
| return; | ||
| } | ||
| WatermarkPayload report = payload.asWatermark(); | ||
| watermarkManager.observe( | ||
| report.getSourcePartition(), | ||
| new Instant(report.getWatermarkMillis()), | ||
| report.getTotalSourcePartitions()); | ||
| Instant advanced = watermarkManager.advance(); | ||
| if (!fired && !advanced.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) { | ||
| fireAll(record); | ||
| fired = true; | ||
| } | ||
| if (advanced.isAfter(lastForwardedWatermark)) { | ||
| lastForwardedWatermark = advanced; | ||
| forwardWatermark(record, advanced.getMillis()); | ||
| } | ||
| } | ||
|
|
||
| private void appendValue(byte[] encodedKey, Object kvObject) { | ||
| KV<?, ?> kv = (KV<?, ?>) kvObject; | ||
| KeyValueStore<byte[], byte[]> kvStore = checkInitialized(store); | ||
| byte[] existing = kvStore.get(encodedKey); | ||
| List<@Nullable Object> values = existing == null ? new ArrayList<>() : decodeBuffer(existing); | ||
| values.add(kv.getValue()); | ||
| kvStore.put(encodedKey, encodeBuffer(values)); | ||
| } | ||
|
|
||
| private void fireAll(Record<byte[], KStreamsPayload<?>> trigger) { | ||
| // NOTE: this emits every buffered key in a single watermark turn. For a very large key space | ||
| // that risks memory pressure and exceeding the poll / transaction timeout. Acceptable for this | ||
| // initial GlobalWindow GroupByKey (fire once at end of input); incremental, timer-driven output | ||
| // via runner-core GroupAlsoByWindow lands with the windowing/timers work. | ||
| ProcessorContext<byte[], KStreamsPayload<?>> ctx = checkInitialized(context); | ||
| KeyValueStore<byte[], byte[]> kvStore = checkInitialized(store); | ||
| List<byte[]> firedKeys = new ArrayList<>(); | ||
|
|
||
| try (KeyValueIterator<byte[], byte[]> it = kvStore.all()) { | ||
| while (it.hasNext()) { | ||
| org.apache.kafka.streams.KeyValue<byte[], byte[]> entry = it.next(); | ||
| Object key = decodeKey(entry.key); | ||
| List<@Nullable Object> values = decodeBuffer(entry.value); | ||
| // The pane fires at the end of the global window, so the grouped element carries the | ||
| // window's max timestamp (END_OF_GLOBAL_WINDOW). Emitting at TIMESTAMP_MIN_VALUE (the | ||
| // default of valueInGlobalWindow) would make the output appear arbitrarily late and be | ||
| // dropped downstream once the watermark has advanced. | ||
| WindowedValue<KV<Object, Iterable<@Nullable Object>>> output = | ||
| WindowedValues.timestampedValueInGlobalWindow( | ||
| KV.of(key, (Iterable<@Nullable Object>) values), | ||
| GlobalWindow.INSTANCE.maxTimestamp()); | ||
| ctx.forward( | ||
| new Record<byte[], KStreamsPayload<?>>( | ||
| entry.key, KStreamsPayload.data(output), trigger.timestamp())); | ||
| firedKeys.add(entry.key); | ||
| } | ||
| } | ||
| for (byte[] key : firedKeys) { | ||
| kvStore.delete(key); | ||
| } | ||
| } | ||
|
|
||
| private void forwardWatermark(Record<byte[], KStreamsPayload<?>> trigger, long watermarkMillis) { | ||
| ProcessorContext<byte[], KStreamsPayload<?>> ctx = checkInitialized(context); | ||
| // GroupByKey is a single logical source for the next stage; report it as partition 0 of 1. | ||
| ctx.forward( | ||
| new Record<byte[], KStreamsPayload<?>>( | ||
| trigger.key(), KStreamsPayload.watermark(watermarkMillis, 0, 1), trigger.timestamp())); | ||
| } | ||
|
|
||
| private byte[] encodeBuffer(List<@Nullable Object> values) { | ||
| try { | ||
| return CoderUtils.encodeToByteArray(bufferCoder, values); | ||
| } catch (CoderException e) { | ||
| throw new RuntimeException("Failed to encode GroupByKey value buffer", e); | ||
| } | ||
| } | ||
|
|
||
| private List<@Nullable Object> decodeBuffer(byte[] bytes) { | ||
| try { | ||
| List<@Nullable Object> values = new ArrayList<>(); | ||
| for (@Nullable Object value : CoderUtils.decodeFromByteArray(bufferCoder, bytes)) { | ||
| values.add(value); | ||
| } | ||
| return values; | ||
| } catch (CoderException e) { | ||
| throw new RuntimeException("Failed to decode GroupByKey value buffer", e); | ||
| } | ||
| } | ||
|
|
||
| private Object decodeKey(byte[] bytes) { | ||
| try { | ||
| return CoderUtils.decodeFromByteArray(keyCoder, bytes); | ||
| } catch (CoderException e) { | ||
| throw new RuntimeException("Failed to decode GroupByKey key", e); | ||
| } | ||
| } | ||
|
|
||
| private static <T> T checkInitialized(@Nullable T value) { | ||
| if (value == null) { | ||
| throw new IllegalStateException("GroupByKeyProcessor used before init()"); | ||
| } | ||
| return value; | ||
| } | ||
| } | ||
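The buffer-then-fire behaviour of the processor can be modelled in memory. This is a sketch under stated assumptions rather than the runner's code: `FireOnceSketch` and its methods are hypothetical names, `Long.MAX_VALUE` stands in for `BoundedWindow.TIMESTAMP_MAX_VALUE`, and the combined watermark is assumed to be the minimum over all source partitions once each has reported (the real `WatermarkManager` is not shown in this diff and may differ).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of buffer-then-fire; a sketch, not the runner's processor. */
public class FireOnceSketch {
  static final long END_OF_WINDOW = Long.MAX_VALUE; // stand-in for TIMESTAMP_MAX_VALUE

  private final Map<String, List<String>> buffer = new LinkedHashMap<>();
  private final Map<Integer, Long> reported = new HashMap<>();
  private boolean fired = false;
  final List<String> emitted = new ArrayList<>();

  /** Appends one value to the per-key buffer. */
  void onData(String key, String value) {
    buffer.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  /** Records a watermark report and fires all keys once the combined watermark is terminal. */
  void onWatermark(int sourcePartition, long watermark, int totalSourcePartitions) {
    reported.merge(sourcePartition, watermark, Long::max);
    if (reported.size() < totalSourcePartitions) {
      return; // some source partition has not reported yet
    }
    long combined = Long.MAX_VALUE;
    for (long w : reported.values()) {
      combined = Math.min(combined, w); // assumed rule: minimum across partitions
    }
    if (!fired && combined >= END_OF_WINDOW) {
      buffer.forEach((k, vs) -> emitted.add(k + "=" + vs));
      buffer.clear();
      fired = true; // later duplicate terminal watermarks must not re-fire
    }
  }

  public static void main(String[] args) {
    FireOnceSketch gbk = new FireOnceSketch();
    gbk.onData("a", "1");
    gbk.onData("a", "2");
    gbk.onData("b", "3");
    gbk.onWatermark(0, END_OF_WINDOW, 2); // only one of two partitions reported
    gbk.onWatermark(1, END_OF_WINDOW, 2); // fires here
    gbk.onWatermark(1, END_OF_WINDOW, 2); // duplicate report, no second firing
    System.out.println(gbk.emitted);
  }
}
```

The duplicate report at the end mirrors the terminal watermark being broadcast to several repartition partitions: the `fired` flag keeps the output to a single pane per key.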
130 changes: 130 additions & 0 deletions
...src/main/java/org/apache/beam/runners/kafka/streams/translation/GroupByKeyTranslator.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,130 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.runners.kafka.streams.translation; | ||
|
|
||
| import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.instantiateCoder; | ||
|
|
||
| import org.apache.beam.model.pipeline.v1.RunnerApi; | ||
| import org.apache.beam.sdk.coders.Coder; | ||
| import org.apache.beam.sdk.coders.KvCoder; | ||
| import org.apache.beam.sdk.values.KV; | ||
| import org.apache.beam.sdk.values.WindowedValues; | ||
| import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; | ||
| import org.apache.kafka.common.serialization.Serdes; | ||
| import org.apache.kafka.streams.Topology; | ||
| import org.apache.kafka.streams.state.Stores; | ||
| import org.checkerframework.checker.nullness.qual.Nullable; | ||
|
|
||
| /** | ||
| * Translates the {@code beam:transform:group_by_key:v1} URN — the runner's first stateful, | ||
| * shuffle-bearing transform. | ||
| * | ||
| * <p>This is the simplest GroupByKey: GlobalWindow, default trigger, no allowed lateness (per the | ||
| * plan agreed with the mentor). Each key's values are buffered in a Kafka Streams state store and | ||
| * emitted once as {@code KV<K, Iterable<V>>} when the watermark reaches {@link | ||
| * org.apache.beam.sdk.transforms.windowing.BoundedWindow#TIMESTAMP_MAX_VALUE}. | ||
| * | ||
| * <p>Topology added (the Beam key becomes the Kafka record key so Kafka Streams shuffles by it): | ||
| * | ||
| * <ul> | ||
| * <li>a {@link ShuffleByKeyProcessor} wired to the input's producer, which sets the Kafka record | ||
| * key to the encoded Beam key for data records and passes watermark reports through; | ||
| * <li>a {@link Topology#addSink sink} to an internal repartition topic, with the payload encoded | ||
| * via {@link KStreamsPayloadSerde} and a {@link GroupByKeyBroadcastPartitioner} that hashes | ||
| * data by key and fans watermark reports out to every partition; | ||
| * <li>a {@link Topology#addSource source} reading the repartition topic back; | ||
| * <li>the {@link GroupByKeyProcessor} plus a persistent state store, wired to the source. | ||
| * </ul> | ||
| * | ||
| * <p>The repartition topic is expected to exist on the broker before the job starts (same | ||
| * pre-create assumption as the Impulse bootstrap topic); auto-creation lands with the AdminClient | ||
| * wiring in a follow-up. | ||
| */ | ||
| class GroupByKeyTranslator implements PTransformTranslator { | ||
|
|
||
| static final String SHUFFLE_SUFFIX = "-shuffle-by-key"; | ||
| static final String SINK_SUFFIX = "-repartition-sink"; | ||
| static final String SOURCE_SUFFIX = "-repartition-source"; | ||
| static final String STATE_STORE_SUFFIX = "-state"; | ||
| static final String REPARTITION_TOPIC_PREFIX = "__beam_gbk_"; | ||
|
|
||
| @Override | ||
| public void translate( | ||
| String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) { | ||
| RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(transformId); | ||
| String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values()); | ||
| String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values()); | ||
|
|
||
| @SuppressWarnings({"unchecked", "rawtypes"}) | ||
| WindowedValues.WindowedValueCoder<KV<Object, Object>> inputCoder = | ||
| (WindowedValues.WindowedValueCoder) | ||
| instantiateCoder(inputPCollectionId, pipeline.getComponents()); | ||
| KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) inputCoder.getValueCoder(); | ||
| Coder<Object> keyCoder = kvCoder.getKeyCoder(); | ||
| // User values may be null; the checker tracks that through to the buffered iterables. | ||
| @SuppressWarnings("unchecked") | ||
| Coder<@Nullable Object> valueCoder = | ||
| (Coder<@Nullable Object>) (Coder<?>) kvCoder.getValueCoder(); | ||
|
|
||
| String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId); | ||
|
|
||
| String shuffleName = transformId + SHUFFLE_SUFFIX; | ||
| String sinkName = transformId + SINK_SUFFIX; | ||
| String sourceName = transformId + SOURCE_SUFFIX; | ||
| String stateStoreName = transformId + STATE_STORE_SUFFIX; | ||
| String repartitionTopic = repartitionTopic(transformId); | ||
|
|
||
| KStreamsPayloadSerde<KV<Object, Object>> payloadSerde = new KStreamsPayloadSerde<>(inputCoder); | ||
|
|
||
| Topology topology = context.getTopology(); | ||
|
|
||
| // Re-key data records by the encoded Beam key; pass watermark reports through. | ||
| topology.addProcessor(shuffleName, () -> new ShuffleByKeyProcessor(keyCoder), parentProcessor); | ||
|
|
||
| // Shuffle through the repartition topic: data partitioned by key, watermark broadcast. | ||
| topology.addSink( | ||
| sinkName, | ||
| repartitionTopic, | ||
| Serdes.ByteArray().serializer(), | ||
| payloadSerde.serializer(), | ||
| new GroupByKeyBroadcastPartitioner<>(), | ||
| shuffleName); | ||
| topology.addSource( | ||
| sourceName, | ||
| Serdes.ByteArray().deserializer(), | ||
| payloadSerde.deserializer(), | ||
| repartitionTopic); | ||
|
|
||
| // Buffer values per key and fire KV<K, Iterable<V>> at the terminal watermark. | ||
| topology.addProcessor( | ||
| transformId, | ||
| () -> new GroupByKeyProcessor(stateStoreName, keyCoder, valueCoder), | ||
| sourceName); | ||
| topology.addStateStore( | ||
| Stores.keyValueStoreBuilder( | ||
| Stores.persistentKeyValueStore(stateStoreName), Serdes.ByteArray(), Serdes.ByteArray()), | ||
| transformId); | ||
|
|
||
| context.registerPCollectionProducer(outputPCollectionId, transformId); | ||
| } | ||
|
|
||
| /** The internal repartition topic name for a GroupByKey transform. */ | ||
| static String repartitionTopic(String transformId) { | ||
| return REPARTITION_TOPIC_PREFIX + transformId.replaceAll("[^a-zA-Z0-9._-]", "_"); | ||
| } | ||
| } |
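To make the topic-naming rule concrete, the sketch below copies `repartitionTopic` into a standalone class and applies it to a made-up transform id. The class name `RepartitionTopicNameSketch` and the id `"Count words/GroupByKey"` are illustrative assumptions, not values from the pull request.

```java
/** Standalone copy of the topic-naming rule, for illustration only. */
public class RepartitionTopicNameSketch {
  static final String REPARTITION_TOPIC_PREFIX = "__beam_gbk_";

  /** Replaces every character outside [a-zA-Z0-9._-] with an underscore. */
  static String repartitionTopic(String transformId) {
    return REPARTITION_TOPIC_PREFIX + transformId.replaceAll("[^a-zA-Z0-9._-]", "_");
  }

  public static void main(String[] args) {
    // Hypothetical transform id containing a space and a slash.
    System.out.println(repartitionTopic("Count words/GroupByKey"));
    // prints "__beam_gbk_Count_words_GroupByKey"
  }
}
```

Because the mapping is lossy, two distinct transform ids such as `a/b` and `a_b` would produce the same topic name; whether that can occur in practice depends on how transform ids are generated, which this diff does not show.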
For this initial implementation, do we need to worry about persisting the state of fired/lastForwardedWatermark, etc., so that we are resilient to runner task restarts?
When we flush the state (clear it and successfully commit), the state should be empty after restart.
We will have to rework this when we add support for allowed lateness; at that point this flag will definitely need to be persisted (we will need to persist the pane index, which will be equivalent to this).
Agreed, flush + clear + commit means the store is empty after a restart, so no double-fire. And good point that once we add allowed lateness the fired flag effectively becomes the persisted pane index; I'll carry that over when that lands. Thanks!
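The restart argument in this thread can be sketched as a small simulation. It is an illustration under assumptions, not the runner's behaviour verbatim: `RestartSketch` and `Task` are hypothetical names, a `TreeMap` stands in for the committed state store, and only the case where fire, clear, and commit all succeeded before the restart is modelled (a crash before commit is left to exactly-once semantics and is out of scope here).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simulates a task restart after a successful fire + clear + commit. */
public class RestartSketch {

  /** Task state: the store survives restarts, the fired flag does not. */
  static final class Task {
    final Map<String, List<String>> store; // stands in for the committed state store
    boolean fired = false; // in-memory only, reset on restart

    Task(Map<String, List<String>> store) {
      this.store = store;
    }

    /** Emits and clears every buffered key; returns what was emitted. */
    List<String> onTerminalWatermark() {
      List<String> out = new ArrayList<>();
      if (fired) {
        return out;
      }
      store.forEach((k, vs) -> out.add(k + "=" + vs));
      store.clear();
      fired = true;
      return out;
    }
  }

  /** Returns the output before the restart and the output after it. */
  static List<List<String>> run() {
    Map<String, List<String>> store = new TreeMap<>();
    store.put("a", new ArrayList<>(Arrays.asList("1", "2")));
    Task first = new Task(store);
    List<String> before = first.onTerminalWatermark();
    Task restarted = new Task(store); // restart: same store, fired is false again
    List<String> after = restarted.onTerminalWatermark();
    return Arrays.asList(before, after);
  }

  public static void main(String[] args) {
    List<List<String>> result = run();
    System.out.println("before restart: " + result.get(0));
    System.out.println("after restart:  " + result.get(1));
  }
}
```

Even though the restarted task has lost its `fired` flag, replaying the terminal watermark finds an empty store and emits nothing. Once allowed lateness is supported, values may arrive after the first pane, so this emptiness argument no longer suffices and the flag (or pane index) would need to be persisted, as noted above.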