Parallelize xcatd dispatched requests
git-svn-id: https://svn.code.sf.net/p/xcat/code/xcat-core/trunk@328 8638fb3e-16cb-4fca-ae20-7b5d299a9bcd
This commit is contained in:
parent
bb240e7708
commit
1efa566c88
@ -5,6 +5,7 @@ BEGIN
|
||||
$::XCATROOT = $ENV{'XCATROOT'} ? $ENV{'XCATROOT'} : '/opt/xcat';
|
||||
}
|
||||
use lib "$::XCATROOT/lib/perl";
|
||||
use Storable qw(freeze thaw);
|
||||
use xCAT::Utils;
|
||||
use File::Path;
|
||||
use xCAT::Client submit_request;
|
||||
@ -68,6 +69,7 @@ $xcatdir = (($tmp and $tmp->{value}) ? $tmp->{value} : "/etc/xcat");
|
||||
$sitetab->close;
|
||||
my $progname;
|
||||
$SIG{PIPE} = sub { die "SIGPIPE $$progname encountered a broken pipe (probably Ctrl-C by client)" };
|
||||
$progname = \$0;
|
||||
sub daemonize {
|
||||
chdir('/');
|
||||
umask 0;
|
||||
@ -562,52 +564,102 @@ sub plugin_command {
|
||||
if ($sock) { print $sock XMLout(\%done,RootName => 'xcatresponse',NoAttr=>1); }
|
||||
}
|
||||
|
||||
my $dispatch_dnf=0;
|
||||
my $dispatch_cb;
|
||||
my $dispatch_parentfd;
|
||||
sub dispatch_callback {
|
||||
my $rsp = shift;
|
||||
my $rspo = shift;
|
||||
my $rsp = {%$rspo}; # deep copy
|
||||
delete $rsp->{serverdone};
|
||||
$dispatch_cb->($rsp);
|
||||
unless (%$rsp) { return; }
|
||||
if ($dispatch_dnf) {
|
||||
$dispatch_cb->($rsp);
|
||||
} else {
|
||||
print $dispatch_parentfd freeze($rsp);
|
||||
print $dispatch_parentfd "\nENDOFFREEZE6sK6xa\n";
|
||||
<$dispatch_parentfd>; #Block until parent acks data
|
||||
}
|
||||
}
|
||||
|
||||
sub relay_dispatch {
|
||||
my $fds = shift;
|
||||
my @ready_ins = $fds->can_read(0);
|
||||
foreach my $rin (@ready_ins) {
|
||||
my $data;
|
||||
if ($data = <$rin>) {
|
||||
while ($data !~ /ENDOFFREEZE6sK6xa/) {
|
||||
$data .= <$rin>;
|
||||
}
|
||||
my $response = thaw($data);
|
||||
print $rin "fin\n";
|
||||
$dispatch_cb->($response);
|
||||
} else {
|
||||
$fds->remove($rin);
|
||||
close($rin);
|
||||
}
|
||||
}
|
||||
return scalar(@ready_ins);
|
||||
}
|
||||
|
||||
sub dispatch_request {
|
||||
my $req = shift;
|
||||
$dispatch_cb = shift;
|
||||
my $req = shift;
|
||||
$dispatch_cb = shift;
|
||||
|
||||
my $modname = shift;
|
||||
my $reqs = [];
|
||||
no strict "refs";
|
||||
my $modname = shift;
|
||||
my $reqs = [];
|
||||
my $child_fdset = new IO::Select;
|
||||
no strict "refs";
|
||||
|
||||
#Hierarchy support. Originally, the default scope for noderange commands was
|
||||
#going to be the servicenode associated unless overriden.
|
||||
#However, assume for example that you have blades and a blade is the service node
|
||||
#rpower being executed by the servicenode for one of its subnodes would have to
|
||||
#reach it's own management module. This has the potential to be non-trivial for some quite possible network configurations.
|
||||
#Since plugins may commonly experience this, a preprocess_request implementation
|
||||
#will for now be required for a command to be scaled through service nodes
|
||||
#If the plugin offers a preprocess method, use it to set the request array
|
||||
if (defined(${"xCAT_plugin::".$modname."::"}{preprocess_request})) {
|
||||
#Hierarchy support. Originally, the default scope for noderange commands was
|
||||
#going to be the servicenode associated unless overriden.
|
||||
#However, assume for example that you have blades and a blade is the service node
|
||||
#rpower being executed by the servicenode for one of its subnodes would have to
|
||||
#reach it's own management module. This has the potential to be non-trivial for some quite possible network configurations.
|
||||
#Since plugins may commonly experience this, a preprocess_request implementation
|
||||
#will for now be required for a command to be scaled through service nodes
|
||||
#If the plugin offers a preprocess method, use it to set the request array
|
||||
if (defined(${"xCAT_plugin::".$modname."::"}{preprocess_request})) {
|
||||
$reqs = ${"xCAT_plugin::".$modname."::"}{preprocess_request}->($req);
|
||||
} else { #otherwise, pass it in without hierarchy support
|
||||
} else { #otherwise, pass it in without hierarchy support
|
||||
$reqs = [$req];
|
||||
}
|
||||
}
|
||||
|
||||
#TODO: calls to dispatch must be parallelized
|
||||
foreach (@{$reqs}) {
|
||||
my $childrn=0;
|
||||
$SIG{CHLD} = sub {while (waitpid(-1, WNOHANG) > 0) { $childrn--; } };
|
||||
#TODO: calls to dispatch must be parallelized
|
||||
foreach (@{$reqs}) {
|
||||
my $pfd;
|
||||
my $child;
|
||||
delete $_->{noderange};
|
||||
socketpair($pfd, $dispatch_parentfd,AF_UNIX,SOCK_STREAM,PF_UNSPEC) or die "socketpair: $!";
|
||||
$dispatch_parentfd->autoflush(1);
|
||||
$pfd->autoflush(1);
|
||||
$child = xCAT::Utils->xfork;
|
||||
if ($child) {
|
||||
$child_fdset->add($pfd);
|
||||
$childrn++;
|
||||
next;
|
||||
}
|
||||
unless (defined $child) {
|
||||
$dispatch_cb->({error=>['Fork failure dispatching request'],errorcode=>[1]});
|
||||
}
|
||||
if ($_->{'_xcatdest'} and thishostisnot($_->{'_xcatdest'})) {
|
||||
my $oldenv = $ENV{XCATHOST};
|
||||
$ENV{XCATHOST} = ( $_->{'_xcatdest'} =~ /:/ ? $_->{'_xcatdest'} : $_->{'_xcatdest'}.":3001" );
|
||||
eval {
|
||||
undef $_->{'_xcatdest'};
|
||||
xCAT::Client::submit_request($_,\&dispatch_callback,$xcatdir."/cert/server-key.pem",$xcatdir."/cert/server-cert.pem",$xcatdir."/cert/ca.pem");
|
||||
};
|
||||
if ($@) {
|
||||
$dispatch_cb->({error=>["Error dispatching command to ".$ENV{XCATHOST}.""],errorcode=>[1]});
|
||||
dispatch_callback({error=>["Error dispatching command to ".$ENV{XCATHOST}.""],errorcode=>[1]});
|
||||
syslog("local4|err","Error dispatching request: ".$@);
|
||||
}
|
||||
$ENV{XCATHOST} = $oldenv;
|
||||
} else {
|
||||
${"xCAT_plugin::".$modname."::"}{process_request}->($_,$dispatch_cb,\&do_request);
|
||||
${"xCAT_plugin::".$modname."::"}{process_request}->($_,\&dispatch_callback,\&do_request);
|
||||
}
|
||||
exit;
|
||||
}
|
||||
while ($childrn > 0) { relay_dispatch($child_fdset) }
|
||||
while (relay_dispatch($child_fdset)) { } #Potentially useless drain.
|
||||
}
|
||||
|
||||
sub thishostisnot {
|
||||
|
Loading…
x
Reference in New Issue
Block a user