Rubyで並列処理をやっていく #AdventCalendar

mixiグループアドベントカレンダー2016 1日目です。

今回は、自分が今まで利用したRubyでの並列処理を書くためのgemとか知見を紹介します。

機運

先日のRubyKaigi 2016で、Ruby3ではGuildという新しい並列処理のモデル*1が、導入されるというセッションがあったり、concurrent-rubyというgemの開発が流行り初めて居たりと、Ruby界隈でも何となく並列処理がブームきているように感じます。

マルチプロセス/スレッド

しかしRubyで並列処理するのは言語の仕様としてそれなりに制限があり、他の言語のようにThreadをバンバン立ててマルチコアで計算!爆速化!!みたいなのは難しいです。 というのも、Ruby1.9からネイティブスレッドは導入されたものの多くのC拡張を使ったgemのスレッドセーフ性が問題となるため、GIL(Global interpreter lock)と呼ばれる仕組みが存在しており、RubyやC拡張の処理自体が1つのプロセス上で同時に実行されることがありません。(逆にいうとGILのおかげでスレッドセーフ性について何も考えずにRubyが書けている!)

それでも、並列に処理を実行したいときは普通にRubyでアプリケーションを開発していると訪れて、何とかやっていく必要があります。 その場合、以下のような実現方法が考えられます。

  • forkしてマルチプロセス化(PerlとかPHPではこっちが多い)
    • メモリ消費大 (Copy on Writeという仕組みがあるので一定は共有される)
    • スレッドセーフ性考えなくて良くてシンプル
    • Rubyでの計算処理でもCPUのコア数使いきれる
  • マルチスレッド化 + IO多重化
    • メモリ消費小
    • バグなく動かすためスレッドセーフ性が必要
    • Rubyでの計算処理ではCPU使いきれないのでIO処理と組み合わせる

並列処理を書くのは難しい

上述のようなプリミティブな実現方法があったとしても、自分で一から全て正しく書くのはとてもハードルが高いことです。 他の言語でもThreadを直接使うよりを抽象化されたモデル(Future, Actor, async/awaitなど)を利用することが多いです。 Rubyの場合は用途に特化した便利な実装がすでにあるのでこの辺を使う所から始めると良いと個人的に思うので紹介と、これまで得た知見を共有します。

  • Parallel
  • Sidekiq

最初に触れたconcurrent-rubyを使うと他の言語で利用で利用されているような非同期処理の抽象化モデルが利用できますが今回は省略します。

Parallel

ループ処理をめちゃくちゃ簡単に並列化できるライブラリ。 プロセスモデル・スレッドモデル両方採用できます。

サンプルコード。デフォで、マシンのCPU数を調べてその数だけマルチプロセスを起動して処理してくれます。 記法も簡単で、Parallel.mapの引数に配列を渡すだけ。各do〜end内の処理が複数のプロセスやスレッドで処理が行われるようになります。 mapを利用すればRubyArray#mapのように処理した結果を配列で受け取ることも可能です(もちろん引数で渡した配列の順番も保持してくれます)。

# 2 CPUs -> work in 2 processes (a,b + c)
results = Parallel.map(['a','b','c']) do |one_letter|
  expensive_calculation(one_letter)
end

README.md#usage

こういうとき使ってる

  • サーバー1台で実行する程度のスクリプト(集計とか)の処理を早くしたい時
    • 仕様クラスみたいなものを作ってテーブル全件調べて対象データのみ抽出とか
    • 1台で実行すれば処理のアウトプット先を1箇所にまとめるのが簡単
  • 並列に画像などのデータをダウンロード・アップロードする
class FooTargetUserSpecification
  def satisfied_by?(user)
    # ...
    # 重めの判定ロジック
  end

  def satisfied_users(&block)
    User.includes(:some_associations).find_in_batches do |gruop|
      Parallel.each(group, in_processes: 4) do |user|
        @reconnected ||= User.connection.reconnect! || true
        block(user) if satisfied_by?(user)
      end
      User.connection.reconnect!
    end
  end
end

file = File.open('target_users.csv', 'rw+')
spec = FooTargetUserSpecification.new
spec.satisfied_users do |user|
  file.puts(user.id)
end
file.close

知見

  • マルチプロセスで実行するとメモリ食うのでこういうバッチスクリプトをCronで回す場合はちゃんと安定して実行できるかどうか本番データと同じ規模で検証必要
  • 実行内容によってスレッドベースでやると良いのかプロセスベースでやると良いのかは考える(メモリ効率・スレッドセーフ性など)
  • ActiveRecordと組み合わせて使う場合には、connection_pool周りで問題が起きるのでgemのREADME.mdに書いてある再接続処理使うと良い

Sidekiq

いわゆる非同期Jobキューの実装として有名。Sidekiqのプロセスを起動しておいて、RedisにJobを積むと積んだそばから Sidekiqのプロセスが随時ワーカーを起動してJobを消化していってくれます。

スレッドモデルで並列化をしているので、IO処理などでブロッキングされる場合に効果が出るので、 アプリのPush通知を送ったり、メールを送ったり、DBのレコードを更新したりに向いてます。 同様のgemにResqueというものがあるが、あちらはプロセスモデルで実現しているのでメモリを余分に食ったりする。 Sidekiqでもプロセスモデルで実行した方が良い重い計算処理を行いたい場合は並列数1で複数のプロセスを立てれば良いです。

作者の努力によってバージョンが上がるごとにスループットがめちゃ上がったり、gem自身の依存関係が減ったり、エンタープライズ向けの機能も用意されていて徳が高いです。

こういうとき使ってる

  • HTTPリクエスト内では処理しりきれない遅い処理を非同期に実行するJobキュー
    • Push/メール通知
    • 遅延させてUpdateクエリを発行させたい時
    • 動画のエンコード処理
  • 細かい大量の処理を一気にスケールアウトさせて処理したい時
    • 新たなサムネイルを事前作成し、キャッシュを温めたい時
  • S3からDBに登録された画像をダウンロード -> 解析処理 -> DBに永続化
class UserFooWorker
  include Sidekiq::Worker

  sidekiq_options(queue: :default, retry: 3)

  def perform(user_id)
    user = User.find_by(id: user_id)
    return unless user # userが取得できなかったら終了しとく

    user.do_something
  end
end

# 呼び出し側
# perform_asyncを呼ぶとRedisにJobが積まれる
UserFooWorker.perform_async(user.id)

知見

  • Workerのコードを書くときはとにかく冪等制!冪等制!冪等制!と3回ぐらい唱えてからコードを書いてそして読み直す
    • 2回以上同じ処理を実行しても良い処理を書く
    • 1度実行した処理かどうかをチェックできるようにする
    • 仕様として2回実行されてしまうのを一部許容する
  • Workerの要件によって適切にretry回数・条件・間隔を設定する
    • 1度失敗して再度実行したら成功するのかどうか?
      • 通信系はエラーになりやすいので何回かリトライさせる
      • 存在しないデータへの処理は2度と成功しない場合が多い
    • exponential back-offによって1週間後とかに再実行されて嬉しいの???
    • Worker内のコードはできるだけシンプルにしてどこでエラーが発生していつリトライされるのかが分かりやすくなるよう心がける
  • リリースが失敗した場合のリカバリー手段を考えておく
    • 作りが甘くてぬるぽバグとかで大量にリトライJobを出してしまってretryもすぐ消化してしまったとき
      • バグを修正したのちに影響範囲分を再度積み直すとか
  • 1 Workerの粒度を小さくする
  • Queueを意識する
    • リクエスト時に非同期で行うJobを積むQueueと、バッチ処理的に一気にJobを積む際のQueueは分ける(非同期側が詰まる)
    • CloudWatchなどにQueueのサイズをメトリクスとしてPutしておく(AutoScalingに利用できる)
  • Redisを意識する
    • backtraceオプションは便利だが、有効にしたJobを大量に積んで全部こけるとほとんど同様のbacktraceがストレージに書き込まれてものすごい容量を食うので注意
    • XXX.perform_asyncを1万回ループするより、Sidekiq::Client.bulk_pushのAPIを利用して一括で詰むことで負荷もかけずに速く詰める↓のように
# Jobを積む時はRedisに優しくするためにbluk_push使うと、一気に大量のJobを積めて良い
User.select(:id).find_in_batches do |group|
  args = group.map {|user| [user.id] }
  Sidekiq::Client.bluk_push('class' => UserFooWorker, 'args' => args, 'backtrace' => false, 'queue' => 'user_foo_worker_queue')
end

まとめ

Rubyでも簡単に取り入れられる並列処理の書き方について紹介しました。 既存処理を並列化して高速化出来ると気持ち良いので、ぜひ試して見てもらいたいです。 来年はconcurrent-rubyやRxRubyを試してみたい。