2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-27 19:37:57 +00:00

Give the cfg init a lock

Move collective manager and configmanager to share a configinitlock,
so that bad timings during internal initialization and collective
activity cannot interfere and produce corrupt database.

This became an issue with the fix for 'everything' disappearing.
This commit is contained in:
Jarrod Johnson 2018-10-02 10:17:44 -04:00
parent 78a1741e0e
commit d86e1fc4eb
2 changed files with 29 additions and 34 deletions

View File

@ -35,7 +35,6 @@ except ImportError:
crypto = None
currentleader = None
cfginitlock = None
follower = None
retrythread = None
@ -54,10 +53,7 @@ leader_init = ContextBool()
def connect_to_leader(cert=None, name=None, leader=None):
global currentleader
global cfginitlock
global follower
if cfginitlock is None:
cfginitlock = threading.RLock()
if leader is None:
leader = currentleader
log.log({'info': 'Attempting connection to leader {0}'.format(leader),
@ -70,7 +66,7 @@ def connect_to_leader(cert=None, name=None, leader=None):
'subsystem': 'collective'})
return False
with connecting:
with cfginitlock:
with cfm._initlock:
tlvdata.recv(remote) # the banner
tlvdata.recv(remote) # authpassed... 0..
if name is None:
@ -520,13 +516,10 @@ def become_leader(connection):
def startup():
global cfginitlock
members = list(cfm.list_collective())
if len(members) < 2:
# Not in collective mode, return
return
if cfginitlock is None:
cfginitlock = threading.RLock()
eventlet.spawn_n(start_collective)
def start_collective():

View File

@ -82,6 +82,7 @@ _dirtylock = threading.RLock()
_leaderlock = gthread.RLock()
_synclock = threading.RLock()
_rpclock = gthread.RLock()
_initlock = gthread.RLock()
_followerlocks = {}
_config_areas = ('nodegroups', 'nodes', 'usergroups', 'users')
tracelog = None
@ -953,38 +954,39 @@ class ConfigManager(object):
def __init__(self, tenant, decrypt=False, username=None):
global _cfgstore
if _cfgstore is None:
init()
self.decrypt = decrypt
self.current_user = username
if tenant is None:
self.tenant = None
if 'main' not in _cfgstore:
_cfgstore['main'] = {}
with _initlock:
if _cfgstore is None:
init()
self.decrypt = decrypt
self.current_user = username
if tenant is None:
self.tenant = None
if 'main' not in _cfgstore:
_cfgstore['main'] = {}
self._bg_sync_to_file()
self._cfgstore = _cfgstore['main']
if 'nodegroups' not in self._cfgstore:
self._cfgstore['nodegroups'] = {'everything': {'nodes': set()}}
_mark_dirtykey('nodegroups', 'everything', self.tenant)
self._bg_sync_to_file()
if 'nodes' not in self._cfgstore:
self._cfgstore['nodes'] = {}
self._bg_sync_to_file()
return
elif 'tenant' not in _cfgstore:
_cfgstore['tenant'] = {tenant: {}}
self._bg_sync_to_file()
self._cfgstore = _cfgstore['main']
elif tenant not in _cfgstore['tenant']:
_cfgstore['tenant'][tenant] = {}
self._bg_sync_to_file()
self.tenant = tenant
self._cfgstore = _cfgstore['tenant'][tenant]
if 'nodegroups' not in self._cfgstore:
self._cfgstore['nodegroups'] = {'everything': {'nodes': set()}}
self._cfgstore['nodegroups'] = {'everything': {}}
_mark_dirtykey('nodegroups', 'everything', self.tenant)
self._bg_sync_to_file()
if 'nodes' not in self._cfgstore:
self._cfgstore['nodes'] = {}
self._bg_sync_to_file()
return
elif 'tenant' not in _cfgstore:
_cfgstore['tenant'] = {tenant: {}}
self._bg_sync_to_file()
elif tenant not in _cfgstore['tenant']:
_cfgstore['tenant'][tenant] = {}
self._bg_sync_to_file()
self.tenant = tenant
self._cfgstore = _cfgstore['tenant'][tenant]
if 'nodegroups' not in self._cfgstore:
self._cfgstore['nodegroups'] = {'everything': {}}
_mark_dirtykey('nodegroups', 'everything', self.tenant)
if 'nodes' not in self._cfgstore:
self._cfgstore['nodes'] = {}
self._bg_sync_to_file()
def get_collective_member(self, name):
return get_collective_member(name)