1 {-# LANGUAGE RankNTypes, TypeFamilies #-}
3 {-| This module provides @pipes@ utilities for \"byte streams\", which are
4 streams of strict 'Text' chunks. Use byte streams to interact
5 with both 'IO.Handle's and lazy 'Text's.
7 To stream to or from 'IO.Handle's, use 'fromHandle' or 'toHandle'. For
8 example, the following program copies data from one file to another:
11 > import qualified Pipes.Text as P
15 > withFile "inFile.txt" ReadMode $ \hIn ->
16 > withFile "outFile.txt" WriteMode $ \hOut ->
17 > runEffect $ P.fromHandle hIn >-> P.toHandle hOut
19 You can stream to and from 'stdin' and 'stdout' using the predefined 'stdin'
20 and 'stdout' proxies, like in the following \"echo\" program:
22 > main = runEffect $ P.stdin >-> P.stdout
24 You can also translate pure lazy 'TL.Text's to and from proxies:
26 > import qualified Data.Text.Lazy as TL
28 > main = runEffect $ P.fromLazy (TL.pack "Hello, world!\n") >-> P.stdout
30 In addition, this module provides many functions equivalent to lazy
31 'Text' functions so that you can transform or fold byte streams. For
32 example, to stream only the first three lines of 'stdin' to 'stdout' you
36 > import qualified Pipes.Text as PT
37 > import qualified Pipes.Parse as PP
39 > main = runEffect $ takeLines 3 PT.stdin >-> PT.stdout
41 > takeLines n = PT.unlines . PP.takeFree n . PT.lines
43 The above program will never bring more than one chunk (~ 32 KB) into
44 memory, no matter how long the lines are.
46 Note that functions in this library are designed to operate on streams that
47 are insensitive to chunk boundaries. This means that they may freely split
48 chunks into smaller chunks and /discard empty chunks/. However, they will
49 /never concatenate chunks/ in order to provide strict upper bounds on memory
53 module Data.Text.Pipes (
125 -- * Low-level Parsers
141 import Control.Exception (throwIO, try)
142 import Control.Monad (liftM, unless)
143 import Control.Monad.Trans.State.Strict (StateT)
144 import qualified Data.Text as T
145 import qualified Data.Text.IO as T
146 import Data.Text (Text)
147 import qualified Data.Text.Lazy as TL
148 import qualified Data.Text.Lazy.IO as TL
149 import Data.Text.Lazy.Internal (foldrChunks, defaultChunkSize)
150 import Data.ByteString.Unsafe (unsafeTake, unsafeDrop)
151 import Data.Char (ord)
152 import Data.Functor.Identity (Identity)
153 import qualified Data.List as List
154 import Foreign.C.Error (Errno(Errno), ePIPE)
155 import qualified GHC.IO.Exception as G
157 import qualified Pipes.ByteString.Parse as PBP
158 import Pipes.ByteString.Parse (
159 nextByte, drawByte, unDrawByte, peekByte, isEndOfBytes )
160 import Pipes.Core (respond, Server')
161 import qualified Pipes.Parse as PP
162 import Pipes.Parse (input, concat, FreeT)
163 import qualified Pipes.Safe.Prelude as Safe
164 import qualified Pipes.Safe as Safe
165 import Pipes.Safe (MonadSafe(..), Base(..))
166 import qualified Pipes.Prelude as P
167 import qualified System.IO as IO
168 import Data.Char (isSpace)
169 import Prelude hiding (
-- | Convert a lazy 'TL.Text' into a 'Producer' of strict 'Text's.
--
-- Each strict chunk of the lazy value is yielded downstream in order, after
-- which the producer returns @()@.
fromLazy :: (Monad m) => TL.Text -> Producer' Text m ()
-- The argument is bound explicitly instead of writing the definition
-- point-free: 'Producer'' hides a rank-N @forall@ in the result type, and
-- GHC >= 9.0 (simplified subsumption) rejects the point-free form.
fromLazy lazyText =
    foldrChunks (\chunk rest -> do { yield chunk; rest }) (return ()) lazyText
{-# INLINABLE fromLazy #-}
-- | Stream 'Text' chunks read from standard input.
--
-- This is 'fromHandle' applied to 'IO.stdin'.
stdin :: MonadIO m => Producer' Text m ()
stdin = fromHandle IO.stdin
{-# INLINABLE stdin #-}
208 -- | Convert a 'IO.Handle' into a byte stream using a default chunk size
209 fromHandle :: MonadIO m => IO.Handle -> Producer' Text m ()
210 fromHandle h = go where
211 go = do txt <- liftIO (T.hGetChunk h)
212 unless (T.null txt) $ do yield txt
214 {-# INLINABLE fromHandle#-}
-- | Stream the contents of a file as strict 'Text' chunks.
--
-- The file is opened in 'IO.ReadMode' through 'Safe.withFile', and the
-- resulting handle is streamed with 'fromHandle'.
readFile :: (MonadSafe m, Base m ~ IO) => FilePath -> Producer' Text m ()
-- The callback is eta-expanded on purpose: 'fromHandle' returns a rank-N
-- 'Producer'', and GHC >= 9.0 (simplified subsumption) no longer instantiates
-- the nested @forall@ when the function is passed unapplied.
readFile file = Safe.withFile file IO.ReadMode (\h -> fromHandle h)
{-# INLINABLE readFile #-}
220 stdinLn :: MonadIO m => Producer' Text m ()
224 eof <- liftIO (IO.hIsEOF IO.stdin)
226 txt <- liftIO (T.hGetLine IO.stdin)
230 {-| Convert a handle into a byte stream using a fixed chunk size
232 'hGet' waits until exactly the requested number of bytes are available for
235 -- hGet :: MonadIO m => Int -> IO.Handle -> Producer' Text m ()
236 -- hGet size h = go where
238 -- eof <- liftIO (IO.hIsEOF h)
242 -- bs <- liftIO (T.hGet h size)
245 -- {-# INLINABLE hGet #-}
247 {-| Like 'hGetSome', except you can vary the maximum chunk size for each request
249 -- hGetSomeN :: MonadIO m => IO.Handle -> Int -> Server' Int Text m ()
250 -- hGetSomeN h = go where
252 -- eof <- liftIO (IO.hIsEOF h)
256 -- bs <- liftIO (T.hGetSome h size)
257 -- size2 <- respond bs
259 -- {-# INLINABLE hGetSomeN #-}
261 -- -- | Like 'hGet', except you can vary the chunk size for each request
262 -- hGetN :: MonadIO m => IO.Handle -> Int -> Server' Int Text m ()
263 -- hGetN h = go where
265 -- eof <- liftIO (IO.hIsEOF h)
269 -- bs <- liftIO (T.hGet h size)
270 -- size2 <- respond bs
272 -- {-# INLINABLE hGetN #-}
274 {-| Stream bytes to 'stdout'
276 Unlike 'toHandle', 'stdout' gracefully terminates on a broken output pipe.
278 Note: For best performance, use @(for source (liftIO . putStr))@ instead of
279 @(source >-> stdout)@.
281 stdout :: MonadIO m => Consumer' Text m ()
286 x <- liftIO $ try (T.putStr txt)
288 Left (G.IOError { G.ioe_type = G.ResourceVanished
289 , G.ioe_errno = Just ioe })
292 Left e -> liftIO (throwIO e)
294 {-# INLINABLE stdout #-}
296 stdoutLn :: (MonadIO m) => Consumer' Text m ()
301 x <- liftIO $ try (T.putStrLn str)
303 Left (G.IOError { G.ioe_type = G.ResourceVanished
304 , G.ioe_errno = Just ioe })
307 Left e -> liftIO (throwIO e)
309 {-# INLINABLE stdoutLn #-}
-- | Write every 'Text' chunk received from upstream to the given
-- 'IO.Handle' using 'T.hPutStr'.
--
-- Note: For best performance, use @(for source (liftIO . hPutStr handle))@
-- instead of @(source >-> toHandle handle)@.
toHandle :: MonadIO m => IO.Handle -> Consumer' Text m r
toHandle h = for cat (\chunk -> liftIO (T.hPutStr h chunk))
{-# INLINABLE toHandle #-}
-- | Write the incoming stream of 'Text' chunks to a file.
--
-- The file is opened in 'IO.WriteMode' through 'Safe.withFile', and every
-- chunk received from upstream is written to the resulting handle with
-- 'toHandle'.
writeFile :: (MonadSafe m, Base m ~ IO) => FilePath -> Consumer' Text m ()
-- The callback is eta-expanded on purpose: 'toHandle' returns a rank-N
-- 'Consumer'', and GHC >= 9.0 (simplified subsumption) no longer instantiates
-- the nested @forall@ when the function is passed unapplied.
writeFile file = Safe.withFile file IO.WriteMode (\h -> toHandle h)
{-# INLINABLE writeFile #-}
-- | Transform the stream by applying the given function to every 'Char' of
-- every chunk.
map :: (Monad m) => (Char -> Char) -> Pipe Text Text m r
map f = P.map applyToChunk
  where
    applyToChunk chunk = T.map f chunk
{-# INLINABLE map #-}
328 -- | Map a function over the byte stream and concatenate the results
330 :: (Monad m) => (Char -> Text) -> Pipe Text Text m r
331 concatMap f = P.map (T.concatMap f)
332 {-# INLINABLE concatMap #-}
334 -- | @(take n)@ only allows @n@ bytes to pass
335 take :: (Monad m, Integral a) => a -> Pipe Text Text m ()
336 take n0 = go n0 where
341 let len = fromIntegral (T.length bs)
343 then yield (T.take (fromIntegral n) bs)
347 {-# INLINABLE take #-}
349 -- | @(drop n)@ drops the first @n@ bytes
350 drop :: (Monad m, Integral a) => a -> Pipe Text Text m r
351 drop n0 = go n0 where
356 let len = fromIntegral (T.length bs)
359 yield (T.drop (fromIntegral n) bs)
362 {-# INLINABLE drop #-}
364 -- | Take bytes until they fail the predicate
365 takeWhile :: (Monad m) => (Char -> Bool) -> Pipe Text Text m ()
366 takeWhile predicate = go
370 let (prefix, suffix) = T.span predicate bs
376 {-# INLINABLE takeWhile #-}
378 -- | Drop bytes until they fail the predicate
379 dropWhile :: (Monad m) => (Char -> Bool) -> Pipe Text Text m r
380 dropWhile predicate = go where
383 case T.findIndex (not . predicate) bs of
388 {-# INLINABLE dropWhile #-}
-- | Keep only the 'Char's that satisfy the predicate.
--
-- The predicate is applied chunk by chunk with 'T.filter'; each incoming
-- chunk produces exactly one outgoing chunk.
filter :: (Monad m) => (Char -> Bool) -> Pipe Text Text m r
filter predicate = P.map keepMatching
  where
    keepMatching chunk = T.filter predicate chunk
{-# INLINABLE filter #-}
395 -- | Stream all indices whose elements match the given 'Char'
396 -- elemIndices :: (Monad m, Num n) => Char -> Pipe Text n m r
397 -- elemIndices w8 = findIndices (w8 ==)
398 -- {-# INLINABLE elemIndices #-}
400 -- | Stream all indices whose elements satisfy the given predicate
401 -- findIndices :: (Monad m, Num n) => (Char -> Bool) -> Pipe Text n m r
402 -- findIndices predicate = go 0
406 -- each $ List.map (\i -> n + fromIntegral i) (T.findIndices predicate bs)
407 -- go $! n + fromIntegral (T.length bs)
408 -- {-# INLINABLE findIndices #-}
410 -- | Strict left scan over the bytes
413 => (Char -> Char -> Char) -> Char -> Pipe Text Text m r
414 scan step begin = go begin
418 let bs' = T.scanl step w8 bs
422 {-# INLINABLE scan #-}
-- | Fold a pure 'Producer' of strict 'Text's into a lazy 'TL.Text'.
--
-- The chunks produced by the stream become the chunks of the lazy result.
toLazy :: Producer Text Identity () -> TL.Text
toLazy producer = TL.fromChunks (P.toList producer)
{-# INLINABLE toLazy #-}
-- | Fold an effectful 'Producer' of strict 'Text's into a lazy 'TL.Text'.
--
-- Note: 'toLazyM' is not an idiomatic use of @pipes@, but I provide it for
-- simple testing purposes. Idiomatic @pipes@ style consumes the chunks
-- immediately as they are generated instead of loading them all into memory.
toLazyM :: (Monad m) => Producer Text m () -> m TL.Text
toLazyM producer = do
    -- Collect every chunk first, then assemble the lazy value from them.
    chunks <- P.toListM producer
    return (TL.fromChunks chunks)
{-# INLINABLE toLazyM #-}
442 -- | Reduce the stream of bytes using a strict left fold
445 => (x -> Char -> x) -> x -> (x -> r) -> Producer Text m () -> m r
446 fold step begin done = P.fold (\x bs -> T.foldl' step x bs) begin done
447 {-# INLINABLE fold #-}
449 -- | Retrieve the first 'Char'
450 head :: (Monad m) => Producer Text m () -> m (Maybe Char)
456 Left _ -> return Nothing
457 Right (w8, _) -> return (Just w8)
458 {-# INLINABLE head #-}
460 -- | Retrieve the last 'Char'
461 last :: (Monad m) => Producer Text m () -> m (Maybe Char)
471 else go (Just $ T.last bs) p'
472 -- TODO: Change this to 'unsafeLast' when bytestring-0.10.2.0
473 -- becomes more widespread
474 {-# INLINABLE last #-}
476 -- | Determine if the stream is empty
477 null :: (Monad m) => Producer Text m () -> m Bool
479 {-# INLINABLE null #-}
-- | Count the 'Char's in the stream, i.e. the sum of 'T.length' over every
-- chunk.
length :: (Monad m, Num n) => Producer Text m () -> m n
length producer = P.fold addChunk 0 id producer
  where
    -- Add the size of one chunk to the running total.
    addChunk total chunk = total + fromIntegral (T.length chunk)
{-# INLINABLE length #-}
-- | Fold that returns whether any 'Char' received from the stream satisfies
-- the predicate.
any :: (Monad m) => (Char -> Bool) -> Producer Text m () -> m Bool
any predicate producer = P.any chunkHasMatch producer
  where
    chunkHasMatch chunk = T.any predicate chunk
{-# INLINABLE any #-}
-- | Fold that returns whether every 'Char' received from the stream
-- satisfies the predicate.
all :: (Monad m) => (Char -> Bool) -> Producer Text m () -> m Bool
all predicate producer = P.all chunkAllMatch producer
  where
    chunkAllMatch chunk = T.all predicate chunk
{-# INLINABLE all #-}
496 -- | Return the maximum 'Char' within a byte stream
497 maximum :: (Monad m) => Producer Text m () -> m (Maybe Char)
498 maximum = P.fold step Nothing id
503 else Just $ case mw8 of
504 Nothing -> T.maximum bs
505 Just w8 -> max w8 (T.maximum bs)
506 {-# INLINABLE maximum #-}
508 -- | Return the minimum 'Char' within a byte stream
509 minimum :: (Monad m) => Producer Text m () -> m (Maybe Char)
510 minimum = P.fold step Nothing id
516 Nothing -> Just (T.minimum bs)
517 Just w8 -> Just (min w8 (T.minimum bs))
518 {-# INLINABLE minimum #-}
520 -- | Determine whether any element in the byte stream matches the given 'Char'
521 -- elem :: (Monad m) => Char -> Producer Text m () -> m Bool
522 -- elem w8 = P.any (T.elem w8)
523 -- {-# INLINABLE elem #-}
525 -- {-| Determine whether all elements in the byte stream do not match the given
528 -- notElem :: (Monad m) => Char -> Producer Text m () -> m Bool
529 -- notElem w8 = P.all (T.notElem w8)
530 -- {-# INLINABLE notElem #-}
532 -- | Find the first element in the stream that matches the predicate
535 => (Char -> Bool) -> Producer Text m () -> m (Maybe Char)
536 find predicate p = head (p >-> filter predicate)
537 {-# INLINABLE find #-}
539 -- | Index into a byte stream
541 :: (Monad m, Integral a)
542 => a-> Producer Text m () -> m (Maybe Char)
543 index n p = head (p >-> drop n)
544 {-# INLINABLE index #-}
546 -- | Find the index of an element that matches the given 'Char'
548 -- :: (Monad m, Num n) => Char -> Producer Text m () -> m (Maybe n)
549 -- elemIndex w8 = findIndex (w8 ==)
550 -- {-# INLINABLE elemIndex #-}
552 -- | Store the first index of an element that satisfies the predicate
554 -- :: (Monad m, Num n)
555 -- => (Char -> Bool) -> Producer Text m () -> m (Maybe n)
556 -- findIndex predicate p = P.head (p >-> findIndices predicate)
557 -- {-# INLINABLE findIndex #-}
559 -- -- | Store a tally of how many elements match the given 'Char'
560 -- count :: (Monad m, Num n) => Char -> Producer Text m () -> m n
561 -- count w8 p = P.fold (+) 0 id (p >-> P.map (fromIntegral . T.count w8))
562 -- {-# INLINABLE count #-}
564 -- | Splits a 'Producer' after the given number of bytes
566 :: (Monad m, Integral n)
569 -> Producer' Text m (Producer Text m r)
576 Left r -> return (return r)
578 let len = fromIntegral (T.length bs)
584 let (prefix, suffix) = T.splitAt (fromIntegral n) bs
586 return (yield suffix >> p')
587 {-# INLINABLE splitAt #-}
589 -- | Split a byte stream into 'FreeT'-delimited byte streams of fixed size
591 :: (Monad m, Integral n)
592 => n -> Producer Text m r -> FreeT (Producer Text m) m r
593 chunksOf n p0 = PP.FreeT (go p0)
599 Right (bs, p') -> PP.Free $ do
600 p'' <- splitAt n (yield bs >> p')
601 return $ PP.FreeT (go p'')
602 {-# INLINABLE chunksOf #-}
604 {-| Split a byte stream in two, where the first byte stream is the longest
605 consecutive group of bytes that satisfy the predicate
611 -> Producer' Text m (Producer Text m r)
617 Left r -> return (return r)
619 let (prefix, suffix) = T.span predicate bs
626 return (yield suffix >> p')
627 {-# INLINABLE span #-}
629 {-| Split a byte stream in two, where the first byte stream is the longest
630 consecutive group of bytes that don't satisfy the predicate
636 -> Producer Text m (Producer Text m r)
637 break predicate = span (not . predicate)
638 {-# INLINABLE break #-}
640 {-| Split a byte stream into sub-streams delimited by bytes that satisfy the
647 -> PP.FreeT (Producer Text m) m r
648 splitWith predicate p0 = PP.FreeT (go0 p0)
653 Left r -> return (PP.Pure r)
657 else return $ PP.Free $ do
658 p'' <- span (not . predicate) (yield bs >> p')
659 return $ PP.FreeT (go1 p'')
664 Right (_, p') -> PP.Free $ do
665 p'' <- span (not . predicate) p'
666 return $ PP.FreeT (go1 p'')
667 {-# INLINABLE splitWith #-}
669 -- | Split a byte stream using the given 'Char' as the delimiter
673 -> FreeT (Producer Text m) m r
674 split w8 = splitWith (w8 ==)
675 {-# INLINABLE split #-}
677 {-| Group a byte stream into 'FreeT'-delimited byte streams using the supplied
682 => (Char -> Char -> Bool)
684 -> FreeT (Producer Text m) m r
685 groupBy equal p0 = PP.FreeT (go p0)
690 Left r -> return (PP.Pure r)
691 Right (bs, p') -> case (T.uncons bs) of
694 return $ PP.Free $ do
695 p'' <- span (equal w8) (yield bs >> p')
696 return $ PP.FreeT (go p'')
697 {-# INLINABLE groupBy #-}
699 -- | Group a byte stream into 'FreeT'-delimited byte streams of identical bytes
701 :: (Monad m) => Producer Text m r -> FreeT (Producer Text m) m r
703 {-# INLINABLE group #-}
705 {-| Split a byte stream into 'FreeT'-delimited lines
707 Note: This function is purely for demonstration purposes since it assumes a
708 particular encoding. You should prefer the 'Data.Text.Text' equivalent of
709 this function from the upcoming @pipes-text@ library.
712 :: (Monad m) => Producer Text m r -> FreeT (Producer Text m) m r
713 lines p0 = PP.FreeT (go0 p0)
718 Left r -> return (PP.Pure r)
722 else return $ PP.Free $ go1 (yield bs >> p')
724 p' <- break ('\n' ==) p
725 return $ PP.FreeT (go2 p')
730 Right (_, p') -> PP.Free (go1 p')
731 {-# INLINABLE lines #-}
733 {-| Split a byte stream into 'FreeT'-delimited words
735 Note: This function is purely for demonstration purposes since it assumes a
736 particular encoding. You should prefer the 'Data.Text.Text' equivalent of
737 this function from the upcoming @pipes-text@ library.
740 :: (Monad m) => Producer Text m r -> FreeT (Producer Text m) m r
741 words p0 = removeEmpty (splitWith isSpace p0)
743 removeEmpty f = PP.FreeT $ do
746 PP.Pure r -> return (PP.Pure r)
750 Left f' -> PP.runFreeT (removeEmpty f')
751 Right (bs, p') -> return $ PP.Free $ do
754 return (removeEmpty f')
755 {-# INLINABLE words #-}
757 -- | Intersperse a 'Char' in between the bytes of the byte stream
759 :: (Monad m) => Char -> Producer Text m r -> Producer Text m r
767 yield (T.intersperse w8 bs)
774 yield (T.singleton w8)
775 yield (T.intersperse w8 bs)
777 {-# INLINABLE intersperse #-}
779 {-| 'intercalate' concatenates the 'FreeT'-delimited byte streams after
780 interspersing a byte stream in between them
784 => Producer Text m ()
785 -> FreeT (Producer Text m) m r
790 x <- lift (PP.runFreeT f)
792 PP.Pure r -> return r
797 x <- lift (PP.runFreeT f)
799 PP.Pure r -> return r
804 {-# INLINABLE intercalate #-}
806 {-| Join 'FreeT'-delimited lines into a byte stream
808 Note: This function is purely for demonstration purposes since it assumes a
809 particular encoding. You should prefer the 'Data.Text.Text' equivalent of
810 this function from the upcoming @pipes-text@ library.
813 :: (Monad m) => FreeT (Producer Text m) m r -> Producer Text m r
817 x <- lift (PP.runFreeT f)
819 PP.Pure r -> return r
822 yield $ T.singleton '\n'
824 {-# INLINABLE unlines #-}
826 {-| Join 'FreeT'-delimited words into a byte stream
828 Note: This function is purely for demonstration purposes since it assumes a
829 particular encoding. You should prefer the 'Data.Text.Text' equivalent of
830 this function from the upcoming @pipes-text@ library.
833 :: (Monad m) => FreeT (Producer Text m) m r -> Producer Text m r
834 unwords = intercalate (yield $ T.pack " ")
835 {-# INLINABLE unwords #-}
838 The following parsing utilities are single-byte analogs of the ones found
842 {-| Take bytes until they fail the predicate
844 Unlike 'takeWhile', this 'PP.unDraw's unused bytes
849 -- -> Pipe Text Text (StateT (Producer Text m r) m) ()
850 -- takeWhile' = PBP.takeWhile
851 -- {-# INLINABLE takeWhile' #-}
852 -- {-# DEPRECATED takeWhile' "Use Pipes.Text.Parse.takeWhile instead" #-}
855 "Pipes.Text.Parse" re-exports 'nextByte', 'drawByte', 'unDrawByte',
856 'peekByte', and 'isEndOfBytes'.
858 @Data.Text@ re-exports the 'Text' type.
860 @Data.Word@ re-exports the 'Char' type.
862 @Pipes.Parse@ re-exports 'input', 'concat', and 'FreeT' (the type).