Skip to content

Commit 3dfbe3a

Browse files
author
wangjiahua.wjh
committed
[ISSUE apache#10481] Cache OTel Attributes objects in broker/remoting metrics hot path
Builds on top of PR apache#10443 (which introduces the typed AttributeKey constants). After AttributeKey instances are de-duplicated, the next allocation hot spot in the metrics path is the Attributes object itself. For repeated parameter combinations the broker keeps rebuilding identical immutable Attributes (each ~200-300 byte). This commit introduces a two-level cache (volatile inline cache + ConcurrentHashMap fallback) at three points: - BrokerMetricsManager.getOrBuildTopicAttributes(topic, msgType, isSystem): 4-field volatile inline cache + CHM keyed by 'topic|msgType|isSystem'. - RemotingMetricsManager.getOrBuildAttributes(reqCode, respCode, isLongPolling, result): long-packed cache key (35 bits across 4 args, no String allocation), volatile inline cache + CHM. Falls back to direct build for unknown result values. - RemotingCodeDistributionHandler: replace two independent volatile fields (lastCode, lastAdder) with an immutable CachedCounter holder published through a single volatile reference. A single volatile read per inc() returns a self-consistent (code, adder) snapshot, eliminating a torn-read between the two fields under @sharable concurrent access from multiple Netty EventLoop threads.
1 parent f8e69a4 commit 3dfbe3a

3 files changed

Lines changed: 105 additions & 1 deletion

File tree

broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.util.List;
7373
import java.util.Locale;
7474
import java.util.Map;
75+
import java.util.concurrent.ConcurrentHashMap;
7576
import java.util.concurrent.TimeUnit;
7677
import java.util.function.Supplier;
7778

@@ -105,6 +106,7 @@
105106
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUME_MODE_KEY;
106107
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY_KEY;
107108
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM_KEY;
109+
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE_KEY;
108110
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_LANGUAGE_KEY;
109111
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_ID;
110112
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_TYPE;
@@ -164,6 +166,12 @@ public class BrokerMetricsManager {
164166
private LongCounter rollBackMessagesTotal = new NopLongCounter();
165167
private LongHistogram transactionFinishLatency = new NopLongHistogram();
166168

169+
private final ConcurrentHashMap<String, Attributes> topicAttributesCache = new ConcurrentHashMap<>();
170+
private volatile String lastTopicName;
171+
private volatile String lastTopicMsgType;
172+
private volatile boolean lastTopicIsSystem;
173+
private volatile Attributes lastTopicAttributes;
174+
167175
private final RemotingMetricsManager remotingMetricsManager;
168176
private final PopMetricsManager popMetricsManager;
169177

@@ -195,6 +203,26 @@ public AttributesBuilder newAttributesBuilder() {
195203
return attributesBuilder;
196204
}
197205

206+
public Attributes getOrBuildTopicAttributes(String topic, String messageType, boolean isSystem) {
207+
Attributes lastAttrs = this.lastTopicAttributes;
208+
if (lastAttrs != null && isSystem == this.lastTopicIsSystem && topic.equals(this.lastTopicName) && messageType.equals(this.lastTopicMsgType)) {
209+
return lastAttrs;
210+
}
211+
String cacheKey = topic + '|' + messageType + '|' + isSystem;
212+
Attributes attrs = topicAttributesCache.computeIfAbsent(cacheKey, k ->
213+
newAttributesBuilder()
214+
.put(LABEL_TOPIC_KEY, topic)
215+
.put(LABEL_MESSAGE_TYPE_KEY, messageType)
216+
.put(LABEL_IS_SYSTEM_KEY, isSystem)
217+
.build()
218+
);
219+
this.lastTopicName = topic;
220+
this.lastTopicMsgType = messageType;
221+
this.lastTopicIsSystem = isSystem;
222+
this.lastTopicAttributes = attrs;
223+
return attrs;
224+
}
225+
198226
private Attributes buildLagAttributes(ConsumerLagCalculator.BaseCalculateResult result) {
199227
AttributesBuilder attributesBuilder = newAttributesBuilder();
200228
attributesBuilder.put(LABEL_CONSUMER_GROUP_KEY, result.group);

remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,30 @@
3030
import java.time.Duration;
3131
import java.util.Arrays;
3232
import java.util.List;
33+
import java.util.concurrent.ConcurrentHashMap;
3334
import java.util.function.Supplier;
3435
import org.apache.rocketmq.common.Pair;
3536
import org.apache.rocketmq.common.metrics.NopLongHistogram;
37+
import org.apache.rocketmq.remoting.common.RemotingHelper;
3638

3739
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.HISTOGRAM_RPC_LATENCY;
40+
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_IS_LONG_POLLING_KEY;
3841
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE_KEY;
42+
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE_KEY;
43+
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE_KEY;
44+
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT_KEY;
3945
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.PROTOCOL_TYPE_REMOTING;
40-
4146
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_CANCELED;
47+
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_ONEWAY;
4248
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_SUCCESS;
4349
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_WRITE_CHANNEL_FAILED;
4450

4551
public class RemotingMetricsManager {
4652
private LongHistogram rpcLatency = new NopLongHistogram();
4753
private Supplier<AttributesBuilder> attributesBuilderSupplier;
54+
private final ConcurrentHashMap<Long, Attributes> attributesCache = new ConcurrentHashMap<>();
55+
private volatile long lastAttrsCacheKey;
56+
private volatile Attributes lastCachedAttrs;
4857

4958
public RemotingMetricsManager() {
5059
}
@@ -87,6 +96,44 @@ public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
8796
return Lists.newArrayList(new Pair<>(selector, viewBuilder));
8897
}
8998

99+
public Attributes getOrBuildAttributes(int requestCode, int responseCode,
100+
boolean isLongPolling, String result) {
101+
int resultIdx;
102+
if (RESULT_SUCCESS.equals(result)) resultIdx = 0;
103+
else if (RESULT_ONEWAY.equals(result)) resultIdx = 1;
104+
else if (RESULT_WRITE_CHANNEL_FAILED.equals(result)) resultIdx = 2;
105+
else if (RESULT_CANCELED.equals(result)) resultIdx = 3;
106+
else resultIdx = -1;
107+
108+
if (resultIdx < 0) {
109+
return buildAttributes(requestCode, responseCode, isLongPolling, result);
110+
}
111+
112+
long key = ((long) requestCode << 19)
113+
| ((long) (responseCode & 0xFFFF) << 3)
114+
| (isLongPolling ? 4L : 0L)
115+
| resultIdx;
116+
Attributes cached = this.lastCachedAttrs;
117+
if (cached != null && key == this.lastAttrsCacheKey) {
118+
return cached;
119+
}
120+
Attributes attrs = attributesCache.computeIfAbsent(key,
121+
k -> buildAttributes(requestCode, responseCode, isLongPolling, result));
122+
this.lastAttrsCacheKey = key;
123+
this.lastCachedAttrs = attrs;
124+
return attrs;
125+
}
126+
127+
private Attributes buildAttributes(int requestCode, int responseCode,
128+
boolean isLongPolling, String result) {
129+
return newAttributesBuilder()
130+
.put(LABEL_IS_LONG_POLLING_KEY, isLongPolling)
131+
.put(LABEL_REQUEST_CODE_KEY, RemotingHelper.getRequestCodeDesc(requestCode))
132+
.put(LABEL_RESPONSE_CODE_KEY, RemotingHelper.getResponseCodeDesc(responseCode))
133+
.put(LABEL_RESULT_KEY, result)
134+
.build();
135+
}
136+
90137
public String getWriteAndFlushResult(Future<?> future) {
91138
String result = RESULT_SUCCESS;
92139
if (future.isCancelled()) {

remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,50 @@
3030
@ChannelHandler.Sharable
3131
public class RemotingCodeDistributionHandler extends ChannelDuplexHandler {
3232

33+
/**
34+
* Immutable holder for a cached code→adder pair. Published via a single volatile
35+
* write so racing threads always observe a consistent pair (never a mixed code from
36+
* one update with an adder from another).
37+
*/
38+
private static final class CachedCounter {
39+
final int code;
40+
final LongAdder adder;
41+
42+
CachedCounter(int code, LongAdder adder) {
43+
this.code = code;
44+
this.adder = adder;
45+
}
46+
}
47+
3348
private final ConcurrentMap<Integer, LongAdder> inboundDistribution;
3449
private final ConcurrentMap<Integer, LongAdder> outboundDistribution;
50+
private volatile CachedCounter lastIn;
51+
private volatile CachedCounter lastOut;
3552

3653
public RemotingCodeDistributionHandler() {
3754
inboundDistribution = new ConcurrentHashMap<>();
3855
outboundDistribution = new ConcurrentHashMap<>();
3956
}
4057

4158
private void countInbound(int requestCode) {
59+
CachedCounter cached = lastIn;
60+
if (cached != null && cached.code == requestCode) {
61+
cached.adder.increment();
62+
return;
63+
}
4264
LongAdder item = inboundDistribution.computeIfAbsent(requestCode, k -> new LongAdder());
65+
lastIn = new CachedCounter(requestCode, item);
4366
item.increment();
4467
}
4568

4669
private void countOutbound(int responseCode) {
70+
CachedCounter cached = lastOut;
71+
if (cached != null && cached.code == responseCode) {
72+
cached.adder.increment();
73+
return;
74+
}
4775
LongAdder item = outboundDistribution.computeIfAbsent(responseCode, k -> new LongAdder());
76+
lastOut = new CachedCounter(responseCode, item);
4877
item.increment();
4978
}
5079

0 commit comments

Comments
 (0)