diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index b41a1093..9979107b 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -26,6 +26,7 @@ import eventlet import eventlet.green.socket as socket import eventlet.green.ssl as ssl import eventlet.green.threading as threading +import confluent.sortutil as sortutil import greenlet import random import time @@ -416,11 +417,19 @@ def handle_connection(connection, cert, request, local=False): except exc.DegradedCollective: followcount = request.get('followcount', None) myfollowcount = len(list(cfm.cfgstreams)) - if followcount is not None and followcount < myfollowcount: - tlvdata.send(connection, - {'error': 'Refusing to be assimilated by leader with fewer followers', - 'txcount': cfm._txcount,}) - return + if followcount is not None: + if followcount < myfollowcount: + tlvdata.send(connection, + {'error': 'Refusing to be assimilated by leader with fewer followers', + 'txcount': cfm._txcount,}) + return + elif followcount == myfollowcount: + myname = sortutil.naturalize_string(get_myname()) + if myname < sortutil.naturalize_string(request['name']): + tlvdata.send(connection, + {'error': 'Refusing, my name is better', + 'txcount': cfm._txcount,}) + return if follower: tlvdata.send( connection, @@ -442,6 +451,8 @@ def handle_connection(connection, cert, request, local=False): return log.log({'info': 'Connecting in response to assimilation', 'subsystem': 'collective'}) + if cfm.cfgstreams: + retire_as_leader(connection.getpeername()) eventlet.spawn_n(connect_to_leader, None, None, leader=connection.getpeername()[0]) tlvdata.send(connection, {'status': 0})