Parent Directory | Revision Log | Patch
--- spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm 2005/06/17 00:53:05 191041 +++ spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm 2005/06/17 01:06:03 191042 @@ -34,7 +34,7 @@ use base qw( Exporter ); @PFSTATE_VARS = qw( PFSTATE_ERROR PFSTATE_STARTING PFSTATE_IDLE PFSTATE_BUSY PFSTATE_KILLED - PFORDER_ACCEPT + PFORDER_ACCEPT ); %EXPORT_TAGS = ( @@ -50,11 +50,29 @@ use constant PFSTATE_KILLED => 3; use constant PFORDER_ACCEPT => 10; +########################################################################### + +# we use the following protocol between the master and child processes to +# control when they accept/who accepts: server tells a child to accept with a +# PF_ACCEPT_ORDER, child responds with "B$pid\n" when it's busy, and "I$pid\n" +# once it's idle again. In addition, the parent sends PF_PING_ORDER +# periodically to ping the child processes. Very simple protocol. Note that +# the $pid values are packed into 4 bytes so that the buffers are always of a +# known length; if you need to transfer longer data, assign a new protocol verb +# (the first char) and use the length of the following data buffer as the +# packed value. +use constant PF_ACCEPT_ORDER => "A....\n"; +use constant PF_PING_ORDER => "P....\n"; # timeout for a sysread() on the command channel. if we go this long # without a message from the spamd parent or child, it's an error. use constant TOUT_READ_MAX => 300; +# interval between "ping" messages from the spamd parent to all children, +# used as a sanity check to ensure TOUT_READ_MAX isn't hit when things +# are functional. +use constant TOUT_PING_INTERVAL => 150; + ########################################################################### sub new { @@ -68,6 +86,7 @@ sub new { $self->{kids} = { }; $self->{overloaded} = 0; $self->{min_children} ||= 1; + $self->{server_last_ping} = time; $self; } @@ -191,7 +210,16 @@ sub main_server_poll { } # any action? - return unless ($nfound); + if (!$nfound) { + # none. periodically ping the children though just to ensure + # they're still alive and can hear us + + my $now = time; + if ($now - $self->{server_last_ping} > TOUT_PING_INTERVAL) { + $self->main_ping_kids($now); + } + return; + } # were the kids ready, or did we get signal? if (vec ($rout, $self->{server_fileno}, 1)) { @@ -239,6 +267,23 @@ sub main_server_poll { $self->adapt_num_children(); } +sub main_ping_kids { + my ($self, $now) = @_; + + $self->{server_last_ping} = $now; + + my ($sock, $kid); + while (($kid, $sock) = each %{$self->{backchannel}->{kids}}) { + $self->syswrite_with_retry($sock, PF_PING_ORDER) and next; + + warn "prefork: write of ping failed to $kid fd=".$sock->fileno.": ".$!; + + # note: this is safe according to the note in perldoc -f each; 'it is + # always safe to delete the item most recently returned by each()' + $self->child_error_kill($kid, $sock); + } +} + sub read_one_message_from_child_socket { my ($self, $sock) = @_; @@ -282,14 +327,6 @@ sub read_one_message_from_child_socket { ########################################################################### -# we use the following protocol between the master and child processes to -# control when they accept/who accepts: server tells a child to accept with a -# "A....\n", child responds with "B$pid\n" when it's busy, and "I$pid\n" once -# it's idle again. Very simple protocol. Note that the $pid values are packed -# into 4 bytes so that the buffers are always of a known length; if you need to -# transfer longer data, assign a new protocol verb (the first char) and use the -# length of the following data buffer as the packed value. - sub order_idle_child_to_accept { my ($self) = @_; @@ -308,7 +345,7 @@ sub order_idle_child_to_accept { return $self->order_idle_child_to_accept(); } - if (!$self->syswrite_with_retry($sock, "A....\n")) + if (!$self->syswrite_with_retry($sock, PF_ACCEPT_ORDER)) { # failure to write to the child; bad news. call it dead warn "prefork: killing rogue child $kid, failed to write on fd ".$sock->fileno.": $!\n"; @@ -351,7 +388,7 @@ sub child_now_ready_to_accept { my ($self, $kid) = @_; if ($self->{waiting_for_idle_child}) { my $sock = $self->{backchannel}->get_socket_for_child($kid); - $self->syswrite_with_retry($sock, "A....\n") + $self->syswrite_with_retry($sock, PF_ACCEPT_ORDER) or die "prefork: $kid claimed it was ready, but write failed on fd ". $sock->fileno.": ".$!; $self->{waiting_for_idle_child} = 0; @@ -406,6 +443,10 @@ sub wait_for_orders { } chomp $line; + if (index ($line, "P") == 0) { # string starts with "P" = ping + dbg("prefork: periodic ping from spamd parent"); + next; + } if (index ($line, "A") == 0) { # string starts with "A" = accept return PFORDER_ACCEPT; } @@ -454,6 +495,7 @@ retry_read: } else { $tout = $deadline - $now; # the remaining timeout + $tout = 1 if ($tout <= 0); # ensure it's > 0 } dbg("prefork: sysread(".$sock->fileno.") not ready, wait max $tout secs");
infrastructure at apache.org | ViewVC Help |
Powered by ViewVC 1.1.26 |