[ISSUE #10434] Implement proxy gRPC offset query RPCs#10509
Conversation
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot
Summary
Implements proxy gRPC offset query RPCs (GetOffset and QueryOffset) to allow clients to query consumer group offsets via the Proxy layer. Adds new OffsetActivity, GrpcOffsetConverter, and processor methods across 12 files.
Findings
- [Warning]
OffsetActivity.java:56—getOffset()usesmessagingProcessor.getOffset()insideCompletableFuture.supplyAsync()with a cached thread pool. IfmessagingProcessor.getOffset()is a blocking call (e.g., involves network I/O to broker), ensure the thread pool has bounded size to prevent resource exhaustion under load. Consider using a dedicated executor for offset queries. - [Warning]
GrpcOffsetConverter.java— The converter maps between gRPCGetOffsetRequest/QueryOffsetRequestand internal request objects. Verify that all required fields are validated — particularlyresource.name()(consumer group) which if empty could cause unexpected behavior downstream. - [Info]
DefaultGrpcMessagingActivity.java— NewOffsetActivityis correctly initialized and wired into the lifecycle (startAsync/awaitRunning/stopAsync). The pattern follows existing activities consistently. - [Info]
Proxy gRPC service definition— AddinggetOffsetandqueryOffsetto the messaging service extends the Proxy API surface. Ensure the proto definitions are also updated in therocketmq-clientsrepo for cross-language support.
Suggestions
- Cross-repo: This PR adds new gRPC RPCs to the Proxy. The
apache/rocketmq-clientsrepo may need corresponding proto updates and client-side implementations forGetOffset/QueryOffset. Consider coordinating with client SDK maintainers. - Add integration tests for the new offset query endpoints, especially edge cases: non-existent consumer group, empty group name, broker unreachable.
- Consider adding rate limiting or caching for offset queries if they're expected to be high-frequency, as they may trigger broker-side lookups.
Compatibility Note
This extends the gRPC API. Existing clients will not be affected since these are new RPCs. However, clients that don't know about these RPCs will get UNIMPLEMENTED if they try to call an older Proxy version.
Solid feature implementation with good structural consistency. 🚀
Automated review by github-manager-bot
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot
Summary
Implements proxy gRPC offset query RPCs (GetOffset and QueryOffset) in the RocketMQ Proxy layer, wiring through the full stack: gRPC activity → processor → message service → broker client.
Findings
- [Positive]
OffsetActivity.java— Clean implementation following the existing pattern of other activity classes. ThequeryOffset0()method correctly handlesBEGINNING,END, andTIMESTAMPpolicies with proper validation. - [Positive]
GrpcMessagingApplication.java— Proper routing ofgetOffsetandqueryOffsetto theOffsetActivitydelegate. - [Positive] Good test coverage:
OffsetActivityTest.java(+170 lines) andGrpcMessagingApplicationTest.javaadditions cover the new RPCs. - [Warning]
OffsetActivity.java— IngetOffset(), a newCompletableFutureis created but only used in the catch block. The try block returns the future frommessagingProcessor.queryConsumerOffset()directly. This pattern means the outerfutureis unused in the success path. Consider simplifying:This avoids creating an unusedtry { validateTopicAndConsumerGroup(...); MessageQueue mq = convertMessageQueue(request.getMessageQueue()); return this.messagingProcessor.queryConsumerOffset(ctx, mq, request.getGroup().getName(), MessagingProcessor.DEFAULT_TIMEOUT_MILLS) .thenApply(this::convertToGetOffsetResponse); } catch (Throwable t) { return CompletableFuture.failedFuture(t); }
CompletableFutureon every call. The same pattern applies toqueryOffset(). - [Warning]
OffsetActivity.java— TheconvertToGetOffsetResponseandconvertToQueryOffsetResponsemethods create new response objects. Ensure the protobufGetOffsetResponseandQueryOffsetResponsebuilders are being used efficiently (no unnecessary copies). - [Info]
ConsumerProcessor.java/DefaultMessagingProcessor.java— The newqueryConsumerOffset,getMinOffset,getMaxOffset, andsearchOffsetmethods follow the existing delegation pattern correctly.
Suggestions
- Consider using
CompletableFuture.failedFuture(t)(Java 9+) instead of creating a new future and callingcompleteExceptionally(t). This is more concise and avoids the unused future allocation. - The
queryOffset0()method usesrequest.hasTimestamp()to check for timestamp presence. Ensure this is consistent with the protobuf definition where timestamp is optional for non-TIMESTAMP policies.
Verdict
LGTM with minor suggestions. Solid implementation with good test coverage.
Automated review by github-manager-bot
Which Issue(s) This PR Fixes
Fixes #10434
Brief Description
This PR implements the v5 gRPC QueryOffset and GetOffset RPCs in the proxy.
Previously, these RPCs were declared by the proto service but were not overridden by the proxy gRPC service implementation, so clients received UNIMPLEMENTED.
The change adds a new OffsetActivity, wires it through the gRPC activity/application layers, and maps:
It also exposes searchOffset through the proxy processor/message service layers so the timestamp policy can reuse the existing broker remoting capability.
How Did You Test This Change?
env JAVA_HOME=/Library/Java/JavaVirtualMachines/temurin-17.jdk/Contents/Home mvn -pl proxy -Dspotbugs.skip=true -Dtest=OffsetActivityTest,GrpcMessagingApplicationTest -DfailIfNoTests=false test