Implement udp request (but no reply yet)

git-svn-id: https://svn.code.sf.net/p/xcat/code/xcat-core/trunk@15877 8638fb3e-16cb-4fca-ae20-7b5d299a9bcd
This commit is contained in:
jbjohnso 2013-04-08 17:36:57 +00:00
parent b9e587e703
commit 4204e9070e

View File

@ -50,6 +50,7 @@ use xCAT::Client qw(submit_request);
my $clientselect = new IO::Select;
my $sslclients = 0; #THROTTLE
my $maxsslclients = 64; #default
my $batchclients = 50;
my @deferredmsgargs; # hold argumentlist for MsgUtils call until after fork
#parallelizing logging overhead with real work
@ -176,6 +177,8 @@ if ($tmp) {
}
($tmp) = $sitetab->getAttribs({'key'=>'xcatmaxconnections'},'value');
if ($tmp and $tmp->{value}) { $maxsslclients = $tmp->{value}; }
($tmp) = $sitetab->getAttribs({'key'=>'xcatmaxbatchconnections'},'value');
if ($tmp and $tmp->{value}) { $batchclients = $tmp->{value}; }
my $plugins_dir=$::XCATROOT.'/lib/perl/xCAT_plugin';
@ -481,6 +484,36 @@ if ($inet6support) {
}
}
sub update_udpcontext_from_sslctl {
my %args = @_;
my $udpcontext = $args{udpcontext};
my $select = $args{select};
my $msg;
eval { $msg = fd_retrieve($sslctl); };
if ($msg) {
#remember new count and, if new connection and we have a fudge factor, decrese fudge factor optimisticly assuming it's the right one
$udpcontext->{sslclientcount} = $msg->{sslclientcount};
if ($udpcontext->{clientfudge} and $msg->{clientcountchange} > 0) { $udpcontext->{clientfudge} -= $msg->{clientcountchange}; }
} else {
$select->remove($sslctl); close($sslctl); #something went horribly wrong
}
}
sub grant_tcrequests {
my $requestors = shift;
my $udpcontext = shift;
my $availableslots = $batchclients;
$availableslots -= $udpcontext->{clientfudge}; #value that forecasts the pressure
$availableslots -= $udpcontext->{sslclientcount}; #subtract all currently really active sessions
my $oldtime = time()-180; #drop requests older than three minutes if still around
foreach my $rkey (keys %{$requestors}) {
if ($requestors->{$rkey}->{timestamp} < $oldtime) { delete $requestors->{$rkey}; next; }
unless ($availableslots > 0) { next; } # no slots, ignore requests for now
$udpcontext->{clientfudge}+=1; #adjust forecast for being busy
$availableslots-=1;
$udpcontext->{socket}->send("resourcerequest: ok",$requestors->{$rkey}->{sockaddr});
}
}
sub do_udp_service { #This function opens up a UDP port
#It will do similar to the standard service, except:
@ -491,6 +524,9 @@ sub do_udp_service { #This function opens up a UDP port
#Explicitly, to handle whatever operations nodes periodically send during discover state
#Could be used for heartbeating and such as desired
$dispatch_requests=0;
my $udpcontext;
$udpcontext->{clientfudge}=0;
$udpcontext->{sslclientcount}=0;
my $udppidfile;
my $retry=1;
my $socket;
@ -504,7 +540,7 @@ sub do_udp_service { #This function opens up a UDP port
if ($inet6support) {
$socket = IO::Socket::INET6->new(LocalPort => $port,
Proto => 'udp',
Domain => AF_INET);
);
} else {
$socket = IO::Socket::INET->new(LocalPort => $port,
Proto => 'udp',
@ -524,7 +560,7 @@ while (not $socket and $retry) {
if ($inet6support) {
$socket = IO::Socket::INET6->new(LocalPort => $port,
Proto => 'udp',
Domain => AF_INET);
);
} else {
$socket = IO::Socket::INET->new(LocalPort => $port,
Proto => 'udp',
@ -544,6 +580,7 @@ sleep 0.05;
print $udppidfile $$;
close($udppidfile);
$select->add($socket);
$udpcontext->{socket} = $socket;
$select->add($sslctl);
my $data;
my $part;
@ -570,37 +607,31 @@ sleep 0.05;
foreach $hdl (@hdls) {
if ($hdl == $socket) {
$part = $socket->recv($data,1500);
($sport,$client) = sockaddr_in($part);
if ($sport < 1000) { #Only remember udp packets from privileged ports
$packets{inet_ntoa($client)} = [$part,$data];
}
$packets{$part} = [$part,$data];
} elsif ($hdl == $sslctl) {
my $msg;
eval { $msg = fd_retrieve($sslctl); };
if ($msg) {
} else {
$select->remove($hdl); close($hdl); #something went horribly wrong
}
update_udpcontext_from_sslctl(udpcontext=>$udpcontext,select=>$select);
} else {
print "Something is wrong in udp process (search xcatd for this string)\n";
}
}
}
my $tcclients; # hash reference to store traffic control requests
foreach my $pkey (keys %packets) {
if ($inet6support) {
($sport,$client) = Socket6::unpack_sockaddr_in6($packets{$pkey}->[0]);
} else {
($sport,$client) = sockaddr_in($packets{$pkey}->[0]);
}
$data=$packets{$pkey}->[1];
$peerhost=gethostbyaddr($client,AF_INET);
$peerhost .="\n";
if ($data =~ /^\037\213/) { #per rfc 1952, these two bytes are gzip, and they are invalid for
#xcatrequest xml, so go ahead and decompress it
my $bigdata;
IO::Uncompress::Gunzip::gunzip(\$data,\$bigdata);
$data = $bigdata
}
if ($data !~ /^<xcat/) { #xml format
if ($data =~ /^<xcat/) { #xml format
my $req = eval { XMLin($data, SuppressEmpty=>undef,ForceArray=>1) };
if ($req and $req->{command} and ($req->{command}->[0] eq "findme")) {
if ($req and $req->{command} and ($req->{command}->[0] eq "findme" and $sport < 1000)) { #only consider priveleged port requests to start with
$req->{'_xcat_clienthost'}=gethostbyaddr($client,AF_INET);
$req->{'_xcat_clientip'}=inet_ntoa($client);
$req->{'_xcat_clientport'}=$sport;
@ -611,30 +642,32 @@ sleep 0.05;
if ($req->{cacheonly}->[0]) {
delete $req->{cacheonly};
plugin_command($req,undef,\&build_response);
#if ($req) {
# $req->{cacheonly}->[0] = 1;
# $req->{checkallmacs}->[0] = 1;
# plugin_command($req,undef,\&convey_response);
# }
}
} else {
xCAT::MsgUtils->message("S","xcatd: Skipping discovery from ".inet_ntoa($client)." because we either have no discovery plugins or the client address does not match an IP network that xCAT is managing");
}
}
} else { # for *now*, we'll do a tiny YAML subset
if ($data =~ /^resourcerequest: xcatd$/) {
$tcclients->{$pkey}={ sockaddr=>$packets{$pkey}->[0], timestamp=>time() }
}
} # JSON maybe one day if important
}
if ($quit) { last; }
while ($select->can_read(0)) { #grab any incoming requests during run
$part = $socket->recv($data,1500);
($sport,$client) = sockaddr_in($part);
$packets{inet_ntoa($client)} = [$part,$data];
while (@hdls = $select->can_read(0)) { #grab any incoming requests during run
foreach my $hdl (@hdls) {
if ($hdl == $socket) {
$part = $socket->recv($data,1500);
$packets{$part} = [$part,$data];
} elsif ($hdl == $sslctl) {
update_udpcontext_from_sslctl(udpcontext=>$udpcontext,select=>$select);
}
}
}
#Some of those 'future' packets might be stale dupes of this packet, so...
delete $packets{$pkey}; #Delete any duplicates of current packet
}
if ($quit) { last; }
grant_tcrequests($tcclients,$udpcontext);
}
};
if ($@) {