Skip to content

Commit 161397a

Browse files
AlexCherrypiclaude
andcommitted
test(discovery): cover the event-stream leak invariant directly
The 47b445d fix landed with three tests against consumeEventStream, but the actual leak lived one layer up — eventLoop re-calling cli.Events per message. Those tests would not have caught a regression at that exact layer. Refactor eventLoop into a thin glue + runEventLoop(ctx, source, consume). source and consume are injectable, so the new test TestRunEventLoop_OnlyReopensAfterStreamEnds can count source calls directly: 5 messages must produce exactly 1 source call, then a stream close must produce a 2nd. The pre-fix shape would have requested a fresh stream after every message and failed this loudly. Also: - consumeEventStream now takes errBackoff as a parameter (production passes a new streamErrBackoff = 2*time.Second constant; tests pass 1ms). TestConsumeEventStream_ErrSignalRequestsRetry no longer relies on a racy ctx-timeout — it asserts retry=true && err==nil cleanly. - Doc comment dropped the misleading "F-46 regression" framing. The bug predates F-46 (lived in eventLoop since the initial skeleton); the doc now points at the fixing commit 47b445d instead. Verified: go test ./internal/discovery/... -race -count=3 clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 47b445d commit 161397a

2 files changed

Lines changed: 140 additions & 29 deletions

File tree

internal/discovery/discovery.go

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -166,22 +166,37 @@ func (d *Discoverer) pollLoop(ctx context.Context) {
166166
}
167167
}
168168

169+
// streamErrBackoff is the pause after the docker event stream emits
170+
// an error before we reopen. Production value; tests pass shorter
171+
// durations directly into consumeEventStream.
172+
const streamErrBackoff = 2 * time.Second
173+
169174
func (d *Discoverer) eventLoop(ctx context.Context) error {
170175
f := buildEventFilter(d.discriminator)
176+
source := func(ctx context.Context) (<-chan events.Message, <-chan error) {
177+
return d.cli.Events(ctx, events.ListOptions{Filters: f})
178+
}
179+
return runEventLoop(ctx, source, d.consumeEvents)
180+
}
171181

182+
// runEventLoop is the reconnect-loop: open a stream via source, drain
183+
// it via consume until ctx cancels or the stream ends, reopen only
184+
// then. Extracted from eventLoop so tests can verify the central
185+
// invariant — source is called once per real disconnect, NOT once per
186+
// event. The pre-fix shape placed source() inside the inner select
187+
// and re-called it on every message, opening a fresh long-poll HTTP
188+
// request to docker(-proxy) per event while the prior request's
189+
// goroutine stayed parked on the old (still-valid-ctx) connection.
190+
// On busy event sources (Frigate watchdog cycling ffmpeg subprocesses)
191+
// the leaked sockets exhaust the kernel's tcp_mem.
192+
func runEventLoop(
193+
ctx context.Context,
194+
source func(context.Context) (<-chan events.Message, <-chan error),
195+
consume func(context.Context, <-chan events.Message, <-chan error) (bool, error),
196+
) error {
172197
for {
173-
msgs, errs := d.cli.Events(ctx, events.ListOptions{Filters: f})
174-
// consumeEvents stays on the SAME (msgs, errs) pair until the
175-
// stream ends (errs/closed) or ctx is cancelled. Earlier
176-
// versions exited the inner select after every single message
177-
// and re-called cli.Events, which opens a fresh long-poll HTTP
178-
// request to the docker daemon each time but never closes the
179-
// previous one — the prior goroutine stays parked on the old
180-
// connection. On busy event sources (e.g. a Frigate watchdog
181-
// restarting ffmpeg subprocesses) anchord then accumulates
182-
// hundreds of ESTAB sockets to docker(-proxy) until the kernel
183-
// hits its tcp_mem ceiling.
184-
retry, err := d.consumeEvents(ctx, msgs, errs)
198+
msgs, errs := source(ctx)
199+
retry, err := consume(ctx, msgs, errs)
185200
if err != nil {
186201
return err
187202
}
@@ -202,20 +217,30 @@ func (d *Discoverer) consumeEvents(ctx context.Context, msgs <-chan events.Messa
202217
if err := d.snapshot(ctx); err != nil {
203218
slog.Warn("event-driven snapshot failed", "err", err)
204219
}
205-
})
220+
}, streamErrBackoff)
206221
}
207222

208223
// consumeEventStream is the pure consume-loop. Behaviour contract:
209224
// - ctx cancelled → returns (false, ctx.Err())
210-
// - errs delivers → returns (true, nil) after a 2s backoff
225+
// - errs delivers → returns (true, nil) after errBackoff
211226
// - msgs is closed → returns (true, nil) immediately
212227
// - msgs message → onMessage() invoked, loop continues on SAME stream
213228
//
214229
// The invariant: while messages keep flowing, the function does NOT
215230
// return — callers must not re-enter to reopen a fresh stream per
216231
// message. That pattern leaked the long-poll HTTP request to
217-
// docker(-proxy) on every event (F-46 regression).
218-
func consumeEventStream(ctx context.Context, msgs <-chan events.Message, errs <-chan error, onMessage func()) (retry bool, err error) {
232+
// docker(-proxy) on every event (see commit 47b445d).
233+
//
234+
// errBackoff is a parameter, not a constant, so tests can pass a tiny
235+
// duration and assert retry semantics deterministically without
236+
// waiting two real seconds.
237+
func consumeEventStream(
238+
ctx context.Context,
239+
msgs <-chan events.Message,
240+
errs <-chan error,
241+
onMessage func(),
242+
errBackoff time.Duration,
243+
) (retry bool, err error) {
219244
for {
220245
select {
221246
case <-ctx.Done():
@@ -228,7 +253,7 @@ func consumeEventStream(ctx context.Context, msgs <-chan events.Message, errs <-
228253
select {
229254
case <-ctx.Done():
230255
return false, ctx.Err()
231-
case <-time.After(2 * time.Second):
256+
case <-time.After(errBackoff):
232257
}
233258
return true, nil
234259
case msg, ok := <-msgs:

internal/discovery/discovery_test.go

Lines changed: 98 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,10 @@ func TestParseIP(t *testing.T) {
407407
}
408408
}
409409

410+
// testBackoff is short enough that ErrSignalRequestsRetry can wait the
411+
// full backoff and assert retry=true deterministically.
412+
const testBackoff = 1 * time.Millisecond
413+
410414
// Regression for the docker-socket-proxy connection leak: a single
411415
// open stream must serve many messages — consumeEventStream must not
412416
// return between messages, only on stream-end or ctx cancellation.
@@ -431,7 +435,7 @@ func TestConsumeEventStream_StaysOnSameStreamAcrossMessages(t *testing.T) {
431435

432436
retry, err := consumeEventStream(ctx, msgs, errs, func() {
433437
calls.Add(1)
434-
})
438+
}, testBackoff)
435439
if err != nil {
436440
t.Fatalf("unexpected error: %v", err)
437441
}
@@ -450,7 +454,7 @@ func TestConsumeEventStream_CtxCancelStopsLoop(t *testing.T) {
450454
ctx, cancel := context.WithCancel(context.Background())
451455
cancel() // pre-cancelled
452456

453-
retry, err := consumeEventStream(ctx, msgs, errs, func() {})
457+
retry, err := consumeEventStream(ctx, msgs, errs, func() {}, testBackoff)
454458
if retry {
455459
t.Error("retry should be false when ctx cancelled — caller should not reopen")
456460
}
@@ -462,21 +466,103 @@ func TestConsumeEventStream_CtxCancelStopsLoop(t *testing.T) {
462466
func TestConsumeEventStream_ErrSignalRequestsRetry(t *testing.T) {
463467
msgs := make(chan events.Message)
464468
errs := make(chan error, 1)
465-
466-
// Use a context with a small timeout so the post-error backoff
467-
// returns quickly without holding the test up for 2 s.
468-
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
469+
ctx, cancel := context.WithCancel(context.Background())
469470
defer cancel()
470471

471472
errs <- errors.New("stream interrupted")
472473

473474
retry, err := consumeEventStream(ctx, msgs, errs, func() {
474475
t.Error("onMessage must not be called on errs path")
475-
})
476-
// Backoff hit ctx-cancel rather than completing — that's fine, the
477-
// real-world caller already received the "retry after backoff"
478-
// instruction and will simply observe ctx.Err() next.
479-
if err == nil && !retry {
480-
t.Error("on err with no ctx-cancel mid-backoff we expect retry=true")
476+
}, testBackoff)
477+
if err != nil {
478+
t.Fatalf("unexpected error: %v", err)
479+
}
480+
if !retry {
481+
t.Error("retry should be true after errs signal so caller reopens")
482+
}
483+
}
484+
485+
// Regression for the docker-socket-proxy connection leak at the
486+
// reconnect-loop layer: source must be called once per *real*
487+
// disconnect, NOT once per event. The pre-fix shape opened a fresh
488+
// long-poll HTTP request on every message and leaked the previous
489+
// one's goroutine. This test would have failed loudly before commit
490+
// 47b445d (a 2nd stream would have been requested after the first
491+
// message).
492+
func TestRunEventLoop_OnlyReopensAfterStreamEnds(t *testing.T) {
493+
type stream struct {
494+
msgs chan events.Message
495+
errs chan error
496+
}
497+
streams := make(chan *stream, 4)
498+
499+
source := func(ctx context.Context) (<-chan events.Message, <-chan error) {
500+
s := &stream{
501+
msgs: make(chan events.Message, 16),
502+
errs: make(chan error, 1),
503+
}
504+
streams <- s
505+
return s.msgs, s.errs
506+
}
507+
508+
var eventCalls atomic.Int32
509+
consume := func(ctx context.Context, msgs <-chan events.Message, errs <-chan error) (bool, error) {
510+
for {
511+
select {
512+
case <-ctx.Done():
513+
return false, ctx.Err()
514+
case _, ok := <-msgs:
515+
if !ok {
516+
return true, nil // signal reopen on stream end
517+
}
518+
eventCalls.Add(1)
519+
}
520+
}
521+
}
522+
523+
ctx, cancel := context.WithCancel(context.Background())
524+
defer cancel()
525+
526+
done := make(chan error, 1)
527+
go func() { done <- runEventLoop(ctx, source, consume) }()
528+
529+
// First stream is opened eagerly.
530+
s1 := <-streams
531+
532+
// Pump 5 events through the SAME stream.
533+
for i := 0; i < 5; i++ {
534+
s1.msgs <- events.Message{Action: "start", Actor: events.Actor{ID: "abcdef0123456789"}}
535+
}
536+
waitFor(t, 2*time.Second, func() bool { return eventCalls.Load() == 5 })
537+
538+
// Central assertion: no new stream was requested while events were flowing.
539+
select {
540+
case <-streams:
541+
t.Fatal("source was called a second time mid-stream — leak fix regressed")
542+
case <-time.After(50 * time.Millisecond):
543+
}
544+
545+
// Close the stream → source MUST be called a second time.
546+
close(s1.msgs)
547+
select {
548+
case <-streams:
549+
// expected
550+
case <-time.After(2 * time.Second):
551+
t.Fatal("source was not called again after stream close — caller failed to reopen")
552+
}
553+
554+
cancel()
555+
<-done
556+
}
557+
558+
func waitFor(t *testing.T, timeout time.Duration, cond func() bool) {
559+
t.Helper()
560+
deadline := time.Now().Add(timeout)
561+
for time.Now().Before(deadline) {
562+
if cond() {
563+
return
564+
}
565+
time.Sleep(1 * time.Millisecond)
481566
}
567+
t.Fatalf("timed out waiting for condition after %s", timeout)
482568
}

0 commit comments

Comments
 (0)