Commit aad83e2

Merge pull request #230 from Deep-CodeAI/feat/4560-transport-retry
feat(#4560): default transient-network retry in shared HTTP transport
2 parents 21f4fe3 + 04ebafc commit aad83e2

4 files changed

Lines changed: 231 additions & 24 deletions

CHANGELOG.md

Lines changed: 17 additions & 0 deletions
@@ -4,6 +4,23 @@ All notable changes to Agents.KT are documented here. The format follows [Keep a
 
 ## [Unreleased]
 
+### Changed — default transient-network retry across all HTTP model providers (#4560)
+
+The shared non-streaming transport (`HttpModelClientSupport.sendBounded`, used by Claude, OpenAI +
+DeepSeek/Kimi/OpenRouter/Perplexity, Gemini, and Ollama) now retries **transient** failures by default —
+connection-level exceptions (`IOException`: connection reset, refused, no-route, unexpected EOF) and transient
+HTTP statuses (408/429/500/502/503/504) — up to 3 attempts with exponential backoff (250ms→500ms). Previously
+only Ollama retried; every other provider failed fast on a network blip unless you opted into
+`onLLMError { Retry() }`. This matches the default behavior of official SDKs (e.g. OpenAI). Two deliberate
+exclusions: **`HttpTimeoutException` is not retried** (the per-request `timeout` is your *total* budget —
+retrying would silently multiply it; it surfaces immediately), and the **original exception type is preserved**
+on exhaustion (rethrown as-is, not wrapped) so the agent-level `onLLMError`/`LlmErrorDecision` can still
+pattern-match `e is ConnectException`. It sits **below** `onLLMError` (transport rides out blips first; the
+handler sees only what survives, identity intact); on the final attempt a transient *status* is returned
+unchanged so the per-provider parser still surfaces the provider's own error message. Streaming
+(`sendChatStream`) is not retried (re-issuing mid-stream would duplicate delivered tokens — a connect-phase
+follow-up). 6 tests (scripted fake `HttpClient`).
+
 ### Added — seller-side x402 payments: `X402PaymentGate` (#4527, PRD §12.8) — experimental
 
 `X402PaymentGate(requirements, facilitator).gate(handler)` wraps any JDK `HttpHandler` so a resource is served

docs/error-recovery.md

Lines changed: 19 additions & 0 deletions
@@ -230,4 +230,23 @@ agent<String, Report>("analyst") {
 
 The handler does **not** fire for budget caps (`onBudgetExceeded` owns those) or cancellation, and v1 scopes recovery to the agentic loop — a model failure during multi-skill LLM routing still propagates loud. See [production-hardening.md](production-hardening.md) for the deployment checklist entry.
 
+### Transport-layer retry (automatic, below `onLLMError`) — #4560
+
+Before a failure ever reaches `onLLMError`, the shared HTTP transport (`HttpModelClientSupport.sendBounded`, used by **every** provider's non-streaming call — Claude, OpenAI + DeepSeek/Kimi/OpenRouter/Perplexity, Gemini, Ollama) already retries **transient** failures **by default**, no opt-in:
+
+- **connection-level exceptions** — an `IOException` from the send (connection reset, refused, no-route, unexpected EOF), and
+- **transient HTTP statuses** — 408 / 429 / 500 / 502 / 503 / 504.
+
+Up to 3 attempts, exponential backoff (250ms → 500ms). This matches what official SDKs (e.g. OpenAI) do by default — a dropped connection or a 503 is the textbook retryable case, so you don't have to write a handler for it.
+
+Two deliberate exclusions: **`HttpTimeoutException` is not retried** — the per-request `timeout` is your *total* budget, and retrying would silently multiply it, so a timeout surfaces immediately. And the **original exception type is preserved** on exhaustion (rethrown as-is, not wrapped) — that's what lets the `onLLMError` handler above match `e is ConnectException`.
+
+The two layers compose, transport first:
+
+```
+http.send → [transport retry: conn-level IOException / 5xx, ×3] → raw exception → [onLLMError: your policy] → loop
+```
+
+So `onLLMError` sees only what survives the transport retries — use it for *semantic* recovery (a `RespondWith` fallback, a longer `Retry` schedule, escalation, or retrying a `HttpTimeoutException`), not for plain connection blips. Streaming (`sendChatStream`) is **not** transport-retried (re-issuing mid-stream would duplicate delivered tokens); wrap a streaming call in `onLLMError { Retry() }` or a `firstOf(...)` fallback if you need connect-phase resilience there. For higher-level recovery — fall over to another provider/model, or take the first of N samples — use `firstOf(a, b)` / `agent.speculative(n)`; to re-run until the output is valid, `loopUntil { … }` (see [composition.md](composition.md)).
+
---
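The backoff schedule quoted in this docs hunk (250ms → 500ms) follows from the two constants this commit adds and from the fact that the transport sleeps only after a failed attempt that is not the last one. A minimal sketch of that arithmetic, with the constants copied from the commit; `backoffDelaysMs` is a hypothetical helper written for illustration and is not part of the change:

```kotlin
// Constants copied from the commit; backoffDelaysMs is a hypothetical helper for illustration.
const val MAX_ATTEMPTS: Int = 3
const val INITIAL_BACKOFF_MS: Long = 250L

// Delays slept between attempts: one after every failed attempt except the last,
// each computed the same way as the commit's backoff(): INITIAL_BACKOFF_MS shl attempt.
fun backoffDelaysMs(maxAttempts: Int = MAX_ATTEMPTS, initialMs: Long = INITIAL_BACKOFF_MS): List<Long> =
    (0 until maxAttempts - 1).map { attempt -> initialMs shl attempt }

fun main() {
    val delays = backoffDelaysMs()
    println(delays)        // [250, 500]
    println(delays.sum())  // 750
}
```

Under these assumptions the retry layer adds at most 750 ms of sleep per call, on top of the time spent in the attempts themselves.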
Lines changed: 72 additions & 24 deletions
@@ -1,47 +1,91 @@
 package agents_engine.model
 
+import java.io.IOException
 import java.net.http.HttpClient
 import java.net.http.HttpRequest
 import java.net.http.HttpResponse
+import java.net.http.HttpTimeoutException
 
 /**
- * `agents_engine/model/HttpModelClientSupport.kt` — #2792 shared
- * transport seam for the JDK-HttpClient-backed provider adapters
- * ([ClaudeClient], [OpenAiClient], [OllamaClient], and [DeepSeekClient]
- * via OpenAi). Each adapter copy-pasted the same bounded-read +
- * OOM-guard block; the duplication is concentrated here so a future
- * transport-layer improvement (retry strategy, distributed tracing
- * span propagation, mTLS) is a one-place edit.
+ * `agents_engine/model/HttpModelClientSupport.kt` — #2792 shared transport seam for the
+ * JDK-HttpClient-backed provider adapters ([ClaudeClient], [OpenAiClient] (+ DeepSeek/Kimi/OpenRouter/
+ * Perplexity via it), [GeminiClient], [OllamaClient]). Concentrates the bounded-read + OOM-guard and —
+ * since #4560 — the **transient-network retry policy**, so transport-layer improvements are one-place edits.
  *
- * Scope is intentionally small for the 0.6.x line: just the
- * "POST a JSON body, read a bounded response, surface a provider-
- * tagged LlmProviderException on overflow" pattern. The per-adapter
- * `HttpClient` instances still live in each client (their
- * `connectTimeout` differs across providers because users can tune
- * it per `model { }` block); future passes can lift more shape here
- * if more providers join.
+ * **Transient retry (#4560).** [sendBounded] retries the *non-streaming* request on:
+ * - **connection-level exceptions** — an [IOException] from `http.send` (connection reset, refused,
+ *   no-route, unexpected EOF), and
+ * - **transient HTTP statuses** — 408 / 429 / 500 / 502 / 503 / 504.
+ *
+ * Up to [MAX_ATTEMPTS] tries with exponential backoff (`INITIAL_BACKOFF_MS * 2^attempt`). This matches the
+ * behavior official SDKs (e.g. OpenAI) apply by default — a dropped connection or a 503 is the textbook
+ * retryable case, so callers get it without opting in.
+ *
+ * Two deliberate exclusions:
+ * - **[HttpTimeoutException] is NOT retried** — the per-request `timeout` is the caller's *total* budget;
+ *   retrying would silently multiply it. It propagates immediately, fast and raw.
+ * - **the original exception type is preserved** on exhaustion (rethrown as-is, not wrapped), so the
+ *   agent-level `onLLMError`/[LlmErrorDecision] handler can still pattern-match `e is ConnectException` etc.
+ *
+ * It sits **below** `onLLMError`: the transport rides out blips first; a persistent failure surfaces to the
+ * handler with its identity intact. On the final attempt a transient *status* is NOT converted to an error
+ * here — the body is returned unchanged so the per-provider parser still surfaces the provider's own message.
+ *
+ * Streaming (`sendChatStream`) is deliberately not retried here — re-issuing mid-stream would duplicate
+ * already-delivered tokens; connect-phase streaming retry is a separate follow-up.
  */
 internal object HttpModelClientSupport {
 
+    const val MAX_ATTEMPTS: Int = 3
+    const val INITIAL_BACKOFF_MS: Long = 250L
+    // 408 Request Timeout, 429 Too Many Requests, 500/502/503/504 server/gateway errors.
+    @Suppress("MagicNumber")
+    private val TRANSIENT_STATUSES: Set<Int> = setOf(408, 429, 500, 502, 503, 504)
+
     /**
-     * Sends [request] using [http], reads at most [maxResponseBytes]
-     * (+1 sentinel) of the response body, and throws
-     * [LlmProviderException] tagged with [providerLabel] when the
-     * response would exceed the cap. Returns the UTF-8 decoded body.
-     *
-     * The +1 sentinel + post-read compare is intentional — `readNBytes(N)`
-     * may legitimately return exactly N bytes for an N-byte response, so
-     * we read one more than the cap to disambiguate "exactly at cap" from
-     * "would have exceeded".
+     * Sends [request] using [http] (with transient retry, see class doc), reads at most [maxResponseBytes]
+     * (+1 sentinel) of the response body, and throws [LlmProviderException] tagged with [providerLabel] when
+     * the response would exceed the cap. Returns the UTF-8 decoded body.
      */
     fun sendBounded(
         http: HttpClient,
         request: HttpRequest,
         providerLabel: String,
         maxResponseBytes: Long,
     ): String {
-        val response = http.send(request, HttpResponse.BodyHandlers.ofInputStream())
+        repeat(MAX_ATTEMPTS) { attempt ->
+            val lastAttempt = attempt == MAX_ATTEMPTS - 1
+            val response: HttpResponse<java.io.InputStream> = try {
+                http.send(request, HttpResponse.BodyHandlers.ofInputStream())
+            } catch (e: HttpTimeoutException) {
+                throw e // the per-request timeout is the caller's total budget — never multiply it by retrying
+            } catch (e: IOException) {
+                // Connection-level failure (reset, refused, no route, unexpected EOF). Rethrow the ORIGINAL
+                // on exhaustion so onLLMError can still match `e is ConnectException`, etc.
+                if (lastAttempt) throw e
+                backoff(attempt)
+                return@repeat
+            }
+            // Transient server/throttling status: retry unless we're out of attempts. On the final attempt
+            // fall through and return the body so the per-provider parser surfaces the real error message.
+            if (!lastAttempt && response.statusCode() in TRANSIENT_STATUSES) {
+                response.body().close()
+                backoff(attempt)
+                return@repeat
+            }
+            return readBounded(response, providerLabel, maxResponseBytes)
+        }
+        error("sendBounded retry loop exited without a result") // unreachable: last attempt returns or throws
+    }
+
+    private fun readBounded(
+        response: HttpResponse<java.io.InputStream>,
+        providerLabel: String,
+        maxResponseBytes: Long,
+    ): String {
         val cap = maxResponseBytes.coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
+        // +1 sentinel: readNBytes(N) may return exactly N for an N-byte body, so read one more to
+        // disambiguate "exactly at cap" from "would have exceeded".
         val bytes = response.body().use { it.readNBytes(cap + 1) }
         if (bytes.size > cap) {
             throw LlmProviderException(
@@ -50,4 +94,8 @@ internal object HttpModelClientSupport {
         }
         return String(bytes, Charsets.UTF_8)
     }
+
+    private fun backoff(attempt: Int) {
+        Thread.sleep(INITIAL_BACKOFF_MS shl attempt)
+    }
}
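
The catch order in `sendBounded` decides which exceptions are retried, and it has one consequence that is easy to miss: in the JDK, `HttpConnectTimeoutException` is a subclass of `HttpTimeoutException`, so connect timeouts also surface immediately, while `ConnectException` (a plain `IOException`) is retried. A minimal sketch of that decision as a pure function; `RetryDecision`, `classify`, and `transientStatuses` are hypothetical names for illustration, and the commit itself expresses the same logic through catch clauses rather than a helper like this:

```kotlin
import java.io.IOException
import java.net.ConnectException
import java.net.http.HttpConnectTimeoutException
import java.net.http.HttpTimeoutException

// Hypothetical names for illustration; not part of the commit.
enum class RetryDecision { RETRY, FAIL_FAST }

// Mirrors the catch-clause order: the timeout check must come before the IOException check,
// because HttpTimeoutException is itself an IOException.
fun classify(t: Throwable): RetryDecision = when (t) {
    is HttpTimeoutException -> RetryDecision.FAIL_FAST
    is IOException -> RetryDecision.RETRY
    else -> RetryDecision.FAIL_FAST // anything else is not caught by the retry loop and propagates
}

// Status set copied from the commit's TRANSIENT_STATUSES.
val transientStatuses: Set<Int> = setOf(408, 429, 500, 502, 503, 504)

fun main() {
    println(classify(ConnectException("refused")))             // RETRY
    println(classify(HttpTimeoutException("timed out")))        // FAIL_FAST
    println(classify(HttpConnectTimeoutException("connect")))   // FAIL_FAST
    println(429 in transientStatuses)                           // true
    println(400 in transientStatuses)                           // false
}
```

Swapping the first two branches would make every timeout retryable, which is exactly what the commit's catch order avoids.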
Lines changed: 123 additions & 0 deletions
@@ -0,0 +1,123 @@
+package agents_engine.model
+
+import java.io.ByteArrayInputStream
+import java.io.IOException
+import java.io.InputStream
+import java.net.URI
+import java.net.http.HttpClient
+import java.net.http.HttpHeaders
+import java.net.http.HttpRequest
+import java.net.http.HttpResponse
+import java.net.http.HttpTimeoutException
+import java.util.Optional
+import javax.net.ssl.SSLContext
+import javax.net.ssl.SSLParameters
+import kotlin.test.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertFailsWith
+import kotlin.test.assertTrue
+
+// #4560 — default transient-network retry in the shared HTTP transport. Hermetic: a scripted fake
+// HttpClient drives sendBounded through retry on IOException / transient status, no-retry on 4xx, and
+// exhaustion. All provider adapters inherit this via HttpModelClientSupport.sendBounded.
+class HttpModelClientSupportTest {
+
+    private fun ok(body: String) = FakeResponse(200, body)
+    private fun status(code: Int) = FakeResponse(code, """{"error":"$code"}""")
+    private val request = HttpRequest.newBuilder().uri(URI.create("http://test/")).GET().build()
+
+    private fun send(vararg steps: () -> HttpResponse<InputStream>): Pair<String, Int> {
+        val client = ScriptedHttpClient(steps.toMutableList())
+        val body = HttpModelClientSupport.sendBounded(client, request, "Test", 4096)
+        return body to client.calls
+    }
+
+    @Test
+    fun `retries network exceptions then succeeds`() {
+        val (body, calls) = send(
+            { throw IOException("connection reset") },
+            { throw IOException("timeout") },
+            { ok("done") },
+        )
+        assertEquals("done", body)
+        assertEquals(3, calls)
+    }
+
+    @Test
+    fun `retries a transient status then succeeds`() {
+        val (body, calls) = send({ status(503) }, { ok("recovered") })
+        assertEquals("recovered", body)
+        assertEquals(2, calls)
+    }
+
+    @Test
+    fun `rethrows the original exception (type preserved) after exhausting attempts`() {
+        val reset = { throw IOException("reset") }
+        val e = assertFailsWith<IOException> { send(reset, reset, reset) }
+        assertEquals("reset", e.message) // original preserved so onLLMError can match e is ConnectException, etc.
+    }
+
+    @Test
+    fun `does not retry a timeout (respects the configured per-request budget)`() {
+        val client = ScriptedHttpClient(mutableListOf({ throw HttpTimeoutException("timed out") }, { ok("nope") }))
+        assertFailsWith<HttpTimeoutException> { HttpModelClientSupport.sendBounded(client, request, "Test", 4096) }
+        assertEquals(1, client.calls, "a timeout must surface immediately, not be retried")
+    }
+
+    @Test
+    fun `does not retry a non-transient 4xx and returns its body for downstream parsing`() {
+        val (body, calls) = send({ status(400) }, { ok("should-not-reach") })
+        assertTrue("400" in body, body)
+        assertEquals(1, calls, "4xx must not be retried")
+    }
+
+    @Test
+    fun `returns the final transient-status body instead of masking the provider error`() {
+        // 503 every time → after MAX_ATTEMPTS, the last body is returned so the per-provider parser can
+        // surface the real error message rather than a generic transport error.
+        val (body, calls) = send({ status(503) }, { status(503) }, { status(503) })
+        assertTrue("503" in body, body)
+        assertEquals(3, calls)
+    }
+
+    // --- test doubles ---
+
+    private class FakeResponse(private val status: Int, body: String) : HttpResponse<InputStream> {
+        private val bytes = body.toByteArray()
+        override fun statusCode() = status
+        override fun body(): InputStream = ByteArrayInputStream(bytes)
+        override fun request(): HttpRequest = HttpRequest.newBuilder().uri(URI.create("http://test/")).build()
+        override fun previousResponse(): Optional<HttpResponse<InputStream>> = Optional.empty()
+        override fun headers(): HttpHeaders = HttpHeaders.of(emptyMap()) { _, _ -> true }
+        override fun sslSession() = Optional.empty<javax.net.ssl.SSLSession>()
+        override fun uri(): URI = URI.create("http://test/")
+        override fun version(): HttpClient.Version = HttpClient.Version.HTTP_1_1
+    }
+
+    private class ScriptedHttpClient(private val steps: MutableList<() -> HttpResponse<InputStream>>) : HttpClient() {
+        var calls = 0
+
+        @Suppress("UNCHECKED_CAST")
+        override fun <T> send(req: HttpRequest, handler: HttpResponse.BodyHandler<T>): HttpResponse<T> {
+            calls++
+            return steps.removeAt(0).invoke() as HttpResponse<T>
+        }
+
+        override fun cookieHandler() = Optional.empty<java.net.CookieHandler>()
+        override fun connectTimeout() = Optional.empty<java.time.Duration>()
+        override fun followRedirects(): Redirect = Redirect.NEVER
+        override fun proxy() = Optional.empty<java.net.ProxySelector>()
+        override fun sslContext(): SSLContext = SSLContext.getDefault()
+        override fun sslParameters(): SSLParameters = SSLParameters()
+        override fun authenticator() = Optional.empty<java.net.Authenticator>()
+        override fun version(): Version = Version.HTTP_1_1
+        override fun executor() = Optional.empty<java.util.concurrent.Executor>()
+        override fun <T> sendAsync(req: HttpRequest, handler: HttpResponse.BodyHandler<T>) = error("unused")
+        override fun <T> sendAsync(
+            req: HttpRequest,
+            handler: HttpResponse.BodyHandler<T>,
+            push: HttpResponse.PushPromiseHandler<T>?,
+        ) = error("unused")
+        override fun newWebSocketBuilder() = error("unused")
+    }
+}
