2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-25 19:10:10 +00:00

Add more logging to collective process

This commit is contained in:
Jarrod Johnson 2018-09-27 10:49:57 -04:00
parent 0724ad812b
commit 10ce7a9de9

View File

@ -62,7 +62,9 @@ def connect_to_leader(cert=None, name=None, leader=None):
leader = currentleader
try:
remote = connect_to_collective(cert, leader)
except socket.error:
except socket.error as e:
log.log({'error': 'Collective connection attempt to {0} failed: {1}'
''.format(leader, str(e))})
return False
with connecting:
with cfginitlock:
@ -78,10 +80,16 @@ def connect_to_leader(cert=None, name=None, leader=None):
return False
if 'error' in keydata:
if 'backoff' in keydata:
log.log({
'info': 'Collective initialization in progress on '
'{0}, will retry connection'.format(leader)})
eventlet.spawn_after(random.random(), connect_to_leader,
cert, name, leader)
return True
if 'leader' in keydata:
log.log(
{'info': 'Prospective leader {0} has redirected this '
'member to {1}'.format(leader, keydata['leader'])})
ldrc = cfm.get_collective_member_by_address(
keydata['leader'])
if ldrc and ldrc['name'] == name:
@ -89,6 +97,10 @@ def connect_to_leader(cert=None, name=None, leader=None):
return connect_to_leader(name=name,
leader=keydata['leader'])
if 'txcount' in keydata:
log.log({'info':
'Prospective leader {0} has inferior '
'transaction count, becoming leader'
''.format(leader)})
return become_leader(remote)
print(keydata['error'])
return False
@ -96,6 +108,7 @@ def connect_to_leader(cert=None, name=None, leader=None):
follower.kill()
cfm.stop_following()
follower = None
log.log({'info': 'Following leader {0}'.format(leader)})
colldata = tlvdata.recv(remote)
globaldata = tlvdata.recv(remote)
dbi = tlvdata.recv(remote)
@ -138,6 +151,8 @@ def follow_leader(remote):
try:
cfm.follow_channel(remote)
finally:
log.log({'info': 'Current leader has disappeared, restarting '
'collective membership'})
# The leader has folded, time to startup again...
cfm.stop_following()
currentleader = None