diff --git a/confluent_server/bin/collective b/confluent_server/bin/collective
index 5f0c78f6..c8d31e62 100644
--- a/confluent_server/bin/collective
+++ b/confluent_server/bin/collective
@@ -68,7 +68,7 @@ def join_collective(server, invitation):
     res = tlvdata.recv(s)
     res = res.get('collective',
                   {'status': 'Unknown response: ' + repr(res)})
-    print(res.get('status', res['error']))
+    print(res.get('status', res.get('error', repr(res))))
 
 
 def show_collective():
diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py
index 09961698..5e49a2cf 100644
--- a/confluent_server/confluent/collective/manager.py
+++ b/confluent_server/confluent/collective/manager.py
@@ -41,12 +41,15 @@ retrythread = None
 class ContextBool(object):
     def __init__(self):
         self.active = False
+        self.mylock = threading.RLock()
 
     def __enter__(self):
         self.active = True
+        self.mylock.__enter__()
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.active = False
+        self.mylock.__exit__(exc_type, exc_val, exc_tb)
 
 connecting = ContextBool()
 leader_init = ContextBool()
@@ -81,11 +84,9 @@ def connect_to_leader(cert=None, name=None, leader=None):
             if 'backoff' in keydata:
                 log.log({
                     'info': 'Collective initialization in progress on '
-                            '{0}, will retry connection'.format(leader),
+                            '{0}'.format(leader),
                     'subsystem': 'collective'})
-                eventlet.spawn_after(random.random(), connect_to_leader,
-                                     cert, name, leader)
-                return True
+                return False
             if 'leader' in keydata:
                 log.log(
                     {'info': 'Prospective leader {0} has redirected this '
@@ -101,13 +102,17 @@
                 log.log({'info': 'Prospective leader {0} has inferior '
                                  'transaction count, becoming leader'
-                                 ''.format(leader)})
+                                 ''.format(leader), 'subsystem': 'collective',
+                                 'subsystem': 'collective'})
                 return become_leader(remote)
-            print(keydata['error'])
             return False
 
     follower.kill()
     cfm.stop_following()
     follower = None
+    if follower:
+        follower.kill()
+        cfm.stop_following()
+        follower = None
     log.log({'info': 'Following leader {0}'.format(leader),
              'subsystem': 'collective'})
     colldata = tlvdata.recv(remote)
@@ -317,6 +322,8 @@ def handle_connection(connection, cert, request, local=False):
         f = open('/etc/confluent/cfg/myname', 'w')
         f.write(name)
         f.close()
+        log.log({'info': 'Connecting to collective due to join',
+                 'subsystem': 'collective'})
         eventlet.spawn_n(connect_to_leader, rsp['collective'][
             'fingerprint'], name)
     if 'enroll' == operation:
@@ -360,8 +367,22 @@ def handle_connection(connection, cert, request, local=False):
             tlvdata.send(connection,
                          {'error': 'Refusing to be assimilated by inferior'
                                    'transaction count',
-                          'txcount': cfm._txcount})
+                          'txcount': cfm._txcount,})
             return
+        if connecting.active:
+            # don't try to connect while actively already trying to connect
+            tlvdata.send(connection, {'status': 0})
+            connection.close()
+            return
+        if (currentleader == connection.getpeername()[0] and
+                follower and follower.isAlive()):
+            # if we are happily following this leader already, don't stir
+            # the pot
+            tlvdata.send(connection, {'status': 0})
+            connection.close()
+            return
+        log.log({'info': 'Connecting in response to assimilation',
+                 'subsystem': 'collective'})
         eventlet.spawn_n(connect_to_leader, None, None,
                          leader=connection.getpeername()[0])
         tlvdata.send(connection, {'status': 0})
@@ -390,6 +411,11 @@ def handle_connection(connection, cert, request, local=False):
             connection.close()
             return
         myself = connection.getsockname()[0]
+        if connecting.active:
+            tlvdata.send(connection, {'error': 'Connecting right now',
+                                      'backoff': True})
+            connection.close()
+            return
         if myself != get_leader(connection):
             tlvdata.send(
                 connection,
@@ -397,17 +423,14 @@
                           'in another castle', 'leader': currentleader})
             connection.close()
             return
-        if connecting.active:
-            tlvdata.send(connection, {'error': 'Connecting right now',
-                                      'backoff': True})
-            connection.close()
-            return
         if request['txcount'] > cfm._txcount:
             retire_as_leader()
             tlvdata.send(connection,
                          {'error': 'Client has higher tranasaction count, '
                                    'should assimilate me, connecting..',
                           'txcount': cfm._txcount})
+            log.log({'info': 'Connecting to leader due to superior '
+                             'transaction count', 'subsystem': 'collective'})
             eventlet.spawn_n(connect_to_leader, None, None,
                              connection.getpeername()[0])
             connection.close()
@@ -485,6 +508,12 @@ def try_assimilate(drone):
 
 def get_leader(connection):
     if currentleader is None or connection.getpeername()[0] == currentleader:
+        if currentleader is None:
+            msg = 'Becoming leader as no leader known'
+        else:
+            msg = 'Becoming leader because {0} attempted to connect and it ' \
+                  'is current leader'.format(currentleader)
+        log.log({'info': msg, 'subsystem': 'collective'})
         become_leader(connection)
     return currentleader
 
@@ -501,6 +530,7 @@ def become_leader(connection):
              'subsystem': 'collective'})
     if follower:
         follower.kill()
+        cfm.stop_following()
         follower = None
     if retrythread:
         retrythread.cancel()
@@ -508,9 +538,12 @@
     currentleader = connection.getsockname()[0]
     skipaddr = connection.getpeername()[0]
     myname = get_myname()
+    skipem = set(cfm.cfgstreams)
+    skipem.add(currentleader)
+    skipem.add(skipaddr)
     for member in cfm.list_collective():
         dronecandidate = cfm.get_collective_member(member)['address']
-        if dronecandidate in (currentleader, skipaddr) or member == myname:
+        if dronecandidate in skipem or member == myname:
             continue
         eventlet.spawn_n(try_assimilate, dronecandidate)
 
@@ -527,6 +560,7 @@ def start_collective():
     global retrythread
     if follower:
         follower.kill()
+        cfm.stop_following()
         follower = None
     try:
         if cfm.cfgstreams:
@@ -545,6 +579,8 @@
         if cfm.cfgleader is None:
             cfm.stop_following(True)
         ldrcandidate = cfm.get_collective_member(member)['address']
+        log.log({'info': 'Performing startup attempt to {0}'.format(
+            ldrcandidate), 'subsystem': 'collective'})
         if connect_to_leader(name=myname, leader=ldrcandidate):
             break
     else:
diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py
index bc114426..4fcc0b71 100644
--- a/confluent_server/confluent/config/configmanager.py
+++ b/confluent_server/confluent/config/configmanager.py
@@ -626,7 +626,7 @@ def rollback_clear():
     _cfgstore = _oldcfgstore
     _oldtxcount = 0
     _oldcfgstore = None
-    ConfigManager._bg_sync_to_file()
+    ConfigManager.wait_for_sync(True)
 
 
 def clear_configuration():
@@ -638,10 +638,7 @@
     stop_following()
     _oldcfgstore = _cfgstore
    _oldtxcount = _txcount
-    if _cfgstore is None or 'main' not in _cfgstore:
-        _cfgstore = {}
-    else:
-        _cfgstore['main'].clear()
+    _cfgstore = {}
     _txcount = 0
 
 def commit_clear():
@@ -649,12 +646,13 @@
     global _oldcfgstore
     _oldcfgstore = None
     _oldtxcount = 0
-    todelete = _config_areas + ('globals', 'collective', 'transactioncount')
-    for cfg in todelete:
-        try:
-            os.remove(os.path.join(ConfigManager._cfgdir, cfg))
-        except OSError as oe:
-            pass
+    with _synclock:
+        todelete = ('transactioncount', 'globals', 'collective') + _config_areas
+        for cfg in todelete:
+            try:
+                os.remove(os.path.join(ConfigManager._cfgdir, cfg))
+            except OSError as oe:
+                pass
     ConfigManager.wait_for_sync(True)
     ConfigManager._bg_sync_to_file()
 
@@ -955,6 +953,12 @@ class ConfigManager(object):
     _nodecollwatchers = {}
     _notifierids = {}
 
+    @property
+    def _cfgstore(self):
+        if self.tenant is None:
+            return _cfgstore['main']
+        return _cfgstore['tenant'][self.tenant]
+
     def __init__(self, tenant, decrypt=False, username=None):
         global _cfgstore
         with _initlock:
@@ -967,8 +971,7 @@
                 if 'main' not in _cfgstore:
                     _cfgstore['main'] = {}
                     self._bg_sync_to_file()
-                self._cfgstore = _cfgstore['main']
-                if 'nodegroups' not in self._cfgstore:
+                if 'nodegroups' not in self._cfgstore:  # This can happen during a clear... it seems... and if so it messes up...
                     self._cfgstore['nodegroups'] = {'everything': {'nodes': set()}}
                     _mark_dirtykey('nodegroups', 'everything', self.tenant)
                     self._bg_sync_to_file()
@@ -983,13 +986,13 @@
                 _cfgstore['tenant'][tenant] = {}
                 self._bg_sync_to_file()
             self.tenant = tenant
-            self._cfgstore = _cfgstore['tenant'][tenant]
             if 'nodegroups' not in self._cfgstore:
                 self._cfgstore['nodegroups'] = {'everything': {}}
                 _mark_dirtykey('nodegroups', 'everything', self.tenant)
             if 'nodes' not in self._cfgstore:
                 self._cfgstore['nodes'] = {}
                 self._bg_sync_to_file()
+        self.wait_for_sync()
 
     def get_collective_member(self, name):
         return get_collective_member(name)
@@ -2061,9 +2064,13 @@
         if statelessmode:
             return
         with cls._syncstate:
-            if cls._syncrunning:
+            if (cls._syncrunning and cls._cfgwriter is not None and
+                    cls._cfgwriter.isAlive()):
                 cls._writepending = True
                 return
+            if cls._syncrunning:  # This suggests an unclean write attempt,
+                # do a fullsync as a recovery
+                fullsync = True
             cls._syncrunning = True
             # if the thread is exiting, join it to let it close, just in case
             if cls._cfgwriter is not None:
@@ -2079,8 +2086,9 @@
             _mkpath(cls._cfgdir)
             with open(os.path.join(cls._cfgdir, 'transactioncount'), 'w') as f:
                 f.write(struct.pack('!Q', _txcount))
-            if fullsync or 'dirtyglobals' in _cfgstore:
-                if fullsync:
+            if (fullsync or 'dirtyglobals' in _cfgstore and
+                    'globals' in _cfgstore):
+                if fullsync:  # globals is not a given to be set..
                     dirtyglobals = _cfgstore['globals']
                 else:
                     with _dirtylock:
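Note on the ContextBool change in collective/manager.py above: after this patch the object is both a "busy" flag and a re-entrant lock, so callers can cheaply check connecting.active and back off, while "with connecting:" serializes the actual join/assimilation work. A minimal standalone sketch of that pattern follows; the do_connect function and its body are illustrative only and are not confluent code:

    import threading


    class ContextBool(object):
        """A context manager that is both a 'busy' flag and a re-entrant lock."""
        def __init__(self):
            self.active = False
            self.mylock = threading.RLock()

        def __enter__(self):
            self.active = True   # flag is visible even while still waiting on the lock
            self.mylock.__enter__()

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.active = False
            self.mylock.__exit__(exc_type, exc_val, exc_tb)


    connecting = ContextBool()


    def do_connect(leader):
        # Illustrative only: a caller can bail out early instead of queuing up,
        # mirroring the new connecting.active checks in handle_connection.
        if connecting.active:
            return False
        with connecting:  # holds the RLock for the duration of the attempt
            print('connecting to', leader)
            return True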