Rubyで並列処理をやっていく #AdventCalendar
mixiグループアドベントカレンダー2016 1日目です。
今回は、自分が今まで利用したRubyでの並列処理を書くためのgemとか知見を紹介します。
機運
先日のRubyKaigi 2016で、Ruby3ではGuildという新しい並列処理のモデル*1が、導入されるというセッションがあったり、concurrent-rubyというgemの開発が流行り初めて居たりと、Ruby界隈でも何となく並列処理がブームきているように感じます。
マルチプロセス/スレッド
しかしRubyで並列処理するのは言語の仕様としてそれなりに制限があり、他の言語のようにThreadをバンバン立ててマルチコアで計算!爆速化!!みたいなのは難しいです。 というのも、Ruby1.9からネイティブスレッドは導入されたものの多くのC拡張を使ったgemのスレッドセーフ性が問題となるため、GIL(Global interpreter lock)と呼ばれる仕組みが存在しており、RubyやC拡張の処理自体が1つのプロセス上で同時に実行されることがありません。(逆にいうとGILのおかげでスレッドセーフ性について何も考えずにRubyが書けている!)
それでも、並列に処理を実行したいときは普通にRubyでアプリケーションを開発していると訪れて、何とかやっていく必要があります。 その場合、以下のような実現方法が考えられます。
- forkしてマルチプロセス化(PerlとかPHPではこっちが多い)
- メモリ消費大 (Copy on Writeという仕組みがあるので一定は共有される)
- スレッドセーフ性考えなくて良くてシンプル
- Rubyでの計算処理でもCPUのコア数使いきれる
- マルチスレッド化 + IO多重化
- メモリ消費小
- バグなく動かすためスレッドセーフ性が必要
- Rubyでの計算処理ではCPU使いきれないのでIO処理と組み合わせる
並列処理を書くのは難しい
上述のようなプリミティブな実現方法があったとしても、自分で一から全て正しく書くのはとてもハードルが高いことです。 他の言語でもThreadを直接使うよりを抽象化されたモデル(Future, Actor, async/awaitなど)を利用することが多いです。 Rubyの場合は用途に特化した便利な実装がすでにあるのでこの辺を使う所から始めると良いと個人的に思うので紹介と、これまで得た知見を共有します。
- Parallel
- Sidekiq
最初に触れたconcurrent-rubyを使うと他の言語で利用で利用されているような非同期処理の抽象化モデルが利用できますが今回は省略します。
Parallel
ループ処理をめちゃくちゃ簡単に並列化できるライブラリ。 プロセスモデル・スレッドモデル両方採用できます。
サンプルコード。デフォで、マシンのCPU数を調べてその数だけマルチプロセスを起動して処理してくれます。
記法も簡単で、Parallel.map
の引数に配列を渡すだけ。各do〜end
内の処理が複数のプロセスやスレッドで処理が行われるようになります。
map
を利用すればRubyのArray#map
のように処理した結果を配列で受け取ることも可能です(もちろん引数で渡した配列の順番も保持してくれます)。
# 2 CPUs -> work in 2 processes (a,b + c) results = Parallel.map(['a','b','c']) do |one_letter| expensive_calculation(one_letter) end
こういうとき使ってる
- サーバー1台で実行する程度のスクリプト(集計とか)の処理を早くしたい時
- 仕様クラスみたいなものを作ってテーブル全件調べて対象データのみ抽出とか
- 1台で実行すれば処理のアウトプット先を1箇所にまとめるのが簡単
- 並列に画像などのデータをダウンロード・アップロードする
class FooTargetUserSpecification def satisfied_by?(user) # ... # 重めの判定ロジック end def satisfied_users(&block) User.includes(:some_associations).find_in_batches do |gruop| Parallel.each(group, in_processes: 4) do |user| @reconnected ||= User.connection.reconnect! || true block(user) if satisfied_by?(user) end User.connection.reconnect! end end end file = File.open('target_users.csv', 'rw+') spec = FooTargetUserSpecification.new spec.satisfied_users do |user| file.puts(user.id) end file.close
知見
- マルチプロセスで実行するとメモリ食うのでこういうバッチスクリプトをCronで回す場合はちゃんと安定して実行できるかどうか本番データと同じ規模で検証必要
- 実行内容によってスレッドベースでやると良いのかプロセスベースでやると良いのかは考える(メモリ効率・スレッドセーフ性など)
- ActiveRecordと組み合わせて使う場合には、connection_pool周りで問題が起きるのでgemのREADME.mdに書いてある再接続処理使うと良い
Sidekiq
いわゆる非同期Jobキューの実装として有名。Sidekiqのプロセスを起動しておいて、RedisにJobを積むと積んだそばから Sidekiqのプロセスが随時ワーカーを起動してJobを消化していってくれます。
スレッドモデルで並列化をしているので、IO処理などでブロッキングされる場合に効果が出るので、 アプリのPush通知を送ったり、メールを送ったり、DBのレコードを更新したりに向いてます。 同様のgemにResqueというものがあるが、あちらはプロセスモデルで実現しているのでメモリを余分に食ったりする。 Sidekiqでもプロセスモデルで実行した方が良い重い計算処理を行いたい場合は並列数1で複数のプロセスを立てれば良いです。
作者の努力によってバージョンが上がるごとにスループットがめちゃ上がったり、gem自身の依存関係が減ったり、エンタープライズ向けの機能も用意されていて徳が高いです。
こういうとき使ってる
- HTTPリクエスト内では処理しりきれない遅い処理を非同期に実行するJobキュー
- Push/メール通知
- 遅延させてUpdateクエリを発行させたい時
- 動画のエンコード処理
- 細かい大量の処理を一気にスケールアウトさせて処理したい時
- 新たなサムネイルを事前作成し、キャッシュを温めたい時
- S3からDBに登録された画像をダウンロード -> 解析処理 -> DBに永続化
class UserFooWorker include Sidekiq::Worker sidekiq_options(queue: :default, retry: 3) def perform(user_id) user = User.find_by(id: user_id) return unless user # userが取得できなかったら終了しとく user.do_something end end # 呼び出し側 # perform_asyncを呼ぶとRedisにJobが積まれる UserFooWorker.perform_async(user.id)
知見
- Workerのコードを書くときはとにかく冪等制!冪等制!冪等制!と3回ぐらい唱えてからコードを書いてそして読み直す
- 2回以上同じ処理を実行しても良い処理を書く
- 1度実行した処理かどうかをチェックできるようにする
- 仕様として2回実行されてしまうのを一部許容する
- Workerの要件によって適切にretry回数・条件・間隔を設定する
- 1度失敗して再度実行したら成功するのかどうか?
- 通信系はエラーになりやすいので何回かリトライさせる
- 存在しないデータへの処理は2度と成功しない場合が多い
- exponential back-offによって1週間後とかに再実行されて嬉しいの???
- Worker内のコードはできるだけシンプルにしてどこでエラーが発生していつリトライされるのかが分かりやすくなるよう心がける
- 1度失敗して再度実行したら成功するのかどうか?
- リリースが失敗した場合のリカバリー手段を考えておく
- 作りが甘くてぬるぽバグとかで大量にリトライJobを出してしまってretryもすぐ消化してしまったとき
- バグを修正したのちに影響範囲分を再度積み直すとか
- 作りが甘くてぬるぽバグとかで大量にリトライJobを出してしまってretryもすぐ消化してしまったとき
- 1 Workerの粒度を小さくする
- 1個の親のリソースに複数個の子のリソースが結びつく場合に、子のリソースの数だけJobを積むとSidekiqの並列数をあげればその分完了まで速くなる
- Complex Job Workflows with Batches · mperham/sidekiq Wiki
- Queueを意識する
- リクエスト時に非同期で行うJobを積むQueueと、バッチ処理的に一気にJobを積む際のQueueは分ける(非同期側が詰まる)
- CloudWatchなどにQueueのサイズをメトリクスとしてPutしておく(AutoScalingに利用できる)
- Redisを意識する
- backtraceオプションは便利だが、有効にしたJobを大量に積んで全部こけるとほとんど同様のbacktraceがストレージに書き込まれてものすごい容量を食うので注意
- XXX.perform_asyncを1万回ループするより、Sidekiq::Client.bulk_pushのAPIを利用して一括で詰むことで負荷もかけずに速く詰める↓のように
# Jobを積む時はRedisに優しくするためにbluk_push使うと、一気に大量のJobを積めて良い User.select(:id).find_in_batches do |group| args = group.map {|user| [user.id] } Sidekiq::Client.bluk_push('class' => UserFooWorker, 'args' => args, 'backtrace' => false, 'queue' => 'user_foo_worker_queue') end
まとめ
Rubyでも簡単に取り入れられる並列処理の書き方について紹介しました。 既存処理を並列化して高速化出来ると気持ち良いので、ぜひ試して見てもらいたいです。 来年はconcurrent-rubyやRxRubyを試してみたい。