Skip to content

Commit 670399b

Browse files
johnhamptonharendra-kumar
authored andcommitted
fix: resolve parDemuxScan deadlock when worker throws with full buffer
Signal the driver thread when a worker exception occurs so it can wake up from blocking on a full input buffer. Previously sendExceptionToDriver only wrote to the output queue, leaving the driver stuck on takeMVar. - Set closedForInput and signal inputSpaceDoorBell in sendExceptionToDriver - Add regression test with slow single-key worker that throws mid-stream - Test confirms original exception propagates instead of BlockedIndefinitelyOnMVar
1 parent d1b75d4 commit 670399b

3 files changed

Lines changed: 43 additions & 0 deletions

File tree

docs/User/Project/Changelog.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,13 @@
44

55
## Unreleased
66

7+
### Bug Fixes
8+
9+
* Fix `parDemuxScan` deadlock when a worker throws an exception while the
10+
driver is blocked on a full input buffer.
11+
12+
### Breaking Changes
13+
714
* The following deprecated modules have been removed:
815
* Streamly.Data.Array.Foreign
916
* Streamly.Data.Fold.Tee

src/Streamly/Internal/Data/Fold/Channel/Type.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,8 @@ sendEOFToDriver sv = liftIO $ do
313313
sendExceptionToDriver :: Channel m a b -> SomeException -> IO ()
314314
sendExceptionToDriver sv e = do
315315
tid <- myThreadId
316+
writeIORef (closedForInput sv) True
317+
void $ tryPutMVar (inputSpaceDoorBell sv) ()
316318
void $ sendToDriver sv (FoldException tid e)
317319

318320
data FromSVarState m a b =

test/Streamly/Test/Data/Scanl/Concurrent.hs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
module Streamly.Test.Data.Scanl.Concurrent (main) where
1111

1212
import Control.Concurrent (threadDelay)
13+
import Control.Exception (ErrorCall(..), try)
1314
import Data.Function ( (&) )
1415
import Data.IORef (newIORef, atomicModifyIORef')
1516
import Data.List (sort)
@@ -103,6 +104,37 @@ parDemuxScan_StreamEnd concOpts = do
103104
map snd (filter fst res) `shouldBe` filter even [1..streamLen]
104105
map snd (filter (not . fst) res) `shouldBe` filter odd [1..streamLen]
105106

107+
parDemuxScan_WorkerException :: (Scanl.Config -> Scanl.Config) -> IO ()
108+
parDemuxScan_WorkerException concOpts = do
109+
let throwAfter = 3
110+
-- All items go to the same key so the driver stays in sendToWorker_
111+
-- for a single worker channel, maximizing the chance of blocking on a
112+
-- full buffer.
113+
demuxer _ = (0 :: Int)
114+
gen _ = pure
115+
$ Scanl.lmapM
116+
(\x -> do
117+
-- Slow the worker down so the buffer fills up and the
118+
-- driver blocks in sendToWorker_ waiting for space.
119+
threadDelay 50000
120+
if (x :: Int) > throwAfter
121+
then error "worker exception"
122+
else pure x)
123+
$ Scanl.mkScanl1 (\_ x -> x)
124+
-- Send enough items to fill the buffer (maxBuffer 1) and block
125+
inpList = [1..100]
126+
inpStream = Stream.fromList inpList
127+
res <- try
128+
$ Scanl.parDemuxScanM concOpts demuxer gen inpStream
129+
& Stream.concatMap Stream.fromList
130+
& fmap (\x -> (fst x,) <$> snd x)
131+
& Stream.catMaybes
132+
& Stream.fold Fold.toList
133+
case res of
134+
Left (ErrorCall msg) -> msg `shouldBe` "worker exception"
135+
Right _ -> expectationFailure
136+
"Expected ErrorCall exception but stream completed successfully"
137+
106138
main :: IO ()
107139
main = hspec
108140
$ H.parallel
@@ -118,3 +150,5 @@ main = hspec
118150
$ parDemuxScan_StreamEnd (Scanl.maxBuffer 1)
119151
it "parDemuxScanM (scan end) (maxBuffer 1)"
120152
$ parDemuxScan_ScanEnd (Scanl.maxBuffer 1)
153+
it "parDemuxScanM (worker exception) (maxBuffer 1)"
154+
$ parDemuxScan_WorkerException (Scanl.maxBuffer 1)

0 commit comments

Comments
 (0)