mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-16 20:57:53 +00:00
Begin the 'connect' collective operation
First check if we are current leader, reject if not, then if cert is invalid, reject, then comes the TODO.
This commit is contained in:
parent
06fdc648b8
commit
c7b01e00b6
@ -24,12 +24,15 @@ import eventlet.green.ssl as ssl
|
||||
try:
|
||||
import OpenSSL.crypto as crypto
|
||||
except ImportError:
|
||||
# while not always required, we use pyopenssl required for at least collective
|
||||
# while not always required, we use pyopenssl required for at least
|
||||
# collective
|
||||
crypto = None
|
||||
|
||||
currentleader = None
|
||||
|
||||
|
||||
def handle_connection(connection, cert, request, local=False):
|
||||
global currentleader
|
||||
operation = request['operation']
|
||||
if cert:
|
||||
cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
|
||||
@ -39,7 +42,8 @@ def handle_connection(connection, cert, request, local=False):
|
||||
if 'invite' == operation:
|
||||
name = request['name']
|
||||
invitation = invites.create_server_invitation(name)
|
||||
tlvdata.send(connection, {'collective': {'invitation': invitation}})
|
||||
tlvdata.send(connection,
|
||||
{'collective': {'invitation': invitation}})
|
||||
if 'join' == operation:
|
||||
invitation = request['invitation']
|
||||
invitation = base64.b64decode(invitation)
|
||||
@ -61,7 +65,7 @@ def handle_connection(connection, cert, request, local=False):
|
||||
tlvdata.recv(remote) # ignore banner
|
||||
tlvdata.recv(remote) # ignore authpassed: 0
|
||||
tlvdata.send(remote, {'collective': {'operation': 'joinchallenge',
|
||||
'name': name, 'hmac': proof}})
|
||||
'name': name, 'hmac': proof}})
|
||||
rsp = tlvdata.recv(remote)
|
||||
proof = rsp['collective']['approval']
|
||||
proof = base64.b64decode(proof)
|
||||
@ -69,6 +73,7 @@ def handle_connection(connection, cert, request, local=False):
|
||||
if not j:
|
||||
return
|
||||
tlvdata.send(connection, {'collective': {'status': 'Success'}})
|
||||
currentleader = rsp['collective']['leader']
|
||||
if 'joinchallenge' == operation:
|
||||
mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem')
|
||||
proof = base64.b64decode(request['hmac'])
|
||||
@ -85,7 +90,23 @@ def handle_connection(connection, cert, request, local=False):
|
||||
tlvdata.send(connection,
|
||||
{'collective': {'approval': myrsp,
|
||||
'leader': get_leader(connection)}})
|
||||
|
||||
if 'connect' == operation:
|
||||
myself = connection.getsockname()[0]
|
||||
if myself != get_leader(connection):
|
||||
tlvdata.send(
|
||||
connection,
|
||||
{'error': 'Cannot assimilate, our leader is '
|
||||
'in another castle', 'leader': currentleader})
|
||||
return
|
||||
drone = request['name']
|
||||
droneinfo = cfm.get_collective_member(drone)
|
||||
if not util.cert_matches(droneinfo['fingerprint'], cert):
|
||||
tlvdata.send(connection,
|
||||
{'error': 'Invalid certificate,'
|
||||
'redo invitation process'})
|
||||
return
|
||||
# ok, we have a connecting member whose certificate checks out
|
||||
# He needs to bootstrap his configuration and subscribe it to updates
|
||||
|
||||
def get_leader(connection):
|
||||
global currentleader
|
||||
|
@ -380,6 +380,8 @@ def add_collective_member(name, address, fingerprint):
|
||||
_cfgstore['collectivedirty'].add(name)
|
||||
ConfigManager._bg_sync_to_file()
|
||||
|
||||
def get_collective_member(name):
|
||||
return _cfgstore['collective'][name]
|
||||
|
||||
def _mark_dirtykey(category, key, tenant=None):
|
||||
if type(key) in (str, unicode):
|
||||
|
Loading…
x
Reference in New Issue
Block a user