-Implement phase 1 of the DB worker, similar feature/performance with fewer db handles (up to two to accomodate some monitoring framework activity)

git-svn-id: https://svn.code.sf.net/p/xcat/code/xcat-core/trunk@3941 8638fb3e-16cb-4fca-ae20-7b5d299a9bcd
This commit is contained in:
jbjohnso 2009-08-04 21:10:32 +00:00
parent a46c4c529c
commit 5f8f6ae70e

View File

@ -7,8 +7,9 @@
#class xcattable
package xCAT::Table;
use Sys::Syslog;
use Storable qw/freeze thaw/;
use Data::Dumper;
use IO::Socket::UNIX;
use IO::Socket;
BEGIN
{
$::XCATROOT = $ENV{'XCATROOT'} ? $ENV{'XCATROOT'} : -d '/opt/xcat' ? '/opt/xcat' : '/usr';
@ -42,19 +43,47 @@ my $dbsockpath = "/var/run/xcat/dbworker.sock";
my $exitdbthread;
sub dbc_submit {
my $request = shift;
my $data = freeze($request);
$data.= "ENDOFFREEZEQFVyo4Cj6Q0v";
my $clisock = IO::Socket::UNIX->new(PeerAddr => $dbsockpath, Type => SOCK_STREAM, Timeout => 120 );
print $clisock $data;
$data="";
while ($data !~ /ENDOFFREEZEQFVyo4Cj6Q0v/) {
$data .= <$clisock>;
}
return thaw($data);
sub dbc_call {
my $self = shift;
my $function = shift;
my @args = @_;
my $request = {
function => $function,
tablename => $self->{tabname},
autocommit => $self->{autocommit},
args=>\@args,
};
return dbc_submit($request);
}
sub dbc_submit {
my $request = shift;
$request->{'wantarray'} = wantarray();
my $data = freeze($request);
$data.= "ENDOFFREEZEQFVyo4Cj6Q0v\n";
my $clisock = IO::Socket::UNIX->new(Peer => $dbsockpath, Type => SOCK_STREAM, Timeout => 120 );
unless ($clisock) {
use Carp qw/cluck/;
cluck();
}
print $clisock $data;
$data="";
while ($data !~ /ENDOFFREEZEQFVyo4Cj6Q0j/) {
$data .= <$clisock>;
}
my @returndata = @{thaw($data)};
if (wantarray) {
return @returndata;
} else {
return $returndata[0];
}
}
sub shut_dbworker {
$dbworkerpid = 0; #For now, just turn off usage of the db worker
#This was created as the monitoring framework shutdown code otherwise seems to have a race condition
#this may incur an extra db handle per service node to tolerate shutdown scenarios
}
sub init_dbworker {
#create a db worker process
$dbworkerpid = fork;
@ -73,7 +102,11 @@ sub init_dbworker {
alarm(10);
};
unlink($dbsockpath);
$dbworkersocket = IO::Socket::UNIX->new(LocalAddr => $dbsockpath, Type => SOCK_STREAM, Listen => 32);
umask(0077);
$dbworkersocket = IO::Socket::UNIX->new(Local => $dbsockpath, Type => SOCK_STREAM, Listen => 3);
unless ($dbworkersocket) {
die $!;
}
my $dbconn;
my $currcon;
my $clientset = new IO::Select;
@ -92,8 +125,11 @@ sub init_dbworker {
}
}
close($dbworkersocket);
unlink($dbsockpath);
exit 0;
}
print "INITTED\n";
}
sub handle_dbc_conn {
my $client = shift;
@ -104,8 +140,16 @@ sub handle_dbc_conn {
$data .= <$client>;
}
my $request = thaw($data);
my $response = freeze(handle_dbc_request($request));
$response .= 'ENDOFFREEZEQFVyo4Cj6Q0j';
use Data::Dumper;
my $response;
my @returndata;
if ($request->{'wantarray'}) {
@returndata = handle_dbc_request($request);
} else {
@returndata = (scalar(handle_dbc_request($request)));
}
$response = freeze(\@returndata);
$response .= "ENDOFFREEZEQFVyo4Cj6Q0j\n";
print $client $response;
} else { #Connection terminated, clean up
$clientset->remove($client);
@ -123,6 +167,7 @@ sub handle_dbc_request {
my $autocommit = $request->{autocommit};
if ($functionname eq 'new') {
unless ($opentables{$tablename}->{$autocommit}) {
shift @args; #Strip repeat class stuff
$opentables{$tablename}->{$autocommit} = xCAT::Table->new(@args);
}
if ($opentables{$tablename}->{$autocommit}) {
@ -150,6 +195,12 @@ sub handle_dbc_request {
return $opentables{$tablename}->{$autocommit}->setAttribsWhere(@args);
} elsif ($functionname eq 'delEntries') {
return $opentables{$tablename}->{$autocommit}->delEntries(@args);
} elsif ($functionname eq 'commit') {
return $opentables{$tablename}->{$autocommit}->commit(@args);
} elsif ($functionname eq 'rollback') {
return $opentables{$tablename}->{$autocommit}->rollback(@args);
} else {
die "undefined function $functionname";
}
}
@ -334,7 +385,9 @@ sub new
autocommit => $self->{autocommit},
args=>\@args,
};
dbc_submit($request);
unless (dbc_submit($request)) {
return undef;
}
} else { #direct db access mode
$self->{dbuser}="";
$self->{dbpass}="";
@ -466,7 +519,7 @@ sub new
updateschema($self);
}
} #END DB ACCESS SPECIFIC SECTION
if ($self->{tabname} eq 'nodelist')
{
weaken($self->{nodelist} = $self);
@ -652,6 +705,9 @@ sub addNodeAttribs
sub addAttribs
{
my $self = shift;
if ($dbworkerpid) {
return dbc_call($self,'addAttribs',@_);
}
my $key = shift;
my $keyval = shift;
my $elems = shift;
@ -728,6 +784,9 @@ sub addAttribs
sub rollback
{
my $self = shift;
if ($dbworkerpid) {
return dbc_call($self,'rollback',@_);
}
$self->{dbh}->rollback;
}
@ -758,6 +817,9 @@ sub rollback
sub commit
{
my $self = shift;
if ($dbworkerpid) {
return dbc_call($self,'commit',@_);
}
$self->{dbh}->commit;
}
@ -804,6 +866,9 @@ sub setAttribs
#-Key value
#-Hash reference of column-value pairs to set
my $self = shift;
if ($dbworkerpid) {
return dbc_call($self,'setAttribs',@_);
}
my $pKeypairs=shift;
my %keypairs = ();
if ($pKeypairs != undef) { %keypairs = %{$pKeypairs}; }
@ -992,6 +1057,9 @@ sub setAttribsWhere
#-Where clause
#-Hash reference of column-value pairs to set
my $self = shift;
if ($dbworkerpid) {
return dbc_call($self,'setAttribsWhere',@_);
}
my $where_clause = shift;
my $elems = shift;
my $cols = "";
@ -1452,6 +1520,9 @@ sub getNodeAttribs_nosub_returnany
sub getAllEntries
{
my $self = shift;
if ($dbworkerpid) {
return dbc_call($self,'getAllEntries',@_);
}
my $allentries = shift;
my @rets;
my $query;
@ -1508,6 +1579,9 @@ sub getAllAttribsWhere
#Takes a list of attributes, returns all records in the table.
my $self = shift;
if ($dbworkerpid) {
return dbc_call($self,'getAllAttribsWhere',@_);
}
my $whereclause = shift;
my @attribs = @_;
my @results = ();
@ -1566,6 +1640,9 @@ sub getAllNodeAttribs
#Extract and substitute every node record, expanding groups and substituting as getNodeAttribs does
my $self = shift;
if ($dbworkerpid) {
return dbc_call($self,'getAllNodeAttribs',@_);
}
my $attribq = shift;
my $hashretstyle = shift;
my $rethash;
@ -1666,6 +1743,9 @@ sub getAllAttribs
#Takes a list of attributes, returns all records in the table.
my $self = shift;
if ($dbworkerpid) {
return dbc_call($self,'getAllAttribs',@_);
}
#print "Being asked to dump ".$self->{tabname}."for something\n";
my @attribs = @_;
my @results = ();
@ -1747,6 +1827,9 @@ sub getAllAttribs
sub delEntries
{
my $self = shift;
if ($dbworkerpid) {
return dbc_call($self,'delEntries',@_);
}
my $keyref = shift;
my %keypairs;
if ($keyref)
@ -1844,6 +1927,9 @@ sub getAttribs
# (recurse argument intended only for internal use.)
# Returns a hash reference with requested attributes defined.
my $self = shift;
if ($dbworkerpid) {
return dbc_call($self,'getAttribs',@_);
}
#my $key = shift;
#my $keyval = shift;
@ -1991,6 +2077,9 @@ sub getTable
# table. Each array entry contains a pointer to a hash which is
# one row of the table. The row hash is keyed by attribute name.
my $self = shift;
if ($dbworkerpid) {
return dbc_call($self,'getTable',@_);
}
my @return;
my $statement = 'SELECT * FROM ' . $self->{tabname};
my $query = $self->{dbh}->prepare($statement);