diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index bfce9181..735d9b1b 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -134,11 +134,13 @@ def connect_to_leader(cert=None, name=None, leader=None): def follow_leader(remote): global currentleader - cfm.follow_channel(remote) - # The leader has folded, time to startup again... - cfm.stop_following() - currentleader = None - eventlet.spawn_n(start_collective) + try: + cfm.follow_channel(remote) + finally: + # The leader has folded, time to startup again... + cfm.stop_following() + currentleader = None + eventlet.spawn_n(start_collective) def connect_to_collective(cert, member): diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 34972394..756021c6 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -658,41 +658,43 @@ cfgleader = None def follow_channel(channel): global _txcount global _hasquorum - stop_leading() - stop_following(channel) - lh = StreamHandler(channel) - msg = lh.get_next_msg() - while msg: - sz = struct.unpack('!Q', msg)[0] - if sz != 0: - rpc = '' - while len(rpc) < sz: - nrpc = channel.recv(sz - len(rpc)) - if not nrpc: - raise Exception('Truncated message error') - rpc += nrpc - rpc = cPickle.loads(rpc) - if 'txcount' in rpc: - _txcount = rpc['txcount'] - if 'function' in rpc: - try: - globals()[rpc['function']](*rpc['args']) - except Exception as e: - print(repr(e)) - if 'xid' in rpc and rpc['xid']: - if rpc.get('exc', None): - _pendingchangesets[rpc['xid']].send_exception(rpc['exc']) - else: - _pendingchangesets[rpc['xid']].send() - if 'quorum' in rpc: - _hasquorum = rpc['quorum'] - _push_rpc(channel, b'') # use null as ACK + try: + stop_leading() + stop_following(channel) + lh = StreamHandler(channel) msg = lh.get_next_msg() - # mark the connection as broken - if cfgstreams: - stop_following(None) - else: - stop_following(True) + while msg: + sz = struct.unpack('!Q', msg)[0] + if sz != 0: + rpc = '' + while len(rpc) < sz: + nrpc = channel.recv(sz - len(rpc)) + if not nrpc: + raise Exception('Truncated message error') + rpc += nrpc + rpc = cPickle.loads(rpc) + if 'txcount' in rpc: + _txcount = rpc['txcount'] + if 'function' in rpc: + try: + globals()[rpc['function']](*rpc['args']) + except Exception as e: + print(repr(e)) + if 'xid' in rpc and rpc['xid']: + if rpc.get('exc', None): + _pendingchangesets[rpc['xid']].send_exception(rpc['exc']) + else: + _pendingchangesets[rpc['xid']].send() + if 'quorum' in rpc: + _hasquorum = rpc['quorum'] + _push_rpc(channel, b'') # use null as ACK + msg = lh.get_next_msg() + finally: + # mark the connection as broken + if cfgstreams: + stop_following(None) + else: + stop_following(True) def add_collective_member(name, address, fingerprint):