diff --git a/benchmark/streamly-benchmarks.cabal b/benchmark/streamly-benchmarks.cabal index c56e7f9424..34815a1629 100644 --- a/benchmark/streamly-benchmarks.cabal +++ b/benchmark/streamly-benchmarks.cabal @@ -17,7 +17,7 @@ description: Benchmarks are separated from the main package because we flag fusion-plugin description: Use fusion plugin for benchmarks and executables manual: True - default: False + default: True flag limit-build-mem description: Limits memory when building the executables diff --git a/cabal.project b/cabal.project index c702bc9041..385bc915db 100644 --- a/cabal.project +++ b/cabal.project @@ -4,6 +4,9 @@ packages: streamly.cabal , benchmark/streamly-benchmarks.cabal , bench-test-lib/bench-test-lib.cabal +-- For compiling standalone programs +-- write-ghc-environment-files: always + -- For debugging heap overflow -- jobs: 1 diff --git a/core/src/Streamly/Internal/Data/Stream/Eliminate.hs b/core/src/Streamly/Internal/Data/Stream/Eliminate.hs index edfce3b297..e63ac1dfea 100644 --- a/core/src/Streamly/Internal/Data/Stream/Eliminate.hs +++ b/core/src/Streamly/Internal/Data/Stream/Eliminate.hs @@ -580,17 +580,25 @@ isSubsequenceOf (Stream stepa ta) (Stream stepb tb) = go SPEC Nothing' ta tb Skip sb' -> go SPEC (Just' x) sa sb' Stop -> return False --- | @stripPrefix prefix input@ strips the @prefix@ stream from the @input@ --- stream if it is a prefix of input. Returns 'Nothing' if the input does not --- start with the given prefix, stripped input otherwise. Returns @Just nil@ --- when the prefix is the same as the input stream. +-- NOTE: Unlike 'dropPrefix', which always returns a transformed stream, +-- this function returns @Maybe@ to indicate whether the prefix matched. + +-- | @stripPrefix prefix input@ strips the @prefix@ stream from the @input@ if +-- present. +-- +-- If the input begins with the given prefix, returns @Just@ the remaining +-- stream. If the input does not start with the prefix, returns 'Nothing'. 
+-- +-- It may consume both the streams partially up to the point of failure. -- --- Space: @O(1)@ +-- /Space:/ @O(1)@ -- {-# INLINE_NORMAL stripPrefix #-} stripPrefix :: (Monad m, Eq a) - => Stream m a -> Stream m a -> m (Maybe (Stream m a)) + => Stream m a -- ^ Prefix to remove + -> Stream m a -- ^ Input stream + -> m (Maybe (Stream m a)) -- ^ Remaining stream if prefix matches stripPrefix (Stream stepa ta) (Stream stepb tb) = go SPEC Nothing' ta tb where @@ -674,6 +682,9 @@ isSuffixOfUnbox :: (MonadIO m, Eq a, Unbox a) => isSuffixOfUnbox suffix stream = StreamD.reverseUnbox suffix `isPrefixOf` StreamD.reverseUnbox stream +-- XXX this buffers both streams. Buffering should be equal to the size of the +-- suffix. + -- | Drops the given suffix from a stream. Returns 'Nothing' if the stream does -- not end with the given suffix. Returns @Just nil@ when the suffix is the -- same as the stream. diff --git a/core/src/Streamly/Internal/Data/Stream/Parse.hs b/core/src/Streamly/Internal/Data/Stream/Parse.hs index 3cbfc7384e..aff5a3d2ae 100644 --- a/core/src/Streamly/Internal/Data/Stream/Parse.hs +++ b/core/src/Streamly/Internal/Data/Stream/Parse.hs @@ -71,8 +71,8 @@ module Streamly.Internal.Data.Stream.Parse , splitOnSuffixSeq -- internal , splitBeginBy_ - , splitEndBySeqOneOf - , splitSepBySeqOneOf + , splitEndByOneOf + , splitSepByOneOf -- * Transform (Nested Containers) -- | Opposite to compact in ArrayStream @@ -80,9 +80,11 @@ module Streamly.Internal.Data.Stream.Parse , splitInnerBySuffix -- XXX innerSplitOnSuffix -- * Reduce By Streams + , dropCommonPrefixBy , dropPrefix - , dropInfix + , dropMatches , dropSuffix + , replaceMatches -- * Deprecated , parseManyD @@ -97,8 +99,12 @@ where #include "ArrayMacros.h" import Control.Exception (assert) +import Control.Monad (zipWithM) import Control.Monad.IO.Class (MonadIO(..)) import Data.Bits (shiftR, shiftL, (.|.), (.&.)) +import Data.Functor.Identity (Identity(..), runIdentity) +import Data.List (groupBy, sortBy) +import 
Data.Ord (comparing, Down(..)) import Data.Proxy (Proxy(..)) import Data.Word (Word32) import Fusion.Plugin.Types (Fuse(..)) @@ -106,14 +112,17 @@ import GHC.Types (SPEC(..)) import Streamly.Internal.Data.Array.Type (Array(..)) import Streamly.Internal.Data.Fold.Type (Fold(..)) +import Streamly.Internal.Data.Maybe.Strict (Maybe'(..)) import Streamly.Internal.Data.MutArray.Type (MutArray(..)) +import Streamly.Internal.Data.MutByteArray.Type (MutByteArray) import Streamly.Internal.Data.Parser (ParseError(..), ParseErrorPos) import Streamly.Internal.Data.RingArray (RingArray(..)) -import Streamly.Internal.Data.SVar.Type (adaptState) +import Streamly.Internal.Data.SVar.Type (adaptState, defState) import Streamly.Internal.Data.Unbox (Unbox(..)) import qualified Streamly.Internal.Data.Array.Type as A import qualified Streamly.Internal.Data.MutArray.Type as MutArray +import qualified Streamly.Internal.Data.MutByteArray.Type as MutByteArray import qualified Streamly.Internal.Data.Fold as FL import qualified Streamly.Internal.Data.Parser as PR import qualified Streamly.Internal.Data.Parser as PRD @@ -1921,10 +1930,10 @@ splitEndBySeq_ = splitOnSuffixSeq False -- | Split post any one of the given patterns. -- -- /Unimplemented/ -{-# INLINE splitEndBySeqOneOf #-} -splitEndBySeqOneOf :: -- (Monad m, Unboxed a, Integral a) => +{-# INLINE splitEndByOneOf #-} +splitEndByOneOf :: -- (Monad m, Unboxed a, Integral a) => [Array a] -> Fold m a b -> Stream m a -> Stream m b -splitEndBySeqOneOf _subseq _f _m = undefined +splitEndByOneOf _subseq _f _m = undefined -- | Split on a prefixed separator element, dropping the separator. The -- supplied 'Fold' is applied on the split segments. @@ -2007,16 +2016,351 @@ splitBeginBy_ _predicate _f = undefined -- >>> splitList [1,2,3,3,4] [1,2,3,3,4] -- > [[],[]] --- This can be implemented easily using Rabin Karp --- | Split on any one of the given patterns. 
{-# ANN type SplitOnSeqOneOfState Fuse #-}
-- State machine of 'splitSepByOneOf'. @fs@ is the fold accumulator, @s@ the
-- state of the input stream and @b@ the fold result.
data SplitOnSeqOneOfState fs s b =
      SOOInit
    | SOOYield b (SplitOnSeqOneOfState fs s b)
    | SOODone
    | SOOReinit (fs -> SplitOnSeqOneOfState fs s b)

    -- No matchable patterns: stream folded as a single segment
    | SOOWholeInit !fs s
    | SOOWhole !fs s

    -- Buffer-filling phase. The Bool is True iff this state was entered
    -- after a match (post-match restart). On Stop with count=0:
    --   primed=True  → yield one empty fold result (matches splitSepBySeq_)
    --   primed=False → yield nothing (pristine initial state)
    | SOOFill !Bool !Int !fs s !MutByteArray ![Word32]

    -- Rolling phase, ring head at given byte offset
    | SOOLoop !fs s !MutByteArray !Int ![Word32]

    -- After a match: fold n elements (mid-stream) starting at byte offset,
    -- then finalize and re-init at SOOFill 0
    | SOOPrefold !Int !Int !fs !MutByteArray s

    -- At end of stream: fold n elements starting at byte offset, then
    -- finalize and stop
    | SOODrain !Int !Int !fs !MutByteArray

-- | Split a stream on any one of the given infix separator patterns. Behaves
-- like 'splitSepBySeq_' generalized to multiple patterns.
--
-- The supplied fold is applied on the inter-pattern segments. Matched
-- separators are dropped. Matching is left-to-right and non-overlapping; when
-- two patterns match ending at the same position, the longer (leftmost-start)
-- match wins. Empty patterns in the list are ignored. An empty pattern list
-- behaves as if no pattern can ever match — the entire stream is folded as a
-- single segment.
--
-- >>> splitOneOf ps xs = Stream.fold Fold.toList $ Stream.splitSepByOneOf (Prelude.map Array.fromList ps) Fold.toList (Stream.fromList xs)
--
-- >>> splitOneOf ["::", "->"] "a::b->c::d"
-- ["a","b","c","d"]
--
-- >>> splitOneOf ["::"] "a::b::c"
-- ["a","b","c"]
--
-- >>> splitOneOf ["x"] "abc"
-- ["abc"]
--
-- >>> splitOneOf [] "abc"
-- ["abc"]
--
-- -- this one currently fails
-- >> splitOneOf ["a", "ab"] "xaby"
-- ["x","y"]
--
-- The last example fails because a match is tested after every input
-- element, so the shorter pattern @"a"@ is reported before the longer
-- pattern @"ab"@ has had a chance to complete.
--
-- Space: @O(n)@ where n is the longest pattern length.
--
{-# INLINE_NORMAL splitSepByOneOf #-}
splitSepByOneOf
    :: forall m a b. (MonadIO m, Unbox a, Enum a)
    => [Array a]
    -> Fold m a b
    -> Stream m a
    -> Stream m b
splitSepByOneOf patArrs0 (Fold fstep initial _ final) (Stream step state) =
    Stream stepOuter SOOInit

    where

    elemSize = SIZE_OF(a)

    -- Empty patterns are dropped up front, see the haddock above.
    patArrs = filter (\arr -> A.length arr > 0) patArrs0

    -- Multiplier of the rolling checksum.
    k :: Word32
    k = 2891336453

    addCksum :: Word32 -> a -> Word32
    addCksum cksum a = cksum * k + fromIntegral (fromEnum a)

    -- Checksum of a window after @new@ enters it and @old@ leaves it.
    -- @coeff@ is k raised to the window length.
    rollCksum :: Word32 -> Word32 -> a -> a -> Word32
    rollCksum coeff cksum old new =
        addCksum cksum new - coeff * fromIntegral (fromEnum old)

    -- (patLen elements, patBytes, k^patLen, [(hash, pattern)])
    -- Sorted by patLen descending for longest-match priority.
    groups :: [(Int, Int, Word32, [(Word32, Array a)])]
    groups = Prelude.map mkGroup sameLenRuns

        where

        longestFirst = sortBy (comparing (Down . A.length)) patArrs

        sameLenRuns =
            groupBy (\x y -> A.length x == A.length y) longestFirst

        mkGroup [] = error "splitSepByOneOf: impossible empty group"
        mkGroup ps@(p:_) =
            ( A.length p
            , A.byteLength p
            , k ^ A.length p
            , Prelude.map (\arr -> (A.foldl' addCksum 0 arr, arr)) ps
            )

    -- One running checksum per length group, all starting at zero.
    initialHashes :: [Word32]
    initialHashes = Prelude.map (const 0) groups

    -- groups is sorted longest first, so its head carries the maximum.
    maxLen = case groups of
        [] -> 0
        ((l, _, _, _) : _) -> l

    maxBytes = maxLen * elemSize

    skip = return . Skip

    nextAfterInit nextGen stepRes =
        case stepRes of
            FL.Partial fs -> nextGen fs
            FL.Done b -> SOOYield b (SOOReinit nextGen)

    -- Yield a finished segment and restart the fold for the next one.
    yieldReinit nextGen b =
        initial >>= skip . SOOYield b . nextAfterInit nextGen

    -- Phase 1 (Fill) hash update. count is the new count after inserting x.
    -- Buffer is direct-indexed: position p at byte offset p * elemSize.
    updateHashesFill :: Int -> MutByteArray -> a -> [Word32] -> IO [Word32]
    updateHashesFill count mba x = Control.Monad.zipWithM roll groups

        where

        roll (pLen, _, coeff, _) hash
            -- The window of this group is not full yet: only append.
            | count <= pLen = pure (addCksum hash x)
            | otherwise = do
                old :: a <- peekAt ((count - pLen - 1) * elemSize) mba
                pure (rollCksum coeff hash old x)

    -- Phase 2 (Loop) hash update. rh is the byte offset of the oldest
    -- element BEFORE insertion. Reads the falling-out element for each
    -- length (for the longest group that is the element at rh itself).
    updateHashesLoop :: Int -> MutByteArray -> a -> [Word32] -> IO [Word32]
    updateHashesLoop rh mba x = Control.Monad.zipWithM roll groups

        where

        roll (pLen, _, coeff, _) hash = do
            let oldOffset = (rh + (maxLen - pLen) * elemSize) `mod` maxBytes
            old :: a <- peekAt oldOffset mba
            pure (rollCksum coeff hash old x)

    -- Phase 1 match check. count is the new count after insertion.
    -- Returns the matched pattern length (the longest one), or Nothing.
    checkMatchesFill :: Int -> MutByteArray -> [Word32] -> IO (Maybe Int)
    checkMatchesFill count mba = go groups

        where

        go [] _ = pure Nothing
        go _ [] = pure Nothing
        go ((pLen, pBytes, _, cands):gs) (hash:hs)
            -- Fewer elements buffered than this group needs.
            | count < pLen = go gs hs
            | otherwise = do
                found <- anyMatch hash pBytes cands
                if found then pure (Just pLen) else go gs hs

        anyMatch _ _ [] = pure False
        anyMatch h pb ((ph, pat):rest)
            | h /= ph = anyMatch h pb rest
            | otherwise = do
                -- The candidate occupies the last pb bytes written so far.
                let startOffset = count * elemSize - pb
                cmp <- MutByteArray.unsafeByteCmp
                        mba startOffset
                        (A.arrContents pat) (A.arrStart pat)
                        pb
                if cmp == 0
                then pure True
                else anyMatch h pb rest

    -- Phase 2 match check. rh is the NEW ring head (after insertion).
    -- Last L elements span the ring view starting at
    --   (rh + (maxLen - L) * elemSize) mod maxBytes
    -- with length pBytes.
    checkMatchesLoop :: Int -> MutByteArray -> [Word32] -> IO (Maybe Int)
    checkMatchesLoop rh mba = go groups

        where

        go [] _ = pure Nothing
        go _ [] = pure Nothing
        go ((pLen, pBytes, _, cands):gs) (hash:hs) = do
            found <- anyMatch hash pLen pBytes cands
            if found then pure (Just pLen) else go gs hs

        anyMatch _ _ _ [] = pure False
        anyMatch h pLen pb ((ph, pat):rest)
            | h /= ph = anyMatch h pLen pb rest
            | otherwise = do
                let viewHead =
                        (rh + (maxLen - pLen) * elemSize) `mod` maxBytes
                    viewRing = RingArray
                        { ringContents = mba
                        , ringSize = maxBytes
                        , ringHead = viewHead
                        }
                eq <- RB.eqArrayN viewRing pat pb
                if eq
                then pure True
                else anyMatch h pLen pb rest

    {-# INLINE_LATE stepOuter #-}
    stepOuter _ SOOInit = do
        res <- initial
        case res of
            FL.Partial fs
                | null groups -> skip $ SOOWholeInit fs state
                | otherwise -> do
                    (MutArray mba _ _ _) :: MutArray a <-
                        liftIO $ MutArray.emptyOf maxLen
                    skip $ SOOFill False 0 fs state mba initialHashes
            FL.Done b -> skip $ SOOYield b SOOInit

    stepOuter _ (SOOYield b next) = pure $ Yield b next

    stepOuter _ SOODone = pure Stop

    stepOuter _ (SOOReinit nextGen) =
        initial >>= skip . nextAfterInit nextGen

    ---------------------------
    -- No-pattern (Whole stream) path
    ---------------------------

    -- NOTE(review): unlike the pattern paths, a fold that finishes early
    -- here ends the output stream instead of being restarted.

    -- Nothing consumed yet: an empty input yields no segment at all.
    stepOuter gst (SOOWholeInit fs st) = do
        res <- step (adaptState gst) st
        case res of
            Yield x s -> do
                r <- fstep fs x
                case r of
                    FL.Partial fs1 -> skip $ SOOWhole fs1 s
                    FL.Done b -> skip $ SOOYield b SOODone
            Skip s -> skip $ SOOWholeInit fs s
            Stop -> final fs >> pure Stop

    stepOuter gst (SOOWhole fs st) = do
        res <- step (adaptState gst) st
        case res of
            Yield x s -> do
                r <- fstep fs x
                case r of
                    FL.Partial fs1 -> skip $ SOOWhole fs1 s
                    FL.Done b -> skip $ SOOYield b SOODone
            Skip s -> skip $ SOOWhole fs s
            Stop -> do
                b <- final fs
                skip $ SOOYield b SOODone

    ---------------------------
    -- Phase 1 — Fill
    ---------------------------

    -- Elements are only buffered here; the fold sees them later, from
    -- SOOLoop (eviction), SOOPrefold or SOODrain.
    stepOuter gst (SOOFill primed count fs st mba hashes) = do
        res <- step (adaptState gst) st
        case res of
            Yield x s -> do
                liftIO $ pokeAt (count * elemSize) mba x
                let count1 = count + 1
                hashes1 <- liftIO $ updateHashesFill count1 mba x hashes
                matched <- liftIO $ checkMatchesFill count1 mba hashes1
                case matched of
                    Just pLen ->
                        skip $ SOOPrefold (count1 - pLen) 0 fs mba s
                    Nothing
                        | count1 == maxLen ->
                            skip $ SOOLoop fs s mba 0 hashes1
                        | otherwise ->
                            skip $ SOOFill primed count1 fs s mba hashes1
            Skip s -> skip $ SOOFill primed count fs s mba hashes
            Stop
                | count /= 0 -> skip $ SOODrain count 0 fs mba
                | primed -> do
                    b <- final fs
                    skip $ SOOYield b SOODone
                | otherwise -> final fs >> pure Stop

    ---------------------------
    -- Phase 2 — Loop
    ---------------------------

    stepOuter gst (SOOLoop fs st mba rh hashes) = do
        res <- step (adaptState gst) st
        case res of
            Yield x s -> do
                -- The hashes must be rolled before x overwrites the slot.
                evicted :: a <- liftIO $ peekAt rh mba
                hashes1 <- liftIO $ updateHashesLoop rh mba x hashes
                liftIO $ pokeAt rh mba x
                let rh1 = (rh + elemSize) `mod` maxBytes
                r <- fstep fs evicted
                case r of
                    FL.Partial fs1 -> do
                        matched <- liftIO $ checkMatchesLoop rh1 mba hashes1
                        case matched of
                            Just pLen ->
                                skip $ SOOPrefold
                                    (maxLen - pLen)
                                    rh1
                                    fs1
                                    mba
                                    s
                            Nothing ->
                                skip $ SOOLoop fs1 s mba rh1 hashes1
                    FL.Done b -> do
                        -- NOTE(review): the elements still buffered in the
                        -- ring are not replayed into the restarted fold.
                        let jump c = SOOFill True 0 c s mba initialHashes
                        yieldReinit jump b
            Skip s -> skip $ SOOLoop fs s mba rh hashes
            Stop -> skip $ SOODrain maxLen rh fs mba

    ---------------------------
    -- Pre-fold (after a mid-stream match)
    ---------------------------

    stepOuter _ (SOOPrefold 0 _ fs mba s) = do
        b <- final fs
        let jump c = SOOFill True 0 c s mba initialHashes
        yieldReinit jump b
    stepOuter _ (SOOPrefold remaining offset fs mba s) = do
        old :: a <- liftIO $ peekAt offset mba
        let nextOffset = (offset + elemSize) `mod` maxBytes
        r <- fstep fs old
        case r of
            FL.Partial fs1 ->
                skip $ SOOPrefold (remaining - 1) nextOffset fs1 mba s
            FL.Done b -> do
                let jump c = SOOPrefold (remaining - 1) nextOffset c mba s
                yieldReinit jump b

    ---------------------------
    -- Drain (end-of-stream)
    ---------------------------

    stepOuter _ (SOODrain 0 _ fs _) = do
        b <- final fs
        skip $ SOOYield b SOODone
    stepOuter _ (SOODrain remaining offset fs mba) = do
        old :: a <- liftIO $ peekAt offset mba
        let nextOffset = (offset + elemSize) `mod` maxBytes
        r <- fstep fs old
        case r of
            FL.Partial fs1 ->
                skip $ SOODrain (remaining - 1) nextOffset fs1 mba
            FL.Done b -> do
                let jump c = SOODrain (remaining - 1) nextOffset c mba
                yieldReinit jump b

------------------------------------------------------------------------------
-- Nested Container Transformation
-- (hunk boundary @@ -2137,43 +2481,856 @@ splitInnerBySuffix: the unchanged
-- code between these two section banners is not part of the patch text)
-- Trimming
------------------------------------------------------------------------------

-- State of 'dropCommonPrefixBy'. @sa@ and @sb@ are the states of the prefix
-- and the input stream, @b@ is the pending prefix element.
{-# ANN type DropCommonPrefixState Fuse #-}
data DropCommonPrefixState sa sb b
    = DCPMatching sa sb         -- fetch the next prefix element
    | DCPMatchWith !b sa sb     -- compare the next input with this element
    | DCPPassThrough sb         -- matching is over, copy the input

-- | Drop the longest common prefix of the prefix stream and the input stream,
-- returning the remaining input.
--
-- Stops consuming the prefix as soon as either stream ends or a mismatch is
-- found.
--
-- The comparison function is applied as @eq inputElement prefixElement@. The
-- first input element that does not match is emitted and everything after it
-- is passed through without further comparison.
--
-- Space: @O(1)@
{-# INLINE dropCommonPrefixBy #-}
dropCommonPrefixBy :: Monad m =>
    (b -> a -> Bool) -> Stream m a -> Stream m b -> Stream m b
dropCommonPrefixBy eq (Stream stepa ta) (Stream stepb tb) =
    Stream step (DCPMatching ta tb)

    where

    {-# INLINE_LATE step #-}
    step _ (DCPMatching sa sb) = do
        r <- stepa defState sa
        return $ case r of
            Yield x sa' -> Skip (DCPMatchWith x sa' sb)
            Skip sa' -> Skip (DCPMatching sa' sb)
            -- Prefix exhausted: whatever is left of the input is the result.
            Stop -> Skip (DCPPassThrough sb)

    step gst (DCPMatchWith x sa sb) = do
        r <- stepb gst sb
        return $ case r of
            Yield y sb' ->
                if y `eq` x
                then Skip (DCPMatching sa sb')
                else Yield y (DCPPassThrough sb')
            Skip sb' -> Skip (DCPMatchWith x sa sb')
            Stop -> Stop

    step gst (DCPPassThrough sb) = do
        r <- stepb gst sb
        return $ case r of
            Yield x sb' -> Yield x (DCPPassThrough sb')
            Skip sb' -> Skip (DCPPassThrough sb')
            Stop -> Stop

-- State of 'dropPrefix'. The Int is the number of input elements that have
-- matched the prefix so far.
{-# ANN type DropPrefixState Fuse #-}
data DropPrefixState sa sb a
    = DPInit sa sb !Int
    | DPMatchWith !a sa sb !Int
    | DPReplay sa !Int (Maybe' a) sb
    | DPPassThrough sb

-- | Drop a prefix stream from the input stream if present, otherwise return
-- the input stream unchanged.
--
-- Unlike 'stripPrefix', which returns @Maybe@ to indicate whether the prefix
-- matched, this always returns a stream. If the prefix matches it is dropped,
-- otherwise the input is returned as-is.
--
-- Matched input elements are not buffered. When the match fails, or the
-- input ends before the prefix does, the elements matched so far are
-- regenerated by running the pure prefix stream again from its start. For
-- example, dropping @"ab"@ from @"abc"@ gives @"c"@, from @"axc"@ gives
-- @"axc"@ and from @"a"@ gives @"a"@.
--
-- Space: @O(1)@
--
-- See also 'Streamly.Internal.Data.Stream.Eliminate.stripPrefix'.
{-# INLINE dropPrefix #-}
dropPrefix :: (Monad m, Eq a) => Stream Identity a -> Stream m a -> Stream m a
dropPrefix (Stream stepa ta) (Stream stepb tb) =
    Stream step (DPInit ta tb 0)

    where

    {-# INLINE_LATE step #-}
    step _ (DPInit sa sb count) =
        let r = runIdentity (stepa defState sa)
         in return $ case r of
                Yield x sa' -> Skip (DPMatchWith x sa' sb count)
                Skip sa' -> Skip (DPInit sa' sb count)
                -- The whole prefix matched: drop it.
                Stop -> Skip (DPPassThrough sb)

    step gst (DPMatchWith x sa sb count) = do
        r <- stepb gst sb
        return $ case r of
            Yield y sb' ->
                if x == y
                then Skip (DPInit sa sb' (count + 1))
                -- Mismatch: replay the matched elements, then emit y.
                else Skip (DPReplay ta count (Just' y) sb')
            Skip sb' -> Skip (DPMatchWith x sa sb' count)
            -- Input shorter than the prefix: replay what was matched.
            Stop -> Skip (DPReplay ta count Nothing' sb)

    step _ (DPReplay _ 0 Nothing' _) = return Stop
    step _ (DPReplay _ 0 (Just' y) sb) = return $ Yield y (DPPassThrough sb)
    step _ (DPReplay sa n mismatch sb) =
        let r = runIdentity (stepa defState sa)
         in return $ case r of
                Yield x sa' -> Yield x (DPReplay sa' (n - 1) mismatch sb)
                Skip sa' -> Skip (DPReplay sa' n mismatch sb)
                Stop -> Stop -- unreachable: n <= matched count <= prefix length

    step gst (DPPassThrough sb) = do
        r <- stepb gst sb
        return $ case r of
            Yield x sb' -> Yield x (DPPassThrough sb')
            Skip sb' -> Skip (DPPassThrough sb')
            Stop -> Stop

-- State of 'dropMatches'. Which group of constructors is used depends on the
-- pattern length: empty, single element, fits in a machine word, or longer.
{-# ANN type DropInfixState Fuse #-}
data DropInfixState mba rb rh ck w s a =
      DropInfixInit s

    | DropInfixEmpty s

    | DropInfixSingle s a

    | DropInfixWordInit Int !w s
    | DropInfixWordLoop !w s
    | DropInfixWordDone Int !w

    | DropInfixKRInit Int s mba
    | DropInfixKRLoop s mba !rh !ck
    | DropInfixKRCheck s mba !rh
    | DropInfixKRDone Int rb

-- | Drop all non-overlapping occurrences of the given sequence from
-- the input stream.
--
-- >>> dropMatches p xs = Stream.fold Fold.toList $ Stream.dropMatches (Array.fromList p) (Stream.fromList xs)
--
-- >>> dropMatches ".." "a..b..c"
-- "abc"
--
-- >>> dropMatches ".." ""
-- ""
--
-- >>> dropMatches "" "abc"
-- "abc"
--
-- >>> dropMatches ".." "abc"
-- "abc"
--
-- >>> dropMatches "aa" "aaaa"
-- ""
--
-- An empty pattern never matches, so the input is passed through unchanged.
--
-- Space: @O(n)@ where n is the length of the infix.
--
-- See also 'Streamly.Internal.Data.Stream.Eliminate.stripInfix'.
{-# INLINE_NORMAL dropMatches #-}
dropMatches
    :: forall m a. (MonadIO m, Unbox a, Enum a, Eq a)
    => Array a
    -> Stream m a
    -> Stream m a
dropMatches patArr (Stream step state) =
    Stream stepOuter (DropInfixInit state)

    where

    patLen = A.length patArr
    patBytes = A.byteLength patArr
    maxIndex = patLen - 1
    maxOffset = patBytes - SIZE_OF(a)
    elemBits = SIZE_OF(a) * 8

    -- Selects the low patLen elements of the shift register.
    wordMask :: Word
    wordMask = (1 `shiftL` (elemBits * patLen)) - 1

    -- Selects a single element.
    elemMask :: Word
    elemMask = (1 `shiftL` elemBits) - 1

    wordPat :: Word
    wordPat = wordMask .&. A.foldl' addToWord 0 patArr

    -- Shift a new element into the register.
    -- NOTE(review): presumably assumes 0 <= fromEnum a < 2^elemBits; a
    -- larger or negative value would spill into neighbouring slots - confirm.
    addToWord wd a = (wd `shiftL` elemBits) .|. fromIntegral (fromEnum a)

    -- The n-th most recent element held in the register (1 = newest).
    wordElem :: Int -> Word -> a
    wordElem n wrd =
        toEnum $ fromIntegral $ elemMask .&. (wrd `shiftR` (elemBits * (n - 1)))

    -- Rolling checksum used for patterns that do not fit in a word.
    k = 2891336453 :: Word32
    coeff = k ^ patLen

    addCksum cksum a = cksum * k + fromIntegral (fromEnum a)

    deltaCksum cksum old new =
        addCksum cksum new - coeff * fromIntegral (fromEnum old)

    patHash = A.foldl' addCksum 0 patArr

    -- View of the scratch buffer as a ring of the given byte size.
    mkRing mba size hd = RingArray
        { ringContents = mba
        , ringSize = size
        , ringHead = hd
        }

    skip = return . Skip

    -- Select the matching strategy from the pattern length.
    {-# INLINE_LATE stepOuter #-}
    stepOuter _ (DropInfixInit st)
        | patLen == 0 = skip $ DropInfixEmpty st
        | patLen == 1 = do
            pat <- liftIO $ A.unsafeGetIndexIO 0 patArr
            skip $ DropInfixSingle st pat
        | SIZE_OF(a) * patLen <= sizeOf (Proxy :: Proxy Word) =
            skip $ DropInfixWordInit 0 0 st
        | otherwise = do
            (MutArray mba _ _ _) :: MutArray a <-
                liftIO $ MutArray.emptyOf patLen
            skip $ DropInfixKRInit 0 st mba

    ---------------------------
    -- Empty pattern
    ---------------------------

    stepOuter gst (DropInfixEmpty st) = do
        res <- step (adaptState gst) st
        case res of
            Yield x s -> return $ Yield x (DropInfixEmpty s)
            Skip s -> skip (DropInfixEmpty s)
            Stop -> return Stop

    -----------------
    -- Single Pattern
    -----------------

    stepOuter gst (DropInfixSingle st0 pat) = loop SPEC st0

        where

        loop !_ !st = do
            res <- step (adaptState gst) st
            case res of
                Yield x s
                    | pat == x -> loop SPEC s
                    | otherwise -> return $ Yield x (DropInfixSingle s pat)
                Skip s -> loop SPEC s
                Stop -> return Stop

    ---------------------------
    -- Short Pattern - Shift Or
    ---------------------------

    -- End of input: flush the n elements left in the register, oldest first.
    stepOuter _ (DropInfixWordDone 0 _) = return Stop
    stepOuter _ (DropInfixWordDone n wrd) =
        return $ Yield (wordElem n wrd) (DropInfixWordDone (n - 1) wrd)

    -- Collect the first patLen elements of a window.
    stepOuter gst (DropInfixWordInit idx0 wrd0 st0) =
        loop SPEC idx0 wrd0 st0

        where

        {-# INLINE loop #-}
        loop !_ !idx !wrd !st = do
            res <- step (adaptState gst) st
            case res of
                Yield x s -> do
                    let wrd1 = addToWord wrd x
                    if idx /= maxIndex
                    then loop SPEC (idx + 1) wrd1 s
                    else
                        if wrd1 .&. wordMask == wordPat
                        -- Whole window is the pattern: drop it, start over.
                        then skip $ DropInfixWordInit 0 0 s
                        else skip $ DropInfixWordLoop wrd1 s
                Skip s -> loop SPEC idx wrd s
                Stop ->
                    if idx == 0
                    then return Stop
                    else skip $ DropInfixWordDone idx wrd

    -- Slide the full window by one element, emitting the one that leaves.
    stepOuter gst (DropInfixWordLoop wrd0 st0) =
        loop SPEC wrd0 st0

        where

        {-# INLINE loop #-}
        loop !_ !wrd !st = do
            res <- step (adaptState gst) st
            case res of
                Yield x s -> do
                    let wrd1 = addToWord wrd x
                        oldA = wordElem patLen wrd
                    if wrd1 .&. wordMask == wordPat
                    then return $ Yield oldA (DropInfixWordInit 0 0 s)
                    else return $ Yield oldA (DropInfixWordLoop wrd1 s)
                Skip s -> loop SPEC wrd s
                Stop -> skip $ DropInfixWordDone patLen wrd

    -------------------------------
    -- General Pattern - Karp Rabin
    -------------------------------

    -- Fill the buffer with the first patLen elements of a window.
    stepOuter gst (DropInfixKRInit offset st mba) = do
        res <- step (adaptState gst) st
        case res of
            Yield x s -> do
                liftIO $ pokeAt offset mba x
                if offset == maxOffset
                then do
                    let arr :: Array a = Array
                            { arrContents = mba
                            , arrStart = 0
                            , arrEnd = patBytes
                            }
                    let ringHash = A.foldl' addCksum 0 arr
                    if ringHash == patHash && A.byteEq arr patArr
                    then skip $ DropInfixKRCheck s mba 0
                    else skip $ DropInfixKRLoop s mba 0 ringHash
                else skip $ DropInfixKRInit (offset + SIZE_OF(a)) s mba
            Skip s -> skip $ DropInfixKRInit offset s mba
            -- Input ended inside a window: emit the bytes collected so far.
            Stop -> skip $ DropInfixKRDone offset (mkRing mba offset 0)

    -- Slide the full window by one element, emitting the one that leaves.
    stepOuter gst (DropInfixKRLoop st0 mba rh0 cksum0) =
        loop SPEC st0 rh0 cksum0

        where

        loop !_ !st !rh !cksum = do
            res <- step (adaptState gst) st
            let rb = mkRing mba patBytes rh
            case res of
                Yield x s -> do
                    (rb1, old) <- liftIO (RB.replace rb x)
                    let cksum1 = deltaCksum cksum old x
                        rh1 = ringHead rb1
                    if cksum1 == patHash
                    then return $ Yield old (DropInfixKRCheck s mba rh1)
                    else return $ Yield old (DropInfixKRLoop s mba rh1 cksum1)
                Skip s -> loop SPEC s rh cksum
                Stop -> skip $ DropInfixKRDone patBytes rb

    -- The checksum matched: confirm with a full comparison.
    stepOuter _ (DropInfixKRCheck st mba rh) = do
        res <- liftIO $ RB.eqArray (mkRing mba patBytes rh) patArr
        if res
        then skip $ DropInfixKRInit 0 st mba
        -- This state is only entered when the checksum equals patHash.
        else skip $ DropInfixKRLoop st mba rh patHash

    -- End of input: flush len bytes from the ring, oldest element first.
    stepOuter _ (DropInfixKRDone 0 _) = return Stop
    stepOuter _ (DropInfixKRDone len rb) = do
        assert (len >= 0) (return ())
        old <- RB.unsafeGetHead rb
        let rb1 = RB.moveForward rb
        return $ Yield old (DropInfixKRDone (len - SIZE_OF(a)) rb1)

-- State of 'replaceMatches'. Same layout as 'DropInfixState' plus one
-- constructor per strategy for emitting the replacement; its Int is the
-- index of the next replacement element.
{-# ANN type ReplaceInfixState Fuse #-}
data ReplaceInfixState mba rb rh ck w s a =
      ReplaceInfixInit s

    | ReplaceInfixEmpty s

    | ReplaceInfixSingle s a
    | ReplaceInfixEmitSingle Int s a

    | ReplaceInfixWordInit Int !w s
    | ReplaceInfixWordLoop !w s
    | ReplaceInfixWordDone Int !w
    | ReplaceInfixEmitWord Int s

    | ReplaceInfixKRInit Int s mba
    | ReplaceInfixKRLoop s mba !rh !ck
    | ReplaceInfixKRCheck s mba !rh
    | ReplaceInfixKRDone Int rb
    | ReplaceInfixEmitKR Int s mba

-- | Replace all non-overlapping occurrences of the given sequence in the input
-- stream with a replacement sequence.
--
-- This is like 'dropMatches' except the matched pattern is replaced by the
-- supplied replacement array instead of being dropped.
--
-- >>> replaceMatches p r xs = Stream.fold Fold.toList $ Stream.replaceMatches (Array.fromList p) (Array.fromList r) (Stream.fromList xs)
--
-- >>> replaceMatches ".." "!" "a..b..c"
-- "a!b!c"
--
-- >>> replaceMatches ".." "---" "a..b..c"
-- "a---b---c"
--
-- >>> replaceMatches ".." "" "a..b..c"
-- "abc"
--
-- >>> replaceMatches "" "X" "abc"
-- "abc"
--
-- >>> replaceMatches "aa" "b" "aaaa"
-- "bb"
--
-- >>> replaceMatches "abc" "xyz" "abc"
-- "xyz"
--
-- Space: @O(n)@ where n is the length of the infix pattern.
--
{-# INLINE_NORMAL replaceMatches #-}
replaceMatches
    :: forall m a. (MonadIO m, Unbox a, Enum a, Eq a)
    => Array a
    -> Array a
    -> Stream m a
    -> Stream m a
replaceMatches patArr replArr (Stream step state) =
    Stream stepOuter (ReplaceInfixInit state)

    where

    patLen = A.length patArr
    patBytes = A.byteLength patArr
    replLen = A.length replArr
    maxIndex = patLen - 1
    maxOffset = patBytes - SIZE_OF(a)
    elemBits = SIZE_OF(a) * 8

    -- Selects the low patLen elements of the shift register.
    wordMask :: Word
    wordMask = (1 `shiftL` (elemBits * patLen)) - 1

    -- Selects a single element.
    elemMask :: Word
    elemMask = (1 `shiftL` elemBits) - 1

    wordPat :: Word
    wordPat = wordMask .&. A.foldl' addToWord 0 patArr

    -- Shift a new element into the register.
    -- NOTE(review): presumably assumes 0 <= fromEnum a < 2^elemBits; a
    -- larger or negative value would spill into neighbouring slots - confirm.
    addToWord wd a = (wd `shiftL` elemBits) .|. fromIntegral (fromEnum a)

    -- The n-th most recent element held in the register (1 = newest).
    wordElem :: Int -> Word -> a
    wordElem n wrd =
        toEnum $ fromIntegral $ elemMask .&. (wrd `shiftR` (elemBits * (n - 1)))

    -- Rolling checksum used for patterns that do not fit in a word.
    k = 2891336453 :: Word32
    coeff = k ^ patLen

    addCksum cksum a = cksum * k + fromIntegral (fromEnum a)

    deltaCksum cksum old new =
        addCksum cksum new - coeff * fromIntegral (fromEnum old)

    patHash = A.foldl' addCksum 0 patArr

    -- View of the scratch buffer as a ring of the given byte size.
    mkRing mba size hd = RingArray
        { ringContents = mba
        , ringSize = size
        , ringHead = hd
        }

    skip = return . Skip

    -- Emit replacement element i and continue with @again (i + 1)@, or
    -- move on to @resume@ once the whole replacement has been emitted.
    {-# INLINE emitRepl #-}
    emitRepl i resume again
        | i >= replLen = skip resume
        | otherwise = do
            r <- liftIO $ A.unsafeGetIndexIO i replArr
            return $ Yield r (again (i + 1))

    -- Select the matching strategy from the pattern length.
    {-# INLINE_LATE stepOuter #-}
    stepOuter _ (ReplaceInfixInit st)
        | patLen == 0 = skip $ ReplaceInfixEmpty st
        | patLen == 1 = do
            pat <- liftIO $ A.unsafeGetIndexIO 0 patArr
            skip $ ReplaceInfixSingle st pat
        | SIZE_OF(a) * patLen <= sizeOf (Proxy :: Proxy Word) =
            skip $ ReplaceInfixWordInit 0 0 st
        | otherwise = do
            (MutArray mba _ _ _) :: MutArray a <-
                liftIO $ MutArray.emptyOf patLen
            skip $ ReplaceInfixKRInit 0 st mba

    ---------------------------
    -- Empty pattern
    ---------------------------

    -- An empty pattern never matches, the replacement is never emitted.
    stepOuter gst (ReplaceInfixEmpty st) = do
        res <- step (adaptState gst) st
        case res of
            Yield x s -> return $ Yield x (ReplaceInfixEmpty s)
            Skip s -> skip (ReplaceInfixEmpty s)
            Stop -> return Stop

    -----------------
    -- Single Pattern
    -----------------

    stepOuter gst (ReplaceInfixSingle st0 pat) = loop SPEC st0

        where

        loop !_ !st = do
            res <- step (adaptState gst) st
            case res of
                Yield x s
                    | pat == x -> skip $ ReplaceInfixEmitSingle 0 s pat
                    | otherwise ->
                        return $ Yield x (ReplaceInfixSingle s pat)
                Skip s -> loop SPEC s
                Stop -> return Stop

    stepOuter _ (ReplaceInfixEmitSingle i s pat) =
        emitRepl i
            (ReplaceInfixSingle s pat)
            (\j -> ReplaceInfixEmitSingle j s pat)

    ---------------------------
    -- Short Pattern - Shift Or
    ---------------------------

    -- End of input: flush the n elements left in the register, oldest first.
    stepOuter _ (ReplaceInfixWordDone 0 _) = return Stop
    stepOuter _ (ReplaceInfixWordDone n wrd) =
        return $ Yield (wordElem n wrd) (ReplaceInfixWordDone (n - 1) wrd)

    -- Collect the first patLen elements of a window.
    stepOuter gst (ReplaceInfixWordInit idx0 wrd0 st0) =
        loop SPEC idx0 wrd0 st0

        where

        {-# INLINE loop #-}
        loop !_ !idx !wrd !st = do
            res <- step (adaptState gst) st
            case res of
                Yield x s -> do
                    let wrd1 = addToWord wrd x
                    if idx /= maxIndex
                    then loop SPEC (idx + 1) wrd1 s
                    else
                        if wrd1 .&. wordMask == wordPat
                        -- Whole window is the pattern: emit replacement.
                        then skip $ ReplaceInfixEmitWord 0 s
                        else skip $ ReplaceInfixWordLoop wrd1 s
                Skip s -> loop SPEC idx wrd s
                Stop ->
                    if idx == 0
                    then return Stop
                    else skip $ ReplaceInfixWordDone idx wrd

    -- Slide the full window by one element, emitting the one that leaves.
    stepOuter gst (ReplaceInfixWordLoop wrd0 st0) =
        loop SPEC wrd0 st0

        where

        {-# INLINE loop #-}
        loop !_ !wrd !st = do
            res <- step (adaptState gst) st
            case res of
                Yield x s -> do
                    let wrd1 = addToWord wrd x
                        oldA = wordElem patLen wrd
                    if wrd1 .&. wordMask == wordPat
                    then return $ Yield oldA (ReplaceInfixEmitWord 0 s)
                    else return $ Yield oldA (ReplaceInfixWordLoop wrd1 s)
                Skip s -> loop SPEC wrd s
                Stop -> skip $ ReplaceInfixWordDone patLen wrd

    stepOuter _ (ReplaceInfixEmitWord i s) =
        emitRepl i
            (ReplaceInfixWordInit 0 0 s)
            (\j -> ReplaceInfixEmitWord j s)

    -------------------------------
    -- General Pattern - Karp Rabin
    -------------------------------

    -- Fill the buffer with the first patLen elements of a window.
    stepOuter gst (ReplaceInfixKRInit offset st mba) = do
        res <- step (adaptState gst) st
        case res of
            Yield x s -> do
                liftIO $ pokeAt offset mba x
                if offset == maxOffset
                then do
                    let arr :: Array a = Array
                            { arrContents = mba
                            , arrStart = 0
                            , arrEnd = patBytes
                            }
                    let ringHash = A.foldl' addCksum 0 arr
                    if ringHash == patHash && A.byteEq arr patArr
                    then skip $ ReplaceInfixKRCheck s mba 0
                    else skip $ ReplaceInfixKRLoop s mba 0 ringHash
                else skip $ ReplaceInfixKRInit (offset + SIZE_OF(a)) s mba
            Skip s -> skip $ ReplaceInfixKRInit offset s mba
            -- Input ended inside a window: emit the bytes collected so far.
            Stop -> skip $ ReplaceInfixKRDone offset (mkRing mba offset 0)

    -- Slide the full window by one element, emitting the one that leaves.
    stepOuter gst (ReplaceInfixKRLoop st0 mba rh0 cksum0) =
        loop SPEC st0 rh0 cksum0

        where

        loop !_ !st !rh !cksum = do
            res <- step (adaptState gst) st
            let rb = mkRing mba patBytes rh
            case res of
                Yield x s -> do
                    (rb1, old) <- liftIO (RB.replace rb x)
                    let cksum1 = deltaCksum cksum old x
                        rh1 = ringHead rb1
                    if cksum1 == patHash
                    then return $ Yield old (ReplaceInfixKRCheck s mba rh1)
                    else return $ Yield old (ReplaceInfixKRLoop s mba rh1 cksum1)
                Skip s -> loop SPEC s rh cksum
                Stop -> skip $ ReplaceInfixKRDone patBytes rb

    -- The checksum matched: confirm with a full comparison.
    stepOuter _ (ReplaceInfixKRCheck st mba rh) = do
        res <- liftIO $ RB.eqArray (mkRing mba patBytes rh) patArr
        if res
        then skip $ ReplaceInfixEmitKR 0 st mba
        -- This state is only entered when the checksum equals patHash.
        else skip $ ReplaceInfixKRLoop st mba rh patHash

    -- End of input: flush len bytes from the ring, oldest element first.
    stepOuter _ (ReplaceInfixKRDone 0 _) = return Stop
    stepOuter _ (ReplaceInfixKRDone len rb) = do
        assert (len >= 0) (return ())
        old <- RB.unsafeGetHead rb
        let rb1 = RB.moveForward rb
        return $ Yield old (ReplaceInfixKRDone (len - SIZE_OF(a)) rb1)

    stepOuter _ (ReplaceInfixEmitKR i s mba) =
        emitRepl i
            (ReplaceInfixKRInit 0 s mba)
            (\j -> ReplaceInfixEmitKR j s mba)

{-# ANN type DropSuffixState Fuse #-}
data DropSuffixState mba rb rh w s a =
      DropSuffixInit s
    | DropSuffixDone

    | DropSuffixEmpty s

    | DropSuffixSingle s a
    | DropSuffixSingleHold s a a

    | DropSuffixWordInit Int !w s
    | DropSuffixWordLoop !w s
    | DropSuffixWordDone Int !w

    | DropSuffixKRInit Int s mba
    | DropSuffixKRLoop s mba !rh
    | DropSuffixKRDone Int rb

-- | Drop the suffix pattern from the input stream if it matches the tail of
-- the stream. Otherwise, the stream is passed through unchanged.
--
-- >>> dropSuffix p xs = Stream.fold Fold.toList $ Stream.dropSuffix (Array.fromList p) (Stream.fromList xs)
--
-- >>> dropSuffix ".." "ab.."
-- "ab"
--
-- >>> dropSuffix ".." "abc"
-- "abc"
--
-- >>> dropSuffix "" "abc"
-- "abc"
--
-- >>> dropSuffix ".." ""
-- ""
--
-- >>> dropSuffix "abc" "ab"
-- "ab"
--
-- Space: @O(n)@ where n is the length of the suffix.
--
-- See also 'Streamly.Internal.Data.Stream.Eliminate.stripSuffix'.
{-# INLINE_NORMAL dropSuffix #-}
dropSuffix
    :: forall m a.
(MonadIO m, Unbox a, Enum a, Eq a)
+    => Array a -- ^ Suffix to drop
+    -> Stream m a -- ^ Input stream
+    -> Stream m a
+dropSuffix patArr (Stream step state) =
+    Stream stepOuter (DropSuffixInit state)
+
+    where
+
+    -- In every strategy below the most recent patLen elements of the input
+    -- are held back. They are released one by one as newer elements arrive.
+    -- When the input ends the held elements are dropped if they are equal to
+    -- the suffix, otherwise they are flushed.
+
+    -- patLen is used as an element count, patBytes as a size in bytes.
+    -- maxIndex and maxOffset are the element index and the byte offset of the
+    -- last suffix element.
+    patLen = A.length patArr
+    patBytes = A.byteLength patArr
+    maxIndex = patLen - 1
+    maxOffset = patBytes - SIZE_OF(a)
+    elemBits = SIZE_OF(a) * 8
+
+    -- Mask covering the low patLen elements of a Word.
+    wordMask :: Word
+    wordMask = (1 `shiftL` (elemBits * patLen)) - 1
+
+    -- Mask covering a single element in the lowest position of a Word.
+    elemMask :: Word
+    elemMask = (1 `shiftL` elemBits) - 1
+
+    -- The whole suffix packed into a Word using addToWord.
+    wordPat :: Word
+    wordPat = wordMask .&. A.foldl' addToWord 0 patArr
+
+    -- Shift the word left by one element and place the new element in the
+    -- lowest position.
+    -- NOTE(review): the fromEnum value is not masked with elemMask, so this
+    -- presumably relies on fromEnum being non-negative and fitting in
+    -- elemBits - confirm for signed element types.
+    addToWord wd a = (wd `shiftL` elemBits) .|. fromIntegral (fromEnum a)
+
+    skip = return . Skip
+
+    -- Pick the strategy from the suffix size: pass through for an empty
+    -- suffix, a single held element for a single element suffix, a packed
+    -- Word when the suffix fits in a Word, and a ring buffer otherwise.
+    {-# INLINE_LATE stepOuter #-}
+    stepOuter _ (DropSuffixInit st)
+        | patLen == 0 = skip $ DropSuffixEmpty st
+        | patLen == 1 = do
+            pat <- liftIO $ A.unsafeGetIndexIO 0 patArr
+            skip $ DropSuffixSingle st pat
+        | SIZE_OF(a) * patLen <= sizeOf (Proxy :: Proxy Word) =
+            skip $ DropSuffixWordInit 0 0 st
+        | otherwise = do
+            (MutArray mba _ _ _) :: MutArray a <-
+                liftIO $ MutArray.emptyOf patLen
+            skip $ DropSuffixKRInit 0 st mba
+
+    stepOuter _ DropSuffixDone = return Stop
+
+    ---------------------------
+    -- Empty pattern
+    ---------------------------
+
+    -- With an empty suffix the input is passed through as it is.
+    stepOuter gst (DropSuffixEmpty st) = do
+        res <- step (adaptState gst) st
+        case res of
+            Yield x s -> return $ Yield x (DropSuffixEmpty s)
+            Skip s -> skip (DropSuffixEmpty s)
+            Stop -> return Stop
+
+    -----------------
+    -- Single Pattern
+    -----------------
+
+    -- Nothing is held yet, the first element of the input becomes the held
+    -- element.
+    stepOuter gst (DropSuffixSingle st pat) = do
+        res <- step (adaptState gst) st
+        case res of
+            Yield x s -> skip $ DropSuffixSingleHold s x pat
+            Skip s -> skip $ DropSuffixSingle s pat
+            Stop -> return Stop
+
+    -- One element is held. A new element releases the held one and takes its
+    -- place. At the end of the input the held element is dropped if it is
+    -- equal to the suffix element, otherwise it is yielded.
+    stepOuter gst (DropSuffixSingleHold st0 held0 pat) = go SPEC st0 held0
+
+        where
+
+        go !_ !st !held = do
+            res <- step (adaptState gst) st
+            case res of
+                Yield x s -> return $ Yield held (DropSuffixSingleHold s x pat)
+                Skip s -> go SPEC s held
+                Stop ->
+                    if held == pat
+                    then return Stop
+                    else return $ Yield held DropSuffixDone
+
+    ---------------------------
+    -- Short Pattern - Shift Or
+    ---------------------------
+
+    -- Flush the n elements still held in the word. The element in the
+    -- highest of the n positions, which is the oldest one, is emitted first.
+    stepOuter _ (DropSuffixWordDone 0 _) = return Stop
+    stepOuter _ (DropSuffixWordDone n wrd) = do
+        let old = elemMask .&. (wrd `shiftR` (elemBits * (n - 1)))
+        return
+            $ Yield
+                (toEnum $ fromIntegral old)
+                (DropSuffixWordDone (n - 1) wrd)
+
+    -- Collect the first patLen elements in the word. Nothing is emitted while
+    -- collecting. If the input ends before patLen elements are collected it
+    -- is shorter than the suffix, so the collected elements are flushed.
+    stepOuter gst (DropSuffixWordInit idx0 wrd0 st0) =
+        go SPEC idx0 wrd0 st0
+
+        where
+
+        {-# INLINE go #-}
+        go !_ !idx !wrd !st = do
+            res <- step (adaptState gst) st
+            case res of
+                Yield x s -> do
+                    let wrd1 = addToWord wrd x
+                    if idx == maxIndex
+                    then skip $ DropSuffixWordLoop wrd1 s
+                    else go SPEC (idx + 1) wrd1 s
+                Skip s -> go SPEC idx wrd s
+                Stop ->
+                    if idx /= 0
+                    then skip $ DropSuffixWordDone idx wrd
+                    else return Stop
+
+    -- The word holds the last patLen elements. Every new element pushes the
+    -- oldest one out, and that element is yielded. The comparison with the
+    -- suffix is done only when the input ends.
+    stepOuter gst (DropSuffixWordLoop wrd0 st0) =
+        go SPEC wrd0 st0
+
+        where
+
+        {-# INLINE go #-}
+        go !_ !wrd !st = do
+            res <- step (adaptState gst) st
+            case res of
+                Yield x s -> do
+                    let wrd1 = addToWord wrd x
+                        old = (wordMask .&. wrd)
+                                `shiftR` (elemBits * (patLen - 1))
+                        oldA = toEnum (fromIntegral old)
+                    return $ Yield oldA (DropSuffixWordLoop wrd1 s)
+                Skip s -> go SPEC wrd s
+                Stop ->
+                    if wrd .&. wordMask == wordPat
+                    then return Stop
+                    else skip $ DropSuffixWordDone patLen wrd
+
+    -------------------------------
+    -- General Pattern - Ring Buffer
+    -------------------------------
+
+    -- Fill the buffer with the first patLen elements. Nothing is emitted
+    -- while filling. If the input ends early the bytes written so far are
+    -- flushed.
+    stepOuter gst (DropSuffixKRInit offset st mba) = do
+        res <- step (adaptState gst) st
+        case res of
+            Yield x s -> do
+                liftIO $ pokeAt offset mba x
+                if offset == maxOffset
+                then skip $ DropSuffixKRLoop s mba 0
+                else skip $ DropSuffixKRInit (offset + SIZE_OF(a)) s mba
+            Skip s -> skip $ DropSuffixKRInit offset s mba
+            Stop -> do
+                let rb = RingArray
+                        { ringContents = mba
+                        , ringSize = offset
+                        , ringHead = 0
+                        }
+                skip $ DropSuffixKRDone offset rb
+
+    -- The ring holds the last patLen elements. Every new element replaces
+    -- one in the ring, and the replaced element is yielded. No checksum is
+    -- kept here, the ring is compared with the suffix only once, when the
+    -- input ends.
+    stepOuter gst (DropSuffixKRLoop st0 mba rh0) =
+        go SPEC st0 rh0
+
+        where
+
+        go !_ !st !rh = do
+            res <- step (adaptState gst) st
+            let rb = RingArray
+                    { ringContents = mba
+                    , ringSize = patBytes
+                    , ringHead = rh
+                    }
+            case res of
+                Yield x s -> do
+                    (rb1, old) <- liftIO (RB.replace rb x)
+                    let rh1 = ringHead rb1
+                    return $ Yield old (DropSuffixKRLoop s mba rh1)
+                Skip s -> go SPEC s rh
+                Stop -> do
+                    matches <- liftIO $ RB.eqArray rb patArr
+                    if matches
+                    then return Stop
+                    else skip $ DropSuffixKRDone patBytes rb
+
+    -- Flush len bytes worth of elements from the ring, one element per step,
+    -- starting at the ring head.
+    stepOuter _ (DropSuffixKRDone 0 _) = return Stop
+    stepOuter _ (DropSuffixKRDone len rb) = do
+        assert (len >= 0) (return ())
+        old <- RB.unsafeGetHead rb
+        let rb1 = RB.moveForward rb
+        return $ Yield old (DropSuffixKRDone (len - SIZE_OF(a)) rb1)
diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs
index d075a1094b..3dde91b070 100644
--- a/core/src/Streamly/Internal/Data/Stream/Type.hs
+++ b/core/src/Streamly/Internal/Data/Stream/Type.hs
@@ -169,6 +169,7 @@ module Streamly.Internal.Data.Stream.Type
     -- | These should probably be expressed using zipping operations.
, eqBy
     , cmpBy
+    , takeCommonPrefixBy

     -- * Utilities
     , splitAt
@@ -883,6 +884,46 @@ eqBy eq (Stream step1 t1) (Stream step2 t2) = eq_loop0 SPEC t1 t2
                 Skip s2' -> eq_null s2'
                 Stop -> return True

+-- | @takeCommonPrefixBy eq prefix input@ yields the elements of @input@ as
+-- long as each of them is equal, as per @eq@, to the element at the same
+-- position in @prefix@. The output stream stops at the first pair of elements
+-- that differ or when either of the two streams ends.
+--
+-- An element is pulled from @input@ before the corresponding element is
+-- pulled from @prefix@.
+--
+-- See also the complementary function of this which is dropCommonPrefixBy.
+--
+{-# INLINE_NORMAL takeCommonPrefixBy #-}
+takeCommonPrefixBy :: Monad m =>
+    (a -> b -> Bool) -> Stream m b -> Stream m a -> Stream m a
+takeCommonPrefixBy eq (Stream step2 t2) (Stream step1 t1) =
+    Stream step (t1, t2, Nothing')
+
+    where
+
+    -- No input element is pending, pull the next one. The state passed by
+    -- the consumer (gst) is handed down to the inner streams via adaptState
+    -- instead of being replaced by defState.
+    step gst (s1, s2, Nothing') = do
+        r <- step1 (adaptState gst) s1
+        case r of
+            Yield x s1' -> return $ Skip (s1', s2, Just' x)
+            Skip s1' -> return $ Skip (s1', s2, Nothing')
+            Stop -> return Stop
+
+    -- An input element is pending, yield it only if it is equal to the next
+    -- element of the prefix. The output ends on a mismatch or if the prefix
+    -- has no more elements.
+    step gst (s1, s2, Just' x) = do
+        r <- step2 (adaptState gst) s2
+        case r of
+            Yield y s2' ->
+                if eq x y
+                then return $ Yield x (s1, s2', Nothing')
+                else return Stop
+            Skip s2' -> return $ Skip (s1, s2', Just' x)
+            Stop -> return Stop
+
 -- Adapted from the vector package.

 -- | Compare two streams lexicographically.
diff --git a/core/src/Streamly/Internal/FileSystem/DirIO.hs b/core/src/Streamly/Internal/FileSystem/DirIO.hs
index 2d159e436e..04dd1a5b7b 100644
--- a/core/src/Streamly/Internal/FileSystem/DirIO.hs
+++ b/core/src/Streamly/Internal/FileSystem/DirIO.hs
@@ -8,20 +8,29 @@
 -- Maintainer : streamly@composewell.com
 -- Portability : GHC
 --
--- API Design notes:
---
--- The paths returned by "read" can be absolute (/usr/bin/ls), relative to
--- current directory (./bin/ls) or path segments relative to current dir
--- (bin/ls). To accomodate all the cases we can provide a prefix to attach
--- to the paths being generated. Alternatively, we could take the approach
--- of the higher layer doing that, but it is more efficient to allocate the
--- path buffer once rather than modifying it later. We can do this by
--- passing a fold to transform the output.
--- --- Also it may be more efficient to apply a filter to the paths right here --- instead of applying it in a layer above. Cut the output at the source --- rather than generate and then discard it later. We can do this by --- passing a fold to filter the input. +-- Directory traversal API design notes: +-- +-- Paths returned by readdir: +-- -------------------------- +-- The paths returned by "read" can be absolute (/x/y/z), relative to current +-- directory e.g. if current dir is /x then path is (./y/z), or path segments +-- relative to current dir e.g. (y/z). To accommodate all the cases we can +-- provide a prefix to readdir to attach to the paths being generated e.g. the +-- prefix would be "/x" in the previous example. Alternatively, we could take +-- the approach of the higher layer doing that, but it is more efficient to +-- allocate the path buffer once rather than modifying it later. We can do this +-- by passing a fold to transform the output. +-- +-- Filtering of paths: +-- ------------------- +-- +-- It may be more efficient to apply a filter to the paths during readdir API +-- itself instead of applying it in a layer above. That way we cut the output +-- at the source rather than generate and then discard it later. We can do this +-- by passing a fold to filter the input. +-- +-- Symlink Resolution: +-- ------------------- -- -- When reading a symlink directory we can resolve the symlink and read the -- destination directory or we can just emit the file it is pointing to and @@ -32,6 +41,9 @@ -- counted as depth level increment whereas if we resolve that at lower level -- then it won't. We can do this by passing an option to modify the behavior. -- +-- Cyclic paths: +-- ------------- +-- -- When resolving cyclic directory symlinks one way to curtail it is ELOOP -- which gives up if it encounters too many level.
Another way is to use -- the inode information to check if we are traversing an already traversed @@ -39,6 +51,9 @@ -- ELOOP by passing an option but it may be inefficient because we may -- encounter the loop from any node in the cycle. -- +-- Broken links and permission issues: +-- ----------------------------------- +-- -- If we encounter an error reading a directory because of permission -- issues should we ignore it in this low level API or catch it in the -- higher level traversal functionality? Similarly, if there are broken @@ -48,12 +63,12 @@ -- do this by passing an option. -- -- Returning the metadata: +-- ----------------------- -- -- Specific scans can be used to return the metadata in the output stream if --- needed. However, we may need three different APIs: --- one with fast metadata, and --- another with full metadata. In the two cases the fold input would be --- different. +-- needed. However, we may need three different APIs: without metadata, with +-- fast metadata, and with full metadata. In each case the fold input would +-- be different. -- -- * readMinimal: read only the path names, no metadata -- * readStandard: read the path and minimal metadata @@ -62,8 +77,13 @@ -- NOTE: Full metadata can be read by mapping a stat call to a stream of paths -- rather than via readdir API. Does it help the performance to do it in the -- readdir API? - --- Design pattern: +-- +-- Passing a scan to the readdir operation: +-- ---------------------------------------- +-- +-- There are two approaches to traversal and filtering. (1) Read the attributes +-- as data and provide it to a high level traversal handler to do filtering etc. +-- (2) pass a fold or scan to the reader itself which does the filtering. -- -- By passing a scan we can process the output right at the source and produce -- a cooked output. Otherwise we may have to produce a stream of intermediate @@ -71,12 +91,23 @@ -- get eliminated by fusion. 
For example, a fold can directly write the CString -- from readdir to the output buffer whereas if we output the Path then we will -- incur an overhead of intermediate structure. +-- +-- Filesystem functionality modules: +-- --------------------------------- +-- +-- This DirIO module mainly provides "readdir" functionality. File stat +-- functionality is coupled to readdir because we may return file stats along +-- with filepaths, or may provide functionality to filter based on stats. +-- +-- The FileTest module in streamly-coreutils provides the file stat read +-- operations, we may have to bring that here if some coupling with readdir is +-- needed. +-- +-- FileIO module provides regular file create operation. +-- module Streamly.Internal.FileSystem.DirIO ( - -- XXX Create a Metadata or Meta module for stat, access, getxattr, chmod, - -- chown, utime, rename operations. - -- -- * Metadata -- getMetadata GetMetadata (followSymlinks, noAutoMount - see fstatat) diff --git a/core/src/Streamly/Internal/FileSystem/Path.hs b/core/src/Streamly/Internal/FileSystem/Path.hs index bf07bfef4d..b73d8c5ee0 100644 --- a/core/src/Streamly/Internal/FileSystem/Path.hs +++ b/core/src/Streamly/Internal/FileSystem/Path.hs @@ -6,6 +6,24 @@ -- Maintainer : streamly@composewell.com -- Portability : GHC -- +-- The API in this module is equivalent to or can emulate all or most of +-- the filepath package API. It has some differences from the filepath +-- package: +-- +-- 1. Empty paths are not allowed. Paths are validated before construction. +-- 2. The default Path type itself affords considerable safety regarding the +-- distinction of rooted or non-rooted paths, it also allows distinguishing +-- directory and file paths. +-- 3. It is designed to provide flexible typing to provide compile time safety +-- for rooted/non-rooted paths and file/dir paths. The Path type is just part +-- of that typed path ecosystem. Though the default Path type itself should be +-- enough for most cases. 
+-- 4. It leverages the streamly array module for most of the heavy lifting, +-- it is a thin wrapper on top of that, improving maintainability as well as +-- providing better performance. We can have pinned and unpinned paths, also +-- provide lower level operations for certain cases to interact more +-- efficiently with low level code. +-- -- == References -- -- * https://en.wikipedia.org/wiki/Path_(computing) diff --git a/core/src/Streamly/Internal/FileSystem/PosixPath.hs b/core/src/Streamly/Internal/FileSystem/PosixPath.hs index 424f373b81..2c186b692e 100644 --- a/core/src/Streamly/Internal/FileSystem/PosixPath.hs +++ b/core/src/Streamly/Internal/FileSystem/PosixPath.hs @@ -1,6 +1,20 @@ {-# LANGUAGE CPP #-} {-# LANGUAGE TemplateHaskell #-} +-- CAUTION! Do not start a module haddock comment here as this file gets +-- included in Path.hs and then we have duplicate module level comment in that +-- file, generating a haddock warning. +-- +-- See Internal.FileSystem.Path for module level docs. +-- +-- This file is preprocessed and included in Internal.FileSystem.Path module. +-- The preprocessor replaces the macros by OS specific values. OS_PATH_TYPE +-- macro represents the file system path type and OS_NAME the operating system. +-- The only assumption about the encoding of the path is that it maps the +-- characters SEPARATORS and @.@ to OS_WORD_TYPE representing their ASCII +-- values. Operations are provided to encode and decode using CODEC_NAME +-- encoding. + #if defined(IS_PORTABLE) #define OS_PATH_TYPE Path #define OS_WORD_TYPE OsWord @@ -41,41 +55,6 @@ #define SEPARATORS @/@ #endif - --- XXX Do not start a module haddock comment here as this file gets included in --- Path.hs and then we have duplicate module level comment in that file, --- generating a haddock warning. 
- --- Module : Streamly.Internal.FileSystem.OS_PATH_TYPE --- Copyright : (c) 2023 Composewell Technologies --- License : BSD3 --- Maintainer : streamly@composewell.com --- Portability : GHC --- --- This module implements a OS_PATH_TYPE type representing a file system path for --- OS_NAME operating systems. The only assumption about the encoding of the --- path is that it maps the characters SEPARATORS and @.@ to OS_WORD_TYPE --- representing their ASCII values. Operations are provided to encode and --- decode using CODEC_NAME encoding. --- --- This module has APIs that are equivalent to or can emulate all or most of --- the filepath package APIs. It has some differences from the filepath --- package: --- --- 1. Empty paths are not allowed. Paths are validated before construction. --- 2. The default Path type itself affords considerable safety regarding the --- distinction of rooted or non-rooted paths, it also allows distinguishing --- directory and file paths. --- 3. It is designed to provide flexible typing to provide compile time safety --- for rooted/non-rooted paths and file/dir paths. The Path type is just part --- of that typed path ecosystem. Though the default Path type itself should be --- enough for most cases. --- 4. It leverages the streamly array module for most of the heavy lifting, --- it is a thin wrapper on top of that, improving maintainability as well as --- providing better performance. We can have pinned and unpinned paths, also --- provide lower level operations for certain cases to interact more --- efficinetly with low level code. - module Streamly.Internal.FileSystem.OS_PATH_TYPE ( -- * Setup @@ -1471,6 +1450,15 @@ ignoreCase val conf = conf { _ignoreCase = val } -- convey that when it is False "./x" and "./x" are not strictly equal. -- Similarly, "treatDotRootsEqual" has a problem with the "./x" and "x" -- comparison, there is not dor root in the second path. +-- +-- We can think of paths having scopes: +-- * Scope currentDirectory: "." 
is fixed so ./x has an unambiguous meaning +-- * Scope currentSystem: "/" is fixed so /x has an unambiguous meaning, but +-- "./x" may be ambiguous. +-- * Scope universal: "server://" is fixed but "/" is ambiguous +-- +-- So we can possibly specify a scope (Directory/System/Universe) for +-- comparison rather than using allowRelativeEquality. -- | Allow relative paths to be treated as equal. When this is 'False' relative -- paths will never match even if they are literally equal e.g. "./x" will not @@ -1580,8 +1568,8 @@ eqPath cfg (OS_PATH a) (OS_PATH b) = Common.OS_NAME (cfg eqCfg) a b #endif --- | Check two paths for byte level equality. This is the most strict path --- equality check. +-- | Check two paths for byte level equality. This is the most strict and +-- fastest path equality check. -- -- >>> eqPath a b = Path.eqPathBytes (Path.fromString_ a) (Path.fromString_ b) --