TheSchwartzもはじめました
GearmanとTheSchwartz
Gearmanに投げた仕事は無事完了したかどうかはよくわかりません。
途中でサーバーダウンしていた場合にどの処理が完了していて、どの処理が未完了なのか、わかりかねます。
TheSchwartzは引き受けた仕事を逐一DBに記録します。なので完了した処理と未完了の処理を区別する事ができます。
DBセットアップ
vi db/schwartz.sql
DROP TABLE IF EXISTS funcmap; CREATE TABLE funcmap ( funcid INTEGER PRIMARY KEY AUTOINCREMENT, funcname VARCHAR(255) NOT NULL, UNIQUE(funcname) ); DROP TABLE IF EXISTS job; CREATE TABLE job ( jobid INTEGER PRIMARY KEY AUTOINCREMENT, funcid INTEGER UNSIGNED NOT NULL, arg MEDIUMBLOB, uniqkey VARCHAR(255) NULL, insert_time INTEGER UNSIGNED, run_after INTEGER UNSIGNED NOT NULL, grabbed_until INTEGER UNSIGNED NOT NULL, priority SMALLINT UNSIGNED, coalesce VARCHAR(255), UNIQUE(funcid,uniqkey) ); DROP TABLE IF EXISTS error; CREATE TABLE error ( error_time INTEGER UNSIGNED NOT NULL, jobid INTEGER NOT NULL, message VARCHAR(255) NOT NULL, funcid INT UNSIGNED NOT NULL DEFAULT 0 ); DROP TABLE IF EXISTS exitstatus; CREATE TABLE exitstatus ( jobid INTEGER PRIMARY KEY NOT NULL, funcid INT UNSIGNED NOT NULL DEFAULT 0, status SMALLINT UNSIGNED, completion_time INTEGER UNSIGNED, delete_after INTEGER UNSIGNED );
sqlite db/schwartz.db < db/schwartz.sql
Jobを投げる
vi eg/client.pl
#!/usr/bin/perl package main; use strict; use warnings; use utf8; use DBI; use TheSchwartz::Simple; my $dbh = DBI->connect("dbi:SQLite:dbname=db/schwartz.db", '', '', {RaiseError => 1}) or die $DBI::err; ### clientを準備 my $client = TheSchwartz::Simple->new([$dbh]); my $job_id = $client->insert('MyWorker', {hoge=>'fuga'}); my @jobs = $client->list_jobs({funcname => 'MyWorker'}); for my $job (@jobs) { print $job->jobid, "\n"; } $dbh->disconnect;
実行
:!perl eg/schwartz.pl 1
DB確認。jobが入っています。
sqlite3 db/schwartz.db SQLite version 3.4.2 Enter ".help" for instructions sqlite> select * from job; 1|1|||1276694254|1276694254|0|| sqlite>
Wokerを用意
vi eg/worker.pl
#!/usr/bin/perl package MyWorker; use base qw( TheSchwartz::Worker ); sub work { my $class = shift; my $job = shift; my $arg = $job->arg; warn $arg->{hoge}; $job->completed; } 1; package main; use TheSchwartz; my $client = TheSchwartz->new(databases => [ { dsn => "dbi:SQLite:dbname=db/schwartz.db", user=> '', pass=>'' } ]); $client->set_prioritize(1); $client->can_do('MyWorker'); $client->work_once;
実行
:!perl eg/schwartz_worker.pl
fuga at eg/schwartz_worker.pl line 10.
DB確認。jobテーブルのレコードが減り、funcmapにレコードが増えている。
sqlite> select * from job; sqlite> sqlite> select * from funcmap; 1|MyWorker