2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-15 04:07:51 +00:00

Actually hook in new assimilation behavior

Additionally, use collective member name as
tiebreaker if txcount and follower count is
identical.
This commit is contained in:
Jarrod Johnson 2021-04-07 13:15:36 -04:00
parent 6a4086679c
commit c5ec34d5a5

View File

@ -26,6 +26,7 @@ import eventlet
import eventlet.green.socket as socket
import eventlet.green.ssl as ssl
import eventlet.green.threading as threading
import confluent.sortutil as sortutil
import greenlet
import random
import time
@ -416,11 +417,19 @@ def handle_connection(connection, cert, request, local=False):
except exc.DegradedCollective:
followcount = request.get('followcount', None)
myfollowcount = len(list(cfm.cfgstreams))
if followcount is not None and followcount < myfollowcount:
tlvdata.send(connection,
{'error': 'Refusing to be assimilated by leader with fewer followers',
'txcount': cfm._txcount,})
return
if followcount is not None:
if followcount < myfollowcount:
tlvdata.send(connection,
{'error': 'Refusing to be assimilated by leader with fewer followers',
'txcount': cfm._txcount,})
return
elif followcount == myfollowcount:
myname = sortutil.naturalize_string(get_myname())
if myname < sortutil.naturalize_string(request['name']):
tlvdata.send(connection,
{'error': 'Refusing, my name is better',
'txcount': cfm._txcount,})
return
if follower:
tlvdata.send(
connection,
@ -442,6 +451,8 @@ def handle_connection(connection, cert, request, local=False):
return
log.log({'info': 'Connecting in response to assimilation',
'subsystem': 'collective'})
if cfm.cfgstreams:
retire_as_leader(connection.getpeername())
eventlet.spawn_n(connect_to_leader, None, None,
leader=connection.getpeername()[0])
tlvdata.send(connection, {'status': 0})