diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py
index 826327c1..a0342d09 100644
--- a/confluent_server/confluent/collective/manager.py
+++ b/confluent_server/confluent/collective/manager.py
@@ -19,6 +19,7 @@ import confluent.collective.invites as invites
 import confluent.config.configmanager as cfm
 import confluent.exceptions as exc
 import confluent.log as log
+import confluent.noderange as noderange
 import confluent.tlvdata as tlvdata
 import confluent.util as util
 import eventlet
@@ -38,6 +39,7 @@ except ImportError:
 currentleader = None
 follower = None
 retrythread = None
+failovercheck = None
 
 class ContextBool(object):
     def __init__(self):
@@ -576,7 +578,53 @@ def startup():
         return
     eventlet.spawn_n(start_collective)
 
+def check_managers():
+    # Rebalance collective.manager assignments away from offline managers.
+    # Only the leader (not a follower) performs the reassignment.
+    global failovercheck
+    if not follower:
+        c = cfm.ConfigManager(None)
+        collinfo = {}
+        populate_collinfo(collinfo)
+        availmanagers = {}
+        offlinemgrs = set(collinfo['offline'])
+        offlinemgrs.add('')
+        for offline in collinfo['offline']:
+            nodes = noderange.NodeRange(
+                'collective.manager=={}'.format(offline), c).nodes
+            managercandidates = c.get_node_attributes(
+                nodes, 'collective.managercandidates')
+            expandednoderanges = {}
+            for node in nodes:
+                if node not in managercandidates:
+                    continue
+                targets = managercandidates[node].get('collective.managercandidates', {}).get('value', None)
+                if not targets:
+                    continue
+                # Lazily tally how many nodes each online manager already
+                # has, so reassignment prefers the least-loaded manager.
+                if not availmanagers:
+                    for active in collinfo['active']:
+                        availmanagers[active] = len(
+                            noderange.NodeRange(
+                                'collective.manager=={}'.format(active), c).nodes)
+                    availmanagers[collinfo['leader']] = len(
+                        noderange.NodeRange(
+                            'collective.manager=={}'.format(
+                                collinfo['leader']), c).nodes)
+                if targets not in expandednoderanges:
+                    expandednoderanges[targets] = set(
+                        noderange.NodeRange(targets, c).nodes) - offlinemgrs
+                targets = sorted(expandednoderanges[targets], key=availmanagers.get)
+                if not targets:
+                    continue
+                c.set_node_attributes({node: {'collective.manager': {'value': targets[0]}}})
+                availmanagers[targets[0]] += 1
+    failovercheck = None
+
+def schedule_rebalance():
+    # Debounce membership changes: schedule one check_managers run 10s out.
+    global failovercheck
+    if not failovercheck:
+        failovercheck = eventlet.spawn_after(10, check_managers)
+
 def start_collective():
+    cfm.membership_callback = schedule_rebalance
     global follower
     global retrythread
     if follower:
diff --git a/confluent_server/confluent/config/attributes.py b/confluent_server/confluent/config/attributes.py
index 3967240f..4e6638f8 100644
--- a/confluent_server/confluent/config/attributes.py
+++ b/confluent_server/confluent/config/attributes.py
@@ -201,6 +201,14 @@ node = {
                         'indicates candidate managers, either for '
                         'high availability or load balancing purposes.')
     },
+    'collective.managercandidates': {
+        'description': ('A noderange of nodes permitted to be a manager for '
+                        'the node. This controls failover and deployment. If '
+                        'not defined, all managers may deploy and no '
+                        'automatic failover will be performed. '
+                        'Using this requires that collective members be '
+                        'defined as nodes for noderange expansion')
+    },
     'deployment.pendingprofile': {
         'description': ('An OS profile that is pending deployment. This indicates to '
                         'the network boot subsystem what should be offered when a potential '
diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py
index 7313a1c3..6f96bcec 100644
--- a/confluent_server/confluent/config/configmanager.py
+++ b/confluent_server/confluent/config/configmanager.py
@@ -115,6 +115,7 @@
 }
 
 _validroles = ('Administrator', 'Operator', 'Monitor')
+membership_callback = None
 
 def attrib_supports_expression(attrib):
     if not isinstance(attrib, str):
@@ -409,6 +410,8 @@ def _push_rpc(stream, payload):
     except Exception:
         logException()
         del cfgstreams[stream]
+        if membership_callback:
+            membership_callback()
         stream.close()
 
 
@@ -615,6 +618,8 @@
         except Exception:
             pass
         del cfgstreams[name]
+        if membership_callback:
+            membership_callback()
     cfgstreams[name] = listener
     lh = StreamHandler(listener)
     _hasquorum = len(cfgstreams) >= (
@@ -682,6 +687,8 @@
                 _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]):
             pass
+        if membership_callback:
+            membership_callback()
         if not cfgstreams and not cfgleader:  # last one out, set cfgleader to boolean to mark dead collective
             stop_following(True)
             return False
         return True
@@ -739,6 +746,8 @@ def stop_leading():
             del cfgstreams[stream]
         except KeyError:
             pass # may have already been deleted..
+    if membership_callback:
+        membership_callback()
 
 
 _oldcfgstore = None