ffmpegがハングしたことを検知したらkillするための実行環境
先日から作り始めているmediakitにread timeout的な機能を追加した。
read timeout
read timeoutというのは入力待ち状態に制限を設けて、制限いっぱいまで待っても入力が得られない場合はタイムアウトとして終了するような機能で、ネットワーク周りのライブラリにはよく実装されている気がする。
ffmpegのハングアップ検知
日々Sidekiqなどの非同期ジョブサーバー上でffmpegを用いてエンコードを実行していると、たまになかなか終了しなくて長時間生き残ってしまうプロセスがいるので、そういう状態を検知して殺していかなければ、キューが詰まってしまうことになる。
read timeoutと似たような枠組みで、ffmpegにもハングアップしたことを検知してkillする処理を実装できる。ffmpegはコマンドを普通に実行すると標準エラーにエンコードの進行状況を書き込んでくれる。そのIOを監視していれば、"もし一定時間標準エラーに書き込みがなくエンコードが進んでいなければTimeout::Errrorの例外を投げる"というロジックを実装できる。
既存のstreamio-ffmpegでも実装されている機能であるので、自分の作っているmediakitでも同じ機能は欲しい。ただ、streamio-ffmpegの実装見るとIOクラスにモンキーパッチしてたり、して実装が雑なので、一から自分で実装してみることにした。
https://github.com/streamio/streamio-ffmpeg/blob/master/lib/ffmpeg/transcoder.rb#L61-L88
Rubyのマルチスレッド・プロセスプログラミング
Rubyで外部コマンドを実行する手段はいろいろな手段があって、一番簡単なのはPerl由来っぽいKernel.system
か、backtic2つ書くやつ。これだと標準入力・出力周りが、コマンド自体の処理が完了するまでうまく扱えないので複雑な処理を実装するには使えない。
Kernel.spawn
を使うとプロセスを起動した瞬間にpidが返り値で返ってきて、さらに引数オプションでIOのリダイレクトを設定できる。これを使えばプロセスのIOをプログラムから扱うことが出来る。
しかしspawnをそのまま使うと必要な処理を自分で書くのは結構面倒なので、通常はOpen3.popen3
を使う。
# 返り値4つは、それぞれIOオブジェクト3つとプロセスの終了を監視するためのスレッド stdin, stdout, stderr, wait_thread = Open3.popen3('/foo_command') # stdinに対して書き込むとプロセスに入力できる stdin.write("yes\n") # IO#getsで少しづつ読み取れる while chunk = stdout.gets puts chunk end # ↓のブロックのI/Fを使わない時はThread#joinでプロセスの終了を待つことが出来る wait_thread.join # ブロックでもいける Open3.popen3('/foo_command') do |stdin, stdout, stderr, wait_thread| # do_something end
このIOオブジェクトを別スレッドを起動して監視するような実装をすれば良さそうだ。 IOの監視用スレッドと、Read Timeout時の例外を投げるためのスレッドを作る。
stdin, stdout, stderr, wait_thread = Open3.popen3('ffmpeg -i ...') begin # この変数を基準に updated_at = Time.now # IO#getsで中身が取得できればupdated_atを更新する io_thread = Thread.new do while chunk = stderr.gets updated_at = Time.now end end # 0.1秒ごとにタイムアウトしたかどうかをチェックするループを作る main_thread = Thread.current timer_thread = Thread.new do loop do # updated_atから現在時刻までに10秒超えていたらメインスレッドで例外を投げる main_thread.raise(Timeout::Error) if (Time.now - updated_at) > 10 sleep(0.1) end end wait_thread.join rescue Timeout::Error => error # エラーを検知したらプロセスを殺して呼び出し元にTimeout::Errorを伝搬させる Processl.kill('SIGIKILL', wait_thread.pid) raise(error) end
実際にはちゃんとそれぞれのスレッドをクラス化して管理できるように書いたり、 begin〜rescue〜ensureで必ずThreadをkillするようにしたりする。
read timeout自体のテスト
ffmpegを使えばテスト出来なくはないが、実際に動画をエンコードするのも大変なので、標準エラー出力を吐き出すダミーのコマンドを作ってテストを書けるようにした。
mediakit/ffmpeg at master · ainame/mediakit · GitHub
こんな感じで使える
# 単に3秒間sleepして正常終了する $ test/supports/ffmpeg --sleep=3 $ echo $? #=> 0 # 単に3秒間sleepして異常終了する $ test/supports/ffmpeg --sleep=3 --fail $ echo $? #=> 1 # 3秒間、stdout,stderr両方に適当なメッセージを出力しつづける $ test/supports/ffmpeg --sleep=3 --progress
これをテストでは利用する。
Travis-CI / 1CPU / Vagrant
マルチスレッド方式でそれっぽい実装したし、テストも書けるようになって通ったし、さぁ完成だ!と思ってgithubにpushしたところ、Travi-CIだとテストが通らない・・・。
どうやら手元のMacだと2CPUあるので、メインのスレッドと外部コマンドのプロセスと監視スレッドがいいかんじに並列に動作するっぽいのだが、1CPUだとスレッドの実行スケジュールのタイミングがうまく管理できず、IOの監視スレッドが起動してすぐsleepしてしまっているようで、Travis-CI上では、どんだけ標準出力・標準エラー出力が存在しててもそのままタイムアウトしてしまってテストが落ちた。
1CPUの環境でデバッグするためにVagrantでRuby環境を整えて試行錯誤したのだけど、普通にRubyのThreadで実装するだけだとどうしてもうまく行かず暫くの間詰まってしまった。(そもそもvagrant sshがうまくいかなくて環境にアクセスするまでに結構時間使ってしまった・・・。)
Cool.io
↑のような状態で詰まっている間、仕事でSidekiqやfluentdを使った開発をしていたらSidekiqではcelluloid、fluentdではcool.ioというイベント駆動プログラミング用のフレームワークを使っているというのを何となく思い出して使ってみようと思った。
イベント駆動フレームワークに乗っかればそのスレッド周りスケジュールの問題も、既存のソリューションに乗っかればきっとうまくいくと思って、cool.ioで実装しなおした。
Coolio::IOというクラスがまさに自分の欲しい実装でこういうのの簡易版を自前で実装していた。 newにIOオブジェクトを与えると、read出来るタイミングでon_readメソッドが呼び出せるので、そのタイミングで最終更新時間を更新すれば良い。
cool.io/io.rb at master · tarcieri/cool.io · GitHub
実装したのがこのクラス。IOWatcherが各IOを監視、TimeoutTimerが0.1秒間隔で処理を起動してタイムアウトのチェックをする。
class IOWatcher < Coolio::IO attr_reader(:data) def initialize(io, &block) @block = block @data = '' super end def on_read(data) @data << data @block.call(self) end end class TimeoutTimer < Coolio::TimerWatcher DEFAULT_CHECK_INTERVAL = 0.1 def initialize(duration, current_thread) @mutex = Mutex.new @duration = duration @watched_at = Time.now @current_thread = current_thread super(DEFAULT_CHECK_INTERVAL, true) end def on_timer if timeout? @current_thread.raise(Timeout::Error, "wait timeout error with #{@duration} sec.") end end def update @mutex.synchronize do @watched_at = Time.now end end private def timeout? # compare duration into first decimal place by integer ((Time.now - @watched_at) * 10).floor >= (@duration * 10).floor end end
こんなかんじでpopen3の返り値を監視するwatcherを作って、全てイベントループの中に突っ込んで実行する。
stdin, stdout, stderr, wait_thread = Open3.popen3(command) begin loop = Coolio::Loop.new timer = TimeoutTimer.new(timeout, Thread.current) timer.attach(loop) out_watcher = IOWatcher.new(stdout) { timer.update } out_watcher.attach(loop) err_watcher = IOWatcher.new(stderr) { timer.update } err_watcher.attach(loop) # イベントループは別スレッドで起動 loop_thread = Thread.new { loop.run } # コマンドの終了を待つ間、裏ではイベントループが回っている wait_thread.join rescue Timeout::Error => error # エラーを検知したらプロセスを殺して呼び出し元にTimeout::Errorを伝搬させる Processl.kill('SIGIKILL', wait_thread.pid) raise(error) ensure # 各種終了処理を実行する out_watcher.close err_watcher.close timer.detach loop_thread.join end
この実装までやったら1CPU環境でも動くようになった。
まとめ
最終的にcool.ioに依存することでffmpegがハングしたことを検知しつつ実行するための実装できた。 Rubyでは純粋なマルチスレッド処理による計算はGILのせいで出来ないけど、 IO周りに関してはちゃんとやろうとすれば、実装できることがわかった。 結構面倒くさいけど良い知見。
ffmpeg以外でもコマンド実行中に進捗状況を標準出力・エラー出力に履いてくれるようなコマンドには利用できる。他にもnice値も設定できたりコマンドのエスケープもしてくれたりするので割りと安心して外部コマンドを発行できる下地が出来上がった!!
mediakit/runner.rb at master · ainame/mediakit · GitHub