Blocking IO & FIFOs

Jason Dusek
Hi all,

I am developing a coroutine-like interface to Bash.

  http://hpaste.org/76523

The idea is that one can send multiple "queries" to an
interpreter and then collect the results of each query. How do
we know when Bash is done with each query? Waiting for "no more
output" seems ambiguous; so the way CoBash works is:

  * Each query gets a tmp dir with two named pipes in it.
  * The query is wrapped in redirections to the pipes.
  * The pipes are removed when the query completes.
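
The wrapping step can be sketched on its own, modelled on the `cmd` binding in the attached CoBash.hs. This is only an illustration: the name `wrapQuery` is hypothetical, and the FIFO paths are assumed to contain no characters that need shell escaping.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import           Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as Bytes

-- | Wrap a query in a brace group whose stdout and stderr are redirected
--   to the two FIFOs. Hypothetical helper; the paths are not shell-escaped.
wrapQuery :: ByteString -> ByteString -> ByteString -> ByteString
wrapQuery ofo efo q = Bytes.unlines ["{", q, "} 1>" <> ofo <> " 2>" <> efo]
```

Bash opens both FIFOs for writing before it runs the group and closes them when the group finishes, which is what the reader is meant to observe as EOF.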

This does work, sort of:

  Prelude> :load CoBash.hs
  [1 of 1] Compiling CoBash           ( CoBash.hs, interpreted )
  Ok, modules loaded: CoBash.
  *CoBash> tuple@(i,o,e,p) <- start
  e :: Handle
  i :: Handle
  o :: Handle
  p :: ProcessHandle
  tuple :: (Handle, Handle, Handle, ProcessHandle)
  *CoBash> query tuple "for n in {1..4}; do sleep 1; echo $n; done"
  ("1\n2\n3\n4\n","")
  it :: (ByteString, ByteString)

I say sort of because it is quite brittle. Many commands do not
return at all, for example:

  *CoBash> query tuple "uname -a"

The way I retrieve the output from the FIFOs seems dangerous:

  (,) <$> Bytes.hGetContents oh <*> Bytes.hGetContents eh

Surely, the FIFO for STDERR can not be read from until the FIFO
for STDOUT is finished; but if there is a great deal of error
output then the process will fill the FIFO's buffer and get
stuck. If we switch the order of the reads, the "for n in ..."
example above blocks:

  (,) <$> Bytes.hGetContents eh <*> Bytes.hGetContents oh

I have tried a few different ways to read from the two handles
concurrently; for example, by giving each thread an MVar
to put the contents in, or by using hGetNonBlocking on a list of
handles in a loop. Using the latter method, I never get EOF; it
just collects empty strings forever.
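
One way to read both handles concurrently is to fork a thread for stderr and hand its result back through an MVar. The sketch below is a minimal illustration, not the code from CoBash.hs: `readBoth` and `demo` are hypothetical names, and the demo runs on anonymous pipes from `System.Process.createPipe` rather than on FIFOs. It shows only how to avoid the ordering deadlock and makes no claim about EOF detection on FIFOs.

```haskell
import           Control.Concurrent      (forkIO)
import           Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import           Data.ByteString.Char8   (ByteString)
import qualified Data.ByteString.Char8   as Bytes
import           System.IO               (Handle, hClose)
import           System.Process          (createPipe)

-- | Read two handles to EOF at the same time, so that a writer blocked on
--   one of them cannot stall the read of the other. If the forked read
--   throws, takeMVar never receives a value; a real version should handle
--   that case.
readBoth :: Handle -> Handle -> IO (ByteString, ByteString)
readBoth oh eh = do
  errCell <- newEmptyMVar
  _ <- forkIO (Bytes.hGetContents eh >>= putMVar errCell)
  out <- Bytes.hGetContents oh   -- strict read: returns only at EOF
  err <- takeMVar errCell
  return (out, err)

-- | Demo on anonymous pipes (assumption: not FIFOs). Write a little to
--   each pipe, close the write ends, then collect both.
demo :: IO (ByteString, ByteString)
demo = do
  (outR, outW) <- createPipe
  (errR, errW) <- createPipe
  Bytes.hPut outW (Bytes.pack "out\n") >> hClose outW
  Bytes.hPut errW (Bytes.pack "err\n") >> hClose errW
  readBoth outR errR
```

With this shape the order in which the two streams fill up no longer matters, because neither read waits for the other to finish.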

For comparison's sake, the expected behaviour with FIFOs is:

  In the first terminal:

   :; mkfifo fifo
   :; cat > fifo
    a
    b
    c
    d
    ^D
   :;

  In the second terminal:

   :; cat < fifo
    a
    b
    c
    d
   :;

Here I "open" the FIFO for reading with < while opening it for
writing with >. As long as the writer writes, the reader reads;
when the writer closes the write end of the pipe, the reader
receives EOF. Trying to duplicate the read behaviour in Haskell,
using hGetContents from GHCi while using cat to write to the
FIFO, doesn't work; which seems a little bogus.

There have been a few past threads about FIFOs and their
troublesome interaction with Haskell's async-by-default IO
style. To switch to System.Posix for IO -- and deal with Ptr
Word8, in order to handle binary data -- seems like an awful
step down.

--
Jason Dusek
pgp // solidsnack // C1EBC57DC55144F35460C8DF1FD4C6C1FED18A2B




{-# LANGUAGE OverloadedStrings
           , ScopedTypeVariables
           , ParallelListComp
           , TupleSections #-}

module CoBash where

import           Control.Applicative
import           Control.Concurrent
import           Control.Concurrent.MVar
import           Control.Exception
import           Control.Monad
import           Data.Bits
import           Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as Bytes
import           Data.Maybe
import           Data.Monoid
import qualified GHC.IO.Handle.FD
import           System.IO
import           System.IO.Error
import           System.Process
import           System.Posix.ByteString

import           System.IO.Temp

import qualified Text.ShellEscape as Esc


start :: IO (Handle, Handle, Handle, ProcessHandle)
start = runInteractiveProcess "bash" [] Nothing (Just [])

query :: (Handle, Handle, Handle, ProcessHandle) -> ByteString
      -> IO (ByteString, ByteString)
query (i, _, _, _) query = withFIFOs query'
 where query' ofo efo = do
         Bytes.hPut i cmd
         hFlush i
         [oh, eh] <- mapM openFIFO [ofo, efo]
         (,) <$> Bytes.hGetContents oh <*> Bytes.hGetContents eh -- Works.
--       (,) <$> Bytes.hGetContents eh <*> Bytes.hGetContents oh -- Blocks.
        where cmd = Bytes.unlines ["{", query, "} 1>" <> ofo <> " 2>" <> efo]

shutdown :: (Handle, Handle, Handle, ProcessHandle) -> IO ()
shutdown (i, _, _, p) = () <$ hClose i <* waitForProcess p


openFIFO path = GHC.IO.Handle.FD.openFileBlocking (Bytes.unpack path) ReadMode

-- | Run an IO action with two FIFOs in scope, which will be removed after it
--   completes.
withFIFOs :: (RawFilePath -> RawFilePath -> IO a) -> IO a
withFIFOs m = withSystemTempDirectory "cobash." m'
 where m'   = (uncurry m =<<) . mk . Bytes.pack
       mk d = (o, e) <$ (createNamedPipe o mode >> createNamedPipe e mode)
        where (o, e) = (d <> "/o", d <> "/e")
              mode   = ownerReadMode .|. ownerWriteMode .|. namedPipeMode

_______________________________________________
Haskell-Cafe mailing list
[hidden email]
http://www.haskell.org/mailman/listinfo/haskell-cafe

Re: Blocking IO & FIFOs

Jason Dusek
2012/10/20 Asten, W.G.G. van (Wilfried, Student M-CSC)
<[hidden email]>:

> Would you be happy with a solution like this:
>
>  - First create two handles to two files in the tmp directory
>  - Then use StdStream's UseHandle to redirect std_err and std_out
> (using CreatePipe for std_in) to these files
>  - Then write your query to the Handle for std_in
>  - waitForProcess
>  - Collect std_out and std_err from the temporary files.
>
> If that is not satisfactory you may want to check out process-conduit
> (http://hackage.haskell.org/packages/archive/process-conduit/0.5.0.2/doc/html/src/Data-Conduit-Process.html#conduitProcess)
> that also does some interaction with a process and interleaves std_in
> and std_out.  It should not be too hard to combine std_err into that
> concept. I also faced this same problem and in one case solved it by
> using a temporary file to hold my content which was also deleted
> afterwards (This immediately prevented the content building up in
> memory).
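
The steps quoted above can be sketched with `createProcess` from System.Process. This is a minimal sketch under stated assumptions: `runToFiles` is a hypothetical name, the two output paths are chosen by the caller, and one bash process is started per script, so it returns only when that bash exits.

```haskell
import System.Exit    (ExitCode)
import System.IO
import System.Process

-- | Run a script in a fresh bash, sending stdout and stderr to two files,
--   and return once bash has exited. Hypothetical helper.
runToFiles :: String -> FilePath -> FilePath -> IO ExitCode
runToFiles script outPath errPath = do
  oh <- openFile outPath WriteMode
  eh <- openFile errPath WriteMode
  -- createProcess closes handles passed via UseHandle in the parent.
  (Just i, _, _, p) <- createProcess (proc "bash" [])
    { std_in = CreatePipe, std_out = UseHandle oh, std_err = UseHandle eh }
  hPutStr i script
  hClose i               -- EOF on stdin lets bash finish
  waitForProcess p
```

After `runToFiles` returns, the output can be read back from the two files with ordinary Haskell I/O.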

For my application, it's important to be able to run multiple
queries against the same Bash session. Waiting for Bash to shut
down is thus not a viable way to finalize the response.

Perhaps I can spawn two cats with their outputs connected to the
FIFOs and wait for them to terminate.

--
Jason Dusek
pgp // solidsnack // C1EBC57DC55144F35460C8DF1FD4C6C1FED18A2B

Re: Blocking IO & FIFOs

Wilfried van Asten
Perhaps an interleaving solution as in process-conduit is still viable:

 - Check if one or both of the FIFOs are still ready (based on your
statement about the reading end receiving EOF, hIsEOF should work
here). If both FIFOs are done, the query is finished, so break the loop.
 - Check if some output is available on oh. If so, read some of it and
repeat the loop.
 - Otherwise check if some output is available on eh. If so, read some
of it and repeat the loop.

I also see you don't do anything with the std_out and std_err pipes of
bash as given by runInteractiveProcess. These could also cause a
problem even when the FIFOs are working correctly. Replace these by
handles to the null file or let the output be dumped on the parent's
std_out and std_err (StdStream Inherit).

On Sat, Oct 20, 2012 at 4:00 PM, Jason Dusek <[hidden email]> wrote:
> For my application, it's important to be able to run multiple
> queries against the same Bash session. Waiting for Bash to shut
> down is thus not a viable way to finalize the response.
>
> Perhaps I can spawn two cats with their outputs connected to the
> FIFOs and wait for them to terminate.

Re: Blocking IO & FIFOs

Jason Dusek
2012/10/20 Wilfried van Asten <[hidden email]>:
> Perhaps an interleaving solution as in process-conduit is
> still viable:
>
>  - Check if one or both of the FIFOs are still ready (based
>    on your statement about the reading end receiving EOF,
>    hIsEOF should work here). If both FIFOs are done, the query
>    is finished, so break the loop.

Alas, checking for EOF does not work. I mentioned this in
passing in my prior email; the code was somewhat involved and I
have deleted it. Here is a simple example of something that does
not work as expected:

  In the first terminal:

   :; mkfifo fifo
   :; ghci
    --  :m + GHC.IO.Handle.FD System.IO
    --  do { h <- openFileBlocking "fifo" ReadMode ; hGetContents h }

  In the second terminal, *after* doing everything in the first
  terminal:

   :; cat > fifo
    < type some characters here >
    ^D

Notice that the characters appear in the first terminal, as the
output of hGetContents. Sending ^D to end cat does not register
any effect in GHCi; hGetContents dutifully waits and you can in
fact run cat on the FIFO again to send more characters to the
same instance of hGetContents. This would seem to be due to
non-blocking IO, deep in the IO manager.

>  - Check if some output is available on oh. If so read some of
>    it. Repeat.
>
>  - Otherwise check if some output is available on eh. If so
>    read some of it. Repeat loop
>
> I also see you don't do anything with the std_out and std_err
> pipes of bash as given by runInteractiveProcess. These could
> also cause a problem even when the FIFOs are working
> correctly. Replace these by handles to the null file or let
> the output be dumped on the parent's std_out and std_err
> (StdStream Inherit).

I would prefer to leave them be, since they're passed in from
the caller, who nominally owns them. If you mean that I should
close them in `start', well, that would make it hard to debug
this stuff; and if I simply tie them to the parent's file
descriptors, it will make it hard to deal with more than a few
CoBashes at one time while testing.

Using cat to read the FIFOs and allowing Haskell to read from
cat does work, actually.

  https://gist.github.com/3923673

Shell really is such a nice language for tying together
processes.

--
Jason Dusek
pgp // solidsnack // C1EBC57DC55144F35460C8DF1FD4C6C1FED18A2B




{-# LANGUAGE OverloadedStrings
           , ScopedTypeVariables
           , ParallelListComp
           , TupleSections #-}

module CoBash where

import           Control.Applicative
import           Control.Concurrent
import           Control.Concurrent.MVar
import           Control.Exception
import           Control.Monad
import           Data.Bits
import           Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as Bytes
import           Data.Maybe
import           Data.Monoid
import qualified GHC.IO.Handle.FD
import           System.IO
import           System.IO.Error
import           System.Process
import           System.Posix.ByteString

import           System.IO.Temp

import qualified Text.ShellEscape as Esc


start :: IO (Handle, Handle, Handle, ProcessHandle)
start = runInteractiveProcess "bash" [] Nothing (Just [])

query :: (Handle, Handle, Handle, ProcessHandle) -> ByteString
      -> IO (ByteString, ByteString)
query (i, _, _, _) query = withFIFOs query'
 where query' ofo efo = do
         Bytes.hPut i cmd
         hFlush i
         [ob, eb] <- backgroundReadFIFOs [ofo, efo]
         return (ob, eb)
        where cmd = Bytes.unlines ["{", query, "} 1>" <> ofo <> " 2>" <> efo]

shutdown :: (Handle, Handle, Handle, ProcessHandle) -> IO ()
shutdown (i, _, _, p) = () <$ hClose i <* waitForProcess p


openFIFO path = GHC.IO.Handle.FD.openFileBlocking (Bytes.unpack path) ReadMode

-- | Run an IO action with two FIFOs in scope, which will be removed after it
--   completes.
withFIFOs :: (RawFilePath -> RawFilePath -> IO a) -> IO a
withFIFOs m = withSystemTempDirectory "cobash." m'
 where m'   = (uncurry m =<<) . mk . Bytes.pack
       mk d = (o, e) <$ (createNamedPipe o mode >> createNamedPipe e mode)
        where (o, e) = (d <> "/o", d <> "/e")
              mode   = ownerReadMode .|. ownerWriteMode .|. namedPipeMode

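-- | Read a FIFO to EOF by running @cat@ on it and collecting cat's output.
--   The path is handed to the shell without escaping.
drainFIFO :: ByteString -> IO ByteString
drainFIFO path = do
  (i, o, e, p) <- bash ["-c", "exec cat <"<>(Bytes.unpack path)]
  hClose i
  hClose e
  Bytes.hGetContents o <* waitForProcess p

-- | Drain every FIFO in its own thread; results come back in input order.
backgroundReadFIFOs :: [ByteString] -> IO [ByteString]
backgroundReadFIFOs theFIFOs = do
  cells <- sequence (newEmptyMVar <$ theFIFOs)
  sequence_ [ forkIO (drainFIFO p >>= putMVar c) | p <- theFIFOs | c <- cells ]
  sequence (takeMVar <$> cells)

-- | Run bash with the given arguments and an empty environment.
bash :: [String] -> IO (Handle, Handle, Handle, ProcessHandle)
bash args = runInteractiveProcess "bash" args Nothing (Just [])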

Re: Blocking IO & FIFOs

Donn Cave-4
Quoth Jason Dusek <[hidden email]>,
...
> For my application, it's important to be able to run multiple
> queries against the same Bash session. Waiting for Bash to shut
> down is thus not a viable way to finalize the response.

You could redirect to disk files and also use a pipe to wait for exit.

I suppose you redirect I/O for individual queries by applying shell
redirections to the command?  So like this,

   uname -a > /tmp/cmd1 2> /tmp/cmd2 7> /tmp/exitpipe

... then read from /tmp/exitpipe, ignore empty result and read command
outputs from the disk files.
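
The command line above can be built by a small pure function. This is a sketch only: `withExitPipe` is a hypothetical name, the paths are not shell-escaped, and the brace group is an addition so that compound commands are covered too.

```haskell
-- | Build a command whose stdout and stderr go to disk files while file
--   descriptor 7 keeps the write end of a FIFO open until the command
--   finishes. Hypothetical helper; paths are not shell-escaped.
withExitPipe :: String -> FilePath -> FilePath -> FilePath -> String
withExitPipe cmd out err exitPipe =
  "{ " ++ cmd ++ "\n} > " ++ out ++ " 2> " ++ err ++ " 7> " ++ exitPipe
```

Note that opening a FIFO for writing blocks until some process opens it for reading, so the reader of the exit pipe has to be started as well before the command can run.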

        Donn

Re: Blocking IO & FIFOs

Jason Dusek
2012/10/20 Donn Cave <[hidden email]>:

> Quoth Jason Dusek <[hidden email]>,
> ...
>> For my application, it's important to be able to run multiple
>> queries against the same Bash session. Waiting for Bash to shut
>> down is thus not a viable way to finalize the response.
>
> You could redirect to disk files and also use a pipe to wait for exit.
>
> I suppose you redirect I/O for individual queries by applying shell
> redirections to the command?  So like this,
>
>    uname -a > /tmp/cmd1 2> /tmp/cmd2 7> /tmp/exitpipe
>
> ... then read from /tmp/exitpipe, ignore empty result and read command
> outputs from the disk files.

If I could somehow arrange to detect EOF when /tmp/exitpipe is
closed, then I might as well redirect 1 and 2 to FIFOs and wait
for them to EOF, collecting the output.

However, all of my experiments suggest that there is simply no
way in Haskell to detect the closing of the write end of a FIFO.
With `openFileBlocking', one can detect when a FIFO is *opened*;
but not when it is closed.

--
Jason Dusek
pgp // solidsnack // C1EBC57DC55144F35460C8DF1FD4C6C1FED18A2B

Re: Blocking IO & FIFOs

Donn Cave-4
From Jason Dusek <[hidden email]>,
...
> If I could somehow arrange to detect EOF when /tmp/exitpipe is
> closed, then I might as well redirect 1 and 2 to FIFOs and wait
> for them to EOF, collecting the output.
>
> However, all of my experiments suggest that there is simply no
> way in Haskell to detect the closing of the write end of a FIFO.
> With `openFileBlocking', one can detect when a FIFO is *opened*;
> but not when it is closed.

It looks to me like our colleague in another followup may have it
working in an example.  I have run into some trouble myself, with
an example program demonstrating the approach I proposed.  With a
pre-existing named pipe, that I would just keep using, for whatever
reason it worked the first time, failed the second, and so forth,
working every other time.  If the test program created the named
pipe, it failed every time.  There are probably reasons for all this,
but I haven't looked very hard.

That was using "withFile".  If I use POSIX I/O, it works fine.
So it looks to me like there is indeed a way in Haskell to detect
a closed FIFO, it just may not be Haskell I/O without a lot more
work ironing out the possible causes of failure.

I believe that doesn't need to be a problem for you, though, because
1) your application is by nature exclusive to POSIX platforms, and
2) you need the named pipe only to detect command process exit, and
you can still apply Haskell I/O to the more interesting data that
accumulates in the command output disk file.

And there may be an answer for my problems with Haskell I/O.  Could
be as simple as using openFileBlocking, which apparently isn't supported
in the ghc I'm using.  Could have something to do with the fine points
of named pipes - for example, I believe you're supposed to open them
O_RDWR in situations you'd think would call for O_RDONLY.  (Though
the latter worked for me with POSIX I/O.)

While I'm here ... I share the concern expressed in an earlier followup
about the outputs from bash in runInteractiveProcess.  This looks like
a feature of runInteractiveProcess that makes it intrinsically something
like a "code smell".  input-only and output-only processes are commonly
used and fairly tractable, where input-output processes are unusual and
fragile, so it's an unfortunate convenience.  I think the idea is
that you'd use createProcess specifying only the input redirection.
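
A minimal sketch of that idea, assuming the `createProcess` API from System.Process. The names `startInputOnly`, `send` and `stop` are hypothetical, and unlike `start` in CoBash.hs this version inherits the parent's environment rather than passing an empty one.

```haskell
import System.Exit    (ExitCode(..))
import System.IO      (Handle, hClose, hFlush, hPutStr)
import System.Process

-- | Start bash with only stdin piped. stdout and stderr are inherited from
--   the parent, so there are no unread pipes for bash to fill up.
startInputOnly :: IO (Handle, ProcessHandle)
startInputOnly = do
  (Just i, _, _, p) <- createProcess (proc "bash" []) { std_in = CreatePipe }
  return (i, p)

-- | Send one line to the session and flush it (hypothetical helper).
send :: Handle -> String -> IO ()
send i s = hPutStr i (s ++ "\n") >> hFlush i

-- | Close stdin and wait for bash to exit, as `shutdown` does in CoBash.hs.
stop :: (Handle, ProcessHandle) -> IO ExitCode
stop (i, p) = hClose i >> waitForProcess p
```

Per-query output would still travel through the FIFOs or files named in the query's own redirections; only the session-level stdout and stderr are left to the parent.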

        Donn

Re: Blocking IO & FIFOs

Jason Dusek
2012/10/21 Donn Cave <[hidden email]>:

> From Jason Dusek <[hidden email]>:
>> If I could somehow arrange to detect EOF when /tmp/exitpipe is
>> closed, then I might as well redirect 1 and 2 to FIFOs and wait
>> for them to EOF, collecting the output.
>>
>> However, all of my experiments suggest that there is simply no
>> way in Haskell to detect the closing of the write end of a FIFO.
>> With `openFileBlocking', one can detect when a FIFO is *opened*;
>> but not when it is closed.
>
> [...] If I use POSIX I/O, it works fine.  So it looks to me
> like there is indeed a way in Haskell to detect a closed FIFO,
> it just may not be Haskell I/O without a lot more work ironing
> out the possible causes of failure.

Sadly, I can not do Posix IO on handles or read a ByteString
from a Posix FD.

> 2) you need the named pipe only to detect command process
>    exit, and you can still apply Haskell I/O to the more
>    interesting data that accumulates in the command output
>    disk file.

Writing data to disk for communicating with other processes
is not a good pattern, I think.

> And there may be an answer for my problems with Haskell I/O.
> Could be as simple as using openFileBlocking, which apparently
> isn't supported in the ghc I'm using.  Could have something to
> do with the fine points of named pipes - for example, I
> believe you're supposed to open them O_RDWR in situations
> you'd think would call for O_RDONLY.  (Though the latter
> worked for me with POSIX I/O.)

It is okay to open it O_RDONLY if blocking when there is no
writer is acceptable. For this application, it is.

> While I'm here ... I share the concern expressed in an earlier
> followup about the outputs from bash in runInteractiveProcess.
> This looks like a feature of runInteractiveProcess that makes
> it intrinsically something like a "code smell".  input-only
> and output-only processes are commonly used and fairly
> tractable, where input-output processes are unusual and
> fragile, so it's an unfortunate convenience.  I think the idea
> is that you'd use createProcess specifying only the input
> redirection.

I am averse to adding "just in case" code that may not do
anything; too much system level code attains an air of mystery
this way.

--
Jason Dusek
pgp // solidsnack // C1EBC57DC55144F35460C8DF1FD4C6C1FED18A2B

Re: Blocking IO & FIFOs

Gregory Collins-3
On Sun, Oct 21, 2012 at 1:25 PM, Jason Dusek <[hidden email]> wrote:
> Sadly, I can not do Posix IO on handles or read a ByteString
> from a Posix FD.

Try the "unix-bytestring" package.

G
--
Gregory Collins <[hidden email]>

Re: Blocking IO & FIFOs

Jason Dusek
Hi Everyone,

Thanks for all your help. I've put the first working version of
this on GitHub:

  https://github.com/solidsnack/coproc

Many improvements suggested in this thread have not been implemented
yet, but I hope to integrate them as part of expanding the
tool to cover other interpreters, like Python or PSQL.

--
Jason Dusek
pgp // solidsnack // C1EBC57DC55144F35460C8DF1FD4C6C1FED18A2B
