-Initial changes in preparation for db worker thread
git-svn-id: https://svn.code.sf.net/p/xcat/code/xcat-core/trunk@3937 8638fb3e-16cb-4fca-ae20-7b5d299a9bcd
This commit is contained in:
parent
8a7d38d30f
commit
d9a5adea77
@ -38,7 +38,121 @@ require xCAT::NotifHandler;
|
||||
|
||||
my $dbworkerpid; #The process id of the database worker
|
||||
my $dbworkersocket;
|
||||
my $dbsockpath;
|
||||
my $dbsockpath = "/var/run/xcat/dbworker.sock";
|
||||
my $exitdbthread;
|
||||
|
||||
|
||||
sub dbc_submit {
|
||||
my $request = shift;
|
||||
my $data = freeze($request);
|
||||
$data.= "ENDOFFREEZEQFVyo4Cj6Q0v";
|
||||
my $clisock = IO::Socket::UNIX->new(PeerAddr => $dbsockpath, Type => SOCK_STREAM, Timeout => 120 );
|
||||
print $clisock $data;
|
||||
$data="";
|
||||
while ($data !~ /ENDOFFREEZEQFVyo4Cj6Q0v/) {
|
||||
$data .= <$clisock>;
|
||||
}
|
||||
return thaw($data);
|
||||
}
|
||||
|
||||
sub init_dbworker {
|
||||
#create a db worker process
|
||||
$dbworkerpid = fork;
|
||||
|
||||
unless (defined $dbworkerpid) {
|
||||
die "Error spawining database worker";
|
||||
}
|
||||
unless ($dbworkerpid) {
|
||||
#This process is the database worker, it's job is to manage database queries to reduce required handles and to permit cross-process caching
|
||||
use File::Path;
|
||||
mkpath('/var/run/xcat/');
|
||||
use IO::Socket;
|
||||
$SIG{TERM} = $SIG{INT} = sub {
|
||||
$exitdbthread=1;
|
||||
$SIG{ALRM} = sub { exit 0; };
|
||||
alarm(10);
|
||||
};
|
||||
unlink($dbsockpath);
|
||||
$dbworkersocket = IO::Socket::UNIX->new(LocalAddr => $dbsockpath, Type => SOCK_STREAM, Listen => 32);
|
||||
my $dbconn;
|
||||
my $currcon;
|
||||
my $clientset = new IO::Select;
|
||||
$clientset->add($dbworkersocket);
|
||||
while (not $exitdbthread) {
|
||||
my @ready_socks = $clientset->can_read;
|
||||
foreach $currcon (@ready_socks) {
|
||||
if ($currcon == $dbworkersocket) { #We have a new connection to register
|
||||
$dbconn = $currcon->accept;
|
||||
if ($dbconn) {
|
||||
$clientset->add($dbconn);
|
||||
}
|
||||
} else {
|
||||
handle_dbc_conn($dbconn,$clientset);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
sub handle_dbc_conn {
|
||||
my $client = shift;
|
||||
my $clientset = shift;
|
||||
my $data;
|
||||
if ($data = <$client>) {
|
||||
while ($data !~ /ENDOFFREEZEQFVyo4Cj6Q0v/) {
|
||||
$data .= <$client>;
|
||||
}
|
||||
my $request = thaw($data);
|
||||
my $response = freeze(handle_dbc_request($request));
|
||||
$response .= 'ENDOFFREEZEQFVyo4Cj6Q0j';
|
||||
print $client $response;
|
||||
} else { #Connection terminated, clean up
|
||||
$clientset->remove($client);
|
||||
close($client);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
my %opentables; #USED ONLY BY THE DB WORKER TO TRACK OPEN DATABASES
|
||||
sub handle_dbc_request {
|
||||
my $request = shift;
|
||||
my $functionname = $request->{function};
|
||||
my $tablename = $request->{tablename};
|
||||
my @args = @{$request->{args}};
|
||||
my $autocommit = $request->{autocommit};
|
||||
if ($functionname eq 'new') {
|
||||
unless ($opentables{$tablename}->{$autocommit}) {
|
||||
$opentables{$tablename}->{$autocommit} = xCAT::Table->new(@args);
|
||||
}
|
||||
if ($opentables{$tablename}->{$autocommit}) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
} elsif ($functionname eq 'getAllAttribs') {
|
||||
return $opentables{$tablename}->{$autocommit}->getAllAttribs(@args);
|
||||
} elsif ($functionname eq 'getAttribs') {
|
||||
return $opentables{$tablename}->{$autocommit}->getAttribs(@args);
|
||||
} elsif ($functionname eq 'getTable') {
|
||||
return $opentables{$tablename}->{$autocommit}->getTable(@args);
|
||||
} elsif ($functionname eq 'getAllNodeAttribs') {
|
||||
return $opentables{$tablename}->{$autocommit}->getAllNodeAttribs(@args);
|
||||
} elsif ($functionname eq 'getAllEntries') {
|
||||
return $opentables{$tablename}->{$autocommit}->getAllEntries(@args);
|
||||
} elsif ($functionname eq 'getAllAttribsWhere') {
|
||||
return $opentables{$tablename}->{$autocommit}->getAllAttribsWhere(@args);
|
||||
} elsif ($functionname eq 'addAttribs') {
|
||||
return $opentables{$tablename}->{$autocommit}->addAttribs(@args);
|
||||
} elsif ($functionname eq 'setAttribs') {
|
||||
return $opentables{$tablename}->{$autocommit}->setAttribs(@args);
|
||||
} elsif ($functionname eq 'setAttribsWhere') {
|
||||
return $opentables{$tablename}->{$autocommit}->setAttribsWhere(@args);
|
||||
} elsif ($functionname eq 'delEntries') {
|
||||
return $opentables{$tablename}->{$autocommit}->delEntries(@args);
|
||||
}
|
||||
}
|
||||
|
||||
#--------------------------------------------------------------------------------
|
||||
|
||||
=head1 xCAT::Table
|
||||
@ -193,10 +307,10 @@ sub buildcreatestmt
|
||||
#--------------------------------------------------------------------------------
|
||||
sub new
|
||||
{
|
||||
|
||||
#Constructor takes table name as argument
|
||||
#Also takes a true/false value, or assumes 0. If something true is passed, create table
|
||||
#is requested
|
||||
my @args = @_;
|
||||
my $self = {};
|
||||
my $proto = shift;
|
||||
$self->{tabname} = shift;
|
||||
@ -208,143 +322,151 @@ sub new
|
||||
my $create = 1;
|
||||
if (exists($otherargs{'-create'}) && ($otherargs{'-create'}==0)) {$create = 0;}
|
||||
$self->{autocommit} = $otherargs{'-autocommit'};
|
||||
|
||||
unless (defined($self->{autocommit}))
|
||||
{
|
||||
$self->{autocommit} = 1;
|
||||
}
|
||||
|
||||
my $class = ref($proto) || $proto;
|
||||
$self->{dbuser}="";
|
||||
$self->{dbpass}="";
|
||||
if ($dbworkerpid) {
|
||||
my $request = {
|
||||
function => "new",
|
||||
tablename => $self->{tabname},
|
||||
autocommit => $self->{autocommit},
|
||||
args=>\@args,
|
||||
};
|
||||
dbc_submit($request);
|
||||
} else { #direct db access mode
|
||||
$self->{dbuser}="";
|
||||
$self->{dbpass}="";
|
||||
|
||||
my $xcatcfg = (defined $ENV{'XCATCFG'} ? $ENV{'XCATCFG'} : '');
|
||||
unless ($xcatcfg) {
|
||||
if (-r "/etc/xcat/cfgloc") {
|
||||
my $cfgl;
|
||||
open($cfgl,"<","/etc/xcat/cfgloc");
|
||||
$xcatcfg = <$cfgl>;
|
||||
close($cfgl);
|
||||
chomp($xcatcfg);
|
||||
$ENV{'XCATCFG'}=$xcatcfg; #Store it in env to avoid many file reads
|
||||
}
|
||||
}
|
||||
if ($xcatcfg =~ /^$/)
|
||||
{
|
||||
if (-d "/opt/xcat/cfg")
|
||||
{
|
||||
$xcatcfg = "SQLite:/opt/xcat/cfg";
|
||||
}
|
||||
else
|
||||
{
|
||||
if (-d "/etc/xcat")
|
||||
{
|
||||
$xcatcfg = "SQLite:/etc/xcat";
|
||||
my $xcatcfg = (defined $ENV{'XCATCFG'} ? $ENV{'XCATCFG'} : '');
|
||||
unless ($xcatcfg) {
|
||||
if (-r "/etc/xcat/cfgloc") {
|
||||
my $cfgl;
|
||||
open($cfgl,"<","/etc/xcat/cfgloc");
|
||||
$xcatcfg = <$cfgl>;
|
||||
close($cfgl);
|
||||
chomp($xcatcfg);
|
||||
$ENV{'XCATCFG'}=$xcatcfg; #Store it in env to avoid many file reads
|
||||
}
|
||||
}
|
||||
}
|
||||
($xcatcfg =~ /^$/) && die "Can't locate xCAT configuration";
|
||||
unless ($xcatcfg =~ /:/)
|
||||
{
|
||||
$xcatcfg = "SQLite:" . $xcatcfg;
|
||||
}
|
||||
if ($xcatcfg =~ /^SQLite:/)
|
||||
{
|
||||
$self->{backend_type} = 'sqlite';
|
||||
my @path = split(':', $xcatcfg, 2);
|
||||
unless (-e $path[1] . "/" . $self->{tabname} . ".sqlite" || $create)
|
||||
if ($xcatcfg =~ /^$/)
|
||||
{
|
||||
return undef;
|
||||
}
|
||||
$self->{connstring} =
|
||||
"dbi:" . $xcatcfg . "/" . $self->{tabname} . ".sqlite";
|
||||
}
|
||||
elsif ($xcatcfg =~ /^CSV:/)
|
||||
{
|
||||
$self->{backend_type} = 'csv';
|
||||
$xcatcfg =~ m/^.*?:(.*)$/;
|
||||
my $path = $1;
|
||||
$self->{connstring} = "dbi:CSV:f_dir=" . $path;
|
||||
}
|
||||
else #Generic DBI
|
||||
{
|
||||
($self->{connstring},$self->{dbuser},$self->{dbpass}) = split(/\|/,$xcatcfg);
|
||||
$self->{connstring} =~ s/^dbi://;
|
||||
$self->{connstring} =~ s/^/dbi:/;
|
||||
#return undef;
|
||||
}
|
||||
my $oldumask= umask 0077;
|
||||
unless ($::XCAT_DBHS->{$self->{connstring},$self->{dbuser},$self->{dbpass},$self->{autocommit}}) { #= $self->{tabname};
|
||||
$::XCAT_DBHS->{$self->{connstring},$self->{dbuser},$self->{dbpass},$self->{autocommit}} =
|
||||
DBI->connect($self->{connstring}, $self->{dbuser}, $self->{dbpass}, {AutoCommit => $self->{autocommit}});
|
||||
}
|
||||
umask $oldumask;
|
||||
|
||||
$self->{dbh} = $::XCAT_DBHS->{$self->{connstring},$self->{dbuser},$self->{dbpass},$self->{autocommit}};
|
||||
#DBI->connect($self->{connstring}, $self->{dbuser}, $self->{dbpass}, {AutoCommit => $autocommit});
|
||||
if ($xcatcfg =~ /^SQLite:/)
|
||||
{
|
||||
my $dbexistq =
|
||||
"SELECT name from sqlite_master WHERE type='table' and name = ?";
|
||||
my $sth = $self->{dbh}->prepare($dbexistq);
|
||||
$sth->execute($self->{tabname});
|
||||
my $result = $sth->fetchrow();
|
||||
$sth->finish;
|
||||
unless (defined $result)
|
||||
{
|
||||
if ($create)
|
||||
if (-d "/opt/xcat/cfg")
|
||||
{
|
||||
my $str =
|
||||
buildcreatestmt($self->{tabname},
|
||||
$xCAT::Schema::tabspec{$self->{tabname}},
|
||||
$xcatcfg);
|
||||
$self->{dbh}->do($str);
|
||||
$xcatcfg = "SQLite:/opt/xcat/cfg";
|
||||
}
|
||||
else
|
||||
{
|
||||
if (-d "/etc/xcat")
|
||||
{
|
||||
$xcatcfg = "SQLite:/etc/xcat";
|
||||
}
|
||||
}
|
||||
else { return undef; }
|
||||
}
|
||||
}
|
||||
elsif ($xcatcfg =~ /^CSV:/)
|
||||
{
|
||||
$self->{dbh}->{'csv_tables'}->{$self->{tabname}} =
|
||||
{'file' => $self->{tabname} . ".csv"};
|
||||
$xcatcfg =~ m/^.*?:(.*)$/;
|
||||
my $path = $1;
|
||||
if (!-e $path . "/" . $self->{tabname} . ".csv")
|
||||
($xcatcfg =~ /^$/) && die "Can't locate xCAT configuration";
|
||||
unless ($xcatcfg =~ /:/)
|
||||
{
|
||||
unless ($create)
|
||||
$xcatcfg = "SQLite:" . $xcatcfg;
|
||||
}
|
||||
if ($xcatcfg =~ /^SQLite:/)
|
||||
{
|
||||
$self->{backend_type} = 'sqlite';
|
||||
my @path = split(':', $xcatcfg, 2);
|
||||
unless (-e $path[1] . "/" . $self->{tabname} . ".sqlite" || $create)
|
||||
{
|
||||
return undef;
|
||||
}
|
||||
$self->{connstring} =
|
||||
"dbi:" . $xcatcfg . "/" . $self->{tabname} . ".sqlite";
|
||||
}
|
||||
elsif ($xcatcfg =~ /^CSV:/)
|
||||
{
|
||||
$self->{backend_type} = 'csv';
|
||||
$xcatcfg =~ m/^.*?:(.*)$/;
|
||||
my $path = $1;
|
||||
$self->{connstring} = "dbi:CSV:f_dir=" . $path;
|
||||
}
|
||||
else #Generic DBI
|
||||
{
|
||||
($self->{connstring},$self->{dbuser},$self->{dbpass}) = split(/\|/,$xcatcfg);
|
||||
$self->{connstring} =~ s/^dbi://;
|
||||
$self->{connstring} =~ s/^/dbi:/;
|
||||
#return undef;
|
||||
}
|
||||
my $oldumask= umask 0077;
|
||||
unless ($::XCAT_DBHS->{$self->{connstring},$self->{dbuser},$self->{dbpass},$self->{autocommit}}) { #= $self->{tabname};
|
||||
$::XCAT_DBHS->{$self->{connstring},$self->{dbuser},$self->{dbpass},$self->{autocommit}} =
|
||||
DBI->connect($self->{connstring}, $self->{dbuser}, $self->{dbpass}, {AutoCommit => $self->{autocommit}});
|
||||
}
|
||||
umask $oldumask;
|
||||
|
||||
$self->{dbh} = $::XCAT_DBHS->{$self->{connstring},$self->{dbuser},$self->{dbpass},$self->{autocommit}};
|
||||
#DBI->connect($self->{connstring}, $self->{dbuser}, $self->{dbpass}, {AutoCommit => $autocommit});
|
||||
if ($xcatcfg =~ /^SQLite:/)
|
||||
{
|
||||
my $dbexistq =
|
||||
"SELECT name from sqlite_master WHERE type='table' and name = ?";
|
||||
my $sth = $self->{dbh}->prepare($dbexistq);
|
||||
$sth->execute($self->{tabname});
|
||||
my $result = $sth->fetchrow();
|
||||
$sth->finish;
|
||||
unless (defined $result)
|
||||
{
|
||||
if ($create)
|
||||
{
|
||||
my $str =
|
||||
buildcreatestmt($self->{tabname},
|
||||
$xCAT::Schema::tabspec{$self->{tabname}},
|
||||
$xcatcfg);
|
||||
$self->{dbh}->do($str);
|
||||
}
|
||||
else { return undef; }
|
||||
}
|
||||
}
|
||||
elsif ($xcatcfg =~ /^CSV:/)
|
||||
{
|
||||
$self->{dbh}->{'csv_tables'}->{$self->{tabname}} =
|
||||
{'file' => $self->{tabname} . ".csv"};
|
||||
$xcatcfg =~ m/^.*?:(.*)$/;
|
||||
my $path = $1;
|
||||
if (!-e $path . "/" . $self->{tabname} . ".csv")
|
||||
{
|
||||
unless ($create)
|
||||
{
|
||||
return undef;
|
||||
}
|
||||
my $str =
|
||||
buildcreatestmt($self->{tabname},
|
||||
$xCAT::Schema::tabspec{$self->{tabname}},
|
||||
$xcatcfg);
|
||||
$self->{dbh}->do($str);
|
||||
}
|
||||
} else { #generic DBI
|
||||
my $tbexistq = $self->{dbh}->table_info('','',$self->{tabname},'TABLE');
|
||||
my $found = 0;
|
||||
while (my $data = $tbexistq->fetchrow_hashref) {
|
||||
if ($data->{'TABLE_NAME'} =~ /^\"?$self->{tabname}\"?\z/) {
|
||||
$found = 1;
|
||||
last;
|
||||
}
|
||||
}
|
||||
unless ($found) {
|
||||
unless ($create)
|
||||
{
|
||||
return undef;
|
||||
}
|
||||
my $str =
|
||||
buildcreatestmt($self->{tabname},
|
||||
$xCAT::Schema::tabspec{$self->{tabname}},
|
||||
$xcatcfg);
|
||||
buildcreatestmt($self->{tabname},
|
||||
$xCAT::Schema::tabspec{$self->{tabname}},
|
||||
$xcatcfg);
|
||||
$self->{dbh}->do($str);
|
||||
}
|
||||
} else { #generic DBI
|
||||
my $tbexistq = $self->{dbh}->table_info('','',$self->{tabname},'TABLE');
|
||||
my $found = 0;
|
||||
while (my $data = $tbexistq->fetchrow_hashref) {
|
||||
if ($data->{'TABLE_NAME'} =~ /^\"?$self->{tabname}\"?\z/) {
|
||||
$found = 1;
|
||||
last;
|
||||
}
|
||||
}
|
||||
unless ($found) {
|
||||
unless ($create)
|
||||
{
|
||||
return undef;
|
||||
}
|
||||
my $str =
|
||||
buildcreatestmt($self->{tabname},
|
||||
$xCAT::Schema::tabspec{$self->{tabname}},
|
||||
$xcatcfg);
|
||||
$self->{dbh}->do($str);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
updateschema($self);
|
||||
updateschema($self);
|
||||
}
|
||||
if ($self->{tabname} eq 'nodelist')
|
||||
{
|
||||
weaken($self->{nodelist} = $self);
|
||||
@ -1953,11 +2075,12 @@ sub close
|
||||
=cut
|
||||
|
||||
#--------------------------------------------------------------------------------
|
||||
sub open
|
||||
{
|
||||
my $self = shift;
|
||||
$self->{dbh} = DBI->connect($self->{connstring}, "", "");
|
||||
}
|
||||
#UNSUED FUNCTION
|
||||
#sub open
|
||||
#{
|
||||
# my $self = shift;
|
||||
# $self->{dbh} = DBI->connect($self->{connstring}, "", "");
|
||||
#}
|
||||
|
||||
#--------------------------------------------------------------------------
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user