2013-09-14 11:08:48 -04:00
|
|
|
# Copyright 2013 IBM Corporation
|
|
|
|
# All rights reserved
|
|
|
|
|
|
|
|
# This is the socket api layer.
|
|
|
|
# It implements unix and tls sockets
|
2013-10-10 14:17:08 -04:00
|
|
|
#
|
2013-09-14 11:08:48 -04:00
|
|
|
# TODO: SO_PEERCRED for unix socket
|
2014-02-09 10:43:26 -05:00
|
|
|
import confluent.auth as auth
|
|
|
|
import confluent.common.tlvdata as tlvdata
|
2013-11-02 13:25:56 -04:00
|
|
|
import confluent.consoleserver as consoleserver
|
2013-11-02 16:58:38 -04:00
|
|
|
import confluent.config.configmanager as configmanager
|
2014-02-09 17:35:49 -05:00
|
|
|
import confluent.exceptions as exc
|
|
|
|
import confluent.messages
|
|
|
|
import confluent.pluginapi as pluginapi
|
2013-09-14 11:08:48 -04:00
|
|
|
import eventlet.green.socket as socket
|
|
|
|
import eventlet.green.ssl as ssl
|
2013-09-14 20:21:58 -04:00
|
|
|
import eventlet
|
2014-02-09 17:35:49 -05:00
|
|
|
import json
|
2013-10-14 09:21:55 -04:00
|
|
|
import os
|
2014-02-10 09:41:08 -05:00
|
|
|
import pwd
|
|
|
|
import stat
|
2013-10-14 09:21:55 -04:00
|
|
|
import struct
|
|
|
|
|
|
|
|
# Socket option number handed to getsockopt() on accepted unix-domain
# connections in order to read the peer's (pid, uid, gid) triple.
# 17 matches SO_PEERCRED in the Linux headers on common architectures;
# presumably hard-coded because the socket module in use does not export
# the constant -- TODO confirm.
SO_PEERCRED = 17
|
2013-09-14 11:08:48 -04:00
|
|
|
|
2014-02-09 10:43:26 -05:00
|
|
|
class ClientConsole(object):
    """Gate between a console data source and a single client connection.

    Data passed to ``sendall`` is accumulated in memory until
    ``startsending`` is called; after that every chunk is forwarded to the
    client right away through ``tlvdata.send``.
    """

    def __init__(self, client):
        # ``client`` is the connection object later handed to tlvdata.send.
        self.pendingdata = ""
        self.xmit = False
        self.client = client

    def sendall(self, data):
        """Forward ``data`` to the client, or queue it while still gated."""
        if self.xmit:
            tlvdata.send(self.client, data)
        else:
            self.pendingdata += data

    def startsending(self):
        """Open the gate and flush whatever was queued beforehand."""
        self.xmit = True
        backlog = self.pendingdata
        if backlog != "":
            tlvdata.send(self.client, backlog)
        # The queue is not used again once transmission is enabled.
        self.pendingdata = None
|
|
|
|
|
|
|
|
|
2013-10-14 11:28:07 -04:00
|
|
|
def sessionhdl(connection, authname):
    """Run one client session: authenticate, then serve requests.

    :param connection: connected socket-like object used with tlvdata
                       send/recv.
    :param authname: ``True`` when the peer is already trusted (a
                     tenant-less ConfigManager is used), a user name to be
                     authorized without a passphrase, or a false value to
                     force the username/passphrase prompt.

    The function returns when ``tlvdata.recv`` yields ``None``, either
    during login or while waiting for the next request.
    """
    # For now, trying to test the console stuff, so let's just do n4.
    authenticated = False
    authdata = None
    cfm = None
    if authname and isinstance(authname, bool):
        authenticated = True
        cfm = configmanager.ConfigManager(tenant=None)
    elif authname:
        authdata = auth.authorize(authname, element=None)
        if authdata is not None:
            cfm = authdata[1]
            authenticated = True
    tlvdata.send(connection, "Confluent -- v0 --")
    while not authenticated:  # prompt for name and passphrase
        tlvdata.send(connection, {'authpassed': 0})
        response = tlvdata.recv(connection)
        if response is None:
            # The request loop below treats None from recv as "no more
            # input"; apply the same rule here instead of failing on
            # response['username'] when the peer leaves mid-login.
            return
        try:
            username = response['username']
            passphrase = response['passphrase']
        except (KeyError, TypeError):
            # Malformed login message: count it as a failed attempt and
            # prompt again rather than aborting with a traceback.
            continue
        # note(jbjohnso): here, we need to authenticate, but not
        # authorize a user.  When authorization starts understanding
        # element path, that authorization will need to be called
        # per request the user makes
        authdata = auth.check_user_passphrase(username, passphrase)
        if authdata is not None:
            authenticated = True
            cfm = authdata[1]
    tlvdata.send(connection, {'authpassed': 1})
    request = tlvdata.recv(connection)
    while request is not None:
        try:
            process_request(connection, request, cfm, authdata)
        except Exception:
            # Catch Exception rather than everything, so BaseException
            # based signals (greenlet kill, SystemExit, KeyboardInterrupt)
            # still propagate instead of being reported as a 500.
            import traceback
            traceback.print_exc()
            tlvdata.send(connection, {'errorcode': 500,
                                      'error': 'Unexpected error'})
            tlvdata.send(connection, {'_requestdone': 1})
        request = tlvdata.recv(connection)
|
2014-02-09 17:35:49 -05:00
|
|
|
|
|
|
|
|
|
|
|
def send_response(responses, connection):
    """Stream handler results to the client and mark the request complete.

    Each item of ``responses`` has its ``raw()`` value sent in order,
    followed by a ``{'_requestdone': 1}`` marker.  When ``responses`` is
    None nothing is sent at all, not even the marker.
    """
    if responses is not None:
        for item in responses:
            tlvdata.send(connection, item.raw())
        tlvdata.send(connection, {'_requestdone': 1})
|
2014-02-09 19:26:48 -05:00
|
|
|
|
2014-02-09 17:35:49 -05:00
|
|
|
|
|
|
|
def process_request(connection, request, cfm, authdata):
    """Dispatch one decoded client request.

    :param connection: client connection used with tlvdata send/recv.
    :param request: expected to be a dict with 'operation' and 'path' keys
                    and an optional 'parameters' key.
    :param cfm: configuration manager passed on to the console session or
                to ``pluginapi.handle_path``.
    :param authdata: currently unused here (see TODO below).

    A 'start' operation on a path whose fourth '/'-separated element is
    "console" attaches the connection to that node's console until the
    client sends a 'stop' operation or empty data.  Any other operation is
    handed to ``pluginapi.handle_path`` and its results are streamed back.
    Malformed requests get a 400 reply, unknown targets a 404 reply.
    """
    #TODO(jbjohnso): authorize each request
    if not isinstance(request, dict):
        # Previously fell through with 'operation' unbound and surfaced as
        # a generic unexpected error; reject it explicitly.
        tlvdata.send(connection, {"errorcode": 400,
                                  "error": "Bad Request"})
        tlvdata.send(connection, {"_requestdone": 1})
        return
    hdlr = None
    try:
        try:
            operation = request['operation']
            path = request['path']
        except KeyError:
            # Missing mandatory fields are a client error, not a server one.
            raise exc.InvalidArgumentException()
        params = request.get('parameters', None)
        if operation == 'start':
            elems = path.split('/')
            # Guard the length so a short path is a 400, not an IndexError.
            if len(elems) < 4 or elems[3] != "console":
                raise exc.InvalidArgumentException()
            node = elems[2]
            ccons = ClientConsole(connection)
            consession = consoleserver.ConsoleSession(
                node=node, configmanager=cfm, datacallback=ccons.sendall)
            if consession is None:
                raise Exception("TODO")
            try:
                tlvdata.send(connection, {'started': 1})
                ccons.startsending()
                while True:
                    data = tlvdata.recv(connection)
                    if isinstance(data, dict):
                        if data['operation'] == 'stop':
                            return
                        raise Exception("TODO")
                    if not data:
                        return
                    consession.write(data)
            finally:
                # Tear the session down on every exit path, including
                # errors, so it does not stay attached to this connection.
                consession.destroy()
        else:
            hdlr = pluginapi.handle_path(path, operation, cfm, params)
    except exc.NotFoundException:
        tlvdata.send(connection, {"errorcode": 404,
                                  "error": "Target not found"})
        tlvdata.send(connection, {"_requestdone": 1})
    except exc.InvalidArgumentException:
        tlvdata.send(connection, {"errorcode": 400,
                                  "error": "Bad Request"})
        tlvdata.send(connection, {"_requestdone": 1})
    # hdlr is still None after an error reply, in which case send_response
    # sends nothing further.
    send_response(hdlr, connection)
    return
|
2013-09-14 11:08:48 -04:00
|
|
|
|
|
|
|
|
2013-10-14 09:21:55 -04:00
|
|
|
def _tlshandler():
    """Listen for TLS clients on TCP port 4001 and spawn a session for each.

    The listening socket is wrapped with the key and certificate under
    /etc/confluent.  Accepted connections are passed to ``sessionhdl`` with
    no pre-established identity, so each client must log in.
    """
    plainsocket = socket.socket()
    plainsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    plainsocket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    # NOTE(review): PROTOCOL_TLSv1 pins an old protocol version; left as is
    # because changing it affects which clients can connect.
    srv = ssl.wrap_socket(plainsocket, keyfile="/etc/confluent/privkey.pem",
                          certfile="/etc/confluent/srvcert.pem",
                          ssl_version=ssl.PROTOCOL_TLSv1,
                          server_side=True)
    srv.bind(('0.0.0.0', 4001))
    srv.listen(5)
    authname = None
    while (1):  # TODO: exithook
        try:
            cnn, addr = srv.accept()
        except (ssl.SSLError, socket.error):
            # With the default handshake-on-connect behaviour the TLS
            # handshake is expected to run inside accept(), so one
            # misbehaving peer can make it raise.  Skip that peer instead
            # of letting the exception end the listener for everybody.
            # The short pause keeps a persistent error from spinning.
            eventlet.sleep(0.1)
            continue
        eventlet.spawn_n(sessionhdl, cnn, authname)
|
2013-09-14 11:08:48 -04:00
|
|
|
|
2013-10-14 09:21:55 -04:00
|
|
|
|
|
|
|
def _unixdomainhandler():
    """Serve the local unix-domain API socket.

    Binds /var/run/confluent/api.sock (replacing any stale file), makes it
    readable and writable by everyone, and identifies each peer from its
    SO_PEERCRED credentials:

    * uid 0 or the uid this process runs as: passed to ``sessionhdl`` as
      ``True``, i.e. already trusted;
    * any other uid: resolved to a user name through the passwd database
      and passed on for authorization;
    * a uid with no passwd entry: the connection is closed.
    """
    unixsocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        os.remove("/var/run/confluent/api.sock")
    except OSError:  # if file does not exist, no big deal
        pass
    unixsocket.bind("/var/run/confluent/api.sock")
    os.chmod("/var/run/confluent/api.sock",
             stat.S_IWOTH | stat.S_IROTH | stat.S_IWGRP |
             stat.S_IRGRP | stat.S_IWUSR | stat.S_IRUSR)
    unixsocket.listen(5)
    while (1):
        cnn, addr = unixsocket.accept()
        creds = cnn.getsockopt(socket.SOL_SOCKET, SO_PEERCRED,
                               struct.calcsize('3i'))
        pid, uid, gid = struct.unpack('3i', creds)
        if uid in (os.getuid(), 0):
            #this is where we happily accept the person
            #to do whatever.  This allows the server to
            #start with no configuration whatsoever
            #and yet still be configurable by some means
            authname = True
        else:
            try:
                authname = pwd.getpwuid(uid).pw_name
            except KeyError:
                # Unknown uid: drop only this peer.  Returning here would
                # end the accept loop and take the socket out of service
                # for every later client.
                cnn.close()
                continue
        eventlet.spawn_n(sessionhdl, cnn, authname)
|
2013-10-14 09:21:55 -04:00
|
|
|
|
|
|
|
|
|
|
|
|
2013-09-14 11:08:48 -04:00
|
|
|
class SockApi(object):
    """Entry point that launches the socket API listeners."""

    def start(self):
        """Spawn the TLS and unix-domain listeners as green threads.

        The green threads are kept on the instance as ``tlsserver`` and
        ``unixdomainserver``.
        """
        listeners = (
            ('tlsserver', _tlshandler),
            ('unixdomainserver', _unixdomainhandler),
        )
        for attrname, target in listeners:
            setattr(self, attrname, eventlet.spawn(target))
|