IO::Pollでcometなチャットサーバを書いてみた
perl側
#!/usr/bin/perl
# Comet-style (long-polling) chat server built on IO::Poll.
#
# Usage: perl server.pl [port]        (port defaults to 3000)
#
# Request handling (all requests are GET):
#   /                  serve ./index.html, then disconnect every client
#   /?a=poll           keep the connection open until a message or timeout
#   /?a=write&message=..&nickname=..
#                      broadcast the message to every connected client
#   /?a=init           return the message history and disconnect
#   anything else      "oops bad request !!"
#
# Every $_timeout seconds all connections are dropped so that long-polling
# clients reconnect.
use strict;
use warnings;
use IO::Socket;
use IO::Poll qw( POLLIN POLLOUT POLLHUP POLLERR );
use Data::Dumper;
use HTTP::Request;
use CGI;
use JSON::PP;

my $_timeout    = 15;     # seconds between forced disconnects
my $MAX_HISTORY = 100;    # cap on stored messages so memory stays bounded
my $JSON_TYPE   = 'application/json; charset=utf-8';
my $TEXT_TYPE   = 'text/plain; charset=utf-8';

# Set by the ALRM handler, consumed by the main loop. The handler itself
# must not touch $clients / $poll because it can fire while the main loop
# is iterating over them.
my $timed_out = 0;

$SIG{ALRM} = \&timeout;

# A client may vanish before we write to it; without this the resulting
# SIGPIPE would terminate the whole server.
$SIG{PIPE} = 'IGNORE';

alarm($_timeout);

my $port   = shift || 3000;
my $server = IO::Socket::INET->new(
    LocalPort => $port,
    Listen    => 10,
    Reuse     => 1,
) or die $@;

my $poll = IO::Poll->new();
$poll->mask( $server => POLLIN );

my $clients  = {};    # fileno => client socket
my @messages = ();    # history: list of { message => ..., nickname => ... }

# No ->utf8 here: CGI hands us raw request bytes, and we want them passed
# through unchanged, with only JSON metacharacters escaped.
my $json = JSON::PP->new->canonical;

while (1) {
    $poll->poll();

    if ($timed_out) {
        $timed_out = 0;
        disconnect_all();
    }

    for my $handle ( $poll->handles( POLLIN | POLLHUP | POLLERR ) ) {

        # An earlier handle in this same batch may have closed this one
        # (broadcast or index request), so skip anything already closed.
        next unless defined fileno($handle);

        if ( $handle == $server ) {
            accept_client();
        }
        else {
            handle_client($handle);
        }
    }
}

# Accept a pending connection and start watching it for readability.
sub accept_client {
    my $socket = $server->accept
        or return;    # the peer may already be gone; nothing to do
    $socket->blocking(0);
    $poll->mask( $socket => POLLIN );
    $clients->{ fileno($socket) } = $socket;
    return;
}

# Read one request from a readable client socket and dispatch it.
# The request is assumed to arrive in a single read of up to 8192 bytes.
sub handle_client {
    my ($sock) = @_;

    my $buf  = '';
    my $read = $sock->sysread( $buf, 8192 );

    if ( !defined $read ) {

        # Nothing to read right now: keep the socket registered.
        return if $!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR};

        # Hard error: drop the socket, otherwise poll() would report it
        # again immediately and the loop would spin.
        remove( $poll, $sock, $clients );
        return;
    }
    if ( $read == 0 ) {    # peer closed the connection
        remove( $poll, $sock, $clients );
        return;
    }

    my $req = HTTP::Request->parse($buf);
    my $uri = $req->uri;
    if ( !defined $uri ) {    # request line had no URI
        bad_request($sock);
        return;
    }

    if ( $uri->as_string eq '/' ) {
        serve_index($sock);
        return;
    }

    my $query  = $uri->query;
    my $q      = ( defined $query && length $query ) ? CGI->new($query) : undef;
    my $action = $q ? $q->param('a') : undef;
    $action = '' unless defined $action;

    if ( $action eq 'write' ) {
        my $message  = $q->param('message');
        my $nickname = $q->param('nickname');
        my $entry    = {
            message  => defined $message  ? $message  : '',
            nickname => defined $nickname ? $nickname : '',
        };
        push @messages, $entry;
        shift @messages while @messages > $MAX_HISTORY;

        # Every connected client (the writer included) receives the new
        # message as its response and is then disconnected.
        my $content = $json->encode($entry);
        for my $client ( values %{$clients} ) {
            send_response( $client, '200 OK', $JSON_TYPE, $content );
        }
        disconnect_all();
    }
    elsif ( $action eq 'poll' ) {

        # Deliberately send nothing: the connection stays open until the
        # next broadcast or timeout.
        #for debug
        print "now polling ..." . fileno($sock);
    }
    elsif ( $action eq 'init' ) {
        send_response( $sock, '200 OK', $JSON_TYPE,
            $json->encode( { init => \@messages } ) );
        remove( $poll, $sock, $clients );
    }
    else {
        bad_request($sock);
    }
    return;
}

# Send the chat page to $sock, then disconnect every client.
sub serve_index {
    my ($sock) = @_;

    if ( open my $fh, '<', './index.html' ) {
        my $root_content = do { local $/; <$fh> };
        close $fh;
        send_response( $sock, '200 OK', 'text/html; charset=utf-8',
            $root_content );
    }
    else {
        warn "cannot open ./index.html: $!\n";
        send_response( $sock, '500 Internal Server Error', $TEXT_TYPE,
            'index.html is not available' );
    }

    #clear all
    disconnect_all();
    return;
}

# Reply with the bad-request message and disconnect $sock.
sub bad_request {
    my ($sock) = @_;
    send_response( $sock, '400 Bad Request', $TEXT_TYPE,
        "oops bad request !!" );
    remove( $poll, $sock, $clients );
    return;
}

# Write a complete, uncacheable HTTP response to $sock.
#   $status  status line text, e.g. '200 OK'
#   $type    Content-Type header value
#   $body    response body (byte string)
# The write result is not checked: a client that has gone away is simply
# cleaned up by the caller.
sub send_response {
    my ( $sock, $status, $type, $body ) = @_;
    $sock->syswrite( "HTTP/1.1 $status\r\n"
          . "Content-Type:$type\r\n"
          . "Content-Length:" . ( length $body ) . "\r\n"
          . "Expires:-1\r\n"
          . "Pragma:no-cache\r\n"
          . "Cache-Control:must-revalidate, no-cache, no-store\r\n"
          . "Connection:close\r\n" . "\r\n"
          . $body );
    return;
}

# Close every client connection and forget about it.
sub disconnect_all {

    # Snapshot the sockets first: remove() deletes from the hash, which
    # must not happen while iterating over its values.
    my @sockets = values %{$clients};
    remove( $poll, $_, $clients ) for @sockets;
    $clients = {};
    return;
}

# Stop watching $client, close it, and (when $clients is given) delete
# it from that fileno => socket hash.
sub remove {
    my ( $poll, $client, $clients ) = @_;

    # fileno is undef once the handle is closed, so look it up first.
    my $fd = fileno($client);
    delete $clients->{$fd} if $clients && defined $fd;
    $poll->remove($client);
    $client->close();
    return;
}

# SIGALRM handler: request a disconnect of all clients and re-arm the
# timer. The actual disconnect runs in the main loop.
sub timeout {
    print "called timeout";
    $timed_out = 1;
    alarm($_timeout);
    return;
}
サーバ側の解説

* 15秒毎にタイムアウトさせています。このとき、pollingしているクライアントの接続も切っています。

    my $_timeout = 15;
    $SIG{ALRM} = \&timeout;
    alarm($_timeout);

* IO::Pollオブジェクトの監視対象に、サーバの待機ソケットを加えています。POLLINは監視の条件です。

    my $poll = IO::Poll->new();
    $poll->mask( $server => POLLIN );

* イベント待機

    $poll->poll();

* イベントが発生したファイルディスクリプタのリスト取得

    my @readers = $poll->handles( POLLIN | POLLHUP | POLLERR );

* 待機ソケットの接続要求を受けて、クライアントソケットを監視対象に追加。読み取りのみ監視し、ノンブロッキングモードにしています。

    my $socket = $_->accept;
    $socket->blocking(0);
    $poll->mask( $socket => POLLIN );

* 保持しているソケットに一斉書き込み。ヘッダーでcacheを無効にするように設定しています。

    for my $client ( values %{$clients} ) {
        $client->syswrite(
            "HTTP/1.1 200 OK\r\n".
            "Content-Type:text/html; charset=utf-8\r\n".
            "Content-Length:".(length $content) ."\r\n".
            "Expires:-1\r\n".
            "Pragma:no-cache\r\n".
            "Cache-Control:must-revalidate, no-cache, no-store\r\n".
            "\r\n".
            $content
        );

* pollingの時はなにもしない。これでつなぎっぱなしの状態になります。

    elsif ( $action eq 'poll' ) {
index.html
<html>
<head>
<meta HTTP-EQUIV="Pragma" CONTENT="no-cache">
<meta HTTP-EQUIV="Expires" CONTENT="-1">
<title>IO::Poll</title>
<script src="/path/to/prototype.js"></script>
<script>
// Chat client: loads the message history once, then long-polls the server
// and shows each broadcast message. Requires Prototype 1.5.1 or later
// (String#evalJSON, String#escapeHTML).
Event.observe(window, 'load', function () {

    // Parse a server response as JSON. evalJSON(true) rejects anything
    // that is not plain JSON, so a response can never run as script.
    // Throws on an empty or malformed body.
    function parseResponse(res) {
        return res.responseText.evalJSON(true);
    }

    // Put one chat entry at the top of the log. Nickname and message come
    // from other users, so they are HTML-escaped before insertion.
    function prependMessage(entry) {
        var m = $('messages');
        m.innerHTML = ('<span>' + String(entry.nickname).escapeHTML()
                       + ' >> ' + String(entry.message).escapeHTML()
                       + '</span><br>' + m.innerHTML);
    }

    // Open a long-poll request; the server answers only when a message
    // is written or its timeout closes the connection.
    function poll() {
        new Ajax.Request(
            '/?a=poll',
            {
                method: 'GET',
                parameters: {},
                onComplete: writeMessage
            }
        );
    }

    // Show the received message (if any), then poll again right away.
    function writeMessage(res) {
        try {
            prependMessage(parseResponse(res));
        } catch (e) {
            // Empty body: the server timed out and closed the connection.
        }
        poll();
    }

    Event.observe($('input_form'), 'submit', function (event) {
        event = event || window.event;
        Event.stop(event);
        var message = $('message').value;
        var nickname = $('nickname').value;
        new Ajax.Request(
            '/?a=write&message=' + encodeURIComponent(message)
            + '&nickname=' + encodeURIComponent(nickname),
            {
                method: 'GET',
                parameters: {},
                onComplete: function (res) {
                    $('message').value = "";
                }
            }
        );
    });

    //init
    new Ajax.Request(
        '/?a=init',
        {
            method: 'GET',
            parameters: {},
            onComplete: function (res) {
                try {
                    var data = parseResponse(res).init;
                    for (var i = 0; data.length > i; i++) {
                        prependMessage(data[i]);
                    }
                } catch (e) {
                    // A bad history response must not stop polling below.
                }
                poll();
            }
        }
    );
});
</script>
</head>
<body>
<form action="#" method="POST" id="input_form">
<input type="text" id="nickname" value="guest" size="8">
<input type="text" id="message"><span style=""><input type="submit"></span>
</form>
<div id="messages">
</div>
</body>
</html>
クライアント側解説

* サーバからのタイムアウト、または書き込みによる切断の後、すぐにpollingしています。

    function writeMessage(res){
        . . .
        poll();
    }