IO::Pollでcometなチャットサーバを書いてみた

perl

#!/usr/bin/perl

use strict;
use warnings;
use IO::Socket;
use IO::Poll qw( POLLIN POLLOUT POLLHUP POLLERR );
use Data::Dumper;
use HTTP::Request;
use CGI;

my $_timeout = 15;
$SIG{ALRM} = \&timeout;
alarm($_timeout);

my $port = shift || 3000;

my $server = IO::Socket::INET->new(
    LocalPort => $port,
    Listen    => 10,
    Reuse     => 1,
) or die $@;

my $poll = IO::Poll->new();
$poll->mask( $server => POLLIN );

my $clients = {};
my @messages = ();

while (1) {
    my $cnt = $poll->poll();

    my @readers = $poll->handles( POLLIN | POLLHUP | POLLERR );

    for (@readers) {
        if ( $server eq $_ ) {
            my $socket = $_->accept;
            $socket->blocking(0);

            # add client socket
            $poll->mask( $socket => POLLIN );

            my $fileno = fileno($socket);
            $clients->{$fileno} = $socket;
        }
        else {
            my $buf;

            while ( defined $_->sysread( $buf, 8192 ) ) {
                my $req = HTTP::Request->parse($buf);

                unless ($buf) {
                    remove( $poll, $_, $clients );
                    last;
                }

                my $params = $req->uri->as_string;
                if ( $params eq '/' ) {

                    open( F, './index.html' );
                    my $root_content = do { local $/; <F> };
                    close F;
                    $_->syswrite($root_content);
                    remove( $poll, $_, $clients );

                    #clear all
                    for my $_client ( values %{$clients} ) {
                        remove( $poll, $_client, $clients );
                    }
                    $clients = {};

                }
                else {
                    $params =~ s/\/\?//g;
                    my $q      = new CGI($params);
                    my $action = $q->param('a');

                    if ( $action eq 'write' ) {
                        my $message  = $q->param('message');
                        my $nickname = $q->param('nickname');

                        my $content = "{ \"message\": \"$message\", \"nickname\":\"$nickname\" }";
                        push @messages , $content;
                        for my $client ( values %{$clients} ) {

                            $client->syswrite(
                                              "HTTP/1.1 200 OK\r\n".
                                              "Content-Type:text/html; charset=utf-8\r\n".
                                              "Content-Length:".(length $content) ."\r\n".
                                              "Expires:-1\r\n".
                                              "Pragma:no-cache\r\n".
                                              "Cache-Control:must-revalidate, no-cache, no-store\r\n".
                                              "\r\n".
                                              $content
                                              );
                            #clear all
                            remove( $poll, $client, $clients );
                        }
                        $clients = {};
                    }
                    elsif ( $action eq 'poll' ) {
                        #for debug
                        print "now polling ..." . fileno($_);
                    }
                    elsif ( $action eq 'init' ) {
                        #disconnect
                        my $message;
                        $message = join ',' , @messages;
                        $_->syswrite("{ \"init\": [" . $message . "] }");
                        remove( $poll, $_, $clients );
                    }
                    else {
                        #disconnect
                        $_->syswrite("oops bad request !!");
                        remove( $poll, $_, $clients );
                    }
                }
            }
        }
    }
}

sub remove {
    my ( $poll, $client, $clients ) = @_;
    delete $clients->{ fileno($client) } if ($clients);
    $poll->remove($client);
    $client->close();
}

sub timeout {
    print "called timeout";
    for my $client ( values %{$clients} ) {
        $poll->remove($client);
        $client->close();
        #$client->shutdown(2);
    }
    $clients = {};
    alarm($_timeout);
}
サーバ側の解説

* 15秒毎にタイムアウトさせています。
 pollingしているクライアントの接続も切っています。

my $_timeout = 15;
$SIG{ALRM} = \&timeout;
alarm($_timeout);

* IO::Pollオブジェクトにサーバの待機ソケットを監視対象に加えています。
 POLLINは監視の条件

my $poll = IO::Poll->new();
$poll->mask( $server => POLLIN );

* イベント待機

$poll->poll();

* イベントが発生したファイルディスクリプタのリスト取得
my @readers = $poll->handles( POLLIN | POLLHUP | POLLERR );

* 待機ソケットの接続要求を受けてクライアントソケットを監視対象に追加
  読み取りのみ監視、でノンブロッキングモード

my $socket = $_->accept;
$socket->blocking(0);
$poll->mask( $socket => POLLIN );

* 保持しているソケットに一斉書き込み
ヘッダーにcacheを無効にするように設定

for my $client ( values %{$clients} ) {
  $client->syswrite(
   "HTTP/1.1 200 OK\r\n".
   "Content-Type:text/html; charset=utf-8\r\n".
   "Content-Length:".(length $content) ."\r\n".
   "Expires:-1\r\n".
   "Pragma:no-cache\r\n".
   "Cache-Control:must-revalidate, no-cache, no-store\r\n".
   "\r\n".
   $content
);


* pollingの時はなにもしない。これでつなぎっぱなし状態
elsif ( $action eq 'poll' ) {

index.html

<html>
<head>
<meta HTTP-EQUIV="Pragma" CONTENT="no-cache">
<meta HTTP-EQUIV="Expires" CONTENT="-1">
<title>IO::Poll</title>
<script src="/path/to/prototype.js"></script>
<script>
Event.observe(window, 'load', function (){
    function poll(){
        new Ajax.Request(
                         '/?a=poll',
                         {
                             method: 'GET',
                             parameters: {},
                             onComplete: writeMessage
                         } );
    };
    
    function writeMessage(res){
        try{
            var m = $('messages');
            var resObj = eval('('+res.responseText+')');
            m.innerHTML = ('<span>' + resObj.nickname + ' >> ' + resObj.message + '</span><br>' + m.innerHTML);
        }catch(e){}
        poll();
    }
    
    Event.observe($('input_form'), 'submit',
                  function(event){
                      event = event || window.event;
                      Event.stop(event);
                      var message = $('message').value;
                      var nickname = $('nickname').value;
                      new Ajax.Request(
                                       '/?a=write&message='+encodeURIComponent(message)
                                       +'&nickname='+encodeURIComponent(nickname), 
                                       {
                                           method: 'GET',
                                           parameters: {},
                                           onComplete: function(res){ $('message').value = ""; }
                                       } );
                  } );

    //init
    new Ajax.Request(
                     '/?a=init',
                     {
                         method: 'GET',
                         parameters: {},
                         onComplete: function(res){
                             var m = $('messages');
                             var resObj = eval('('+res.responseText+')');
                             var data = resObj.init; 
                             for(var i=0; data.length > i; i++){
                             m.innerHTML = ('<span>' + data[i].nickname + ' >> ' + data[i].message + '</span><br>' + m.innerHTML);
                             }
                             poll();
                         }
                     } );
    
} );
</script>
</head>
<body>
<form action="#" method="POST" id="input_form" >
  <input type="text" id="nickname" value="guest" size="8"> <input type="text" id="message"><span style="" ><input type="submit" ></span>
</form>
<div id="messages">
<div>
</body>
</html>
クライアント側解説

* サーバからのタイムアウト、または書き込みによる切断の後
  すぐにpollingしています
function writeMessage(res){
       .
       .
       .

      poll();
}