2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 17:43:14 +00:00

Numerous fixes to collective

If client has higher transaction count, do not close the connection
before extracting peer address.

If our connect session is rudely terminated, abort rather than trying
to continue.

On assimilate failure, ignore the failure when no data was returned.

Fix problem where a follower getting double deleted was causing an error.
This commit is contained in:
Jarrod Johnson 2018-07-10 14:55:57 -04:00
parent 8769c438c0
commit 11968faffc
2 changed files with 13 additions and 5 deletions

View File

@ -71,6 +71,8 @@ def connect_to_leader(cert=None, name=None, leader=None):
'name': name,
'txcount': cfm._txcount}})
keydata = tlvdata.recv(remote)
if not keydata:
return False
if 'error' in keydata:
if 'backoff' in keydata:
eventlet.spawn_after(random.random(), connect_to_leader,
@ -108,7 +110,7 @@ def connect_to_leader(cert=None, name=None, leader=None):
cfm._cfgstore['collective'] = colldata
for globvar in globaldata:
cfm.set_global(globvar, globaldata[globvar])
cfm._txcount = dbi['txcount']
cfm._txcount = dbi.get('txcount', 0)
cfm.ConfigManager(tenant=None)._load_from_json(dbjson)
currentleader = leader
#spawn this as a thread...
@ -296,9 +298,9 @@ def handle_connection(connection, cert, request, local=False):
{'error': 'Client has higher tranasaction count, '
'should assimilate me, connecting..',
'txcount': cfm._txcount})
connection.close()
eventlet.spawn_n(connect_to_leader, None, None,
connection.getpeername()[0])
connection.close()
return
tlvdata.send(connection, cfm._dump_keys(None, False))
tlvdata.send(connection, cfm._cfgstore['collective'])
@ -327,7 +329,7 @@ def try_assimilate(drone):
tlvdata.recv(remote) # the banner
tlvdata.recv(remote) # authpassed... 0..
answer = tlvdata.recv(remote)
if 'error' in answer:
if answer and 'error' in answer:
connect_to_leader(None, None, leader=remote.getpeername()[0])
def get_leader(connection):

View File

@ -478,7 +478,10 @@ def relay_slaved_requests(name, listener):
_push_rpc(listener, cPickle.dumps({'xid': rpc['xid']}))
msg = listener.recv(8)
listener.close()
del cfgstreams[name]
try:
del cfgstreams[name]
except KeyError:
pass # May have already been closed/deleted...
if not cfgstreams and not cfgleader:
cfgleader = True
@ -486,7 +489,10 @@ def relay_slaved_requests(name, listener):
def stop_leading():
for stream in list(cfgstreams):
cfgstreams[stream].close()
del cfgstreams[stream]
try:
del cfgstreams[stream]
except KeyError:
pass # may have already been deleted..
def clear_configuration():