mirror of
https://github.com/xcat2/confluent.git
synced 2025-03-29 03:27:24 +00:00
Resolve race conditions on simultaneous collective outage
Implement random backoff strategy for serializing connect out and connect in.
This commit is contained in:
parent
39378170b1
commit
1eaf5357ca
@ -24,6 +24,7 @@ import eventlet
|
||||
import eventlet.green.socket as socket
|
||||
import eventlet.green.ssl as ssl
|
||||
import eventlet.green.threading as threading
|
||||
import random
|
||||
try:
|
||||
import OpenSSL.crypto as crypto
|
||||
except ImportError:
|
||||
@ -35,6 +36,18 @@ currentleader = None
|
||||
cfginitlock = None
|
||||
follower = None
|
||||
|
||||
class ContextBool(object):
|
||||
def __init__(self):
|
||||
self.active = False
|
||||
|
||||
def __enter__(self):
|
||||
self.active = True
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.active = False
|
||||
|
||||
connecting = ContextBool()
|
||||
|
||||
|
||||
def connect_to_leader(cert=None, name=None, leader=None):
|
||||
global currentleader
|
||||
@ -48,52 +61,59 @@ def connect_to_leader(cert=None, name=None, leader=None):
|
||||
remote = connect_to_collective(cert, leader)
|
||||
except socket.error:
|
||||
return False
|
||||
tlvdata.recv(remote) # the banner
|
||||
tlvdata.recv(remote) # authpassed... 0..
|
||||
if name is None:
|
||||
name = get_myname()
|
||||
tlvdata.send(remote, {'collective': {'operation': 'connect',
|
||||
'name': name,
|
||||
'txcount': cfm._txcount}})
|
||||
keydata = tlvdata.recv(remote)
|
||||
if 'error' in keydata:
|
||||
if 'leader' in keydata:
|
||||
ldrc = cfm.get_collective_member_by_address(keydata['leader'])
|
||||
if ldrc and ldrc['name'] == name:
|
||||
raise Exception("Redirected to self")
|
||||
return connect_to_leader(name=name, leader=keydata['leader'])
|
||||
if 'txcount' in keydata:
|
||||
return become_leader(remote)
|
||||
raise Exception(keydata['error'])
|
||||
if follower is not None:
|
||||
follower.kill()
|
||||
follower = None
|
||||
colldata = tlvdata.recv(remote)
|
||||
globaldata = tlvdata.recv(remote)
|
||||
dbi = tlvdata.recv(remote)
|
||||
dbsize = dbi['dbsize']
|
||||
dbjson = ''
|
||||
while (len(dbjson) < dbsize):
|
||||
ndata = remote.recv(dbsize - len(dbjson))
|
||||
if not ndata:
|
||||
raise Exception("Error doing initial DB transfer")
|
||||
dbjson += ndata
|
||||
with cfginitlock:
|
||||
cfm.cfgleader = None
|
||||
cfm.clear_configuration()
|
||||
cfm._restore_keys(keydata, None)
|
||||
for c in colldata:
|
||||
cfm.add_collective_member(c, colldata[c]['address'],
|
||||
colldata[c]['fingerprint'])
|
||||
cfm._cfgstore['collective'] = colldata
|
||||
for globvar in globaldata:
|
||||
cfm.set_global(globvar, globaldata[globvar])
|
||||
cfm.ConfigManager(tenant=None)._load_from_json(dbjson)
|
||||
cfm._txcount = dbi['txcount']
|
||||
cfm.ConfigManager._bg_sync_to_file()
|
||||
currentleader = leader
|
||||
#spawn this as a thread...
|
||||
follower = eventlet.spawn(follow_leader, remote)
|
||||
with connecting:
|
||||
with cfginitlock:
|
||||
tlvdata.recv(remote) # the banner
|
||||
tlvdata.recv(remote) # authpassed... 0..
|
||||
if name is None:
|
||||
name = get_myname()
|
||||
tlvdata.send(remote, {'collective': {'operation': 'connect',
|
||||
'name': name,
|
||||
'txcount': cfm._txcount}})
|
||||
keydata = tlvdata.recv(remote)
|
||||
if 'error' in keydata:
|
||||
if 'backoff' in keydata:
|
||||
eventlet.spawn_after(random.random(), connect_to_leader,
|
||||
cert, name, leader)
|
||||
return True
|
||||
if 'leader' in keydata:
|
||||
ldrc = cfm.get_collective_member_by_address(
|
||||
keydata['leader'])
|
||||
if ldrc and ldrc['name'] == name:
|
||||
raise Exception("Redirected to self")
|
||||
return connect_to_leader(name=name,
|
||||
leader=keydata['leader'])
|
||||
if 'txcount' in keydata:
|
||||
return become_leader(remote)
|
||||
raise Exception(keydata['error'])
|
||||
if follower is not None:
|
||||
follower.kill()
|
||||
follower = None
|
||||
colldata = tlvdata.recv(remote)
|
||||
globaldata = tlvdata.recv(remote)
|
||||
dbi = tlvdata.recv(remote)
|
||||
dbsize = dbi['dbsize']
|
||||
dbjson = ''
|
||||
while (len(dbjson) < dbsize):
|
||||
ndata = remote.recv(dbsize - len(dbjson))
|
||||
if not ndata:
|
||||
raise Exception("Error doing initial DB transfer")
|
||||
dbjson += ndata
|
||||
cfm.cfgleader = None
|
||||
cfm.clear_configuration()
|
||||
cfm._restore_keys(keydata, None)
|
||||
for c in colldata:
|
||||
cfm.add_collective_member(c, colldata[c]['address'],
|
||||
colldata[c]['fingerprint'])
|
||||
cfm._cfgstore['collective'] = colldata
|
||||
for globvar in globaldata:
|
||||
cfm.set_global(globvar, globaldata[globvar])
|
||||
cfm.ConfigManager(tenant=None)._load_from_json(dbjson)
|
||||
cfm._txcount = dbi['txcount']
|
||||
cfm.ConfigManager._bg_sync_to_file()
|
||||
currentleader = leader
|
||||
#spawn this as a thread...
|
||||
follower = eventlet.spawn(follow_leader, remote)
|
||||
return True
|
||||
|
||||
|
||||
@ -245,6 +265,11 @@ def handle_connection(connection, cert, request, local=False):
|
||||
'in another castle', 'leader': currentleader})
|
||||
connection.close()
|
||||
return
|
||||
if connecting.active:
|
||||
tlvdata.send(connection, {'error': 'Connecting right now',
|
||||
'backoff': True})
|
||||
connection.close()
|
||||
return
|
||||
drone = request['name']
|
||||
droneinfo = cfm.get_collective_member(drone)
|
||||
if not util.cert_matches(droneinfo['fingerprint'], cert):
|
||||
|
Loading…
x
Reference in New Issue
Block a user