First phase of collective manager candidate implementation
This implements recovery on loss of a collective member by reassigning its nodes to the least loaded manager candidates for each node.
commit 086ce9823b (parent 2d6bdffebe)
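
As a rough sketch of the policy this commit introduces (a simplified illustration only, not the implementation in the diff below; pick_new_manager, candidates, load_by_manager, and offline are invented names for this example), recovery boils down to moving each node of an offline manager to whichever permitted candidate currently manages the fewest nodes:

def pick_new_manager(candidates, load_by_manager, offline):
    # Drop candidates that are themselves offline; nothing viable means no move.
    viable = [mgr for mgr in candidates if mgr not in offline]
    if not viable:
        return None
    # Pick the candidate managing the fewest nodes and account for the new node.
    best = min(viable, key=lambda mgr: load_by_manager.get(mgr, 0))
    load_by_manager[best] = load_by_manager.get(best, 0) + 1
    return best

The check_managers() function added below derives the same inputs from collective status and the collective.manager / collective.managercandidates attributes, then rewrites collective.manager for each affected node.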
@@ -19,6 +19,7 @@ import confluent.collective.invites as invites
 import confluent.config.configmanager as cfm
 import confluent.exceptions as exc
 import confluent.log as log
+import confluent.noderange as noderange
 import confluent.tlvdata as tlvdata
 import confluent.util as util
 import eventlet
@@ -38,6 +39,7 @@ except ImportError:
 currentleader = None
 follower = None
 retrythread = None
+failovercheck = None
 
 class ContextBool(object):
     def __init__(self):
@@ -576,7 +578,53 @@ def startup():
         return
     eventlet.spawn_n(start_collective)
 
+def check_managers():
+    global failovercheck
+    if not follower:
+        c = cfm.ConfigManager(None)
+        collinfo = {}
+        populate_collinfo(collinfo)
+        availmanagers = {}
+        offlinemgrs = set(collinfo['offline'])
+        offlinemgrs.add('')
+        for offline in collinfo['offline']:
+            nodes = noderange.NodeRange(
+                'collective.manager=={}'.format(offline), c).nodes
+            managercandidates = c.get_node_attributes(
+                nodes, 'collective.managercandidates')
+            expandednoderanges = {}
+            for node in nodes:
+                if node not in managercandidates:
+                    continue
+                targets = managercandidates[node].get('collective.managercandidates', {}).get('value', None)
+                if not targets:
+                    continue
+                if not availmanagers:
+                    for active in collinfo['active']:
+                        availmanagers[active] = len(
+                            noderange.NodeRange(
+                                'collective.manager=={}'.format(active), c).nodes)
+                    availmanagers[collinfo['leader']] = len(
+                        noderange.NodeRange(
+                            'collective.manager=={}'.format(
+                                collinfo['leader']), c).nodes)
+                if targets not in expandednoderanges:
+                    expandednoderanges[targets] = set(
+                        noderange.NodeRange(targets, c).nodes) - offlinemgrs
+                targets = sorted(expandednoderanges[targets], key=availmanagers.get)
+                if not targets:
+                    continue
+                c.set_node_attributes({node: {'collective.manager': {'value': targets[0]}}})
+                availmanagers[targets[0]] += 1
+    failovercheck = None
+
+def schedule_rebalance():
+    global failovercheck
+    if not failovercheck:
+        failovercheck = eventlet.spawn_after(10, check_managers)
+
 def start_collective():
+    cfm.membership_callback = schedule_rebalance
     global follower
     global retrythread
     if follower:
@@ -201,6 +201,14 @@ node = {
                         'indicates candidate managers, either for '
                         'high availability or load balancing purposes.')
     },
+    'collective.managercandidates': {
+        'description': ('A noderange of nodes permitted to be a manager for '
+                        'the node. This controls failover and deployment. If '
+                        'not defined, all managers may deploy and no '
+                        'automatic failover will be performed. '
+                        'Using this requires that collective members be '
+                        'defined as nodes for noderange expansion')
+    },
     'deployment.pendingprofile': {
         'description': ('An OS profile that is pending deployment. This indicates to '
                         'the network boot subsystem what should be offered when a potential '
@@ -115,6 +115,7 @@ _attraliases = {
 }
 _validroles = ('Administrator', 'Operator', 'Monitor')
 
+membership_callback = None
 
 def attrib_supports_expression(attrib):
     if not isinstance(attrib, str):
@@ -409,6 +410,8 @@ def _push_rpc(stream, payload):
     except Exception:
         logException()
         del cfgstreams[stream]
+        if membership_callback:
+            membership_callback()
         stream.close()
 
 
@@ -615,6 +618,8 @@ def relay_slaved_requests(name, listener):
         except Exception:
             pass
         del cfgstreams[name]
+        if membership_callback:
+            membership_callback()
     cfgstreams[name] = listener
     lh = StreamHandler(listener)
     _hasquorum = len(cfgstreams) >= (
@@ -682,6 +687,8 @@ def relay_slaved_requests(name, listener):
                 _push_rpc,
                 [(cfgstreams[s], payload) for s in cfgstreams]):
             pass
+    if membership_callback:
+        membership_callback()
     if not cfgstreams and not cfgleader:  # last one out, set cfgleader to boolean to mark dead collective
         stop_following(True)
     return False
@@ -739,6 +746,8 @@ def stop_leading():
             del cfgstreams[stream]
         except KeyError:
             pass  # may have already been deleted..
+    if membership_callback:
+        membership_callback()
 
 
 _oldcfgstore = None