Lazy I/O must go! - Iteratee: 列挙ベースのI/O

最近ちょっと気になるiterateeを勉強したので、日本語の解説を書いてみます。と言いつつ、大部分が The Monad.Reader Issue 16 *1 からの引用です。

はじめに

Iterateeと呼ばれる新たなI/Oの抽象化手法が、最近にわかに広まりつつあります。既存のI/Oが抱える問題を解決するべくOleg Kiselyovによって2008年頃に提唱されたiterateeは、新しい高性能webフレームワークsnap *2 や、hyena *3 で利用されています。また、HackagDB上にて、iterateeパッケージ*4、およびiterateeを利用できる様々なパッケージ *5 *6 *7 *8 が公開されています。

しかし、ドキュメントの少なさなどからiterateeがどういうものなのかよく分からないという人も多いようです。そういうわけなので、iterateeを易しく解説してみようと思います。

Haskellにおける既存のI/Oとその問題点

さてHaskellでは標準APIとしてHandleベースのI/Oライブラリが用意されています(hGetLine, hPutStr, など)。しかし、HandleベースのI/Oはもともと手続き型言語の世界からやってきたもので、HaskellAPIとして考えたとき十分に使いやすいとは言えません。その理由としては、

  • composableでない
  • 関数型でない
  • 格好良くない!

などがあります。Haskellには、Lazy I/Oというものもあります。hGetContentsなどを用いると、なるほど確かに String -> String のような関数的でとても綺麗なインターフェースで入出力処理を行うことができます。しかし、Lazy I/Oにはとても致命的ないくつかの問題点があるのです。

  • I/Oが終わると同時にリソースを開放できない。GCを待つ必要がある。これはネットワークソケットなどの限られたリソースの場合は致命的。
  • エラーハンドリングが困難。エラーの発生も遅延されるので、pureなコード内でI/Oエラーが発生するかもしれません。

これらの理由から、Lazy I/Oは使うべきではないとする意見もあるようです*9

そこでこれらの問題を解決した、次のようなAPIが望まれます。

  • composable
  • 高レベル
  • 堅牢
  • 思い通りに動く (リソース管理など)
  • ハイパフォーマンス

iterateeでは、これを逆転の発想で実現します。Handleのインターフェースは、データが必要になる度にユーザがデータを取りに行くという、いわばpull式のインターフェースですが、iterateeでは、データを受け取って処理するpush式のインターフェースになります。

iterateeとは

iterateeはfoldlのインターフェースで説明されます。Haskellプログラマならfoldlには親しみ深いと思います。foldlは3つの引数を取ります。それぞれ、

foldl :: (a -> b -> a) -> a -> [b] -> a
  • 合成関数
  • 初期値アキュームレータ
  • データ列

です。foldlはデータ列をアキュームレータに合成関数を用いて供給することにより最終出力を得ます。アキュームレータと合成関数を合わせて、iterateeと呼ぶことにします。そうすると、foldlは"iteratee"を用いて入力データを"イテレート"する、と考えることができます。

入力例を生成し、iterateeに与えるものをenumeratorと呼ぶことにします。foldlでは単なるリストですが、我々の目的であるI/Oならば、Handleから入力例を得て、それをiterateeに渡す働きをすることになります。

どうしてこのような抽象化にするのでしょうか。これはひとえに計算の合成のためです。

iterateeの合成

2つのiterateeを合成して大きなiterateeをつくるというのを考えることができます。1つ目のiterateeを走らせて計算が完了したら、残りの入力列で2つ目のiterateeを走らせます。

enumeratorの合成

1つ目のenumeratorの要素に続けて2つ目のenumeratorの要素が続くようなenumeratorが考えられます。

実装してみる

具体的なコードを見たほうが理解も速いと思いますので、実際にiterateeを実装していくことにします。

まずはデータストリームとiterateeのデータ構造を定義します。

data StreamG el = Empty | El el | EOF

data IterV el a
  = Done a (StreamG el)
  | Cont (StreamG el -> IterV el a)

StreamGが入力ストリームを表しますが、一般的なストリームの定義とは異なります。ストリーム全体を保持するのではなく、ただ1つ(もしくは0個)の要素を保持します。ストリーム全体は[StreamG]として表されます。これはEOFの情報を持たせたいためです。

iterateeを表しているのがIterVです。IterVはfoldlにおけるiterateeに対応するものですが、foldlの概念とは少し異なります。それはストリーム全体を消費しなくても計算を終了できる所です。Doneが終了した状態を現しており、計算結果と最後の要素を消費したかを持っています。Contは計算途中であることを表し、続きの計算を持っています。

さて、これを動作させるために、IterVにデータを供給するためのenum関数(enumeratorを表す)と、最終結果を取り出すためのrun関数を作る必要があります。iterateeには次の3つのタイプのものがあります。enum関数はこれらをハンドルできなければなりません。

  • 有限個の要素を消費し、Doneになる
  • ストリーム全体を消費し、EOFが来るとDoneになる(例えばsumのようなもの)
  • 決してDoneにならない

enum関数は基本的にfoldlのような関数になりますが、Doneをハンドルしなければいけないところが異なっています。

加えて、未完了のiterateeにEOFを送って結果を取り出すrun関数を作ります。

enum :: IterV el a -> [el] -> IterV el a
enum i [] = i
enum i@(Done _ _) _ = i
enum (Cont k) (x:xs) = enum (k (El x)) xs

run :: IterV el a -> Maybe a
run (Done x _) = Just x
run (Cont k) = run' (k EOF)
 where
   run' (Done x _) = Just x
   run' _ = Nothing

iterateeの例

具体的なiterateeの例を、よくあるリストの操作を定義することで見ていきます。

  • 先頭要素を消費して、先頭要素を返す。
head :: IterV el (Maybe el)
head = Cont step
  where
    step (El el) = Done (Just el) Empty
    step Empty = Cont step
    step EOF = Done Nothing EOF
  • 先頭要素を消費せず先頭要素を返す。
peek :: IterV el (Maybe el)
peek = Cont step
  where
    step c@(El el) = Done (Just el) c
    step Empty = Cont step
    step EOF = Done Nothing EOF
  • 要素をn個消費する。
drop :: Int -> IterV el ()
drop 0 = Done () Empty
drop n = Cont step
  where
    step (El _) = drop (n-1)
    step Empty = Cont step
    step EOF = Done () EOF
  • 要素の数を数える。
length :: IterV el Int
length = Cont (step 0)
  where
    step acc (El _) = Cont (step (acc+1))
    step acc Empty = Cont (step acc)
    step acc EOF = Done acc EOF

合成

このようなiterateeの定義に対して、iterateeの合成を考えることができます。実際のところ、IterVはモナドであり、ApplicativeやFunctorにもできます。合成の意味は前の章で説明したとおりです。bindは1つめのiterateeを実行して、残りの入力列で2つめのiterateeを実行するiterateeを返します。

instance Monad (IterV el) where
  return x = Done x Empty
  m >>= f = case m of
    Done x str -> case f x of
      Done x' _ -> Done x' str
      Cont k -> k str
    Cont k -> Cont (\str -> k str >>= f)

instance Functor (IterV el) where
  fmap f (Done x str) = Done (f x) str
  fmap f (Cont k) = Cont (fmap f . k)

instance Applicative (IterV el) where
  pure x = Done x Empty
  (Done f str) <*> i2 = fmap f i2
  (Cont k) <*> i2 = Cont (\str -> k str <*> i2)

こうして我々はiterateeをHaskellの標準的な方法で合成できるようになりました。簡単な例を示します。drop1keep1は、入力を1文字捨てて、次の文字を返すiterateeです。alternatesは入力列を1つ置きに、先頭5つまで取り出すiterateeです。

drop1keep1 :: IterV el (Maybe el)
drop1keep1 = drop 1 >> head

alternates :: IterV el [el]
alternates = fmap catMaybes . sequence . replicate 5 $ drop1keep1

実行例:

*Main> let alts = enum alternates [1..10]
*Main> run alts
Just [2,4,6,8,10]
*Main> let alts2 = enum alternates [1..]
*Main> run alts2
Just [2,4,6,8,10]

モナドの導入

さて、ここまでiterateeを実装して来たわけですが、前の章の実装はpurelyな世界に閉じていて何もI/Oをしていません。I/Oをできるようにするため、enumeratorをモナドに対応させてやることにします。

type EnumeratorM el m a = IterV el a -> m (IterV el a)

enumHandle :: Handle -> EnumeratorM Char IO a
enumHandle h iter = loop iter
  where
    loop i@(Done _ _) = return i
    loop i@(Cont k) = do
      isEOF <- hIsEOF h
      if isEOF then return i else hGetChar h >>= loop . k . El

enumFile :: FilePath -> EnumeratorM Char IO a
enumFile fp i =
  bracket
    (openFile fp ReadMode)
    (hClose)
    (flip enumHandle i)

モナド対応版のenumeratorの型がEnumeratorMです。先程のenum関数と同じく、IterVを引数に取りIterVを返しますが、EnumeratorがI/Oをできるようにモナドに包まれています。

実際のenumeratorの例がenumHandleとenumFileです。それぞれハンドルとファイルの文字要素を列挙します。これらのenumeratorは次のような重要な安全性を持っています。

  • ハンドルがリークしない

Handleがiterateeに渡らないので、どこかにエスケープしてしまうことがない。bracketによって例外が起きても安心。

  • 完了後すぐにハンドルを閉じることができる

iterateeが遅延評価されないので、すぐに閉じることができます。

enumHandleがEOFをiterateeに送らないのは重要です。enumeratorがEOFを送らないことによって、enumeratorの連結が可能になります。EOFを送るのはrunの仕事になります。

2つのファイルの長さを計算する例:

lengthOfTwoFiles :: FilePath -> FilePath -> IO (Maybe Int)
lengthOfTwoFiles fp1 fp2 =
  fmap run $ ((enumFile fp1) >=> (enumFile fp2)) length

モナディックiteratee

I/Oのうち、入力はこれでできるようになりましたが、出力はどうでしょう。iterateeをモナドに対応させることによりこれは可能になります。しかしenumeratorより多少ややこしいです。

data IterVM el m a
  = DoneM a (StreamG el)
  | ContM (StreamG el -> Iteratee el m a)

newtype Iteratee el m a
  = Iteratee { runIter :: m (IterVM el m a) }

IterV が IterVMになって、Iteratee型が増えています。Iterateeが、モナド対応版iterateeで、IterVMは内部的な型です(IterVMのコンストラクタが2つあるのでIterateeの内部には書けない)。

モナド版のIterV(head, peek, drop, lengthなど)に対して、自動的にIterateeに変換する持ち上げ(lift)を定義することができます。

liftIter :: Monad m => IterV el a -> Iteratee el m a
liftIter (Done x str) = Iteratee . return $ DoneM x str
liftIter (Cont k) = Iteratee . return $ ContM (liftIter . k)

さらに、IterateeをMonadTransとMonadIOにしておくと便利です。

instance MonadTrans (Iteratee el) where
  lift m = Iteratee $ m >>= \x -> return (DoneM x Empty)

instance MonadIO m => MonadIO (Iteratee el m) where
  liftIO = lift . liftIO

モナディックiterateeの例をいくつか挙げておきます。

  • ストリームのデータをすべてファイルに書き込む
streamToFile :: FilePath -> Iteratee Char IO ()
streamToFile fp = Iteratee (openFile fp WriteMode >>= go)
  where
    go h = return $ ContM (step h)
    step h (El el) = Iteratee (hPutChar h el >> go h)
    step h Empty = Iteratee (go h)
    step h EOF = Iteratee (return $ DoneM () EOF)
  • 100文字ごとに読み込んだ文字数を表示する
throbber :: Iteratee el IO ()
throbber = Iteratee (cont $ step (100 :: Int) 99)
  where
    step acc 0 (El _) =
      Iteratee
        (printf "Read %d chars\n" acc >>
         cont (step (acc+100) 99))
    step acc cnt (El _) = Iteratee (cont $ step acc (cnt-1))
    step acc cnt Empty = Iteratee (cont $ step acc cnt)
    step _ _ EOF = Iteratee (return $ DoneM () EOF)
    cont = return . ContM

Iteratee自体もモナドですので、もちろんもっと高レベルな書き方もできます。

sequenceI_ :: [Iteratee el m a] -> Iteratee el m ()
-- 定義はエクササイズとして考えてみてね

throbber2 :: Iteratee el IO ()
throbber2 = sequenceI_ $ map i ([100,200..] :: [Int])
  where
    i n = liftIter (drop 100) >> lift (printf "Read %d elems\n" n)

複数のiterateeを1つのenumeratorで並列に評価するというコンビネータを考えます。

enumPair :: Iteratee el m a -> Iteratee el m b -> Iteratee el m (a, b)
enumPair i1 i2 = Iteratee step
 where
   step ... -- 実装は各自考えてみてね

streamToFile "hoge" `enumPair` throbber とすることにより、ファイル書き込みに簡単に進捗表示をつけることができるようになります。その他、時間のかかるような処理にプログレスバー表示を取り付けたりするのも簡単です。これは遅延I/Oでは(少なくとも低レベルの実装を隠したままでは)、できないことです。

ネストしたenumerator

head, drop, lengthといった、fold的な関数を見てきましたが、ではfilterやmapのような関数はどうでしょう?これらをうまく定義するために、"ネストしたenumerator"というものを導入します。これをenumerateeと呼びます。

enumerateeは、ストリーム変換子としての型を持ちます。

type EnumerateeM elOuter elInner m a =
  Iteratee elInner m a
  -> Iteratee elOuter m (Iteratee elInner m a)

以下では簡単のために非モナド版iterateeで考えます。

filter :: (el -> Bool) -> IterV el a -> IterV el (IterV el a)
filter pred i@(Done _ _) = return i
filter pred (Cont k) = Cont step
  where
    step e@(El el) | pred el = filter pred (k e)
    step EOF = Done (k EOF) EOF
    step _ = Cont step

mapIter :: (elOuter -> elInner)
           -> IterV elInner a
           -> IterV elOuter (IterV elInner a)
mapIter f i@(Done _ _) = return i
mapIter f (Cont k) = Cont step
  where
    step (El el) = mapIter f (k (El $ f el))
    step Empty = Cont step
    step EOF = Done (k EOF) EOF

outerストリームが元のstreamで、innerストリームがフィルタされたストリームになります。filterはまずpredicateを引数に取り、さらにもう一つ、フィルタされたinnerストリームを処理するためのiterateeを引数に取ります。mapIterも同様です。

enumerateeを簡単に作るためにヘルパ関数を定義します。

convStream :: Iteratee elOuter m [elInner]
              -> EnumerateeM elOuter elInner m a

outer要素をinner要素に変換するiterateeを引数に取り、enumerateeを構築します。これはmapIterと異なり、1対1の変換でなくても可能です。many-to-one, many-to-none, one-to-many、さらにはnone-to-manyの関係も有り得ます。これを利用すると、例えば、Word8のストリームからInt16のストリームに変換するenumerateeが定義できます。

その他の実装

ここまでiterateeの実装を一通り見てきましたが、実はiterateeの実装にはいろいろなものが提案されています。iteratee-0.2*10では上記実装とは少々異なったものになっています。

data IterVM el m a = DoneM a (StreamG el)
                   | ContM (Iteratee el m a)

newtype Iteratee el m a =
  Iteratee { runIter :: StreamG el -> m (IterVM el m a) }

この定義では、Iterateeは(入力を何も消費しない場合でも)少なくとも一つのデータを要求します。その他の点には違いはありません(これは何が嬉しいんだろう?)。

iteratee-0.4以降*11(未リリース?)ではRankNTypesを用いたCPSスタイルになっています。

newtype Iteratee el m a =
  Iteratee {
    runIter :: forall r.
      (a -> Stream el -> m r) ->
      ((Stream el -> Iteratee el m a) -> Maybe ErrMsg -> m r) ->
      m r
    }

CPSスタイルには、他の実装と異なり型が一つしかなくて関数なので、標準の合成演算子、>>=や(.)で合成できる、あるいは低レベルなiterateeの定義がシンプルになるなどのアドバンテージがあります。

パフォーマンス

iterateeのデザインゴールはLazy I/Oに匹敵するパフォーマンスが得られることでした。実際のところiterateeは速いのでしょうか?残念ながら上記の実装そのままではとても遅いです。それは主に文字の読み書きを1文字ずつやっていることが原因です。

その解決策として、StreamGを1文字保持するのではなく、データのチャンクを保持するように変更します。そうするとI/Oをより大きなバッファで行うことができるようになります。またそうすることによりEmptyコンストラクタを消すことができます。

データのチャンクを保持するバッファの選択がパフォーマンスに大きく影響を与えます。リストはHaskellにおいては一般的で自然な選択で、要素に対して多態でとても使いやすいのですが、パフォーマンスの点からはあまり良くありません。StrictなByteStringを用いると素晴らしいパフォーマンスが得られますが、一般性を損ないます。

そこでiterateeパッケージではポリモーフィックなコンテナを表現できるListLikeクラス*12を利用しています。これによって、一般性を欠くことなく適切なコンテナを選択できるようになっています。

まとめ

というわけでiterateeの概要を見てきました。この記事では基本的な部分しか説明できませんでしたが、実際のiterateeライブラリ*13をいじってみると理解が深まると思います。最近ではhackageDBに類似のライブラリ*14が登場したりしており、まだまだ発展途上のライブラリな感がありますが、将来的にはメインストリームになるんではないかなあと個人的には思っています。