Adapting zstandard streaming compression in Streaming

Sal
Hello,

I am trying to adapt the streaming version of `zstandard` to the `Streaming` library. A version already exists that uses `ByteString m r`. I can't figure out how to implement a function like the one below, and would appreciate help:

stream :: MonadIO m => S.Stream (S.Of B.ByteString) m r -> Result -> S.Stream (S.Of B.ByteString) m r

Here is the original code from `streaming-zstd`:

stream :: MonadIO m => ByteString m r -> Result -> ByteString m r
stream (Go m) res = lift m >>= flip stream res
stream bs (Error who what) = error (who ++ ": " ++ what)
stream bs (Produce bytes res') = Chunk bytes (liftIO res' >>= stream bs)
stream (Chunk c cs) (Consume f) = liftIO (f c) >>= stream cs
stream (Empty r) (Consume f) = liftIO (f mempty) >>= stream (Empty r)
stream (Empty r) (Done o) = Chunk o (Empty r)
stream input state = error $ "unpossible! bytes of input left in stream state " ++ show state

Re: Adapting zstandard streaming compression in Streaming

Sal
I figured out how to do this. I am adding the code here for others who might try to solve this problem in the future. Suggestions for improvement are welcome.

import Streaming
import qualified Streaming.Prelude as S
import qualified Codec.Compression.Zstd.Streaming as Z
import qualified Data.ByteString as BS (ByteString, empty)

-- Compression streamer - uses Zstd compression.
-- The first argument is the initial codec state (Z.compress level or Z.decompress).
streamZstd :: MonadIO m => IO Z.Result -> Stream (Of BS.ByteString) m () -> Stream (Of BS.ByteString) m ()
streamZstd pop inp = loop inp pop
  where
    loop bytes res = do
      r <- liftIO res
      case r of
        Z.Error who what -> error (who ++ ": " ++ what)
        -- Done carries the final output; the input stream must be exhausted by now.
        Z.Done out -> lift (S.uncons bytes) >>= maybe (S.yield out) (\_ -> error "Compress/Decompress ended while input stream still had bytes")
        Z.Produce out npop -> S.yield out >> loop bytes npop
        -- If we run out of input stream, call loop with an empty stream, and call the
        -- compress function with an empty ByteString to signal the end. The next call
        -- to loop should then reach the Done state.
        Z.Consume f -> lift (S.uncons bytes) >>= maybe (loop (return ()) (f BS.empty)) (\(chunk, rest) -> loop rest (f chunk))

decompress :: MonadIO m => Stream (Of BS.ByteString) m () -> Stream (Of BS.ByteString) m ()
decompress = streamZstd Z.decompress

compress :: MonadIO m => Int -> Stream (Of BS.ByteString) m () -> Stream (Of BS.ByteString) m ()
compress level = streamZstd (Z.compress level)
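
For anyone who wants to see the control flow of the loop above without installing `streaming` or `zstd`, here is a rough model that needs only `base`. Everything in it is an assumption made for illustration: the `Result` type is a hypothetical stand-in with the same four states (using `String` instead of `ByteString`), `toyCodec` is a made-up codec that just upper-cases its input, and `drive` replaces the input stream with a plain list. It is a sketch of the state machine, not the real library API.

```haskell
import Data.Char (toUpper)

-- Hypothetical stand-in for the codec's Result type: same four states,
-- with String in place of ByteString so that only `base` is needed.
data Result
  = Produce String (IO Result)      -- output is ready; run the action to continue
  | Consume (String -> IO Result)   -- codec wants input; "" signals end of input
  | Error String String             -- failure: who and what
  | Done String                     -- final output; nothing more will follow

-- Toy codec (an assumption for illustration): upper-cases each chunk and
-- finishes with an "<end>" trailer once it is fed the empty chunk.
toyCodec :: IO Result
toyCodec = return (Consume step)
  where
    step ""    = return (Done "<end>")
    step chunk = return (Produce (map toUpper chunk) toyCodec)

-- Same case analysis as the loop in the post, with a list as the input.
-- Unlike a Stream, the outputs are collected into a list at the end.
drive :: IO Result -> [String] -> IO [String]
drive pop input = do
  r <- pop
  case r of
    Error who what -> ioError (userError (who ++ ": " ++ what))
    Done out
      | null input -> return [out]
      | otherwise  -> ioError (userError "codec finished with input left over")
    Produce out next -> (out :) <$> drive next input
    Consume f -> case input of
      []             -> drive (f "") []       -- out of input: signal the end
      (chunk : rest) -> drive (f chunk) rest

main :: IO ()
main = drive toyCodec ["ab", "cd"] >>= print
-- prints ["AB","CD","<end>"]
```

The point of the model is the `Consume` case: once the input is exhausted, the codec is fed an empty chunk exactly once, and the `Done` case then emits whatever output is still pending.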



On Saturday, March 10, 2018 at 5:56:10 PM UTC-5, Sal wrote:
Hello,

I am trying to adapt the streaming version of `zstandard` (https://hackage.haskell.org/package/zstd-0.1.0.0/docs/Codec-Compression-Zstd-Streaming.html) to the `Streaming` library. A version already exists (https://github.com/michaelt/streaming-zstd/blob/master/Streaming/Zstd.hs) that uses `ByteString m r`. I can't figure out how to implement a function like the one below, and would appreciate help:

stream :: MonadIO m => S.Stream (S.Of B.ByteString) m r -> Result -> S.Stream (S.Of B.ByteString) m r

Here is the original code from `streaming-zstd`:

stream :: MonadIO m => ByteString m r -> Result -> ByteString m r
stream (Go m) res = lift m >>= flip stream res
stream bs (Error who what) = error (who ++ ": " ++ what)
stream bs (Produce bytes res') = Chunk bytes (liftIO res' >>= stream bs)
stream (Chunk c cs) (Consume f) = liftIO (f c) >>= stream cs
stream (Empty r) (Consume f) = liftIO (f mempty) >>= stream (Empty r)
stream (Empty r) (Done o) = Chunk o (Empty r)
stream input state = error $ "unpossible! bytes of input left in stream state " ++ show state