-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMcpCallCoalescer.java
More file actions
109 lines (96 loc) · 4.38 KB
/
Copy pathMcpCallCoalescer.java
File metadata and controls
109 lines (96 loc) · 4.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
/**
 * Coalesces concurrent calls for the same key into a single upstream call and caches successful
 * results for a fixed TTL, keeping at most {@code maxEntries} results (least recently used are
 * evicted first).
 *
 * <p>Safe for concurrent use: state lives in concurrent maps and capacity pruning is serialized on
 * a private lock. Every caller receives its own dependent future, so cancelling or timing out one
 * caller's future does not cancel the shared upstream call or any other caller's future.
 *
 * <p>Failures are never cached; the next call after a failed load starts a fresh load.
 *
 * @param <V> the result type
 */
public final class McpCallCoalescer<V> {

    /** A cached result with a monotonic expiry deadline and a recency stamp used for LRU eviction. */
    private static final class CacheEntry<T> {
        final T value;
        final long expiresAtNanos;
        /** Higher means more recently used; written on every cache hit. */
        volatile long lastAccess;

        CacheEntry(T value, long expiresAtNanos, long lastAccess) {
            this.value = value;
            this.expiresAtNanos = expiresAtNanos;
            this.lastAccess = lastAccess;
        }

        /** Overflow-safe freshness check against a {@link System#nanoTime()} reading. */
        boolean isFreshAt(long nowNanos) {
            return expiresAtNanos - nowNanos > 0;
        }
    }

    private final ConcurrentHashMap<String, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, CacheEntry<V>> cache = new ConcurrentHashMap<>();
    /** Monotonic source of recency stamps; avoids an unbounded access-log queue. */
    private final AtomicLong accessTicks = new AtomicLong();
    /** Serializes pruning so concurrent completions cannot evict more entries than necessary. */
    private final Object pruneLock = new Object();
    private final long ttlNanos;
    private final int maxEntries;

    /**
     * Creates a coalescer.
     *
     * @param ttl how long a successful result stays cached; anything below one millisecond
     *     (including zero or negative durations) is raised to one millisecond
     * @param maxEntries maximum number of cached results
     * @throws NullPointerException if {@code ttl} is null
     * @throws ArithmeticException if {@code ttl} cannot be expressed in milliseconds
     * @throws IllegalArgumentException if {@code maxEntries} is less than 1
     */
    public McpCallCoalescer(Duration ttl, int maxEntries) {
        long ttlMillis = Math.max(1L, Objects.requireNonNull(ttl, "ttl").toMillis());
        // TimeUnit.toNanos saturates at Long.MAX_VALUE instead of overflowing for huge TTLs.
        this.ttlNanos = TimeUnit.MILLISECONDS.toNanos(ttlMillis);
        if (maxEntries < 1) {
            throw new IllegalArgumentException("maxEntries must be at least 1");
        }
        this.maxEntries = maxEntries;
    }

    /**
     * Returns the value for {@code key}: a fresh cached result if one exists, otherwise the result
     * of the load already in flight for {@code key}, otherwise the result of a new
     * {@code loader} call.
     *
     * <p>The returned future is private to this caller; cancelling it does not cancel the shared
     * load. If the load fails (the loader throws, returns null, or its future fails), the returned
     * future completes exceptionally and nothing is cached.
     *
     * @param key cache and coalescing key
     * @param loader starts the upstream call; invoked at most once per coalesced load
     * @return a future for the value
     * @throws NullPointerException if {@code key} or {@code loader} is null
     */
    public CompletableFuture<V> get(String key, Supplier<CompletableFuture<V>> loader) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(loader, "loader");
        CacheEntry<V> hit = freshEntry(key, System.nanoTime());
        if (hit != null) {
            return CompletableFuture.completedFuture(hit.value);
        }
        CompletableFuture<V> pending = new CompletableFuture<>();
        CompletableFuture<V> shared = inFlight.putIfAbsent(key, pending);
        if (shared != null) {
            return shared.thenApply(Function.identity());
        }
        // A load for this key may have published to the cache and left inFlight between the miss
        // above and our putIfAbsent; re-check so we do not start a duplicate upstream call.
        hit = freshEntry(key, System.nanoTime());
        if (hit != null) {
            inFlight.remove(key, pending);
            pending.complete(hit.value);
            return pending.thenApply(Function.identity());
        }
        try {
            CompletableFuture<V> upstream = Objects.requireNonNull(loader.get(), "loader returned null");
            upstream.whenComplete((value, error) -> settle(key, pending, value, error));
        } catch (Throwable error) {
            inFlight.remove(key, pending);
            pending.completeExceptionally(error);
        }
        return pending.thenApply(Function.identity());
    }

    /**
     * Removes any cached result for {@code key}. A load already in flight for {@code key} is not
     * cancelled and will cache its result when it succeeds.
     */
    public void invalidate(String key) {
        cache.remove(key);
    }

    /**
     * Removes all cached results. Loads already in flight are not cancelled and will cache their
     * results when they succeed.
     */
    public void clear() {
        cache.clear();
    }

    /** Returns the number of cached results after dropping expired ones. */
    public int cachedSize() {
        prune(System.nanoTime());
        return cache.size();
    }

    /** Returns the number of keys whose load is currently in progress. */
    public int inFlightCount() {
        return inFlight.size();
    }

    /**
     * Publishes a finished load. Order matters: cache first (so there is no instant where neither
     * map knows the key), then leave inFlight (so continuations that call {@code get(key)} again,
     * e.g. a retry after failure, start a fresh load), then notify waiters. The {@code finally}
     * guarantees waiters are released even if caching fails.
     */
    private void settle(String key, CompletableFuture<V> pending, V value, Throwable error) {
        try {
            if (error == null) {
                store(key, value);
            }
        } finally {
            inFlight.remove(key, pending);
            if (error == null) {
                pending.complete(value);
            } else {
                pending.completeExceptionally(unwrap(error));
            }
        }
    }

    /** Caches {@code value} as the most recently used entry and enforces TTL and capacity. */
    private void store(String key, V value) {
        long now = System.nanoTime();
        cache.put(key, new CacheEntry<>(value, now + ttlNanos, accessTicks.incrementAndGet()));
        prune(now);
    }

    /**
     * Returns the fresh entry for {@code key} (marking it most recently used), or null. An expired
     * entry is removed.
     */
    private CacheEntry<V> freshEntry(String key, long nowNanos) {
        CacheEntry<V> entry = cache.get(key);
        if (entry == null) {
            return null;
        }
        if (entry.isFreshAt(nowNanos)) {
            entry.lastAccess = accessTicks.incrementAndGet();
            return entry;
        }
        // CacheEntry uses identity equality, so this only drops the exact stale entry we observed.
        cache.remove(key, entry);
        return null;
    }

    /** Drops expired entries, then evicts least-recently-used entries until within capacity. */
    private void prune(long nowNanos) {
        synchronized (pruneLock) {
            cache.entrySet().removeIf(entry -> !entry.getValue().isFreshAt(nowNanos));
            while (cache.size() > maxEntries) {
                String victimKey = null;
                CacheEntry<V> victim = null;
                for (var candidate : cache.entrySet()) {
                    CacheEntry<V> current = candidate.getValue();
                    if (victim == null || current.lastAccess < victim.lastAccess) {
                        victimKey = candidate.getKey();
                        victim = current;
                    }
                }
                if (victim == null) {
                    return;
                }
                cache.remove(victimKey, victim);
            }
        }
    }

    /** Strips a single CompletionException/ExecutionException wrapper so callers see the real cause. */
    private static Throwable unwrap(Throwable error) {
        if ((error instanceof CompletionException || error instanceof ExecutionException) && error.getCause() != null) {
            return error.getCause();
        }
        return error;
    }
}
/*
McpCallCoalescer stops duplicate MCP and agent tool calls from piling up when several parts of a Java service request the same expensive result at nearly the same time. It exists because agent backends fan out to search, embeddings, repository reads, and hosted tools, and can stampede the same endpoint with identical arguments. Use it when you want at most one in-flight call per key plus a short-lived TTL cache of successful results (failures are not cached). Each caller receives its own dependent future, so one caller's timeout or cancellation does not cancel the shared upstream work. It fits any Java 17+ service that wraps MCP tools, HTTP fetchers, model helpers, or internal platform RPCs.
*/