[ISSUE #10512] Eliminate per-RPC allocation in RemotingCommand (Guava Stopwatch, Constructor copy) and downgrade Netty writability log#10514
Conversation
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR optimizes remoting command processing by reducing allocation overhead (Stopwatch removal, constructor caching, smaller HashMap defaults) and adds a faster header encoding path while also lowering channel writability log verbosity.
Changes:
- Replace Guava
Stopwatchtiming with aSystem.nanoTime()-based timestamp onRemotingCommand. - Cache
CommandCustomHeaderno-arg constructors and addfastEncodeHeaderAsBuffer. - Reduce per-command allocations (smaller extFields default capacity) and downgrade some writability logs to debug.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingContext.java | Adds a shared EMPTY instance for “no context” usage. |
| remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java | Constructor caching, new fast header encoding method, Stopwatch removal, extFields capacity change. |
| remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java | Lowers channel writability logs to debug and guards formatting work. |
| remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java | Switches decode timing from Stopwatch to nanoTime start timestamp. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public static final TopicQueueMappingContext EMPTY = new TopicQueueMappingContext(null, null, null, null, null); | ||
|
|
||
| private String topic; | ||
| private Integer globalId; | ||
| private TopicQueueMappingDetail mappingDetail; |
There was a problem hiding this comment.
TopicQueueMappingContext has only private fields with getters (getTopic(), getGlobalId(), getMappingDetail(), getMappingDetailAndGlobal(), getConsumerGroupMapping()). No public setters exist. The constructor is the only way to set fields, and EMPTY is created with all-null values. This pattern is consistent with Collections.emptyList() — an immutable sentinel.
| public ByteBuffer fastEncodeHeaderAsBuffer(final int bodyLength) { | ||
| ByteBuf buf = Unpooled.buffer(192); | ||
| int beginIndex = buf.writerIndex(); | ||
| buf.writeLong(0); | ||
| int headerSize; | ||
| if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) { | ||
| if (customHeader != null && !(customHeader instanceof FastCodesHeader)) { | ||
| this.makeCustomHeaderToNet(); | ||
| } | ||
| headerSize = RocketMQSerializable.rocketMQProtocolEncode(this, buf); | ||
| } else { | ||
| this.makeCustomHeaderToNet(); | ||
| byte[] header = RemotingSerializable.encode(this); | ||
| headerSize = header.length; | ||
| buf.writeBytes(header); | ||
| } | ||
| buf.setInt(beginIndex, 4 + headerSize + bodyLength); | ||
| buf.setInt(beginIndex + 4, markProtocolType(headerSize, serializeTypeCurrentRPC)); | ||
| ByteBuffer result = buf.nioBuffer(); | ||
| buf.release(); | ||
| return result; | ||
| } |
There was a problem hiding this comment.
Unpooled.buffer(192) creates a UnpooledHeapByteBuf (heap-allocated). Its nioBuffer() returns ByteBuffer.wrap(byte[]) — a view backed by the heap array, not by Netty reference-counted memory. After buf.release(), the heap byte[] remains valid (GC-managed). This would be a concern only for DirectByteBuf where release frees native memory, but heap ByteBuf is safe here.
| // Caches the no-arg constructor of each CommandCustomHeader class. | ||
| // Why: Class.getDeclaredConstructor() copies the Constructor object on every call | ||
| // (sample showed ~70MB of Constructor allocations during a 60s benchmark). | ||
| // The set of header classes is fixed at startup, so ConcurrentHashMap.computeIfAbsent | ||
| // pays the reflective lookup once per class and reuses the cached Constructor thereafter. | ||
| private static final Map<Class<?>, Constructor<?>> HEADER_CTOR_CACHE = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Good catch — the comment is misleading. Will update it to accurately describe the get + putIfAbsent pattern used.
| public long processTimerElapsedMs() { | ||
| return (System.nanoTime() - processTimerNanos) / 1_000_000; | ||
| } | ||
|
|
||
| public void setProcessTimer(Stopwatch processTimer) { | ||
| this.processTimer = processTimer; | ||
| public void setProcessTimerNanos(long nanos) { | ||
| this.processTimerNanos = nanos; | ||
| } |
There was a problem hiding this comment.
This API change is intentional — the old getProcessTimer()/setProcessTimer(Stopwatch) methods allocated a Guava Stopwatch per RPC. The new setProcessTimerNanos(long)/processTimerElapsedMs() uses primitive long with zero allocation. The only caller is NettyDecoder (updated in this same PR) and NettyRemotingAbstract (which calls processTimerElapsedMs()). No external callers depend on the old Stopwatch API.
| ctor = cls.getDeclaredConstructor(); | ||
| ctor.setAccessible(true); | ||
| HEADER_CTOR_CACHE.putIfAbsent(cls, ctor); | ||
| } | ||
| return (T) ctor.newInstance(); |
There was a problem hiding this comment.
The cached constructors are for CommandCustomHeader subclasses — these are user-defined classes in org.apache.rocketmq.remoting.protocol.header.*, not JDK internal modules. setAccessible(true) on user-code classes does not trigger JPMS restrictions. RocketMQ already uses this pattern extensively in decodeCommandCustomHeader() (the existing code calls getDeclaredConstructor().newInstance() without JPMS issues).
| if (log.isDebugEnabled()) { | ||
| log.debug("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}", | ||
| RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable()); | ||
| } |
There was a problem hiding this comment.
This is an intentional tradeoff. Under high-throughput load, channelWritabilityChanged fires ~900 times/sec, each triggering parseChannelRemoteAddr() + String concatenation + AsyncAppender enqueue. This was measured as a significant per-RPC allocation source. DEBUG level with isDebugEnabled() guard preserves the information for troubleshooting while eliminating the hot-path cost. Same change was made in PR #10491.
| if (log.isDebugEnabled()) { | ||
| log.debug("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}", | ||
| RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable()); | ||
| } |
There was a problem hiding this comment.
Same as above — intentional log level downgrade to eliminate hot-path allocation. See reply on line 587.
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot
Summary
Eliminates per-RPC allocations in the remoting framework: replaces Guava Stopwatch with System.nanoTime(), caches Constructor lookups, downgrades Netty writability log to DEBUG, and adds TopicQueueMappingContext.EMPTY singleton.
Findings
- [Info]
RemotingCommand.java—HEADER_CTOR_CACHEusesConcurrentHashMap.computeIfAbsent(). The mapping function callsctor.newInstance()which is reflective and could throw. Consider wrapping thecomputeIfAbsentlambda with a try-catch to avoid swallowing checked exceptions silently, or useget()with a null check +putIfAbsent()pattern for clearer error propagation. - [Info]
RemotingCommand.java—processTimerNanosdefaults to 0. IfsetProcessTimerNanos()is never called (e.g., code paths that bypassNettyDecoder),processTimerElapsedMs()returns 0. This is likely intentional but worth a brief comment documenting the contract. - [Info]
NettyRemotingServer.java— Good catch on the log amplification (~900 lines/sec). TheisDebugEnabled()guard is correct and avoids unnecessary string formatting.
Suggestions
- Consider adding a unit test for
newHeaderInstance()to verify the constructor cache works correctly for subclasses with different constructor signatures. - The
EMPTYsingleton inTopicQueueMappingContextis a good optimization. Ensure no code path mutates the singleton (e.g., callsetTopic()on it). If immutability cannot be guaranteed, consider returning a defensive copy or making the fields final.
Overall: Clean micro-optimizations with measurable impact on high-throughput paths. 👍
Automated review by github-manager-bot
cb09268 to
0d43e12
Compare
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot
Summary
Four independent per-RPC micro-optimizations in the remoting framework:
- Guava
Stopwatch→System.nanoTime()(eliminates per-RPC allocation) - Constructor cache for
Class.getDeclaredConstructor()(eliminates ~237 alloc events/60s) - Netty writability log downgrade INFO/WARN → DEBUG (eliminates ~900 lines/sec)
TopicQueueMappingContext.EMPTYsingleton (avoids per-message allocation for non-static-topic messages)
Findings
- [Positive]
RemotingCommand.java— ReplacingStopwatchwith rawlong processTimerNanoseliminates one object allocation per RPC. ThenewHeaderInstance()method withConcurrentHashMapconstructor cache is a clean solution to thegetDeclaredConstructor()defensive copy issue. - [Positive]
NettyRemotingServer.java— DowngradingchannelWritabilityChangedto DEBUG withisDebugEnabled()guard is appropriate. ~900 lines/sec of INFO/WARN is clearly excessive for a writability toggle event. - [Positive]
TopicQueueMappingContext.java— AddingEMPTYsingleton is a simple and effective optimization. - [Warning]
RemotingCommand.java— TheHEADER_CTOR_CACHEusescomputeIfAbsenton aConcurrentHashMap. Under high contention with many distinct header classes, this could cause brief blocking on the bin lock. However, in practice the number of distinct header classes is bounded and small, so this is acceptable. - [Warning]
RemotingCommand.java— TheprocessTimerElapsedMs()method computes(System.nanoTime() - processTimerNanos) / 1_000_000. Note thatnanoTime()can overflow after ~292 years of JVM uptime, and the subtraction handles this correctly due to two's complement arithmetic. No issue here, just noting for completeness. - [Info]
NettyDecoder.java— Clean removal of the GuavaStopwatchimport. Thelong timerNanos = System.nanoTime()is a drop-in replacement.
Suggestions
- Consider adding a brief comment on
HEADER_CTOR_CACHEexplaining why aConcurrentHashMapwas chosen overClassValue(which is specifically designed for per-class lazy computation).ClassValuewould avoid the explicit cache map and may have better GC characteristics. - The four changes are independent — if the maintainers prefer smaller PRs, these could be split. However, they are all in the same hot path and the PR description clearly separates them, so keeping them together is also reasonable.
Verdict
LGTM. Well-justified micro-optimizations with clear JFR evidence.
Automated review by github-manager-bot
0d43e12 to
03ee743
Compare
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot (re-review)
Summary
Four independent per-RPC micro-optimizations in the remoting framework:
- Guava
Stopwatch→System.nanoTime()(eliminates per-RPC object allocation) - Constructor cache for
Class.getDeclaredConstructor()(eliminates ~237 alloc events/60s in JFR) - Netty writability log downgrade INFO/WARN → DEBUG (eliminates ~900 lines/sec)
TopicQueueMappingContext.EMPTYsingleton (avoids per-message allocation for >99% of messages)
Findings
- [Positive]
RemotingCommand.java— ReplacingStopwatchwith a rawlong processTimerNanos+processTimerElapsedMs()is a clean elimination of per-RPC allocation. The accessor method encapsulates the conversion well. - [Positive]
RemotingCommand.java—HEADER_CTOR_CACHEwithConcurrentHashMap.computeIfAbsent()correctly avoids the per-callConstructorcopy. ThenewHeaderInstance()helper centralizes the reflective construction with proper error handling. - [Positive]
NettyRemotingServer.java— DowngradingchannelWritabilityChangedto DEBUG withisDebugEnabled()guard is the right call. ~900 lines/sec at INFO was clearly excessive. - [Info]
TopicQueueMappingContext.java— TheEMPTYsingleton is a simple and effective optimization. Ensure callers don't mutate it — consider making fields immutable or adding a comment documenting the shared-instance contract. - [Info] Cross-repo:
RemotingCommandchanges (especiallysetProcessTimerNanos/processTimerElapsedMs) touch the core remoting API. Ifapache/rocketmq-clientsJava module depends onRemotingCommand, verify no breakage there.
Verdict
Solid set of micro-optimizations with clear JFR evidence. No correctness concerns.
Automated review by github-manager-bot
…(Guava Stopwatch, Constructor copy) and downgrade Netty writability log
03ee743 to
cb97b22
Compare
RockteMQ-AI
left a comment
There was a problem hiding this comment.
Review by github-manager-bot (re-review after new commit)
Summary
Same four per-RPC micro-optimizations as previously reviewed. The new commit squashes the changes into a single commit without altering the diff content.
Verdict
No changes since the previous review. All findings remain valid. Approving.
LGTM ✅
Automated review by github-manager-bot
Summary
Four independent per-RPC micro-allocations eliminated in the remoting framework:
1. Guava Stopwatch → System.nanoTime()
RemotingCommandusedStopwatch.createStarted()which allocates a new object per RPC. Replaced withlong processTimerNanos+processTimerElapsedMs().2. Constructor cache
Class.getDeclaredConstructor()copies theConstructorobject on every call (~237 allocation events per 60s JFR). AddedConcurrentHashMap<Class<?>, Constructor<?>> HEADER_CTOR_CACHEwithcomputeIfAbsentto pay the reflective lookup once per class.3. Netty writability log downgrade
NettyRemotingServer.channelWritabilityChangedlogged at INFO/WARN on every writability change (~81,434 lines in 90s = ~900 lines/sec). Downgraded to DEBUG withisDebugEnabled()guard.4. TopicQueueMappingContext.EMPTY singleton
Non-static-topic messages (>99%) created empty
TopicQueueMappingContextobjects. Addedpublic static final EMPTYsingleton.Files Changed
remoting/.../protocol/RemotingCommand.javanewHeaderInstance()+setProcessTimerNanos()/processTimerElapsedMs()remoting/.../netty/NettyDecoder.javaStopwatch→System.nanoTime()+setProcessTimerNanos()remoting/.../netty/NettyRemotingServer.javachannelWritabilityChangedINFO/WARN → DEBUG withisDebugEnabled()guardremoting/.../protocol/statictopic/TopicQueueMappingContext.javaEMPTYstatic singleton