Timeouts inside a ConduitParser

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Timeouts inside a ConduitParser

Luke Lau
I have a ConduitParser (a Sink with some parsing state) with a version of satisfy that can time out:

satisfy :: MonadIO m => (a -> Bool) -> ConduitParser a m a
satisfy pred = do
tId <- liftIO myThreadId
timeoutThread <- liftIO $ forkIO $ do
threadDelay 1000000
throwTo tId TimeoutException
x <- await
liftIO $ killThread timeoutThread
if pred x
then return x
else empty

However I would rather not deal with the risks involved with handling concurrency myself and use a system library like System.Timeout:

satisfy :: MonadIO m => (a -> Bool) -> ConduitParser a m a
satisfy pred = do
x <- timeout 1000000 await
if pred x
then return x
else empty

This doesn’t work though since I need to be able to both lift and unlift await from IO, and ConduitParser lies on top of ConduitT, which is one of the types of monads that UnliftIO cannot be an instance of. Are there any better approaches to this?

_______________________________________________
Haskell-Cafe mailing list
To (un)subscribe, modify options or view archives go to:
http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe
Only members subscribed via the mailman list are allowed to post.
Reply | Threaded
Open this post in threaded view
|

Re: Timeouts inside a ConduitParser

Michael Snoyman
Due to the coroutine-based nature of conduit, this kind of approach isn't possible. In short, when you're inside the `ConduitT` transformer, you cannot use `timeout` on a `ConduitT` action because you need to yield control of execution to a different coroutine. Instead, you should move the `timeout` call to _outside_ the `ConduitT` parts, e.g.:

    timeout foo $ runConduit $ src .| satisfy .| sink

It seems like in this case, that kind of timeout usage is going to be too coarse-grained. I haven't used it personally, but the `stm-conduit` package probably has something in the direction you're looking for. Alternatively, I put together an example of how this might be done using some standard Haskell libraries like stm and async here:


The basic idea is to have two sibling threads: one running the original source and writing its values to a queue, and another running the full conduit pipeline with a modified source that will time out on reads from that queue.

On Thu, Jun 21, 2018 at 4:08 PM Luke Lau <[hidden email]> wrote:
I have a ConduitParser (a Sink with some parsing state) with a version of satisfy that can time out:

satisfy :: MonadIO m => (a -> Bool) -> ConduitParser a m a
satisfy pred = do
tId <- liftIO myThreadId
timeoutThread <- liftIO $ forkIO $ do
threadDelay 1000000
throwTo tId TimeoutException
x <- await
liftIO $ killThread timeoutThread
if pred x
then return x
else empty

However I would rather not deal with the risks involved with handling concurrency myself and use a system library like System.Timeout:

satisfy :: MonadIO m => (a -> Bool) -> ConduitParser a m a
satisfy pred = do
x <- timeout 1000000 await
if pred x
then return x
else empty

This doesn’t work though since I need to be able to both lift and unlift await from IO, and ConduitParser lies on top of ConduitT, which is one of the types of monads that UnliftIO cannot be an instance of. Are there any better approaches to this?
_______________________________________________
Haskell-Cafe mailing list
To (un)subscribe, modify options or view archives go to:
http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe
Only members subscribed via the mailman list are allowed to post.

_______________________________________________
Haskell-Cafe mailing list
To (un)subscribe, modify options or view archives go to:
http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe
Only members subscribed via the mailman list are allowed to post.