mirror of
https://github.com/xcat2/confluent.git
synced 2025-10-23 23:45:40 +00:00
A plugin that ends in '.sh' is currently assumed to be a console plugin and is executed once.
205 lines
5.4 KiB
Perl
205 lines
5.4 KiB
Perl
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2014 IBM Corporation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
use strict;
|
|
use warnings;
|
|
|
|
package Confluent::Client;
|
|
|
|
use Confluent::TLV;
|
|
use DB_File;
|
|
use Digest::SHA qw/sha512_hex/;
|
|
use IO::Socket::SSL;
|
|
use IO::Socket::UNIX;
|
|
use MIME::Base64;
|
|
use Net::SSLeay;
|
|
|
|
sub get_fingerprint {
|
|
my $pem = Net::SSLeay::PEM_get_string_X509(shift);
|
|
$pem =~ s/-----BEGIN CERTIFICATE-----//;
|
|
$pem =~ s/-----END CERTIFICATE-----//;
|
|
return 'sha512$' . sha512_hex(decode_base64($pem));
|
|
}
|
|
|
|
sub parse_nettarget {
|
|
my $target = shift;
|
|
if ($target =~ /^\[(.*)\]:(.*)/) {
|
|
return $1, $2;
|
|
} elsif ($target =~ /^\[(.*)\]$/) {
|
|
return $1, 13001;
|
|
} elsif ($target =~ /^(.*):(.*)$/) {
|
|
return $1, $2;
|
|
} else {
|
|
return $target, 13001;
|
|
}
|
|
}
|
|
|
|
sub _verify {
|
|
my $self = shift;
|
|
my $peername = shift;
|
|
my $addfingerprint = shift;
|
|
my $coreverified = shift;
|
|
if ($coreverified) {
|
|
return $coreverified;
|
|
}
|
|
my %knownhosts;
|
|
tie %knownhosts, 'DB_File', glob("~/.confluent/knownhosts");
|
|
my $fingerprint = get_fingerprint($_[3]);
|
|
if ($addfingerprint) {
|
|
$knownhosts{$peername} = $addfingerprint;
|
|
}
|
|
if (not $knownhosts{$peername}) {
|
|
die "UKNNOWN_FINGERPRINT: fingerprint=>$fingerprint"
|
|
}
|
|
if ($fingerprint ne $knownhosts{$peername}) {
|
|
die "CONFLICT_FINGERPRINT: fingerprint=>$fingerprint";
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
sub ssl_connect {
|
|
my $self = shift;
|
|
my ($peer, $port) = parse_nettarget(shift);
|
|
my %args = @_;
|
|
my $addfingerprint = undef;
|
|
if ($args{fingerprint}) {
|
|
$addfingerprint = $args{fingerprint};
|
|
}
|
|
# TODO: support typical X509 style when CA present
|
|
my %sslargs = (
|
|
PeerAddr => $peer,
|
|
PeerPort => $port,
|
|
SSL_verify_mode => SSL_VERIFY_PEER,
|
|
SSL_verify_callback =>
|
|
sub { $self->_verify($port."@".$peer, $addfingerprint, @_); },
|
|
SSL_verifycn_scheme => 'none',
|
|
);
|
|
if (1) { # TODO: check for ca location
|
|
# we would do 'undef'. However, older IO::Socket::SSL doesn't do
|
|
# for now, go ahead and tell it to check in a futile manner for
|
|
# certificates before failing into our callback to do knownhosts
|
|
# style
|
|
$sslargs{SSL_ca_path} = '/';
|
|
} else {
|
|
}
|
|
$self->{handle} = IO::Socket::SSL->new(%sslargs);
|
|
unless ($self->{handle}) {
|
|
die "Unable to reach target, $SSL_ERROR/$!";
|
|
}
|
|
}
|
|
|
|
sub new {
|
|
my $proto = shift;
|
|
my $class = ref($proto) || $proto;
|
|
my $self = {};
|
|
bless($self, $class);
|
|
my $serverlocation = shift;
|
|
my %args = @_;
|
|
if (not $serverlocation) {
|
|
$serverlocation = "/var/run/confluent/api.sock";
|
|
}
|
|
if (-S $serverlocation) {
|
|
$self->{handle} = IO::Socket::UNIX->new($serverlocation);
|
|
} else { # assume a remote network connection
|
|
$self->{handle} = $self->ssl_connect($serverlocation, @_);
|
|
}
|
|
unless ($self->{handle}) {
|
|
die "General failure connecting $!";
|
|
}
|
|
$self->{server} = Confluent::TLV->new($self->{handle});
|
|
my $banner = $self->{server}->recv();
|
|
my $authdata = $self->{server}->recv();
|
|
$self->{authenticated} = 0;
|
|
if ($authdata->{authpassed}) {
|
|
$self->{authenticated} = 1;
|
|
}
|
|
if ($args{username} and not $self->{authenticated}) {
|
|
$self->authenticate(%args);
|
|
}
|
|
return $self;
|
|
}
|
|
|
|
sub authenticate {
|
|
my $self = shift;
|
|
my %args = @_;
|
|
$self->{server}->send({username=>$args{username},
|
|
passphrase=>$args{passphrase}});
|
|
|
|
my $authdata = $self->{server}->recv();
|
|
if ($authdata->{authpassed}) {
|
|
$self->{authenticated} = 1;
|
|
}
|
|
}
|
|
|
|
sub create {
|
|
my $self = shift;
|
|
my $path = shift;
|
|
return $self->send_request(operation=>'create', path=>$path, @_);
|
|
}
|
|
|
|
sub update {
|
|
my $self = shift;
|
|
my $path = shift;
|
|
return $self->send_request(operation=>'update', path=>$path, @_);
|
|
}
|
|
|
|
sub read {
|
|
my $self = shift;
|
|
my $path = shift;
|
|
my %args = @_;
|
|
return $self->send_request(operation=>'retrieve', path=>$path);
|
|
}
|
|
|
|
sub delete {
|
|
my $self = shift;
|
|
my $path = shift;
|
|
my %args = @_;
|
|
return $self->send_request(operation=>'delete', path=>$path);
|
|
}
|
|
|
|
sub send_request {
|
|
my $self = shift;
|
|
if (not $self->{authenticated}) {
|
|
die "not yet authenticated";
|
|
}
|
|
if ($self->{pending}) {
|
|
die "Cannot submit multiple requests to same object concurrently";
|
|
}
|
|
$self->{pending} = 1;
|
|
my %args = @_;
|
|
my %payload = (
|
|
operation => $args{operation},
|
|
path => $args{path},
|
|
);
|
|
if ($args{parameters}) {
|
|
$payload{parameters} = $args{parameters};
|
|
}
|
|
$self->{server}->send(\%payload);
|
|
}
|
|
|
|
sub next_result {
|
|
my $self = shift;
|
|
unless ($self->{pending}) {
|
|
return undef;
|
|
}
|
|
my $result = $self->{server}->recv();
|
|
if (exists $result->{_requestdone}) {
|
|
$self->{pending} = 0;
|
|
}
|
|
return $result;
|
|
}
|
|
|
|
1;
|