@@ -134,6 +134,12 @@ module Streaming.Prelude (
134134 , show
135135 , cons
136136 , slidingWindow
137+ , slidingWindowMin
138+ , slidingWindowMinBy
139+ , slidingWindowMinOn
140+ , slidingWindowMax
141+ , slidingWindowMaxBy
142+ , slidingWindowMaxOn
137143 , wrapEffect
138144
139145 -- * Splitting and inspecting streams of elements
@@ -272,6 +278,7 @@ import qualified Data.Foldable as Foldable
272278import qualified Data.IntSet as IntSet
273279import qualified Data.Sequence as Seq
274280import qualified Data.Set as Set
281+ import Data.Word (Word64 )
275282import qualified GHC.IO.Exception as G
276283import qualified Prelude
277284import qualified System.IO as IO
@@ -2871,6 +2878,216 @@ slidingWindow n = setup (max 1 n :: Int) mempty
28712878 Right (x,rest) -> setup (m- 1 ) (sequ Seq. |> x) rest
28722879{-# INLINABLE slidingWindow #-}
28732880
2881+ -- | 'slidingWindowMin' finds the minimum in every sliding window of @n@
2882+ -- elements of a stream. If within a window there are multiple elements that are
2883+ -- the least, it prefers the first occurrence (if you prefer to have the last
2884+ -- occurrence, use the max version and flip your comparator). It satisfies:
2885+ --
2886+ -- @
2887+ -- 'slidingWindowMin' n s = 'map' 'Foldable.minimum' ('slidingWindow' n s)
2888+ -- @
2889+ --
2890+ -- Except that it is far more efficient, especially when the window size is
2891+ -- large: it calls 'compare' /O(m)/ times overall where /m/ is the total number
2892+ -- of elements in the stream.
2893+ slidingWindowMin :: (Monad m , Ord a ) => Int -> Stream (Of a ) m b -> Stream (Of a ) m b
2894+ slidingWindowMin = slidingWindowMinBy compare
2895+ {-# INLINE slidingWindowMin #-}
2896+
2897+ -- | 'slidingWindowMax' finds the maximum in every sliding window of @n@
2898+ -- elements of a stream. If within a window there are multiple elements that are
2899+ -- the largest, it prefers the last occurrence (if you prefer to have the first
2900+ -- occurrence, use the min version and flip your comparator). It satisfies:
2901+ --
2902+ -- @
2903+ -- 'slidingWindowMax' n s = 'map' 'Foldable.maximum' ('slidingWindow' n s)
2904+ -- @
2905+ --
2906+ -- Except that it is far more efficient, especially when the window size is
2907+ -- large: it calls 'compare' /O(m)/ times overall where /m/ is the total number
2908+ -- of elements in the stream.
2909+ slidingWindowMax :: (Monad m , Ord a ) => Int -> Stream (Of a ) m b -> Stream (Of a ) m b
2910+ slidingWindowMax = slidingWindowMaxBy compare
2911+ {-# INLINE slidingWindowMax #-}
2912+
2913+ -- | 'slidingWindowMinBy' finds the minimum in every sliding window of @n@
2914+ -- elements of a stream according to the given comparison function (which should
2915+ -- define a total ordering). See notes above about elements that are equal. It
2916+ -- satisfies:
2917+ --
2918+ -- @
2919+ -- 'slidingWindowMinBy' f n s = 'map' ('Foldable.minimumBy' f) ('slidingWindow' n s)
2920+ -- @
2921+ --
2922+ -- Except that it is far more efficient, especially when the window size is
2923+ -- large: it calls the comparison function /O(m)/ times overall where /m/ is the
2924+ -- total number of elements in the stream.
2925+ slidingWindowMinBy :: Monad m => (a -> a -> Ordering ) -> Int -> Stream (Of a ) m b -> Stream (Of a ) m b
2926+ slidingWindowMinBy cmp = slidingWindowOrd id (\ a b -> cmp a b == GT )
2927+ {-# INLINE slidingWindowMinBy #-}
2928+
2929+ -- | 'slidingWindowMaxBy' finds the maximum in every sliding window of @n@
2930+ -- elements of a stream according to the given comparison function (which should
2931+ -- define a total ordering). See notes above about elements that are equal. It
2932+ -- satisfies:
2933+ --
2934+ -- @
2935+ -- 'slidingWindowMaxBy' f n s = 'map' ('Foldable.maximumBy' f) ('slidingWindow' n s)
2936+ -- @
2937+ --
2938+ -- Except that it is far more efficient, especially when the window size is
2939+ -- large: it calls the comparison function /O(m)/ times overall where /m/ is the
2940+ -- total number of elements in the stream.
2941+ slidingWindowMaxBy :: Monad m => (a -> a -> Ordering ) -> Int -> Stream (Of a ) m b -> Stream (Of a ) m b
2942+ slidingWindowMaxBy cmp = slidingWindowOrd id (\ a b -> cmp a b /= GT )
2943+ {-# INLINE slidingWindowMaxBy #-}
2944+
2945+ -- | 'slidingWindowMinOn' finds the minimum in every sliding window of @n@
2946+ -- elements of a stream according to the given projection function. See notes
2947+ -- above about elements that are equal. It satisfies:
2948+ --
2949+ -- @
2950+ -- 'slidingWindowMinOn' f n s = 'map' ('Foldable.minimumOn' ('comparing' f)) ('slidingWindow' n s)
2951+ -- @
2952+ --
2953+ -- Except that it is far more efficient, especially when the window size is
2954+ -- large: it calls 'compare' on the projected value /O(m)/ times overall where
2955+ -- /m/ is the total number of elements in the stream, and it calls the
2956+ -- projection function exactly /m/ times.
2957+ slidingWindowMinOn :: (Monad m , Ord p ) => (a -> p ) -> Int -> Stream (Of a ) m b -> Stream (Of a ) m b
2958+ slidingWindowMinOn proj = slidingWindowOrd proj (\ a b -> compare a b == GT )
2959+ {-# INLINE slidingWindowMinOn #-}
2960+
2961+ -- | 'slidingWindowMaxOn' finds the maximum in every sliding window of @n@
2962+ -- elements of a stream according to the given projection function. See notes
2963+ -- above about elements that are equal. It satisfies:
2964+ --
2965+ -- @
2966+ -- 'slidingWindowMaxOn' f n s = 'map' ('Foldable.maximumOn' ('comparing' f)) ('slidingWindow' n s)
2967+ -- @
2968+ --
2969+ -- Except that it is far more efficient, especially when the window size is
2970+ -- large: it calls 'compare' on the projected value /O(m)/ times overall where
2971+ -- /m/ is the total number of elements in the stream, and it calls the
2972+ -- projection function exactly /m/ times.
2973+ slidingWindowMaxOn :: (Monad m , Ord p ) => (a -> p ) -> Int -> Stream (Of a ) m b -> Stream (Of a ) m b
2974+ slidingWindowMaxOn proj = slidingWindowOrd proj (\ a b -> compare a b /= GT )
2975+ {-# INLINE slidingWindowMaxOn #-}
2976+
2977+ -- IMPLEMENTATION NOTE [the slidingWindow{Min,Max} functions]
2978+ --
2979+ -- When one wishes to compute the minimum (or maximum; without loss of
2980+ -- generality we will only discuss the minimum case) of a sliding window of a
2981+ -- stream, the naive method is to collect all such sliding windows, and then run
2982+ -- 'Foldable.minimum' over each window. The issue is that this algorithm is very
2983+ -- inefficient: if the stream has $n$ elements, and the sliding window has $k$
2984+ -- elements, then there are $n-k+1$ windows, and computing the minimum in each
2985+ -- window requires $k-1$ comparisons. So a total of $(k-1)*(n-k+1)$ comparisons
2986+ -- are needed, or simply $O(nk)$ when $k$ is much smaller than $n$.
2987+ --
2988+ -- We can motivate an improvement as follows. Suppose the window size is 3 and
2989+ -- the current sliding window has numbers 4, 6, 8; if the next element happens
2990+ -- to be 5, there would then be no need to keep the numbers 6 and 8 in the
2991+ -- window. Because in the next window we have 6, 8, 5 so 5 will be yielded. In
2992+ -- the window after the next we have 8, 5, x so either 5 or x will be yielded.
2993+ -- So 6 and 8 will never be yielded.
2994+ --
2995+ -- This leads to the idea that we can keep a window that is a subsequence of the
2996+ -- actual window. But immediately the next problem is, if we don't keep a window
2997+ -- of the original window size, there would be no way for us to tell which
2998+ -- elements are out of the window. So the idea is to also keep an index of the
2999+ -- item along with the item itself. We then have several important invariants:
3000+ --
3001+ -- (a) The window is sorted according to the index.
3002+ -- (b) The window is sorted according to the item itself.
3003+ -- (c) The size of the window never has more elements than $k$.
3004+ --
3005+ -- The window is initially empty. The three-step algorithm to update the window
3006+ -- maintains these invariants.
3007+ --
3008+ -- The overall asymptotic complexity is great. Comparisons only happen in the
3009+ -- first part of the update. Each comparison either results in an element being
3010+ -- removed from the window (so there can be at most $O(n)$ such comparisons); or
3011+ -- that comparison does not result in an element being removed, but such
3012+ -- comparisons happen at most once for each element being inserted, which is
3013+ -- also $O(n)$. This means an overall $O(n)$ number of comparisons needed.
3014+ --
3015+ -- I did not invent this algorithm; it's pretty well-known. I'm not sure the
3016+ -- algorithm has a name.
3017+ slidingWindowOrd :: Monad m => (a -> p ) -> (p -> p -> Bool ) -> Int -> Stream (Of a ) m b -> Stream (Of a ) m b
3018+ slidingWindowOrd proj f n =
3019+ dropButRetainAtLeastOne (k- 1 ) . catMaybes . scan update initial extract
3020+ -- The use of dropButRetainAtLeastOne is to gracefully handle edge cases where
3021+ -- the window size is bigger than the length of the entire sequence.
3022+ where
3023+ k = max 1 n -- window size
3024+ initial = SlidingWindowOrdState 0 mempty
3025+ -- All three invariants are satisfied initially. The window is trivially
3026+ -- sorted because it is empty. Its size, zero, is also less than the window
3027+ -- size.
3028+ update (SlidingWindowOrdState i w0) a =
3029+ let projected = proj a
3030+ w1 = Seq. dropWhileR (\ (SlidingWindowOrdElement _ _ p) -> f p projected) w0
3031+ -- Step 1: pop all elements at the back greater than the current one,
3032+ -- because they will never be yielded: the current element will always be
3033+ -- yielded until those popped elements go out of the window. This is
3034+ -- extracting a subsequence of the window, so invariants (a) and (b)
3035+ -- remain satisfied. Since this operation deletes elements, invariant (c)
3036+ -- is maintained.
3037+ w2 = w1 Seq. |> SlidingWindowOrdElement i a projected
3038+ -- Step 2: add the current element to the back. Since the current index is
3039+ -- greater than all previous indices, invariant (a) is satisfied.
3040+ -- Invariant (b) is also satisfied because in step 1 we popped elements
3041+ -- greater than the current, so either the window is empty or the back of
3042+ -- the window is smaller than the current one. Invariant (c) may be
3043+ -- violated, but this is fixed below.
3044+ w3 = Seq. dropWhileL (\ (SlidingWindowOrdElement j _ _) -> j + fromIntegral k <= i) w2
3045+ -- Step 3: remove elements that are out of the window. Again this is
3046+ -- extracting a subsequence so invariants (a) and (b) are maintained.
3047+ -- Invariant (c) is maintained because the least index still possibly in
3048+ -- the window is i+1-k, in which case we have k elements.
3049+ in SlidingWindowOrdState (i + 1 ) w3
3050+ -- Extract the front.
3051+ extract (SlidingWindowOrdState _ w) =
3052+ case Seq. viewl w of
3053+ SlidingWindowOrdElement _ m _ Seq. :< _ -> Just m
3054+ _ -> Nothing
3055+ {-# INLINABLE slidingWindowOrd #-}
3056+
3057+ -- | A 'SlidingWindowOrdState' keeps track of the current sliding window state
3058+ -- in 'slidingWindowOrd'. It keeps track of the current index of the item from
3059+ -- the stream as well as a 'Seq.Seq' of the current window. See comments above
3060+ -- for properties satisfied by the window.
3061+ data SlidingWindowOrdState a p =
3062+ SlidingWindowOrdState ! Word64
3063+ ! (Seq. Seq (SlidingWindowOrdElement a p ))
3064+
3065+ -- | A 'SlidingWindowOrdElement' is an element with a 'Word64'-based index as
3066+ -- well as the projection used for comparison. It is used in the sliding window
3067+ -- functions to associate an item with their index and the projected element in
3068+ -- the stream.
3069+ data SlidingWindowOrdElement a p = SlidingWindowOrdElement ! Word64 a p
3070+
3071+ -- | Similar to 'drop', except that if the input stream doesn't have enough
3072+ -- elements, the last one will be yielded. However, if there's none to begin
3073+ -- with, this function will also produce none.
3074+ dropButRetainAtLeastOne :: Monad m => Int -> Stream (Of a ) m r -> Stream (Of a ) m r
3075+ dropButRetainAtLeastOne 0 = id
3076+ dropButRetainAtLeastOne n = loop Nothing n
3077+ where
3078+ loop (Just final) (- 1 ) str = yield final >> str
3079+ loop final m str = do
3080+ e <- lift (next str)
3081+ case e of
3082+ Left r -> do
3083+ case final of
3084+ Nothing -> pure ()
3085+ Just l -> yield l
3086+ return r
3087+ Right (x, rest) -> loop (Just x) (m - 1 ) rest
3088+ {-# INLINABLE dropButRetainAtLeastOne #-}
3089+
3090+
28743091-- | Map monadically over a stream, producing a new stream
28753092-- only containing the 'Just' values.
28763093mapMaybeM :: Monad m => (a -> m (Maybe b )) -> Stream (Of a ) m r -> Stream (Of b ) m r
0 commit comments