celeryの実行でハマったこと(serializeにpickleが使われていた)

結論

celeryではserialize/deserialize(defaultではpickleを利用)してデータの受け渡しをしているため、celeryに渡すデータは、serialize/deserialize可能なデータにしましょう。

背景

celeryに、タスクを渡した時(do.delay(something))に、decode errorになる現象が起きていました。

celeryを利用してprocessの起動までは辿りつけましたが、processに渡す引数(something)にStringO形式のものが含まれていたりするとうまく動かないことが分かりました。
例えば、ファイルなどが相当します。

この辺の情報
http://stackoverflow.com/questions/4330719/django-celery-how-to-send-request-filesphoto-to-task

http://stackoverflow.com/questions/3642107/python-pickling-error-when-using-sessions


そこで、StringOが含まれない形でceleryに渡すように改造しましたが、まだどうも期待通りに動いていない部分がありました。
例えば、Can't decode message body: RuntimeError('maximum recursion depth
exceeded')と怒られました。(データの階層が深すぎという事かもしれません)

この"decode"という言葉が気になって調べていくと、どうやら単に渡したデータをqueueに積む(message broker経由で渡す)だけで無いらしいことが分かりました。

celeryでの処理手順

まず、celeryでは、どのように処理されているか説明します。
taskをqueueに積む際に、「どのデータをどの関数で処理するか」をceleryに通知します。
例えば、通常以下のようにします。

@task
def do(something):
 # something to do
 print something

上記が定義されていたとすると、

do.delay(something)

のようにしてceleryに実行依頼します。
(詳細の使い方は、http://d.hatena.ne.jp/nihohi/20120711なども参照ください)
このとき、somethingをserializeした形でqueueに積みます。
defaultではserializeにpickleを使用します。
(serialize moduleはconfigのCELERY_TASK_SERIALIZERで変更可能)

同様に、celeryが実際にタスク実行する際には、queueからdeserializeして実行します。

従って、celeryに渡す"something"はpickleでserialize/deselialize可能なものでなければなりません。

最初に失敗していたのは、このserialize(pickle化)で失敗していました。
StringO形式のものを含む場合、このserializeで失敗するようです。

一方"背景"で説明した失敗の方は、deserialize(unpickle化)で失敗していました。
こちらの、正確な原因は分かっていませんが、「極端に再帰的なデータ構造を pickle 化しようとした場合には再帰の深さ制限を越えてしまうかもしれず、この場合には RuntimeError が送出され」るそうですのできっとこれが起きていたでしょう。
なお、「 sys.setrecursionlimit() で慎重に上げていくことは可能」だそうです。([参考]リンク参照)

また、debug/検討過程でやはりceleryへの実行依頼に失敗する(serializeに失敗)することがありました。

こちらは、pickleでは、serializeする際に、「pickle化可能な関数やクラスがモジュールのトップレベルで定義されていなければならない」そうですが、これまで、関数内で定義したsub classを利用して生成していたため、失敗していたようです。

参考

何を pickle 化したり unpickle 化できるのか?
http://www.python.jp/doc/2.6/library/pickle.html?highlight=pickle#pickle-unpickle