2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-21 17:11:58 +00:00

Add perl client library for confluent

For applications wishing to use perl to script to confluent,
this library facilitates that.
This commit is contained in:
Jarrod Johnson 2014-05-20 13:37:27 -04:00
parent 38f07252f8
commit 36edb24b56
3 changed files with 254 additions and 0 deletions

View File

@ -0,0 +1,170 @@
use strict;
use warnings;
package Confluent::Client;
use Confluent::TLV;
use DB_File;
use Digest::SHA qw/sha512_hex/;
use IO::Socket::SSL;
use IO::Socket::UNIX;
use MIME::Base64;
use Net::SSLeay;
sub get_fingerprint {
my $pem = Net::SSLeay::PEM_get_string_X509(shift);
$pem =~ s/-----BEGIN CERTIFICATE-----//;
$pem =~ s/-----END CERTIFICATE-----//;
return 'sha512$' . sha512_hex(decode_base64($pem));
}
sub parse_nettarget {
my $target = shift;
if ($target =~ /^\[(.*)\]:(.*)/) {
return $1, $2;
} elsif ($target =~ /^\[(.*)\]$/) {
return $1, 13001;
} elsif ($target =~ /^(.*):(.*)$/) {
return $1, $2;
} else {
return $target, 13001;
}
}
sub _verify {
my $self = shift;
my $peername = shift;
my $addfingerprint = shift;
my $coreverified = shift;
if ($coreverified) {
return $coreverified;
}
my %knownhosts;
tie %knownhosts, 'DB_File', glob("~/.confluent/knownhosts");
my $fingerprint = get_fingerprint($_[3]);
if ($addfingerprint) {
$knownhosts{$peername} = $addfingerprint;
}
if (not $knownhosts{$peername}) {
die "UKNNOWN_FINGERPRINT: fingerprint=>$fingerprint"
}
if ($fingerprint ne $knownhosts{$peername}) {
die "CONFLICT_FINGERPRINT: fingerprint=>$fingerprint";
}
return 1;
}
sub ssl_connect {
my $self = shift;
my ($peer, $port) = parse_nettarget(shift);
my %args = @_;
my $addfingerprint = undef;
if ($args{fingerprint}) {
$addfingerprint = $args{fingerprint};
}
# TODO: support typical X509 style when CA present
my %sslargs = (
PeerAddr => $peer,
PeerPort => $port,
SSL_verify_mode => SSL_VERIFY_PEER,
SSL_verify_callback =>
sub { $self->_verify($port."@".$peer, $addfingerprint, @_); },
SSL_verifycn_scheme => 'none',
);
if (1) { # TODO: check for ca location
# we would do 'undef'. However, older IO::Socket::SSL doesn't do
# for now, go ahead and tell it to check in a futile manner for
# certificates before failing into our callback to do knownhosts
# style
$sslargs{SSL_ca_path} = '/';
} else {
}
$self->{handle} = IO::Socket::SSL->new(%sslargs);
unless ($self->{handle}) {
die "Unable to reach target, $SSL_ERROR/$!";
}
}
sub new {
my $proto = shift;
my $class = ref($proto) || $proto;
my $self = {};
bless($self, $class);
my $serverlocation = shift;
my %args = @_;
if (not $serverlocation) {
$serverlocation = "/var/run/confluent/api.sock";
}
if (-S $serverlocation) {
$self->{handle} = IO::Socket::UNIX->new($serverlocation);
} else { # assume a remote network connection
$self->{handle} = $self->ssl_connect($serverlocation, @_);
}
unless ($self->{handle}) {
die "General failure connecting $!";
}
$self->{server} = Confluent::TLV->new($self->{handle});
my $banner = $self->{server}->recv();
my $authdata = $self->{server}->recv();
$self->{authenticated} = 0;
if ($authdata->{authpassed}) {
$self->{authenticated} = 1;
}
if ($args{username} and not $self->{authenticated}) {
$self->authenticate(%args);
}
return $self;
}
sub authenticate {
my $self = shift;
my %args = @_;
$self->{server}->send({username=>$args{username},
passphrase=>$args{passphrase}});
my $authdata = $self->{server}->recv();
if ($authdata->{authpassed}) {
$self->{authenticated} = 1;
}
}
sub read {
my $self = shift;
my $path = shift;
my %args = @_;
return $self->send_request(operation=>'retrieve', path=>$path);
}
sub send_request {
my $self = shift;
if (not $self->{authenticated}) {
die "not yet authenticated";
}
if ($self->{pending}) {
die "Cannot submit multiple requests to same object concurrently";
}
$self->{pending} = 1;
my %args = @_;
my %payload = (
operation => $args{operation},
path => $args{path},
);
if ($args{parameters}) {
$payload{parameters} = $args{parameters};
}
$self->{server}->send(\%payload);
}
sub next_result {
my $self = shift;
unless ($self->{pending}) {
return undef;
}
my $result = $self->{server}->recv();
if (exists $result->{_requestdone}) {
$self->{pending} = 0;
}
return $result;
}
1;

View File

@ -0,0 +1,70 @@
use strict;
use warnings;
package Confluent::TLV;
use JSON;
sub new {
my $proto = shift;
my $class = ref($proto) || $proto;
my $self = {};
$self->{handle} = shift;
$self->{json} = JSON->new;
bless($self, $class);
return $self;
}
sub recv {
my $self = shift;
my $data;
my $tl = '';
do {
sysread($self->{handle}, $data, 4 - length($tl));
$tl .= $data;
} while (length($tl) < 4);
$tl = unpack("N", $tl);
if ($tl & (1 << 31)) {
die "Reserved bit used, protocol violated";
}
my $length = $tl & 16777215; # lower 24 bits only
my $datatype = ($tl >> 24);
my $bytedata = '';
while (length($bytedata) < $length) {
sysread($self->{handle}, $data, $length - length($bytedata));
$bytedata .= $data;
}
if ($datatype == 0) {
return $bytedata;
} elsif ($datatype == 1) {
return $self->{json}->utf8->decode($bytedata);
} else {
die "Unexpected data type $datatype";
}
}
sub send {
my $self = shift;
my $data = shift;
if (ref $data eq 'HASH') { # Need to do JSON
my $json = $self->{json}->utf8->encode($data);
my $typelength = length($json);
if ($typelength > 16777215) {
die "Data too large";
}
$typelength |= 16777216;
my $handle = $self->{handle};
print $handle pack("N", $typelength) . $json;
$self->{handle}->flush();
} elsif (not ref $data) { # text data
my $typelength = length($data);
if ($typelength > 16777215) {
die "Data too large";
}
my $handle = $self->{handle};
print $handle pack("N", $typelength) . $data;
$self->{handle}->flush();
}
}
1;

14
confluent_perl/example.pl Normal file
View File

@ -0,0 +1,14 @@
use strict;
use warnings;
use Confluent::Client;
my $client = Confluent::Client->new();
$client->read('/nodes/n1/power/state');
my $data = $client->next_result();
while ($data) {
if (exists $data->{state}) {
print $data->{state}->{value} . "\n";
}
$data = $client->next_result();
}