From 4204e9070ee0e35c4ed8d3fad66c27b68cbfb67b Mon Sep 17 00:00:00 2001 From: jbjohnso Date: Mon, 8 Apr 2013 17:36:57 +0000 Subject: [PATCH] Implement udp request (but no reply yet) git-svn-id: https://svn.code.sf.net/p/xcat/code/xcat-core/trunk@15877 8638fb3e-16cb-4fca-ae20-7b5d299a9bcd --- xCAT-server/sbin/xcatd | 89 +++++++++++++++++++++++++++++------------- 1 file changed, 61 insertions(+), 28 deletions(-) diff --git a/xCAT-server/sbin/xcatd b/xCAT-server/sbin/xcatd index 656443acd..9652b07eb 100755 --- a/xCAT-server/sbin/xcatd +++ b/xCAT-server/sbin/xcatd @@ -50,6 +50,7 @@ use xCAT::Client qw(submit_request); my $clientselect = new IO::Select; my $sslclients = 0; #THROTTLE my $maxsslclients = 64; #default +my $batchclients = 50; my @deferredmsgargs; # hold argumentlist for MsgUtils call until after fork #parallelizing logging overhead with real work @@ -176,6 +177,8 @@ if ($tmp) { } ($tmp) = $sitetab->getAttribs({'key'=>'xcatmaxconnections'},'value'); if ($tmp and $tmp->{value}) { $maxsslclients = $tmp->{value}; } +($tmp) = $sitetab->getAttribs({'key'=>'xcatmaxbatchconnections'},'value'); +if ($tmp and $tmp->{value}) { $batchclients = $tmp->{value}; } my $plugins_dir=$::XCATROOT.'/lib/perl/xCAT_plugin'; @@ -481,6 +484,36 @@ if ($inet6support) { } } +sub update_udpcontext_from_sslctl { + my %args = @_; + my $udpcontext = $args{udpcontext}; + my $select = $args{select}; + my $msg; + eval { $msg = fd_retrieve($sslctl); }; + if ($msg) { + #remember new count and, if new connection and we have a fudge factor, decrese fudge factor optimisticly assuming it's the right one + $udpcontext->{sslclientcount} = $msg->{sslclientcount}; + if ($udpcontext->{clientfudge} and $msg->{clientcountchange} > 0) { $udpcontext->{clientfudge} -= $msg->{clientcountchange}; } + } else { + $select->remove($sslctl); close($sslctl); #something went horribly wrong + } +} + +sub grant_tcrequests { + my $requestors = shift; + my $udpcontext = shift; + my $availableslots = $batchclients; + $availableslots -= $udpcontext->{clientfudge}; #value that forecasts the pressure + $availableslots -= $udpcontext->{sslclientcount}; #subtract all currently really active sessions + my $oldtime = time()-180; #drop requests older than three minutes if still around + foreach my $rkey (keys %{$requestors}) { + if ($requestors->{$rkey}->{timestamp} < $oldtime) { delete $requestors->{$rkey}; next; } + unless ($availableslots > 0) { next; } # no slots, ignore requests for now + $udpcontext->{clientfudge}+=1; #adjust forecast for being busy + $availableslots-=1; + $udpcontext->{socket}->send("resourcerequest: ok",$requestors->{$rkey}->{sockaddr}); + } +} sub do_udp_service { #This function opens up a UDP port #It will do similar to the standard service, except: @@ -491,6 +524,9 @@ sub do_udp_service { #This function opens up a UDP port #Explicitly, to handle whatever operations nodes periodically send during discover state #Could be used for heartbeating and such as desired $dispatch_requests=0; + my $udpcontext; + $udpcontext->{clientfudge}=0; + $udpcontext->{sslclientcount}=0; my $udppidfile; my $retry=1; my $socket; @@ -504,7 +540,7 @@ sub do_udp_service { #This function opens up a UDP port if ($inet6support) { $socket = IO::Socket::INET6->new(LocalPort => $port, Proto => 'udp', - Domain => AF_INET); + ); } else { $socket = IO::Socket::INET->new(LocalPort => $port, Proto => 'udp', @@ -524,7 +560,7 @@ while (not $socket and $retry) { if ($inet6support) { $socket = IO::Socket::INET6->new(LocalPort => $port, Proto => 'udp', - Domain => AF_INET); + ); } else { $socket = IO::Socket::INET->new(LocalPort => $port, Proto => 'udp', @@ -544,6 +580,7 @@ sleep 0.05; print $udppidfile $$; close($udppidfile); $select->add($socket); + $udpcontext->{socket} = $socket; $select->add($sslctl); my $data; my $part; @@ -570,37 +607,31 @@ sleep 0.05; foreach $hdl (@hdls) { if ($hdl == $socket) { $part = $socket->recv($data,1500); - ($sport,$client) = sockaddr_in($part); - if ($sport < 1000) { #Only remember udp packets from privileged ports - $packets{inet_ntoa($client)} = [$part,$data]; - } + $packets{$part} = [$part,$data]; } elsif ($hdl == $sslctl) { - my $msg; - eval { $msg = fd_retrieve($sslctl); }; - if ($msg) { - } else { - $select->remove($hdl); close($hdl); #something went horribly wrong - } - + update_udpcontext_from_sslctl(udpcontext=>$udpcontext,select=>$select); } else { print "Something is wrong in udp process (search xcatd for this string)\n"; } } } + my $tcclients; # hash reference to store traffic control requests foreach my $pkey (keys %packets) { + if ($inet6support) { + ($sport,$client) = Socket6::unpack_sockaddr_in6($packets{$pkey}->[0]); + } else { ($sport,$client) = sockaddr_in($packets{$pkey}->[0]); + } $data=$packets{$pkey}->[1]; - $peerhost=gethostbyaddr($client,AF_INET); - $peerhost .="\n"; if ($data =~ /^\037\213/) { #per rfc 1952, these two bytes are gzip, and they are invalid for #xcatrequest xml, so go ahead and decompress it my $bigdata; IO::Uncompress::Gunzip::gunzip(\$data,\$bigdata); $data = $bigdata } - if ($data !~ /^undef,ForceArray=>1) }; - if ($req and $req->{command} and ($req->{command}->[0] eq "findme")) { + if ($req and $req->{command} and ($req->{command}->[0] eq "findme" and $sport < 1000)) { #only consider priveleged port requests to start with $req->{'_xcat_clienthost'}=gethostbyaddr($client,AF_INET); $req->{'_xcat_clientip'}=inet_ntoa($client); $req->{'_xcat_clientport'}=$sport; @@ -611,30 +642,32 @@ sleep 0.05; if ($req->{cacheonly}->[0]) { delete $req->{cacheonly}; plugin_command($req,undef,\&build_response); - #if ($req) { - # $req->{cacheonly}->[0] = 1; - # $req->{checkallmacs}->[0] = 1; - # plugin_command($req,undef,\&convey_response); - # } - } } else { xCAT::MsgUtils->message("S","xcatd: Skipping discovery from ".inet_ntoa($client)." because we either have no discovery plugins or the client address does not match an IP network that xCAT is managing"); } } } else { # for *now*, we'll do a tiny YAML subset + if ($data =~ /^resourcerequest: xcatd$/) { + $tcclients->{$pkey}={ sockaddr=>$packets{$pkey}->[0], timestamp=>time() } + } } # JSON maybe one day if important - } if ($quit) { last; } - while ($select->can_read(0)) { #grab any incoming requests during run - $part = $socket->recv($data,1500); - ($sport,$client) = sockaddr_in($part); - $packets{inet_ntoa($client)} = [$part,$data]; + while (@hdls = $select->can_read(0)) { #grab any incoming requests during run + foreach my $hdl (@hdls) { + if ($hdl == $socket) { + $part = $socket->recv($data,1500); + $packets{$part} = [$part,$data]; + } elsif ($hdl == $sslctl) { + update_udpcontext_from_sslctl(udpcontext=>$udpcontext,select=>$select); + } + } } #Some of those 'future' packets might be stale dupes of this packet, so... delete $packets{$pkey}; #Delete any duplicates of current packet } if ($quit) { last; } + grant_tcrequests($tcclients,$udpcontext); } }; if ($@) {