Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.kafka.streams.translation;

import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.StreamPartitioner;

/**
* Partitions records on the GroupByKey repartition topic.
*
* <ul>
* <li><b>data</b> records go to the single partition selected by hashing the (already encoded
* Beam key) Kafka record key — the same scheme Kafka's default partitioner uses — so every
* value of a key lands together;
* <li><b>watermark</b> reports are broadcast to <i>every</i> partition, so each downstream
* GroupByKey task observes the terminal watermark and fires its keys.
* </ul>
*
* @param <T> the data element type carried by data payloads
*/
class GroupByKeyBroadcastPartitioner<T> implements StreamPartitioner<byte[], KStreamsPayload<T>> {

  /**
   * Single-partition variant required by the interface.
   *
   * <p>Kafka Streams calls {@link #partitions} (overridden below) when it is present; this method
   * is kept consistent with the data-hash path for safety.
   *
   * @return the partition selected by hashing {@code key}, or {@code 0} when {@code key} is null
   */
  @Override
  public Integer partition(String topic, byte[] key, KStreamsPayload<T> value, int numPartitions) {
    return hashPartition(key, numPartitions);
  }

  /**
   * Selects the target partitions for a record on the repartition topic.
   *
   * @return every partition in {@code [0, numPartitions)} for a watermark report; otherwise the
   *     single partition chosen by hashing {@code key}
   */
  @Override
  public Optional<Set<Integer>> partitions(
      String topic, byte[] key, KStreamsPayload<T> value, int numPartitions) {
    if (value.isWatermark()) {
      // Broadcast: the watermark report is written once to each partition.
      Set<Integer> all = new HashSet<>();
      for (int partition = 0; partition < numPartitions; partition++) {
        all.add(partition);
      }
      return Optional.of(all);
    }
    // Use the same null-safe hash as partition(), so a data record without a key is routed
    // deterministically instead of failing with a NullPointerException inside the partitioner.
    return Optional.of(Collections.singleton(hashPartition(key, numPartitions)));
  }

  /**
   * Maps a record key to a partition with a positive murmur2 hash modulo the partition count.
   * A null key maps to partition 0.
   */
  private static int hashPartition(byte[] key, int numPartitions) {
    return key == null ? 0 : Utils.toPositive(Utils.murmur2(key)) % numPartitions;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.kafka.streams.translation;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

/**
* Executes a {@code GroupByKey} (GlobalWindow, default trigger, no allowed lateness).
*
* <p>Records arrive on the repartition topic keyed by the encoded Beam key, so every value of a key
* is co-located here. Each value is appended to a per-key buffer in a Kafka Streams state store.
* Watermark reports are fed to a {@link WatermarkManager}; when the input watermark reaches {@link
* BoundedWindow#TIMESTAMP_MAX_VALUE} (the end of the global window) every buffered key is emitted
* once as {@code KV<K, Iterable<V>>} and the buffer cleared, then the watermark is forwarded
* downstream.
*
* <p>Buffering whole value lists and re-encoding on each append is O(n^2) per key; fine for this
* first GroupByKey, and replaced when this moves to runner-core {@code GroupAlsoByWindow}.
*/
class GroupByKeyProcessor
implements Processor<byte[], KStreamsPayload<?>, byte[], KStreamsPayload<?>> {

private final String stateStoreName;
private final Coder<Object> keyCoder;
private final IterableCoder<@Nullable Object> bufferCoder;

private final WatermarkManager watermarkManager = new WatermarkManager();
private Instant lastForwardedWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
// The global window fires exactly once, when the watermark first reaches its end. Later watermark
// reports (e.g. the same terminal watermark broadcast across repartition partitions) must not
// re-fire. This flag is in-memory only; restart correctness comes from the state store plus
// exactly-once-v2: the buffered values and consumer offsets are committed atomically, and the
// store is empty once a key has fired, so a restart cannot double-emit. Persisting watermark
// holds is part of the separate WatermarkManager persistence work, not this initial GroupByKey.
private boolean fired = false;

@tvalentyn tvalentyn Jun 29, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this initial implementation, do we need to worry about persisting the state of fired/lastForwardedWatermark, etc., so that we are resilient to runner task restarts?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we flush the state (and clear it and successfully commit), the state should be empty after restart.
We will have to rework this when we add support for allowed lateness; at that point this flag will definitely need to be persisted (we will need to persist the pane index, which will be equivalent to this).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, flush + clear + commit means the store is empty after a restart, so no double-fire. And good point that once we add allowed lateness the fired flag effectively becomes the persisted pane index; I'll carry that over when that lands. Thanks!


private @Nullable ProcessorContext<byte[], KStreamsPayload<?>> context;
private @Nullable KeyValueStore<byte[], byte[]> store;

/**
 * Creates a processor that buffers values in the named state store.
 *
 * @param stateStoreName name of the state store looked up in {@code init}
 * @param keyCoder coder used to decode stored keys
 * @param valueCoder element coder; wrapped in an {@link IterableCoder} for the per-key buffer
 */
GroupByKeyProcessor(
    String stateStoreName, Coder<Object> keyCoder, Coder<@Nullable Object> valueCoder) {
  this.bufferCoder = IterableCoder.of(valueCoder);
  this.keyCoder = keyCoder;
  this.stateStoreName = stateStoreName;
}

/** Captures the processor context and resolves the buffer state store by its configured name. */
@Override
public void init(ProcessorContext<byte[], KStreamsPayload<?>> context) {
  this.context = context;
  KeyValueStore<byte[], byte[]> buffers = context.getStateStore(stateStoreName);
  this.store = buffers;
}

/**
 * Handles one record from the repartition topic.
 *
 * <p>Data payloads are appended to the buffer of their record key. Any other payload is treated as
 * a watermark report and handed to {@code onWatermark}.
 *
 * @throws IllegalStateException if a data record has a null key or a null element
 */
@Override
public void process(Record<byte[], KStreamsPayload<?>> record) {
  KStreamsPayload<?> payload = record.value();
  if (!payload.isData()) {
    onWatermark(record, payload.asWatermark());
    return;
  }
  byte[] recordKey = record.key();
  Object kvElement = payload.getData().getValue();
  if (recordKey == null || kvElement == null) {
    throw new IllegalStateException("GroupByKey data record is missing its key or value");
  }
  appendValue(recordKey, kvElement);
}

/**
 * Feeds a watermark report to the watermark manager, fires the buffered keys the first time the
 * resulting watermark is no longer before {@link BoundedWindow#TIMESTAMP_MAX_VALUE}, and forwards
 * the watermark downstream whenever it has moved past the last forwarded value.
 */
private void onWatermark(Record<byte[], KStreamsPayload<?>> record, WatermarkPayload report) {
  watermarkManager.observe(
      report.getSourcePartition(),
      new Instant(report.getWatermarkMillis()),
      report.getTotalSourcePartitions());
  Instant inputWatermark = watermarkManager.advance();

  boolean endOfWindowReached = !inputWatermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE);
  if (endOfWindowReached && !fired) {
    fireAll(record);
    fired = true;
  }

  if (inputWatermark.isAfter(lastForwardedWatermark)) {
    lastForwardedWatermark = inputWatermark;
    forwardWatermark(record, inputWatermark.getMillis());
  }
}

/**
 * Appends the value of a {@code KV} element to the buffer stored under {@code encodedKey}.
 *
 * <p>The whole buffer is decoded, extended by one value, re-encoded and written back.
 *
 * @param encodedKey state-store key the buffer is kept under
 * @param kvObject the element; must be a {@link KV}, whose value is buffered
 */
private void appendValue(byte[] encodedKey, Object kvObject) {
  KV<?, ?> element = (KV<?, ?>) kvObject;
  KeyValueStore<byte[], byte[]> buffers = checkInitialized(store);
  byte[] encodedBuffer = buffers.get(encodedKey);
  List<@Nullable Object> buffered;
  if (encodedBuffer != null) {
    buffered = decodeBuffer(encodedBuffer);
  } else {
    // First value seen for this key.
    buffered = new ArrayList<>();
  }
  buffered.add(element.getValue());
  buffers.put(encodedKey, encodeBuffer(buffered));
}

/**
 * Emits every buffered key as one {@code KV<K, Iterable<V>>} data record and then removes the
 * emitted entries from the state store.
 *
 * <p>Each output record keeps the stored (encoded) key as its Kafka record key and uses the
 * timestamp of {@code trigger} as its record timestamp.
 *
 * @param trigger the record whose processing caused the fire; only its timestamp is used
 * @throws IllegalStateException if the processor has not been initialized
 */
private void fireAll(Record<byte[], KStreamsPayload<?>> trigger) {
// NOTE: this emits every buffered key in a single watermark turn. For a very large key space
// that risks memory pressure and exceeding the poll / transaction timeout. Acceptable for this
// initial GlobalWindow GroupByKey (fire once at end of input); incremental, timer-driven output
// via runner-core GroupAlsoByWindow lands with the windowing/timers work.
ProcessorContext<byte[], KStreamsPayload<?>> ctx = checkInitialized(context);
KeyValueStore<byte[], byte[]> kvStore = checkInitialized(store);
// Keys are collected here and deleted only after the store iterator below has been closed.
List<byte[]> firedKeys = new ArrayList<>();
Comment thread
je-ik marked this conversation as resolved.
try (KeyValueIterator<byte[], byte[]> it = kvStore.all()) {
while (it.hasNext()) {
org.apache.kafka.streams.KeyValue<byte[], byte[]> entry = it.next();
Object key = decodeKey(entry.key);
List<@Nullable Object> values = decodeBuffer(entry.value);
// The pane fires at the end of the global window, so the grouped element carries the
// window's max timestamp (END_OF_GLOBAL_WINDOW). Emitting at TIMESTAMP_MIN_VALUE (the
// default of valueInGlobalWindow) would make the output appear arbitrarily late and be
// dropped downstream once the watermark has advanced.
WindowedValue<KV<Object, Iterable<@Nullable Object>>> output =
WindowedValues.timestampedValueInGlobalWindow(
KV.of(key, (Iterable<@Nullable Object>) values),
GlobalWindow.INSTANCE.maxTimestamp());
ctx.forward(
new Record<byte[], KStreamsPayload<?>>(
entry.key, KStreamsPayload.data(output), trigger.timestamp()));
firedKeys.add(entry.key);
}
}
// Clear the buffers of every key that was just emitted.
for (byte[] key : firedKeys) {
kvStore.delete(key);
}
}

/**
 * Forwards a watermark report downstream, reusing the key and timestamp of {@code trigger}.
 *
 * @param trigger the record that caused the watermark to advance
 * @param watermarkMillis the watermark to report, in milliseconds
 */
private void forwardWatermark(Record<byte[], KStreamsPayload<?>> trigger, long watermarkMillis) {
  ProcessorContext<byte[], KStreamsPayload<?>> downstream = checkInitialized(context);
  // GroupByKey is a single logical source for the next stage; report it as partition 0 of 1.
  Record<byte[], KStreamsPayload<?>> report =
      new Record<byte[], KStreamsPayload<?>>(
          trigger.key(), KStreamsPayload.watermark(watermarkMillis, 0, 1), trigger.timestamp());
  downstream.forward(report);
}

/**
 * Serializes a value buffer with the buffer coder.
 *
 * @throws RuntimeException wrapping the {@link CoderException} if encoding fails
 */
private byte[] encodeBuffer(List<@Nullable Object> values) {
  byte[] encoded;
  try {
    encoded = CoderUtils.encodeToByteArray(bufferCoder, values);
  } catch (CoderException e) {
    throw new RuntimeException("Failed to encode GroupByKey value buffer", e);
  }
  return encoded;
}

/**
 * Deserializes a value buffer and copies its elements into a fresh, mutable list.
 *
 * @throws RuntimeException wrapping the {@link CoderException} if decoding fails
 */
private List<@Nullable Object> decodeBuffer(byte[] bytes) {
  Iterable<@Nullable Object> decoded;
  try {
    decoded = CoderUtils.decodeFromByteArray(bufferCoder, bytes);
  } catch (CoderException e) {
    throw new RuntimeException("Failed to decode GroupByKey value buffer", e);
  }
  List<@Nullable Object> copy = new ArrayList<>();
  for (@Nullable Object item : decoded) {
    copy.add(item);
  }
  return copy;
}

/**
 * Deserializes a stored key with the key coder.
 *
 * @throws RuntimeException wrapping the {@link CoderException} if decoding fails
 */
private Object decodeKey(byte[] bytes) {
  Object decodedKey;
  try {
    decodedKey = CoderUtils.decodeFromByteArray(keyCoder, bytes);
  } catch (CoderException e) {
    throw new RuntimeException("Failed to decode GroupByKey key", e);
  }
  return decodedKey;
}

/**
 * Returns {@code value} if it is non-null.
 *
 * @throws IllegalStateException if {@code value} is null
 */
private static <T> T checkInitialized(@Nullable T value) {
  if (value != null) {
    return value;
  }
  throw new IllegalStateException("GroupByKeyProcessor used before init()");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.kafka.streams.translation;

import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.instantiateCoder;

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.Stores;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Translates the {@code beam:transform:group_by_key:v1} URN — the runner's first stateful,
* shuffle-bearing transform.
*
* <p>This is the simplest GroupByKey: GlobalWindow, default trigger, no allowed lateness (per the
* plan agreed with the mentor). Each key's values are buffered in a Kafka Streams state store and
* emitted once as {@code KV<K, Iterable<V>>} when the watermark reaches {@link
* org.apache.beam.sdk.transforms.windowing.BoundedWindow#TIMESTAMP_MAX_VALUE}.
*
* <p>Topology added (the Beam key becomes the Kafka record key so Kafka Streams shuffles by it):
*
* <ul>
* <li>a {@link ShuffleByKeyProcessor} wired to the input's producer, which sets the Kafka record
* key to the encoded Beam key for data records and passes watermark reports through;
* <li>a {@link Topology#addSink sink} to an internal repartition topic, with the payload encoded
* via {@link KStreamsPayloadSerde} and a {@link GroupByKeyBroadcastPartitioner} that hashes
* data by key and fans watermark reports out to every partition;
* <li>a {@link Topology#addSource source} reading the repartition topic back;
* <li>the {@link GroupByKeyProcessor} plus a persistent state store, wired to the source.
* </ul>
*
* <p>The repartition topic is expected to exist on the broker before the job starts (same
* pre-create assumption as the Impulse bootstrap topic); auto-creation lands with the AdminClient
* wiring in a follow-up.
*/
class GroupByKeyTranslator implements PTransformTranslator {

  static final String SHUFFLE_SUFFIX = "-shuffle-by-key";
  static final String SINK_SUFFIX = "-repartition-sink";
  static final String SOURCE_SUFFIX = "-repartition-source";
  static final String STATE_STORE_SUFFIX = "-state";
  static final String REPARTITION_TOPIC_PREFIX = "__beam_gbk_";

  /**
   * Adds the shuffle processor, repartition sink/source, GroupByKey processor and its state store
   * to the topology, then registers the GroupByKey processor as the producer of the transform's
   * only output PCollection.
   *
   * @param transformId id of the GroupByKey transform; also used as the GroupByKey processor name
   * @param pipeline the pipeline proto the transform and its coders are read from
   * @param context translation context supplying the topology and the PCollection producers
   */
  @Override
  public void translate(
      String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) {
    RunnerApi.PTransform gbk = pipeline.getComponents().getTransformsOrThrow(transformId);
    String inputId = Iterables.getOnlyElement(gbk.getInputsMap().values());
    String outputId = Iterables.getOnlyElement(gbk.getOutputsMap().values());

    // The input is KV<K, V>; split its coder into the key and value coders.
    @SuppressWarnings({"unchecked", "rawtypes"})
    WindowedValues.WindowedValueCoder<KV<Object, Object>> windowedKvCoder =
        (WindowedValues.WindowedValueCoder) instantiateCoder(inputId, pipeline.getComponents());
    KvCoder<Object, Object> elementCoder =
        (KvCoder<Object, Object>) windowedKvCoder.getValueCoder();
    Coder<Object> keyCoder = elementCoder.getKeyCoder();
    // User values may be null; the checker tracks that through to the buffered iterables.
    @SuppressWarnings("unchecked")
    Coder<@Nullable Object> valueCoder =
        (Coder<@Nullable Object>) (Coder<?>) elementCoder.getValueCoder();

    String upstream = context.getProcessorNameForPCollection(inputId);

    // Node, store and topic names are all derived from the transform id.
    String shuffleNode = transformId + SHUFFLE_SUFFIX;
    String sinkNode = transformId + SINK_SUFFIX;
    String sourceNode = transformId + SOURCE_SUFFIX;
    String storeName = transformId + STATE_STORE_SUFFIX;
    String topic = repartitionTopic(transformId);

    KStreamsPayloadSerde<KV<Object, Object>> serde = new KStreamsPayloadSerde<>(windowedKvCoder);

    Topology topology = context.getTopology();

    // Step 1: re-key data records by the encoded Beam key; watermark reports pass through.
    topology.addProcessor(shuffleNode, () -> new ShuffleByKeyProcessor(keyCoder), upstream);

    // Step 2: write to the repartition topic (data partitioned by key, watermark broadcast).
    topology.addSink(
        sinkNode,
        topic,
        Serdes.ByteArray().serializer(),
        serde.serializer(),
        new GroupByKeyBroadcastPartitioner<>(),
        shuffleNode);

    // Step 3: read the repartition topic back.
    topology.addSource(
        sourceNode, Serdes.ByteArray().deserializer(), serde.deserializer(), topic);

    // Step 4: buffer values per key and fire KV<K, Iterable<V>> at the terminal watermark.
    topology.addProcessor(
        transformId, () -> new GroupByKeyProcessor(storeName, keyCoder, valueCoder), sourceNode);
    topology.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(storeName), Serdes.ByteArray(), Serdes.ByteArray()),
        transformId);

    context.registerPCollectionProducer(outputId, transformId);
  }

  /**
   * The internal repartition topic name for a GroupByKey transform: the fixed prefix followed by
   * the transform id with every character outside {@code [a-zA-Z0-9._-]} replaced by {@code _}.
   */
  static String repartitionTopic(String transformId) {
    String sanitizedId = transformId.replaceAll("[^a-zA-Z0-9._-]", "_");
    return REPARTITION_TOPIC_PREFIX + sanitizedId;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public KafkaStreamsPipelineTranslator() {
ImmutableMap.<String, PTransformTranslator>builder()
.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new ImpulseTranslator())
.put(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN, new RedistributeTranslator())
.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslator())
.put(ExecutableStage.URN, new ExecutableStageTranslator())
.build());
}
Expand Down
Loading
Loading