2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-23 18:11:15 +00:00
confluent/confluent_perl/Confluent/Client.pm
Jarrod Johnson f525f054ba Add support for shell based plugins
A plugin that ends in '.sh' is currently assumed to be a console plugin and is executed once.
2014-05-27 15:02:53 -04:00

205 lines
5.4 KiB
Perl

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
use strict;
use warnings;
package Confluent::Client;
use Confluent::TLV;
use DB_File;
use Digest::SHA qw/sha512_hex/;
use IO::Socket::SSL;
use IO::Socket::UNIX;
use MIME::Base64;
use Net::SSLeay;
sub get_fingerprint {
my $pem = Net::SSLeay::PEM_get_string_X509(shift);
$pem =~ s/-----BEGIN CERTIFICATE-----//;
$pem =~ s/-----END CERTIFICATE-----//;
return 'sha512$' . sha512_hex(decode_base64($pem));
}
sub parse_nettarget {
my $target = shift;
if ($target =~ /^\[(.*)\]:(.*)/) {
return $1, $2;
} elsif ($target =~ /^\[(.*)\]$/) {
return $1, 13001;
} elsif ($target =~ /^(.*):(.*)$/) {
return $1, $2;
} else {
return $target, 13001;
}
}
sub _verify {
my $self = shift;
my $peername = shift;
my $addfingerprint = shift;
my $coreverified = shift;
if ($coreverified) {
return $coreverified;
}
my %knownhosts;
tie %knownhosts, 'DB_File', glob("~/.confluent/knownhosts");
my $fingerprint = get_fingerprint($_[3]);
if ($addfingerprint) {
$knownhosts{$peername} = $addfingerprint;
}
if (not $knownhosts{$peername}) {
die "UKNNOWN_FINGERPRINT: fingerprint=>$fingerprint"
}
if ($fingerprint ne $knownhosts{$peername}) {
die "CONFLICT_FINGERPRINT: fingerprint=>$fingerprint";
}
return 1;
}
sub ssl_connect {
my $self = shift;
my ($peer, $port) = parse_nettarget(shift);
my %args = @_;
my $addfingerprint = undef;
if ($args{fingerprint}) {
$addfingerprint = $args{fingerprint};
}
# TODO: support typical X509 style when CA present
my %sslargs = (
PeerAddr => $peer,
PeerPort => $port,
SSL_verify_mode => SSL_VERIFY_PEER,
SSL_verify_callback =>
sub { $self->_verify($port."@".$peer, $addfingerprint, @_); },
SSL_verifycn_scheme => 'none',
);
if (1) { # TODO: check for ca location
# we would do 'undef'. However, older IO::Socket::SSL doesn't do
# for now, go ahead and tell it to check in a futile manner for
# certificates before failing into our callback to do knownhosts
# style
$sslargs{SSL_ca_path} = '/';
} else {
}
$self->{handle} = IO::Socket::SSL->new(%sslargs);
unless ($self->{handle}) {
die "Unable to reach target, $SSL_ERROR/$!";
}
}
sub new {
my $proto = shift;
my $class = ref($proto) || $proto;
my $self = {};
bless($self, $class);
my $serverlocation = shift;
my %args = @_;
if (not $serverlocation) {
$serverlocation = "/var/run/confluent/api.sock";
}
if (-S $serverlocation) {
$self->{handle} = IO::Socket::UNIX->new($serverlocation);
} else { # assume a remote network connection
$self->{handle} = $self->ssl_connect($serverlocation, @_);
}
unless ($self->{handle}) {
die "General failure connecting $!";
}
$self->{server} = Confluent::TLV->new($self->{handle});
my $banner = $self->{server}->recv();
my $authdata = $self->{server}->recv();
$self->{authenticated} = 0;
if ($authdata->{authpassed}) {
$self->{authenticated} = 1;
}
if ($args{username} and not $self->{authenticated}) {
$self->authenticate(%args);
}
return $self;
}
sub authenticate {
my $self = shift;
my %args = @_;
$self->{server}->send({username=>$args{username},
passphrase=>$args{passphrase}});
my $authdata = $self->{server}->recv();
if ($authdata->{authpassed}) {
$self->{authenticated} = 1;
}
}
sub create {
my $self = shift;
my $path = shift;
return $self->send_request(operation=>'create', path=>$path, @_);
}
sub update {
my $self = shift;
my $path = shift;
return $self->send_request(operation=>'update', path=>$path, @_);
}
sub read {
my $self = shift;
my $path = shift;
my %args = @_;
return $self->send_request(operation=>'retrieve', path=>$path);
}
sub delete {
my $self = shift;
my $path = shift;
my %args = @_;
return $self->send_request(operation=>'delete', path=>$path);
}
sub send_request {
my $self = shift;
if (not $self->{authenticated}) {
die "not yet authenticated";
}
if ($self->{pending}) {
die "Cannot submit multiple requests to same object concurrently";
}
$self->{pending} = 1;
my %args = @_;
my %payload = (
operation => $args{operation},
path => $args{path},
);
if ($args{parameters}) {
$payload{parameters} = $args{parameters};
}
$self->{server}->send(\%payload);
}
sub next_result {
my $self = shift;
unless ($self->{pending}) {
return undef;
}
my $result = $self->{server}->recv();
if (exists $result->{_requestdone}) {
$self->{pending} = 0;
}
return $result;
}
1;