diff --git a/confluent_perl/Confluent/Client.pm b/confluent_perl/Confluent/Client.pm index 65f14cbe..7dd26f91 100644 --- a/confluent_perl/Confluent/Client.pm +++ b/confluent_perl/Confluent/Client.pm @@ -143,6 +143,18 @@ sub authenticate { } } +sub create { + my $self = shift; + my $path = shift; + return $self->send_request(operation=>'create', path=>$path, @_); +} + +sub update { + my $self = shift; + my $path = shift; + return $self->send_request(operation=>'update', path=>$path, @_); +} + sub read { my $self = shift; my $path = shift; @@ -150,6 +162,13 @@ sub read { return $self->send_request(operation=>'retrieve', path=>$path); } +sub delete { + my $self = shift; + my $path = shift; + my %args = @_; + return $self->send_request(operation=>'delete', path=>$path); +} + sub send_request { my $self = shift; if (not $self->{authenticated}) { diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index da12f4ed..2d91ce94 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -36,6 +36,7 @@ import confluent.config.attributes as attrscheme import confluent.interface.console as console import confluent.exceptions as exc import confluent.messages as msg +import confluent.shellmodule as shellmodule import os import sys @@ -61,11 +62,15 @@ def load_plugins(): sys.path.append(plugindir) #two passes, to avoid adding both py and pyc files for plugin in os.listdir(plugindir): - plugin = os.path.splitext(plugin)[0] - plugins.add(plugin) - for plugin in plugins: if plugin.startswith('.'): continue + (plugin, plugtype) = os.path.splitext(plugin) + if plugtype == '.sh': + pluginmap[plugin] = shellmodule.Plugin( + os.path.join(plugindir, plugin + '.sh')) + else: + plugins.add(plugin) + for plugin in plugins: tmpmod = __import__(plugin) if 'plugin_names' in tmpmod.__dict__: for name in tmpmod.plugin_names: @@ -297,7 +302,8 @@ def handle_path(path, operation, configmanager, inputdata=None): inputdata = msg.get_input_message( pathcomponents[2:], operation, inputdata) if 'handler' in plugroute: # fixed handler definition - return pluginmap[plugroute['handler']].__dict__[operation]( + hfunc = getattr(pluginmap[plugroute['handler']], operation) + return hfunc( nodes=None, element=pathcomponents, configmanager=configmanager, inputdata=inputdata) @@ -331,7 +337,8 @@ def handle_path(path, operation, configmanager, inputdata=None): inputdata = msg.get_input_message( pathcomponents, operation, inputdata, (node,)) if 'handler' in plugroute: # fixed handler definition - passvalue = pluginmap[plugroute['handler']].__dict__[operation]( + hfunc = getattr(pluginmap[plugroute['handler']], operation) + passvalue = hfunc( nodes=(node,), element=pathcomponents, configmanager=configmanager, inputdata=inputdata) @@ -347,7 +354,8 @@ def handle_path(path, operation, configmanager, inputdata=None): if attrname in nodeattr[node]: plugpath = nodeattr[node][attrname]['value'] if plugpath is not None: - passvalue = pluginmap[plugpath].__dict__[operation]( + hfunc = getattr(pluginmap[plugpath], operation) + passvalue = hfunc( nodes=(node,), element=pathcomponents, configmanager=configmanager, inputdata=inputdata) diff --git a/confluent_server/confluent/shellmodule.py b/confluent_server/confluent/shellmodule.py new file mode 100644 index 00000000..a941111a --- /dev/null +++ b/confluent_server/confluent/shellmodule.py @@ -0,0 +1,87 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2014 IBM Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This implements a plugin factory to produce plugin objects from shell scripts +# For now, this only is intended to support scripts to implement console +# Special comment fields shall be used to request anything like config +# data and config data will be passed in as environment variables. +# at least in linux, this is a safe enough practice since environ is readable +# only by the process owner and such an owner would be able to read a file +# anyway. Regardless, it is advisable to 'unset' + +import confluent.interface.console as conapi +import eventlet +import eventlet.green.select as select +import eventlet.green.subprocess as subprocess +import fcntl +import os +import pty +import random +import subprocess + + +class ExecConsole(conapi.Console): + def __init__(self, executable, node): + self.executable = executable + self.subenv = { + 'TERM': 'xterm', + 'CONFLUENT_NODE': node, + } + + def relaydata(self): + while 1: + select.select( + (self._master, self.subproc.stderr), (), (), + 3600 + (random.random() * 120)) + try: + while 1: + self._datacallback(os.read(self._master, 128)) + eventlet.sleep(0) + except OSError as e: + if e.errno == 11: + pass + + def connect(self, callback): + self._datacallback = callback + master, slave = pty.openpty() + self._master = master + self.subproc = subprocess.Popen( + [self.executable], env=self.subenv, + stdin=slave, stdout=slave, + stderr=subprocess.PIPE) + fcntl.fcntl(master, fcntl.F_SETFL, os.O_NONBLOCK) + fcntl.fcntl(self.subproc.stderr.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) + self.readerthread = eventlet.spawn(self.relaydata) + + def write(self, data): + self.subproc.stdin.write(data) + + def close(self): + self.subproc.terminate() + eventlet.sleep(10) + self.subproc.kill() + + +class Plugin(object): + def __init__(self, filename): + self.filename = filename + + def create(self, nodes, element, configmanager, inputdata): + if element != ['_console', 'session']: + raise NotImplementedError("Shell plugins only do console") + if len(nodes) != 1: + raise NotImplementedError("_console/session is only single node") + return ExecConsole(self.filename, nodes[0])