diff --git a/confluent_perl/Confluent/Client.pm b/confluent_perl/Confluent/Client.pm new file mode 100644 index 00000000..b2dd08f5 --- /dev/null +++ b/confluent_perl/Confluent/Client.pm @@ -0,0 +1,170 @@ +use strict; +use warnings; + +package Confluent::Client; + +use Confluent::TLV; +use DB_File; +use Digest::SHA qw/sha512_hex/; +use IO::Socket::SSL; +use IO::Socket::UNIX; +use MIME::Base64; +use Net::SSLeay; + +sub get_fingerprint { + my $pem = Net::SSLeay::PEM_get_string_X509(shift); + $pem =~ s/-----BEGIN CERTIFICATE-----//; + $pem =~ s/-----END CERTIFICATE-----//; + return 'sha512$' . sha512_hex(decode_base64($pem)); +} + +sub parse_nettarget { + my $target = shift; + if ($target =~ /^\[(.*)\]:(.*)/) { + return $1, $2; + } elsif ($target =~ /^\[(.*)\]$/) { + return $1, 13001; + } elsif ($target =~ /^(.*):(.*)$/) { + return $1, $2; + } else { + return $target, 13001; + } +} + +sub _verify { + my $self = shift; + my $peername = shift; + my $addfingerprint = shift; + my $coreverified = shift; + if ($coreverified) { + return $coreverified; + } + my %knownhosts; + tie %knownhosts, 'DB_File', glob("~/.confluent/knownhosts"); + my $fingerprint = get_fingerprint($_[3]); + if ($addfingerprint) { + $knownhosts{$peername} = $addfingerprint; + } + if (not $knownhosts{$peername}) { + die "UKNNOWN_FINGERPRINT: fingerprint=>$fingerprint" + } + if ($fingerprint ne $knownhosts{$peername}) { + die "CONFLICT_FINGERPRINT: fingerprint=>$fingerprint"; + } + return 1; +} + +sub ssl_connect { + my $self = shift; + my ($peer, $port) = parse_nettarget(shift); + my %args = @_; + my $addfingerprint = undef; + if ($args{fingerprint}) { + $addfingerprint = $args{fingerprint}; + } + # TODO: support typical X509 style when CA present + my %sslargs = ( + PeerAddr => $peer, + PeerPort => $port, + SSL_verify_mode => SSL_VERIFY_PEER, + SSL_verify_callback => + sub { $self->_verify($port."@".$peer, $addfingerprint, @_); }, + SSL_verifycn_scheme => 'none', + ); + if (1) { # TODO: check for ca location + # we would do 'undef'. However, older IO::Socket::SSL doesn't do + # for now, go ahead and tell it to check in a futile manner for + # certificates before failing into our callback to do knownhosts + # style + $sslargs{SSL_ca_path} = '/'; + } else { + } + $self->{handle} = IO::Socket::SSL->new(%sslargs); + unless ($self->{handle}) { + die "Unable to reach target, $SSL_ERROR/$!"; + } +} + +sub new { + my $proto = shift; + my $class = ref($proto) || $proto; + my $self = {}; + bless($self, $class); + my $serverlocation = shift; + my %args = @_; + if (not $serverlocation) { + $serverlocation = "/var/run/confluent/api.sock"; + } + if (-S $serverlocation) { + $self->{handle} = IO::Socket::UNIX->new($serverlocation); + } else { # assume a remote network connection + $self->{handle} = $self->ssl_connect($serverlocation, @_); + } + unless ($self->{handle}) { + die "General failure connecting $!"; + } + $self->{server} = Confluent::TLV->new($self->{handle}); + my $banner = $self->{server}->recv(); + my $authdata = $self->{server}->recv(); + $self->{authenticated} = 0; + if ($authdata->{authpassed}) { + $self->{authenticated} = 1; + } + if ($args{username} and not $self->{authenticated}) { + $self->authenticate(%args); + } + return $self; +} + +sub authenticate { + my $self = shift; + my %args = @_; + $self->{server}->send({username=>$args{username}, + passphrase=>$args{passphrase}}); + + my $authdata = $self->{server}->recv(); + if ($authdata->{authpassed}) { + $self->{authenticated} = 1; + } +} + +sub read { + my $self = shift; + my $path = shift; + my %args = @_; + return $self->send_request(operation=>'retrieve', path=>$path); +} + +sub send_request { + my $self = shift; + if (not $self->{authenticated}) { + die "not yet authenticated"; + } + if ($self->{pending}) { + die "Cannot submit multiple requests to same object concurrently"; + } + $self->{pending} = 1; + my %args = @_; + my %payload = ( + operation => $args{operation}, + path => $args{path}, + ); + if ($args{parameters}) { + $payload{parameters} = $args{parameters}; + } + $self->{server}->send(\%payload); +} + +sub next_result { + my $self = shift; + unless ($self->{pending}) { + return undef; + } + my $result = $self->{server}->recv(); + if (exists $result->{_requestdone}) { + $self->{pending} = 0; + } + return $result; +} + +1; diff --git a/confluent_perl/Confluent/TLV.pm b/confluent_perl/Confluent/TLV.pm new file mode 100644 index 00000000..aa01e122 --- /dev/null +++ b/confluent_perl/Confluent/TLV.pm @@ -0,0 +1,70 @@ +use strict; +use warnings; + +package Confluent::TLV; + +use JSON; + +sub new { + my $proto = shift; + my $class = ref($proto) || $proto; + my $self = {}; + $self->{handle} = shift; + $self->{json} = JSON->new; + bless($self, $class); + return $self; +} + +sub recv { + my $self = shift; + my $data; + my $tl = ''; + do { + sysread($self->{handle}, $data, 4 - length($tl)); + $tl .= $data; + } while (length($tl) < 4); + $tl = unpack("N", $tl); + if ($tl & (1 << 31)) { + die "Reserved bit used, protocol violated"; + } + my $length = $tl & 16777215; # lower 24 bits only + my $datatype = ($tl >> 24); + my $bytedata = ''; + while (length($bytedata) < $length) { + sysread($self->{handle}, $data, $length - length($bytedata)); + $bytedata .= $data; + } + if ($datatype == 0) { + return $bytedata; + } elsif ($datatype == 1) { + return $self->{json}->utf8->decode($bytedata); + } else { + die "Unexpected data type $datatype"; + } +} + +sub send { + my $self = shift; + my $data = shift; + if (ref $data eq 'HASH') { # Need to do JSON + my $json = $self->{json}->utf8->encode($data); + my $typelength = length($json); + if ($typelength > 16777215) { + die "Data too large"; + } + $typelength |= 16777216; + my $handle = $self->{handle}; + print $handle pack("N", $typelength) . $json; + $self->{handle}->flush(); + } elsif (not ref $data) { # text data + my $typelength = length($data); + if ($typelength > 16777215) { + die "Data too large"; + } + my $handle = $self->{handle}; + print $handle pack("N", $typelength) . $data; + $self->{handle}->flush(); + } +} + +1; diff --git a/confluent_perl/example.pl b/confluent_perl/example.pl new file mode 100644 index 00000000..fd0b5281 --- /dev/null +++ b/confluent_perl/example.pl @@ -0,0 +1,14 @@ +use strict; +use warnings; + +use Confluent::Client; + +my $client = Confluent::Client->new(); +$client->read('/nodes/n1/power/state'); +my $data = $client->next_result(); +while ($data) { + if (exists $data->{state}) { + print $data->{state}->{value} . "\n"; + } + $data = $client->next_result(); +}