StateT and pipes-concurrency

Alexey Raga
Hi,

I am using "pipes-concurrency" to model the following scenario. My "source" pipe has the type

    stream :: Producer InputData (StateT References IO) ()

where "References" is just a map that I accumulate while streaming the source. Following the "broadcast" example from the tutorial, I am attempting to "fork" my flow into two branches:

    main = do
      (output1, input1) <- spawn unbounded
      (output2, input2) <- spawn unbounded
      a1 <- async $ do
        execStateT (runEffect $ stream >-> toOutput (output1 <> output2)) emptyTables
        performGC
      a2 <- async $ do
        withFile "/tmp/geo/smart-grouping.xml" WriteMode $ \h -> do
          evalStateT (runEffect $ groupSmart (fromInput input1) >-> toHandle h) ??? -- what to put here?
          hFlush h
        performGC
      a3 <- async $ do
        withFile "/tmp/geo/damnGrouping.xml" WriteMode $ \h -> do
          evalStateT (runEffect $ groupDamn (fromInput input2) >-> toHandle h) ???
          hFlush h
        performGC
      mapM_ wait [a1,a2,a3]

I would like my branches to use the information from the source's StateT. However, following the types, it looks like I have to provide each branch with its own initial (empty?) state.

How do I share the state between the source and the forked branches? Or how do I model this situation correctly?

Cheers,
Alexey.


Re: StateT and pipes-concurrency

Gabriel Gonzalez
The general rule of thumb is that you should only use concurrency for two reasons:

* Increased performance (and only if the increase offsets the context switching)
* Waiting on multiple concurrent input streams

Generally you should try to avoid using concurrency just as a control flow mechanism and instead you should try to use pure, single-threaded ways of forking input streams.  The main reason you want to avoid concurrency unless absolutely necessary is that it's very difficult to test and reason about concurrent code.

The answer to this question depends on how `groupSmart` and `groupDamn` are implemented.  However, I can give one example if I can make certain assumptions.

Let's assume for simplicity that `groupSmart`/`groupDamn` emit one output element for every input element.  In that case you can actually encode them as `Fold`s from my `foldl` library:

    groupSmart :: Fold I O1
    groupDamn  :: Fold I O2

Then if you wanted to run both grouping mechanisms in parallel over the same stream, you would just combine them using `Applicative` syntax:

    groupBoth :: Fold I (O1, O2)
    groupBoth = (,) <$> groupSmart <*> groupDamn
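
As a minimal sketch of the `Applicative` combination above, with `L.sum` and `L.length` standing in for the two grouping folds (they are placeholders, not the real `groupSmart`/`groupDamn`), both folds see every element in a single pass:

```haskell
import qualified Control.Foldl as L

-- Placeholder folds standing in for the two grouping mechanisms:
-- one sums the inputs, the other counts them.
sumFold :: L.Fold Int Int
sumFold = L.sum

countFold :: L.Fold Int Int
countFold = L.length

-- Both folds consume the same input in one traversal.
both :: L.Fold Int (Int, Int)
both = (,) <$> sumFold <*> countFold

main :: IO ()
main = print (L.fold both [1 .. 10])  -- (55,10)
```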

Then you would transform that into a `Pipe` by using:

    import Control.Foldl (purely)
    import qualified Pipes.Prelude as Pipes

    pipeBoth :: Monad m => Pipe I (O1, O2) m r
    pipeBoth = purely Pipes.scan groupBoth
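
A small runnable sketch of that conversion, again with `L.sum` and `L.length` as placeholder folds and `Pipes.Prelude` imported as `P`. One detail worth knowing: `scan` yields the fold's initial result before it has consumed anything, so three inputs produce four outputs.

```haskell
import qualified Control.Foldl as L
import Pipes
import qualified Pipes.Prelude as P

-- Placeholder for the combined grouping fold.
both :: L.Fold Int (Int, Int)
both = (,) <$> L.sum <*> L.length

-- Turn the fold into a pipe that emits the running result.
pipeBoth :: Monad m => Pipe Int (Int, Int) m r
pipeBoth = L.purely P.scan both

main :: IO ()
main = print (P.toList (each [1, 2, 3] >-> pipeBoth))
-- [(0,0),(1,1),(3,2),(6,3)]
```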

Then you would just write something like:

    example :: MonadIO io => Consumer I io r
    example = for (purely Pipes.scan groupBoth) (\(o1, o2) -> do
        liftIO (writeToHandle handle1 o1)
        liftIO (writeToHandle handle2 o2) )

... and now you can do everything within a single pipeline.
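
To make the single-pipeline version concrete, here is a sketch that assumes two `IORef` lists as stand-ins for the file handles and a hypothetical `record` helper in place of `writeToHandle`; the folds are placeholders as well:

```haskell
import qualified Control.Foldl as L
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical sink: prepends a value to a list held in an IORef,
-- standing in for writing to a file handle.
record :: IORef [a] -> a -> IO ()
record ref x = modifyIORef' ref (x :)

-- Feed every input to both folds and route each result to its own sink.
fanOut :: IORef [Int] -> IORef [Int] -> Consumer Int IO r
fanOut sums counts =
    for (L.purely P.scan ((,) <$> L.sum <*> L.length)) $ \(s, n) -> do
        liftIO (record sums s)
        liftIO (record counts n)

main :: IO ()
main = do
    sums   <- newIORef []
    counts <- newIORef []
    runEffect (each [1, 2, 3] >-> fanOut sums counts)
    readIORef sums   >>= print . reverse  -- [0,1,3,6]
    readIORef counts >>= print . reverse  -- [0,1,2,3]
```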

You can also give the `groupSmart`/`groupDamn` folds access to the `StateT` layer by generalizing them to `FoldM`s instead:

    groupSmart :: FoldM (StateT References IO) I O1
    groupDamn  :: FoldM (StateT References IO) I O2

... and the only change you would make is to use `impurely Pipes.scanM` instead of `purely Pipes.scan`.
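
A sketch of the `FoldM` variant, under the assumption that an `Int` running total stands in for `References`. The names `source`, `currentTotal` and `both` are made up for illustration. The fold can read the state that the producer updates because both run in the same `StateT` layer of one pipeline:

```haskell
import qualified Control.Foldl as L
import Control.Monad.Trans.State.Strict (StateT, evalStateT, get, modify')
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical source: yields each number and adds it to the shared
-- state (an Int standing in for `References`).
source :: Producer Int (StateT Int IO) ()
source = for (each [1, 2, 3]) $ \x -> do
    lift (modify' (+ x))
    yield x

-- A fold with no accumulator of its own: its result is whatever the
-- source has stored in the StateT layer so far.
currentTotal :: L.FoldM (StateT Int IO) Int Int
currentTotal = L.FoldM (\() _ -> pure ()) (pure ()) (\() -> get)

-- Combine it with an ordinary pure fold lifted into the same monad.
both :: L.FoldM (StateT Int IO) Int (Int, Int)
both = (,) <$> currentTotal <*> L.generalize L.length

main :: IO ()
main = do
    out <- evalStateT (P.toListM (source >-> L.impurely P.scanM both)) 0
    print out  -- [(0,0),(1,1),(3,2),(6,3)]
```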

On 8/23/2015 5:31 AM, Alexey Raga wrote:
> How do I share the state between the source and the forked branches? Or how do I model this situation correctly?

--
You received this message because you are subscribed to the Google Groups "Haskell Pipes" group.


Re: StateT and pipes-concurrency

Alexey Raga

Thanks, Gabriel!

You are right, I don't need any concurrency; I just wanted to use it for its ability to broadcast producers.

I was actually looking at Fold, but I couldn't figure out how it would work with the following requirements:

- I need to group elements differently in each branch. I wouldn't be able to use pipes-groups here because it gives me functions on Producers, not Pipes.
- Due to grouping, my branches would emit at different rates, and not for every input element. I could always emit Maybe, but that doesn't feel like the right way, does it?

In fact, what I want is to take a stream, gather some stats on it (this is where my StateT comes in), group it in two different ways, and write both results to different files.

I will have a look at Fold again. Would it still be the right tool for the job given the requirements I mentioned?

Cheers,
Alexey.


On Mon, 24 Aug 2015 01:07 Gabriel Gonzalez <[hidden email]> wrote:
> The answer to this question depends on how `groupSmart` and `groupDamn` are implemented.

Re: StateT and pipes-concurrency

Gabriel Gonzalez
I think you might end up having to use two `Fold`s that each return a `Maybe`.  I don't know a more elegant way of decomposing the problem.
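
One way this could look, as a sketch only: a hypothetical `chunks` fold that returns `Just` a group when the group is complete and `Nothing` otherwise, so two groupings that emit at different rates can still be combined with `<*>`. Note that with a scan a trailing partial group is never emitted, so a real version would need to flush it separately.

```haskell
import qualified Control.Foldl as L
import Data.Maybe (mapMaybe)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical grouping fold: buffers inputs and reports a finished
-- group of n elements as `Just`, or `Nothing` while the group is open.
chunks :: Int -> L.Fold a (Maybe [a])
chunks n = L.Fold step ([], Nothing) snd
  where
    step (buf, _) x
        | length buf + 1 == n = ([], Just (reverse (x : buf)))
        | otherwise           = (x : buf, Nothing)

-- Two groupings with different rates over the same input.
both :: L.Fold Int (Maybe [Int], Maybe [Int])
both = (,) <$> chunks 2 <*> chunks 3

main :: IO ()
main = do
    let outs = P.toList (each [1 .. 6 :: Int] >-> L.purely P.scan both)
    print (mapMaybe fst outs)  -- [[1,2],[3,4],[5,6]]
    print (mapMaybe snd outs)  -- [[1,2,3],[4,5,6]]
```

In a streaming pipeline the `Nothing`s could instead be dropped with something like `P.concat`, which flattens any `Foldable`, `Maybe` included.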

On 8/23/2015 3:51 PM, Alexey Raga wrote:
> I could always emit Maybe, but that doesn't feel like the right way, does it?

Re: StateT and pipes-concurrency

Daniel Díaz Carrete
In reply to this post by Alexey Raga
Hi Alexey,

> I need to group elements differently in each branch. I wouldn't be able to use pipes-groups here because it gives me functions of Producers, not Pipes.

> - due to grouping my branches would emit at different rates and not for every input element.

I have uploaded foldl-transduce to Hackage. It lets you perform grouping operations fold-side instead of producer-side, which means you can still combine the resulting folds with <*> : 

    import qualified Control.Foldl as L
    import Control.Foldl.Transduce

    L.fold ((,) <$> (folds (chunksOf 2) L.list L.list) <*> (folds (chunksOf 3) L.list L.list)) [1..7]
    ([[1,2],[3,4],[5,6],[7]], [[1,2,3],[4,5,6],[7]])

Here's a monadic example:

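    {-# LANGUAGE OverloadedStrings #-}  -- the string literal below is used as a ByteString

    import Control.Foldl (FoldM (..))
    import qualified Data.ByteString as B
    import System.IO (Handle, IOMode (AppendMode), withFile)

    -- A monadic fold that writes every ByteString it receives to the handle.
    toHandle :: Handle -> FoldM IO B.ByteString ()
    toHandle h = FoldM (\_ -> B.hPut h) (pure ()) (\_ -> pure ())

    -- Group the same input in chunks of 7 for file1 and chunks of 3 for
    -- file2, in a single pass over the input.
    testGroupIO :: IO ()
    testGroupIO =
        withFile "file1" AppendMode $ \h1 ->
            withFile "file2" AppendMode $ \h2 ->
                flip L.foldM (replicate 10 "aabbccddeeffgghhiijj") $
                    foldsM (chunksOf 7) (L.generalize L.list) (L.handlesM traverse (toHandle h1)) *>
                    foldsM (chunksOf 3) (L.generalize L.list) (L.handlesM traverse (toHandle h2))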

Could this help you with your use case?


Re: StateT and pipes-concurrency

Alexey Raga
Daniel,

Thank you very much, I will definitely give it a try!

Cheers,
Alexey.

On Tuesday, August 25, 2015 at 4:29:41 PM UTC+10, Daniel Díaz wrote:
> Could this help you with your use case?