2
0
mirror of https://github.com/xcat2/xcat-core.git synced 2025-06-15 19:01:44 +00:00

Refine the docker instance lifecycle management code with HTTP::Async

This commit is contained in:
ertaozh
2016-02-24 22:39:34 -05:00
parent 324d736ba5
commit 1e1e03aec6

View File

@ -19,22 +19,20 @@ use strict;
use POSIX qw(WNOHANG nice);
use POSIX qw(WNOHANG setsid :errno_h);
use Errno;
use IO::Select;
use MIME::Base64 qw(encode_base64);
require IO::Socket::SSL; IO::Socket::SSL->import('inet4');
use Time::HiRes qw(gettimeofday sleep);
use Fcntl qw/:DEFAULT :flock/;
use File::Path;
use File::Copy;
use File::Basename;
use Getopt::Long;
Getopt::Long::Configure("bundling");
use HTTP::Async;
use HTTP::Headers;
use HTTP::Request;
use HTTP::Response;
use xCAT::Utils;
use xCAT::MsgUtils;
use Getopt::Long;
use File::Basename;
use Cwd;
use xCAT::Usage;
use JSON;
@ -43,20 +41,42 @@ use JSON;
my $verbose;
my $global_callback;
my $select = IO::Select->new();
my $async;
#-------------------------------------------------------
=head3 The hash variable to store node related SSL connection and state information
=head3 The hash variable to store node related http request id
The structure is like this
%http_session_variable = (
$session_id => $node,
);
=cut
#-------------------------------------------------------
my %http_session_variable = ();
#-------------------------------------------------------
=head3 The hash variable to store node parameters to access docker container
The structure is like this
%node_hash_variable = (
$SSL_connection => {
node => $node,
state => $current_state,
state_machine_engine => $state_machine_for_the_node,
total_len => $total_len,
get_len => $get_len,
data_buf => $data,
$node => {
image=>$nodetype.provmethod,
cmd=>$nodetype.provmethod,
ip=>$host.ip,
mac=>$mac.mac,
cpu=>$vm.cpus
memory=>$vm.memory
flag=>$vm.othersettings,
hostinfo=>{
name => $host,
port => $port,
},
http_req_method => $init_method,
http_req_url => $node_init_url,
node_app_state => $init_state,
state_machine_engine => $state_machine_engine,
},
);
@ -65,40 +85,11 @@ my $select = IO::Select->new();
my %node_hash_variable = ();
#-------------------------------------------------------
=head3 The hash variable to store node parameters to create docker container
The structure is like this
%node_create_variable = (
$node => {
image=>$nodetype.provmethod,
cmd=>$nodetype.provmethod,
ip=>$host.ip,
mac=>$mac.mac,
cpu=>$vm.cpus
memory=>$vm.memory
flag=>$vm.othersettings
},
);
=cut
#-------------------------------------------------------
my %node_create_variable = ();
# The counter to record how many request have been send and responses are expected.
my $pending_res = 0;
# The counter to record concurrent openting SSL connection numbers
my $concurrent_ssl_sessions = 0;
# The function point used for mkdocker to generate http request, for other cmd it will point to &genreq;
my $genreq_ptr = \&genreq;
# The vairables below are used to update attributes
my $vmtab; # vm.othersettings
my $nodelisttab; # nodelist.status
my $nodetypetab; #nodetype.provmethod
# The num of HTTP requests that is progressing
my $http_requests_in_progress = 0;
#-------------------------------------------------------
@ -114,7 +105,7 @@ sub handled_commands {
rpower => 'nodehm:mgt',
mkdocker => 'nodehm:mgt',
rmdocker => 'nodehm:mgt',
lsdocker => 'nodehm:mgt=docker|ipmi',
lsdocker => 'nodehm:mgt=docker|ipmi|kvm',
} );
}
@ -262,7 +253,7 @@ sub http_state_code_info {
The state_machine_engine to deal with http response
Input:
$sockfd: The SSL connection from which the http response is returned
$id: The http session id when adding HTTP request into HTTP::Async object
$data: The http response
Return:
If there are any errors or msg, they will be outputed directly.
@ -275,213 +266,89 @@ sub http_state_code_info {
#-------------------------------------------------------
sub single_state_engine {
my $sockfd = shift;
my $id = shift;
my $data = shift;
if (!defined $node_hash_variable{$sockfd}) {
my $node = $http_session_variable{$id};
if (!defined($node)) {
return;
}
my $node_hash = $node_hash_variable{$node};
my $curr_state = $node_hash->{node_app_state};
my $info_flag = 'data';
my $get_another_pkg = 0;
my $node = $node_hash_variable{$sockfd}->{node};
my $curr_state = $node_hash_variable{$sockfd}->{state};
my $data_buf = $node_hash_variable{$sockfd}->{data_buf};
my $data_total_len = $node_hash_variable{$sockfd}->{total_len};
my $data_get_len = $node_hash_variable{$sockfd}->{get_len};
my $data_chunked = $node_hash_variable{$sockfd}->{chunked};
my @chunked_array = ();
# The code logic to deal with http response and state machine
#Need to Dumper to log file later
my $res = HTTP::Response->parse($data);
#print Dumper($res);
my $content = undef;
# Deal with the scenario that a http response is splited into multiple pkgs
unless ($res->code and $res->code =~ /\d{3}/) {
my $len = length($data);
if (defined($data_chunked)) {
$content = $data;
$res = HTTP::Response->parse($data_buf);
}
elsif (!defined($data_buf) or !defined($data_total_len) or !defined($data_get_len) or ($data_get_len + $len > $data_total_len)) {
$global_callback->({node=>[{name=>[$node],error=>["Incorrect data received"],errorcode=>[1]}]});
$concurrent_ssl_sessions--;
$select->remove($sockfd);
close($sockfd);
delete($node_hash_variable{$sockfd});
return;
}
else {
my $len = length($data);
if ($data_get_len + $len < $data_total_len) {
$node_hash_variable{$sockfd}->{get_len} += $len;
$node_hash_variable{$sockfd}->{data_buf} .= $data;
$pending_res++;
return;
}
else { # Exactly all the data are received
$res = HTTP::Response->parse($data_buf.$data);
delete $node_hash_variable{$sockfd}->{data_buf};
delete $node_hash_variable{$sockfd}->{total_len};
delete $node_hash_variable{$sockfd}->{get_len};
}
}
if ($data->is_error or (defined($data->header("connection")) and $data->header("connection") =~ /close/)) {
$http_requests_in_progress--;
delete($http_session_variable{$id});
}
if (!defined($content) and $res->content()) {
$content = $res->content();
}
my $get_content_len = length($content);
my $content_length = $res->header('content-length');
if (defined($content_length) and $get_content_len < $content_length) {
$node_hash_variable{$sockfd}->{data_buf} = $data;
$node_hash_variable{$sockfd}->{total_len} = $content_length;
$node_hash_variable{$sockfd}->{get_len} = $get_content_len;
$pending_res++;
return;
}
my $encoding_flag = $res->header('transfer-encoding');
if (defined($encoding_flag) and $encoding_flag eq 'chunked') {
$node_hash_variable{$sockfd}->{chunked} = 1;
$data_chunked = 1;
if ($get_content_len < 3) {
$node_hash_variable{$sockfd}->{data_buf} = $data;
$pending_res++;
return;
}
}
if (defined($data_chunked)) {
while (length($content)) {
my $split_pos = index($content, "\r\n");
my $length_string = substr($content, 0, $split_pos);
my $data_length = hex($length_string);
if ($data_length lt 2) {
if ($data_length eq 0) {
push @chunked_array, '0';
}
last;
}
push @chunked_array, $length_string;
push @chunked_array, substr($content, $split_pos + 2, $data_length);
$content = substr($content, $split_pos + 4 + $data_length);
}
}
my $content = $data->decoded_content;
my @msg = ();
$msg[0] = &http_state_code_info($res->code, $curr_state);
unless ($res->is_success) {
$msg[0] = &http_state_code_info($data->code, $curr_state);
if ($data->is_error) {
if ($content ne '') {
$msg[0]->[1] = "$content";
}
}
if ($curr_state eq "INIT_TO_WAIT_FOR_QUERY_STATE_DONE") {
if ($res->is_success) {
my $node_state = undef;
if ($data_chunked) {
my $length = shift @chunked_array;
while ($length) {
my $content_hash = decode_json (shift @chunked_array);
if (defined($content_hash->{'State'}->{'Status'})) {
$node_state = $content_hash->{'State'}->{'Status'};
last;
}
$length = shift @chunked_array;
}
if (!defined($node_state) and $length) {
$get_another_pkg = 1;
}
}
else {
my $content_hash = decode_json $content;
$node_state = $content_hash->{'State'}->{'Status'};
}
if (defined($node_state)) {
if ($nodelisttab) {
$nodelisttab->setNodeAttribs($node, {status=>$node_state});
}
$msg[0] = [0, $node_state];
}
elsif (!$get_another_pkg) {
$msg[0] = [1, "Can not get status"];
}
elsif ($data->message ne '') {
$msg[0]->[1] = $data->message;
}
elsif ($res->code eq '404') {
if ($nodelisttab) {
$nodelisttab->setNodeAttribs($node, {status=>''});
}
my $content_type = $data->header("content-type");
my $content_hash = undef;
if (defined($content_type) and $content_type =~ /json/i) {
$content_hash = decode_json $content;
}
elsif (!defined($content_type)) {
$content_type = "undefined";
}
if ($curr_state eq "INIT_TO_WAIT_FOR_QUERY_STATE_DONE") {
if ($data->is_success) {
if ($content_type =~ /json/i) {
my $node_state = $content_hash->{'State'}->{'Status'};
if (defined($node_state)) {
$msg[0] = [0, $node_state];
}
else {
$msg[0] = [1, "Can not get status"];
}
}
else {
$msg[0] = [1, "The content type: $content_type is unable to be parsed."];
}
}
}
elsif ($curr_state eq "INIT_TO_WAIT_FOR_QUERY_LOG_DONE") {
if (!$msg[0]->[0]) {
if ($data->is_success) {
$info_flag = "base64_data";
@msg = ();
if (defined($data_chunked)) {
my @data_array = ();
my $tmp_len = shift(@chunked_array);
while ($tmp_len and scalar(@chunked_array)) {
push @data_array, shift(@chunked_array);
$tmp_len = shift(@chunked_array);
}
if ($tmp_len ne 0) {
$get_another_pkg = 1;
}
if (scalar(@data_array)) {
my $string = join('', @data_array);
$msg[0] = [0, encode_base64($string)];
}
else {
$msg[0] = [0, encode_base64("No logs")];
}
if ($content_type =~ /text\/plain/i) {
$msg[0] = [0,encode_base64($content)];
}
else {
$msg[0] = [0, encode_base64($content)];
$msg[0] = [1, "The content type: $content_type is unable to be parsed."];
}
}
}
elsif ($curr_state eq "INIT_TO_WAIT_FOR_QUERY_DOCKER_DONE") {
if ($res->is_success) {
if ($data->is_success) {
@msg = ();
if (!defined($content_length) or ($content_length > 3)) {
if (defined($data_chunked)) {
my $tmp_entry = shift @chunked_array;
while ($tmp_entry and scalar(@chunked_array)) {
my $content_hash = decode_json (shift @chunked_array);
if (ref($content_hash) eq 'ARRAY') {
foreach (@$content_hash) {
push @msg, [0, parse_docker_list_info($_, 1)];
}
}
else {
push @msg, [0, parse_docker_list_info($content_hash, 0)];
}
$tmp_entry = shift @chunked_array;
}
if ($tmp_entry ne '0') {
$get_another_pkg = 1;
if ($content_type =~ /json/i) {
if (ref($content_hash) eq 'ARRAY') {
foreach (@$content_hash) {
push @msg, [0, parse_docker_list_info($_, 1)];
}
}
else {
my $content_hash = decode_json $content;
if (ref($content_hash) eq 'ARRAY') {
foreach (@$content_hash) {
push @msg, [0, parse_docker_list_info($_, 1)];
}
}
else {
push @msg, [0, parse_docker_list_info($content_hash, 0)];
}
push @msg, [0, parse_docker_list_info($content_hash, 0)];
}
}
else {
if (!scalar(@msg)) {
@msg = [0, "No running docker"];
}
}
}
elsif ($curr_state eq 'INIT_TO_WAIT_FOR_CREATE_DONE') {
if ($nodetypetab) {
$nodetypetab->setNodeAttribs($node,{provmethod=>"$node_create_variable{$node}->{image}:$node_create_variable{$node}->{cmd}"});
}
if ($vmtab) {
$vmtab->setNodeAttribs($node,{othersettings=>$node_create_variable{$node}->{flag}});
}
}
foreach my $tmp (@msg) {
@ -492,14 +359,7 @@ sub single_state_engine {
$global_callback->({node=>[{name=>[$node],"$info_flag"=>["$tmp->[1]"]}]});
}
}
if ($get_another_pkg) {
$pending_res++;
return;
}
$concurrent_ssl_sessions--;
$select->remove($sockfd);
close($sockfd);
delete($node_hash_variable{$sockfd});
return;
}
@ -580,7 +440,7 @@ sub parse_docker_list_info {
$created =~ s/\..*$//;
}
my $cmd = sprintf("\"%.20s\"", $command);
my $string = sprintf("%-12s %-30.30s %-22s %-20s %-10s %s", $id, $image, $cmd, $created, $status, $names);
my $string = sprintf("%-12s %-30.30s %-22s %-20s %-10s %s", $id, $image, $cmd, $created, $status, $names);
return($string);
}
@ -592,7 +452,7 @@ sub parse_docker_list_info {
Input:
%args: a hash which currently only key 'timeout' is using
Return:
The expected number of response which havn't been received
The number of response have received
Usage example:
=cut
@ -606,42 +466,16 @@ sub deal_with_rsp
if (defined($args{timeout})) {
$timeout = $args{timeout};
}
my @data = ();
if ($select->can_read($timeout)) {
my @ready_fds = $select->can_read(0);
foreach my $sockfd (@ready_fds) {
my $res = "";
my $node_hash = $node_hash_variable{$sockfd};
if (defined($node_hash)) {
while (1) {
my $readbytes = undef;
$readbytes = sysread($sockfd, $res, 65535, length($res));
if (!defined($readbytes)) {
if ($!{EAGAIN} or $!{EWOULDBLOCK}) {
$pending_res--;
last;
}
elsif ($!{EINTR} or $!{ENOTTY}) {
next;
}
else {
die "read failed: $!";
}
}
elsif ($readbytes == 0) {
$pending_res--;
last;
}
}
# readbytes UNDEF means a reading error, so print out a msg and parse the next SSL connection
push @data, [$node_hash->{state_machine_engine}, $sockfd, $res];
}
my $deal_num = 0;
while (my ($response, $id) = $async->wait_for_next_response($timeout)) {
my $node = $http_session_variable{$id};
if (defined($node)) {
$deal_num++;
$node_hash_variable{$node}->{state_machine_engine}->($id, $response);
}
}
foreach (@data) {
$_->[0]->($_->[1], $_->[2]);
}
return $pending_res;
return $deal_num;
}
#-------------------------------------------------------
@ -823,10 +657,8 @@ sub process_request {
my $init_url = undef;
my $init_state = undef;
my $state_machine_engine = undef;
my @nodeargs = ();
my @errornodes = ();
my $mapping_hash = undef;
my $max_concur_ssl_session_allow = 10; # A variable can be set by caculated in the future
my $max_concur_session_allow = 20; # A variable can be set by caculated in the future
$mapping_hash = $command_states{$command}{$args->[0]};
unless($mapping_hash) {
$mapping_hash = $command_states{$command}{all};
@ -846,24 +678,25 @@ sub process_request {
if (!defined($init_state)) {
$init_state = "INIT_TO_WAIT_FOR_RSP";
}
if ($command eq 'rpower' and defined($args->[0]) and ($args->[0] eq 'state')) {
$nodelisttab = xCAT::Table->new('nodelist');
}
if ($command eq 'lsdocker') {
my @new_noderange = ();
my $nodehm = xCAT::Table->new('nodehm');
if ($nodehm) {
my $nodehmhash = $nodehm->getNodesAttribs($noderange, ['mgt']);
foreach my $node (@$noderange) {
if (defined($nodehmhash->{$node}->[0]->{mgt}) and $nodehmhash->{$node}->[0]->{mgt} eq 'ipmi') {
if (defined($nodehmhash->{$node}->[0]->{mgt}) and $nodehmhash->{$node}->[0]->{mgt} =~ /ipmi|kvm/) {
if (defined($args) and $args->[0] ne '') {
$callback->({error=>[" -l|--log is not support for $node"], errorcode=>1});
$callback->({error=>[" $args->[0] is not support for $node"], errorcode=>1});
return;
}
my $node_init_url = $init_url;
$node_init_url =~ s/#NODE#\///;
push @nodeargs, [$node, {name=>$node,port=>'2375'}, $init_method, $node_init_url, $init_state, $state_machine_engine];
${$node_hash_variable{$node}}{hostinfo} = {name=>$node,port=>'2375'};
$node_hash_variable{$node}->{http_req_method} = $init_method;
$node_hash_variable{$node}->{http_req_url} = $node_init_url;
$node_hash_variable{$node}->{node_app_state} = $init_state;
$node_hash_variable{$node}->{state_machine_engine} = $state_machine_engine;
}
else {
push @new_noderange, $node;
@ -874,10 +707,11 @@ sub process_request {
}
# The dockerhost is mapped to vm.host, so open vm table here
$vmtab = xCAT::Table->new('vm');
my $vmtab = xCAT::Table->new('vm');
if ($vmtab) {
my $vmhashs = $vmtab->getNodesAttribs($noderange, ['host']);
if ($vmhashs) {
my @errornodes = ();
foreach my $node (@$noderange) {
my $vmhash = $vmhashs->{$node}->[0];
if (!defined($vmhash) or !defined($vmhash->{host})) {
@ -894,10 +728,22 @@ sub process_request {
}
my $node_init_url = $init_url;
$node_init_url =~ s/#NODE#/$node/;
push @nodeargs, [$node, {name=>$host,port=>$port}, $init_method, $node_init_url, $init_state, $state_machine_engine];
${$node_hash_variable{$node}}{hostinfo} = {name=>$host,port=>$port};
$node_hash_variable{$node}->{http_req_method} = $init_method;
$node_hash_variable{$node}->{http_req_url} = $node_init_url;
$node_hash_variable{$node}->{node_app_state} = $init_state;
$node_hash_variable{$node}->{state_machine_engine} = $state_machine_engine;
}
if (scalar(@errornodes)) {
$callback->({error=>["Docker host not set correct for @errornodes"], errorcode=>1});
return;
}
}
}
else {
$callback->({error=>["Open table 'vm' failed"], errorcode=>1});
return;
}
#parse parameters for mkdocker
if ($command eq 'mkdocker') {
my ($imagearg, $cmdarg, $flagarg);
@ -913,11 +759,19 @@ sub process_request {
}
}
$genreq_ptr = \&genreq_for_mkdocker;
$nodetypetab = xCAT::Table->new('nodetype');
my $nodetypetab = xCAT::Table->new('nodetype');
if (!defined($nodetypetab)) {
$callback->({error=>["Open table 'nodetype' failed"], errorcode=>1});
return;
}
my $hosttab = xCAT::Table->new('hosts');
if (!defined($hosttab)) {
$callback->({error=>["Open table 'hosts' failed"], errorcode=>1});
return;
}
my $mactab = xCAT::Table->new('mac');
if (!defined($hosttab) or !defined($nodetypetab) or !defined($mactab) or !defined($vmtab)) {
$callback->({error=>["Open table 'nodetype', 'hosts' or 'mac' failed"], errorcode=>1});
if (!defined($mactab)) {
$callback->({error=>["Open table 'mac' failed"], errorcode=>1});
return;
}
my $nodetypehash = $nodetypetab->getNodesAttribs($noderange, ['provmethod']);
@ -927,9 +781,13 @@ sub process_request {
my @errornodes = ();
foreach my $node (@$noderange) {
if ($imagearg) {
$node_create_variable{$node}->{image} = $imagearg;
$node_hash_variable{$node}->{image} = $imagearg;
if ($cmdarg) {
$node_create_variable{$node}->{cmd} = $cmdarg;
$node_hash_variable{$node}->{cmd} = $cmdarg;
$nodetypetab->setNodeAttribs($node,{provmethod=>"$imagearg!$cmdarg"});
}
else {
$nodetypetab->setNodeAttribs($node,{provmethod=>"$imagearg"});
}
}
else {
@ -938,114 +796,98 @@ sub process_request {
next;
}
else {
my ($tmp_img,$tmp_cmd) = split /:/, $nodetypehash->{$node}->[0]->{provmethod};
my ($tmp_img,$tmp_cmd) = split /!/, $nodetypehash->{$node}->[0]->{provmethod};
if (!defined($tmp_img)) {
push @errornodes, $node;
next;
}
$node_create_variable{$node}->{image} = $tmp_img;
$node_create_variable{$node}->{cmd} = $tmp_cmd;
$node_hash_variable{$node}->{image} = $tmp_img;
$node_hash_variable{$node}->{cmd} = $tmp_cmd;
}
}
if ($flagarg) {
$node_create_variable{$node}->{flag} = $flagarg;
$node_hash_variable{$node}->{flag} = $flagarg;
$vmtab->setNodeAttribs($node,{othersettings=>$flagarg});
}
if (defined($hosthash->{$node}->[0]->{ip})) {
$node_create_variable{$node}->{ip} = $hosthash->{$node}->[0]->{ip};
$node_hash_variable{$node}->{ip} = $hosthash->{$node}->[0]->{ip};
}
if (defined($machash->{$node}->[0]->{mac})) {
$node_create_variable{$node}->{mac} = $machash->{$node}->[0]->{mac};
$node_hash_variable{$node}->{mac} = $machash->{$node}->[0]->{mac};
}
my $vmnodehash = $vmhash->{$node}->[0];
if (defined($vmnodehash)) {
if (defined($vmnodehash->{cpus})) {
$node_create_variable{$node}->{cpus} = $vmnodehash->{cpus};
$node_hash_variable{$node}->{cpus} = $vmnodehash->{cpus};
}
if (defined($vmnodehash->{memory})) {
$node_create_variable{$node}->{memory} = $vmnodehash->{memory};
$node_hash_variable{$node}->{memory} = $vmnodehash->{memory};
}
if (!defined($flagarg) and defined($vmnodehash->{othersettings})) {
$node_create_variable{$node}->{flag} = $vmnodehash->{othersettings};
$node_hash_variable{$node}->{flag} = $vmnodehash->{othersettings};
}
}
}
$nodetypetab->close;
$hosttab->close;
$mactab->close;
if (scalar(@errornodes)) {
$callback->({error=>["Docker image not set correct for @errornodes"], errorcode=>1});
return;
}
}
$vmtab->close;
init_async(slots=>$max_concur_session_allow);
my @nodeargs = keys(%node_hash_variable);
if (scalar(@errornodes)) {
$callback->({error=>["Docker host not set correct for @errornodes"], errorcode=>1});
return;
}
my $timeout = 0;
my $pre_pending_res = undef;
my $no_res_times = 0;
while (1) {
my $pending_nodes = scalar(@nodeargs);
if ($pending_nodes eq 0) {
if ($pending_res eq 0) { # No more nodes needed to be process, no more response is expected, end the loop
last;
}
# The steps below is used to judge whether there is no response
# In the 1st round, just record the pending response num
# Then, check whether the pending num have changed.
# If NO changes, increase NO-change times counter and waiting time
# If changed, clear counter, waiting time
elsif (!defined($pre_pending_res)) {
$pre_pending_res = $pending_res;
}
elsif ($pre_pending_res eq $pending_res) {
$no_res_times++;
$timeout += $pending_res;
}
else {
$pre_pending_res = undef;
$no_res_times = 0;
$timeout = 0;
}
# Wait for 10 * num_of_sessions
if ($no_res_times > 5) {
last;
}
}
if (($pending_nodes eq 0) and ($pending_res eq 0)) { # No more nodes needed to be process, no more response is expected, end the loop
while ((scalar @nodeargs) and $http_requests_in_progress < $max_concur_session_allow) {
deal_with_rsp();
my $node = shift @nodeargs;
my $node_hash = $node_hash_variable{$node};
sendreq($node, $node_hash->{hostinfo}, $node_hash->{http_req_method}, $node_hash->{http_req_url});
}
if ($async->empty) {
last;
}
if (($pending_nodes) and ($concurrent_ssl_sessions lt $max_concur_ssl_session_allow)) {
my $node = shift @nodeargs;
my $ssl_connect = init_ssl_connection($node->[1]);
if (!defined($ssl_connect)) {
$callback->({error=>["Create SSL connection failed for docker $node->[0] on host $node->[1]->{host}"], errorcode=>1});
}
elsif (not ref($ssl_connect)) {
$callback->({error=>["$ssl_connect"], errorcode=>1});
}
else {
my $res = sendreq($ssl_connect, @$node);
if (defined($res)) {
$callback->({node=>[{name=>[$node->[0]], error=>[$res], errorcode=>[1]}]});
close($ssl_connect);
$concurrent_ssl_sessions--;
}
}
}
deal_with_rsp(timeout=>$timeout);
deal_with_rsp();
}
my @failed_handler_array = $select->handles;
if (scalar(@failed_handler_array)) {
my @err_msg = ();
foreach my $sockfd (@failed_handler_array) {
if (defined($node_hash_variable{$sockfd})) {
push @err_msg, {name=>[$node_hash_variable{$sockfd}->{node}], error=>["Timeout to wait for response"], errorcode=>[1]};
}
}
$callback->({node=>\@err_msg});
}
if ($nodelisttab) { $nodelisttab->commit;}
if ($nodetypetab) { $nodetypetab->commit;}
if ($vmtab) {$vmtab->commit;}
return;
}
#-------------------------------------------------------
=head3 init_async
Creates a new HTTP::Async object and sets it up.
Input:
%args: the hash stores params to create the HTTP::Async object
slots: maximum number of parallel requests to make
Usage example:
init_async(slots=><num>)
=cut
#-------------------------------------------------------
sub init_async {
my %args = @_;
my @user = getpwuid($>);
my $homedir = $user[7];
my $ssl_ca_file = $homedir . "/.xcat/ca.pem";
my $ssl_cert_file = $homedir . "/.xcat/client-cred.pem";
my $key_file = $homedir . "/.xcat/client-cred.pem";
$async = HTTP::Async->new(
slots => $args{slots},
ssl_options => {
SSL_verify_mode => "SSL_VERIFY_PEER",
SSL_ca_file => $ssl_ca_file,
SSL_cert_file => $ssl_cert_file,
SSL_key_file => $key_file,
},
);
return;
}
@ -1116,10 +958,7 @@ sub genreq {
sub genreq_for_mkdocker {
my ($node, $dockerhost, $method, $api) = @_;
my $dockerinfo = $node_create_variable{$node};
if (!defined($dockerinfo) or !defined($dockerinfo->{image})) {
return "No image defined";
}
my $dockerinfo = $node_hash_variable{$node};
my %info_hash = ();
#$info_hash{name} = '/'.$node;
#$info_hash{Hostname} = '';
@ -1143,103 +982,31 @@ sub genreq_for_mkdocker {
Based on the method, url create a http request and send out on the given SSL connection
Input: $ssl_connection: the SSL connection for this request
Input:
$node: the docker container name
$dockerhost: hash, keys: name, port, user, pw, user, pw
$method: the http method to generate a http request
$url: the http url to generate a http request
$state: the state for the action
$state_machine_engine: the function to deal with the http response for the request generate by $method and $url
return: 0-undefine If no error
1-return generate http request failed;
2-return http request error message;
Usage example:
my $res = send_req($ssl_connetion, $node, \%dockerhost, 'GET', '/containers/$node/json', "INIT_TO_WAIT_FOR_RSP", \&single_state_engine);
my $res = sendreq($node, \%dockerhost, 'GET', '/containers/$node/json');
=cut
#-------------------------------------------------------
sub sendreq {
my ($ssl_connection, $node, $dockerhost, $init_method, $init_url, $init_state, $state_machine_engine) = @_;
my ($node, $dockerhost, $init_method, $init_url) = @_;
my $http_req = $genreq_ptr->($node, $dockerhost, $init_method, $init_url);
# Need to Dumper to log file later
#print Dumper($http_req);
if (!defined($http_req)) {
return "Generate http request failed";
}
elsif (not ref($http_req)) {
return $http_req;
}
$select->add($ssl_connection);
print $ssl_connection $http_req->as_string();
$node_hash_variable{$ssl_connection}->{node} = $node;
$node_hash_variable{$ssl_connection}->{state} = $init_state;
$node_hash_variable{$ssl_connection}->{state_machine_engine} = $state_machine_engine;
$pending_res++;
my $http_session_id = $async->add_with_opts($http_req, {});
$http_session_variable{$http_session_id} = $node;
$http_requests_in_progress++;
return undef;
}
#-------------------------------------------------------
=head3 init_ssl_connection
This function is used to create a SSL connection to the docker host
Input: $dockerhost: hash, keys: name, port, user, pw, user, pw
return: A SSL connection handler if success.
An error msg if failed.
Usage example:
my $ssl_connect = init_ssl_connection(\%dockerhost);
=cut
#-------------------------------------------------------
sub init_ssl_connection {
my $dockerhost = shift;
my $hostname = $dockerhost->{name};
my $port = $dockerhost->{port};
my @user = getpwuid($>);
my $homedir = $user[7];
my $ssl_ca_file = $homedir . "/.xcat/ca.pem";
my $ssl_cert_file = $homedir . "/.xcat/client-cred.pem";
my $key_file = $homedir . "/.xcat/client-cred.pem";
my $rc = 0;
my $response;
my $connect;
my $socket = IO::Socket::INET->new( PeerHost => $hostname,
PeerPort => $port,
Timeout => 2);
if ($socket) {
$connect = IO::Socket::SSL->start_SSL( $socket,
SSL_verify_mode => "SSL_VERIFY_PEER",
SSL_ca_file => $ssl_ca_file,
SSL_cert_file =>$ssl_cert_file,
SSL_key_file => $key_file,
Timeout => 2
);
if ($connect) {
my $flags=fcntl($connect,F_GETFL,0);
$flags |= O_NONBLOCK;
fcntl($connect,F_SETFL,$flags);
} else {
$rc = 1;
$response = "Could not make ssl connection to $hostname:$port.";
}
} else {
$rc = 1;
$response = "Could not create socket to $hostname:$port.";
}
if ($rc) {
return $response;
} else {
$concurrent_ssl_sessions++;
return $connect;
}
}
1;