luggage baggage

Machine learning, data analysis, web technologies and things around me.

multiprocessing で tfrecord 作成を高速化する話

お久しぶりです。

TensorFlow でモデル学習をする際、便利に使われるのがtf.dataAPI です。以前はtf.contribで提供されていましたが、非常に使い勝手の良い機能だったこともあり、現在は独立したモジュールとして存在しています。様々な形式のデータ入力に対応している中でも、公式に推奨されているのは
.tfrecordというバイナリ形式のデータを読み取るtf.data.TFRecordDatasetだと思います(むかしの Caffe でいう LMDB みたいなやつ)。

私はいま画像認識系の仕事をしているのですが、先日扱った画像は百数十万枚ほどあり、一枚一枚も10数MBと小さくはなかったため、単純に tfrecord を作成すると30時間以上もかかる状態でした。この記事では、その時に使った非常に雑な高速化手法をご紹介していきます。CPU 数が稼げる環境であれば、10倍速とかは十分に達成できる感じでした。

動作確認環境

  • Ubuntu 16.04 on EC2 (t2.2xlarge)
  • TensorFlow 1.10

tfrecord の作成

この記事では高速化の手順を主に書きたいので、tfrecord の作成の詳細については割愛したいと思います。大雑把な内容だけ確認すると、画像を扱う場合、

  1. 画像データを読み込み、適宜変換した後、numpy 配列にする。
  2. ラベルデータとともにtf.train.Exampleにデータを保管する。
  3. Exampleを tfrecord ファイルに書き込む。

の3ステップが必要となります。このうち、1. と 2. は画像データの読み出しおよびバイナリ化、3. はファイルへの書き込みとなるので、大きくは read/write のステップに分けられているということです。画像のサイズが大きくなると読み出しに時間がかかったりするようになります。なお、1. についてはskimagecv2等のライブラリを使ってやる方法もあるのですが、私が試した範囲では、結局 TensorFlow 付属の関数(tf.read_fileとかtf.image.decode_jpegとか)を使ってテンソルを流した後にsess.runして numpy 配列にする方針が一番速かったです。

multiprocessingを使った高速化

私が今回高速化のために実装したのは、こんな方針でした:

  1. 画像の読み出しとExample作成のために、複数のワーカープロセスを立ち上げる
  2. Exampleの書き込みは、親プロセスで実行する
  3. Exampleのやり取りをするためにmultiprocessing.Queueを使う

私の作業環境では、jpeg データの読み出しにとにかく時間がかかっており、まずはこの箇所を高速化したい状況だったのです。

親プロセスでの処理はこんな感じで書いています。

def write_to_tfrecord(img_paths, labels, out_path, img_size, n_prod_process):
    q = Queue()

    # プロセス数分だけデータを分割して、立ち上げたワーカーに食べさせる
    data_len = len(img_paths)
    img_batches = make_chunks(img_paths, n_prod_process, data_len)
    label_batches = make_chunks(labels, n_prod_process, data_len)
    prod_ps = [Process(target=image_reader,
                       args=(img_batches[i], label_batches[i], img_size, q))
               for i in range(n_prod_process)]
    for p in prod_ps:
        p.daemon = True   # ワーカーは後ろで動かしておく
        p.start()

    # 親プロセスでキュー内の Example を取り出し tfrecord に書き込む
    get_count = 0
    span = 100
    writer = tf.python_io.TFRecordWriter(out_path)
    s = time.time()
    s_total = time.time()
    while True:
        if get_count == data_len:
            break
        if q.empty():
            time.sleep(0.1)
            continue
        example = q.get()
        writer.write(example.SerializeToString())
        get_count += 1
        if get_count % span == 0:
            e = time.time()
            print("Done {:,} images. Mean writing time: {:.04f} s."
                  .format(get_count, (e - s) / span))
            s = e

    print("Done writing all ({:,} of {:,}) images in {:.02f} seconds."
          .format(get_count, data_len, time.time() - s_total))

やり方は簡単で、単にmultiprocessing.Processを必要な分立ち上げて、読むべきデータをチャンクに分けて各々のプロセスで読み、キューに詰めていくだけですね。

ワーカーの実装としては、各プロセスに対してtf.Sessionを貼り付け、読み込んだ画像 Tensor をsess.runして Python の世界に持ち込んでいます。

def image_reader(img_paths, labels, img_size, q):
    ipath = tf.placeholder(tf.string)
    size = tf.placeholder(tf.int32)
    img = tf.read_file(ipath)
    img = tf.image.decode_jpeg(img, channels=3)
    img = tf.image.resize_images(img, [size, size])

    # GPU は使用しない
    with tf.Session(config=tf.ConfigProto(device_count={"GPU": 0})) as sess:
        for img_path, label in zip(img_paths, labels):
            resized_img = sess.run(img, feed_dict={ipath: img_path, size: img_size})
            example = make_example(resized_img, label)
            q.put(example)
    print("Done producing image protos @ {}".format(os.getpid()))

私が制御できていない箇所があるらしく GPU メモリが若干消費されてしまってはいたのですが、何にせよこれで画像読み出しを並列化して単純に高速化することができるようになりました。

実験

よくあるパンダ画像を例に、上記の高速化手法がどの程度ワークするか見てみました。この画像の形は 696x613 で、サイズは 113KB。1,000 枚を tfrecord 化するのにかかった秒数を、使ったプロセス数に対してプロットしたのが下のグラフです。
f:id:yoshidabenjiro:20180814002116p:plain
これ全然印象的でなくて申し訳ないのですが、プロセス数 2 でほぼ処理時間が半減して以降、ほぼ横ばいが続いていますね。

その理由としては、この事例では画像の読み込みではなく書き込みに時間がかかる傾向にあることが挙げられそうです。実際、プロセス数を増やした際のスクリプトの出力は、このようになっていました。

プロセス数=2 の時:

Start writing data with 2 producer processes
2018-08-13 14:25:16.834744: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2018-08-13 14:25:16.834743: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
Done 100 images. Mean writing time: 0.0022 s.
Done 200 images. Mean writing time: 0.0033 s.
Done 300 images. Mean writing time: 0.0032 s.
Done 400 images. Mean writing time: 0.0022 s.
Done 500 images. Mean writing time: 0.0022 s.
Done 600 images. Mean writing time: 0.0032 s.
Done 700 images. Mean writing time: 0.0032 s.
Done 800 images. Mean writing time: 0.0022 s.
Done 900 images. Mean writing time: 0.0032 s.
Done producing image protos @ 9374
Done producing image protos @ 9375
Done 1,000 images. Mean writing time: 0.0022 s.
Done writing all (1,000 of 1,000) images in 2.72 seconds.

プロセス数=6の時:

Start writing data with 6 producer processes
2018-08-13 14:26:36.983996: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2018-08-13 14:26:36.991585: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2018-08-13 14:26:36.992146: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2018-08-13 14:26:36.997134: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2018-08-13 14:26:36.997243: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2018-08-13 14:26:37.005560: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
Done 100 images. Mean writing time: 0.0030 s.
Done 200 images. Mean writing time: 0.0053 s.
Done producing image protos @ 9608
Done producing image protos @ 9605
Done producing image protos @ 9609
Done producing image protos @ 9604
Done producing image protos @ 9606
Done producing image protos @ 9607
Done 300 images. Mean writing time: 0.0024 s.
Done 400 images. Mean writing time: 0.0022 s.
Done 500 images. Mean writing time: 0.0022 s.
Done 600 images. Mean writing time: 0.0022 s.
Done 700 images. Mean writing time: 0.0022 s.
Done 800 images. Mean writing time: 0.0022 s.
Done 900 images. Mean writing time: 0.0022 s.
Done 1,000 images. Mean writing time: 0.0032 s.
Done writing all (1,000 of 1,000) images in 2.69 seconds.

注目すべきはDone producing image protos @ XXXの部分で、プロセス数を増やすとメッセージの出現タイミングが早まっていますね。これは、比較的軽めの画像しかないにもかかわらず、ワーカーを多く立ち上げすぎているせいで画像の読み出し作業が早く終わっているということです。逆に tfrecord への書き込みがボトルネックになり、全体としての完了時間には差が出なくなっているのだと思われます。

ここでは示していないのですが、私が作業した環境では、上述の通りサイズの大きな画像が大量にある状況だったため、少なくともプロセス数=30 程度まではほぼリニアに処理時間が短縮されていました。

まとめ

multiprocessingを使うことで、tfrecord 作成を簡単に高速化する方法をご紹介しました。ただ実装としてはすごくナイーブで、本当は tf.train.Server を使って分散計算の仕組みに則るやり方もあるのかもしれません。また、threadingを使ったほうが速いと思うので、そちらは別途記事化します。まずは手が早く動く方針で書いてしまった感じです。

コードは GitHub - dlnp2/tensorflow_misc にありますので、ご自由にどうぞ。パンダ画像をダウンロードして "画像へのパス - ラベル" をセットにしたサンプル csv を作成するコードも一緒にあるので、上記コードの入力として使っていただけると動作確認できると思います。