Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.kafka.streams.translation;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

/** Processor for GroupByKey. */
class GroupByKeyProcessor
implements Processor<
byte[],
KStreamsPayload<KV<Object, Object>>,
byte[],
KStreamsPayload<KV<Object, Iterable<Object>>>> {

private final String stateStoreName;
private final String transformId;
private final Coder<WindowedValue<KV<Object, Object>>> inputCoder;

private ProcessorContext<byte[], KStreamsPayload<KV<Object, Iterable<Object>>>> context;
private KeyValueStore<byte[], byte[]> stateStore;

GroupByKeyProcessor(
String stateStoreName,
String transformId,
Coder<WindowedValue<KV<Object, Object>>> inputCoder) {
this.stateStoreName = stateStoreName;
this.transformId = transformId;
this.inputCoder = inputCoder;
}

@Override
public void init(
ProcessorContext<byte[], KStreamsPayload<KV<Object, Iterable<Object>>>> context) {
this.context = context;
this.stateStore = context.getStateStore(stateStoreName);
}

@Override
public void process(Record<byte[], KStreamsPayload<KV<Object, Object>>> record) {
KStreamsPayload<KV<Object, Object>> payload = record.value();

if (payload.isData()) {
byte[] keyBytes = record.key();
byte[] existingBytes = stateStore.get(keyBytes);
List<WindowedValue<KV<Object, Object>>> list;
if (existingBytes == null) {
list = new ArrayList<>();
} else {
try {
list = listCoder.decode(new ByteArrayInputStream(existingBytes));
} catch (IOException e) {
throw new RuntimeException("Failed to decode buffered GroupByKey state", e);
}
}
list.add(payload.getData());
ByteArrayOutputStream os = new ByteArrayOutputStream();
try {
listCoder.encode(list, os);
} catch (IOException e) {
throw new RuntimeException("Failed to encode buffered GroupByKey state", e);
}
stateStore.put(keyBytes, os.toByteArray());
} else {
WatermarkPayload watermark = payload.asWatermark();
if (watermark.getWatermarkMillis() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
try (KeyValueIterator<byte[], byte[]> iterator = stateStore.all()) {
while (iterator.hasNext()) {
org.apache.kafka.streams.KeyValue<byte[], byte[]> kv = iterator.next();
List<WindowedValue<KV<Object, Object>>> buffered;
try {
buffered = listCoder.decode(new ByteArrayInputStream(kv.value));
} catch (IOException e) {
throw new RuntimeException("Failed to decode buffered GroupByKey state on emit", e);
}
if (!buffered.isEmpty()) {
List<Object> values = new ArrayList<>();
for (WindowedValue<KV<Object, Object>> wv : buffered) {
values.add(wv.getValue().getValue());
}
Object key = buffered.get(0).getValue().getKey();
WindowedValue<KV<Object, Iterable<Object>>> outWv =
WindowedValues.valueInGlobalWindow(KV.of(key, values));
context.forward(
new Record<>(kv.key, KStreamsPayload.data(outWv), record.timestamp()));
}
}
}
// Since we fired everything for the global window, we can optionally clear the store here.
// But the pipeline is finishing.

// Forward the watermark downstream
context.forward(
new Record<>(
record.key(),
KStreamsPayload.watermark(
watermark.getWatermarkMillis(),
watermark.getSourcePartition(),
watermark.getTotalSourcePartitions()),
record.timestamp()));
}
}
}
Comment thread
boy397 marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.kafka.streams.translation;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.util.construction.RehydratedComponents;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.Stores;

/**
 * Translates the {@code beam:transform:group_by_key:v1} URN.
 *
 * <p>The translation adds five pieces to the topology, all named from the transform id: a key
 * extractor processor, a sink writing to a repartition topic, a source reading that topic back,
 * the {@link GroupByKeyProcessor} itself, and the persistent key-value store it buffers into.
 */
class GroupByKeyTranslator implements PTransformTranslator {

  static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
  static final String SINK_SUFFIX = "-sink";
  static final String SOURCE_SUFFIX = "-source";
  static final String EXTRACTOR_SUFFIX = "-extractor";
  static final String STATE_STORE_SUFFIX = "-state";

  /**
   * Wires the GroupByKey sub-topology for {@code transformId} and registers the transform as the
   * producer of its output PCollection.
   *
   * @throws IllegalArgumentException if the input coder cannot be rehydrated, is not a {@code
   *     WindowedValueCoder}, or does not wrap a {@link KvCoder}
   */
  @Override
  public void translate(
      String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) {
    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(transformId);
    String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
    String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values());
    String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId);

    RehydratedComponents components = RehydratedComponents.forComponents(pipeline.getComponents());
    RunnerApi.PCollection inputPColl =
        pipeline.getComponents().getPcollectionsOrThrow(inputPCollectionId);

    Coder<?> inputCoder;
    try {
      inputCoder = components.getCoder(inputPColl.getCoderId());
    } catch (IOException e) {
      throw new IllegalArgumentException("Failed to rehydrate coder for " + inputPCollectionId, e);
    }

    // Input coder should be WindowedValueCoder<KV<K, V>>. Validate up front so a mismatch names
    // the offending PCollection instead of surfacing as a bare ClassCastException.
    // NOTE(review): confirm the coder registered under this PCollection's coder id is the
    // windowed coder and not the plain element coder.
    if (!(inputCoder instanceof WindowedValueCoder)) {
      throw new IllegalArgumentException(
          "GroupByKey input "
              + inputPCollectionId
              + " requires a WindowedValueCoder but found "
              + inputCoder);
    }
    @SuppressWarnings("unchecked")
    Coder<WindowedValue<KV<Object, Object>>> typedInputCoder =
        (Coder<WindowedValue<KV<Object, Object>>>) inputCoder;
    @SuppressWarnings("unchecked")
    WindowedValueCoder<KV<Object, Object>> wvCoder =
        (WindowedValueCoder<KV<Object, Object>>) inputCoder;

    // We extract the KeyCoder to serialize the Kafka key
    Coder<?> elementCoder = wvCoder.getValueCoder();
    if (!(elementCoder instanceof KvCoder)) {
      throw new IllegalArgumentException(
          "GroupByKey input "
              + inputPCollectionId
              + " requires a KvCoder element coder but found "
              + elementCoder);
    }
    @SuppressWarnings("unchecked")
    KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) elementCoder;
    Coder<Object> keyCoder = kvCoder.getKeyCoder();

    KStreamsPayloadSerde<KV<Object, Object>> payloadSerde =
        new KStreamsPayloadSerde<>(typedInputCoder);

    Topology topology = context.getTopology();
    String repartitionTopic = transformId + REPARTITION_TOPIC_SUFFIX;
    String extractorNode = transformId + EXTRACTOR_SUFFIX;
    String sinkNode = transformId + SINK_SUFFIX;
    String sourceNode = transformId + SOURCE_SUFFIX;
    String stateStoreName = transformId + STATE_STORE_SUFFIX;

    // 1. Extractor processor: parses KStreamsPayload, extracts the key and forwards with proper
    // Kafka key
    topology.addProcessor(
        extractorNode, () -> new KeyExtractorProcessor(keyCoder), parentProcessor);

    // 2. Sink node: routes KStreamsPayload to the internal repartition topic
    topology.addSink(
        sinkNode,
        repartitionTopic,
        Serdes.ByteArray().serializer(),
        payloadSerde.serializer(),
        extractorNode);

    // 3. Source node: reads from internal repartition topic
    topology.addSource(
        sourceNode,
        Serdes.ByteArray().deserializer(),
        payloadSerde.deserializer(),
        repartitionTopic);

    // 4. GroupByKey processor: Buffers values and emits grouped elements
    topology.addProcessor(
        transformId,
        () -> new GroupByKeyProcessor(stateStoreName, transformId, typedInputCoder),
        sourceNode);

    // 5. State Store: Buffers elements per key
    topology.addStateStore(
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(stateStoreName), Serdes.ByteArray(), Serdes.ByteArray()),
        transformId);

    context.registerPCollectionProducer(outputPCollectionId, transformId);
  }

  /**
   * Processor that extracts the Beam key from the data payload and assigns it as the Kafka record
   * key.
   */
  private static class KeyExtractorProcessor
      implements Processor<
          byte[],
          KStreamsPayload<KV<Object, Object>>,
          byte[],
          KStreamsPayload<KV<Object, Object>>> {

    private final Coder<Object> keyCoder;
    private ProcessorContext<byte[], KStreamsPayload<KV<Object, Object>>> context;

    /** @param keyCoder coder used to turn the Beam key into the Kafka record key bytes */
    KeyExtractorProcessor(Coder<Object> keyCoder) {
      this.keyCoder = keyCoder;
    }

    @Override
    public void init(ProcessorContext<byte[], KStreamsPayload<KV<Object, Object>>> context) {
      this.context = context;
    }

    /**
     * Re-keys data records with the encoded Beam key; non-data records are forwarded unchanged.
     *
     * @throws RuntimeException if the key cannot be encoded
     */
    @Override
    public void process(Record<byte[], KStreamsPayload<KV<Object, Object>>> record) {
      KStreamsPayload<KV<Object, Object>> payload = record.value();
      if (!payload.isData()) {
        // Watermark payload doesn't have a key, just forward with existing key
        context.forward(record);
        return;
      }

      Object key = payload.getData().getValue().getKey();
      ByteArrayOutputStream os = new ByteArrayOutputStream();
      try {
        keyCoder.encode(key, os);
      } catch (IOException e) {
        throw new RuntimeException("Failed to serialize Beam key for repartitioning", e);
      }
      context.forward(record.withKey(os.toByteArray()));
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public KafkaStreamsPipelineTranslator() {
ImmutableMap.<String, PTransformTranslator>builder()
.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new ImpulseTranslator())
.put(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN, new RedistributeTranslator())
.put(PTransformTranslation.GROUP_BY_KEY_URN, new GroupByKeyTranslator())
.put(ExecutableStage.URN, new ExecutableStageTranslator())
.build());
}
Expand Down Expand Up @@ -134,7 +135,9 @@ public void translate(KafkaStreamsTranslationContext context, RunnerApi.Pipeline
@AutoService(NativeTransforms.IsNativeTransform.class)
public static class IsKafkaStreamsNativeTransform implements NativeTransforms.IsNativeTransform {
private static final Set<String> URNS =
ImmutableSet.of(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN);
ImmutableSet.of(
PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN,
PTransformTranslation.GROUP_BY_KEY_URN);

@Override
public boolean test(RunnerApi.PTransform pTransform) {
Expand Down
Loading
Loading