diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index dafffe0d..fd7401f4 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -71,6 +71,8 @@ def connect_to_leader(cert=None, name=None, leader=None): 'name': name, 'txcount': cfm._txcount}}) keydata = tlvdata.recv(remote) + if not keydata: + return False if 'error' in keydata: if 'backoff' in keydata: eventlet.spawn_after(random.random(), connect_to_leader, @@ -108,7 +110,7 @@ def connect_to_leader(cert=None, name=None, leader=None): cfm._cfgstore['collective'] = colldata for globvar in globaldata: cfm.set_global(globvar, globaldata[globvar]) - cfm._txcount = dbi['txcount'] + cfm._txcount = dbi.get('txcount', 0) cfm.ConfigManager(tenant=None)._load_from_json(dbjson) currentleader = leader #spawn this as a thread... @@ -296,9 +298,9 @@ def handle_connection(connection, cert, request, local=False): {'error': 'Client has higher tranasaction count, ' 'should assimilate me, connecting..', 'txcount': cfm._txcount}) - connection.close() eventlet.spawn_n(connect_to_leader, None, None, connection.getpeername()[0]) + connection.close() return tlvdata.send(connection, cfm._dump_keys(None, False)) tlvdata.send(connection, cfm._cfgstore['collective']) @@ -327,7 +329,7 @@ def try_assimilate(drone): tlvdata.recv(remote) # the banner tlvdata.recv(remote) # authpassed... 0.. answer = tlvdata.recv(remote) - if 'error' in answer: + if answer and 'error' in answer: connect_to_leader(None, None, leader=remote.getpeername()[0]) def get_leader(connection): diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index f07a5202..3d148ecb 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -478,7 +478,10 @@ def relay_slaved_requests(name, listener): _push_rpc(listener, cPickle.dumps({'xid': rpc['xid']})) msg = listener.recv(8) listener.close() - del cfgstreams[name] + try: + del cfgstreams[name] + except KeyError: + pass # May have already been closed/deleted... if not cfgstreams and not cfgleader: cfgleader = True @@ -486,7 +489,10 @@ def relay_slaved_requests(name, listener): def stop_leading(): for stream in list(cfgstreams): cfgstreams[stream].close() - del cfgstreams[stream] + try: + del cfgstreams[stream] + except KeyError: + pass # may have already been deleted.. def clear_configuration():