mirror of
https://github.com/xcat2/confluent.git
synced 2024-11-23 01:53:28 +00:00
Initial support for non-console dispatch
For non-exceptional cases, it is now functional.
This commit is contained in:
parent
f2500d9d27
commit
810be71720
@ -759,6 +759,9 @@ class ConfigManager(object):
|
||||
self._cfgstore['nodes'] = {}
|
||||
self._bg_sync_to_file()
|
||||
|
||||
def get_collective_member(self, name):
|
||||
return get_collective_member(name)
|
||||
|
||||
def filter_node_attributes(self, expression, nodes=None):
|
||||
"""Filtered nodelist according to expression
|
||||
|
||||
|
@ -35,7 +35,9 @@
|
||||
|
||||
import confluent
|
||||
import confluent.alerts as alerts
|
||||
import confluent.tlvdata as tlvdata
|
||||
import confluent.config.attributes as attrscheme
|
||||
import confluent.config.configmanager as cfm
|
||||
import confluent.collective.manager as collective
|
||||
import confluent.discovery.core as disco
|
||||
import confluent.interface.console as console
|
||||
@ -47,8 +49,21 @@ try:
|
||||
import confluent.shellmodule as shellmodule
|
||||
except ImportError:
|
||||
pass
|
||||
try:
|
||||
import OpenSSL.crypto as crypto
|
||||
except ImportError:
|
||||
# Only required for collective mode
|
||||
crypto = None
|
||||
import confluent.util as util
|
||||
import eventlet.green.ssl as ssl
|
||||
import itertools
|
||||
import os
|
||||
try:
|
||||
import cPickle as pickle
|
||||
except ImportError:
|
||||
import pickle
|
||||
import socket
|
||||
import struct
|
||||
import sys
|
||||
|
||||
pluginmap = {}
|
||||
@ -560,6 +575,61 @@ def abbreviate_noderange(configmanager, inputdata, operation):
|
||||
return (msg.KeyValueData({'noderange': noderange.ReverseNodeRange(inputdata['nodes'], configmanager).noderange}),)
|
||||
|
||||
|
||||
def handle_dispatch(connection, cert, dispatch):
|
||||
cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
|
||||
if not util.cert_matches(
|
||||
cfm.get_collective_member(dispatch['name'])['fingerprint'], cert):
|
||||
connection.close()
|
||||
configmanager = cfm.ConfigManager(dispatch['tenant'])
|
||||
nodes = dispatch['nodes']
|
||||
inputdata = dispatch['inputdata']
|
||||
operation = dispatch['operation']
|
||||
pathcomponents = dispatch['path']
|
||||
routespec = nested_lookup(noderesources, pathcomponents)
|
||||
plugroute = routespec.routeinfo
|
||||
plugpath = None
|
||||
nodesbyhandler = {}
|
||||
passvalues = []
|
||||
nodeattr = configmanager.get_node_attributes(
|
||||
nodes, plugroute['pluginattrs'])
|
||||
for node in nodes:
|
||||
for attrname in plugroute['pluginattrs']:
|
||||
if attrname in nodeattr[node]:
|
||||
plugpath = nodeattr[node][attrname]['value']
|
||||
elif 'default' in plugroute:
|
||||
plugpath = plugroute['default']
|
||||
if plugpath is not None:
|
||||
try:
|
||||
hfunc = getattr(pluginmap[plugpath], operation)
|
||||
except KeyError:
|
||||
nodesbyhandler[BadPlugin(node, plugpath).error] = [node]
|
||||
continue
|
||||
if hfunc in nodesbyhandler:
|
||||
nodesbyhandler[hfunc].append(node)
|
||||
else:
|
||||
nodesbyhandler[hfunc] = [node]
|
||||
try:
|
||||
for hfunc in nodesbyhandler:
|
||||
passvalues.append(hfunc(
|
||||
nodes=nodesbyhandler[hfunc], element=pathcomponents,
|
||||
configmanager=configmanager,
|
||||
inputdata=inputdata))
|
||||
for res in itertools.chain(*passvalues):
|
||||
_forward_rsp(connection, res)
|
||||
except Exception as res:
|
||||
_forward_rsp(connection, res)
|
||||
connection.sendall('\x00\x00\x00\x00\x00\x00\x00\x00')
|
||||
|
||||
|
||||
def _forward_rsp(connection, res):
|
||||
r = pickle.dumps(res)
|
||||
rlen = len(r)
|
||||
if not rlen:
|
||||
return
|
||||
connection.sendall(struct.pack('!Q', rlen))
|
||||
connection.sendall(r)
|
||||
|
||||
|
||||
def handle_node_request(configmanager, inputdata, operation,
|
||||
pathcomponents, autostrip=True):
|
||||
iscollection = False
|
||||
@ -663,26 +733,25 @@ def handle_node_request(configmanager, inputdata, operation,
|
||||
nodeattr = configmanager.get_node_attributes(
|
||||
nodes, plugroute['pluginattrs'] + ['collective.manager'])
|
||||
plugpath = None
|
||||
if 'default' in plugroute:
|
||||
plugpath = plugroute['default']
|
||||
mynodes = set(nodes)
|
||||
nodesbymanager = {}
|
||||
nodesbyhandler = {}
|
||||
for node in nodes:
|
||||
for attrname in plugroute['pluginattrs']:
|
||||
if attrname in nodeattr[node]:
|
||||
plugpath = nodeattr[node][attrname]['value']
|
||||
elif 'default' in plugroute:
|
||||
plugpath = plugroute['default']
|
||||
if plugpath in dispatch_plugins:
|
||||
manager = nodeattr[node].get('collective.manager', {}).get(
|
||||
'value', None)
|
||||
if manager:
|
||||
if collective.get_myname() != manager:
|
||||
mynodes.discard(manager)
|
||||
if manager not in nodesbymanager:
|
||||
nodesbymanager[manager] = set([node])
|
||||
else:
|
||||
nodesbymanager[manager].add(node)
|
||||
if plugpath is not None and node in mynodes:
|
||||
continue
|
||||
if plugpath is not None:
|
||||
try:
|
||||
hfunc = getattr(pluginmap[plugpath], operation)
|
||||
except KeyError:
|
||||
@ -698,7 +767,10 @@ def handle_node_request(configmanager, inputdata, operation,
|
||||
configmanager=configmanager,
|
||||
inputdata=inputdata))
|
||||
for manager in nodesbymanager:
|
||||
raise Exception('TODO: dispatch requests')
|
||||
passvalues.append(dispatch_request(
|
||||
nodes=nodesbymanager[manager], manager=manager,
|
||||
element=pathcomponents, configmanager=configmanager,
|
||||
inputdata=inputdata, operation=operation))
|
||||
if isnoderange or not autostrip:
|
||||
return itertools.chain(*passvalues)
|
||||
else:
|
||||
@ -716,6 +788,47 @@ def handle_node_request(configmanager, inputdata, operation,
|
||||
# return stripnode(passvalues[0], nodes[0])
|
||||
|
||||
|
||||
def dispatch_request(nodes, manager, element, configmanager, inputdata,
|
||||
operation):
|
||||
a = configmanager.get_collective_member(manager)
|
||||
remote = socket.create_connection((a['address'], 13001))
|
||||
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
|
||||
keyfile='/etc/confluent/privkey.pem',
|
||||
certfile='/etc/confluent/srvcert.pem')
|
||||
if not util.cert_matches(a['fingerprint'], remote.getpeercert(
|
||||
binary_form=True)):
|
||||
raise Exception("Invalid certificate on peer")
|
||||
tlvdata.recv(remote)
|
||||
tlvdata.recv(remote)
|
||||
myname = collective.get_myname()
|
||||
tlvdata.send(remote,
|
||||
{'dispatch': {'name': myname, 'nodes': list(nodes),
|
||||
'path': element,
|
||||
'tenant': configmanager.tenant,
|
||||
'operation': operation,
|
||||
'inputdata': inputdata}})
|
||||
while True:
|
||||
rlen = remote.recv(8)
|
||||
while len(rlen) < 8:
|
||||
nlen = remote.recv(8 - len(rlen))
|
||||
if not nlen:
|
||||
raise Exception('Error receiving data')
|
||||
rlen += nlen
|
||||
rlen = struct.unpack('!Q', rlen)[0]
|
||||
if rlen == 0:
|
||||
break
|
||||
rsp = remote.recv(rlen)
|
||||
while len(rsp) < rlen:
|
||||
nrsp = remote.recv(rlen - len(rsp))
|
||||
if not nrsp:
|
||||
raise Exception('Error receving data')
|
||||
rsp += nrsp
|
||||
rsp = pickle.loads(rsp)
|
||||
if isinstance(rsp, Exception):
|
||||
raise rsp
|
||||
yield rsp
|
||||
|
||||
|
||||
def handle_discovery(pathcomponents, operation, configmanager, inputdata):
|
||||
if pathcomponents[0] == 'detected':
|
||||
pass
|
||||
|
@ -785,7 +785,7 @@ def serve(bind_host, bind_port):
|
||||
' a second\n')
|
||||
eventlet.sleep(1)
|
||||
eventlet.wsgi.server(sock, resourcehandler, log=False, log_output=False,
|
||||
debug=False, socket_timeout=60)
|
||||
debug=False, socket_timeout=60, keepalive=False)
|
||||
|
||||
|
||||
class HttpApi(object):
|
||||
|
@ -119,6 +119,9 @@ def sessionhdl(connection, authname, skipauth=False, cert=None):
|
||||
if 'collective' in response:
|
||||
return collective.handle_connection(connection, cert,
|
||||
response['collective'])
|
||||
if 'dispatch' in response:
|
||||
return pluginapi.handle_dispatch(connection, cert,
|
||||
response['dispatch'])
|
||||
authname = response['username']
|
||||
passphrase = response['password']
|
||||
# note(jbjohnso): here, we need to authenticate, but not
|
||||
|
Loading…
Reference in New Issue
Block a user