# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2014 IBM Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. use strict; use warnings; package Confluent::Client; use Confluent::TLV; use DB_File; use Digest::SHA qw/sha512_hex/; use IO::Socket::SSL; use IO::Socket::UNIX; use MIME::Base64; use Net::SSLeay; sub get_fingerprint { my $pem = Net::SSLeay::PEM_get_string_X509(shift); $pem =~ s/-----BEGIN CERTIFICATE-----//; $pem =~ s/-----END CERTIFICATE-----//; return 'sha512$' . sha512_hex(decode_base64($pem)); } sub parse_nettarget { my $target = shift; if ($target =~ /^\[(.*)\]:(.*)/) { return $1, $2; } elsif ($target =~ /^\[(.*)\]$/) { return $1, 13001; } elsif ($target =~ /^(.*):(.*)$/) { return $1, $2; } else { return $target, 13001; } } sub _verify { my $self = shift; my $peername = shift; my $addfingerprint = shift; my $coreverified = shift; if ($coreverified) { return $coreverified; } my %knownhosts; tie %knownhosts, 'DB_File', glob("~/.confluent/knownhosts"); my $fingerprint = get_fingerprint($_[3]); if ($addfingerprint) { $knownhosts{$peername} = $addfingerprint; } if (not $knownhosts{$peername}) { die "UKNNOWN_FINGERPRINT: fingerprint=>$fingerprint" } if ($fingerprint ne $knownhosts{$peername}) { die "CONFLICT_FINGERPRINT: fingerprint=>$fingerprint"; } return 1; } sub ssl_connect { my $self = shift; my ($peer, $port) = parse_nettarget(shift); my %args = @_; my $addfingerprint = undef; if ($args{fingerprint}) { $addfingerprint = $args{fingerprint}; } # TODO: support typical X509 style when CA present my %sslargs = ( PeerAddr => $peer, PeerPort => $port, SSL_verify_mode => SSL_VERIFY_PEER, SSL_verify_callback => sub { $self->_verify($port."@".$peer, $addfingerprint, @_); }, SSL_verifycn_scheme => 'none', ); if (1) { # TODO: check for ca location # we would do 'undef'. However, older IO::Socket::SSL doesn't do # for now, go ahead and tell it to check in a futile manner for # certificates before failing into our callback to do knownhosts # style $sslargs{SSL_ca_path} = '/'; } else { } $self->{handle} = IO::Socket::SSL->new(%sslargs); unless ($self->{handle}) { die "Unable to reach target, $SSL_ERROR/$!"; } } sub new { my $proto = shift; my $class = ref($proto) || $proto; my $self = {}; bless($self, $class); my $serverlocation = shift; my %args = @_; if (not $serverlocation) { $serverlocation = "/var/run/confluent/api.sock"; } if (-S $serverlocation) { $self->{handle} = IO::Socket::UNIX->new($serverlocation); } else { # assume a remote network connection $self->{handle} = $self->ssl_connect($serverlocation, @_); } unless ($self->{handle}) { die "General failure connecting $!"; } $self->{server} = Confluent::TLV->new($self->{handle}); my $banner = $self->{server}->recv(); my $authdata = $self->{server}->recv(); $self->{authenticated} = 0; if ($authdata->{authpassed}) { $self->{authenticated} = 1; } if ($args{username} and not $self->{authenticated}) { $self->authenticate(%args); } return $self; } sub authenticate { my $self = shift; my %args = @_; $self->{server}->send({username=>$args{username}, passphrase=>$args{passphrase}}); my $authdata = $self->{server}->recv(); if ($authdata->{authpassed}) { $self->{authenticated} = 1; } } sub create { my $self = shift; my $path = shift; return $self->send_request(operation=>'create', path=>$path, @_); } sub update { my $self = shift; my $path = shift; return $self->send_request(operation=>'update', path=>$path, @_); } sub read { my $self = shift; my $path = shift; my %args = @_; return $self->send_request(operation=>'retrieve', path=>$path); } sub delete { my $self = shift; my $path = shift; my %args = @_; return $self->send_request(operation=>'delete', path=>$path); } sub send_request { my $self = shift; if (not $self->{authenticated}) { die "not yet authenticated"; } if ($self->{pending}) { die "Cannot submit multiple requests to same object concurrently"; } $self->{pending} = 1; my %args = @_; my %payload = ( operation => $args{operation}, path => $args{path}, ); if ($args{parameters}) { $payload{parameters} = $args{parameters}; } $self->{server}->send(\%payload); } sub next_result { my $self = shift; unless ($self->{pending}) { return undef; } my $result = $self->{server}->recv(); if (exists $result->{_requestdone}) { $self->{pending} = 0; } return $result; } 1;