diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 5ac875b7..7612e4e2 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -100,9 +100,12 @@ def connect_to_leader(cert=None, name=None, leader=None): while (len(dbjson) < dbsize): ndata = remote.recv(dbsize - len(dbjson)) if not ndata: + try: + remote.close() + except Exception: + pass raise Exception("Error doing initial DB transfer") dbjson += ndata - cfm.stop_following() cfm.clear_configuration() try: cfm._restore_keys(keydata, None, sync=False) @@ -117,6 +120,7 @@ def connect_to_leader(cert=None, name=None, leader=None): sync=False) cfm.commit_clear() except Exception: + cfm.stop_following() cfm.rollback_clear() raise currentleader = leader diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index bf71a55d..3d069f1e 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -463,34 +463,42 @@ def set_global(globalname, value, sync=True): cfgstreams = {} def relay_slaved_requests(name, listener): global cfgleader - stop_following() - if name in cfgstreams: - cfgstreams[name].close() - cfgstreams[name] = listener - msg = listener.recv(8) - while msg: - if name not in cfgstreams: - raise Exception("Unexpected loss of node in followers: " + name) - sz = struct.unpack('!Q', msg)[0] - if sz != 0: - rpc = '' - while len(rpc) < sz: - nrpc = listener.recv(sz - len(rpc)) - if not nrpc: - raise Exception('Truncated client error') - rpc += nrpc - rpc = cPickle.loads(rpc) - globals()[rpc['function']](*rpc['args']) - if 'xid' in rpc: - _push_rpc(listener, cPickle.dumps({'xid': rpc['xid']})) - msg = listener.recv(8) - listener.close() try: - del cfgstreams[name] - except KeyError: - pass # May have already been closed/deleted... - if not cfgstreams and not cfgleader: - stop_following(True) + stop_following() + if name in cfgstreams: + try: + cfgstreams[name].close() + except Exception: + pass + cfgstreams[name] = listener + msg = listener.recv(8) + while msg: + if name not in cfgstreams: + raise Exception("Unexpected loss of node in followers: " + name) + sz = struct.unpack('!Q', msg)[0] + if sz != 0: + rpc = '' + while len(rpc) < sz: + nrpc = listener.recv(sz - len(rpc)) + if not nrpc: + raise Exception('Truncated client error') + rpc += nrpc + rpc = cPickle.loads(rpc) + globals()[rpc['function']](*rpc['args']) + if 'xid' in rpc: + _push_rpc(listener, cPickle.dumps({'xid': rpc['xid']})) + msg = listener.recv(8) + finally: + try: + listener.close() + except Exception: + pass + try: + del cfgstreams[name] + except KeyError: + pass # May have already been closed/deleted... + if not cfgstreams and not cfgleader: # last one out, set cfgleader to boolean to mark dead collective + stop_following(True) def stop_following(replacement=None): @@ -505,7 +513,10 @@ def stop_following(replacement=None): def stop_leading(): for stream in list(cfgstreams): - cfgstreams[stream].close() + try: + cfgstreams[stream].close() + except Exception: + pass try: del cfgstreams[stream] except KeyError: @@ -533,6 +544,8 @@ def clear_configuration(): global _txcount global _oldcfgstore global _oldtxcount + stop_leading() + stop_following() _oldcfgstore = _cfgstore _oldtxcount = _txcount _cfgstore = {} @@ -557,6 +570,7 @@ cfgleader = None def follow_channel(channel): global _txcount + stop_leading() stop_following(channel) msg = channel.recv(8) while msg: