-Fix xcatd after broken checkin

git-svn-id: https://svn.code.sf.net/p/xcat/code/xcat-core/trunk@2208 8638fb3e-16cb-4fca-ae20-7b5d299a9bcd
This commit is contained in:
jbjohnso 2008-09-19 15:34:08 +00:00
parent 3d08b64cf1
commit 0ad5513cb7

View File

@ -1,4 +1,868 @@
if (defined $reqs and (scalar(@{$reqs}) == 1)) {
#!/usr/bin/env perl
# IBM(c) 2007 EPL license http://www.eclipse.org/legal/epl-v10.html
use strict;
use warnings;
use Carp qw(cluck confess);
BEGIN
{
$::XCATROOT = $ENV{'XCATROOT'} ? $ENV{'XCATROOT'} : '/opt/xcat';
}
use lib "$::XCATROOT/lib/perl";
use Storable qw(freeze thaw);
use xCAT::Utils;
use xCAT::MsgUtils;
use File::Path;
use Time::HiRes qw(sleep);
use Thread qw(yield);
use xCAT::Client qw(submit_request);
my $clientselect = new IO::Select;
my $sslclients = 0; #THROTTLE
my $maxsslclients = 16; #default
sub xexit {
while (wait() > 0) {
yield;
}
exit @_;
}
my $dispatch_children=0;
my %dispatched_children=();
my $plugin_numchildren=0;
my %plugin_children;
use IO::Socket::SSL;
my $inet6support;
$inet6support=eval { require Socket6 };
if ($inet6support) {
$inet6support = eval { require IO::Socket::INET6 };
}
if ($inet6support) {
$inet6support = eval { require IO::Socket::SSL::inet6 };
}
unless ($inet6support) {
eval { require Socket };
eval { require IO::Socket::INET };
}
my $dispatch_requests = 1; # govern whether commands are dispatchable
use IO::Socket;
use IO::Handle;
use IO::Select;
use XML::Simple;
if ($^O =~ /^linux/i) {
$XML::Simple::PREFERRED_PARSER='XML::Parser';
}
use xCAT::Table;
use Data::Dumper;
use Getopt::Long;
use Sys::Syslog qw(:DEFAULT setlogsock);
openlog("xcatd",,"local4");
setlogsock(["tcp","unix","stream"]);
use xCAT::NotifHandler;
use xCAT_monitoring::monitorctrl;
Getopt::Long::Configure("bundling");
Getopt::Long::Configure("pass_through");
use Storable qw(dclone);
use POSIX qw(WNOHANG setsid);
my $pidfile;
my $foreground;
GetOptions(
'pidfile|p=s' => \$pidfile,
'foreground|f' => \$foreground
);
#start syslog if it is not up
if (xCAT::Utils->isLinux()) {
my $init_file="/etc/init.d/syslog";
if (-f "/etc/fedora-release") {
$init_file="/etc/init.d/rsyslog";
}
my $result=`$init_file status 2>&1`;
if ($result !~ /running/i) {
`$init_file start`;
}
} else {
my $result=`lssrc -s syslogd 2>&1`;
if ($result !~ /active/i) {
`startsrc -s syslogd`;
}
}
my $quit = 0;
my $port;
my $sport;
my $domain;
my $xcatdir;
my $sitetab=xCAT::Table->new('site');
unless ($sitetab) {
print ("ERROR: Unable to open basic site table for configuration\n");
}
my ($tmp) = $sitetab->getAttribs({'key'=>'xcatdport'},'value');
unless ($tmp) {
die "ERROR:Need xcatdport defined in site table, try chtab key=xcatdport site.value=3001";
}
$port = $tmp->{value};
$sport = $tmp->{value}+1;
my $plugins_dir=$::XCATROOT.'/lib/perl/xCAT_plugin';
($tmp) = $sitetab->getAttribs({'key'=>'xcatconfdir'},'value');
$xcatdir = (($tmp and $tmp->{value}) ? $tmp->{value} : "/etc/xcat");
$sitetab->close;
my $progname;
$SIG{PIPE} = sub {
confess "SIGPIPE $$progname encountered a broken pipe (probably Ctrl-C by client)"
};
$progname = \$0;
sub daemonize {
chdir('/');
umask 0022;
my $pid;
defined($pid = xCAT::Utils->xfork) or die "Can't fork: $!";
if ($pid) {
if ($pidfile) {
open(PFILE, '>', $pidfile);
print PFILE $pid;
close (PFILE);
} else {
printf ("xCATd starting as PID $pid \n");
}
exit;
}
open STDIN, '/dev/null' or die "Can't read /dev/null: $!";
open STDOUT, '>/dev/null';
open STDERR, '>/dev/null';
$0='xcatd';
$progname = \$0;
setsid or die "Can't start new session";
}
my %cmd_handlers;
sub do_installm_service {
#This function servers as a handler for messages from installing nodes
my $socket;
if ($inet6support) {
$socket = IO::Socket::INET6->new(LocalPort=>$sport,
Proto => 'tcp',
ReuseAddr => 1,
Listen => 64);
} else {
$socket = IO::Socket::INET->new(LocalPort=>$sport,
Proto => 'tcp',
ReuseAddr => 1,
Listen => 64);
}
unless ($socket) {
xCAT::MsgUtils->message("S","xcatd unable to open install monitor services on $sport");
die;
}
until ($quit) {
$SIG{ALRM} = sub { die "XCATTIMEOUT"; };
my $conn;
next unless $conn = $socket->accept;
my @clients;
if ($inet6support) {
@clients = gethostbyaddr($conn->peeraddr,AF_INET6);
} else {
@clients = gethostbyaddr($conn->peeraddr,AF_INET);
}
my $validclient=0;
my $node;
foreach my $client (@clients) {
$client =~ s/\..*//;
($node) = noderange($client); #ensure this is coming from a node IP at least
if ($node) { #Means the source isn't a valid deal...
$validclient=1;
last;
}
}
unless ($validclient) {
close($conn);
next;
}
my $tftpdir = "/tftpboot/";
eval {
alarm(2);
print $conn "ready\n";
while (my $text = <$conn>) {
alarm(0);
print $conn "done\n";
$text =~ s/\r//g;
if ($text =~ /next/) {
my %request = (
command => [ 'nodeset' ],
node => [ $node ],
arg => [ 'next' ],
);
close($conn);
#node should be blocked, race condition may occur otherwise
#my $pid=xCAT::Utils->xfork();
#unless ($pid) { #fork off the nodeset and potential slowness
plugin_command(\%request,undef,\&build_response);
# exit(0);
#}
} elsif ($text =~ /^unlocktftpdir/) { #TODO: only nodes in install state should be allowed
mkpath("$tftpdir/xcat/$node");
chmod 01777,"$tftpdir/xcat/$node";
chmod 0666,glob("$tftpdir/xcat/$node/*");
close($conn);
} elsif ($text =~ /locktftpdir/) {
chmod 0755,"$tftpdir/xcat/$node";
chmod 0644,glob("$tftpdir/xcat/$node/*");
} elsif ($text =~ /^getpostscript/) {
my $reply =plugin_command({command=>['getpostscript'],_xcat_clienthost=>[$node]},undef,\&build_response);
foreach (@{$reply->{data}}) {
print $conn $_;
}
print $conn "#END OF SCRIPT\n";
close($conn);
} elsif ($text =~ /^setiscsiparms/) {
$text =~ s/^setiscsiparms\s+//;
my $kname;
my $iname;
my $kcmdline;
($kname,$iname,$kcmdline) = split(/\s+/,$text,3);
chomp($kcmdline);
my $bptab = xCAT::Table->new('bootparams',-create=>1);
$bptab->setNodeAttribs($node,{kernel=>"xcat/$node/$kname",initrd=>"xcat/$node/$iname",kcmdline=>$kcmdline});
my $iscsitab = xCAT::Table->new('iscsi',-create=>1);
$iscsitab->setNodeAttribs($node,{kernel=>"xcat/$node/$kname",initrd=>"xcat/$node/$iname",kcmdline=>$kcmdline});
my $chaintab = xCAT::Table->new('chain',-create=>1);
$chaintab->setNodeAttribs($node,{currstate=>'iscsiboot',currchain=>'netboot'});
$bptab->close;
$chaintab->close;
undef $bptab;
undef $chaintab;
my %request = (
command => [ 'nodeset' ],
node => [ $node ],
arg => [ 'enact' ],
);
my $pid=xCAT::Utils->xfork();
unless ($pid) { #fork off the nodeset and potential slowness
plugin_command(\%request,undef,\&build_response);
xexit(0);
}
}
alarm(2);
}
alarm(0);
};
if ($@) {
if ($@ =~ /XCATTIMEOUT/) {
xCAT::MsgUtils->message("S","xcatd installmonitor timed out talking to $node");
} else {
xCAT::MsgUtils->message("S","xcatd: possible BUG encountered by xCAT install monitor service: ".$@);
}
}
}
}
sub do_udp_service { #This function opens up a UDP port
#It will do similar to the standard service, except:
#-Obviously, unencrypted and messages are not guaranteed
#-For that reason, more often than not plugins designed with
#-this method will not expect to have a callback
#Also, this throttles to handle one message at a time, so no forking either
#Explicitly, to handle whatever operations nodes periodically send during discover state
#Could be used for heartbeating and such as desired
$dispatch_requests=0;
my $socket;
my $select = new IO::Select;
if ($inet6support) {
$socket = IO::Socket::INET6->new(LocalPort => $port,
Proto => 'udp',
Domain => AF_INET);
} else {
$socket = IO::Socket::INET->new(LocalPort => $port,
Proto => 'udp',
Domain => AF_INET);
}
openlog("xCAT UDP",'','local4');
unless ($socket) {
xCAT::MsgUtils->message("S","xCAT UDP service unable to open port $port: $!");
closelog();
die "Unable to start UDP on $port";
}
$select->add($socket);
my $data;
my $part;
my $sport;
my $client;
my $peerhost;
my %packets;
my $actualpid=$$;
until ($quit) {
eval {
while (1) {
until ($select->can_read(5)) { if ($quit) { last; }; yield; } #Wait for data
while ($select->can_read(0)) { #Pull all buffer data that can be pulled
$part = $socket->recv($data,1500);
($sport,$client) = sockaddr_in($part);
if ($sport < 1000) { #Only remember udp packets from privileged ports
$packets{inet_ntoa($client)} = [$part,$data];
}
}
foreach my $pkey (keys %packets) {
($sport,$client) = sockaddr_in($packets{$pkey}->[0]);
$data=$packets{$pkey}->[1];
$peerhost=gethostbyaddr($client,AF_INET)."\n";
my $req = eval { XMLin($data, SuppressEmpty=>undef,ForceArray=>1) };
if ($req and $req->{command} and ($req->{command}->[0] eq "findme")) {
$req->{'_xcat_clienthost'}=gethostbyaddr($client,AF_INET);
$req->{'_xcat_clientip'}=inet_ntoa($client);
$req->{'_xcat_clientport'}=$sport;
if (defined($cmd_handlers{"findme"})) {
$req->{cacheonly}->[0] = 1;
plugin_command($req,undef,\&build_response);
if ($req->{cacheonly}->[0]) {
delete $req->{cacheonly};
plugin_command($req,undef,\&build_response);
#if ($req) {
# $req->{cacheonly}->[0] = 1;
# $req->{checkallmacs}->[0] = 1;
# plugin_command($req,undef,\&convey_response);
# }
}
}
}
if ($quit) { last; }
while ($select->can_read(0)) { #grab any incoming requests during run
$part = $socket->recv($data,1500);
($sport,$client) = sockaddr_in($part);
$packets{inet_ntoa($client)} = [$part,$data];
}
#Some of those 'future' packets might be stale dupes of this packet, so...
delete $packets{$pkey}; #Delete any duplicates of current packet
}
if ($quit) { last; }
}
};
if ($@) {
xCAT::MsgUtils->message("S","xcatd: possible BUG encountered by xCAT UDP service: ".$@);
}
unless ($actualpid == $$) { #We should absolutely never be here, exponential growth from a plugin crash.
exit 1;
}
}
}
sub scan_plugins {
my @plugins=glob($plugins_dir."/*.pm");
foreach (@plugins) {
/.*\/([^\/]*).pm$/;
my $modname = $1;
require "$_";
no strict 'refs';
my $cmd_adds=${"xCAT_plugin::".$modname."::"}{handled_commands}->();
foreach (keys %$cmd_adds) {
my $value = $_;
if (defined($cmd_handlers{$_})) {
my $add=1;
#This next bit of code iterates through the handlers.
#If the value doesn't contain an equal, and has an equivalent entry added by
# another plugin already, don't add (otherwise would hit the DB multiple times)
# a better idea, restructure the cmd_handlers as a multi-level hash
# prove out this idea real quick before doing that
foreach (@{$cmd_handlers{$_}}) {
if (($_->[1] eq $cmd_adds->{$value}) and (($cmd_adds->{$value} !~ /=/) or ($_->[0] eq $modname))) {
$add = 0;
}
}
if ($add) { push @{$cmd_handlers{$_}},[$modname,$cmd_adds->{$_}]; }
#die "Conflicting handler information from $modname";
} else {
$cmd_handlers{$_} = [ [$modname,$cmd_adds->{$_}] ];
}
}
}
}
scan_plugins;
eval {
xCAT::MsgUtils->message("S","xCATd: service starting");
};
if ($@) {
print "ERROR: $@";
xexit;
}
unless ($foreground) {
daemonize;
}
my $CHILDPID=0; #Global for reapers
sub generic_reaper {
while (waitpid(-1,WNOHANG) > 0) {
yield;
}
$SIG{CHLD} = \&generic_reaper;
}
sub ssl_reaper {
while (waitpid(-1,WNOHANG) > 0) {
$sslclients--;
}
$SIG{CHLD} = \&ssl_reaper;
}
sub dispatch_reaper {
while (($CHILDPID =waitpid(-1, WNOHANG)) > 0) {
if ($dispatched_children{$CHILDPID}) {
delete $dispatched_children{$CHILDPID};
$dispatch_children--;
}
}
$SIG{CHLD} = \&dispatch_reaper;
}
sub plugin_reaper {
while (($CHILDPID = waitpid(-1, WNOHANG)) > 0) {
if ($plugin_children{$CHILDPID}) {
delete $plugin_children{$CHILDPID};
$plugin_numchildren--;
}
}
$SIG{CHLD} = \&plugin_reaper;
}
$SIG{CHLD} = \&generic_reaper;
$SIG{TERM} = $SIG{INT} = sub {
printf("Asked to quit...\n");
$quit++;
foreach (keys %dispatched_children) {
kill 2, $_;
}
foreach (keys %plugin_children) {
kill 2, $_;
}
$SIG{ALRM} = sub { xexit 0; }; #die "Did not close out in time for 5 second grace period"; };
alarm(2);
};
my $pid = xCAT::Utils->xfork;
defined $pid or die "Unable to fork for UDP/TCP";
unless ($pid) {
$$progname="xcatd: UDP listener";
do_udp_service;
xexit(0);
}
$pid = xCAT::Utils->xfork;
defined $pid or die "Unable to fork installmonitor";
unless ($pid) {
$$progname="xcatd: install monitor";
do_installm_service;
xexit(0);
}
$$progname="xcatd: SSL listener";
openlog("xCAT SSL","","local4");
my $listener = IO::Socket::INET->new(
LocalPort => $port,
Listen => 64,
Reuse => 1,
);
unless ($listener) {
kill 2, $pid;
xCAT::MsgUtils->message("S","xCAT service unable to open SSL services on $port: $!");
closelog();
die "ERROR:Unable to start xCAT service on port $port.";
}
closelog();
#setup signal in NotifHandler so that the cache can be updated
xCAT::NotifHandler::setup($$);
#start the monitoring process
xCAT_monitoring::monitorctrl::start($$);
my $peername;
my $ssltimeout;
until ($quit) {
$SIG{CHLD} = \&ssl_reaper; #set here to ensure that signal handler is not corrupted during loop
next unless my $cnnection=$listener->accept;
my $connection;
while ($sslclients > $maxsslclients) { #THROTTLE
sleep 0.1; #Keep processor utilization down
}
my $child = xCAT::Utils->xfork(); #Yes we fork, IO::Socket::SSL is not threadsafe..
unless (defined $child) {
die "xCATd cannot fork";
}
if ($child == 0) {
$SIG{CHLD} = \&generic_reaper; #THROTTLE
$listener->close;
$SIG{ALRM} = sub { $ssltimeout = 1; die; };
eval {
alarm(10);
$connection = IO::Socket::SSL->start_SSL($cnnection,
SSL_key_file=>$xcatdir."/cert/server-cred.pem",
SSL_cert_file=>$xcatdir."/cert/server-cred.pem",
SSL_ca_file=>$xcatdir."/cert/ca.pem",
SSL_server=>1,
SSL_verify_mode=> 1
);
alarm(0);
};
$SIG{ALRM}='DEFAULT';
if ($@) { #SSL failure
close($cnnection);
xexit 0;
}
unless ($connection) {
xexit 0;
}
$clientselect->add($connection);
my $peerhost=undef;
my $peer=$connection->peer_certificate("owner");
if ($peer) {
$peer =~ m/CN=([^\/]*)/;
$peername = $1;
} else {
$peername=undef;
}
$sitetab=xCAT::Table->new('site');
my ($tmp) = $sitetab->getAttribs({'key'=>'domain'},'value');
if (defined $tmp->{value}) {
$domain = $tmp->{value};
}
$sitetab->close;
if ($inet6support) {
$peerhost = gethostbyaddr($connection->peeraddr,AF_INET6);
} else {
$peerhost = gethostbyaddr($connection->peeraddr,AF_INET);
}
unless ($peerhost) { $peerhost = gethostbyaddr($connection->peeraddr,AF_INET); }
$peerhost =~ s/\.$domain\.*$//;
$peerhost =~ s/-eth\d*$//;
$peerhost =~ s/-myri\d*$//;
$peerhost =~ s/-ib\d*$//;
#printf('info'.": xcatd: connection from ".($peername ? $peername . "@" . $peerhost : $peerhost)."\n");
$$progname="xCATd SSL: Instance for ".($peername ? $peername ."@".$peerhost : $peerhost);
service_connection($connection,$peername,$peerhost);
xexit(0);
}
$sslclients++; #THROTTLE
$cnnection->close();
}
$listener->close;
#stop the monitoring process
xCAT_monitoring::monitorctrl::stop($$);
my $parent_fd;
my %resps;
sub plugin_command {
my $req = shift;
my $sock = shift;
my $callback = shift;
my %handler_hash;
use xCAT::NodeRange;
$Main::resps={};
my @nodes;
if ($req->{node}) {
@nodes = @{$req->{node}};
} elsif ($req->{noderange} and $req->{noderange}->[0]) {
@nodes = noderange($req->{noderange}->[0]);
if (nodesmissed) {
my $rsp = {errorcode=>1,error=>"Invalid nodes in noderange:".join(',',nodesmissed)};
$rsp->{serverdone} = {};
if ($sock) {
print $sock XMLout($rsp,RootName=>'xcatresponse' ,NoAttr=>1);
}
return ($rsp);
}
unless (@nodes) {
$req->{emptynoderange} = [1];
}
}
if (@nodes) { $req->{node} = \@nodes; }
my %unhandled_nodes;
foreach (@nodes) {
$unhandled_nodes{$_}=1;
}
my $useunhandled=0;
if (defined($cmd_handlers{$req->{command}->[0]})) {
my $hdlspec;
foreach (@{$cmd_handlers{$req->{command}->[0]}}) {
$hdlspec =$_->[1];
my $ownmod = $_->[0];
if ($hdlspec =~ /:/) { #Specificed a table lookup path for plugin name
$useunhandled=1;
my $table;
my $cols;
($table,$cols) = split(/:/,$hdlspec);
my @colmns=split(/,/,$cols);
my @columns;
my $hdlrtable=xCAT::Table->new($table);
unless ($hdlrtable) {
#TODO: proper error handling
}
my $node;
my $colvals = {};
foreach my $colu (@colmns) {
if ($colu =~ /=/) { #a value redirect to a pattern/specific name
my $coln; my $colv;
($coln,$colv) = split(/=/,$colu,2);
$colvals->{$coln} = $colv;
push (@columns,$coln);
} else {
push (@columns,$colu);
}
}
unless (@nodes) { #register the plugin in the event of usage
$handler_hash{$ownmod} = 1;
}
my $hdlrcache = $hdlrtable->getNodesAttribs(\@nodes,\@columns);
foreach $node (@nodes) {
my $attribs = $hdlrcache->{$node}->[0]; #$hdlrtable->getNodeAttribs($node,\@columns);
unless (defined($attribs)) { next; } #TODO: This really ought to craft an unsupported response for this request
foreach (@columns) {
my $col=$_;
if (defined($attribs->{$col})) {
if ($colvals->{$col}) { #A pattern match style request.
if ($attribs->{$col} =~ /$colvals->{$col}/) {
$handler_hash{$ownmod}->{$node} = 1;
delete $unhandled_nodes{$node};
last;
}
} else {
$handler_hash{$attribs->{$col}}->{$node} = 1;
delete $unhandled_nodes{$node};
last;
}
}
}
}
$hdlrtable->close;
} else {
unless (@nodes) {
$handler_hash{$hdlspec} = 1;
}
foreach (@nodes) { #Specified a specific plugin, not a table lookup
$handler_hash{$hdlspec}->{$_} = 1;
}
}
}
} else {
return 1; #TODO: error back that request has no known plugin for it
}
if ($useunhandled) {
my $queuelist;
foreach (@{$cmd_handlers{$req->{command}->[0]}->[0]}) {
unless (/:/) {
next;
}
$queuelist .= "$_,";
}
$queuelist =~ s/,$//;
$queuelist =~ s/:/./g;
foreach (keys %unhandled_nodes) {
if ($sock) {
print $sock XMLout({node=>[{name=>[$_],data=>["Unable to identify plugin for this command, check relevant tables: $queuelist"],errorcode=>[1]}]},NoAttr=>1,RootName=>'xcatresponse');
} else {
my $tabdesc = $queuelist;
$tabdesc =~ s/=.*$//;
$callback->({node=>[{name=>[$_],data=>['Unable to identify plugin for this command, check relevant tables: '.$tabdesc],errorcode=>[1]}]});
}
}
}
$plugin_numchildren=0;
%plugin_children=();
$SIG{CHLD} = \&plugin_reaper; #sub {my $plugpid; while (($plugpid = waitpid(-1, WNOHANG)) > 0) { if ($plugin_children{$plugpid}) { delete $plugin_children{$plugpid}; $plugin_numchildren--; } } };
my $check_fds;
if ($sock) {
$check_fds = new IO::Select;
}
foreach (keys %handler_hash) {
my $modname = $_;
if (-r $plugins_dir."/".$modname.".pm") {
require $plugins_dir."/".$modname.".pm";
$plugin_numchildren++;
my $pfd; #will be referenced for inter-process messaging.
my $parfd; #not causing a problem that I discern yet, but theoretically
my $child;
if ($sock) { #If $sock not passed in, don't fork..
socketpair($pfd, $parfd,AF_UNIX,SOCK_STREAM,PF_UNSPEC) or die "socketpair: $!";
#pipe($pfd,$cfd);
$parfd->autoflush(1);
$pfd->autoflush(1);
$child = xCAT::Utils->xfork;
} else {
$child = 0;
}
unless (defined $child) { die "Fork failed"; }
if ($child == 0) {
if ($parfd) { #If xCAT is doing multiple requests in same communication PID, things would get unfortunate otherwise
$parent_fd = $parfd;
}
my $oldprogname=$$progname;
$$progname=$oldprogname.": $modname instance";
if ($sock) { close $pfd; }
unless ($handler_hash{$_} == 1) {
my @nodes = sort {($a =~ /(\d+)/)[0] <=> ($b =~ /(\d+)/)[0] || $a cmp $b } (keys %{$handler_hash{$_}});
$req->{node}=\@nodes;
}
no strict "refs";
if ($dispatch_requests) {
dispatch_request($req,$callback,$modname);
} else {
$SIG{CHLD}='DEFAULT';
${"xCAT_plugin::".$modname."::"}{process_request}->($req,$callback,\&do_request);
}
$$progname=$oldprogname;
if ($sock) {
close($parent_fd);
xexit(0);
}
} else {
$plugin_children{$child}=1;
close $parfd;
$check_fds->add($pfd);
}
}
}
unless ($sock) { return $Main::resps };
while (($plugin_numchildren > 0) and ($check_fds->count > 0)) { #this tracks end of useful data from children much more closely
relay_fds($check_fds,$sock);
}
#while (relay_fds($check_fds,$sock)) {}
my %done;
$done{serverdone} = {};
if ($req->{transid}) {
$done{transid}=$req->{transid}->[0];
}
if ($sock) {
my $clientpresence = new IO::Select; #The client may have gone away without confirmation, don't PIPE over this trivial thing
$clientpresence->add($sock);
if ($clientpresence->can_write(5)) {
print $sock XMLout(\%done,RootName => 'xcatresponse',NoAttr=>1);
}
}
}
my $dispatch_dnf=0;
my $dispatch_cb;
my $dispatch_parentfd;
sub dispatch_callback {
my $rspo = shift;
unless ($rspo) {
return;
}
my $rsp = {%$rspo}; # deep copy
delete $rsp->{serverdone};
unless (%$rsp) { return; }
if ($dispatch_dnf) {
$dispatch_cb->($rsp);
} else {
print $dispatch_parentfd freeze($rsp);
print $dispatch_parentfd "\nENDOFFREEZE6sK6xa\n";
yield; #This has to happen before next line could possibly work anyway
my $parselect = new IO::Select;
$parselect->add($dispatch_parentfd);
my $selbits = $parselect->bits;
my $rsp;
while (defined($selbits) && ($rsp = select($selbits,undef,undef,5))) { #block for up to 5 seconds before continuing
if ($quit) { # termination requested by a clean shutdown facility
xexit 0;
}
if ($rsp == 0) { #The select call failed to find any ready items
last;
}
if ($rsp < 0) { #A child exited or other signal event that made select skip out before suggesting succes
next;
}
if ($rsp = <$dispatch_parentfd>) {
if ($rsp =~ /die/ or $quit) {
xexit 0;
}
last;
} else {
$parselect->remove($dispatch_parentfd); #Block until parent acks data
last;
}
$selbits = $parselect->bits;
yield;
}
}
}
sub relay_dispatch {
my $fds = shift;
my @ready_ins = $fds->can_read(1);
foreach my $rin (@ready_ins) {
my $data;
if ($data = <$rin>) {
while ($data !~ /ENDOFFREEZE6sK6xa/) {
$data .= <$rin>;
}
my $response = thaw($data);
print $rin "dfin\n";
$dispatch_cb->($response);
} else {
$fds->remove($rin);
close($rin);
}
}
yield; #At this point, explicitly yield to other processes. If children will have more data, this process would otherwise uselessly loop on data that never will be. If children are all done, still no harm in waiting a short bit for a timeslice to come back
return scalar(@ready_ins);
}
sub dispatch_request {
%dispatched_children=();
my $req = shift;
$dispatch_cb = shift;
my $modname = shift;
my $reqs = [];
my $child_fdset = new IO::Select;
no strict "refs";
#Hierarchy support. Originally, the default scope for noderange commands was
#going to be the servicenode associated unless overriden.
#However, assume for example that you have blades and a blade is the service node
#rpower being executed by the servicenode for one of its subnodes would have to
#reach it's own management module. This has the potential to be non-trivial for some quite possible network configurations.
#Since plugins may commonly experience this, a preprocess_request implementation
#will for now be required for a command to be scaled through service nodes
#If the plugin offers a preprocess method, use it to set the request array
if (defined(${"xCAT_plugin::".$modname."::"}{preprocess_request})) {
$SIG{CHLD}='DEFAULT';
$reqs = ${"xCAT_plugin::".$modname."::"}{preprocess_request}->($req,$dispatch_cb,\&do_request);
} else { #otherwise, pass it in without hierarchy support
$reqs = [$req];
}
$dispatch_children=0;
$SIG{CHLD} = \&dispatch_reaper; #sub {my $cpid; while (($cpid =waitpid(-1, WNOHANG)) > 0) { if ($dispatched_children{$cpid}) { delete $dispatched_children{$cpid}; $dispatch_children--; } } };
my $onlyone=0;
if (defined $reqs and (scalar(@{$reqs}) == 1)) {
$onlyone=1;
}