From a5504ec62f841812cf1e78eb18c090822c21872d Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 4 Mar 2014 17:12:19 -0500 Subject: [PATCH] Refactor/rename code confetty had a lot of stuff in it. Factor out bits to a common library for constructing purpose oriented commands. --- TODO | 4 ++ bin/confetty | 106 +++++++----------------------------- confluent/auth.py | 1 + confluent/common/client.py | 101 ++++++++++++++++++++++++++++++++++ confluent/common/tlvdata.py | 4 +- confluent/sockapi.py | 36 ++++++------ 6 files changed, 147 insertions(+), 105 deletions(-) create mode 100644 confluent/common/client.py diff --git a/TODO b/TODO index 44c98095..f40ca91f 100644 --- a/TODO +++ b/TODO @@ -6,6 +6,10 @@ in, the respective session layers will provide a caching session that should accelerate things after the client gets in once -penalizing a client clearly trying to break in +-other auth + -pam if user exists but has no passphrase + -keystone? + -ad? (specialized to the AD case) -expressionkeys never gets smaller - perf impact -need event notification for config change- e.g. set attribute triggers consol session object check to see if credentials changed diff --git a/bin/confetty b/bin/confetty index ce76df1d..2fe5075e 100755 --- a/bin/confetty +++ b/bin/confetty @@ -45,8 +45,8 @@ path = os.path.realpath(os.path.join(path, '..')) sys.path.append(path) import confluent.common.tlvdata as tlvdata +import confluent.common.client as client -SO_PASSCRED = 16 conserversequence = '\x05c' # ctrl-e, c oldtcattr = termios.tcgetattr(sys.stdin.fileno()) @@ -75,6 +75,7 @@ valid_commands = [ ] candidates = None +session = None def completer(text, state): try: @@ -110,7 +111,7 @@ def rcompleter(text, state): if candidates is None: candidates = [] targpath = fullpath_target(lastarg) - for res in send_request('retrieve', targpath, server): + for res in session.read(targpath): if 'item' in res: # a link relation if type(res['item']) == dict: candidates.append(res['item']["href"]) @@ -129,19 +130,6 @@ def rcompleter(text, state): candidates = None return None -def parseservervalue(serverstring): - if serverstring.find(']:') != -1: - server, port = serverstring[1:].split(']:') - elif serverstring[0] == '[': - server = serverstring[1:-1] - port = 4001 - elif -1 != opts.server.find(':'): - server, port = opts.server.split(":") - else: - server = serverstring - port = 4001 - return (server, port) - def parse_command(command): args = shlex.split(command, posix=True) @@ -150,16 +138,6 @@ def parse_command(command): currchildren = None -def send_request(operation, path, server, parameters=None): - payload = {'operation': operation, 'path': path} - if parameters is not None: - payload['parameters'] = parameters - tlvdata.send_tlvdata(server, payload) - result = tlvdata.recv_tlvdata(server) - while '_requestdone' not in result: - yield result - result = tlvdata.recv_tlvdata(server) - def do_command(command, server): global exitcode global target @@ -171,7 +149,6 @@ def do_command(command, server): if len(argv) == 0: return if argv[0] == 'exit': - server.close() sys.exit(0) elif argv[0] == 'cd': otarget = target @@ -179,7 +156,7 @@ def do_command(command, server): target = fullpath_target(argv[1], forcepath=True) else: # cd by itself, go 'home' target = '/' - for res in send_request('retrieve', target, server): + for res in session.read(target, server): if 'errorcode' in res: exitcode = res['errorcode'] if 'error' in res: @@ -190,7 +167,7 @@ def do_command(command, server): targpath = fullpath_target(argv[1]) else: targpath = target - for res in send_request('retrieve', targpath, server): + for res in session.read(targpath): if 'item' in res: # a link relation if type(res['item']) == dict: print res['item']["href"] @@ -228,8 +205,8 @@ def do_command(command, server): elif argv[0] == 'start': targpath = fullpath_target(argv[1]) currconsole = targpath - tlvdata.send_tlvdata(server, {'operation': 'start', 'path': targpath}) - status = tlvdata.recv_tlvdata(server) + tlvdata.send(server, {'operation': 'start', 'path': targpath}) + status = tlvdata.recv(server) if 'error' in status: if 'errorcode' in status: exitcode = status['errorcode'] @@ -251,7 +228,7 @@ def clearvalues(resource, attribs): keydata = {} for attrib in attribs: keydata[attrib] = None - for res in send_request('update', targpath, server, keydata): + for res in session.update(targpath, keydata): if 'error' in res: if 'errorcode' in res: exitcode = res['errorcode'] @@ -276,7 +253,7 @@ def setvalues(attribs): value = attrib[attrib.index("=") + 1:] keydata[key] = value targpath = fullpath_target(resource) - for res in send_request('update', targpath, server, keydata): + for res in session.update(targpath, keydata): if 'error' in res: if 'errorcode' in res: exitcode = res['errorcode'] @@ -326,10 +303,9 @@ def exit(code=0): termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, oldtcattr) if consoleonly: server.shutdown(socket.SHUT_RDWR) - server.close() sys.exit(code) else: - tlvdata.send_tlvdata(server, {'operation': 'stop', 'path': currconsole}) + tlvdata.send(server, {'operation': 'stop', 'path': currconsole}) inconsole = False def conserver_command(fh, command): @@ -363,63 +339,23 @@ def check_escape_seq(input, fh): return input -def connect_unix_server(sockpath): - server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - server.setsockopt(socket.SOL_SOCKET, SO_PASSCRED, 1) - server.connect(sockpath) - return server - - -def connect_tls_server(serverstring): - host, port = parseservervalue(serverstring) - for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, - socket.SOCK_STREAM): - af, socktype, proto, cononname, sa = res - try: - server = socket.socket(af, socktype, proto) - server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - except: - server = None - continue - try: - server.settimeout(5) - server.connect(sa) - except: - server.close() - server = None - continue - break - if server is None: - sys.stderr.write("Failed to connect to %s\n" % serverstring) - sys.exit(1) - secserver = ssl.wrap_socket(server) - return secserver - parser = optparse.OptionParser() parser.add_option("-s", "--server", dest="server", - help="TLS server to connect to", metavar="SERVER:PORT") -parser.add_option("-u", "--unixsocket", dest="unixsock", - help="TLS server to connect to", metavar="UNIXDOMAINSOCKET") + help="Confluent instance to connect to", metavar="SERVER:PORT") opts, args = parser.parse_args() if opts.server: # going over a TLS network - server = connect_tls_server(opts.server) -elif opts.unixsock: - server = connect_unix_server(opts.unixsock) -elif os.path.exists("/var/run/confluent/api.sock"): - server = connect_unix_server("/var/run/confluent/api.sock") + session = client.Command(opts.server) +else: + session = client.Command() #Next stop, reading and writing from whichever of stdin and server goes first. #see pyghmi code for solconnect.py -banner = tlvdata.recv_tlvdata(server) -authinfo = tlvdata.recv_tlvdata(server) -while authinfo['authpassed'] != 1: +while not session.authenticated: username = raw_input("Name: ") readline.clear_history() passphrase = getpass.getpass("Passphrase: ") - tlvdata.send_tlvdata(server, - {'username': username, 'passphrase': passphrase}) - authinfo = tlvdata.recv_tlvdata(server) + session.authenticate(username, passphrase) # clear on start can help with readable of TUI, but it # can be annoying, so for now don't do it. # sys.stdout.write('\x1b[H\x1b[J') @@ -436,16 +372,16 @@ if len(args) == 1: # a node name, go straight to trying to console do_command("start /node/%s/console/session" % args[0], server) while not doexit: if inconsole: - rdylist, _, _ = select.select((sys.stdin, server), (), (), 60) + rdylist, _, _ = select.select((sys.stdin, session.connection), (), (), 60) for fh in rdylist: - if fh == server: + if fh == session.connection: # this only should get called in the # case of a console session # each command should slurp up all relevant - # recv_tlvdata potential + # recv potential #fh.read() try: - data = tlvdata.recv_tlvdata(fh) + data = tlvdata.recv(fh) except Exception: data = None if type(data) == dict: @@ -462,7 +398,7 @@ while not doexit: input = fh.read() input = check_escape_seq(input, fh) if input: - tlvdata.send_tlvdata(server, input) + tlvdata.send(session.connection, input) else: command = prompt() do_command(command, server) diff --git a/confluent/auth.py b/confluent/auth.py index aaf27b31..012aaf9a 100644 --- a/confluent/auth.py +++ b/confluent/auth.py @@ -145,6 +145,7 @@ def check_user_passphrase(name, passphrase, element=None, tenant=False): # TODO(jbjohnso): WORKERPOOL # PBKDF2 is, by design, cpu intensive # throw it at the worker pool when implemented + # maybe a distinct worker pool, wondering about starving out non-auth stuff salt, crypt = ucfg['cryptpass'] crypted = kdf.PBKDF2(passphrase, salt, 32, 10000, lambda p, s: hash.HMAC.new(p, s, hash.SHA256).digest() diff --git a/confluent/common/client.py b/confluent/common/client.py new file mode 100644 index 00000000..2999f082 --- /dev/null +++ b/confluent/common/client.py @@ -0,0 +1,101 @@ +import os +import socket +import ssl +import confluent.common.tlvdata as tlvdata + +SO_PASSCRED = 16 +def _parseserver(string): + if ']:' in string: + server, port = string[1:].split(']:') + elif string[0] == '[': + server = serverstring[1:-1] + port = 4001 + elif ':' in string: + server, port = string.plit(':') + else: + server = string + port = 4001 + return (server, port) + +class Command(object): + + def __init__(self, server="/var/run/confluent/api.sock"): + self.serverloc = server + if os.path.isabs(server) and os.path.exists(server): + self._connect_unix() + else: + self._connect_tls() + banner = tlvdata.recv(self.connection) + authdata = tlvdata.recv(self.connection) + if authdata['authpassed'] == 1: + self.authenticated = True + else: + self.authenticated = False + + def authenticate(self, username, passphrase): + tlvdata.send(self.connection, + {'username': username, 'passphrase': passphrase}) + authdata = tlvdata.recv(self.connection) + if authdata['authpassed'] == 1: + self.authenticated = True + + def read(self, path, parameters=None): + return send_request('retrieve', path, self.connection, parameters) + + def update(self, path, parameters=None): + return send_request('update', path, self.connection, parameters) + + def create(self, path, parameters=None): + return send_request('create', path, self.connection, parameters) + + def delete(self, path, parameters=None): + return send_request('delete', path, self.connection, parameters) + + def _connect_unix(self): + self.connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.connection.setsockopt(socket.SOL_SOCKET, SO_PASSCRED, 1) + self.connection.connect(self.serverloc) + + def _connect_tls(self): + server, port = _parseserver(self.serverloc) + for res in socket.getaddrinfo(server, port, socket.AF_UNSPEC, socket.SOCK_STREAM): + af, socktype, proto, canonname, sa = res + try: + self.connection = socket.socket(af, socktype, proto) + self.connection.setsockopt( + socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + except: + self.connection = None + continue + try: + self.connection.settimeout(5) + self.connection.connection(sa) + except: + self.connection.close() + self.connection = None + continue + break + if self.connection is None: + raise Exception("Failed to connect to %s" % self.serverloc) + #TODO(jbjohnso): server certificate validation + self.connection = ssl.wrap_socket(self.connection) + +def send_request(operation, path, server, parameters=None): + '''This function iterates over all the responses + received from the server. + + :param operation: The operation to request, retrieve, update, delete, + create, start, stop + :param path: The URI path to the resource to operate on + :param server: The socket to send data over + :param parameters: Parameters if any to send along with the request + ''' + payload = {'operation': operation, 'path': path} + if parameters is not None: + payload['parameters'] = parameters + tlvdata.send(server, payload) + result = tlvdata.recv(server) + while '_requestdone' not in result: + yield result + result = tlvdata.recv(server) + diff --git a/confluent/common/tlvdata.py b/confluent/common/tlvdata.py index f61a9ef8..5d97c4da 100644 --- a/confluent/common/tlvdata.py +++ b/confluent/common/tlvdata.py @@ -3,7 +3,7 @@ import confluent.common.tlv as tlv import json import struct -def send_tlvdata(handle, data): +def send(handle, data): if isinstance(data, str): # plain text, e.g. console data tl = len(data) @@ -25,7 +25,7 @@ def send_tlvdata(handle, data): handle.sendall(struct.pack("!I", tl)) handle.sendall(sdata) -def recv_tlvdata(handle): +def recv(handle): tl = handle.recv(4) if len(tl) == 0: return None diff --git a/confluent/sockapi.py b/confluent/sockapi.py index c4f47d0a..1da87cd4 100644 --- a/confluent/sockapi.py +++ b/confluent/sockapi.py @@ -33,12 +33,12 @@ class ClientConsole(object): if not self.xmit: self.pendingdata += data return - tlvdata.send_tlvdata(self.client, data) + tlvdata.send(self.client, data) def startsending(self): self.xmit = True if self.pendingdata != "": - tlvdata.send_tlvdata(self.client, self.pendingdata) + tlvdata.send(self.client, self.pendingdata) self.pendingdata = None @@ -54,10 +54,10 @@ def sessionhdl(connection, authname): if authdata is not None: cfm = authdata[1] authenticated = True - tlvdata.send_tlvdata(connection,"Confluent -- v0 --") + tlvdata.send(connection,"Confluent -- v0 --") while not authenticated: # prompt for name and passphrase - tlvdata.send_tlvdata(connection, {'authpassed': 0}) - response = tlvdata.recv_tlvdata(connection) + tlvdata.send(connection, {'authpassed': 0}) + response = tlvdata.recv(connection) username = response['username'] passphrase = response['passphrase'] # note(jbjohnso): here, we need to authenticate, but not @@ -68,26 +68,26 @@ def sessionhdl(connection, authname): if authdata is not None: authenticated = True cfm = authdata[1] - tlvdata.send_tlvdata(connection, {'authpassed': 1}) - request = tlvdata.recv_tlvdata(connection) + tlvdata.send(connection, {'authpassed': 1}) + request = tlvdata.recv(connection) while request is not None: try: process_request(connection, request, cfm, authdata) except: import traceback traceback.print_exc() - tlvdata.send_tlvdata(connection, {'errorcode': 500, + tlvdata.send(connection, {'errorcode': 500, 'error': 'Unexpected error'}) - tlvdata.send_tlvdata(connection, {'_requestdone': 1}) - request = tlvdata.recv_tlvdata(connection) + tlvdata.send(connection, {'_requestdone': 1}) + request = tlvdata.recv(connection) def send_response(responses, connection): if responses is None: return for rsp in responses: - tlvdata.send_tlvdata(connection, rsp.raw()) - tlvdata.send_tlvdata(connection, {'_requestdone': 1}) + tlvdata.send(connection, rsp.raw()) + tlvdata.send(connection, {'_requestdone': 1}) def process_request(connection, request, cfm, authdata): @@ -108,10 +108,10 @@ def process_request(connection, request, cfm, authdata): node=node, configmanager=cfm, datacallback=ccons.sendall) if consession is None: raise Exception("TODO") - tlvdata.send_tlvdata(connection, {'started': 1}) + tlvdata.send(connection, {'started': 1}) ccons.startsending() while consession is not None: - data = tlvdata.recv_tlvdata(connection) + data = tlvdata.recv(connection) if type(data) == dict: if data['operation'] == 'stop': consession.destroy() @@ -125,13 +125,13 @@ def process_request(connection, request, cfm, authdata): else: hdlr = pluginapi.handle_path(path, operation, cfm, params) except exc.NotFoundException: - tlvdata.send_tlvdata(connection, {"errorcode": 404, + tlvdata.send(connection, {"errorcode": 404, "error": "Target not found"}) - tlvdata.send_tlvdata(connection, {"_requestdone": 1}) + tlvdata.send(connection, {"_requestdone": 1}) except exc.InvalidArgumentException: - tlvdata.send_tlvdata(connection, {"errorcode": 400, + tlvdata.send(connection, {"errorcode": 400, "error": "Bad Request"}) - tlvdata.send_tlvdata(connection, {"_requestdone": 1}) + tlvdata.send(connection, {"_requestdone": 1}) send_response(hdlr, connection) return