From d86e1fc4eb0d6e67f2c66b3d0d4b42497c9bd28c Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 2 Oct 2018 10:17:44 -0400 Subject: [PATCH] Give the cfg init a lock Move collective manager and configmanager to share a configinitlock, so that bad timings during internal initialization and collective activity cannot interfere and produce corrupt database. This became an issue with the fix for 'everything' disappearing. --- .../confluent/collective/manager.py | 9 +--- .../confluent/config/configmanager.py | 54 ++++++++++--------- 2 files changed, 29 insertions(+), 34 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 33b87323..09961698 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -35,7 +35,6 @@ except ImportError: crypto = None currentleader = None -cfginitlock = None follower = None retrythread = None @@ -54,10 +53,7 @@ leader_init = ContextBool() def connect_to_leader(cert=None, name=None, leader=None): global currentleader - global cfginitlock global follower - if cfginitlock is None: - cfginitlock = threading.RLock() if leader is None: leader = currentleader log.log({'info': 'Attempting connection to leader {0}'.format(leader), @@ -70,7 +66,7 @@ def connect_to_leader(cert=None, name=None, leader=None): 'subsystem': 'collective'}) return False with connecting: - with cfginitlock: + with cfm._initlock: tlvdata.recv(remote) # the banner tlvdata.recv(remote) # authpassed... 0.. if name is None: @@ -520,13 +516,10 @@ def become_leader(connection): def startup(): - global cfginitlock members = list(cfm.list_collective()) if len(members) < 2: # Not in collective mode, return return - if cfginitlock is None: - cfginitlock = threading.RLock() eventlet.spawn_n(start_collective) def start_collective(): diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index e9258eb4..0e105c29 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -82,6 +82,7 @@ _dirtylock = threading.RLock() _leaderlock = gthread.RLock() _synclock = threading.RLock() _rpclock = gthread.RLock() +_initlock = gthread.RLock() _followerlocks = {} _config_areas = ('nodegroups', 'nodes', 'usergroups', 'users') tracelog = None @@ -953,38 +954,39 @@ class ConfigManager(object): def __init__(self, tenant, decrypt=False, username=None): global _cfgstore - if _cfgstore is None: - init() - self.decrypt = decrypt - self.current_user = username - if tenant is None: - self.tenant = None - if 'main' not in _cfgstore: - _cfgstore['main'] = {} + with _initlock: + if _cfgstore is None: + init() + self.decrypt = decrypt + self.current_user = username + if tenant is None: + self.tenant = None + if 'main' not in _cfgstore: + _cfgstore['main'] = {} + self._bg_sync_to_file() + self._cfgstore = _cfgstore['main'] + if 'nodegroups' not in self._cfgstore: + self._cfgstore['nodegroups'] = {'everything': {'nodes': set()}} + _mark_dirtykey('nodegroups', 'everything', self.tenant) + self._bg_sync_to_file() + if 'nodes' not in self._cfgstore: + self._cfgstore['nodes'] = {} + self._bg_sync_to_file() + return + elif 'tenant' not in _cfgstore: + _cfgstore['tenant'] = {tenant: {}} self._bg_sync_to_file() - self._cfgstore = _cfgstore['main'] + elif tenant not in _cfgstore['tenant']: + _cfgstore['tenant'][tenant] = {} + self._bg_sync_to_file() + self.tenant = tenant + self._cfgstore = _cfgstore['tenant'][tenant] if 'nodegroups' not in self._cfgstore: - self._cfgstore['nodegroups'] = {'everything': {'nodes': set()}} + self._cfgstore['nodegroups'] = {'everything': {}} _mark_dirtykey('nodegroups', 'everything', self.tenant) - self._bg_sync_to_file() if 'nodes' not in self._cfgstore: self._cfgstore['nodes'] = {} - self._bg_sync_to_file() - return - elif 'tenant' not in _cfgstore: - _cfgstore['tenant'] = {tenant: {}} self._bg_sync_to_file() - elif tenant not in _cfgstore['tenant']: - _cfgstore['tenant'][tenant] = {} - self._bg_sync_to_file() - self.tenant = tenant - self._cfgstore = _cfgstore['tenant'][tenant] - if 'nodegroups' not in self._cfgstore: - self._cfgstore['nodegroups'] = {'everything': {}} - _mark_dirtykey('nodegroups', 'everything', self.tenant) - if 'nodes' not in self._cfgstore: - self._cfgstore['nodes'] = {} - self._bg_sync_to_file() def get_collective_member(self, name): return get_collective_member(name)