Perlでマルチスレッド

Ads

新しいスレッドモデル

Perl5.8 では ithread というスレッドモデルが採用されました。

  • ithread はインタープリタベースのスレッドであり、明示的に指定されたデータだけが共有されます。
  • 従来(Perl5.005時代から)のスレッドモデルでは暗黙的に全てのデータが共有されていました。
  • 今後は ithread 推奨だそうです。

でも、そもそも古いスレッドモデルの使い方なんて知らねぇからどうでもいいや(ぉ

マニュアル

ithread モデルでのマルチスレッドプログラミングの仕方については以下で理解。

実際のプログラムで使えそうなサンプル

目的

元々はメール配送システムを作ろうとしていた。

そのシステムでは複数の宛先へのメール配送を1通毎に並列にやりたかったし、 スレッドが無限に増えてしまってもリソースを消費しすぎて困るのでそれを制限したかった。 また、相手サーバの事情によっては纏めて数万通とか出すと怒られるときがあるのでメールの配送ペースも調整したかった。

以上がマルチスレッドプログラミングを勉強する目的だった。

要件

以下の要求を満たすプログラムを作る。

  • 大量のタスクを処理する。
  • 複数のタスクを同時並行処理したい。
  • タスクの同時並行処理の並列度を制限したい。
  • 単位時間に処理出するタスクの数を制限したい。

ソース

上記条件を満たすサンプルが出来たのでメモ。 ダウンロードも出来るようにしておく。

このソースには無駄なコードや、こうした方が良かったとか思うところがいっぱいあるんだが修正するのが面倒い…。 参考にする方は、とりあえず参考にするだけして、自分で色々試して良いと思った方法をとったほうが良いです XD

fileithread_test.pl

#!/usr/bin/perl
use strict;
use threads;
use threads::shared;
use Thread::Semaphore;
use Thread::Queue;

package threadTest;

my $TASKNUMBER  = shift || 10;  #タスク実行回数
my $CONCURRENCY = shift || 3;   #タスク実行の並列度
my $PERSEC      = shift || 0.5; #タスク実行数/sec
my $TASKLIFE    = shift || 5;   #タスクの寿命(秒)
my $alerm :shared = 0;          #終了の合図

&main;
sub main {
    print "タスク実行回数     : $TASKNUMBER\n";
    print "タスク実行の並列度 : $CONCURRENCY\n";
    print "タスク実行数/sec   : $PERSEC\n";
    print "タスクの寿命(秒)   : $TASKLIFE\n";
    my $queue = Thread::Queue->new;
    threads->new(\&speedControl, $queue, $PERSEC);
    consumeQueue($queue, $CONCURRENCY, $TASKNUMBER, $TASKLIFE);
    &quit;
}

# 指定速度でキューを埋める
sub speedControl {
    my $queue = shift;  #投入先キュー
    my $persec = shift; #投入数/秒
    my $processed = 0;  #投入済数
    my $fraction = 0;   #投入端数
    while(!$alerm) {
        $fraction += $persec;
        if(1 <= $fraction) {
            my $charge = int($fraction);
            $queue->enqueue((1..$charge));
            $processed += $charge;
            $fraction -= $charge;
        }
        sleep 1;
    }
}

# キューが空にならない間、タスクを並列実行させる。
sub consumeQueue {
    my $queue = shift;
    my $concurrency = shift;
    my $number = shift;
    my $tasklife = shift;
    my $semaphore = Thread::Semaphore->new($concurrency);
    my $taskid = 1;
    foreach(1..$number) {
        $queue->dequeue;
        $semaphore->down;
        threads->new(\&doTask, $semaphore, $taskid++, $tasklife);
    }
}

# タスクを実行する
sub doTask {
    my $semaphore = shift;
    my $id = shift;
    my $life = shift;
    while(0 < $life) {
        print " " x $id . "$id($life).\n";
        sleep 1;
        $life--;
    }
    $semaphore->up;
}

# 終了処理
sub quit {
    #永続スレッド(speedControl)に終了の合図を送る
    $alerm = 1;
    #全てのスレッドが終了するのを待機する
    foreach my $thr (threads->list) {
        if ($thr->tid && !threads::equal($thr, threads->self)) {
            $thr->join;
        }
    }
    exit 0;
}

実行結果

以下が実行結果です。 結果だけ見ても意味分かりません(w

サンプルをダウンロードしてパラメータを変えて実行してみれば要件が満たされてることが分かるんじゃないかと思います(^^;

タスク実行回数     : 10
タスク実行の並列度 : 3
タスク実行数/sec   : 0.5
タスクの寿命(秒)   : 5
 1(5).
 1(4).
 1(3).
  2(5).
 1(2).
  2(4).
 1(1).
  2(3).
   3(5).
  2(2).
   3(4).
  2(1).
   3(3).
    4(5).
   3(2).
    4(4).
   3(1).
    4(3).
     5(5).
    4(2).
     5(4).
    4(1).
      6(5).
     5(3).
     5(2).
      6(4).
     5(1).
      6(3).
       7(5).
       7(4).
      6(2).
      6(1).
       7(3).
        8(5).
        8(4).
       7(2).
       7(1).
        8(3).
         9(5).
         9(4).
        8(2).
        8(1).
         9(3).
          10(5).
         9(2).
          10(4).
         9(1).
          10(3).
          10(2).
          10(1).

コメント

  • さっぱり分からない。 -- 2006-12-01 (金) 13:22:47
  • スレッド wiki でググッたらここにきたw -- 4さま? 2007-04-21 (土) 23:33:41

添付ファイル: fileithread_test.pl 103件 [詳細]

トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2009-01-21 (水) 13:23:17 (2683d)