mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-16 12:47:50 +00:00
Start setting the stage for leader change on restart
Have connect() have a way to recover if leader is dead. Also these will be involved in configmanager detected loss of leader
This commit is contained in:
parent
7a912b31cb
commit
4f73ddc41e
@ -51,8 +51,11 @@ def connect_to_leader(cert=None, name=None, leader=None):
|
||||
raise Exception("Certificate mismatch in the collective")
|
||||
tlvdata.recv(remote) # the banner
|
||||
tlvdata.recv(remote) # authpassed... 0..
|
||||
if name is None:
|
||||
name = get_myname()
|
||||
tlvdata.send(remote, {'collective': {'operation': 'connect',
|
||||
'name': name}})
|
||||
'name': name,
|
||||
'txcount': cfm._txcount}})
|
||||
keydata = tlvdata.recv(remote)
|
||||
if 'error' in keydata:
|
||||
if 'leader' in keydata:
|
||||
@ -159,6 +162,23 @@ def handle_connection(connection, cert, request, local=False):
|
||||
tlvdata.send(connection,
|
||||
{'collective': {'approval': myrsp,
|
||||
'leader': get_leader(connection)}})
|
||||
if 'assimilate' == operation:
|
||||
drone = request['name']
|
||||
droneinfo = cfm.get_collective_member(drone)
|
||||
if not util.cert_matches(droneinfo['fingerprint'], cert):
|
||||
tlvdata.send(connection,
|
||||
{'error': 'Invalid certificate, '
|
||||
'redo invitation process'})
|
||||
return
|
||||
if request['txcount'] < cfm._txcount:
|
||||
tlvdata.send(connection,
|
||||
{'error': 'Refusing to be assimilated by inferior'
|
||||
'transaction count',
|
||||
'txcount': cfm._txcount})
|
||||
return
|
||||
eventlet.spawn_n(connect_to_leader, None, None,
|
||||
leader=connection.getpeername()[0])
|
||||
tlvdata.send(connection, {'status': 0})
|
||||
if 'connect' == operation:
|
||||
myself = connection.getsockname()[0]
|
||||
if myself != get_leader(connection):
|
||||
@ -171,9 +191,18 @@ def handle_connection(connection, cert, request, local=False):
|
||||
droneinfo = cfm.get_collective_member(drone)
|
||||
if not util.cert_matches(droneinfo['fingerprint'], cert):
|
||||
tlvdata.send(connection,
|
||||
{'error': 'Invalid certificate,'
|
||||
{'error': 'Invalid certificate, '
|
||||
'redo invitation process'})
|
||||
return
|
||||
if request['txcount'] > cfm._txcount:
|
||||
retire_as_leader()
|
||||
tlvdata.send(connection,
|
||||
{'error': 'Client has higher tranasaction count, '
|
||||
'should assimilate me, connecting..',
|
||||
'txcount': cfm._txcount})
|
||||
eventlet.spawn_n(connect_to_leader, None, None,
|
||||
connection.getpeername()[0])
|
||||
return
|
||||
tlvdata.send(connection, cfm._dump_keys(None, False))
|
||||
tlvdata.send(connection, cfm._cfgstore['collective'])
|
||||
tlvdata.send(connection, cfm.get_globals())
|
||||
@ -186,12 +215,30 @@ def handle_connection(connection, cert, request, local=False):
|
||||
# ok, we have a connecting member whose certificate checks out
|
||||
# He needs to bootstrap his configuration and subscribe it to updates
|
||||
|
||||
def try_assimilate(drone):
|
||||
pass
|
||||
|
||||
def get_leader(connection):
|
||||
global currentleader
|
||||
if currentleader is None:
|
||||
currentleader = connection.getsockname()[0]
|
||||
if currentleader is None or connection.getpeername()[0] == currentleader:
|
||||
become_leader(connection)
|
||||
return currentleader
|
||||
|
||||
def retire_as_leader():
|
||||
global currentleader
|
||||
cfm.stop_leading()
|
||||
currentleader = None
|
||||
|
||||
def become_leader(connection):
|
||||
global currentleader
|
||||
currentleader = connection.getsockname()[0]
|
||||
skipaddr = connection.getpeername()[0]
|
||||
for member in cfm.list_collective():
|
||||
dronecandidate = cfm.get_collective_member(member)['address']
|
||||
if dronecandidate in (currentleader, skipaddr):
|
||||
continue
|
||||
eventlet.spawn_n(try_assimilate, dronecandidate)
|
||||
|
||||
|
||||
def startup():
|
||||
members = list(cfm.list_collective())
|
||||
if len(members) < 2:
|
||||
|
@ -395,6 +395,12 @@ def relay_slaved_requests(name, listener):
|
||||
msg = listener.recv(8)
|
||||
|
||||
|
||||
def stop_leading():
|
||||
for stream in list(cfgstreams):
|
||||
cfgstreams[stream].close()
|
||||
del cfgstreams[stream]
|
||||
|
||||
|
||||
def clear_configuration():
|
||||
global _cfgstore
|
||||
_cfgstore = {}
|
||||
|
Loading…
x
Reference in New Issue
Block a user