Break discovery out to it's own worker
git-svn-id: https://svn.code.sf.net/p/xcat/code/xcat-core/trunk@15957 8638fb3e-16cb-4fca-ae20-7b5d299a9bcd
This commit is contained in:
parent
f54c5dfb36
commit
7d7bfb00b4
@ -516,6 +516,60 @@ sub grant_tcrequests {
|
||||
}
|
||||
}
|
||||
|
||||
sub do_discovery_process {
|
||||
my %args =@_;
|
||||
my $broker = $args{broker};
|
||||
my $quit=0;
|
||||
my $vintage = time();
|
||||
populate_site_hash();
|
||||
while (not $quit) {
|
||||
if ((time()-$vintage)> 15) { populate_site_hash(); } #site table reread every 15 second
|
||||
my $msg = fd_retrieve($broker);
|
||||
my $data;
|
||||
my $client;
|
||||
my $clientn;
|
||||
my $clientip;
|
||||
if (ref $msg eq 'HASH') { $data = $msg->{data}; } else { die "incorrect code to disco"; }
|
||||
my $saddr = $msg->{sockaddr};
|
||||
if ($inet6support) {
|
||||
($client,$sport) = Socket6::getnameinfo($saddr);
|
||||
($clientip,$sport) = Socket6::getnameinfo($saddr,Socket6::NI_NUMERICHOST());
|
||||
if ($clientip =~ /::ffff:.*\..*\./) {
|
||||
$clientip =~ s/^::ffff://;
|
||||
}
|
||||
if ($client =~ /::ffff:.*\..*\./) {
|
||||
$client =~ s/^::ffff://;
|
||||
}
|
||||
} else {
|
||||
($sport,$clientn) = sockaddr_in($saddr);
|
||||
$clientip = inet_ntoa($clientn);
|
||||
$client=gethostbyaddr($clientn,AF_INET);
|
||||
}
|
||||
if ($data =~ /^\037\213/) { #per rfc 1952, these two bytes are gzip, and they are invalid for
|
||||
#xcatrequest xml, so go ahead and decompress it
|
||||
my $bigdata;
|
||||
IO::Uncompress::Gunzip::gunzip(\$data,\$bigdata);
|
||||
$data = $bigdata
|
||||
}
|
||||
my $req = eval { XMLin($data, SuppressEmpty=>undef,ForceArray=>1) };
|
||||
if ($req and $req->{command} and ($req->{command}->[0] eq "findme" and $sport < 1000)) { #only consider priveleged port requests to start with
|
||||
$req->{'_xcat_clienthost'}=$client;
|
||||
$req->{'_xcat_clientip'}=$clientip;
|
||||
$req->{'_xcat_clientport'}=$sport;
|
||||
if (defined($cmd_handlers{"findme"}) and xCAT::NetworkUtils->nodeonmynet($clientip)) { #only discover from ips that appear to be on a managed network
|
||||
xCAT::MsgUtils->message("S","xcatd: Processing discovery request from ".$req->{'_xcat_clientip'});
|
||||
$req->{cacheonly}->[0] = 1;
|
||||
plugin_command($req,undef,\&build_response);
|
||||
if ($req->{cacheonly}->[0]) {
|
||||
delete $req->{cacheonly};
|
||||
plugin_command($req,undef,\&build_response);
|
||||
}
|
||||
} else {
|
||||
xCAT::MsgUtils->message("S","xcatd: Skipping discovery from ".$client." because we either have no discovery plugins or the client address does not match an IP network that xCAT is managing");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
sub do_udp_service { #This function opens up a UDP port
|
||||
#It will do similar to the standard service, except:
|
||||
#-Obviously, unencrypted and messages are not guaranteed
|
||||
@ -524,6 +578,8 @@ sub do_udp_service { #This function opens up a UDP port
|
||||
#Also, this throttles to handle one message at a time, so no forking either
|
||||
#Explicitly, to handle whatever operations nodes periodically send during discover state
|
||||
#Could be used for heartbeating and such as desired
|
||||
my %args=@_;
|
||||
my $discoctl = $args{discoctl};
|
||||
$dispatch_requests=0;
|
||||
my $udpcontext;
|
||||
$udpcontext->{clientfudge}=0;
|
||||
@ -583,6 +639,7 @@ sleep 0.05;
|
||||
$select->add($socket);
|
||||
$udpcontext->{socket} = $socket;
|
||||
$select->add($sslctl);
|
||||
$select->add($discoctl);
|
||||
my $data;
|
||||
my $part;
|
||||
my $sport;
|
||||
@ -612,6 +669,7 @@ sleep 0.05;
|
||||
$packets{$part} = [$part,$data];
|
||||
} elsif ($hdl == $sslctl) {
|
||||
update_udpcontext_from_sslctl(udpcontext=>$udpcontext,select=>$select);
|
||||
} elsif ($hdl == $discoctl) { #got a discovery response....
|
||||
} else {
|
||||
print "Something is wrong in udp process (search xcatd for this string)\n";
|
||||
}
|
||||
@ -619,47 +677,11 @@ sleep 0.05;
|
||||
}
|
||||
foreach my $pkey (keys %packets) {
|
||||
my $saddr = $packets{$pkey}->[0];
|
||||
my $clientn;
|
||||
my $clientip;
|
||||
if ($inet6support) {
|
||||
($client,$sport) = Socket6::getnameinfo($saddr);
|
||||
($clientip,$sport) = Socket6::getnameinfo($saddr,Socket6::NI_NUMERICHOST());
|
||||
if ($clientip =~ /::ffff:.*\..*\./) {
|
||||
$clientip =~ s/^::ffff://;
|
||||
}
|
||||
if ($client =~ /::ffff:.*\..*\./) {
|
||||
$client =~ s/^::ffff://;
|
||||
}
|
||||
} else {
|
||||
($sport,$clientn) = sockaddr_in($saddr);
|
||||
$clientip = inet_ntoa($clientn);
|
||||
$client=gethostbyaddr($clientn,AF_INET);
|
||||
}
|
||||
$data=$packets{$pkey}->[1];
|
||||
if ($data =~ /^\037\213/) { #per rfc 1952, these two bytes are gzip, and they are invalid for
|
||||
#xcatrequest xml, so go ahead and decompress it
|
||||
my $bigdata;
|
||||
IO::Uncompress::Gunzip::gunzip(\$data,\$bigdata);
|
||||
$data = $bigdata
|
||||
}
|
||||
if ($data =~ /^<xcat/) { #xml format
|
||||
my $req = eval { XMLin($data, SuppressEmpty=>undef,ForceArray=>1) };
|
||||
if ($req and $req->{command} and ($req->{command}->[0] eq "findme" and $sport < 1000)) { #only consider priveleged port requests to start with
|
||||
$req->{'_xcat_clienthost'}=$client;
|
||||
$req->{'_xcat_clientip'}=$clientip;
|
||||
$req->{'_xcat_clientport'}=$sport;
|
||||
if (defined($cmd_handlers{"findme"}) and xCAT::NetworkUtils->nodeonmynet($clientip)) { #only discover from ips that appear to be on a managed network
|
||||
xCAT::MsgUtils->message("S","xcatd: Processing discovery request from ".$req->{'_xcat_clientip'});
|
||||
$req->{cacheonly}->[0] = 1;
|
||||
plugin_command($req,undef,\&build_response);
|
||||
if ($req->{cacheonly}->[0]) {
|
||||
delete $req->{cacheonly};
|
||||
plugin_command($req,undef,\&build_response);
|
||||
}
|
||||
} else {
|
||||
xCAT::MsgUtils->message("S","xcatd: Skipping discovery from ".$client." because we either have no discovery plugins or the client address does not match an IP network that xCAT is managing");
|
||||
}
|
||||
}
|
||||
store_fd({data=>$data,sockaddr=>$saddr},$discoctl); #for now, punt the gunzip to the worker process
|
||||
} elsif ($data =~ /^<xcat/) { #xml format
|
||||
store_fd({data=>$data,sockaddr=>$saddr},$discoctl);
|
||||
} else { # for *now*, we'll do a tiny YAML subset
|
||||
if ($data =~ /^resourcerequest: xcatd$/) {
|
||||
$tcclients->{$pkey}={ sockaddr=>$packets{$pkey}->[0], timestamp=>time() }
|
||||
@ -872,7 +894,23 @@ if (! defined $pid_UDP) {
|
||||
unless ($pid_UDP) {
|
||||
close($udpctl);
|
||||
$$progname="xcatd: UDP listener";
|
||||
do_udp_service;
|
||||
my $pid_disco;
|
||||
my $discoctl;
|
||||
my $udpbroker;
|
||||
socketpair($discoctl,$udpbroker,AF_UNIX,SOCK_STREAM,PF_UNSPEC);
|
||||
$pid_disco = xCAT::Utils->xfork;
|
||||
if (!defined $pid_disco) {
|
||||
xCAT::MsgUtils->message("S", "Unable to fork for UDP/TCP");
|
||||
die;
|
||||
}
|
||||
unless ($pid_disco) { #this is the child, therefore the discovery process..
|
||||
close($discoctl);
|
||||
$$progname="xcatd: Discovery worker";
|
||||
do_discovery_process(broker=>$udpbroker);
|
||||
xexit(0);
|
||||
}
|
||||
close($udpbroker);
|
||||
do_udp_service(discoctl=>$discoctl);
|
||||
xexit(0);
|
||||
}
|
||||
close($sslctl);
|
||||
|
Loading…
x
Reference in New Issue
Block a user