diff --git a/xCAT-server/lib/xcat/plugins/docker.pm b/xCAT-server/lib/xcat/plugins/docker.pm index 6e2021527..14b331f71 100755 --- a/xCAT-server/lib/xcat/plugins/docker.pm +++ b/xCAT-server/lib/xcat/plugins/docker.pm @@ -19,22 +19,20 @@ use strict; use POSIX qw(WNOHANG nice); use POSIX qw(WNOHANG setsid :errno_h); use Errno; -use IO::Select; use MIME::Base64 qw(encode_base64); require IO::Socket::SSL; IO::Socket::SSL->import('inet4'); use Time::HiRes qw(gettimeofday sleep); use Fcntl qw/:DEFAULT :flock/; use File::Path; use File::Copy; +use File::Basename; use Getopt::Long; Getopt::Long::Configure("bundling"); +use HTTP::Async; use HTTP::Headers; use HTTP::Request; -use HTTP::Response; use xCAT::Utils; use xCAT::MsgUtils; -use Getopt::Long; -use File::Basename; use Cwd; use xCAT::Usage; use JSON; @@ -43,20 +41,42 @@ use JSON; my $verbose; my $global_callback; -my $select = IO::Select->new(); +my $async; #------------------------------------------------------- -=head3 The hash variable to store node related SSL connection and state information +=head3 The hash variable to store node related http request id + + The structure is like this + %http_session_variable = ( + $session_id => $node, + ); + +=cut +#------------------------------------------------------- + +my %http_session_variable = (); + +#------------------------------------------------------- +=head3 The hash variable to store node parameters to access docker container The structure is like this %node_hash_variable = ( - $SSL_connection => { - node => $node, - state => $current_state, - state_machine_engine => $state_machine_for_the_node, - total_len => $total_len, - get_len => $get_len, - data_buf => $data, + $node => { + image=>$nodetype.provmethod, + cmd=>$nodetype.provmethod, + ip=>$host.ip, + mac=>$mac.mac, + cpu=>$vm.cpus + memory=>$vm.memory + flag=>$vm.othersettings, + hostinfo=>{ + name => $host, + port => $port, + }, + http_req_method => $init_method, + http_req_url => $node_init_url, + node_app_state => $init_state, + state_machine_engine => $state_machine_engine, }, ); @@ -65,40 +85,11 @@ my $select = IO::Select->new(); my %node_hash_variable = (); -#------------------------------------------------------- -=head3 The hash variable to store node parameters to create docker container - - The structure is like this - %node_create_variable = ( - $node => { - image=>$nodetype.provmethod, - cmd=>$nodetype.provmethod, - ip=>$host.ip, - mac=>$mac.mac, - cpu=>$vm.cpus - memory=>$vm.memory - flag=>$vm.othersettings - }, - ); - -=cut -#------------------------------------------------------- - -my %node_create_variable = (); - -# The counter to record how many request have been send and responses are expected. -my $pending_res = 0; - -# The counter to record concurrent openting SSL connection numbers -my $concurrent_ssl_sessions = 0; - # The function point used for mkdocker to generate http request, for other cmd it will point to &genreq; my $genreq_ptr = \&genreq; -# The vairables below are used to update attributes -my $vmtab; # vm.othersettings -my $nodelisttab; # nodelist.status -my $nodetypetab; #nodetype.provmethod +# The num of HTTP requests that is progressing +my $http_requests_in_progress = 0; #------------------------------------------------------- @@ -114,7 +105,7 @@ sub handled_commands { rpower => 'nodehm:mgt', mkdocker => 'nodehm:mgt', rmdocker => 'nodehm:mgt', - lsdocker => 'nodehm:mgt=docker|ipmi', + lsdocker => 'nodehm:mgt=docker|ipmi|kvm', } ); } @@ -262,7 +253,7 @@ sub http_state_code_info { The state_machine_engine to deal with http response Input: - $sockfd: The SSL connection from which the http response is returned + $id: The http session id when adding HTTP request into HTTP::Async object $data: The http response Return: If there are any errors or msg, they will be outputed directly. @@ -275,213 +266,89 @@ sub http_state_code_info { #------------------------------------------------------- sub single_state_engine { - my $sockfd = shift; + my $id = shift; my $data = shift; - if (!defined $node_hash_variable{$sockfd}) { + my $node = $http_session_variable{$id}; + if (!defined($node)) { return; } + my $node_hash = $node_hash_variable{$node}; + my $curr_state = $node_hash->{node_app_state}; my $info_flag = 'data'; - my $get_another_pkg = 0; - my $node = $node_hash_variable{$sockfd}->{node}; - my $curr_state = $node_hash_variable{$sockfd}->{state}; - my $data_buf = $node_hash_variable{$sockfd}->{data_buf}; - my $data_total_len = $node_hash_variable{$sockfd}->{total_len}; - my $data_get_len = $node_hash_variable{$sockfd}->{get_len}; - my $data_chunked = $node_hash_variable{$sockfd}->{chunked}; - my @chunked_array = (); - # The code logic to deal with http response and state machine - #Need to Dumper to log file later - my $res = HTTP::Response->parse($data); - #print Dumper($res); - my $content = undef; - # Deal with the scenario that a http response is splited into multiple pkgs - unless ($res->code and $res->code =~ /\d{3}/) { - my $len = length($data); - if (defined($data_chunked)) { - $content = $data; - $res = HTTP::Response->parse($data_buf); - } - elsif (!defined($data_buf) or !defined($data_total_len) or !defined($data_get_len) or ($data_get_len + $len > $data_total_len)) { - $global_callback->({node=>[{name=>[$node],error=>["Incorrect data received"],errorcode=>[1]}]}); - $concurrent_ssl_sessions--; - $select->remove($sockfd); - close($sockfd); - delete($node_hash_variable{$sockfd}); - return; - } - else { - my $len = length($data); - if ($data_get_len + $len < $data_total_len) { - $node_hash_variable{$sockfd}->{get_len} += $len; - $node_hash_variable{$sockfd}->{data_buf} .= $data; - $pending_res++; - return; - } - else { # Exactly all the data are received - $res = HTTP::Response->parse($data_buf.$data); - delete $node_hash_variable{$sockfd}->{data_buf}; - delete $node_hash_variable{$sockfd}->{total_len}; - delete $node_hash_variable{$sockfd}->{get_len}; - } - } + + if ($data->is_error or (defined($data->header("connection")) and $data->header("connection") =~ /close/)) { + $http_requests_in_progress--; + delete($http_session_variable{$id}); } - if (!defined($content) and $res->content()) { - $content = $res->content(); - } - my $get_content_len = length($content); - my $content_length = $res->header('content-length'); - if (defined($content_length) and $get_content_len < $content_length) { - $node_hash_variable{$sockfd}->{data_buf} = $data; - $node_hash_variable{$sockfd}->{total_len} = $content_length; - $node_hash_variable{$sockfd}->{get_len} = $get_content_len; - $pending_res++; - return; - } - - my $encoding_flag = $res->header('transfer-encoding'); - if (defined($encoding_flag) and $encoding_flag eq 'chunked') { - $node_hash_variable{$sockfd}->{chunked} = 1; - $data_chunked = 1; - if ($get_content_len < 3) { - $node_hash_variable{$sockfd}->{data_buf} = $data; - $pending_res++; - return; - } - } - if (defined($data_chunked)) { - while (length($content)) { - my $split_pos = index($content, "\r\n"); - my $length_string = substr($content, 0, $split_pos); - my $data_length = hex($length_string); - if ($data_length lt 2) { - if ($data_length eq 0) { - push @chunked_array, '0'; - } - last; - } - push @chunked_array, $length_string; - push @chunked_array, substr($content, $split_pos + 2, $data_length); - $content = substr($content, $split_pos + 4 + $data_length); - } - } + my $content = $data->decoded_content; my @msg = (); - $msg[0] = &http_state_code_info($res->code, $curr_state); - unless ($res->is_success) { + $msg[0] = &http_state_code_info($data->code, $curr_state); + if ($data->is_error) { if ($content ne '') { $msg[0]->[1] = "$content"; } - } - if ($curr_state eq "INIT_TO_WAIT_FOR_QUERY_STATE_DONE") { - if ($res->is_success) { - my $node_state = undef; - if ($data_chunked) { - my $length = shift @chunked_array; - while ($length) { - my $content_hash = decode_json (shift @chunked_array); - if (defined($content_hash->{'State'}->{'Status'})) { - $node_state = $content_hash->{'State'}->{'Status'}; - last; - } - $length = shift @chunked_array; - } - if (!defined($node_state) and $length) { - $get_another_pkg = 1; - } - } - else { - my $content_hash = decode_json $content; - $node_state = $content_hash->{'State'}->{'Status'}; - } - if (defined($node_state)) { - if ($nodelisttab) { - $nodelisttab->setNodeAttribs($node, {status=>$node_state}); - } - $msg[0] = [0, $node_state]; - } - elsif (!$get_another_pkg) { - $msg[0] = [1, "Can not get status"]; - } + elsif ($data->message ne '') { + $msg[0]->[1] = $data->message; } - elsif ($res->code eq '404') { - if ($nodelisttab) { - $nodelisttab->setNodeAttribs($node, {status=>''}); + } + + my $content_type = $data->header("content-type"); + my $content_hash = undef; + if (defined($content_type) and $content_type =~ /json/i) { + $content_hash = decode_json $content; + } + elsif (!defined($content_type)) { + $content_type = "undefined"; + } + + if ($curr_state eq "INIT_TO_WAIT_FOR_QUERY_STATE_DONE") { + if ($data->is_success) { + if ($content_type =~ /json/i) { + my $node_state = $content_hash->{'State'}->{'Status'}; + if (defined($node_state)) { + $msg[0] = [0, $node_state]; + } + else { + $msg[0] = [1, "Can not get status"]; + } + } + else { + $msg[0] = [1, "The content type: $content_type is unable to be parsed."]; } } } elsif ($curr_state eq "INIT_TO_WAIT_FOR_QUERY_LOG_DONE") { - if (!$msg[0]->[0]) { + if ($data->is_success) { $info_flag = "base64_data"; @msg = (); - if (defined($data_chunked)) { - my @data_array = (); - my $tmp_len = shift(@chunked_array); - while ($tmp_len and scalar(@chunked_array)) { - push @data_array, shift(@chunked_array); - $tmp_len = shift(@chunked_array); - } - if ($tmp_len ne 0) { - $get_another_pkg = 1; - } - if (scalar(@data_array)) { - my $string = join('', @data_array); - $msg[0] = [0, encode_base64($string)]; - } - else { - $msg[0] = [0, encode_base64("No logs")]; - } + if ($content_type =~ /text\/plain/i) { + $msg[0] = [0,encode_base64($content)]; } else { - $msg[0] = [0, encode_base64($content)]; + $msg[0] = [1, "The content type: $content_type is unable to be parsed."]; } } } elsif ($curr_state eq "INIT_TO_WAIT_FOR_QUERY_DOCKER_DONE") { - if ($res->is_success) { + if ($data->is_success) { @msg = (); - if (!defined($content_length) or ($content_length > 3)) { - if (defined($data_chunked)) { - my $tmp_entry = shift @chunked_array; - while ($tmp_entry and scalar(@chunked_array)) { - my $content_hash = decode_json (shift @chunked_array); - if (ref($content_hash) eq 'ARRAY') { - foreach (@$content_hash) { - push @msg, [0, parse_docker_list_info($_, 1)]; - } - } - else { - push @msg, [0, parse_docker_list_info($content_hash, 0)]; - } - $tmp_entry = shift @chunked_array; - } - if ($tmp_entry ne '0') { - $get_another_pkg = 1; + if ($content_type =~ /json/i) { + if (ref($content_hash) eq 'ARRAY') { + foreach (@$content_hash) { + push @msg, [0, parse_docker_list_info($_, 1)]; } } else { - my $content_hash = decode_json $content; - if (ref($content_hash) eq 'ARRAY') { - foreach (@$content_hash) { - push @msg, [0, parse_docker_list_info($_, 1)]; - } - } - else { - push @msg, [0, parse_docker_list_info($content_hash, 0)]; - } + push @msg, [0, parse_docker_list_info($content_hash, 0)]; } } - else { + if (!scalar(@msg)) { @msg = [0, "No running docker"]; } } } elsif ($curr_state eq 'INIT_TO_WAIT_FOR_CREATE_DONE') { - if ($nodetypetab) { - $nodetypetab->setNodeAttribs($node,{provmethod=>"$node_create_variable{$node}->{image}:$node_create_variable{$node}->{cmd}"}); - } - if ($vmtab) { - $vmtab->setNodeAttribs($node,{othersettings=>$node_create_variable{$node}->{flag}}); - } } foreach my $tmp (@msg) { @@ -492,14 +359,7 @@ sub single_state_engine { $global_callback->({node=>[{name=>[$node],"$info_flag"=>["$tmp->[1]"]}]}); } } - if ($get_another_pkg) { - $pending_res++; - return; - } - $concurrent_ssl_sessions--; - $select->remove($sockfd); - close($sockfd); - delete($node_hash_variable{$sockfd}); + return; } @@ -580,7 +440,7 @@ sub parse_docker_list_info { $created =~ s/\..*$//; } my $cmd = sprintf("\"%.20s\"", $command); - my $string = sprintf("%-12s %-30.30s %-22s %-20s %-10s %s", $id, $image, $cmd, $created, $status, $names); + my $string = sprintf("%-12s %-30.30s %-22s %-20s %-10s %s", $id, $image, $cmd, $created, $status, $names); return($string); } @@ -592,7 +452,7 @@ sub parse_docker_list_info { Input: %args: a hash which currently only key 'timeout' is using Return: - The expected number of response which havn't been received + The number of response have received Usage example: =cut @@ -606,42 +466,16 @@ sub deal_with_rsp if (defined($args{timeout})) { $timeout = $args{timeout}; } - my @data = (); - if ($select->can_read($timeout)) { - my @ready_fds = $select->can_read(0); - foreach my $sockfd (@ready_fds) { - my $res = ""; - my $node_hash = $node_hash_variable{$sockfd}; - if (defined($node_hash)) { - while (1) { - my $readbytes = undef; - $readbytes = sysread($sockfd, $res, 65535, length($res)); - if (!defined($readbytes)) { - if ($!{EAGAIN} or $!{EWOULDBLOCK}) { - $pending_res--; - last; - } - elsif ($!{EINTR} or $!{ENOTTY}) { - next; - } - else { - die "read failed: $!"; - } - } - elsif ($readbytes == 0) { - $pending_res--; - last; - } - } - # readbytes UNDEF means a reading error, so print out a msg and parse the next SSL connection - push @data, [$node_hash->{state_machine_engine}, $sockfd, $res]; - } + my $deal_num = 0; + while (my ($response, $id) = $async->wait_for_next_response($timeout)) { + my $node = $http_session_variable{$id}; + if (defined($node)) { + $deal_num++; + $node_hash_variable{$node}->{state_machine_engine}->($id, $response); } } - foreach (@data) { - $_->[0]->($_->[1], $_->[2]); - } - return $pending_res; + + return $deal_num; } #------------------------------------------------------- @@ -823,10 +657,8 @@ sub process_request { my $init_url = undef; my $init_state = undef; my $state_machine_engine = undef; - my @nodeargs = (); - my @errornodes = (); my $mapping_hash = undef; - my $max_concur_ssl_session_allow = 10; # A variable can be set by caculated in the future + my $max_concur_session_allow = 20; # A variable can be set by caculated in the future $mapping_hash = $command_states{$command}{$args->[0]}; unless($mapping_hash) { $mapping_hash = $command_states{$command}{all}; @@ -846,24 +678,25 @@ sub process_request { if (!defined($init_state)) { $init_state = "INIT_TO_WAIT_FOR_RSP"; } - if ($command eq 'rpower' and defined($args->[0]) and ($args->[0] eq 'state')) { - $nodelisttab = xCAT::Table->new('nodelist'); - } if ($command eq 'lsdocker') { my @new_noderange = (); my $nodehm = xCAT::Table->new('nodehm'); if ($nodehm) { my $nodehmhash = $nodehm->getNodesAttribs($noderange, ['mgt']); foreach my $node (@$noderange) { - if (defined($nodehmhash->{$node}->[0]->{mgt}) and $nodehmhash->{$node}->[0]->{mgt} eq 'ipmi') { + if (defined($nodehmhash->{$node}->[0]->{mgt}) and $nodehmhash->{$node}->[0]->{mgt} =~ /ipmi|kvm/) { if (defined($args) and $args->[0] ne '') { - $callback->({error=>[" -l|--log is not support for $node"], errorcode=>1}); + $callback->({error=>[" $args->[0] is not support for $node"], errorcode=>1}); return; } my $node_init_url = $init_url; $node_init_url =~ s/#NODE#\///; - push @nodeargs, [$node, {name=>$node,port=>'2375'}, $init_method, $node_init_url, $init_state, $state_machine_engine]; + ${$node_hash_variable{$node}}{hostinfo} = {name=>$node,port=>'2375'}; + $node_hash_variable{$node}->{http_req_method} = $init_method; + $node_hash_variable{$node}->{http_req_url} = $node_init_url; + $node_hash_variable{$node}->{node_app_state} = $init_state; + $node_hash_variable{$node}->{state_machine_engine} = $state_machine_engine; } else { push @new_noderange, $node; @@ -874,10 +707,11 @@ sub process_request { } # The dockerhost is mapped to vm.host, so open vm table here - $vmtab = xCAT::Table->new('vm'); + my $vmtab = xCAT::Table->new('vm'); if ($vmtab) { my $vmhashs = $vmtab->getNodesAttribs($noderange, ['host']); if ($vmhashs) { + my @errornodes = (); foreach my $node (@$noderange) { my $vmhash = $vmhashs->{$node}->[0]; if (!defined($vmhash) or !defined($vmhash->{host})) { @@ -894,10 +728,22 @@ sub process_request { } my $node_init_url = $init_url; $node_init_url =~ s/#NODE#/$node/; - push @nodeargs, [$node, {name=>$host,port=>$port}, $init_method, $node_init_url, $init_state, $state_machine_engine]; + ${$node_hash_variable{$node}}{hostinfo} = {name=>$host,port=>$port}; + $node_hash_variable{$node}->{http_req_method} = $init_method; + $node_hash_variable{$node}->{http_req_url} = $node_init_url; + $node_hash_variable{$node}->{node_app_state} = $init_state; + $node_hash_variable{$node}->{state_machine_engine} = $state_machine_engine; + } + if (scalar(@errornodes)) { + $callback->({error=>["Docker host not set correct for @errornodes"], errorcode=>1}); + return; } } } + else { + $callback->({error=>["Open table 'vm' failed"], errorcode=>1}); + return; + } #parse parameters for mkdocker if ($command eq 'mkdocker') { my ($imagearg, $cmdarg, $flagarg); @@ -913,11 +759,19 @@ sub process_request { } } $genreq_ptr = \&genreq_for_mkdocker; - $nodetypetab = xCAT::Table->new('nodetype'); + my $nodetypetab = xCAT::Table->new('nodetype'); + if (!defined($nodetypetab)) { + $callback->({error=>["Open table 'nodetype' failed"], errorcode=>1}); + return; + } my $hosttab = xCAT::Table->new('hosts'); + if (!defined($hosttab)) { + $callback->({error=>["Open table 'hosts' failed"], errorcode=>1}); + return; + } my $mactab = xCAT::Table->new('mac'); - if (!defined($hosttab) or !defined($nodetypetab) or !defined($mactab) or !defined($vmtab)) { - $callback->({error=>["Open table 'nodetype', 'hosts' or 'mac' failed"], errorcode=>1}); + if (!defined($mactab)) { + $callback->({error=>["Open table 'mac' failed"], errorcode=>1}); return; } my $nodetypehash = $nodetypetab->getNodesAttribs($noderange, ['provmethod']); @@ -927,9 +781,13 @@ sub process_request { my @errornodes = (); foreach my $node (@$noderange) { if ($imagearg) { - $node_create_variable{$node}->{image} = $imagearg; + $node_hash_variable{$node}->{image} = $imagearg; if ($cmdarg) { - $node_create_variable{$node}->{cmd} = $cmdarg; + $node_hash_variable{$node}->{cmd} = $cmdarg; + $nodetypetab->setNodeAttribs($node,{provmethod=>"$imagearg!$cmdarg"}); + } + else { + $nodetypetab->setNodeAttribs($node,{provmethod=>"$imagearg"}); } } else { @@ -938,114 +796,98 @@ sub process_request { next; } else { - my ($tmp_img,$tmp_cmd) = split /:/, $nodetypehash->{$node}->[0]->{provmethod}; + my ($tmp_img,$tmp_cmd) = split /!/, $nodetypehash->{$node}->[0]->{provmethod}; if (!defined($tmp_img)) { push @errornodes, $node; next; } - $node_create_variable{$node}->{image} = $tmp_img; - $node_create_variable{$node}->{cmd} = $tmp_cmd; + $node_hash_variable{$node}->{image} = $tmp_img; + $node_hash_variable{$node}->{cmd} = $tmp_cmd; } } if ($flagarg) { - $node_create_variable{$node}->{flag} = $flagarg; + $node_hash_variable{$node}->{flag} = $flagarg; + $vmtab->setNodeAttribs($node,{othersettings=>$flagarg}); } if (defined($hosthash->{$node}->[0]->{ip})) { - $node_create_variable{$node}->{ip} = $hosthash->{$node}->[0]->{ip}; + $node_hash_variable{$node}->{ip} = $hosthash->{$node}->[0]->{ip}; } if (defined($machash->{$node}->[0]->{mac})) { - $node_create_variable{$node}->{mac} = $machash->{$node}->[0]->{mac}; + $node_hash_variable{$node}->{mac} = $machash->{$node}->[0]->{mac}; } my $vmnodehash = $vmhash->{$node}->[0]; if (defined($vmnodehash)) { if (defined($vmnodehash->{cpus})) { - $node_create_variable{$node}->{cpus} = $vmnodehash->{cpus}; + $node_hash_variable{$node}->{cpus} = $vmnodehash->{cpus}; } if (defined($vmnodehash->{memory})) { - $node_create_variable{$node}->{memory} = $vmnodehash->{memory}; + $node_hash_variable{$node}->{memory} = $vmnodehash->{memory}; } if (!defined($flagarg) and defined($vmnodehash->{othersettings})) { - $node_create_variable{$node}->{flag} = $vmnodehash->{othersettings}; + $node_hash_variable{$node}->{flag} = $vmnodehash->{othersettings}; } } } + $nodetypetab->close; + $hosttab->close; + $mactab->close; + + if (scalar(@errornodes)) { + $callback->({error=>["Docker image not set correct for @errornodes"], errorcode=>1}); + return; + } } + $vmtab->close; + init_async(slots=>$max_concur_session_allow); + my @nodeargs = keys(%node_hash_variable); - - if (scalar(@errornodes)) { - $callback->({error=>["Docker host not set correct for @errornodes"], errorcode=>1}); - return; - } - my $timeout = 0; - my $pre_pending_res = undef; - my $no_res_times = 0; while (1) { - my $pending_nodes = scalar(@nodeargs); - if ($pending_nodes eq 0) { - if ($pending_res eq 0) { # No more nodes needed to be process, no more response is expected, end the loop - last; - } - # The steps below is used to judge whether there is no response - # In the 1st round, just record the pending response num - # Then, check whether the pending num have changed. - # If NO changes, increase NO-change times counter and waiting time - # If changed, clear counter, waiting time - elsif (!defined($pre_pending_res)) { - $pre_pending_res = $pending_res; - } - elsif ($pre_pending_res eq $pending_res) { - $no_res_times++; - $timeout += $pending_res; - } - else { - $pre_pending_res = undef; - $no_res_times = 0; - $timeout = 0; - } - # Wait for 10 * num_of_sessions - if ($no_res_times > 5) { - last; - } - } - - - if (($pending_nodes eq 0) and ($pending_res eq 0)) { # No more nodes needed to be process, no more response is expected, end the loop + while ((scalar @nodeargs) and $http_requests_in_progress < $max_concur_session_allow) { + deal_with_rsp(); + my $node = shift @nodeargs; + my $node_hash = $node_hash_variable{$node}; + sendreq($node, $node_hash->{hostinfo}, $node_hash->{http_req_method}, $node_hash->{http_req_url}); + } + if ($async->empty) { last; } - if (($pending_nodes) and ($concurrent_ssl_sessions lt $max_concur_ssl_session_allow)) { - my $node = shift @nodeargs; - my $ssl_connect = init_ssl_connection($node->[1]); - if (!defined($ssl_connect)) { - $callback->({error=>["Create SSL connection failed for docker $node->[0] on host $node->[1]->{host}"], errorcode=>1}); - } - elsif (not ref($ssl_connect)) { - $callback->({error=>["$ssl_connect"], errorcode=>1}); - } - else { - my $res = sendreq($ssl_connect, @$node); - if (defined($res)) { - $callback->({node=>[{name=>[$node->[0]], error=>[$res], errorcode=>[1]}]}); - close($ssl_connect); - $concurrent_ssl_sessions--; - } - } - } - deal_with_rsp(timeout=>$timeout); + deal_with_rsp(); } - my @failed_handler_array = $select->handles; - if (scalar(@failed_handler_array)) { - my @err_msg = (); - foreach my $sockfd (@failed_handler_array) { - if (defined($node_hash_variable{$sockfd})) { - push @err_msg, {name=>[$node_hash_variable{$sockfd}->{node}], error=>["Timeout to wait for response"], errorcode=>[1]}; - } - } - $callback->({node=>\@err_msg}); - } - if ($nodelisttab) { $nodelisttab->commit;} - if ($nodetypetab) { $nodetypetab->commit;} - if ($vmtab) {$vmtab->commit;} + return; +} + +#------------------------------------------------------- + +=head3 init_async + + Creates a new HTTP::Async object and sets it up. + Input: + %args: the hash stores params to create the HTTP::Async object + slots: maximum number of parallel requests to make + Usage example: + init_async(slots=>) + +=cut + +#------------------------------------------------------- + +sub init_async { + my %args = @_; + my @user = getpwuid($>); + my $homedir = $user[7]; + my $ssl_ca_file = $homedir . "/.xcat/ca.pem"; + my $ssl_cert_file = $homedir . "/.xcat/client-cred.pem"; + my $key_file = $homedir . "/.xcat/client-cred.pem"; + $async = HTTP::Async->new( + slots => $args{slots}, + ssl_options => { + SSL_verify_mode => "SSL_VERIFY_PEER", + SSL_ca_file => $ssl_ca_file, + SSL_cert_file => $ssl_cert_file, + SSL_key_file => $key_file, + }, + ); return; } @@ -1116,10 +958,7 @@ sub genreq { sub genreq_for_mkdocker { my ($node, $dockerhost, $method, $api) = @_; - my $dockerinfo = $node_create_variable{$node}; - if (!defined($dockerinfo) or !defined($dockerinfo->{image})) { - return "No image defined"; - } + my $dockerinfo = $node_hash_variable{$node}; my %info_hash = (); #$info_hash{name} = '/'.$node; #$info_hash{Hostname} = ''; @@ -1143,103 +982,31 @@ sub genreq_for_mkdocker { Based on the method, url create a http request and send out on the given SSL connection - Input: $ssl_connection: the SSL connection for this request + Input: $node: the docker container name $dockerhost: hash, keys: name, port, user, pw, user, pw $method: the http method to generate a http request $url: the http url to generate a http request - $state: the state for the action - $state_machine_engine: the function to deal with the http response for the request generate by $method and $url - return: 0-undefine If no error 1-return generate http request failed; 2-return http request error message; Usage example: - my $res = send_req($ssl_connetion, $node, \%dockerhost, 'GET', '/containers/$node/json', "INIT_TO_WAIT_FOR_RSP", \&single_state_engine); + my $res = sendreq($node, \%dockerhost, 'GET', '/containers/$node/json'); =cut #------------------------------------------------------- sub sendreq { - my ($ssl_connection, $node, $dockerhost, $init_method, $init_url, $init_state, $state_machine_engine) = @_; + my ($node, $dockerhost, $init_method, $init_url) = @_; my $http_req = $genreq_ptr->($node, $dockerhost, $init_method, $init_url); # Need to Dumper to log file later #print Dumper($http_req); - if (!defined($http_req)) { - return "Generate http request failed"; - } - elsif (not ref($http_req)) { - return $http_req; - } - $select->add($ssl_connection); - print $ssl_connection $http_req->as_string(); - $node_hash_variable{$ssl_connection}->{node} = $node; - $node_hash_variable{$ssl_connection}->{state} = $init_state; - $node_hash_variable{$ssl_connection}->{state_machine_engine} = $state_machine_engine; - $pending_res++; + my $http_session_id = $async->add_with_opts($http_req, {}); + $http_session_variable{$http_session_id} = $node; + $http_requests_in_progress++; return undef; } -#------------------------------------------------------- - -=head3 init_ssl_connection - - This function is used to create a SSL connection to the docker host - - Input: $dockerhost: hash, keys: name, port, user, pw, user, pw - - return: A SSL connection handler if success. - An error msg if failed. - Usage example: - my $ssl_connect = init_ssl_connection(\%dockerhost); - -=cut - -#------------------------------------------------------- - -sub init_ssl_connection { - my $dockerhost = shift; - my $hostname = $dockerhost->{name}; - my $port = $dockerhost->{port}; - my @user = getpwuid($>); - my $homedir = $user[7]; - my $ssl_ca_file = $homedir . "/.xcat/ca.pem"; - my $ssl_cert_file = $homedir . "/.xcat/client-cred.pem"; - my $key_file = $homedir . "/.xcat/client-cred.pem"; - my $rc = 0; - my $response; - my $connect; - my $socket = IO::Socket::INET->new( PeerHost => $hostname, - PeerPort => $port, - Timeout => 2); - if ($socket) { - $connect = IO::Socket::SSL->start_SSL( $socket, - SSL_verify_mode => "SSL_VERIFY_PEER", - SSL_ca_file => $ssl_ca_file, - SSL_cert_file =>$ssl_cert_file, - SSL_key_file => $key_file, - Timeout => 2 - ); - if ($connect) { - my $flags=fcntl($connect,F_GETFL,0); - $flags |= O_NONBLOCK; - fcntl($connect,F_SETFL,$flags); - } else { - $rc = 1; - $response = "Could not make ssl connection to $hostname:$port."; - } - } else { - $rc = 1; - $response = "Could not create socket to $hostname:$port."; - } - - if ($rc) { - return $response; - } else { - $concurrent_ssl_sessions++; - return $connect; - } -} 1;