From a9acb42b1e37afd76b1f2f7acd7781973e3a401f Mon Sep 17 00:00:00 2001 From: lissav Date: Fri, 16 Aug 2013 07:05:32 -0400 Subject: [PATCH] add flow control --- xCAT-server/sbin/xcatd | 154 +++++++++++++++++++++++++-------- xCAT/postscripts/remoteshell | 39 +++++---- xCAT/postscripts/xcatdsklspost | 24 +++-- 3 files changed, 158 insertions(+), 59 deletions(-) diff --git a/xCAT-server/sbin/xcatd b/xCAT-server/sbin/xcatd index 149346d79..20d272850 100755 --- a/xCAT-server/sbin/xcatd +++ b/xCAT-server/sbin/xcatd @@ -18,6 +18,8 @@ BEGIN } } +my $sslctl; +my $udpctl; # if AIX - make sure we include perl 5.8.2 in INC path. # Needed to find perl dependencies shipped in deps tarball. if ($^O =~ /^aix/i) { @@ -48,6 +50,7 @@ use xCAT::Client qw(submit_request); my $clientselect = new IO::Select; my $sslclients = 0; #THROTTLE my $maxsslclients = 64; #default +my $batchclients = 50; my @deferredmsgargs; # hold argumentlist for MsgUtils call until after fork #parallelizing logging overhead with real work @@ -125,7 +128,7 @@ unless ($pidfile) { #start syslog if it is not up if (xCAT::Utils->isLinux()) { my $init_file="/etc/init.d/syslog"; - if ((-f "/etc/fedora-release") || (-f "/etc/redhat-release") || (-f "/etc/lsb-release")){ + if ((-f "/etc/fedora-release") || (-f "/etc/redhat-release") || (-f "/etc/lsb-release")){ $init_file="/etc/init.d/rsyslog"; } if ( -x $init_file ) { @@ -174,6 +177,8 @@ if ($tmp) { } ($tmp) = $sitetab->getAttribs({'key'=>'xcatmaxconnections'},'value'); if ($tmp and $tmp->{value}) { $maxsslclients = $tmp->{value}; } +($tmp) = $sitetab->getAttribs({'key'=>'xcatmaxbatchconnections'},'value'); +if ($tmp and $tmp->{value}) { $batchclients = $tmp->{value}; } my $plugins_dir=$::XCATROOT.'/lib/perl/xCAT_plugin'; @@ -479,6 +484,37 @@ if ($inet6support) { } } +sub update_udpcontext_from_sslctl { + my %args = @_; + my $udpcontext = $args{udpcontext}; + my $select = $args{select}; + my $msg; + eval { $msg = fd_retrieve($sslctl); }; + if ($msg) { + #remember new count and, if new connection and we have a fudge factor, decrese fudge factor optimisticly assuming it's the right one + $udpcontext->{sslclientcount} = $msg->{sslclientcount}; + if ($udpcontext->{clientfudge} and $msg->{clientcountchange} > 0) { $udpcontext->{clientfudge} -= $msg->{clientcountchange}; } + } else { + $select->remove($sslctl); close($sslctl); #something went horribly wrong + } +} + +sub grant_tcrequests { + my $requestors = shift; + my $udpcontext = shift; + my $availableslots = $batchclients; + $availableslots -= $udpcontext->{clientfudge}; #value that forecasts the pressure + $availableslots -= $udpcontext->{sslclientcount}; #subtract all currently really active sessions + my $oldtime = time()-180; #drop requests older than three minutes if still around + foreach my $rkey (keys %{$requestors}) { + if ($requestors->{$rkey}->{timestamp} < $oldtime) { delete $requestors->{$rkey}; next; } + unless ($availableslots > 0) { next; } # no slots, ignore requests for now + $udpcontext->{clientfudge}+=1; #adjust forecast for being busy + $availableslots-=1; + $udpcontext->{socket}->send("resourcerequest: ok\n",0,$requestors->{$rkey}->{sockaddr}); + delete ($requestors->{$rkey}); #we acknoweldged, assume consumer got it, they'll do retry if they failed + } +} sub do_udp_service { #This function opens up a UDP port #It will do similar to the standard service, except: @@ -489,6 +525,9 @@ sub do_udp_service { #This function opens up a UDP port #Explicitly, to handle whatever operations nodes periodically send during discover state #Could be used for heartbeating and such as desired $dispatch_requests=0; + my $udpcontext; + $udpcontext->{clientfudge}=0; + $udpcontext->{sslclientcount}=0; my $udppidfile; my $retry=1; my $socket; @@ -499,10 +538,10 @@ sub do_udp_service { #This function opens up a UDP port xCAT::MsgUtils->message("S","xcatd udp service $$ quiescing"); } }; - if ($inet6support) { + if ($inet6support) { $socket = IO::Socket::INET6->new(LocalPort => $port, Proto => 'udp', - Domain => AF_INET); + ); } else { $socket = IO::Socket::INET->new(LocalPort => $port, Proto => 'udp', @@ -519,10 +558,10 @@ sub do_udp_service { #This function opens up a UDP port } my $select = new IO::Select; while (not $socket and $retry) { -if ($inet6support) { +if ($inet6support) { $socket = IO::Socket::INET6->new(LocalPort => $port, Proto => 'udp', - Domain => AF_INET); + ); } else { $socket = IO::Socket::INET->new(LocalPort => $port, Proto => 'udp', @@ -542,6 +581,8 @@ sleep 0.05; print $udppidfile $$; close($udppidfile); $select->add($socket); + $udpcontext->{socket} = $socket; + $select->add($sslctl); my $data; my $part; my $sport; @@ -551,6 +592,7 @@ sleep 0.05; my $actualpid=$$; until ($quit) { eval { + my $tcclients; # hash reference to store traffic control requests while (1) { unless ($actualpid == $$) { #This really should be impossible now... xCAT::MsgUtils->message("S","xcatd: Something absolutely ludicrous happpened, xCAT developers think this message is impossible to see, post if you see it, fork bomb averted"); @@ -561,57 +603,84 @@ sleep 0.05; populate_site_hash(); yield; } - while ($select->can_read(0)) { #Pull all buffer data that can be pulled + my @hdls; + while (@hdls = $select->can_read(0)) { #Pull all buffer data that can be pulled + my $hdl; + foreach $hdl (@hdls) { + if ($hdl == $socket) { $part = $socket->recv($data,1500); - ($sport,$client) = sockaddr_in($part); - if ($sport < 1000) { #Only remember udp packets from privileged ports - $packets{inet_ntoa($client)} = [$part,$data]; - } + $packets{$part} = [$part,$data]; + } elsif ($hdl == $sslctl) { + update_udpcontext_from_sslctl(udpcontext=>$udpcontext,select=>$select); + } else { + print "Something is wrong in udp process (search xcatd for this string)\n"; + } + } } foreach my $pkey (keys %packets) { - ($sport,$client) = sockaddr_in($packets{$pkey}->[0]); + my $saddr = $packets{$pkey}->[0]; + my $clientn; + my $clientip; + if ($inet6support) { + ($client,$sport) = Socket6::getnameinfo($saddr); + ($clientip,$sport) = Socket6::getnameinfo($saddr,Socket6::NI_NUMERICHOST()); + if ($clientip =~ /::ffff:.*\..*\./) { + $clientip =~ s/^::ffff://; + } + if ($client =~ /::ffff:.*\..*\./) { + $client =~ s/^::ffff://; + } + } else { + ($sport,$clientn) = sockaddr_in($saddr); + $clientip = inet_ntoa($clientn); + $client=gethostbyaddr($clientn,AF_INET); + } $data=$packets{$pkey}->[1]; - $peerhost=gethostbyaddr($client,AF_INET); - $peerhost .="\n"; if ($data =~ /^\037\213/) { #per rfc 1952, these two bytes are gzip, and they are invalid for #xcatrequest xml, so go ahead and decompress it my $bigdata; IO::Uncompress::Gunzip::gunzip(\$data,\$bigdata); $data = $bigdata } + if ($data =~ /^undef,ForceArray=>1) }; - if ($req and $req->{command} and ($req->{command}->[0] eq "findme")) { - $req->{'_xcat_clienthost'}=gethostbyaddr($client,AF_INET); - $req->{'_xcat_clientip'}=inet_ntoa($client); + if ($req and $req->{command} and ($req->{command}->[0] eq "findme" and $sport < 1000)) { #only consider priveleged port requests to start with + $req->{'_xcat_clienthost'}=$client; + $req->{'_xcat_clientip'}=$clientip; $req->{'_xcat_clientport'}=$sport; - if (defined($cmd_handlers{"findme"}) and xCAT::NetworkUtils->nodeonmynet(inet_ntoa($client))) { #only discover from ips that appear to be on a managed network + if (defined($cmd_handlers{"findme"}) and xCAT::NetworkUtils->nodeonmynet($clientip)) { #only discover from ips that appear to be on a managed network xCAT::MsgUtils->message("S","xcatd: Processing discovery request from ".$req->{'_xcat_clientip'}); $req->{cacheonly}->[0] = 1; plugin_command($req,undef,\&build_response); if ($req->{cacheonly}->[0]) { delete $req->{cacheonly}; plugin_command($req,undef,\&build_response); - #if ($req) { - # $req->{cacheonly}->[0] = 1; - # $req->{checkallmacs}->[0] = 1; - # plugin_command($req,undef,\&convey_response); - # } - } } else { - xCAT::MsgUtils->message("S","xcatd: Skipping discovery from ".inet_ntoa($client)." because we either have no discovery plugins or the client address does not match an IP network that xCAT is managing"); + xCAT::MsgUtils->message("S","xcatd: Skipping discovery from ".$client." because we either have no discovery plugins or the client address does not match an IP network that xCAT is managing"); } } + } else { # for *now*, we'll do a tiny YAML subset + if ($data =~ /^resourcerequest: xcatd$/) { + $tcclients->{$pkey}={ sockaddr=>$packets{$pkey}->[0], timestamp=>time() } + } + } # JSON maybe one day if important if ($quit) { last; } - while ($select->can_read(0)) { #grab any incoming requests during run - $part = $socket->recv($data,1500); - ($sport,$client) = sockaddr_in($part); - $packets{inet_ntoa($client)} = [$part,$data]; + while (@hdls = $select->can_read(0)) { #grab any incoming requests during run + foreach my $hdl (@hdls) { + if ($hdl == $socket) { + $part = $socket->recv($data,1500); + $packets{$part} = [$part,$data]; + } elsif ($hdl == $sslctl) { + update_udpcontext_from_sslctl(udpcontext=>$udpcontext,select=>$select); + } + } } #Some of those 'future' packets might be stale dupes of this packet, so... delete $packets{$pkey}; #Delete any duplicates of current packet } if ($quit) { last; } + grant_tcrequests($tcclients,$udpcontext); } }; if ($@) { @@ -695,6 +764,7 @@ if (defined $pid_init) { close($writepipe); %cmd_handlers = %{fd_retrieve($readpipe)}; } else { + $$progname = "xcatd: plugin initialization"; scan_plugins($writepipe); exit(0); } @@ -730,17 +800,20 @@ sub generic_reaper { sub ssl_reaper { local($!); + my $numdone = 0; while (($CHILDPID=waitpid(-1,WNOHANG)) > 0) { if ($immediatechildren{$CHILDPID}) { delete $immediatechildren{$CHILDPID}; + $sslclients--; + $numdone--; } - $sslclients--; } + store_fd({clientcountchange=>$numdone,sslclientcount=>$sslclients},$udpctl); #notify udp service of how many clients are active $SIG{CHLD} = \&ssl_reaper; } sub dispatch_reaper { - local($!); + local($!); while (($CHILDPID =waitpid(-1, WNOHANG)) > 0) { if ($dispatched_children{$CHILDPID}) { delete $dispatched_children{$CHILDPID}; @@ -789,16 +862,24 @@ $SIG{TERM} = $SIG{INT} = sub { alarm(2); }; +socketpair($sslctl, $udpctl,AF_UNIX,SOCK_STREAM,PF_UNSPEC); +my $prevfh = select($udpctl); +$|=1; +select($sslctl); +$|=1; +select($prevfh); $pid_UDP = xCAT::Utils->xfork; if (! defined $pid_UDP) { xCAT::MsgUtils->message("S", "Unable to fork for UDP/TCP"); die; } unless ($pid_UDP) { + close($udpctl); $$progname="xcatd: UDP listener"; do_udp_service; xexit(0); } +close($sslctl); $pid_MON = xCAT::Utils->xfork; if (! defined $pid_MON) { xCAT::MsgUtils->message("S", "Unable to fork installmonitor"); @@ -806,6 +887,7 @@ if (! defined $pid_MON) { } unless ($pid_MON) { $$progname="xcatd: install monitor"; + close($udpctl); do_installm_service; xexit(0); } @@ -946,6 +1028,7 @@ until ($quit) { } if ($child == 0) { + close($udpctl); $SIG{TERM} = $SIG{INT} = {}; $SIG{CHLD} = \&generic_reaper; #THROTTLE $listener->close; @@ -1023,6 +1106,7 @@ if ($inet6support) { xexit(0); } $sslclients++; #THROTTLE + store_fd({clientcountchange=>1,sslclientcount=>$sslclients},$udpctl); #notify udp service of how many clients are active $cnnection->close(); } if (open($mainpidfile,"<","/var/run/xcat/mainservice.pid")) { @@ -1647,7 +1731,7 @@ sub dispatch_request { #flock($dlock,LOCK_UN); if ($errstr) { if ($numdests == 1) { - dispatch_callback({error=>["Unable to dispatch hierarchical sub-command to ".$ENV{XCATHOST}.". Error: $errstr."],errorcode=>[1]}); + dispatch_callback({error=>["Unable to dispatch hierarchical sub-command to ".$ENV{XCATHOST}.". This service node may be down or its xcatd daemon may not be responding."],errorcode=>[1]}); xCAT::MsgUtils->message("S","Error dispatching request to ".$ENV{XCATHOST}.": ".$errstr); } else { xCAT::MsgUtils->message("S","Error dispatching request to ".$ENV{XCATHOST}.", trying other service nodes: ".$errstr); @@ -1861,7 +1945,7 @@ sub service_connection { my $bytesread; do { $bytesread=sysread($sock,$line,65536,length($line)) } while ($bytesread); if (length($line)==0) { - if (not defined $bytesread and (($! == EAGAIN) or ($! == ECHILD))) { next; } # + if (not defined $bytesread and (($! == EAGAIN) or ($! == ECHILD))) { next; } # last; } $flags=fcntl($sock,F_GETFL,0); @@ -2014,9 +2098,9 @@ sub send_pending_responses { $blocks += 1; } foreach (0..$blocks) { - do { - syswrite($sock,$resp,4096,$_*4096); - } while (($! == EAGAIN) or ($! == ECHILD)); + do { + syswrite($sock,$resp,4096,$_*4096); + } while (($! == EAGAIN) or ($! == ECHILD)); } }; } diff --git a/xCAT/postscripts/remoteshell b/xCAT/postscripts/remoteshell index c8a7553fa..151208a00 100755 --- a/xCAT/postscripts/remoteshell +++ b/xCAT/postscripts/remoteshell @@ -21,6 +21,8 @@ if [ "$(uname -s)" = "AIX" ]; then logger -t xcat -p local4.info "Install: On AIX , remoteshell calling aixremoteshell -d " exit 0 fi +master=$MASTER + if [ -r /etc/ssh/sshd_config ] then logger -t xcat -p local4.info "remoteshell: setup /etc/ssh/sshd_config and ssh_config" @@ -46,7 +48,7 @@ then echo "StrictHostKeyChecking no" >> /etc/ssh/ssh_config fi - +xcatpost="xcatpost" if [ -d /xcatpost/_ssh ] then logger -p local4.info -t xcat "Install: setup root .ssh" @@ -69,13 +71,11 @@ if [ ! -x /usr/bin/openssl ]; then fi allowcred.awk & CREDPID=$! -sleep 1 -if [ "$(uname -s)" = "AIX" ]; then - AIX=1 - export AIX - logger -t xcat -p local4.info "remoteshell setting up AIX" -fi + +#first contact daemon xcatflowrequest 3001 +logger -t xCAT -p local4.info "xcatdsklspost: /$xcatpost/xcatflowrequest $master 3001" +/$xcatpost/xcatflowrequest $master 3001 getcredentials.awk ssh_dsa_hostkey | grep -E -v '|' | sed -e 's/<//' -e 's/&/&/' -e 's/"/"/' -e "s/'/'/" > /tmp/ssh_dsa_hostkey #check the message is an error or not @@ -86,9 +86,9 @@ if [ $? -ne 0 ]; then logger -t xCAT -p local4.info ssh_dsa_hostkey MYCONT=`cat /etc/ssh/ssh_host_dsa_key` while [ -z "$MYCONT" ]; do - let SLI=$RANDOM%10 - let SLI=SLI+10 - sleep $SLI + #first contact daemon xcatflowrequest 3001 + logger -t xCAT -p local4.info "xcatdsklspost: /$xcatpost/xcatflowrequest $master 3001" + /$xcatpost/xcatflowrequest $master 3001 getcredentials.awk ssh_dsa_hostkey | grep -v '<'|sed -e 's/<//' -e 's/&/&/' -e 's/"/"/' -e "s/'/'/" > /etc/ssh/ssh_host_dsa_key MYCONT=`cat /etc/ssh/ssh_host_dsa_key` done @@ -107,6 +107,9 @@ else fi rm /tmp/ssh_dsa_hostkey +# first contact daemon xcatflowrequest 3001 + logger -t xCAT -p local4.info "xcatdsklspost: /$xcatpost/xcatflowrequest $master 3001" +/$xcatpost/xcatflowrequest $master 3001 getcredentials.awk ssh_rsa_hostkey | grep -E -v '|' | sed -e 's/<//' -e 's/&/&/' -e 's/"/"/' -e "s/'/'/" > /tmp/ssh_rsa_hostkey #check whether the message is an error or not @@ -117,9 +120,10 @@ if [ $? -ne 0 ]; then logger -t xCAT -p local4.info ssh_rsa_hostkey MYCONT=`cat /etc/ssh/ssh_host_rsa_key` while [ -z "$MYCONT" ]; do - let SLI=$RANDOM%10 - let SLI=SLI+10 - sleep $SLI + # first contact daemon xcatflowrequest 3001 + logger -t xCAT -p local4.info "xcatdsklspost: /$xcatpost/xcatflowrequest $master 3001" + /$xcatpost/xcatflowrequest $master 3001 + getcredentials.awk ssh_rsa_hostkey | grep -v '<'|sed -e 's/<//' -e 's/&/&/' -e 's/"/"/' -e "s/'/'/" > /etc/ssh/ssh_host_rsa_key MYCONT=`cat /etc/ssh/ssh_host_rsa_key` done @@ -150,6 +154,9 @@ mkdir -p /root/.ssh/ sleep 1 if [ $ENABLESSHBETWEENNODES = "YES" ]; then + #first contact daemon xcatflowrequest 3001 + logger -t xCAT -p local4.info "xcatdsklspost: /$xcatpost/xcatflowrequest $master 3001" + /$xcatpost/xcatflowrequest $master 3001 getcredentials.awk ssh_root_key | grep -E -v '|'|sed -e 's/<//' -e 's/&/&/' -e 's/"/"/' -e "s/'/'/" > /tmp/ssh_root_key #check whether the message is an error or not @@ -160,9 +167,9 @@ then logger -t xCAT -p local4.info ssh_root_key MYCONT=`cat /root/.ssh/id_rsa` while [ -z "$MYCONT" ]; do - let SLI=$RANDOM%10 - let SLI=SLI+10 - sleep $SLI + # first contact daemon xcatflowrequest 3001 + logger -t xCAT -p local4.info "xcatdsklspost: /$xcatpost/xcatflowrequest $master 3001" + /$xcatpost/xcatflowrequest $master 3001 getcredentials.awk ssh_root_key | grep -v '<'|sed -e 's/<//' -e 's/&/&/' -e 's/"/"/' -e "s/'/'/" > /root/.ssh/id_rsa MYCONT=`cat /root/.ssh/id_rsa` done diff --git a/xCAT/postscripts/xcatdsklspost b/xCAT/postscripts/xcatdsklspost index 62d795f02..1bce0ef3b 100755 --- a/xCAT/postscripts/xcatdsklspost +++ b/xCAT/postscripts/xcatdsklspost @@ -3,8 +3,8 @@ ##################################################### # # Generic xCAT post script for diskless nodes -# The syntax of this script -# xcatdsklspost {mode} {-m|-M} [postscripts] +# The syntax of this script: +# xcatdsklspost {mode} {-m|-M} [postscripts] --tftp /tftpboot --installdir /install --nfsv4 no -c -V # This script is called in the following different places: # updatenode -P ... --> xcatdsklspost 1 -m/-M ... # updatenode -S --> xcatdsklspost 2 -m/-M otherpkgs @@ -169,15 +169,19 @@ else fi fi if [ $ARGNUM -gt 10 ]; then - if [ $11 = "-c" ]; then + if [ ${11} = "-c" ]; then CFLAG=${11} fi fi + if [ $ARGNUM -gt 11 ]; then + if [ ${12} = "-V" ]; then + export VERBOSE=1 + fi + fi ;; 3|4|6) MODE=$1;; esac fi - # set the default path for the xcatpost directory xcatpost="/xcatpost" # Check for debug mode and you have nodename available you can change the path for debug @@ -204,7 +208,7 @@ if [ ! `uname` = Linux ]; then rmdir /xcatmnt logger -t xCAT -p local4.err "Running xcataixpost $*" echo "/$xcatpost/xcataixpost $1 $2 $3 '"$4"' $5 $6 $7 $8 $9 ${10} ${11}" - exec /$xcatpost/xcataixpost $1 $2 $3 "$4" $5 $6 $7 $8 $9 ${10} ${11} + exec /$xcatpost/xcataixpost $1 $2 $3 "$4" $5 $6 $7 $8 $9 ${10} ${11} exit fi @@ -470,6 +474,9 @@ fi # We need to call getpostscript.awk . if [ ! -x /$xcatpost/mypostscript ]; then + # first contact daemon xcatflowrequest 3001 + logger -t xCAT -p local4.info "xcatdsklspost: xcatflowrequest $SIP 3001" +/$xcatpost/xcatflowrequest $SIP 3001 /$xcatpost/getpostscript.awk | egrep '' | sed -e 's/<[^>]*>//g'|egrep -v '^ *$'|sed -e 's/^ *//' > /$xcatpost/mypostscript; MYCONT=`grep MASTER /$xcatpost/mypostscript` MAX_RETRIES=10 @@ -481,9 +488,10 @@ if [ ! -x /$xcatpost/mypostscript ]; then break fi - SLI=$(awk 'BEGIN{srand(); printf("%d\n",rand()*10)}') - SLI=$((10 + $SLI)) - sleep $SLI + # contact daemon xcatflowrequest 3001 + logger -t xCAT -p local4.info "xcatdsklspost: xcatflowrequest $SIP 3001" + /$xcatpost/xcatflowrequest $SIP 3001 + /$xcatpost/getpostscript.awk | sed -e 's/<[^>]*>//g'|egrep -v '^ *$'|sed -e 's/^ *//' > /$xcatpost/mypostscript; MYCONT=`grep MASTER /$xcatpost/mypostscript` if [ ! -z "$MYCONT" ]; then