INFRA

GAE×Python - Task Queueの研究

Task Queueを研究いたします。

GAEにはcronというものがあります。これは、cron.yamlにタスクと実行頻度を登録しておくことで、バックグラウンドで自動的に登録されたタスクを登録された頻度で実行するものです。

しかしcronでは不十分な状況が想定されます。例えば、WEBサービスの利用者は随時増加していくものですが、その利用者全員分のタスクを一定頻度で実行したい場合などは、cronでは不十分になります。

例を示します。twitterのbotを作成できるwebサービスとしましょう。2分に1回全ユーザがつぶやく必要があります。ユーザが1人であれば、cronで十分対応できます。下記のようにcronに記載すればいいだけです。

cron:
- description: auto tweet
url: /autotweet
schedule: every 2 minutes

しかし、ユーザ数が分からないが100人あるいは10000人いるような場合において、全ユーザの自動つぶやきをどのように実行したらよいかを考えた場合、cronでは不十分です。cronは1回の処理が30秒という制限があります。仮にこの制限がなければ、確かにcronでも対応が可能かもしれません。例えばユーザ分つぶやき処理を繰り返せばいいわけです。しかし確実に30秒を超過する(ユーザが多い場合)わけですから、現実的ではありません。cron以外でバックグラウンド処理ができる機能はないのでしょうか?

そこで登場するが、Task Queueというものです。

これはcronとは異なり、プログラム上でタスクを追加できるものです。一度に複数のタスクを追加してよいようです。これであれば、cronで2分に1回auto tweetというページを読み込んで、実行されたauto tweetが全てのユーザ分のつぶやき処理をタスクに追加するということが可能になると思います。

Task Queueの実験として下記のプロジェクトを作成しました。

# -*- coding: utf-8 -*-
import os
import webapp2
import jinja2
from google.appengine.ext import db
import logging
from google.appengine.api.labs import taskqueue
jinja_environment = jinja2.Environment(
loader=jinja2.FileSystemLoader(os.path.dirname(__file__)))
class TestData(db.Model):
name = db.StringProperty()
age = db.IntegerProperty()
count = db.IntegerProperty()
'''
100ユーザを自動でつく
key_nameは1〜100、nameは1〜100、ageは全部20、countは全部0
'''
class Init(webapp2.RequestHandler):
def get(self):
test = TestData.all().fetch(1)
if not test:
for idx in range(100):
testData = TestData(key_name = str(idx+1))
testData.name = str(idx+1)
testData.age = 20
testData.count = 0
testData.put()
'''
全てのTestDataのカウントをtask queを使ってアップす
'''
class Test(webapp2.RequestHandler):
def get(self):
tests = TestData.all()
for test in tests:
taskqueue.add(url='/countup', params={'keyName': test.key().name()})
'''
TestDataの1つを与えられたキーをもとに抽出し、カウントアップす
'''
class CountUp(webapp2.RequestHandler):
def post(self):
keyName = self.request.get('keyName')
testData = TestData.get_by_key_name(keyName)
testData.count += 1
testData.put()
'''
現状の全てのTestDataのカウントを表示す
'''
class Main(webapp2.RequestHandler):
def get(self):
tests = TestData.all()
templateValues = {'tests':tests}
template = jinja_environment.get_template('layout.html')
self.response.out.write(template.render(templateValues))
logging.getLogger().setLevel(logging.DEBUG)
app = webapp2.WSGIApplication([
('/init',Init),
('/test',Test),
('/countup',CountUp),
('/',Main),
], debug=True)

きちんと作動しました。ただ、大量のタスクをキューに一度に追加した場合、これらは並列的に処理されるもんでしょうか?全然分かりません。これが一個一個順番に処理されていくのであれば、一つの処理に2秒かかっていたとすると、10000ユーザー分の処理を実行し終わるのに、20000秒かかります。すなわち、5時間半です。。。それじゃあ、全くもって意味ないですね。使えないプログラムです。2分に1回全ユーザ分の処理を一挙にやりたい。理想的には10000ユーザ分の処理が2秒で終わる状態です。どうもテストしてみた感じだと、順番に実施してる感があるような。もうちょい調べます。

とりあえず、1 つのタスク実行の有効期間が 30 秒に制限されているようです。まあこれは問題ないですね。cronでやろうとしても1処理は30秒に制限されています。あとは、キューについて色々設定できるようです。設定はqueue.yamlというファイルを作成します。rateというので一定時間にどの程度のタスクを処理するかを設定できるようですが、1分間に1万回とか設定しても大丈夫なんでしょうかね?

まあcronよりいいのは確かだし、かなり色々柔軟に設定できるみたいだし、基本的に順番に処理されるようなものの、同時に処理する場合もある的なことも書いてあるから割と期待できそうなんだけど、結論的にはよく分からない。

分からないので、2番目の実験をしてみようと思う。
実験内容としては、上記コードのCountupクラスを修正して、Countupの処理が終わってから2秒間停止させてみようと思う。これが全部順番に実施されるのであれば、キューは3.3分のちょっとした旅を始めるだろう。10000件処理しようとしたら5.5時間の長旅になってしまう。

Countupクラスの修正コードは下記になります。

class CountUp(webapp2.RequestHandler):
def post(self):
keyName = self.request.get('keyName')
testData = TestData.get_by_key_name(keyName)
testData.count += 1
testData.put()
time.sleep(2)

さて、どうなるか?

見事に3分程かかっている。まだ終わらない。やっぱり基本的に順番に実施されるので、一処理に二秒かかるなら、10000件処理には5.5時間かかると思っておいた方がよいのだろうか?それにしてもそれではいかんのだ。もしかしたら設定ファイルにレートを設定したら早くなるかもしれん。やってみよう。

下記のようなqueue.yamを作成してみたが、やっぱり遅い。全く変わらない。

queue:
- name: default
rate: 10000/s 

ちなみに、1 つのキューについて 1 秒あたり 50 回のタスク呼び出しと、書いてあった。こういう制限があるのか。念のため、rateを50/sに変えてみたが変わらず遅い。しかしこれじゃあ使えないじゃないか。どうしたらいいのだ。

むむ!?アップしたら、まずrateは最大で1秒500回だぞというエラーが出たので500/sに変えて再度アップしたら、すげー速かった。速かったぞ。念のためもう一度試したけど速かった!Google様ありがとう。