
How to make asynchronous I/O composable and safe?


Joey Adams
I'm not happy with asynchronous I/O in Haskell.  It's hard to reason
about, and doesn't compose well.  At least in my code.

I'm currently trying to build a networking layer for my application
using Network.TLS.  Here is a rather minimalist API:

   newtype Connection = Connection (TLSCtx Handle)

   connectClient :: Handle         -- ^ Connection handle, as returned by 'connectTo'
                 -> X509           -- ^ TLS certificate (i.e. public key)
                 -> IO Connection

   connectServer :: Handle         -- ^ Connection handle, as returned by 'accept'
                 -> X509           -- ^ TLS certificate (i.e. public key)
                 -> TLS.PrivateKey -- ^ TLS private key
                 -> IO Connection

   close :: Connection -> IO ()

   sendMessage :: Connection -> Message -> IO ()

   recvMessage :: Connection -> ByteString -> IO (Message, ByteString)

The module provides little more than connection initialization and
message serialization.  I don't try to use locks or STM to multiplex
the connection or, in the case of recvMessage, hide connection state.
I just make sure to use sendMessage in only one thread at a time, use
recvMessage in only one thread at a time, and pass the "extra bytes"
parameter of recvMessage along from call to call (with the help of StateT).

I wrote a simple "chat server" to test it.  The client turned out okay:

   main :: IO ()
   main = do
       cert <- getCertificate
       handle <- connectTo "localhost" (PortNumber 1337)
       conn <- connectClient handle cert
       _ <- forkIO $ forever $ do
           s <- getLine
           sendMessage conn $ TestMessage s
       forever $ flip runStateT B.empty $ do
           msg <- StateT $ recvMessage conn
           case msg of
               TestMessage s ->
                   liftIO $ putStrLn s
               _ ->
                   liftIO $ hPrintf stderr
                       "Warning: unrecognized message from server: %s\n"
                       (messageTypeName msg)

The only glaring problem is that, if the user presses Ctrl+D, the
forked (sending) thread dies, but the main (receiving) thread lingers.
 I'd have to add exception handlers to ensure that when one thread
dies, the other thread dies too.
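
One way to tie the two threads together is sketched below. This is a minimal sketch, not code from the attached program: the name `linked` is hypothetical, and it only relies on `forkFinally` and `killThread` from base. Whichever action finishes (or throws) first causes the other to be killed.

```haskell
import Control.Concurrent
import Control.Exception (finally)
import Control.Monad (void)

-- | Hypothetical helper: run two actions in their own threads and return as
-- soon as either one finishes or throws.  Both threads are then killed
-- (killing an already finished thread is harmless).
linked :: IO () -> IO () -> IO ()
linked a b = do
    done <- newEmptyMVar
    -- Each thread signals completion, whether it ended normally or not.
    let signal _ = void (tryPutMVar done ())
    ta <- forkFinally a signal
    tb <- forkFinally b signal
    -- Wait for the first signal; clean up even if we are interrupted.
    takeMVar done `finally` (killThread ta >> killThread tb)
```

In the client above, the sending loop and the receiving loop could each be passed to `linked`, so that Ctrl+D in the sender also stops the receiver. Note that the sketch discards the exception that ended a thread; a real version would probably want to rethrow it.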

However, the server is an abomination (see attachment).

Unfortunately, it's not as simple as "spawn one thread per client".
We need at least two threads, one to listen for messages from the
client, and another to send messages to the client.  GHC won't let us
simultaneously, in the same thread, wait for input from a connection
and wait for an STM transaction to succeed.

Another source of complexity is: what if we throw an exception at a
thread while it is in the middle of sending a packet?  Then we can't
shut down the connection properly (i.e. Network.TLS.bye), because the
receiver might think the close_notify packet is part of the
interrupted packet.

Having a thread for each client is good, as it:

 * Lets us think about each client separately.  No need to turn our
code inside out or write one big loop that juggles all the clients.

 * Isolates exceptions.  If sendMessage or recvMessage throws an
exception, it doesn't bring the whole server down.

On the other hand, having multiple threads interact with a single
client is hard to think about:

 * We have to synchronize the threads (e.g. when one dies, kill the other one)

 * Multiple places where an exception can arise

 * Can multiple threads interact with the connection handle simultaneously?

So why don't I make my connection API handle some of this?  Well, I
tried.  There are so many ways to do it, and I couldn't find a way
that simplified usage much.  The approach used by Handle and by
Network.TLS is to use MVars and IORefs to ensure that, if two threads
access the same connection, the connection doesn't become totally
corrupt.  If I do the same, then I'll have *three* layers of locking
under the hood.

Worse, the locking done by Handle and Network.TLS doesn't guarantee
much.  I don't know if it's safe to have one thread sending and
another thread receiving.  Especially in the case of Network.TLS,
where 'recvData' automatically handshakes in some cases, which sends
packets.  Since I don't know how much thread safety to expect, I can't
write networking code and know for sure that it is safe.

I'm certainly not protected from interleaved data if multiple threads
send on the same handle.  For example:

    import Control.Concurrent
    import System.IO

    main :: IO ()
    main = do
        hSetBuffering stdout NoBuffering
        _ <- forkIO $ putStrLn "One sentence."
        putStrLn "Another sentence."

produces:

    AnOonteh esre nsteenntceen.c
    e.

That is, I can't rely on putStrLn being "atomic".  To produce
intelligible output (without changing the buffering mode), I have to
"lock" the output each time I write something.  putStrLn doesn't do it
for me.
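
The "lock the output" approach can be sketched with an MVar used as a mutex. The names `OutputLock`, `newOutputLock` and `atomicPutStrLn` are made up for this illustration; only `newMVar` and `withMVar` come from base.

```haskell
import Control.Concurrent.MVar

-- | Hypothetical wrapper: an MVar () acting as a mutex for stdout.
newtype OutputLock = OutputLock (MVar ())

newOutputLock :: IO OutputLock
newOutputLock = OutputLock <$> newMVar ()

-- | Write a whole line while holding the lock, so lines written by
-- different threads through the same lock are not interleaved.
-- withMVar releases the lock even if putStrLn throws.
atomicPutStrLn :: OutputLock -> String -> IO ()
atomicPutStrLn (OutputLock m) s = withMVar m $ \_ -> putStrLn s
```

This only helps if every writer goes through the same lock; a stray plain putStrLn elsewhere can still interleave with locked output.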

=== Summary ===

In Haskell, sound logic and a great type system lead to elegant,
composable code in a variety of domains, such as:

 * Expression evaluation
 * Parsing
 * Concurrent programming (thanks to STM)

Asynchronous I/O is tricky.  However, Haskell currently does little to
alleviate the complexity (at least for me).

How can we structure network protocol APIs so that they stack well
(e.g. only lock once, rather than locking each layer's connection
state)?  How can we deal with I/O errors without having to think about
them at every turn?

For now, how can I structure my application's communication API so
it's less messy to use?

Thanks,
- Joey

_______________________________________________
Haskell-Cafe mailing list
[hidden email]
http://www.haskell.org/mailman/listinfo/haskell-cafe

chat-server.hs (3K) Download Attachment

Re: How to make asynchronous I/O composable and safe?

Bardur Arantsson-2
On 01/14/2012 06:24 AM, Joey Adams wrote:
> I'm not happy with asynchronous I/O in Haskell.  It's hard to reason
> about, and doesn't compose well.  At least in my code.
>
[--snip--]

Async I/O *is* tricky if you're expecting threads to do their own
writes/reads directly to/from sockets. I find that using a
message-passing approach for communication makes this much easier.

If you need multiple server threads to respond to the same client
(socket) then the easiest approach might be to simply use a (Chan a) for
output. Since you always put full messages to the Chan, exceptions cause
no problems with respect to partial messages, etc.

You can also use a Chan for forwarding messages from the client socket
to the appropriate server threads -- if you need several (or even all)
threads to receive messages from the client you can use "dupChan" on the
"input-from-client" channel you pass to the server threads.

So, the API becomes something like:

    runSocketServer :: ((Chan a, Chan b) -> IO ()) -> ... -> IO ()

where the first parameter contains the "client logic" and "A" is the
type of the messages from the client and "B" is the type of the messages
which are sent back to the client.
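
A rough sketch of that shape, using only base, might look like the following. It is an illustration under assumptions, not the API described above: the name `withChannels` is hypothetical, and the receive and send actions are passed in as plain IO callbacks instead of a socket.

```haskell
import Control.Concurrent
import Control.Exception (finally)
import Control.Monad (forever)

-- | Hypothetical helper: pump messages between two callbacks and a pair of
-- channels while the client logic runs, then stop both pump threads.
withChannels :: IO a                      -- ^ receive one full message
             -> (b -> IO ())              -- ^ send one full message
             -> ((Chan a, Chan b) -> IO r) -- ^ client logic
             -> IO r
withChannels recv send logic = do
    inbox  <- newChan
    outbox <- newChan
    -- Reader: connection -> inbox.  Writer: outbox -> connection.
    reader <- forkIO $ forever $ recv >>= writeChan inbox
    writer <- forkIO $ forever $ readChan outbox >>= send
    logic (inbox, outbox) `finally` (killThread reader >> killThread writer)
```

Two caveats of this sketch: messages still queued in the outbox when the logic returns are dropped, and an exception in the reader or writer thread is not reported to the client logic.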

Hope this helps,




Re: How to make asynchronous I/O composable and safe?

Joey Adams
On Sat, Jan 14, 2012 at 1:29 AM, Bardur Arantsson <[hidden email]> wrote:
> So, the API becomes something like:
>
>   runSocketServer :: ((Chan a, Chan b) -> IO ()) -> ... -> IO ()
>
> where the first parameter contains the "client logic" and "A" is the type of
> the messages from the client and "B" is the type of the messages which are
> sent back to the client.

Thanks, that's a good idea.  Even if I only plan to receive in one
thread, placing the messages in a Chan or TChan helps separate my
application thread from the complexities of connection management.

Is there something on Hackage that will do this for me?  Or will I
need to roll my own?  Namely, convert a network connection to a pair
of channels, and close the connection automatically.  Something like
this:

    -- | Spawn two threads, one which populates the first channel with messages
    -- from the other host, and another which reads the second channel and sends
    -- its messages to the other host.
    --
    -- Run the given computation, passing it these channels.  When the
    -- computation completes (or throws an exception), sending and receiving
    -- will stop, and the connection will be closed.
    --
    -- If either the receiving thread or the sending thread encounters an
    -- exception, sending and receiving will stop, and an asynchronous
    -- exception will be thrown to your thread.
    channelize :: IO msg_in                 -- ^ Receive callback
               -> (msg_out -> IO ())        -- ^ Send callback
               -> IO ()                     -- ^ Close callback
               -> (TChan msg_in -> TChan msg_out -> IO a)
                                            -- ^ Inner computation
               -> IO a


Re: How to make asynchronous I/O composable and safe?

Daniel Waterworth
I've been trying to write networking code in Haskell too. I've also
come to the conclusion that channels are the way to go. However,
what's missing in the standard `Chan` type, which is essential for my
use-case, is the ability to do the equivalent of the unix select call.
My other slight qualm is that the type doesn't express the direction
of data (though this is easy to add afterwards).
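
For comparison, STM channels can express a select-like wait with `orElse`. The following is a minimal sketch assuming the stm package is available; `selectChan` is a hypothetical name, not a library function.

```haskell
import Control.Concurrent.STM

-- | Hypothetical helper: wait until either channel has a message and
-- return whichever is available, preferring the first channel.
-- If both are empty, the transaction blocks until one of them is written.
selectChan :: TChan a -> TChan b -> STM (Either a b)
selectChan ca cb =
    (Left <$> readTChan ca) `orElse` (Right <$> readTChan cb)
```

A caller would run it with `atomically (selectChan ca cb)`. The plain `Chan` type from base has no equivalent, which is the gap described above.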

I know of the chp package, but in order to learn how it worked, I
spent a day writing my own version. I've kept the API similar to that
of the standard Chan's. If it would be useful to you as well, I'll
happily open source it sooner rather than later.

Daniel

p.s I'd avoid the TChan for networking code as reading from a TChan is
a busy operation. [1]

[1] http://hackage.haskell.org/packages/archive/stm/2.2.0.1/doc/html/src/Control-Concurrent-STM-TChan.html#readTChan



Re: How to make asynchronous I/O composable and safe?

Daniel Waterworth
Disregard that last comment on `TChan`s; `retry` blocks. You learn a new
thing every day [=

Daniel



Re: How to make asynchronous I/O composable and safe?

Peter Simons
In reply to this post by Bardur Arantsson-2
Hi guys,

 >> I'm not happy with asynchronous I/O in Haskell.  It's hard to reason
 >> about, and doesn't compose well.
 >
 > Async I/O *is* tricky if you're expecting threads to do their own
 > writes/reads directly to/from sockets. I find that using a
 > message-passing approach for communication makes this much easier.

yes, that is true. I've always felt that spreading IO code all over the
software is a choice that makes the programmer's life unnecessarily hard.
The (IMHO superior) alternative is to have one central IO loop that
generates buffers of input, passes them to a callback function, and
receives buffers of output in response.

I have attached a short module that implements the following function:

  type ByteCount        = Word16
  type Capacity         = Word16
  data Buffer           = Buf !Capacity !(Ptr Word8) !ByteCount
  type BlockHandler st  = Buffer -> st -> IO (Buffer, st)

  runLoop :: ReadHandle -> Capacity -> BlockHandler st -> st -> IO st

That setup is ideal for implementing streaming services, where there is
only one connection on which some kind of dialog between client/server
takes place, e.g. an HTTP server.

Programs like Bittorrent, on the other hand, are much harder to design,
because there's a great number of seemingly individual I/O contexts
(i.e. the machine is talking to hundreds, or even thousands of other
machines), but all those communications need to be coordinated in one
way or another.

A solution for that problem invariably ends up looking like a massive
finite state machine, which is somewhat unpleasant.

Take care,
Peter




{-# LANGUAGE DeriveDataTypeable #-}
{- |
   Module      :  BlockIO
   License     :  BSD3

   Maintainer  :  [hidden email]
   Stability   :  provisional
   Portability :  DeriveDataTypeable

   'runLoop' drives a 'BlockHandler' with data read from the
   input stream until 'hIsEOF' ensues. Everything else has
   to be done by the callback; runLoop just does the I\/O.
   But it does it /fast/.
-}

module BlockIO where

import Prelude hiding ( catch, rem )
import Control.Exception
import Control.Monad.State
import Data.List
import Data.Typeable
import System.IO
import System.IO.Error hiding ( catch )
import Foreign  hiding ( new )
import System.Timeout

-- * Static Buffer I\/O

type ReadHandle  = Handle
type WriteHandle = Handle

type ByteCount = Word16
type Capacity  = Word16
data Buffer    = Buf !Capacity !(Ptr Word8) !ByteCount
                 deriving (Eq, Show, Typeable)

-- |Run the given computation with an initialized, empty
-- 'Buffer'. The buffer is gone when the computation
-- returns.

withBuffer :: Capacity -> (Buffer -> IO a) -> IO a
withBuffer 0 = const (fail "BlockIO.withBuffer with size 0 doesn't make sense")
withBuffer n = bracket cons dest
  where
  cons = mallocArray (fromIntegral n) >>= \p -> return (Buf n p 0)
  dest (Buf _ p _) = free p

-- |Drop the first @n <= size@ octets from the buffer.

flush :: ByteCount -> Buffer -> IO Buffer
flush 0 buf               = return buf
flush n (Buf cap ptr len) = assert (n <= len) $ do
  let ptr' = ptr `plusPtr` fromIntegral n
      len' = fromIntegral len - fromIntegral n
  when (len' > 0) (copyArray ptr ptr' len')
  return (Buf cap ptr (fromIntegral len'))

type Timeout = Int

-- |If there is space, read and append more octets; then
-- return the modified buffer. In case of 'hIsEOF',
-- 'Nothing' is returned. If the buffer is full already,
-- 'throw' a 'BufferOverflow' exception. When the timeout
-- is exceeded, 'ReadTimeout' is thrown.

slurp :: Timeout -> ReadHandle -> Buffer -> IO (Maybe Buffer)
slurp to h b@(Buf cap ptr len) = do
  when (cap <= len) (throw (BufferOverflow h b))
  timeout to (handleEOF wrap) >>=
    maybe (throw (ReadTimeout to h b)) return
  where
  wrap = do let ptr' = ptr `plusPtr` fromIntegral len
                n    = cap - len
            rc <- hGetBufNonBlocking h ptr' (fromIntegral n)
            if rc > 0
               then return (Buf cap ptr (len + fromIntegral rc))
               else hWaitForInput h (-1) >> wrap

-- * BlockHandler and I\/O Driver

-- |A callback function suitable for use with 'runLoop'
-- takes a buffer and a state, then returns a modified
-- buffer and a modified state. Usually the callback will
-- use 'flush' to remove data it has processed already.

type BlockHandler st = Buffer -> st -> IO (Buffer, st)

type ExceptionHandler st e = e -> st -> IO st

-- |Our main I\/O driver.

runLoopNB
  :: (st -> Timeout)            -- ^ user state provides timeout
  -> (SomeException -> st -> IO st)   -- ^ user provides I\/O error handler
  -> ReadHandle                 -- ^ the input source
  -> Capacity                   -- ^ I\/O buffer size
  -> BlockHandler st            -- ^ callback
  -> st                         -- ^ initial callback state
  -> IO st                      -- ^ return final callback state
runLoopNB mkTO errH hIn cap f initST = withBuffer cap (`ioloop` initST)
  where
  ioloop buf st = buf `seq` st `seq`
    handle (`errH` st) $ do
      rc <- slurp (mkTO st) hIn buf
      case rc of
        Nothing   -> return st
        Just buf' -> f buf' st >>= uncurry ioloop

-- |A variant which won't time out and will just 'throw' all
-- exceptions.

runLoop :: ReadHandle -> Capacity -> BlockHandler st -> st -> IO st
runLoop = runLoopNB (const (-1)) (\e _ -> throw e)

-- * Handler Combinators

-- |Signal how many bytes have been consumed from the
-- /front/ of the list; these octets will be dropped.

type StreamHandler st = [Word8] -> st -> IO (ByteCount, st)

handleStream :: StreamHandler st -> BlockHandler st
handleStream f buf@(Buf _ ptr len) st = do
  (i, st') <- peekArray (fromIntegral len) ptr >>= flip f st
  buf' <- flush i buf
  return (buf', st')

-- * I\/O Exceptions

-- |Thrown by 'slurp'.

data BufferOverflow = BufferOverflow ReadHandle Buffer
                    deriving (Show, Typeable)

instance Exception BufferOverflow where

-- |Thrown by 'slurp'.

data ReadTimeout    = ReadTimeout Timeout ReadHandle Buffer
                    deriving (Show, Typeable)

instance Exception ReadTimeout where

-- * Internal Helper Functions

-- |Return 'Nothing' if the given computation throws an
-- 'isEOFError' exception. Used by 'slurp'.

handleEOF :: IO a -> IO (Maybe a)
handleEOF f =
  catchJust fromException
    (fmap Just f)
    (\e -> if isEOFError e then return Nothing else ioError e)

-- |Our version of C's @strstr(3)@.

strstr :: [Word8] -> [Word8] -> Maybe Int
strstr tok = strstr' 0
  where
  strstr'  _     []       = Nothing
  strstr' pos ls@(_:xs)
    | tok `isPrefixOf` ls = Just (pos + length tok)
    | otherwise           = strstr' (pos + 1) xs

-- |Split a list by some delimiter. Will soon be provided by
-- "Data.List".

splitList :: Eq a => [a] -> [a] -> [[a]]
splitList d' l' =
  unfoldr (\x -> if null x then Nothing else Just $ nextToken d' [] (snd $ splitAt (length d') x)) (d'++l')
  where nextToken _ r [] = (r, [])
        nextToken d r l@(h:t) | d `isPrefixOf` l = (r, l)
                              | otherwise = nextToken d (r++[h]) t


Re: How to make asynchronous I/O composable and safe?

Bardur Arantsson-2
In reply to this post by Joey Adams
On 01/14/2012 11:42 AM, Joey Adams wrote:

> On Sat, Jan 14, 2012 at 1:29 AM, Bardur Arantsson<[hidden email]>  wrote:
>> So, the API becomes something like:
>>
>>    runSocketServer :: ((Chan a, Chan b) ->  IO ()) ->  ... ->  IO ()
>>
>> where the first parameter contains the "client logic" and "A" is the type of
>> the messages from the client and "B" is the type of the messages which are
>> sent back to the client.
>
> Thanks, that's a good idea.  Even if I only plan to receive in one
> thread, placing the messages in a Chan or TChan helps separate my
> application thread from the complexities of connection management.
>

Unless TCP is an absolute requirement, something like 0MQ[1,2] may be
worth investigating.

It handles all the nasty details and you get a simple message-based
interface with lots of nice things like pub-sub, request-reply, etc. etc.

[1] http://hackage.haskell.org/package/zeromq-haskell-0.8.2
[2] http://www.zeromq.org/




Re: How to make asynchronous I/O composable and safe?

Peter Simons
In reply to this post by Daniel Waterworth
Hi Daniel,

 > I've been trying to write networking code in Haskell too. I've also
 > come to the conclusion that channels are the way to go.

isn't a tuple of input/output channels essentially the same as a stream
processor arrow? I found the example discussed in the "arrow paper" [1]
very enlightening in that regard. There also is a Haskell module that
extends the SP type to support monadic IO at [2].
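
For readers who have not seen it, the stream processor type from the arrow paper can be sketched as below. The `SP` data type follows the paper; `runSP` and `mapSP` are small illustrative helpers written for this sketch, and nothing here is taken from the streamproc package.

```haskell
-- | A stream processor either emits an output and continues,
-- or waits for the next input.
data SP a b = Put b (SP a b) | Get (a -> SP a b)

-- | Illustrative interpreter: feed a finite list of inputs and collect
-- the outputs.  Stops when the processor asks for input and none is left.
runSP :: SP a b -> [a] -> [b]
runSP (Put b sp) xs       = b : runSP sp xs
runSP (Get _)    []       = []
runSP (Get k)    (x : xs) = runSP (k x) xs

-- | Apply a function to every element of the stream.
mapSP :: (a -> b) -> SP a b
mapSP f = Get (\a -> Put (f a) (mapSP f))
```

The analogy with a pair of channels is that `Get` plays the role of reading from the input channel and `Put` the role of writing to the output channel.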

Take care,
Peter


[1] http://www.ittc.ku.edu/Projects/SLDG/filing_cabinet/Hughes_Generalizing_Monads_to_Arrows.pdf
[2] http://hackage.haskell.org/package/streamproc



Re: How to make asynchronous I/O composable and safe?

Daniel Waterworth
Hi Peter,

streamproc is a very interesting package, I'll surely use it somewhere
in the future. However, I'm not convinced that this solves my
immediate problem, but perhaps this is due to my inexperience with
arrows. My problem is:

I have a number of network connections and I have a system that does
things. I want the network connections to interact with the system. I
also want the system to be able to interact with the network
connections by way of a pub/sub style message bus.
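
A very small pub/sub bus of this kind can be sketched with `dupChan` from base. The names `Bus`, `newBus`, `publish` and `subscribe` are hypothetical and only meant to make the requirement concrete; this is not the design under discussion.

```haskell
import Control.Concurrent.Chan

-- | Hypothetical message bus: one write end shared by all publishers.
newtype Bus a = Bus (Chan a)

newBus :: IO (Bus a)
newBus = Bus <$> newChan

-- | Deliver a message to every current subscriber.
publish :: Bus a -> a -> IO ()
publish (Bus c) = writeChan c

-- | Get a private read end; it sees only messages published afterwards.
subscribe :: Bus a -> IO (Chan a)
subscribe (Bus c) = dupChan c
```

One known caveat of this sketch: the original channel inside `Bus` is never read, so it keeps every published message alive. The stm package offers a broadcast variant of `TChan` for this situation.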

The only way I can see stream processors working in this scenario is
if all of the events of the system are handled in a single thread. The
events are then pushed into the stream processor and actions are
pulled out. This isn't acceptable because the amount of logic in the
stream processor will be fairly small for my problem in comparison
with the logic that is required to mux/demux events/actions onto
sockets. It's also a problem that there's a single threaded
bottleneck.

Daniel



Re: How to make asynchronous I/O composable and safe?

wren ng thornton
In reply to this post by Daniel Waterworth
On 1/14/12 6:27 AM, Daniel Waterworth wrote:
> p.s I'd avoid the TChan for networking code as reading from a TChan is
> a busy operation. [1]
>
> [1] http://hackage.haskell.org/packages/archive/stm/2.2.0.1/doc/html/src/Control-Concurrent-STM-TChan.html#readTChan


The `retry`-ness will be rectified whenever the new version of stm is
pushed out[1], which includes tryReadTChan for one-shot use. Until then,
you can use the version of tryReadTChan in stm-chans[2] which provides
the same operation, though less optimized since it's not behind the API
wall. Once I learn the version number of when the optimized variants
will be released, the stm-chans version will use CPP to properly select
between the new version vs the backport, so you can rely on stm-chans to
provide a compatibility layer for those operations.
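
As an illustration of one-shot use, the sketch below drains whatever is currently queued without blocking. It assumes a version of stm that exports `tryReadTChan` from Control.Concurrent.STM; `drain` is a hypothetical helper name.

```haskell
import Control.Concurrent.STM

-- | Hypothetical helper: take every message that is available right now,
-- in order, and return immediately (never retries on an empty channel).
drain :: TChan a -> STM [a]
drain c = do
    m <- tryReadTChan c
    case m of
        Nothing -> return []
        Just x  -> (x :) <$> drain c
```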


[1] http://www.haskell.org/pipermail/cvs-libraries/2011-April/012914.html

[2]
http://hackage.haskell.org/packages/archive/stm-chans/1.1.0/doc/html/src/Control-Concurrent-STM-TChan-Compat.html

--
Live well,
~wren


Re: How to make asynchronous I/O composable and safe?

David Barbour
In reply to this post by Joey Adams
I favor a wait-free concurrency model based on the `vat` from E language. Vats can be modeled very easily in Haskell, and in many other languages. I currently use such a vat model for my Haskell projects. I describe aspects of it at a few places:


Unfortunately, my code isn't generic. The vats I've implemented are specialized (i.e. via extra stages and queues) primarily for efficient processing of concurrent reactive dataflows.

Advantages of Vats:
* wait-free asynchronous IO, hence deadlock and starvation free
* first-class methods that help in many ways:
** distribute the dispatch burden (no need for a `main` switch statement)
** extensible (easy to add new methods without adjusting central code)
** transparent parallelization (calls may transparently invoke local or remote methods)
** securable (control distribution and parameter-types of methods)
** easily model code distribution (e.g. create a method with monad action parameters)
* simple state - variables local to each vat may be shared between methods
* coarse-grained `islands of consistency` are easy to reason about
* implicit batching between vats improves consistency AND efficiency
* easy to express incremental computation (as sequence of method calls)
* clean coarse-grained interaction with data-parallelism (e.g. spark parameters)
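
To make the idea concrete, here is one very reduced reading of a vat: a single thread that runs queued actions one at a time, so state touched only by those actions needs no further locking. This is an assumption-laden sketch, not the implementation described in this message; `Vat`, `newVat` and `send` are hypothetical names, and the staging, batching and remote dispatch mentioned above are left out.

```haskell
import Control.Concurrent
import Control.Monad (forever)

-- | Hypothetical vat: a queue of actions served by one worker thread.
newtype Vat = Vat (Chan (IO ()))

newVat :: IO Vat
newVat = do
    q <- newChan
    -- The event loop: run queued actions strictly in arrival order.
    _ <- forkIO $ forever $ do
        job <- readChan q
        job
    return (Vat q)

-- | Enqueue an action and return immediately; the caller never waits
-- for the action to run.
send :: Vat -> IO () -> IO ()
send (Vat q) = writeChan q
```

In this sketch an exception in any queued action kills the vat's thread; a fuller version would have to decide how such failures are reported.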

Technically, the vat consistency model is not `composable`. Instead, it works well based on coarse granularity and the natural limits of local reasoning - i.e. actual use-cases only interact with one or two other vats before they extend far enough that developers simply design assuming arbitrary ordering and potential interference. Developers can build ad-hoc consistency models atop vats easily enough. I use a temporal consistency model atop vats, which is composable, but is feasible for my reactive dataflow model primarily due to its updates being commutative. 

That said, one should be careful about asserting transactions are composable. Transactions don't scale well as they compose, having greater opportunity for conflict, rework, starvation, priority inversion. Transactions also don't interact well with the real-world IO, such as continuous sensor streams or actuators. Despite the non-composability of vat semantics, they at least scale better than transactions. cf. http://awelonblue.wordpress.com/2011/07/05/transaction-tribulation/ 

The recent work on Cloud Haskell takes a similar inspiration from E's vats. However, the focus of cloud haskell is different and consequently there are a lot of subtle but important differences between cloud haskell processes and my use of vats, e.g. regarding process identifiers, serializability requirements, etc.

Regards,

Dave


On Fri, Jan 13, 2012 at 9:24 PM, Joey Adams <[hidden email]> wrote:
> In Haskell, sound logic and a great type system lead to elegant,
> composable code in a variety of domains, such as:
>
>  * Expression evaluation
>  * Parsing
>  * Concurrent programming (thanks to STM)
>
> Asynchronous I/O is tricky.  However, Haskell currently does little to
> alleviate the complexity (at least for me).
>
> How can we structure network protocol APIs so that they stack well
> (e.g. only lock once, rather than locking each layer's connection
> state)?  How can we deal with I/O errors without having to think about
> them at every turn?
>
> For now, how can I structure my application's communication API so
> it's less messy to use?
>
> Thanks,
> - Joey

_______________________________________________
Haskell-Cafe mailing list
[hidden email]
http://www.haskell.org/mailman/listinfo/haskell-cafe




Re: How to make asynchronous I/O composable and safe?

Simon Marlow-7
In reply to this post by Joey Adams
This is an interesting problem; I think I might incorporate parts of it
into the next revision of my Concurrent Haskell tutorial.

It sounds like you're getting overwhelmed by several different problems,
and dealing with them separately would probably help.  For example, you want
some infrastructure to run two threads and send an exception to one
whenever the other one dies.  You also want to be able to avoid a thread
being interrupted while performing an operation that should be atomic,
like sending a message - this is slightly tricky, because there's a
tradeoff between keeping the thread responsive and not interrupting an
operation.  The biggest hammer is uninterruptibleMask, which can be used
if all else fails.
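
The first piece of infrastructure mentioned above (two threads, where the death of one takes the other down) might be sketched as follows, using only `base`. `linked` and `demo` are hypothetical names, and this is one possible shape rather than a recommended implementation; the `race` function in the `async` package offers a similar interface.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, mask, onException, throwIO, try)

-- | Run two actions in their own threads.  As soon as either one returns
-- or throws, kill the other; then return the result or rethrow the
-- exception in the calling thread.
linked :: IO a -> IO b -> IO (Either a b)
linked left right = mask $ \restore -> do
    done <- newEmptyMVar
    -- The children are forked with exceptions masked so that they always
    -- get to report their outcome; 'restore' unmasks the actual work.
    t1 <- forkIO $ try (restore left)  >>= putMVar done . fmap Left
    t2 <- forkIO $ try (restore right) >>= putMVar done . fmap Right
    let stop = killThread t1 >> killThread t2
    -- If the caller itself is killed while waiting, stop both children.
    r <- restore (takeMVar done) `onException` stop
    stop
    either (\e -> throwIO (e :: SomeException)) return r

-- | The "sender" finishes after 10 ms, so the "receiver" (which would
-- otherwise sleep for 5 s) is killed instead of lingering.
demo :: IO (Either String ())
demo = linked (threadDelay 10000 >> return "sender finished")
              (threadDelay 5000000)
```

In the chat client, the two arguments would be the sending loop and the receiving loop, so that Ctrl+D in one ends the other as well. Protecting an individual send from interruption is a separate concern and is not addressed by this sketch.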

Whether Network.TLS supports simultaneous read and write I don't know,
but you can examine the code or talk to the maintainer.  If it doesn't,
adding a layer of locking is straightforward, and doesn't increase
overall complexity (it's localised).
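
A minimal sketch of such a locking layer, assuming one lock per direction so that a sender never blocks a receiver. `Locked`, `newLocked`, `withSendLock` and `withRecvLock` are hypothetical names, and an `IORef String` stands in for the real connection; whether Network.TLS actually needs this is the open question above.

```haskell
import Control.Concurrent (forkIO, yield)
import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, putMVar,
                                takeMVar, withMVar)
import Control.Monad (forM_, replicateM_)
import Data.IORef (modifyIORef, newIORef, readIORef)

-- | A connection of any type @c@ plus one lock per direction.
data Locked c = Locked
    { sendLock :: MVar ()
    , recvLock :: MVar ()
    , rawConn  :: c
    }

newLocked :: c -> IO (Locked c)
newLocked c = Locked <$> newMVar () <*> newMVar () <*> pure c

-- | Run an action with exclusive access to one direction.  'withMVar'
-- releases the lock again even if the action throws.
withSendLock, withRecvLock :: Locked c -> (c -> IO a) -> IO a
withSendLock l act = withMVar (sendLock l) (\_ -> act (rawConn l))
withRecvLock l act = withMVar (recvLock l) (\_ -> act (rawConn l))

-- | Two threads each write a four-character message one character at a
-- time, yielding in between.  The send lock keeps each message contiguous.
demo :: IO String
demo = do
    wire <- newIORef ""
    conn <- newLocked wire
    done <- newEmptyMVar
    forM_ ["aaaa", "bbbb"] $ \msg -> forkIO $ do
        withSendLock conn $ \w ->
            forM_ msg $ \ch -> modifyIORef w (ch :) >> yield
        putMVar done ()
    replicateM_ 2 (takeMVar done)
    reverse <$> readIORef wire
```

`demo` returns either "aaaabbbb" or "bbbbaaaa" depending on scheduling, but never an interleaving of the two messages.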

Cheers,
        Simon




Re: How to make asynchronous I/O composable and safe?

David Barbour
I'd say use of asynchronous exceptions should be a last resort. Developers should be encouraged to explicitly model any event notification system they use.

Regards,

Dave

On Tue, Jan 17, 2012 at 1:42 AM, Simon Marlow <[hidden email]> wrote:
> This is an interesting problem; I think I might incorporate parts of it
> into the next revision of my Concurrent Haskell tutorial.
>
> It sounds like you're getting overwhelmed by several different problems,
> and dealing with them separately would probably help.  For example, you want
> some infrastructure to run two threads and send an exception to one
> whenever the other one dies.  You also want to be able to avoid a thread
> being interrupted while performing an operation that should be atomic,
> like sending a message - this is slightly tricky, because there's a
> tradeoff between keeping the thread responsive and not interrupting an
> operation.  The biggest hammer is uninterruptibleMask, which can be used
> if all else fails.
>
> Whether Network.TLS supports simultaneous read and write I don't know,
> but you can examine the code or talk to the maintainer.  If it doesn't,
> adding a layer of locking is straightforward, and doesn't increase
> overall complexity (it's localised).
>
> Cheers,
>        Simon





Re: How to make asynchronous I/O composable and safe?

Joey Adams
On Tue, Jan 17, 2012 at 3:20 PM, David Barbour <[hidden email]> wrote:
> I'd say use of asynchronous exceptions should be a last resort. ...

I agree.  However, network libraries in Haskell (e.g. Handle,
Network.TLS) generally don't provide the primitives needed to do that
on the receiving end.  For example, if a thread is blocked on hGetBuf,
it cannot also wait on a signal telling it to stop.  Since hClose on
the same handle will block until the hGetBuf is done, the only way to
stop reading from the handle is to throw an asynchronous exception at
the hGetBuf thread.
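
One partial workaround, sketched here under stated assumptions, is to confine the blocking read to a dedicated "pump" thread that turns reads into explicit events, so the thread holding the application logic only ever waits on a channel and can also receive a stop request. `Event`, `pump`, `consume` and `readChunk` are hypothetical names; `readChunk` stands in for a blocking receive that returns `Nothing` at end of input. The pump thread itself is still stuck in the read, so this does not remove the problem described above, it only moves it.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Data.IORef (atomicModifyIORef, newIORef)

-- | Everything the logic thread can be told about.
data Event = Received String | PeerClosed | StopRequested
    deriving (Eq, Show)

-- | Pump thread: performs the blocking reads and reports them as events.
pump :: IO (Maybe String) -> Chan Event -> IO ()
pump readChunk events = loop
  where
    loop = do
        r <- readChunk
        case r of
            Nothing -> writeChan events PeerClosed
            Just s  -> writeChan events (Received s) >> loop

-- | Logic thread: handles events until the peer closes or a stop is
-- requested, and returns the chunks received so far.
consume :: Chan Event -> IO [String]
consume events = go []
  where
    go acc = do
        ev <- readChan events
        case ev of
            Received s    -> go (s : acc)
            PeerClosed    -> return (reverse acc)
            StopRequested -> return (reverse acc)

-- | A canned input source stands in for the network handle.
demo :: IO [String]
demo = do
    source <- newIORef [Just "hello", Just "world", Nothing]
    let pop []         = ([], Nothing)
        pop (x : rest) = (rest, x)
        readChunk      = atomicModifyIORef source pop
    events <- newChan
    _ <- forkIO (pump readChunk events)
    consume events
```

Any other thread can end the logic loop with `writeChan events StopRequested`, without throwing anything at the thread that is blocked reading.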

Worse, since there is no threadWaitReadHandle :: Handle -> IO (),
there's no way to guarantee that hGetBuf will not be interrupted in
the middle of receiving a packet.  From an application perspective,
this invalidates subsequent retrievals unless the protocol is
self-synchronizing.
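
To make the last point concrete, here is a toy length-prefixed format (an assumption for illustration, not the wire format of my application): each frame is one length byte followed by that many payload bytes. `encode`, `decode`, `intact` and `resumed` are hypothetical names. If a reader is interrupted after consuming part of a frame and a later reader resumes from that position, the decoder takes a payload byte for a length and never recovers.

```haskell
import Data.Char (ord)
import Data.Word (Word8)

-- | Encode one frame: a 1-byte length, then the payload.
-- (Toy format: payloads must be shorter than 256 bytes.)
encode :: String -> [Word8]
encode s = fromIntegral (length s) : map (fromIntegral . ord) s

-- | Decode every complete frame and return the leftover bytes, in the
-- same spirit as the "extra bytes" returned by recvMessage.
decode :: [Word8] -> ([[Word8]], [Word8])
decode (n : rest)
    | length payload == len =
        let (more, leftover) = decode rest'
        in  (payload : more, leftover)
  where
    len              = fromIntegral n
    (payload, rest') = splitAt len rest
decode leftover = ([], leftover)   -- incomplete frame: wait for more input

-- | The undamaged stream, and the same stream as seen by a reader that
-- resumes after an earlier reader consumed the first two bytes.
intact, resumed :: ([[Word8]], [Word8])
intact  = decode (encode "hi" ++ encode "bye")
resumed = decode (drop 2 (encode "hi" ++ encode "bye"))
```

`intact` is `([[104,105],[98,121,101]], [])`, i.e. both frames. `resumed` is `([], [105,3,98,121,101])`: the decoder reads the byte for 'i' (105) as a length and waits for 105 payload bytes that are not coming.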

-Joey


Re: How to make asynchronous I/O composable and safe?

Joey Adams
In reply to this post by Joey Adams
I uploaded a package that creates an STM layer over a network connection:

    http://hackage.haskell.org/package/stm-channelize

I haven't used it in anger yet, but I hope it's a step in the right
direction.  I included a sample chat client and server.  The client is
pretty cute:

    main =
        let connect = connectTo "localhost" (PortNumber 1234) >>= connectHandle
         in channelize connect      $ \conn ->
            channelize connectStdio $ \stdio ->
                forever $ atomically $
                    (recv conn >>= send stdio) `orElse`
                    (recv stdio >>= send conn)

I use channelize on both the network connection, and on stdin/stdout.

The server is much longer, but shouldn't be terribly confusing.  It
demonstrates kicking out a client without a dangerous asynchronous
exception, something we can't do easily without waiting on
alternatives (i.e. orElse).
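
The idea of kicking a client by waiting on alternatives can be sketched with plain `stm` (this assumes the `stm` package that ships with GHC and deliberately does not use the stm-channelize API; `Step`, `nextStep`, `clientLoop` and `demo` are hypothetical names):

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM

data Step = Incoming String | Kicked
    deriving (Eq, Show)

-- | Wait for whichever comes first: the kick flag being set, or the next
-- message in the inbox.  'orElse' tries the left side first, so a kick
-- takes priority over pending messages.
nextStep :: TVar Bool -> TChan String -> STM Step
nextStep kicked inbox =
    (readTVar kicked >>= check >> return Kicked)
        `orElse` (Incoming <$> readTChan inbox)

-- | Handle messages until kicked; return what was handled.
clientLoop :: TVar Bool -> TChan String -> IO [String]
clientLoop kicked inbox = go []
  where
    go acc = do
        step <- atomically (nextStep kicked inbox)
        case step of
            Incoming msg -> go (msg : acc)
            Kicked       -> return (reverse acc)

demo :: IO [String]
demo = do
    kicked <- newTVarIO False
    inbox  <- newTChanIO
    result <- newEmptyMVar
    _ <- forkIO (clientLoop kicked inbox >>= putMVar result)
    atomically (mapM_ (writeTChan inbox) ["hello", "world"])
    atomically (isEmptyTChan inbox >>= check)  -- wait until both are consumed
    atomically (writeTVar kicked True)         -- the kick: no exception thrown
    takeMVar result
```

The client thread leaves its loop at a well-defined point (between messages), so it can still say goodbye to the peer properly before closing the connection.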

-Joey
