Barrier implementation


Barrier implementation

Peter Eriksen
Greetings,

Something is not working for me, and I could use some more eyes on this.
What I'm trying to accomplish is to implement a simple barrier for ten
worker threads (id = 0..9) using STM.  With or without the barrier, the
program produces an unordered interleaving of the output from the
workers.  Here's what I get with the program below:

$ ghc --make Main.lhs
$ a.out
0134568027913457896012579026813423904671238455702468159367839684012570279134685049137825901642375689134057892610462578903156012389473268457910267801345923924567801304689235714013679458256702465913878...

And here's what I get without the line "atomically $ barrier tv id":

$ a.out
1249056782934567210845619720538461975203698469175203469850123485076912348579406123894625738942106381592740631859274163841092315768491302578416930728254169302785693024917853029640217390856490...

The first run should've been something like:
012345678901234567890123456789012345...
since each worker thread 0..9 should write its id out once per
iteration, and the workers should iterate in sync.

Here's the code:

> module Main where
>
> import Control.Concurrent
> import Control.Concurrent.STM
> import System.Random
>
> worker :: Int -> TVar Int -> IO ()
> worker id tv = do
>       sleepingTime <- randomRIO (0, 50000)
>       threadDelay sleepingTime
>       putStr $ show id
>       atomically $ barrier tv id
>       worker id tv

Each worker sleeps for some time, then outputs its id and waits at
the barrier for all the other workers to finish their sleep+output.
 
> barrier :: TVar Int -> Int -> STM ()
> barrier tv id = do
>       passed <- readTVar tv
>       if (passed `mod` 10 == id)
>               then writeTVar tv (passed+1)
>               else retry

The barrier is simply a global variable, tv, which holds the number of
times any worker passed the barrier.  Now, a worker may only pass the
barrier iff the worker with an id one less just passed, or else it
should block.

> main :: IO ()
> main = do
>       tv <- atomically $ newTVar 0
>       for [0..9] $ \i -> forkIO $ worker i tv
>       threadDelay (10*10^6)
>
> for = flip mapM_

The main thread just initializes the pass counter, starts 10 worker
threads, and waits for ten seconds.

I'd like to hear some comments on the approach, and perhaps even some
insight into why it doesn't work.
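
For comparison with the turn-taking counter above, here is a rough sketch of a more conventional reusable barrier, where each worker blocks until all ten have arrived and then all are released together. It is not code from this thread: the names Barrier, newBarrier, await and runRounds are hypothetical, and the workers log their ids to an IORef instead of calling putStr so that the result can be inspected.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
  (TVar, atomically, newTVarIO, readTVar, writeTVar, retry)
import Control.Monad (forM_, replicateM_, when)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)

-- Hypothetical reusable barrier for a fixed number of parties.
data Barrier = Barrier
  { parties    :: Int       -- how many workers must arrive
  , arrived    :: TVar Int  -- arrivals so far in the current round
  , generation :: TVar Int  -- bumped each time the barrier opens
  }

newBarrier :: Int -> IO Barrier
newBarrier n = Barrier n <$> newTVarIO 0 <*> newTVarIO 0

-- Block until all parties have called await for the current round.
await :: Barrier -> IO ()
await b = do
  -- Transaction 1: register the arrival; the last arrival opens the barrier.
  gen <- atomically $ do
    g <- readTVar (generation b)
    n <- readTVar (arrived b)
    if n + 1 == parties b
      then writeTVar (arrived b) 0 >> writeTVar (generation b) (g + 1)
      else writeTVar (arrived b) (n + 1)
    return g
  -- Transaction 2: wait until the generation has moved on.
  atomically $ do
    g <- readTVar (generation b)
    when (g == gen) retry

-- Ten workers each log their id and then wait at the barrier, `rounds` times.
-- Returns the ids in the order they were logged.
runRounds :: Int -> IO [Int]
runRounds rounds = do
  b <- newBarrier 10
  logRef <- newIORef []
  done <- newEmptyMVar
  forM_ [0 .. 9] $ \i -> forkIO $ do
    replicateM_ rounds $ do
      atomicModifyIORef' logRef (\xs -> (i : xs, ()))
      await b
    putMVar done ()
  replicateM_ 10 (takeMVar done)
  reverse <$> readIORef logRef
```

With this kind of barrier the ids within one round can appear in any order, but every block of ten logged ids should contain each of 0..9 exactly once. It keeps the workers in step without forcing the order 0,1,2,...,9.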

Regards,

Peter Eriksen
_______________________________________________
Haskell-Cafe mailing list
[hidden email]
http://www.haskell.org/mailman/listinfo/haskell-cafe

Re: Barrier implementation

David Roundy
On Fri, Dec 16, 2005 at 05:46:33PM +0100, Peter Eriksen wrote:

> Here's the code:
>
> > module Main where
> >
> > import Control.Concurrent
> > import Control.Concurrent.STM
> > import System.Random
> >
> > worker :: Int -> TVar Int -> IO ()
> > worker id tv = do
> >       sleepingTime <- randomRIO (0, 50000)
> >       threadDelay sleepingTime
> >       putStr $ show id
> >       atomically $ barrier tv id
> >       worker id tv

You've got the barrier after the putStr, so there's nothing to make the
first ten putStrs be in order.  I think you need a non-updating barrier
before the putStr and then an updating function after the putStr (to tell
the next worker that it is free to print).

> barrier :: TVar Int -> Int -> STM ()
> barrier tv id = do
>       passed <- readTVar tv
>       if (passed `mod` 10 == id)
>               then return ()
>               else retry

> move_along :: TVar Int -> Int -> STM ()
> move_along tv id = do passed <- readTVar tv
>                       writeTVar tv (passed+1)

> worker :: Int -> TVar Int -> IO ()
> worker id tv = do
>       sleepingTime <- randomRIO (0, 50000)
>       threadDelay sleepingTime
>       atomically $ barrier tv id
>       putStr $ show id
>       atomically $ move_along tv id
>       worker id tv
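
The suggestion above (wait without updating, print, then advance the counter) can be sketched as a small self-contained program. This is only an illustration under some assumptions: waitTurn, moveAlong, worker's extra parameters and runWorkers are hypothetical names, a fixed per-worker delay stands in for randomRIO so that no extra package is needed, and output goes to an IORef log rather than putStr so the order can be checked.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
  (STM, TVar, atomically, newTVarIO, readTVar, writeTVar, retry)
import Control.Monad (forM_, replicateM_)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)

-- Non-updating gate: block until the counter says it is this worker's turn.
waitTurn :: TVar Int -> Int -> STM ()
waitTurn tv wid = do
  passed <- readTVar tv
  if passed `mod` 10 == wid then return () else retry

-- Updating step: hand the turn to the next worker.
moveAlong :: TVar Int -> STM ()
moveAlong tv = readTVar tv >>= \n -> writeTVar tv (n + 1)

-- One worker running `rounds` iterations; `emit` stands in for putStr . show.
worker :: (Int -> IO ()) -> Int -> Int -> TVar Int -> IO ()
worker emit rounds wid tv = replicateM_ rounds $ do
  threadDelay ((wid * 37) `mod` 500)  -- arbitrary stand-in for the random sleep
  atomically (waitTurn tv wid)
  emit wid                            -- only the turn holder can reach this line
  atomically (moveAlong tv)

-- Run ten workers and return the ids in the order they were emitted.
runWorkers :: Int -> IO [Int]
runWorkers rounds = do
  tv <- newTVarIO 0
  logRef <- newIORef []
  done <- newEmptyMVar
  forM_ [0 .. 9] $ \i -> forkIO $ do
    worker (\w -> atomicModifyIORef' logRef (\xs -> (w : xs, ()))) rounds i tv
    putMVar done ()
  replicateM_ 10 (takeMVar done)
  reverse <$> readIORef logRef
```

For example, `runWorkers 2 >>= putStrLn . concatMap show` prints 01234567890123456789. Because the counter is only advanced after the output step, no other worker can get past waitTurn while one worker is printing.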
--
David Roundy

Re: Barrier implementation

Lemmih
In reply to this post by Peter Eriksen
On 12/16/05, Peter Eriksen <[hidden email]> wrote:

> Greetings,
>
> Something is not working for me, and I could use some more eyes on this.
> What I'm trying to accomplish is to implement a simple barrier for ten
> worker threads (id = 0..9) using STM.  With or without the barrier, the
> program produces an unordered interleaving of the output from the
> workers.  Here's what I get with the program below:
>
> $ ghc --make Main.lhs
> $ a.out
> 0134568027913457896012579026813423904671238455702468159367839684012570279134685049137825901642375689134057892610462578903156012389473268457910267801345923924567801304689235714013679458256702465913878...
>
> And here's what I get without the line "atomically $ barrier tv id":
>
> $ a.out
> 1249056782934567210845619720538461975203698469175203469850123485076912348579406123894625738942106381592740631859274163841092315768491302578416930728254169302785693024917853029640217390856490...
>
> The first run should've been something like:
> 012345678901234567890123456789012345...
> since each worker thread 0..9 should write its id out once per
> iteration, and the workers should iterate in sync.
>
> Here's the code:
>
> > module Main where
> >
> > import Control.Concurrent
> > import Control.Concurrent.STM
> > import System.Random
> >
> > worker :: Int -> TVar Int -> IO ()
> > worker id tv = do
> >       sleepingTime <- randomRIO (0, 50000)
> >       threadDelay sleepingTime
> >       putStr $ show id

You're printing the ID after a random sleep. Shouldn't be a big
surprise that the output will be shuffled.

> >       atomically $ barrier tv id

If you move 'putStr $ show id' down below the barrier then it'll
behave like you want it to.

> >       worker id tv
>
> Each worker sleeps for some time, then outputs its id and waits at
> the barrier for all the other workers to finish their sleep+output.
>
> > barrier :: TVar Int -> Int -> STM ()
> > barrier tv id = do
> >       passed <- readTVar tv
> >       if (passed `mod` 10 == id)
> >               then writeTVar tv (passed+1)
> >               else retry
>
> The barrier is simply a global variable, tv, which holds the number of
> times any worker passed the barrier.  Now, a worker may only pass the
> barrier iff the worker with an id one less just passed, or else it
> should block.
>
> > main :: IO ()
> > main = do
> >       tv <- atomically $ newTVar 0
> >       for [0..9] $ \i -> forkIO $ worker i tv
> >       threadDelay (10*10^6)
> >
> > for = flip mapM_


--
Friendly,
  Lemmih

Re: Barrier implementation

Tomasz Zielonka
On Fri, Dec 16, 2005 at 06:25:00PM +0100, Lemmih wrote:
> If you move 'putStr $ show id' down below the barrier then it'll
> behave like you want it to.

However, the printed sequence may sometimes differ from expected
because of races.

Best regards
Tomasz

--
I am searching for a programmer who is good at least in some of
[Haskell, ML, C++, Linux, FreeBSD, math] for work in Warsaw, Poland

Re: Barrier implementation

Lemmih
In reply to this post by Peter Eriksen
On 12/16/05, Peter Eriksen <[hidden email]> wrote:
> >       threadDelay (10*10^6)

10*10^6 == 10e6, btw.

--
Friendly,
  Lemmih

Re: Barrier implementation

Marcin Tustin
On Fri, Dec 16, 2005 at 06:51:12PM +0100, Lemmih wrote:
> On 12/16/05, Peter Eriksen <[hidden email]> wrote:
> > >       threadDelay (10*10^6)
>
> 10*10^6 == 10e6, btw.

    10e7.
 
> --
> Friendly,
>   Lemmih

Re: Barrier implementation

Lemmih
On 12/16/05, Marcin Tustin <[hidden email]> wrote:
> On Fri, Dec 16, 2005 at 06:51:12PM +0100, Lemmih wrote:
> > On 12/16/05, Peter Eriksen <[hidden email]> wrote:
> > > >       threadDelay (10*10^6)
> >
> > 10*10^6 == 10e6, btw.
>
>     10e7.

Prelude> 10*10^6 == 10e6
True
Prelude> 10*10^6 == 10e7
False

10*10^6 == 1.0e7.

--
Friendly,
  Lemmih

Re: Barrier implementation

Marcin Tustin
On Fri, Dec 16, 2005 at 07:02:03PM +0100, Lemmih wrote:

> On 12/16/05, Marcin Tustin <[hidden email]> wrote:
> > On Fri, Dec 16, 2005 at 06:51:12PM +0100, Lemmih wrote:
> > > On 12/16/05, Peter Eriksen <[hidden email]> wrote:
> > > > >       threadDelay (10*10^6)
> > >
> > > 10*10^6 == 10e6, btw.
> >
> >     10e7.
>
> Prelude> 10*10^6 == 10e6
> True
> Prelude> 10*10^6 == 10e7
> False
>
> 10*10^6 == 1.0e7.

    Err yes, that's obviously correct. I think I must have misread "10e6" as "1e6", and then added to the superscript. D'oh.
 
> --
> Friendly,
>   Lemmih

Re: Barrier implementation

Bertram Felgenhauer-2
In reply to this post by Lemmih
Lemmih wrote:
> On 12/16/05, Peter Eriksen <[hidden email]> wrote:
> > >       threadDelay (10*10^6)
>
> 10*10^6 == 10e6, btw.

But the types are different. For the sake of completeness:

  (10*10^6, 10*10^^6, 10*10**6, 10e6) ::
  (Num a, Fractional b, Floating c, Fractional d) => (a, b, c, d)

threadDelay wants an Int, so 10e6 won't work.
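
As a small illustration of the point about types: the integer-power form can be used directly at type Int, while the floating literal has to be converted explicitly. The names delayMicros, delayFromLiteral and pauseTenSeconds below are made up for this sketch.

```haskell
import Control.Concurrent (threadDelay)

-- (^) keeps the base's type, so this expression can be an Int.
delayMicros :: Int
delayMicros = 10 * 10 ^ (6 :: Int)

-- 10e6 is a Fractional literal; it needs an explicit conversion to Int.
delayFromLiteral :: Int
delayFromLiteral = round (10e6 :: Double)

-- threadDelay takes its argument in microseconds, as an Int.
pauseTenSeconds :: IO ()
pauseTenSeconds = threadDelay delayMicros
```

Writing `threadDelay 10e6` directly is rejected by the type checker, because Int has no Fractional instance.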

Bertram

Re: Barrier implementation

Peter Eriksen
In reply to this post by Peter Eriksen
Hi again,

Now I've actually tested the barrier implementation by counting the
number of times each worker thread reaches the barrier.  It's not a
proof, but I take it as a strong indication that it's not as bad as I
first thought.  If all workers have run the same number of times
(that is, at most one apart), then at least that's one good
feature of the barrier.  I think it does work, and that it keeps that
invariant (at most one iteration apart) all the time.

Thank you for your kind help.

Regards,

Peter

****************
Here are the counts for runs with the barrier in different places:

=============
No barrier:

> worker :: Int -> TVar Int -> TVar Int -> IO ()
> worker id tv ic = do
>       sleepingTime <- randomRIO (0, 50000)
>       threadDelay sleepingTime
>       putStr $ show id
>       atomically $ (inc ic)
>       worker id tv ic

(0,274)
(1,272)
(2,274)
(3,278)
(4,269)
(5,287)
(6,287)
(7,275)
(8,281)
(9,274)

================
The barrier after putStr:

> worker :: Int -> TVar Int -> TVar Int -> IO ()
> worker id tv ic = do
>       sleepingTime <- randomRIO (0, 50000)
>       threadDelay sleepingTime
>       putStr $ show id
>       atomically $ barrier tv id
>       atomically $ (inc ic)
>       worker id tv ic

(0,199)
(1,199)
(2,199)
(3,199)
(4,198)
(5,198)
(6,198)
(7,198)
(8,198)
(9,198)

===============
The barrier between threadDelay and putStr:

> worker :: Int -> TVar Int -> TVar Int -> IO ()
> worker id tv ic = do
>       sleepingTime <- randomRIO (0, 50000)
>       threadDelay sleepingTime
>       atomically $ barrier tv id
>       putStr $ show id
>       atomically $ (inc ic)
>       worker id tv ic

(0,202)
(1,201)
(2,201)
(3,201)
(4,201)
(5,201)
(6,201)
(7,201)
(8,201)
(9,201)

Note: This is the one looking most like 0123456789012345... as I
initially wanted, but of course there is still a chance of a race: after
passing the barrier in sequence, several worker threads may all be
waiting just before putStr.  Then it would be random which one executes
putStr first.

===============
The barrier is placed in the beginning before threadDelay:

> worker :: Int -> TVar Int -> TVar Int -> IO ()
> worker id tv ic = do
>       sleepingTime <- randomRIO (0, 50000)
>       atomically $ barrier tv id
>       threadDelay sleepingTime
>       putStr $ show id
>       atomically $ (inc ic)
>       worker id tv ic

(0,200)
(1,200)
(2,200)
(3,200)
(4,200)
(5,199)
(6,200)
(7,200)
(8,199)
(9,199)
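
The "at most one iteration apart" property can be checked mechanically on the final counts listed above. The helper withinOne and the list names below are hypothetical; the numbers are copied from the four runs. Note that this only checks the final snapshot, not that the invariant held throughout the run.

```haskell
-- True when the largest and smallest counts differ by at most one.
withinOne :: [Int] -> Bool
withinOne [] = True
withinOne cs = maximum cs - minimum cs <= 1

-- Final per-worker counts (workers 0..9) from the four runs above.
noBarrier, barrierAfterPutStr, barrierBeforePutStr, barrierBeforeDelay :: [Int]
noBarrier           = [274, 272, 274, 278, 269, 287, 287, 275, 281, 274]
barrierAfterPutStr  = [199, 199, 199, 199, 198, 198, 198, 198, 198, 198]
barrierBeforePutStr = [202, 201, 201, 201, 201, 201, 201, 201, 201, 201]
barrierBeforeDelay  = [200, 200, 200, 200, 200, 199, 200, 200, 199, 199]
```

Here `withinOne noBarrier` is False (the counts range from 269 to 287), while it is True for each of the three runs that use the barrier.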

=======================
=======================
Here's the full program:

> module Main where
>
> import Control.Concurrent
> import Control.Concurrent.STM
> import System.Random
>
> worker :: Int -> TVar Int -> TVar Int -> IO ()
> worker id tv ic = do
>       sleepingTime <- randomRIO (0, 50000)
>       threadDelay sleepingTime
>       putStr $ show id
>       atomically $ barrier tv id
>       atomically $ (inc ic)
>       worker id tv ic

Each worker sleeps for some time, then outputs its id and waits at
the barrier for all the other workers to finish their sleep+output.

> barrier :: TVar Int -> Int -> STM ()
> barrier tv id = do
>       passed <- readTVar tv
>       if (passed `mod` 10 == id)
>               then writeTVar tv (passed+1)
>               else retry

The barrier is simply a global variable, tv, which holds the number of
times any worker passed the barrier.  Now, a worker may only pass the
barrier iff the worker with an id one less just passed, or else it
should block.

> main :: IO ()
> main = do
>       tv <- atomically $ newTVar 0
>       idCounts <- mapM (atomically . newTVar) [0,0,0,0,0,0,0,0,0,0]
>       for [0..9] $ \i -> forkIO $ worker i tv (idCounts!!i)
>       threadDelay (10*10^6)
>       mapM_ (\(i,ic) -> (atomically $ readTVar ic) >>= \n -> print (i,n))
>             (zip [0..9] idCounts)
>
> for = flip mapM_
>
> inc tvar = readTVar tvar >>= \n -> writeTVar tvar (n+1)