diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index dcb11717..5f326591 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -25,6 +25,7 @@ import eventlet import eventlet.green.socket as socket import eventlet.green.ssl as ssl import eventlet.green.threading as threading +import greenlet import random try: import OpenSSL.crypto as crypto @@ -51,7 +52,6 @@ class ContextBool(object): connecting = ContextBool() leader_init = ContextBool() - def connect_to_leader(cert=None, name=None, leader=None): global currentleader global cfginitlock @@ -60,12 +60,14 @@ def connect_to_leader(cert=None, name=None, leader=None): cfginitlock = threading.RLock() if leader is None: leader = currentleader - log.log({'info': 'Attempting connection to leader {0}'.format(leader)}) + log.log({'info': 'Attempting connection to leader {0}'.format(leader), + 'subsystem': 'collective'}) try: remote = connect_to_collective(cert, leader) except socket.error as e: log.log({'error': 'Collective connection attempt to {0} failed: {1}' - ''.format(leader, str(e))}) + ''.format(leader, str(e)), + 'subsystem': 'collective'}) return False with connecting: with cfginitlock: @@ -83,14 +85,16 @@ def connect_to_leader(cert=None, name=None, leader=None): if 'backoff' in keydata: log.log({ 'info': 'Collective initialization in progress on ' - '{0}, will retry connection'.format(leader)}) + '{0}, will retry connection'.format(leader), + 'subsystem': 'collective'}) eventlet.spawn_after(random.random(), connect_to_leader, cert, name, leader) return True if 'leader' in keydata: log.log( {'info': 'Prospective leader {0} has redirected this ' - 'member to {1}'.format(leader, keydata['leader'])}) + 'member to {1}'.format(leader, keydata['leader']), + 'subsystem': 'collective'}) ldrc = cfm.get_collective_member_by_address( keydata['leader']) if ldrc and ldrc['name'] == name: @@ -108,7 +112,8 @@ def connect_to_leader(cert=None, name=None, leader=None): follower.kill() cfm.stop_following() follower = None - log.log({'info': 'Following leader {0}'.format(leader)}) + log.log({'info': 'Following leader {0}'.format(leader), + 'subsystem': 'collective'}) colldata = tlvdata.recv(remote) globaldata = tlvdata.recv(remote) dbi = tlvdata.recv(remote) @@ -148,11 +153,18 @@ def connect_to_leader(cert=None, name=None, leader=None): def follow_leader(remote): global currentleader + cleanexit = False try: cfm.follow_channel(remote) + except greenlet.GreenletExit: + cleanexit = True finally: + if cleanexit: + log.log({'info': 'Previous following cleanly closed', + 'subsystem': 'collective'}) + return log.log({'info': 'Current leader has disappeared, restarting ' - 'collective membership'}) + 'collective membership', 'subsystem': 'collective'}) # The leader has folded, time to startup again... cfm.stop_following() currentleader = None @@ -457,19 +469,22 @@ def try_assimilate(drone): log.log( {'error': 'No answer from {0} while trying to assimilate'.format( - drone)}) + drone), + 'subsystem': 'collective'}) return if 'txcount' in answer: log.log({'info': 'Deferring to {0} due to transaction count'.format( - drone)}) + drone), 'subsystem': 'collective'}) connect_to_leader(None, None, leader=remote.getpeername()[0]) return if 'error' in answer: log.log({ 'error': 'Error encountered while attempting to ' - 'assimilate {0}: {1}'.format(drone, answer['error'])}) + 'assimilate {0}: {1}'.format(drone, answer['error']), + 'subsystem': 'collective'}) return - log.log({'Assimilated {0} into collective'.format(drone)}) + log.log({'info': 'Assimilated {0} into collective'.format(drone), + 'subsystem': 'collective'}) def get_leader(connection): @@ -485,7 +500,8 @@ def retire_as_leader(): def become_leader(connection): global currentleader global follower - log.log({'info': 'Becoming leader of collective'}) + log.log({'info': 'Becoming leader of collective', + 'subsystem': 'collective'}) if follower: follower.kill() follower = None