1 {-# LANGUAGE RankNTypes, TypeFamilies #-}
3 {-| This module provides @pipes@ utilities for \"text streams\", which are
4 streams of strict 'Text' chunks. Use text streams to interact
5 with both 'IO.Handle's and lazy 'Text's.
7 To stream to or from 'IO.Handle's, use 'fromHandle' or 'toHandle'. For
8 example, the following program copies data from one file to another:
11 > import qualified Pipes.Text as P
15 > withFile "inFile.txt" ReadMode $ \hIn ->
16 > withFile "outFile.txt" WriteMode $ \hOut ->
17 > runEffect $ P.fromHandle hIn >-> P.toHandle hOut
19 You can stream to and from 'stdin' and 'stdout' using the predefined 'stdin'
20 and 'stdout' proxies, like in the following \"echo\" program:
22 > main = runEffect $ P.stdin >-> P.stdout
24 You can also translate pure lazy 'TL.Text's to and from proxies:
26 > import qualified Data.Text.Lazy as TL
28 > main = runEffect $ P.fromLazy (TL.pack "Hello, world!\n") >-> P.stdout
30 In addition, this module provides many functions equivalent to lazy
31 'Text' functions so that you can transform or fold byte streams. For
32 example, to stream only the first three lines of 'stdin' to 'stdout' you
36 > import qualified Pipes.Text as PT
37 > import qualified Pipes.Parse as PP
39 > main = runEffect $ takeLines 3 PT.stdin >-> PT.stdout
41 > takeLines n = PT.unlines . PP.takeFree n . PT.lines
43 The above program will never bring more than one chunk (~ 32 KB) into
44 memory, no matter how long the lines are.
46 Note that functions in this library are designed to operate on streams that
47 are insensitive to chunk boundaries. This means that they may freely split
48 chunks into smaller chunks and /discard empty chunks/. However, they will
49 /never concatenate chunks/ in order to provide strict upper bounds on memory
53 module Data.Text.Pipes (
125 -- * Low-level Parsers
141 import Control.Exception (throwIO, try)
142 import Control.Monad (liftM, unless)
143 import Control.Monad.Trans.State.Strict (StateT)
144 import qualified Data.Text as T
145 import qualified Data.Text.IO as T
146 import Data.Text (Text)
147 import qualified Data.Text.Lazy as TL
148 import qualified Data.Text.Lazy.IO as TL
149 import Data.Text.Lazy.Internal (foldrChunks, defaultChunkSize)
150 import Data.ByteString.Unsafe (unsafeTake, unsafeDrop)
151 import Data.Char (ord)
152 import Data.Functor.Identity (Identity)
153 import qualified Data.List as List
154 import Foreign.C.Error (Errno(Errno), ePIPE)
155 import qualified GHC.IO.Exception as G
157 import qualified Pipes.ByteString.Parse as PBP
158 import Pipes.ByteString.Parse (
159 nextByte, drawByte, unDrawByte, peekByte, isEndOfBytes )
160 import Pipes.Core (respond, Server')
161 import qualified Pipes.Parse as PP
162 import Pipes.Parse (input, concat, FreeT)
163 import qualified Pipes.Safe.Prelude as Safe
164 import qualified Pipes.Safe as Safe
165 import Pipes.Safe (MonadSafe(..), Base(..))
166 import qualified Pipes.Prelude as P
167 import qualified System.IO as IO
168 import Data.Char (isSpace)
169 import Prelude hiding (
-- | Convert a lazy 'TL.Text' into a 'Producer' of strict 'Text's.
--
-- Each strict chunk of the lazy text is yielded in order, after which the
-- producer returns @()@.
--
-- The definition is deliberately eta-expanded: 'Producer'' is a rank-N
-- synonym, and a point-free right-hand side cannot be checked against a
-- @forall@-quantified result type under GHC's simplified subsumption
-- (GHC 9.0 and later). The expanded form is accepted by old and new
-- compilers alike.
fromLazy :: (Monad m) => TL.Text -> Producer' Text m ()
fromLazy lazyText =
    foldrChunks (\chunk rest -> yield chunk >> rest) (return ()) lazyText
{-# INLINABLE fromLazy #-}
-- | Stream 'Text' chunks read from standard input.
--
-- This is 'fromHandle' applied to 'IO.stdin'.
stdin :: MonadIO m => Producer' Text m ()
stdin = fromHandle inputHandle
  where
    inputHandle = IO.stdin
{-# INLINABLE stdin #-}
208 -- | Convert a 'IO.Handle' into a text stream using a chunk size
209 -- determined by the good sense of the text library.
210 fromHandle :: MonadIO m => IO.Handle -> Producer' Text m ()
211 fromHandle h = go where
212 go = do txt <- liftIO (T.hGetChunk h)
213 unless (T.null txt) $ do yield txt
215 {-# INLINABLE fromHandle#-}
217 {-| Stream text from a file using Pipes.Safe
219 >>> runSafeT $ runEffect $ readFile "README.md" >-> map toUpper >-> hoist lift stdout
222 TEXT PIPES, SOMEHOW TO BE FUSED WITH `PIPES-TEXT`.
-- Open @file@ in 'IO.ReadMode' through 'Safe.withFile' and stream its
-- contents with 'fromHandle'.
--
-- The callback is written as an explicit lambda rather than passing
-- 'fromHandle' bare: 'fromHandle' returns the rank-N 'Producer'' synonym,
-- and GHC 9.0+ (simplified subsumption) no longer instantiates a @forall@
-- sitting to the right of an arrow when a function is used as an argument.
-- The lambda form type-checks on both old and new compilers.
readFile :: (MonadSafe m, Base m ~ IO) => FilePath -> Producer' Text m ()
readFile file = Safe.withFile file IO.ReadMode (\hIn -> fromHandle hIn)
{-# INLINABLE readFile #-}
231 stdinLn :: MonadIO m => Producer' Text m ()
235 eof <- liftIO (IO.hIsEOF IO.stdin)
237 txt <- liftIO (T.hGetLine IO.stdin)
241 {-| Convert a handle into a byte stream using a fixed chunk size
243 'hGet' waits until exactly the requested number of bytes are available for
246 -- hGet :: MonadIO m => Int -> IO.Handle -> Producer' Text m ()
247 -- hGet size h = go where
249 -- eof <- liftIO (IO.hIsEOF h)
253 -- bs <- liftIO (T.hGet h size)
256 -- {-# INLINABLE hGet #-}
258 {-| Like 'hGetSome', except you can vary the maximum chunk size for each request
260 -- hGetSomeN :: MonadIO m => IO.Handle -> Int -> Server' Int Text m ()
261 -- hGetSomeN h = go where
263 -- eof <- liftIO (IO.hIsEOF h)
267 -- bs <- liftIO (T.hGetSome h size)
268 -- size2 <- respond bs
270 -- {-# INLINABLE hGetSomeN #-}
272 -- -- | Like 'hGet', except you can vary the chunk size for each request
273 -- hGetN :: MonadIO m => IO.Handle -> Int -> Server' Int Text m ()
274 -- hGetN h = go where
276 -- eof <- liftIO (IO.hIsEOF h)
280 -- bs <- liftIO (T.hGet h size)
281 -- size2 <- respond bs
283 -- {-# INLINABLE hGetN #-}
285 {-| Stream bytes to 'stdout'
287 Unlike 'toHandle', 'stdout' gracefully terminates on a broken output pipe.
289 Note: For best performance, use @(for source (liftIO . putStr))@ instead of
290 @(source >-> stdout)@.
292 stdout :: MonadIO m => Consumer' Text m ()
297 x <- liftIO $ try (T.putStr txt)
299 Left (G.IOError { G.ioe_type = G.ResourceVanished
300 , G.ioe_errno = Just ioe })
303 Left e -> liftIO (throwIO e)
305 {-# INLINABLE stdout #-}
307 stdoutLn :: (MonadIO m) => Consumer' Text m ()
312 x <- liftIO $ try (T.putStrLn str)
314 Left (G.IOError { G.ioe_type = G.ResourceVanished
315 , G.ioe_errno = Just ioe })
318 Left e -> liftIO (throwIO e)
320 {-# INLINABLE stdoutLn #-}
322 {-| Convert a byte stream into a 'Handle'
324 Note: For best performance, use @(for source (liftIO . hPutStr handle))@
325 instead of @(source >-> toHandle handle)@.
-- Every 'Text' chunk received from upstream is written to the handle with
-- 'T.hPutStr'. The consumer is a @for cat@ loop, so it keeps awaiting
-- input rather than returning by itself (hence the free result type @r@).
toHandle :: MonadIO m => IO.Handle -> Consumer' Text m r
toHandle h = for cat $ \chunk -> liftIO (T.hPutStr h chunk)
{-# INLINABLE toHandle #-}
-- | Stream text into a file using Pipes.Safe
--
-- The file is opened in 'IO.WriteMode' by 'Safe.withFile', which manages
-- the handle, and every incoming 'Text' chunk is written with 'toHandle'.
--
-- The callback is an explicit lambda because 'toHandle' returns the rank-N
-- 'Consumer'' synonym; passing it bare is rejected by GHC 9.0+ (simplified
-- subsumption), while the lambda form is accepted by every GHC version.
writeFile :: (MonadSafe m, Base m ~ IO) => FilePath -> Consumer' Text m ()
writeFile file = Safe.withFile file IO.WriteMode (\hOut -> toHandle hOut)
{-# INLINABLE writeFile #-}
-- | Transform every 'Char' in the stream with the supplied function.
--
-- The function is applied chunk by chunk: each incoming 'Text' is rewritten
-- with 'T.map' and forwarded downstream.
map :: (Monad m) => (Char -> Char) -> Pipe Text Text m r
map f = P.map mapChunk
  where
    mapChunk = T.map f
{-# INLINABLE map #-}
339 -- | Map a function over the byte stream and concatenate the results
341 :: (Monad m) => (Char -> Text) -> Pipe Text Text m r
342 concatMap f = P.map (T.concatMap f)
343 {-# INLINABLE concatMap #-}
345 -- | @(take n)@ only allows the first @n@ 'Char's to pass
346 take :: (Monad m, Integral a) => a -> Pipe Text Text m ()
347 take n0 = go n0 where
352 let len = fromIntegral (T.length bs)
354 then yield (T.take (fromIntegral n) bs)
358 {-# INLINABLE take #-}
360 -- | @(drop n)@ drops the first @n@ 'Char's
361 drop :: (Monad m, Integral a) => a -> Pipe Text Text m r
362 drop n0 = go n0 where
367 let len = fromIntegral (T.length bs)
370 yield (T.drop (fromIntegral n) bs)
373 {-# INLINABLE drop #-}
375 -- | Take bytes until they fail the predicate
376 takeWhile :: (Monad m) => (Char -> Bool) -> Pipe Text Text m ()
377 takeWhile predicate = go
381 let (prefix, suffix) = T.span predicate bs
387 {-# INLINABLE takeWhile #-}
389 -- | Drop bytes until they fail the predicate
390 dropWhile :: (Monad m) => (Char -> Bool) -> Pipe Text Text m r
391 dropWhile predicate = go where
394 case T.findIndex (not . predicate) bs of
399 {-# INLINABLE dropWhile #-}
-- | Let through only those 'Char's that satisfy the predicate.
--
-- Filtering is done per chunk with 'T.filter'; because this is a plain
-- 'P.map', one chunk is forwarded for every chunk received, even when all
-- of its characters were rejected.
filter :: (Monad m) => (Char -> Bool) -> Pipe Text Text m r
filter predicate = P.map keepMatching
  where
    keepMatching chunk = T.filter predicate chunk
{-# INLINABLE filter #-}
406 -- | Stream all indices whose elements match the given 'Char'
407 -- elemIndices :: (Monad m, Num n) => Char -> Pipe Text n m r
408 -- elemIndices w8 = findIndices (w8 ==)
409 -- {-# INLINABLE elemIndices #-}
411 -- | Stream all indices whose elements satisfy the given predicate
412 -- findIndices :: (Monad m, Num n) => (Char -> Bool) -> Pipe Text n m r
413 -- findIndices predicate = go 0
417 -- each $ List.map (\i -> n + fromIntegral i) (T.findIndices predicate bs)
418 -- go $! n + fromIntegral (T.length bs)
419 -- {-# INLINABLE findIndices #-}
421 -- | Strict left scan over the bytes
424 => (Char -> Char -> Char) -> Char -> Pipe Text Text m r
425 scan step begin = go begin
429 let bs' = T.scanl step w8 bs
433 {-# INLINABLE scan #-}
435 {-| Fold a pure 'Producer' of strict 'Text's into a lazy
-- Collect every chunk of the pure producer with 'P.toList' and assemble the
-- resulting list into a lazy text with 'TL.fromChunks'.
toLazy :: Producer Text Identity () -> TL.Text
toLazy producer = TL.fromChunks (P.toList producer)
{-# INLINABLE toLazy #-}
442 {-| Fold an effectful 'Producer' of strict 'Text's into a lazy
445 Note: 'toLazyM' is not an idiomatic use of @pipes@, but I provide it for
446 simple testing purposes. Idiomatic @pipes@ style consumes the chunks
447 immediately as they are generated instead of loading them all into memory.
-- Run the producer to completion, gathering all of its chunks with
-- 'P.toListM', then assemble them into a lazy text with 'TL.fromChunks'.
toLazyM :: (Monad m) => Producer Text m () -> m TL.Text
toLazyM producer = do
    chunks <- P.toListM producer
    return (TL.fromChunks chunks)
{-# INLINABLE toLazyM #-}
453 -- | Reduce the stream of bytes using a strict left fold
456 => (x -> Char -> x) -> x -> (x -> r) -> Producer Text m () -> m r
457 fold step begin done = P.fold (\x bs -> T.foldl' step x bs) begin done
458 {-# INLINABLE fold #-}
460 -- | Retrieve the first 'Char'
461 head :: (Monad m) => Producer Text m () -> m (Maybe Char)
467 Left _ -> return Nothing
468 Right (w8, _) -> return (Just w8)
469 {-# INLINABLE head #-}
471 -- | Retrieve the last 'Char'
472 last :: (Monad m) => Producer Text m () -> m (Maybe Char)
482 else go (Just $ T.last bs) p'
483 -- TODO: Change this to 'unsafeLast' when bytestring-0.10.2.0
484 -- becomes more widespread
485 {-# INLINABLE last #-}
487 -- | Determine if the stream is empty
488 null :: (Monad m) => Producer Text m () -> m Bool
490 {-# INLINABLE null #-}
-- | Count the number of 'Char's in the stream.
--
-- The total is accumulated with 'P.fold', adding the 'T.length' of each
-- chunk (converted with 'fromIntegral') to a running sum that starts at 0.
length :: (Monad m, Num n) => Producer Text m () -> m n
length producer = P.fold addChunk 0 id producer
  where
    addChunk total chunk = total + fromIntegral (T.length chunk)
{-# INLINABLE length #-}
-- | Fold that reports whether at least one 'Char' in the stream satisfies
-- the predicate.
--
-- Each chunk is tested with 'T.any' and the per-chunk results are combined
-- by 'P.any'.
any :: (Monad m) => (Char -> Bool) -> Producer Text m () -> m Bool
any predicate producer = P.any chunkHasMatch producer
  where
    chunkHasMatch chunk = T.any predicate chunk
{-# INLINABLE any #-}
-- | Fold that reports whether every 'Char' in the stream satisfies the
-- predicate.
--
-- Each chunk is tested with 'T.all' and the per-chunk results are combined
-- by 'P.all'.
all :: (Monad m) => (Char -> Bool) -> Producer Text m () -> m Bool
all predicate producer = P.all chunkAllMatch producer
  where
    chunkAllMatch chunk = T.all predicate chunk
{-# INLINABLE all #-}
507 -- | Return the maximum 'Char' within a byte stream
508 maximum :: (Monad m) => Producer Text m () -> m (Maybe Char)
509 maximum = P.fold step Nothing id
514 else Just $ case mw8 of
515 Nothing -> T.maximum bs
516 Just w8 -> max w8 (T.maximum bs)
517 {-# INLINABLE maximum #-}
519 -- | Return the minimum 'Char' within a byte stream
520 minimum :: (Monad m) => Producer Text m () -> m (Maybe Char)
521 minimum = P.fold step Nothing id
527 Nothing -> Just (T.minimum bs)
528 Just w8 -> Just (min w8 (T.minimum bs))
529 {-# INLINABLE minimum #-}
531 -- | Determine whether any element in the byte stream matches the given 'Char'
532 -- elem :: (Monad m) => Char -> Producer Text m () -> m Bool
533 -- elem w8 = P.any (T.elem w8)
534 -- {-# INLINABLE elem #-}
536 -- {-| Determine whether all elements in the byte stream do not match the given
539 -- notElem :: (Monad m) => Char -> Producer Text m () -> m Bool
540 -- notElem w8 = P.all (T.notElem w8)
541 -- {-# INLINABLE notElem #-}
543 -- | Find the first element in the stream that matches the predicate
546 => (Char -> Bool) -> Producer Text m () -> m (Maybe Char)
547 find predicate p = head (p >-> filter predicate)
548 {-# INLINABLE find #-}
550 -- | Index into a byte stream
552 :: (Monad m, Integral a)
553 => a-> Producer Text m () -> m (Maybe Char)
554 index n p = head (p >-> drop n)
555 {-# INLINABLE index #-}
557 -- | Find the index of an element that matches the given 'Char'
559 -- :: (Monad m, Num n) => Char -> Producer Text m () -> m (Maybe n)
560 -- elemIndex w8 = findIndex (w8 ==)
561 -- {-# INLINABLE elemIndex #-}
563 -- | Store the first index of an element that satisfies the predicate
565 -- :: (Monad m, Num n)
566 -- => (Char -> Bool) -> Producer Text m () -> m (Maybe n)
567 -- findIndex predicate p = P.head (p >-> findIndices predicate)
568 -- {-# INLINABLE findIndex #-}
570 -- -- | Store a tally of how many elements match the given 'Char'
571 -- count :: (Monad m, Num n) => Char -> Producer Text m () -> m n
572 -- count w8 p = P.fold (+) 0 id (p >-> P.map (fromIntegral . T.count w8))
573 -- {-# INLINABLE count #-}
575 -- | Splits a 'Producer' after the given number of bytes
577 :: (Monad m, Integral n)
580 -> Producer' Text m (Producer Text m r)
587 Left r -> return (return r)
589 let len = fromIntegral (T.length bs)
595 let (prefix, suffix) = T.splitAt (fromIntegral n) bs
597 return (yield suffix >> p')
598 {-# INLINABLE splitAt #-}
600 -- | Split a byte stream into 'FreeT'-delimited byte streams of fixed size
602 :: (Monad m, Integral n)
603 => n -> Producer Text m r -> FreeT (Producer Text m) m r
604 chunksOf n p0 = PP.FreeT (go p0)
610 Right (bs, p') -> PP.Free $ do
611 p'' <- splitAt n (yield bs >> p')
612 return $ PP.FreeT (go p'')
613 {-# INLINABLE chunksOf #-}
615 {-| Split a byte stream in two, where the first byte stream is the longest
616 consecutive group of bytes that satisfy the predicate
622 -> Producer' Text m (Producer Text m r)
628 Left r -> return (return r)
630 let (prefix, suffix) = T.span predicate bs
637 return (yield suffix >> p')
638 {-# INLINABLE span #-}
640 {-| Split a byte stream in two, where the first byte stream is the longest
641 consecutive group of bytes that don't satisfy the predicate
647 -> Producer Text m (Producer Text m r)
648 break predicate = span (not . predicate)
649 {-# INLINABLE break #-}
651 {-| Split a byte stream into sub-streams delimited by bytes that satisfy the
658 -> PP.FreeT (Producer Text m) m r
659 splitWith predicate p0 = PP.FreeT (go0 p0)
664 Left r -> return (PP.Pure r)
668 else return $ PP.Free $ do
669 p'' <- span (not . predicate) (yield bs >> p')
670 return $ PP.FreeT (go1 p'')
675 Right (_, p') -> PP.Free $ do
676 p'' <- span (not . predicate) p'
677 return $ PP.FreeT (go1 p'')
678 {-# INLINABLE splitWith #-}
680 -- | Split a byte stream using the given 'Char' as the delimiter
684 -> FreeT (Producer Text m) m r
685 split w8 = splitWith (w8 ==)
686 {-# INLINABLE split #-}
688 {-| Group a byte stream into 'FreeT'-delimited byte streams using the supplied
693 => (Char -> Char -> Bool)
695 -> FreeT (Producer Text m) m r
696 groupBy equal p0 = PP.FreeT (go p0)
701 Left r -> return (PP.Pure r)
702 Right (bs, p') -> case (T.uncons bs) of
705 return $ PP.Free $ do
706 p'' <- span (equal w8) (yield bs >> p')
707 return $ PP.FreeT (go p'')
708 {-# INLINABLE groupBy #-}
710 -- | Group a byte stream into 'FreeT'-delimited byte streams of identical bytes
712 :: (Monad m) => Producer Text m r -> FreeT (Producer Text m) m r
714 {-# INLINABLE group #-}
716 {-| Split a byte stream into 'FreeT'-delimited lines
718 Note: This function is purely for demonstration purposes since it assumes a
719 particular encoding. You should prefer the 'Data.Text.Text' equivalent of
720 this function from the upcoming @pipes-text@ library.
723 :: (Monad m) => Producer Text m r -> FreeT (Producer Text m) m r
724 lines p0 = PP.FreeT (go0 p0)
729 Left r -> return (PP.Pure r)
733 else return $ PP.Free $ go1 (yield bs >> p')
735 p' <- break ('\n' ==) p
736 return $ PP.FreeT (go2 p')
741 Right (_, p') -> PP.Free (go1 p')
742 {-# INLINABLE lines #-}
744 {-| Split a byte stream into 'FreeT'-delimited words
746 Note: This function is purely for demonstration purposes since it assumes a
747 particular encoding. You should prefer the 'Data.Text.Text' equivalent of
748 this function from the upcoming @pipes-text@ library.
751 :: (Monad m) => Producer Text m r -> FreeT (Producer Text m) m r
752 words p0 = removeEmpty (splitWith isSpace p0)
754 removeEmpty f = PP.FreeT $ do
757 PP.Pure r -> return (PP.Pure r)
761 Left f' -> PP.runFreeT (removeEmpty f')
762 Right (bs, p') -> return $ PP.Free $ do
765 return (removeEmpty f')
766 {-# INLINABLE words #-}
768 -- | Intersperse a 'Char' in between the bytes of the byte stream
770 :: (Monad m) => Char -> Producer Text m r -> Producer Text m r
778 yield (T.intersperse w8 bs)
785 yield (T.singleton w8)
786 yield (T.intersperse w8 bs)
788 {-# INLINABLE intersperse #-}
790 {-| 'intercalate' concatenates the 'FreeT'-delimited byte streams after
791 interspersing a byte stream in between them
795 => Producer Text m ()
796 -> FreeT (Producer Text m) m r
801 x <- lift (PP.runFreeT f)
803 PP.Pure r -> return r
808 x <- lift (PP.runFreeT f)
810 PP.Pure r -> return r
815 {-# INLINABLE intercalate #-}
817 {-| Join 'FreeT'-delimited lines into a byte stream
819 Note: This function is purely for demonstration purposes since it assumes a
820 particular encoding. You should prefer the 'Data.Text.Text' equivalent of
821 this function from the upcoming @pipes-text@ library.
824 :: (Monad m) => FreeT (Producer Text m) m r -> Producer Text m r
828 x <- lift (PP.runFreeT f)
830 PP.Pure r -> return r
833 yield $ T.singleton '\n'
835 {-# INLINABLE unlines #-}
837 {-| Join 'FreeT'-delimited words into a byte stream
839 Note: This function is purely for demonstration purposes since it assumes a
840 particular encoding. You should prefer the 'Data.Text.Text' equivalent of
841 this function from the upcoming @pipes-text@ library.
844 :: (Monad m) => FreeT (Producer Text m) m r -> Producer Text m r
845 unwords = intercalate (yield $ T.pack " ")
846 {-# INLINABLE unwords #-}
849 The following parsing utilities are single-byte analogs of the ones found
853 {-| Take bytes until they fail the predicate
855 Unlike 'takeWhile', this 'PP.unDraw's unused bytes
860 -- -> Pipe Text Text (StateT (Producer Text m r) m) ()
861 -- takeWhile' = PBP.takeWhile
862 -- {-# INLINABLE takeWhile' #-}
863 -- {-# DEPRECATED takeWhile' "Use Pipes.Text.Parse.takeWhile instead" #-}
866 "Pipes.Text.Parse" re-exports 'nextByte', 'drawByte', 'unDrawByte',
867 'peekByte', and 'isEndOfBytes'.
869 @Data.Text@ re-exports the 'Text' type.
871 @Data.Char@ re-exports the 'Char' type.
873 @Pipes.Parse@ re-exports 'input', 'concat', and 'FreeT' (the type).