mirror of
https://github.com/xcat2/confluent.git
synced 2025-02-26 07:11:35 +00:00
Refactor/rename code
confetty had a lot of stuff in it. Factor out bits to a common library for constructing purpose oriented commands.
This commit is contained in:
parent
04faa9fdfd
commit
a5504ec62f
4
TODO
4
TODO
@ -6,6 +6,10 @@
|
||||
in, the respective session layers will provide a caching session that
|
||||
should accelerate things after the client gets in once
|
||||
-penalizing a client clearly trying to break in
|
||||
-other auth
|
||||
-pam if user exists but has no passphrase
|
||||
-keystone?
|
||||
-ad? (specialized to the AD case)
|
||||
-expressionkeys never gets smaller - perf impact
|
||||
-need event notification for config change- e.g. set attribute triggers consol
|
||||
session object check to see if credentials changed
|
||||
|
106
bin/confetty
106
bin/confetty
@ -45,8 +45,8 @@ path = os.path.realpath(os.path.join(path, '..'))
|
||||
sys.path.append(path)
|
||||
|
||||
import confluent.common.tlvdata as tlvdata
|
||||
import confluent.common.client as client
|
||||
|
||||
SO_PASSCRED = 16
|
||||
conserversequence = '\x05c' # ctrl-e, c
|
||||
|
||||
oldtcattr = termios.tcgetattr(sys.stdin.fileno())
|
||||
@ -75,6 +75,7 @@ valid_commands = [
|
||||
]
|
||||
|
||||
candidates = None
|
||||
session = None
|
||||
|
||||
def completer(text, state):
|
||||
try:
|
||||
@ -110,7 +111,7 @@ def rcompleter(text, state):
|
||||
if candidates is None:
|
||||
candidates = []
|
||||
targpath = fullpath_target(lastarg)
|
||||
for res in send_request('retrieve', targpath, server):
|
||||
for res in session.read(targpath):
|
||||
if 'item' in res: # a link relation
|
||||
if type(res['item']) == dict:
|
||||
candidates.append(res['item']["href"])
|
||||
@ -129,19 +130,6 @@ def rcompleter(text, state):
|
||||
candidates = None
|
||||
return None
|
||||
|
||||
def parseservervalue(serverstring):
|
||||
if serverstring.find(']:') != -1:
|
||||
server, port = serverstring[1:].split(']:')
|
||||
elif serverstring[0] == '[':
|
||||
server = serverstring[1:-1]
|
||||
port = 4001
|
||||
elif -1 != opts.server.find(':'):
|
||||
server, port = opts.server.split(":")
|
||||
else:
|
||||
server = serverstring
|
||||
port = 4001
|
||||
return (server, port)
|
||||
|
||||
|
||||
def parse_command(command):
|
||||
args = shlex.split(command, posix=True)
|
||||
@ -150,16 +138,6 @@ def parse_command(command):
|
||||
|
||||
currchildren = None
|
||||
|
||||
def send_request(operation, path, server, parameters=None):
|
||||
payload = {'operation': operation, 'path': path}
|
||||
if parameters is not None:
|
||||
payload['parameters'] = parameters
|
||||
tlvdata.send_tlvdata(server, payload)
|
||||
result = tlvdata.recv_tlvdata(server)
|
||||
while '_requestdone' not in result:
|
||||
yield result
|
||||
result = tlvdata.recv_tlvdata(server)
|
||||
|
||||
def do_command(command, server):
|
||||
global exitcode
|
||||
global target
|
||||
@ -171,7 +149,6 @@ def do_command(command, server):
|
||||
if len(argv) == 0:
|
||||
return
|
||||
if argv[0] == 'exit':
|
||||
server.close()
|
||||
sys.exit(0)
|
||||
elif argv[0] == 'cd':
|
||||
otarget = target
|
||||
@ -179,7 +156,7 @@ def do_command(command, server):
|
||||
target = fullpath_target(argv[1], forcepath=True)
|
||||
else: # cd by itself, go 'home'
|
||||
target = '/'
|
||||
for res in send_request('retrieve', target, server):
|
||||
for res in session.read(target, server):
|
||||
if 'errorcode' in res:
|
||||
exitcode = res['errorcode']
|
||||
if 'error' in res:
|
||||
@ -190,7 +167,7 @@ def do_command(command, server):
|
||||
targpath = fullpath_target(argv[1])
|
||||
else:
|
||||
targpath = target
|
||||
for res in send_request('retrieve', targpath, server):
|
||||
for res in session.read(targpath):
|
||||
if 'item' in res: # a link relation
|
||||
if type(res['item']) == dict:
|
||||
print res['item']["href"]
|
||||
@ -228,8 +205,8 @@ def do_command(command, server):
|
||||
elif argv[0] == 'start':
|
||||
targpath = fullpath_target(argv[1])
|
||||
currconsole = targpath
|
||||
tlvdata.send_tlvdata(server, {'operation': 'start', 'path': targpath})
|
||||
status = tlvdata.recv_tlvdata(server)
|
||||
tlvdata.send(server, {'operation': 'start', 'path': targpath})
|
||||
status = tlvdata.recv(server)
|
||||
if 'error' in status:
|
||||
if 'errorcode' in status:
|
||||
exitcode = status['errorcode']
|
||||
@ -251,7 +228,7 @@ def clearvalues(resource, attribs):
|
||||
keydata = {}
|
||||
for attrib in attribs:
|
||||
keydata[attrib] = None
|
||||
for res in send_request('update', targpath, server, keydata):
|
||||
for res in session.update(targpath, keydata):
|
||||
if 'error' in res:
|
||||
if 'errorcode' in res:
|
||||
exitcode = res['errorcode']
|
||||
@ -276,7 +253,7 @@ def setvalues(attribs):
|
||||
value = attrib[attrib.index("=") + 1:]
|
||||
keydata[key] = value
|
||||
targpath = fullpath_target(resource)
|
||||
for res in send_request('update', targpath, server, keydata):
|
||||
for res in session.update(targpath, keydata):
|
||||
if 'error' in res:
|
||||
if 'errorcode' in res:
|
||||
exitcode = res['errorcode']
|
||||
@ -326,10 +303,9 @@ def exit(code=0):
|
||||
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, oldtcattr)
|
||||
if consoleonly:
|
||||
server.shutdown(socket.SHUT_RDWR)
|
||||
server.close()
|
||||
sys.exit(code)
|
||||
else:
|
||||
tlvdata.send_tlvdata(server, {'operation': 'stop', 'path': currconsole})
|
||||
tlvdata.send(server, {'operation': 'stop', 'path': currconsole})
|
||||
inconsole = False
|
||||
|
||||
def conserver_command(fh, command):
|
||||
@ -363,63 +339,23 @@ def check_escape_seq(input, fh):
|
||||
return input
|
||||
|
||||
|
||||
def connect_unix_server(sockpath):
|
||||
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
server.setsockopt(socket.SOL_SOCKET, SO_PASSCRED, 1)
|
||||
server.connect(sockpath)
|
||||
return server
|
||||
|
||||
|
||||
def connect_tls_server(serverstring):
|
||||
host, port = parseservervalue(serverstring)
|
||||
for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC,
|
||||
socket.SOCK_STREAM):
|
||||
af, socktype, proto, cononname, sa = res
|
||||
try:
|
||||
server = socket.socket(af, socktype, proto)
|
||||
server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
except:
|
||||
server = None
|
||||
continue
|
||||
try:
|
||||
server.settimeout(5)
|
||||
server.connect(sa)
|
||||
except:
|
||||
server.close()
|
||||
server = None
|
||||
continue
|
||||
break
|
||||
if server is None:
|
||||
sys.stderr.write("Failed to connect to %s\n" % serverstring)
|
||||
sys.exit(1)
|
||||
secserver = ssl.wrap_socket(server)
|
||||
return secserver
|
||||
|
||||
parser = optparse.OptionParser()
|
||||
parser.add_option("-s", "--server", dest="server",
|
||||
help="TLS server to connect to", metavar="SERVER:PORT")
|
||||
parser.add_option("-u", "--unixsocket", dest="unixsock",
|
||||
help="TLS server to connect to", metavar="UNIXDOMAINSOCKET")
|
||||
help="Confluent instance to connect to", metavar="SERVER:PORT")
|
||||
opts, args = parser.parse_args()
|
||||
if opts.server: # going over a TLS network
|
||||
server = connect_tls_server(opts.server)
|
||||
elif opts.unixsock:
|
||||
server = connect_unix_server(opts.unixsock)
|
||||
elif os.path.exists("/var/run/confluent/api.sock"):
|
||||
server = connect_unix_server("/var/run/confluent/api.sock")
|
||||
session = client.Command(opts.server)
|
||||
else:
|
||||
session = client.Command()
|
||||
|
||||
#Next stop, reading and writing from whichever of stdin and server goes first.
|
||||
#see pyghmi code for solconnect.py
|
||||
|
||||
banner = tlvdata.recv_tlvdata(server)
|
||||
authinfo = tlvdata.recv_tlvdata(server)
|
||||
while authinfo['authpassed'] != 1:
|
||||
while not session.authenticated:
|
||||
username = raw_input("Name: ")
|
||||
readline.clear_history()
|
||||
passphrase = getpass.getpass("Passphrase: ")
|
||||
tlvdata.send_tlvdata(server,
|
||||
{'username': username, 'passphrase': passphrase})
|
||||
authinfo = tlvdata.recv_tlvdata(server)
|
||||
session.authenticate(username, passphrase)
|
||||
# clear on start can help with readable of TUI, but it
|
||||
# can be annoying, so for now don't do it.
|
||||
# sys.stdout.write('\x1b[H\x1b[J')
|
||||
@ -436,16 +372,16 @@ if len(args) == 1: # a node name, go straight to trying to console
|
||||
do_command("start /node/%s/console/session" % args[0], server)
|
||||
while not doexit:
|
||||
if inconsole:
|
||||
rdylist, _, _ = select.select((sys.stdin, server), (), (), 60)
|
||||
rdylist, _, _ = select.select((sys.stdin, session.connection), (), (), 60)
|
||||
for fh in rdylist:
|
||||
if fh == server:
|
||||
if fh == session.connection:
|
||||
# this only should get called in the
|
||||
# case of a console session
|
||||
# each command should slurp up all relevant
|
||||
# recv_tlvdata potential
|
||||
# recv potential
|
||||
#fh.read()
|
||||
try:
|
||||
data = tlvdata.recv_tlvdata(fh)
|
||||
data = tlvdata.recv(fh)
|
||||
except Exception:
|
||||
data = None
|
||||
if type(data) == dict:
|
||||
@ -462,7 +398,7 @@ while not doexit:
|
||||
input = fh.read()
|
||||
input = check_escape_seq(input, fh)
|
||||
if input:
|
||||
tlvdata.send_tlvdata(server, input)
|
||||
tlvdata.send(session.connection, input)
|
||||
else:
|
||||
command = prompt()
|
||||
do_command(command, server)
|
||||
|
@ -145,6 +145,7 @@ def check_user_passphrase(name, passphrase, element=None, tenant=False):
|
||||
# TODO(jbjohnso): WORKERPOOL
|
||||
# PBKDF2 is, by design, cpu intensive
|
||||
# throw it at the worker pool when implemented
|
||||
# maybe a distinct worker pool, wondering about starving out non-auth stuff
|
||||
salt, crypt = ucfg['cryptpass']
|
||||
crypted = kdf.PBKDF2(passphrase, salt, 32, 10000,
|
||||
lambda p, s: hash.HMAC.new(p, s, hash.SHA256).digest()
|
||||
|
101
confluent/common/client.py
Normal file
101
confluent/common/client.py
Normal file
@ -0,0 +1,101 @@
|
||||
import os
|
||||
import socket
|
||||
import ssl
|
||||
import confluent.common.tlvdata as tlvdata
|
||||
|
||||
SO_PASSCRED = 16
|
||||
def _parseserver(string):
|
||||
if ']:' in string:
|
||||
server, port = string[1:].split(']:')
|
||||
elif string[0] == '[':
|
||||
server = serverstring[1:-1]
|
||||
port = 4001
|
||||
elif ':' in string:
|
||||
server, port = string.plit(':')
|
||||
else:
|
||||
server = string
|
||||
port = 4001
|
||||
return (server, port)
|
||||
|
||||
class Command(object):
|
||||
|
||||
def __init__(self, server="/var/run/confluent/api.sock"):
|
||||
self.serverloc = server
|
||||
if os.path.isabs(server) and os.path.exists(server):
|
||||
self._connect_unix()
|
||||
else:
|
||||
self._connect_tls()
|
||||
banner = tlvdata.recv(self.connection)
|
||||
authdata = tlvdata.recv(self.connection)
|
||||
if authdata['authpassed'] == 1:
|
||||
self.authenticated = True
|
||||
else:
|
||||
self.authenticated = False
|
||||
|
||||
def authenticate(self, username, passphrase):
|
||||
tlvdata.send(self.connection,
|
||||
{'username': username, 'passphrase': passphrase})
|
||||
authdata = tlvdata.recv(self.connection)
|
||||
if authdata['authpassed'] == 1:
|
||||
self.authenticated = True
|
||||
|
||||
def read(self, path, parameters=None):
|
||||
return send_request('retrieve', path, self.connection, parameters)
|
||||
|
||||
def update(self, path, parameters=None):
|
||||
return send_request('update', path, self.connection, parameters)
|
||||
|
||||
def create(self, path, parameters=None):
|
||||
return send_request('create', path, self.connection, parameters)
|
||||
|
||||
def delete(self, path, parameters=None):
|
||||
return send_request('delete', path, self.connection, parameters)
|
||||
|
||||
def _connect_unix(self):
|
||||
self.connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
self.connection.setsockopt(socket.SOL_SOCKET, SO_PASSCRED, 1)
|
||||
self.connection.connect(self.serverloc)
|
||||
|
||||
def _connect_tls(self):
|
||||
server, port = _parseserver(self.serverloc)
|
||||
for res in socket.getaddrinfo(server, port, socket.AF_UNSPEC, socket.SOCK_STREAM):
|
||||
af, socktype, proto, canonname, sa = res
|
||||
try:
|
||||
self.connection = socket.socket(af, socktype, proto)
|
||||
self.connection.setsockopt(
|
||||
socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
except:
|
||||
self.connection = None
|
||||
continue
|
||||
try:
|
||||
self.connection.settimeout(5)
|
||||
self.connection.connection(sa)
|
||||
except:
|
||||
self.connection.close()
|
||||
self.connection = None
|
||||
continue
|
||||
break
|
||||
if self.connection is None:
|
||||
raise Exception("Failed to connect to %s" % self.serverloc)
|
||||
#TODO(jbjohnso): server certificate validation
|
||||
self.connection = ssl.wrap_socket(self.connection)
|
||||
|
||||
def send_request(operation, path, server, parameters=None):
|
||||
'''This function iterates over all the responses
|
||||
received from the server.
|
||||
|
||||
:param operation: The operation to request, retrieve, update, delete,
|
||||
create, start, stop
|
||||
:param path: The URI path to the resource to operate on
|
||||
:param server: The socket to send data over
|
||||
:param parameters: Parameters if any to send along with the request
|
||||
'''
|
||||
payload = {'operation': operation, 'path': path}
|
||||
if parameters is not None:
|
||||
payload['parameters'] = parameters
|
||||
tlvdata.send(server, payload)
|
||||
result = tlvdata.recv(server)
|
||||
while '_requestdone' not in result:
|
||||
yield result
|
||||
result = tlvdata.recv(server)
|
||||
|
@ -3,7 +3,7 @@ import confluent.common.tlv as tlv
|
||||
import json
|
||||
import struct
|
||||
|
||||
def send_tlvdata(handle, data):
|
||||
def send(handle, data):
|
||||
if isinstance(data, str):
|
||||
# plain text, e.g. console data
|
||||
tl = len(data)
|
||||
@ -25,7 +25,7 @@ def send_tlvdata(handle, data):
|
||||
handle.sendall(struct.pack("!I", tl))
|
||||
handle.sendall(sdata)
|
||||
|
||||
def recv_tlvdata(handle):
|
||||
def recv(handle):
|
||||
tl = handle.recv(4)
|
||||
if len(tl) == 0:
|
||||
return None
|
||||
|
@ -33,12 +33,12 @@ class ClientConsole(object):
|
||||
if not self.xmit:
|
||||
self.pendingdata += data
|
||||
return
|
||||
tlvdata.send_tlvdata(self.client, data)
|
||||
tlvdata.send(self.client, data)
|
||||
|
||||
def startsending(self):
|
||||
self.xmit = True
|
||||
if self.pendingdata != "":
|
||||
tlvdata.send_tlvdata(self.client, self.pendingdata)
|
||||
tlvdata.send(self.client, self.pendingdata)
|
||||
self.pendingdata = None
|
||||
|
||||
|
||||
@ -54,10 +54,10 @@ def sessionhdl(connection, authname):
|
||||
if authdata is not None:
|
||||
cfm = authdata[1]
|
||||
authenticated = True
|
||||
tlvdata.send_tlvdata(connection,"Confluent -- v0 --")
|
||||
tlvdata.send(connection,"Confluent -- v0 --")
|
||||
while not authenticated: # prompt for name and passphrase
|
||||
tlvdata.send_tlvdata(connection, {'authpassed': 0})
|
||||
response = tlvdata.recv_tlvdata(connection)
|
||||
tlvdata.send(connection, {'authpassed': 0})
|
||||
response = tlvdata.recv(connection)
|
||||
username = response['username']
|
||||
passphrase = response['passphrase']
|
||||
# note(jbjohnso): here, we need to authenticate, but not
|
||||
@ -68,26 +68,26 @@ def sessionhdl(connection, authname):
|
||||
if authdata is not None:
|
||||
authenticated = True
|
||||
cfm = authdata[1]
|
||||
tlvdata.send_tlvdata(connection, {'authpassed': 1})
|
||||
request = tlvdata.recv_tlvdata(connection)
|
||||
tlvdata.send(connection, {'authpassed': 1})
|
||||
request = tlvdata.recv(connection)
|
||||
while request is not None:
|
||||
try:
|
||||
process_request(connection, request, cfm, authdata)
|
||||
except:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
tlvdata.send_tlvdata(connection, {'errorcode': 500,
|
||||
tlvdata.send(connection, {'errorcode': 500,
|
||||
'error': 'Unexpected error'})
|
||||
tlvdata.send_tlvdata(connection, {'_requestdone': 1})
|
||||
request = tlvdata.recv_tlvdata(connection)
|
||||
tlvdata.send(connection, {'_requestdone': 1})
|
||||
request = tlvdata.recv(connection)
|
||||
|
||||
|
||||
def send_response(responses, connection):
|
||||
if responses is None:
|
||||
return
|
||||
for rsp in responses:
|
||||
tlvdata.send_tlvdata(connection, rsp.raw())
|
||||
tlvdata.send_tlvdata(connection, {'_requestdone': 1})
|
||||
tlvdata.send(connection, rsp.raw())
|
||||
tlvdata.send(connection, {'_requestdone': 1})
|
||||
|
||||
|
||||
def process_request(connection, request, cfm, authdata):
|
||||
@ -108,10 +108,10 @@ def process_request(connection, request, cfm, authdata):
|
||||
node=node, configmanager=cfm, datacallback=ccons.sendall)
|
||||
if consession is None:
|
||||
raise Exception("TODO")
|
||||
tlvdata.send_tlvdata(connection, {'started': 1})
|
||||
tlvdata.send(connection, {'started': 1})
|
||||
ccons.startsending()
|
||||
while consession is not None:
|
||||
data = tlvdata.recv_tlvdata(connection)
|
||||
data = tlvdata.recv(connection)
|
||||
if type(data) == dict:
|
||||
if data['operation'] == 'stop':
|
||||
consession.destroy()
|
||||
@ -125,13 +125,13 @@ def process_request(connection, request, cfm, authdata):
|
||||
else:
|
||||
hdlr = pluginapi.handle_path(path, operation, cfm, params)
|
||||
except exc.NotFoundException:
|
||||
tlvdata.send_tlvdata(connection, {"errorcode": 404,
|
||||
tlvdata.send(connection, {"errorcode": 404,
|
||||
"error": "Target not found"})
|
||||
tlvdata.send_tlvdata(connection, {"_requestdone": 1})
|
||||
tlvdata.send(connection, {"_requestdone": 1})
|
||||
except exc.InvalidArgumentException:
|
||||
tlvdata.send_tlvdata(connection, {"errorcode": 400,
|
||||
tlvdata.send(connection, {"errorcode": 400,
|
||||
"error": "Bad Request"})
|
||||
tlvdata.send_tlvdata(connection, {"_requestdone": 1})
|
||||
tlvdata.send(connection, {"_requestdone": 1})
|
||||
send_response(hdlr, connection)
|
||||
return
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user