mirror of https://github.com/xcat2/confluent.git synced 2025-01-15 20:27:50 +00:00

Merge branch '18csi'

Jarrod Johnson 2018-10-12 13:33:09 -04:00
commit f830658818
3 changed files with 75 additions and 31 deletions

View File

@@ -68,7 +68,7 @@ def join_collective(server, invitation):
res = tlvdata.recv(s)
res = res.get('collective',
{'status': 'Unknown response: ' + repr(res)})
-print(res.get('status', res['error']))
+print(res.get('status', res.get('error', repr(res))))
def show_collective():
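A side note on the client change above: the chained .get() means an unexpected reply is printed via its repr() instead of raising KeyError when neither 'status' nor 'error' is present. A minimal sketch of the fallback order (the sample dicts are illustrative, not actual protocol replies):

    def describe(res):
        # Fallback order: 'status', then 'error', then the raw repr of the reply.
        return res.get('status', res.get('error', repr(res)))

    print(describe({'status': 'joined'}))        # -> joined
    print(describe({'error': 'not invited'}))    # -> not invited
    print(describe({'unexpected': 1}))           # -> {'unexpected': 1}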

View File

@@ -41,12 +41,15 @@ retrythread = None
class ContextBool(object):
def __init__(self):
self.active = False
+self.mylock = threading.RLock()
def __enter__(self):
self.active = True
+self.mylock.__enter__()
def __exit__(self, exc_type, exc_val, exc_tb):
self.active = False
+self.mylock.__exit__(exc_type, exc_val, exc_tb)
connecting = ContextBool()
leader_init = ContextBool()
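For context, ContextBool instances such as connecting and leader_init are used as with-statement guards around connection attempts; with the added RLock, entering the guard now also serializes the guarded section. A rough, self-contained sketch of that usage (attempt_connection is a hypothetical caller, not part of the module):

    import threading

    class ContextBool(object):
        def __init__(self):
            self.active = False
            self.mylock = threading.RLock()
        def __enter__(self):
            self.active = True
            self.mylock.__enter__()
        def __exit__(self, exc_type, exc_val, exc_tb):
            self.active = False
            self.mylock.__exit__(exc_type, exc_val, exc_tb)

    connecting = ContextBool()

    def attempt_connection():
        if connecting.active:
            return False       # someone else is already mid-connection; back off
        with connecting:
            # .active is True and the reentrant lock is held while we work
            pass               # ... perform the actual connection attempt ...
        return True

    print(attempt_connection())   # -> True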
@@ -81,11 +84,9 @@ def connect_to_leader(cert=None, name=None, leader=None):
if 'backoff' in keydata:
log.log({
'info': 'Collective initialization in progress on '
-'{0}, will retry connection'.format(leader),
+'{0}'.format(leader),
'subsystem': 'collective'})
-eventlet.spawn_after(random.random(), connect_to_leader,
-cert, name, leader)
-return True
+return False
if 'leader' in keydata:
log.log(
{'info': 'Prospective leader {0} has redirected this '
@@ -101,13 +102,17 @@ def connect_to_leader(cert=None, name=None, leader=None):
log.log({'info':
'Prospective leader {0} has inferior '
'transaction count, becoming leader'
-''.format(leader)})
+''.format(leader),
+'subsystem': 'collective'})
return become_leader(remote)
print(keydata['error'])
return False
-follower.kill()
-cfm.stop_following()
-follower = None
+if follower:
+follower.kill()
+cfm.stop_following()
+follower = None
log.log({'info': 'Following leader {0}'.format(leader),
'subsystem': 'collective'})
colldata = tlvdata.recv(remote)
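The decision being made in this function, together with the handler changes below, boils down to comparing configuration transaction counters: the node that has seen more transactions becomes leader, and the other side follows. A simplified sketch of that comparison (names are illustrative; the real code calls become_leader() or starts following the prospective leader):

    def choose_role(local_txcount, remote_txcount):
        # The side with more committed configuration transactions leads;
        # the other side follows and syncs from the leader.
        if remote_txcount > local_txcount:
            return 'follow'
        return 'lead'

    print(choose_role(10, 42))   # -> follow
    print(choose_role(42, 10))   # -> lead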
@@ -317,6 +322,8 @@ def handle_connection(connection, cert, request, local=False):
f = open('/etc/confluent/cfg/myname', 'w')
f.write(name)
f.close()
+log.log({'info': 'Connecting to collective due to join',
+'subsystem': 'collective'})
eventlet.spawn_n(connect_to_leader, rsp['collective'][
'fingerprint'], name)
if 'enroll' == operation:
@@ -360,8 +367,22 @@ def handle_connection(connection, cert, request, local=False):
tlvdata.send(connection,
{'error': 'Refusing to be assimilated by inferior'
'transaction count',
-'txcount': cfm._txcount})
+'txcount': cfm._txcount,})
return
+if connecting.active:
+# don't try to connect while actively already trying to connect
+tlvdata.send(connection, {'status': 0})
+connection.close()
+return
+if (currentleader == connection.getpeername()[0] and
+follower and follower.isAlive()):
+# if we are happily following this leader already, don't stir
+# the pot
+tlvdata.send(connection, {'status': 0})
+connection.close()
+return
+log.log({'info': 'Connecting in response to assimilation',
+'subsystem': 'collective'})
eventlet.spawn_n(connect_to_leader, None, None,
leader=connection.getpeername()[0])
tlvdata.send(connection, {'status': 0})
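Summarizing the new guards above: when asked to assimilate, the node acknowledges with a bare status but does not reconnect if it is already mid-connection, or if it is already a live follower of the requesting peer; only otherwise does it spawn connect_to_leader. A condensed sketch of that decision (argument names are illustrative):

    def should_reconnect(connecting_active, peer_addr, current_leader, follower_alive):
        if connecting_active:
            return False   # already busy connecting somewhere; just acknowledge
        if peer_addr == current_leader and follower_alive:
            return False   # already happily following this leader; don't stir the pot
        return True

    print(should_reconnect(False, '10.0.0.1', '10.0.0.1', True))   # -> False
    print(should_reconnect(False, '10.0.0.2', '10.0.0.1', True))   # -> True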
@@ -390,6 +411,11 @@ def handle_connection(connection, cert, request, local=False):
connection.close()
return
myself = connection.getsockname()[0]
+if connecting.active:
+tlvdata.send(connection, {'error': 'Connecting right now',
+'backoff': True})
+connection.close()
+return
if myself != get_leader(connection):
tlvdata.send(
connection,
@@ -397,17 +423,14 @@ def handle_connection(connection, cert, request, local=False):
'in another castle', 'leader': currentleader})
connection.close()
return
-if connecting.active:
-tlvdata.send(connection, {'error': 'Connecting right now',
-'backoff': True})
-connection.close()
-return
if request['txcount'] > cfm._txcount:
retire_as_leader()
tlvdata.send(connection,
{'error': 'Client has higher transaction count, '
'should assimilate me, connecting..',
'txcount': cfm._txcount})
+log.log({'info': 'Connecting to leader due to superior '
+'transaction count', 'subsystem': 'collective'})
eventlet.spawn_n(connect_to_leader, None, None,
connection.getpeername()[0])
connection.close()
@@ -485,6 +508,12 @@ def try_assimilate(drone):
def get_leader(connection):
if currentleader is None or connection.getpeername()[0] == currentleader:
+if currentleader is None:
+msg = 'Becoming leader as no leader known'
+else:
+msg = 'Becoming leader because {0} attempted to connect and it ' \
+'is current leader'.format(currentleader)
+log.log({'info': msg, 'subsystem': 'collective'})
become_leader(connection)
return currentleader
@@ -501,6 +530,7 @@ def become_leader(connection):
'subsystem': 'collective'})
if follower:
follower.kill()
+cfm.stop_following()
follower = None
if retrythread:
retrythread.cancel()
@@ -508,9 +538,12 @@ def become_leader(connection):
currentleader = connection.getsockname()[0]
skipaddr = connection.getpeername()[0]
myname = get_myname()
+skipem = set(cfm.cfgstreams)
+skipem.add(currentleader)
+skipem.add(skipaddr)
for member in cfm.list_collective():
dronecandidate = cfm.get_collective_member(member)['address']
-if dronecandidate in (currentleader, skipaddr) or member == myname:
+if dronecandidate in skipem or member == myname:
continue
eventlet.spawn_n(try_assimilate, dronecandidate)
@@ -527,6 +560,7 @@ def start_collective():
global retrythread
if follower:
follower.kill()
+cfm.stop_following()
follower = None
try:
if cfm.cfgstreams:
@@ -545,6 +579,8 @@ def start_collective():
if cfm.cfgleader is None:
cfm.stop_following(True)
ldrcandidate = cfm.get_collective_member(member)['address']
+log.log({'info': 'Performing startup attempt to {0}'.format(
+ldrcandidate), 'subsystem': 'collective'})
if connect_to_leader(name=myname, leader=ldrcandidate):
break
else:

View File

@@ -626,7 +626,7 @@ def rollback_clear():
_cfgstore = _oldcfgstore
_oldtxcount = 0
_oldcfgstore = None
-ConfigManager._bg_sync_to_file()
+ConfigManager.wait_for_sync(True)
def clear_configuration():
@@ -638,10 +638,7 @@ def clear_configuration():
stop_following()
_oldcfgstore = _cfgstore
_oldtxcount = _txcount
-if _cfgstore is None or 'main' not in _cfgstore:
-_cfgstore = {}
-else:
-_cfgstore['main'].clear()
+_cfgstore = {}
_txcount = 0
def commit_clear():
@@ -649,12 +646,13 @@ def commit_clear():
global _oldcfgstore
_oldcfgstore = None
_oldtxcount = 0
-todelete = _config_areas + ('globals', 'collective', 'transactioncount')
-for cfg in todelete:
-try:
-os.remove(os.path.join(ConfigManager._cfgdir, cfg))
-except OSError as oe:
-pass
+with _synclock:
+todelete = ('transactioncount', 'globals', 'collective') + _config_areas
+for cfg in todelete:
+try:
+os.remove(os.path.join(ConfigManager._cfgdir, cfg))
+except OSError as oe:
+pass
+ConfigManager.wait_for_sync(True)
ConfigManager._bg_sync_to_file()
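The commit_clear change above wraps the on-disk removals in _synclock so the clear cannot race the background sync writer, and waits for a sync before scheduling a fresh one. A hedged sketch of that locking pattern (the directory and area names here are placeholders, not the actual _config_areas tuple):

    import os
    import threading

    _synclock = threading.RLock()

    def clear_persisted(cfgdir, areas=('transactioncount', 'globals', 'collective')):
        # Hold the sync lock so a concurrent background write cannot
        # recreate files while they are being removed.
        with _synclock:
            for name in areas:
                try:
                    os.remove(os.path.join(cfgdir, name))
                except OSError:
                    pass   # already absent is fine

    clear_persisted('/tmp/example-confluent-cfg')   # hypothetical directory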
@@ -955,6 +953,12 @@ class ConfigManager(object):
_nodecollwatchers = {}
_notifierids = {}
+@property
+def _cfgstore(self):
+if self.tenant is None:
+return _cfgstore['main']
+return _cfgstore['tenant'][self.tenant]
def __init__(self, tenant, decrypt=False, username=None):
global _cfgstore
with _initlock:
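The new _cfgstore property above replaces per-instance assignments such as self._cfgstore = _cfgstore['main'] (removed in the next hunk). Every access now resolves through the module-level store, so rebinding the global in clear_configuration() (_cfgstore = {}) cannot leave an existing ConfigManager holding a stale dict. A toy illustration of the difference, not the real classes:

    _store = {'main': {'nodes': {}}}

    class SnapshotStore(object):
        def __init__(self):
            self.data = _store['main']      # bound once; goes stale if _store is rebound

    class LiveStore(object):
        @property
        def data(self):
            return _store['main']           # re-resolved on every access

    snap, live = SnapshotStore(), LiveStore()
    _store = {'main': {'nodes': {'n1': {}}}}   # simulate a clear plus repopulate rebinding the global
    print('n1' in snap.data['nodes'])   # -> False (stale snapshot)
    print('n1' in live.data['nodes'])   # -> True  (follows the module-level store)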
@@ -967,8 +971,7 @@ class ConfigManager(object):
if 'main' not in _cfgstore:
_cfgstore['main'] = {}
self._bg_sync_to_file()
-self._cfgstore = _cfgstore['main']
-if 'nodegroups' not in self._cfgstore:
+if 'nodegroups' not in self._cfgstore: # This can happen during a clear... it seems... and if so it messes up...
self._cfgstore['nodegroups'] = {'everything': {'nodes': set()}}
_mark_dirtykey('nodegroups', 'everything', self.tenant)
self._bg_sync_to_file()
@@ -983,13 +986,13 @@ class ConfigManager(object):
_cfgstore['tenant'][tenant] = {}
self._bg_sync_to_file()
self.tenant = tenant
-self._cfgstore = _cfgstore['tenant'][tenant]
if 'nodegroups' not in self._cfgstore:
self._cfgstore['nodegroups'] = {'everything': {}}
_mark_dirtykey('nodegroups', 'everything', self.tenant)
if 'nodes' not in self._cfgstore:
self._cfgstore['nodes'] = {}
self._bg_sync_to_file()
+self.wait_for_sync()
def get_collective_member(self, name):
return get_collective_member(name)
@@ -2061,9 +2064,13 @@ class ConfigManager(object):
if statelessmode:
return
with cls._syncstate:
-if cls._syncrunning:
+if (cls._syncrunning and cls._cfgwriter is not None and
+cls._cfgwriter.isAlive()):
cls._writepending = True
return
+if cls._syncrunning: # This suggests an unclean write attempt,
+# do a fullsync as a recovery
+fullsync = True
cls._syncrunning = True
# if the thread is exiting, join it to let it close, just in case
if cls._cfgwriter is not None:
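The reworked condition above only defers a write when the writer thread both exists and is alive; a _syncrunning flag left behind by a writer that died now forces a recovery fullsync instead of silently dropping the write. A rough sketch of that scheduling decision (plain functions and arguments standing in for the classmethod and its state):

    def plan_sync(syncrunning, writer_alive, fullsync):
        # Writer genuinely busy: queue the request for it to pick up later.
        if syncrunning and writer_alive:
            return 'mark_writepending'
        # Flag set but no live writer: a previous attempt died uncleanly,
        # so recover by forcing a full sync.
        if syncrunning:
            fullsync = True
        return 'fullsync' if fullsync else 'incremental'

    print(plan_sync(True, True, False))    # -> mark_writepending
    print(plan_sync(True, False, False))   # -> fullsync (recovery)
    print(plan_sync(False, False, False))  # -> incremental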
@@ -2079,8 +2086,9 @@ class ConfigManager(object):
_mkpath(cls._cfgdir)
with open(os.path.join(cls._cfgdir, 'transactioncount'), 'w') as f:
f.write(struct.pack('!Q', _txcount))
-if fullsync or 'dirtyglobals' in _cfgstore:
-if fullsync:
+if (fullsync or 'dirtyglobals' in _cfgstore and
+'globals' in _cfgstore):
+if fullsync: # globals is not a given to be set..
dirtyglobals = _cfgstore['globals']
else:
with _dirtylock: