From 1efa566c88019717285b1d52d0be03cc820afd74 Mon Sep 17 00:00:00 2001 From: jbjohnso Date: Sat, 26 Jan 2008 00:23:23 +0000 Subject: [PATCH] Parallelize xcatd dispatched requests git-svn-id: https://svn.code.sf.net/p/xcat/code/xcat-core/trunk@328 8638fb3e-16cb-4fca-ae20-7b5d299a9bcd --- xCAT-server-2.0/sbin/xcatd | 100 ++++++++++++++++++++++++++++--------- 1 file changed, 76 insertions(+), 24 deletions(-) diff --git a/xCAT-server-2.0/sbin/xcatd b/xCAT-server-2.0/sbin/xcatd index 978e1af7b..9c680f324 100755 --- a/xCAT-server-2.0/sbin/xcatd +++ b/xCAT-server-2.0/sbin/xcatd @@ -5,6 +5,7 @@ BEGIN $::XCATROOT = $ENV{'XCATROOT'} ? $ENV{'XCATROOT'} : '/opt/xcat'; } use lib "$::XCATROOT/lib/perl"; +use Storable qw(freeze thaw); use xCAT::Utils; use File::Path; use xCAT::Client submit_request; @@ -68,6 +69,7 @@ $xcatdir = (($tmp and $tmp->{value}) ? $tmp->{value} : "/etc/xcat"); $sitetab->close; my $progname; $SIG{PIPE} = sub { die "SIGPIPE $$progname encountered a broken pipe (probably Ctrl-C by client)" }; +$progname = \$0; sub daemonize { chdir('/'); umask 0; @@ -562,52 +564,102 @@ sub plugin_command { if ($sock) { print $sock XMLout(\%done,RootName => 'xcatresponse',NoAttr=>1); } } +my $dispatch_dnf=0; my $dispatch_cb; +my $dispatch_parentfd; sub dispatch_callback { - my $rsp = shift; + my $rspo = shift; + my $rsp = {%$rspo}; # deep copy delete $rsp->{serverdone}; - $dispatch_cb->($rsp); + unless (%$rsp) { return; } + if ($dispatch_dnf) { + $dispatch_cb->($rsp); + } else { + print $dispatch_parentfd freeze($rsp); + print $dispatch_parentfd "\nENDOFFREEZE6sK6xa\n"; + <$dispatch_parentfd>; #Block until parent acks data + } } + +sub relay_dispatch { + my $fds = shift; + my @ready_ins = $fds->can_read(0); + foreach my $rin (@ready_ins) { + my $data; + if ($data = <$rin>) { + while ($data !~ /ENDOFFREEZE6sK6xa/) { + $data .= <$rin>; + } + my $response = thaw($data); + print $rin "fin\n"; + $dispatch_cb->($response); + } else { + $fds->remove($rin); + close($rin); + } + } + return scalar(@ready_ins); +} + sub dispatch_request { - my $req = shift; - $dispatch_cb = shift; + my $req = shift; + $dispatch_cb = shift; - my $modname = shift; - my $reqs = []; - no strict "refs"; + my $modname = shift; + my $reqs = []; + my $child_fdset = new IO::Select; + no strict "refs"; - #Hierarchy support. Originally, the default scope for noderange commands was - #going to be the servicenode associated unless overriden. - #However, assume for example that you have blades and a blade is the service node - #rpower being executed by the servicenode for one of its subnodes would have to - #reach it's own management module. This has the potential to be non-trivial for some quite possible network configurations. - #Since plugins may commonly experience this, a preprocess_request implementation - #will for now be required for a command to be scaled through service nodes - #If the plugin offers a preprocess method, use it to set the request array - if (defined(${"xCAT_plugin::".$modname."::"}{preprocess_request})) { + #Hierarchy support. Originally, the default scope for noderange commands was + #going to be the servicenode associated unless overriden. + #However, assume for example that you have blades and a blade is the service node + #rpower being executed by the servicenode for one of its subnodes would have to + #reach it's own management module. This has the potential to be non-trivial for some quite possible network configurations. + #Since plugins may commonly experience this, a preprocess_request implementation + #will for now be required for a command to be scaled through service nodes + #If the plugin offers a preprocess method, use it to set the request array + if (defined(${"xCAT_plugin::".$modname."::"}{preprocess_request})) { $reqs = ${"xCAT_plugin::".$modname."::"}{preprocess_request}->($req); - } else { #otherwise, pass it in without hierarchy support + } else { #otherwise, pass it in without hierarchy support $reqs = [$req]; - } + } - #TODO: calls to dispatch must be parallelized - foreach (@{$reqs}) { + my $childrn=0; + $SIG{CHLD} = sub {while (waitpid(-1, WNOHANG) > 0) { $childrn--; } }; + #TODO: calls to dispatch must be parallelized + foreach (@{$reqs}) { + my $pfd; + my $child; + delete $_->{noderange}; + socketpair($pfd, $dispatch_parentfd,AF_UNIX,SOCK_STREAM,PF_UNSPEC) or die "socketpair: $!"; + $dispatch_parentfd->autoflush(1); + $pfd->autoflush(1); + $child = xCAT::Utils->xfork; + if ($child) { + $child_fdset->add($pfd); + $childrn++; + next; + } + unless (defined $child) { + $dispatch_cb->({error=>['Fork failure dispatching request'],errorcode=>[1]}); + } if ($_->{'_xcatdest'} and thishostisnot($_->{'_xcatdest'})) { - my $oldenv = $ENV{XCATHOST}; $ENV{XCATHOST} = ( $_->{'_xcatdest'} =~ /:/ ? $_->{'_xcatdest'} : $_->{'_xcatdest'}.":3001" ); eval { undef $_->{'_xcatdest'}; xCAT::Client::submit_request($_,\&dispatch_callback,$xcatdir."/cert/server-key.pem",$xcatdir."/cert/server-cert.pem",$xcatdir."/cert/ca.pem"); }; if ($@) { - $dispatch_cb->({error=>["Error dispatching command to ".$ENV{XCATHOST}.""],errorcode=>[1]}); + dispatch_callback({error=>["Error dispatching command to ".$ENV{XCATHOST}.""],errorcode=>[1]}); syslog("local4|err","Error dispatching request: ".$@); } - $ENV{XCATHOST} = $oldenv; } else { - ${"xCAT_plugin::".$modname."::"}{process_request}->($_,$dispatch_cb,\&do_request); + ${"xCAT_plugin::".$modname."::"}{process_request}->($_,\&dispatch_callback,\&do_request); } + exit; } + while ($childrn > 0) { relay_dispatch($child_fdset) } + while (relay_dispatch($child_fdset)) { } #Potentially useless drain. } sub thishostisnot {