Skip to content

Commit 84d7e83

Browse files
committed
Add slidingWindowSum
On its own, `slidingWindow` typically only helps write functions with complexity around `O(n*k)`, where `n` is the number of elements in the stream and `k` is the size of the window. In many cases, this can be reduced to `O(n)` by looking not at the window itself but instead at the sum of that window in some `Semigroup`. This can be used, for example, to implement moving averages such as arithmetic, geometric, or harmonic means.
1 parent d2685a5 commit 84d7e83

4 files changed

Lines changed: 194 additions & 5 deletions

File tree

src/Data/AnnotatedQueue.hs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
{-# language FunctionalDependencies, ScopedTypeVariables, FlexibleInstances,
2+
BangPatterns, UndecidableInstances #-}
3+
4+
-- | An implementation of Okasaki's implicit queues holding elements of some
5+
-- semigroup. We track the sum of them all.
6+
module Data.AnnotatedQueue
7+
( Queue
8+
, ViewL (..)
9+
, empty
10+
, viewl
11+
, drop1
12+
, singleton
13+
, snoc
14+
, measureQueue
15+
) where
16+
17+
import Data.Semigroup (Semigroup (..))
18+
19+
-- | Digit stored in the front position of a 'Deep' tree: one or two elements.
data FDigit a
  = FOne !a
  | FTwo !a !a

-- | Digit stored in the rear position of a 'Deep' tree: zero or one element.
data RDigit a
  = RZero
  | ROne !a

-- | Two elements bundled together with an annotation of type @s@.
data Node s a
  = Node !s !a !a
22+
23+
-- | A queue of values of type @s@, represented as a 'Tree' annotated with
-- @s@ whose elements are 'Elem'-wrapped values.
newtype Queue s = Queue (Tree s (Elem s))
24+
-- | Appending moves the elements of the right queue, front to back, onto
-- the rear of the left queue one at a time.
instance Semigroup s => Semigroup (Queue s) where
  (<>) = go
    where
      -- The accumulated left queue is forced on every step.
      go !acc rest =
        case viewl rest of
          EmptyL -> acc
          ViewL x rest' -> go (snoc acc x) rest'
28+
-- | The identity is the queue with no elements; 'mappend' is the
-- 'Semigroup' append.
instance Semigroup s => Monoid (Queue s) where
  mempty = Queue Zero
  mappend l r = l <> r
31+
32+
-- | Wrapper for a single queue element; its 'measure' is the wrapped value
-- itself.
newtype Elem a = Elem a
33+
34+
-- | The tree underlying a 'Queue', annotated with values of type @s@ and
-- holding elements of type @a@.
data Tree s a
  = Zero
    -- ^ No elements.
  | One !a
    -- ^ Exactly one element.
  | Two !a !a
    -- ^ Exactly two elements.
  | Deep !s !(FDigit a) (Tree s (Node s a)) !(RDigit a)
    -- ^ A stored annotation, a front digit, a middle tree of 'Node's
    -- (the only non-strict field) and a rear digit.
39+
40+
-- | The queue containing no elements.
empty :: Queue s
empty = Queue Zero
42+
43+
-- | A queue containing exactly the given element.
singleton :: s -> Queue s
singleton x = Queue (One (Elem x))
45+
46+
-- | Add an element at the rear of the queue.
snoc :: Semigroup s => Queue s -> s -> Queue s
snoc queue x =
  case queue of
    Queue tree -> Queue (tree `snocTree` Elem x)
{-# INLINABLE snoc #-}
49+
50+
-- | The combined ('<>') measure of every element in the queue, front to
-- back, or 'Nothing' when the queue is empty. For a 'Deep' tree the
-- stored annotation is returned directly.
measureQueue :: Semigroup s => Queue s -> Maybe s
measureQueue (Queue Zero) = Nothing
measureQueue (Queue (One x)) = Just (measure x)
measureQueue (Queue (Two x y)) = Just (measure x <> measure y)
measureQueue (Queue (Deep cached _ _ _)) = Just cached
{-# INLINABLE measureQueue #-}
57+
58+
-- | Containers that always have a measure of type @s@; the container type
-- determines the measure type.
class Measurable s a | a -> s where
  measure :: a -> s

-- | An element measures as the value it wraps.
instance Measurable s (Elem s) where
  measure wrapped = case wrapped of
    Elem x -> x

-- | A node measures as its stored annotation.
instance Measurable s (Node s a) where
  measure pair = case pair of
    Node cached _ _ -> cached

-- | A front digit measures as the combination of its one or two elements.
instance (Semigroup s, Measurable s a) => Measurable s (FDigit a) where
  measure digit = case digit of
    FOne x -> measure x
    FTwo x y -> measure x <> measure y
67+
-- | Containers that may be empty and so may have no measure of their own.
-- 'semimeasure' appends the container's measure, if it has one, to the
-- right of an accumulated measure.
class SemiMeasurable s a | a -> s where
  semimeasure :: s -> a -> s

-- | An empty rear digit leaves the accumulator untouched.
instance (Semigroup s, Measurable s a) => SemiMeasurable s (RDigit a) where
  semimeasure acc digit = case digit of
    RZero -> acc
    ROne x -> acc <> measure x

-- | An empty tree leaves the accumulator untouched; a 'Deep' tree
-- contributes its stored annotation.
instance (Semigroup s, Measurable s a)
  => SemiMeasurable s (Tree s a) where
  semimeasure acc tree = case tree of
    Zero -> acc
    One x -> acc <> measure x
    Two x y -> acc <> measure x <> measure y
    Deep cached _ _ _ -> acc <> cached
78+
79+
-- | Build a 'Node' from two elements, storing their combined measure as
-- the node's annotation.
node :: (Semigroup s, Measurable s a) => a -> a -> Node s a
node l r = Node total l r
  where
    total = measure l <> measure r
{-# INLINABLE node #-}
84+
85+
-- | Build a 'Deep' tree, computing its annotation from the front digit,
-- the middle tree and the rear digit, in that order.
deep :: (Semigroup s, Measurable s a) => FDigit a -> Tree s (Node s a) -> RDigit a -> Tree s a
deep front middle rear = Deep total front middle rear
  where
    -- Measure of the front digit extended by the middle tree, if non-empty.
    frontAndMiddle = semimeasure (measure front) middle
    -- ... further extended by the rear digit, if present.
    total = semimeasure frontAndMiddle rear
{-# INLINABLE deep #-}
88+
89+
-- | Add an element at the rear of a tree, extending the stored annotation
-- with the new element's measure.
snocTree :: (Measurable s a, Semigroup s) => Tree s a -> a -> Tree s a
-- Note: the final equation departs slightly from Okasaki. Following Hinze
-- and Paterson, the *old* middle tree is forced immediately, so that a run
-- of consecutive snocs does not accumulate a chain of thunks.
snocTree Zero x = One x
snocTree (One x) y = Two x y
snocTree (Two x y) z = Deep total (FTwo x y) Zero (ROne z)
  where
    total = measure x <> measure y <> measure z
snocTree (Deep total front middle RZero) x =
  Deep (total <> measure x) front middle (ROne x)
snocTree (Deep total front !middle (ROne y)) !x =
  Deep (total <> measure x) front (snocTree middle (node y x)) RZero
{-# INLINABLE snocTree #-}
100+
101+
-- | Result of viewing a 'Queue' from the front: either it is empty, or it
-- splits into its first element and the remaining queue.
data ViewL s
  = EmptyL
  | ViewL !s !(Queue s)

-- | Result of viewing a 'Tree' from the front: either it is empty, or it
-- splits into its first element and the remaining tree.
data ViewLTree s a
  = EmptyLTree
  | ViewLTree !a !(Tree s a)
103+
104+
-- | View the queue from the front, separating the first element (if any)
-- from the rest of the queue.
viewl :: Semigroup s => Queue s -> ViewL s
viewl (Queue tree) = unwrap (viewlTree tree)
  where
    -- Translate the tree-level view back into a queue-level view.
    unwrap EmptyLTree = EmptyL
    unwrap (ViewLTree (Elem x) rest) = ViewL x (Queue rest)
{-# INLINABLE viewl #-}
109+
110+
-- | View a tree from the front, separating the first element (if any)
-- from the rest of the tree.
viewlTree :: (Semigroup s, Measurable s a) => Tree s a -> ViewLTree s a
viewlTree tree = case tree of
  Zero -> EmptyLTree
  One x -> ViewLTree x Zero
  Two x y -> ViewLTree x (One y)
  Deep _ (FTwo x y) middle rear -> ViewLTree x (deep (FOne y) middle rear)
  Deep _ (FOne x) middle rear ->
    -- The front digit is used up, so refill it from the middle tree.
    case viewlTree middle of
      -- Nothing in the middle: only the rear digit's contents remain.
      EmptyLTree -> ViewLTree x (rearOnly rear)
      -- The first node of the middle supplies a new two-element front digit.
      ViewLTree (Node pairSum y z) middle' ->
        let total = semimeasure (semimeasure pairSum middle') rear
        in ViewLTree x (Deep total (FTwo y z) middle' rear)
  where
    rearOnly RZero = Zero
    rearOnly (ROne y) = One y
{-# INLINABLE viewlTree #-}
121+
122+
-- | Remove the first element of the queue. An empty queue stays empty.
drop1 :: Semigroup s => Queue s -> Queue s
drop1 queue = remainder (viewl queue)
  where
    remainder EmptyL = empty
    remainder (ViewL _ rest) = rest
{-# INLINABLE drop1 #-}

src/Streaming/Prelude.hs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ module Streaming.Prelude (
134134
, show
135135
, cons
136136
, slidingWindow
137+
, slidingWindowSum
137138
, slidingWindowMin
138139
, slidingWindowMinBy
139140
, slidingWindowMinOn
@@ -272,8 +273,10 @@ import Data.Functor.Of
272273
import Data.Functor.Sum
273274
import Data.Monoid (Monoid (mappend, mempty))
274275
import Data.Ord (Ordering (..), comparing)
276+
import Data.Semigroup (Semigroup (..))
275277
import Foreign.C.Error (Errno(Errno), ePIPE)
276278
import Text.Read (readMaybe)
279+
import qualified Data.AnnotatedQueue as AQ
277280
import qualified Data.Foldable as Foldable
278281
import qualified Data.IntSet as IntSet
279282
import qualified Data.Sequence as Seq
@@ -2846,7 +2849,7 @@ mapMaybe phi = loop where
28462849
{-# INLINABLE mapMaybe #-}
28472850

28482851
{-| 'slidingWindow' accumulates the first @n@ elements of a stream,
2849-
update thereafter to form a sliding window of length @n@.
2852+
updating thereafter to form a sliding window of length @n@.
28502853
It follows the behavior of the slidingWindow function in
28512854
<https://hackage.haskell.org/package/conduit-combinators-1.0.4/docs/Data-Conduit-Combinators.html#v:slidingWindow conduit-combinators>.
28522855
@@ -2880,6 +2883,33 @@ slidingWindow n = setup (max 1 n :: Int) mempty
28802883
Right (x,rest) -> setup (m-1) (sequ Seq.|> x) rest
28812884
{-# INLINABLE slidingWindow #-}
28822885

2886+
{-| 'slidingWindowSum' slides a window of length @n@ over a stream whose
    elements belong to some 'Semigroup' and yields, for each position of
    the window, the elements of that window combined with '<>'.

    The first @n@ elements are accumulated before anything is yielded;
    after that, each new element enters the window and the oldest one
    leaves it. A window length below 1 is treated as 1. If the stream
    ends before @n@ elements have arrived, the combination of the elements
    seen so far is yielded once (nothing is yielded for an empty stream).
-}
slidingWindowSum :: (Monad m, Semigroup a)
  => Int
  -> Stream (Of a) m b
  -> Stream (Of a) m b
slidingWindowSum n = fill (max 1 n) AQ.empty
  where
    -- Steady state: emit the sum of the current window, then advance the
    -- window by one element.
    slide !queue stream = do
      case AQ.measureQueue queue of
        Nothing -> pure ()
        Just total -> yield total
      step <- lift (next stream)
      case step of
        Left done -> return done
        Right (x, stream') ->
          slide (AQ.drop1 (AQ.snoc queue x)) stream'

    -- Start-up: gather elements until the window is full or the stream ends.
    fill 0 !queue stream = slide queue stream
    fill remaining queue stream = do
      step <- lift (next stream)
      case step of
        Left done -> slide queue (return done)
        Right (x, stream') -> fill (remaining - 1) (AQ.snoc queue x) stream'
{-# INLINABLE slidingWindowSum #-}
2912+
28832913
-- | 'slidingWindowMin' finds the minimum in every sliding window of @n@
28842914
-- elements of a stream. If within a window there are multiple elements that are
28852915
-- the least, it prefers the first occurrence (if you prefer to have the last

streaming.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ library
204204
, Streaming.Prelude
205205
, Streaming.Internal
206206
, Data.Functor.Of
207+
other-modules:
208+
Data.AnnotatedQueue
207209
build-depends:
208210
base >=4.8 && <5
209211
, mtl >=2.1 && <2.3

test/test.hs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,43 @@
1+
{-# LANGUAGE ScopedTypeVariables #-}
12
module Main where
23

34
import qualified Data.Foldable as Foldable
45
import Data.Functor.Identity
56
import Data.Ord
7+
import qualified Data.Semigroup as DS
68
import qualified Streaming.Prelude as S
79
import Test.Hspec
810
import Test.QuickCheck
911

1012
toL :: S.Stream (S.Of a) Identity b -> [a]
1113
toL = runIdentity . S.toList_
1214

13-
main :: IO ()
14-
main =
15-
hspec $ do
15+
slidingWindowMin_spec :: SpecWith ()
16+
slidingWindowMin_spec =
1617
describe "slidingWindowMin" $ do
1718
it "works with a few simple cases" $ do
1819
toL (S.slidingWindowMin 2 (S.each [1, 3, 9, 4, 6, 4])) `shouldBe` [1, 3, 4, 4, 4]
1920
toL (S.slidingWindowMin 3 (S.each [1, 3, 2, 6, 3, 7, 8, 9])) `shouldBe` [1, 2, 2, 3, 3, 7]
2021
it "produces no results with empty streams" $
2122
property $ \k -> toL (S.slidingWindowMin k (mempty :: S.Stream (S.Of Int) Identity ())) `shouldBe` []
22-
it "behaves like a (S.map Foldable.minimum) (slidingWindow) for non-empty streams" $
23+
it "behaves like (S.map Foldable.minimum . slidingWindow k) for non-empty streams" $
2324
property $ \(NonEmpty xs) k -- we use NonEmpty because Foldable.minimum crashes on empty lists
2425
->
2526
toL (S.slidingWindowMin k (S.each xs)) ===
2627
toL (S.map Foldable.minimum (S.slidingWindow k (S.each (xs :: [Int]))))
28+
it "behaves like (S.map getMin . slidingWindowSum . S.map Min)" $
29+
property $ \(xs :: [Int]) k
30+
->
31+
toL (S.slidingWindowMin k (S.each xs)) ===
32+
toL (S.map DS.getMin $ S.slidingWindowSum k $ S.map DS.Min $ S.each xs)
2733
it "behaves like identity when window size is 1" $
2834
property $ \xs -> toL (S.slidingWindowMin 1 (S.each (xs :: [Int]))) === xs
2935
it "produces a prefix when the stream elements are sorted" $
3036
property $ \(Sorted xs) k ->
3137
(length xs >= k) ==> (toL (S.slidingWindowMin k (S.each (xs :: [Int]))) === take (length xs - (k - 1)) xs)
38+
39+
slidingWindowMinBy_spec :: SpecWith ()
40+
slidingWindowMinBy_spec =
3241
describe "slidingWindowMinBy" $ do
3342
it "prefers earlier elements when several elements compare equal" $ do
3443
toL (S.slidingWindowMinBy (comparing fst) 2 (S.each [(1, 1), (2, 2), (2, 3), (2, 4)])) `shouldBe`
@@ -38,6 +47,9 @@ main =
3847
->
3948
toL (S.slidingWindowMinBy (comparing fst) k (S.each xs)) ===
4049
toL (S.map (Foldable.minimumBy (comparing fst)) (S.slidingWindow k (S.each (xs :: [(Int, Int)]))))
50+
51+
slidingWindowMinOn_spec :: SpecWith ()
52+
slidingWindowMinOn_spec =
4153
describe "slidingWindowMinOn" $ do
4254
it "behaves like a (S.map (Foldable.minimumBy (comparing p))) (slidingWindow) for non-empty streams" $ do
4355
property $ \(NonEmpty xs) k -- we use NonEmpty because Foldable.minimumBy crashes on empty lists
@@ -49,6 +61,9 @@ main =
4961
(length xs >= k) ==>
5062
(toL (S.slidingWindowMinOn (const (undefined :: UnitWithLazyEq)) k (S.each (xs :: [Int]))) ===
5163
take (length xs - (k - 1)) xs)
64+
65+
slidingWindowMax_spec :: SpecWith ()
66+
slidingWindowMax_spec =
5267
describe "slidingWindowMax" $ do
5368
it "produces no results with empty streams" $
5469
property $ \k -> toL (S.slidingWindowMax k (mempty :: S.Stream (S.Of Int) Identity ())) `shouldBe` []
@@ -62,6 +77,9 @@ main =
6277
it "produces a suffix when the stream elements are sorted" $
6378
property $ \(Sorted xs) k ->
6479
(length xs >= k) ==> (toL (S.slidingWindowMax k (S.each (xs :: [Int]))) === drop (k - 1) xs)
80+
81+
slidingWindowMaxBy_spec :: SpecWith ()
82+
slidingWindowMaxBy_spec =
6583
describe "slidingWindowMaxBy" $ do
6684
it "prefers later elements when several elements compare equal" $ do
6785
toL (S.slidingWindowMaxBy (comparing fst) 2 (S.each [(1, 1), (2, 2), (2, 3), (2, -900)])) `shouldBe`
@@ -71,6 +89,9 @@ main =
7189
->
7290
toL (S.slidingWindowMaxBy (comparing fst) k (S.each xs)) ===
7391
toL (S.map (Foldable.maximumBy (comparing fst)) (S.slidingWindow k (S.each (xs :: [(Int, Int)]))))
92+
93+
slidingWindowMaxOn_spec :: SpecWith ()
94+
slidingWindowMaxOn_spec =
7495
describe "slidingWindowMaxOn" $ do
7596
it "behaves like a (S.map (Foldable.maximumBy (comparing p))) (slidingWindow) for non-empty streams" $ do
7697
property $ \(NonEmpty xs) k -- we use NonEmpty because Foldable.maximumBy crashes on empty lists
@@ -82,6 +103,16 @@ main =
82103
(length xs >= k) ==>
83104
(toL (S.slidingWindowMaxOn (const (undefined :: UnitWithLazyEq)) k (S.each (xs :: [Int]))) === drop (k - 1) xs)
84105

106+
-- | Run every sliding-window spec group under hspec, in the order listed.
main :: IO ()
main = hspec (sequence_ specGroups)
  where
    specGroups =
      [ slidingWindowMin_spec
      , slidingWindowMinBy_spec
      , slidingWindowMinOn_spec
      , slidingWindowMax_spec
      , slidingWindowMaxBy_spec
      , slidingWindowMaxOn_spec
      ]
115+
85116
data UnitWithLazyEq = UnitWithLazyEq
86117

87118
instance Eq UnitWithLazyEq where

0 commit comments

Comments
 (0)