mirror of
https://github.com/xcat2/confluent.git
synced 2025-02-25 23:02:20 +00:00
Make cfgleader modifications more robust
If cfgleader is about to forget a socket, explicitly try to close it first.
This commit is contained in:
parent
1de82936ed
commit
efaf1dae70
@ -101,7 +101,7 @@ def connect_to_leader(cert=None, name=None, leader=None):
|
||||
if not ndata:
|
||||
raise Exception("Error doing initial DB transfer")
|
||||
dbjson += ndata
|
||||
cfm.cfgleader = None
|
||||
cfm.stop_following()
|
||||
cfm.clear_configuration()
|
||||
try:
|
||||
cfm._restore_keys(keydata, None, sync=False)
|
||||
@ -128,7 +128,7 @@ def follow_leader(remote):
|
||||
global currentleader
|
||||
cfm.follow_channel(remote)
|
||||
# The leader has folded, time to startup again...
|
||||
remote.close()
|
||||
cfm.stop_following()
|
||||
currentleader = None
|
||||
eventlet.spawn_n(start_collective)
|
||||
|
||||
@ -319,7 +319,6 @@ def handle_connection(connection, cert, request, local=False):
|
||||
connection.sendall(cfgdata)
|
||||
#tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now,
|
||||
# so far unused anyway
|
||||
cfm.cfgleader = None
|
||||
cfm.relay_slaved_requests(drone, connection)
|
||||
# ok, we have a connecting member whose certificate checks out
|
||||
# He needs to bootstrap his configuration and subscribe it to updates
|
||||
@ -386,7 +385,7 @@ def start_collective():
|
||||
if member == myname:
|
||||
continue
|
||||
if cfm.cfgleader is None:
|
||||
cfm.cfgleader = True
|
||||
cfm.stop_following(True)
|
||||
ldrcandidate = cfm.get_collective_member(member)['address']
|
||||
if connect_to_leader(name=myname, leader=ldrcandidate):
|
||||
break
|
||||
|
@ -61,6 +61,7 @@ import cPickle
|
||||
import errno
|
||||
import eventlet
|
||||
import eventlet.event as event
|
||||
import eventlet.green.threading as gthread
|
||||
import fnmatch
|
||||
import json
|
||||
import operator
|
||||
@ -77,6 +78,7 @@ import traceback
|
||||
_masterkey = None
|
||||
_masterintegritykey = None
|
||||
_dirtylock = threading.RLock()
|
||||
_leaderlock = gthread.RLock()
|
||||
_config_areas = ('nodegroups', 'nodes', 'usergroups', 'users')
|
||||
tracelog = None
|
||||
statelessmode = False
|
||||
@ -460,6 +462,7 @@ def set_global(globalname, value, sync=True):
|
||||
cfgstreams = {}
|
||||
def relay_slaved_requests(name, listener):
|
||||
global cfgleader
|
||||
stop_following()
|
||||
cfgstreams[name] = listener
|
||||
msg = listener.recv(8)
|
||||
while msg:
|
||||
@ -484,9 +487,19 @@ def relay_slaved_requests(name, listener):
|
||||
except KeyError:
|
||||
pass # May have already been closed/deleted...
|
||||
if not cfgstreams and not cfgleader:
|
||||
cfgleader = True
|
||||
stop_following(True)
|
||||
|
||||
|
||||
def stop_following(replacement=None):
|
||||
with _leaderlock:
|
||||
global cfgleader
|
||||
if cfgleader and not isinstance(cfgleader, bool):
|
||||
try:
|
||||
cfgleader.close()
|
||||
except Exception:
|
||||
pass
|
||||
cfgleader = replacement
|
||||
|
||||
def stop_leading():
|
||||
for stream in list(cfgstreams):
|
||||
cfgstreams[stream].close()
|
||||
@ -537,10 +550,11 @@ def commit_clear():
|
||||
ConfigManager._bg_sync_to_file()
|
||||
|
||||
cfgleader = None
|
||||
|
||||
|
||||
def follow_channel(channel):
|
||||
global cfgleader
|
||||
global _txcount
|
||||
cfgleader = channel
|
||||
stop_following(channel)
|
||||
msg = channel.recv(8)
|
||||
while msg:
|
||||
sz = struct.unpack('!Q', msg)[0]
|
||||
@ -560,7 +574,7 @@ def follow_channel(channel):
|
||||
_pendingchangesets[rpc['xid']].send()
|
||||
msg = channel.recv(8)
|
||||
# mark the connection as broken
|
||||
cfgleader = True
|
||||
stop_following(True)
|
||||
|
||||
|
||||
def add_collective_member(name, address, fingerprint):
|
||||
|
Loading…
x
Reference in New Issue
Block a user