読者です 読者をやめる 読者になる 読者になる

ffmpegがハングしたことを検知したらkillするための実行環境

先日から作り始めているmediakitにread timeout的な機能を追加した。

read timeout

read timeoutというのは入力待ち状態に制限を設けて、制限いっぱいまで待っても入力が得られない場合はタイムアウトとして終了するような機能で、ネットワーク周りのライブラリにはよく実装されている気がする。

ffmpegのハングアップ検知

日々Sidekiqなどの非同期ジョブサーバー上でffmpegを用いてエンコードを実行していると、たまになかなか終了しなくて長時間生き残ってしまうプロセスがいるので、そういう状態を検知して殺していかなければ、キューが詰まってしまうことになる。

read timeoutと似たような枠組みで、ffmpegにもハングアップしたことを検知してkillする処理を実装できる。ffmpegはコマンドを普通に実行すると標準エラーにエンコードの進行状況を書き込んでくれる。そのIOを監視していれば、"もし一定時間標準エラーに書き込みがなくエンコードが進んでいなければTimeout::Errrorの例外を投げる"というロジックを実装できる。

既存のstreamio-ffmpegでも実装されている機能であるので、自分の作っているmediakitでも同じ機能は欲しい。ただ、streamio-ffmpegの実装見るとIOクラスにモンキーパッチしてたり、して実装が雑なので、一から自分で実装してみることにした。

https://github.com/streamio/streamio-ffmpeg/blob/master/lib/ffmpeg/transcoder.rb#L61-L88

Added timeout option to detect hung ffmpeg process by stakach · Pull Request #24 · streamio/streamio-ffmpeg · GitHub

Rubyのマルチスレッド・プロセスプログラミング

Rubyで外部コマンドを実行する手段はいろいろな手段があって、一番簡単なのはPerl由来っぽいKernel.systemか、backtic2つ書くやつ。これだと標準入力・出力周りが、コマンド自体の処理が完了するまでうまく扱えないので複雑な処理を実装するには使えない。

Kernel.spawnを使うとプロセスを起動した瞬間にpidが返り値で返ってきて、さらに引数オプションでIOのリダイレクトを設定できる。これを使えばプロセスのIOをプログラムから扱うことが出来る。

しかしspawnをそのまま使うと必要な処理を自分で書くのは結構面倒なので、通常はOpen3.popen3を使う。

# 返り値4つは、それぞれIOオブジェクト3つとプロセスの終了を監視するためのスレッド
stdin, stdout, stderr, wait_thread = Open3.popen3('/foo_command')

# stdinに対して書き込むとプロセスに入力できる
stdin.write("yes\n")

# IO#getsで少しづつ読み取れる
while chunk = stdout.gets
  puts chunk
end

# ↓のブロックのI/Fを使わない時はThread#joinでプロセスの終了を待つことが出来る
wait_thread.join 

# ブロックでもいける
Open3.popen3('/foo_command') do |stdin, stdout, stderr, wait_thread|
  # do_something
end

このIOオブジェクトを別スレッドを起動して監視するような実装をすれば良さそうだ。 IOの監視用スレッドと、Read Timeout時の例外を投げるためのスレッドを作る。

stdin, stdout, stderr, wait_thread = Open3.popen3('ffmpeg -i ...')
begin
  # この変数を基準に
  updated_at = Time.now

  # IO#getsで中身が取得できればupdated_atを更新する
  io_thread = Thread.new do 
    while chunk = stderr.gets
      updated_at = Time.now
    end
  end

  # 0.1秒ごとにタイムアウトしたかどうかをチェックするループを作る
  main_thread = Thread.current
  timer_thread = Thread.new do
    loop do 
      # updated_atから現在時刻までに10秒超えていたらメインスレッドで例外を投げる
      main_thread.raise(Timeout::Error) if (Time.now - updated_at) > 10
      sleep(0.1)
    end
  end

  wait_thread.join
rescue Timeout::Error => error
  # エラーを検知したらプロセスを殺して呼び出し元にTimeout::Errorを伝搬させる
  Processl.kill('SIGIKILL', wait_thread.pid)
  raise(error)
end

実際にはちゃんとそれぞれのスレッドをクラス化して管理できるように書いたり、 begin〜rescue〜ensureで必ずThreadをkillするようにしたりする。

read timeout自体のテスト

ffmpegを使えばテスト出来なくはないが、実際に動画をエンコードするのも大変なので、標準エラー出力を吐き出すダミーのコマンドを作ってテストを書けるようにした。

mediakit/ffmpeg at master · ainame/mediakit · GitHub

こんな感じで使える

# 単に3秒間sleepして正常終了する
$ test/supports/ffmpeg --sleep=3
$ echo $? #=> 0

# 単に3秒間sleepして異常終了する
$ test/supports/ffmpeg --sleep=3 --fail
$ echo $? #=> 1

# 3秒間、stdout,stderr両方に適当なメッセージを出力しつづける
$ test/supports/ffmpeg --sleep=3 --progress

これをテストでは利用する。

Travis-CI / 1CPU / Vagrant

マルチスレッド方式でそれっぽい実装したし、テストも書けるようになって通ったし、さぁ完成だ!と思ってgithubにpushしたところ、Travi-CIだとテストが通らない・・・。

どうやら手元のMacだと2CPUあるので、メインのスレッドと外部コマンドのプロセスと監視スレッドがいいかんじに並列に動作するっぽいのだが、1CPUだとスレッドの実行スケジュールのタイミングがうまく管理できず、IOの監視スレッドが起動してすぐsleepしてしまっているようで、Travis-CI上では、どんだけ標準出力・標準エラー出力が存在しててもそのままタイムアウトしてしまってテストが落ちた。

1CPUの環境でデバッグするためにVagrantRuby環境を整えて試行錯誤したのだけど、普通にRubyのThreadで実装するだけだとどうしてもうまく行かず暫くの間詰まってしまった。(そもそもvagrant sshがうまくいかなくて環境にアクセスするまでに結構時間使ってしまった・・・。)

Cool.io

↑のような状態で詰まっている間、仕事でSidekiqやfluentdを使った開発をしていたらSidekiqではcelluloid、fluentdではcool.ioというイベント駆動プログラミング用のフレームワークを使っているというのを何となく思い出して使ってみようと思った。

イベント駆動フレームワークに乗っかればそのスレッド周りスケジュールの問題も、既存のソリューションに乗っかればきっとうまくいくと思って、cool.ioで実装しなおした。

Coolio::IOというクラスがまさに自分の欲しい実装でこういうのの簡易版を自前で実装していた。 newにIOオブジェクトを与えると、read出来るタイミングでon_readメソッドが呼び出せるので、そのタイミングで最終更新時間を更新すれば良い。

cool.io/io.rb at master · tarcieri/cool.io · GitHub

実装したのがこのクラス。IOWatcherが各IOを監視、TimeoutTimerが0.1秒間隔で処理を起動してタイムアウトのチェックをする。

class IOWatcher < Coolio::IO
  attr_reader(:data)

  def initialize(io, &block)
    @block = block
    @data = ''
    super
  end

  def on_read(data)
    @data << data
    @block.call(self)
  end
end

class TimeoutTimer < Coolio::TimerWatcher
  DEFAULT_CHECK_INTERVAL = 0.1

  def initialize(duration, current_thread)
    @mutex = Mutex.new
    @duration = duration
    @watched_at = Time.now
    @current_thread = current_thread
    super(DEFAULT_CHECK_INTERVAL, true)
  end

  def on_timer
    if timeout?
      @current_thread.raise(Timeout::Error, "wait timeout error with #{@duration} sec.")
    end
  end

  def update
    @mutex.synchronize do
      @watched_at = Time.now
    end
  end

  private

  def timeout?
    # compare duration into first decimal place by integer
    ((Time.now - @watched_at) * 10).floor >= (@duration * 10).floor
  end
end

こんなかんじでpopen3の返り値を監視するwatcherを作って、全てイベントループの中に突っ込んで実行する。

stdin, stdout, stderr, wait_thread = Open3.popen3(command)
begin
  loop = Coolio::Loop.new

  timer = TimeoutTimer.new(timeout, Thread.current)
  timer.attach(loop)

  out_watcher = IOWatcher.new(stdout) { timer.update }
  out_watcher.attach(loop)

  err_watcher = IOWatcher.new(stderr) { timer.update }
  err_watcher.attach(loop)

  # イベントループは別スレッドで起動
  loop_thread = Thread.new { loop.run }

  # コマンドの終了を待つ間、裏ではイベントループが回っている
  wait_thread.join
rescue Timeout::Error => error
  # エラーを検知したらプロセスを殺して呼び出し元にTimeout::Errorを伝搬させる
  Processl.kill('SIGIKILL', wait_thread.pid)
  raise(error)
ensure
  # 各種終了処理を実行する
  out_watcher.close
  err_watcher.close
  timer.detach
  loop_thread.join
end

この実装までやったら1CPU環境でも動くようになった。

まとめ

最終的にcool.ioに依存することでffmpegがハングしたことを検知しつつ実行するための実装できた。 Rubyでは純粋なマルチスレッド処理による計算はGILのせいで出来ないけど、 IO周りに関してはちゃんとやろうとすれば、実装できることがわかった。 結構面倒くさいけど良い知見。

ffmpeg以外でもコマンド実行中に進捗状況を標準出力・エラー出力に履いてくれるようなコマンドには利用できる。他にもnice値も設定できたりコマンドのエスケープもしてくれたりするので割りと安心して外部コマンドを発行できる下地が出来上がった!!

mediakit/runner.rb at master · ainame/mediakit · GitHub

参考

qiita.com