Skip to content

[ISSUE #10434] Implement proxy gRPC offset query RPCs#10509

Open
Zhengcy05 wants to merge 1 commit into
apache:developfrom
Zhengcy05:enhance/proxy-grpc-offset-rpc
Open

[ISSUE #10434] Implement proxy gRPC offset query RPCs#10509
Zhengcy05 wants to merge 1 commit into
apache:developfrom
Zhengcy05:enhance/proxy-grpc-offset-rpc

Conversation

@Zhengcy05

Copy link
Copy Markdown

Which Issue(s) This PR Fixes

Fixes #10434

  • Fixes #issue_id

Brief Description

This PR implements the v5 gRPC QueryOffset and GetOffset RPCs in the proxy.
Previously, these RPCs were declared by the proto service but were not overridden by the proxy gRPC service implementation, so clients received UNIMPLEMENTED.
The change adds a new OffsetActivity, wires it through the gRPC activity/application layers, and maps:

  • GetOffset to consumer offset query
  • QueryOffset(BEGINNING) to min offset
  • QueryOffset(END) to max offset
  • QueryOffset(TIMESTAMP) to timestamp-based offset search
    It also exposes searchOffset through the proxy processor/message service layers so the timestamp policy can reuse the existing broker remoting capability.

How Did You Test This Change?

env JAVA_HOME=/Library/Java/JavaVirtualMachines/temurin-17.jdk/Contents/Home mvn -pl proxy -Dspotbugs.skip=true -Dtest=OffsetActivityTest,GrpcMessagingApplicationTest -DfailIfNoTests=false test

@RockteMQ-AI RockteMQ-AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review by github-manager-bot

Summary

Implements proxy gRPC offset query RPCs (GetOffset and QueryOffset) to allow clients to query consumer group offsets via the Proxy layer. Adds new OffsetActivity, GrpcOffsetConverter, and processor methods across 12 files.

Findings

  • [Warning] OffsetActivity.java:56getOffset() uses messagingProcessor.getOffset() inside CompletableFuture.supplyAsync() with a cached thread pool. If messagingProcessor.getOffset() is a blocking call (e.g., involves network I/O to broker), ensure the thread pool has bounded size to prevent resource exhaustion under load. Consider using a dedicated executor for offset queries.
  • [Warning] GrpcOffsetConverter.java — The converter maps between gRPC GetOffsetRequest/QueryOffsetRequest and internal request objects. Verify that all required fields are validated — particularly resource.name() (consumer group) which if empty could cause unexpected behavior downstream.
  • [Info] DefaultGrpcMessagingActivity.java — New OffsetActivity is correctly initialized and wired into the lifecycle (startAsync/awaitRunning/stopAsync). The pattern follows existing activities consistently.
  • [Info] Proxy gRPC service definition — Adding getOffset and queryOffset to the messaging service extends the Proxy API surface. Ensure the proto definitions are also updated in the rocketmq-clients repo for cross-language support.

Suggestions

  • Cross-repo: This PR adds new gRPC RPCs to the Proxy. The apache/rocketmq-clients repo may need corresponding proto updates and client-side implementations for GetOffset/QueryOffset. Consider coordinating with client SDK maintainers.
  • Add integration tests for the new offset query endpoints, especially edge cases: non-existent consumer group, empty group name, broker unreachable.
  • Consider adding rate limiting or caching for offset queries if they're expected to be high-frequency, as they may trigger broker-side lookups.

Compatibility Note

This extends the gRPC API. Existing clients will not be affected since these are new RPCs. However, clients that don't know about these RPCs will get UNIMPLEMENTED if they try to call an older Proxy version.

Solid feature implementation with good structural consistency. 🚀


Automated review by github-manager-bot

@RockteMQ-AI RockteMQ-AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review by github-manager-bot

Summary

Implements proxy gRPC offset query RPCs (GetOffset and QueryOffset) in the RocketMQ Proxy layer, wiring through the full stack: gRPC activity → processor → message service → broker client.

Findings

  • [Positive] OffsetActivity.java — Clean implementation following the existing pattern of other activity classes. The queryOffset0() method correctly handles BEGINNING, END, and TIMESTAMP policies with proper validation.
  • [Positive] GrpcMessagingApplication.java — Proper routing of getOffset and queryOffset to the OffsetActivity delegate.
  • [Positive] Good test coverage: OffsetActivityTest.java (+170 lines) and GrpcMessagingApplicationTest.java additions cover the new RPCs.
  • [Warning] OffsetActivity.java — In getOffset(), a new CompletableFuture is created but only used in the catch block. The try block returns the future from messagingProcessor.queryConsumerOffset() directly. This pattern means the outer future is unused in the success path. Consider simplifying:
    try {
        validateTopicAndConsumerGroup(...);
        MessageQueue mq = convertMessageQueue(request.getMessageQueue());
        return this.messagingProcessor.queryConsumerOffset(ctx, mq, request.getGroup().getName(), MessagingProcessor.DEFAULT_TIMEOUT_MILLS)
            .thenApply(this::convertToGetOffsetResponse);
    } catch (Throwable t) {
        return CompletableFuture.failedFuture(t);
    }
    This avoids creating an unused CompletableFuture on every call. The same pattern applies to queryOffset().
  • [Warning] OffsetActivity.java — The convertToGetOffsetResponse and convertToQueryOffsetResponse methods create new response objects. Ensure the protobuf GetOffsetResponse and QueryOffsetResponse builders are being used efficiently (no unnecessary copies).
  • [Info] ConsumerProcessor.java / DefaultMessagingProcessor.java — The new queryConsumerOffset, getMinOffset, getMaxOffset, and searchOffset methods follow the existing delegation pattern correctly.

Suggestions

  • Consider using CompletableFuture.failedFuture(t) (Java 9+) instead of creating a new future and calling completeExceptionally(t). This is more concise and avoids the unused future allocation.
  • The queryOffset0() method uses request.hasTimestamp() to check for timestamp presence. Ensure this is consistent with the protobuf definition where timestamp is optional for non-TIMESTAMP policies.

Verdict

LGTM with minor suggestions. Solid implementation with good test coverage.


Automated review by github-manager-bot

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Enhancement] Proxy v5 gRPC: implement QueryOffset / GetOffset so clients can read consumer offsets

2 participants