2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-09 21:15:53 +00:00

Wrap cfg init on follow in lock

Use a lock to provide more atomic behavior for connecting should
something go wrong in calling connect_to_leader incorrectly.
This commit is contained in:
Jarrod Johnson 2018-06-26 10:13:27 -04:00
parent 4de1fea7aa
commit 7433dd3e38

View File

@ -22,6 +22,7 @@ import confluent.util as util
import eventlet
import eventlet.green.socket as socket
import eventlet.green.ssl as ssl
import eventlet.green.threading as threading
try:
import OpenSSL.crypto as crypto
except ImportError:
@ -30,16 +31,20 @@ except ImportError:
crypto = None
currentleader = None
cfginitlock = None
def connect_to_leader(cert=None, name=None, leader=None):
global currentleader
global cfginitlock
if cfginitlock is None:
cfginitlock = threading.RLock()
if leader is None:
leader = currentleader
try:
remote = connect_to_collective(cert, leader)
except socket.error:
return
return False
tlvdata.recv(remote) # the banner
tlvdata.recv(remote) # authpassed... 0..
if name is None:
@ -64,18 +69,19 @@ def connect_to_leader(cert=None, name=None, leader=None):
if not ndata:
raise Exception("Error doing initial DB transfer")
dbjson += ndata
cfm.cfgleader = None
cfm.clear_configuration()
cfm._restore_keys(keydata, None)
for c in colldata:
cfm.add_collective_member(c, colldata[c]['address'],
colldata[c]['fingerprint'])
cfm._cfgstore['collective'] = colldata
for globvar in globaldata:
cfm.set_global(globvar, globaldata[globvar])
cfm.ConfigManager(tenant=None)._load_from_json(dbjson)
cfm.ConfigManager._bg_sync_to_file()
currentleader = leader
with cfginitlock:
cfm.cfgleader = None
cfm.clear_configuration()
cfm._restore_keys(keydata, None)
for c in colldata:
cfm.add_collective_member(c, colldata[c]['address'],
colldata[c]['fingerprint'])
cfm._cfgstore['collective'] = colldata
for globvar in globaldata:
cfm.set_global(globvar, globaldata[globvar])
cfm.ConfigManager(tenant=None)._load_from_json(dbjson)
cfm.ConfigManager._bg_sync_to_file()
currentleader = leader
cfm.follow_channel(remote)
# The leader has folded, time to startup again...
eventlet.spawn_n(start_collective)
@ -260,10 +266,13 @@ def become_leader(connection):
def startup():
global cfginitlock
members = list(cfm.list_collective())
if len(members) < 2:
# Not in collective mode, return
return
if cfginitlock is None:
cfginitlock = threading.RLock()
eventlet.spawn_n(start_collective)
def start_collective():