From 11968faffcccc553b516760058ad6080288365a5 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 10 Jul 2018 14:55:57 -0400 Subject: [PATCH] Numerous fixes to collective If client has higher transaction count, do not close the connection before extracting peer address. If our connect session is rudely terminated, abort rather than trying to continue. On assimilate failure, ignore a failed assimilate with no data. Fix problem where a follower getting double deleted was causing an error. --- confluent_server/confluent/collective/manager.py | 8 +++++--- confluent_server/confluent/config/configmanager.py | 10 ++++++++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index dafffe0d..fd7401f4 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -71,6 +71,8 @@ def connect_to_leader(cert=None, name=None, leader=None): 'name': name, 'txcount': cfm._txcount}}) keydata = tlvdata.recv(remote) + if not keydata: + return False if 'error' in keydata: if 'backoff' in keydata: eventlet.spawn_after(random.random(), connect_to_leader, @@ -108,7 +110,7 @@ def connect_to_leader(cert=None, name=None, leader=None): cfm._cfgstore['collective'] = colldata for globvar in globaldata: cfm.set_global(globvar, globaldata[globvar]) - cfm._txcount = dbi['txcount'] + cfm._txcount = dbi.get('txcount', 0) cfm.ConfigManager(tenant=None)._load_from_json(dbjson) currentleader = leader #spawn this as a thread... @@ -296,9 +298,9 @@ def handle_connection(connection, cert, request, local=False): {'error': 'Client has higher tranasaction count, ' 'should assimilate me, connecting..', 'txcount': cfm._txcount}) - connection.close() eventlet.spawn_n(connect_to_leader, None, None, connection.getpeername()[0]) + connection.close() return tlvdata.send(connection, cfm._dump_keys(None, False)) tlvdata.send(connection, cfm._cfgstore['collective']) @@ -327,7 +329,7 @@ def try_assimilate(drone): tlvdata.recv(remote) # the banner tlvdata.recv(remote) # authpassed... 0.. answer = tlvdata.recv(remote) - if 'error' in answer: + if answer and 'error' in answer: connect_to_leader(None, None, leader=remote.getpeername()[0]) def get_leader(connection): diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index f07a5202..3d148ecb 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -478,7 +478,10 @@ def relay_slaved_requests(name, listener): _push_rpc(listener, cPickle.dumps({'xid': rpc['xid']})) msg = listener.recv(8) listener.close() - del cfgstreams[name] + try: + del cfgstreams[name] + except KeyError: + pass # May have already been closed/deleted... if not cfgstreams and not cfgleader: cfgleader = True @@ -486,7 +489,10 @@ def relay_slaved_requests(name, listener): def stop_leading(): for stream in list(cfgstreams): cfgstreams[stream].close() - del cfgstreams[stream] + try: + del cfgstreams[stream] + except KeyError: + pass # may have already been deleted.. def clear_configuration():