2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-26 11:30:23 +00:00

Extend collective data functions to more functions

Add to users and groups.  Refactor reusable code.
Code that remains still looks awfully repetitive though...
This commit is contained in:
Jarrod Johnson 2018-05-21 15:46:51 -04:00
parent caa4000b7e
commit 41298a8e01

View File

@ -159,17 +159,87 @@ def _do_notifier(cfg, watcher, callback):
logException()
def _rpc_master_set_user(tenant, name, attributemap):
ConfigManager(tenant).set_user(name, attributemap)
def _rpc_set_user(tenant, name, attributemap):
ConfigManager(tenant)._true_set_user(name, attributemap)
def _rpc_master_set_node_attributes(tenant, attribmap, autocreate):
c = ConfigManager(tenant)
c.send_to_followers('_rpc_set_node_attributes', tenant,
attribmap, autocreate)
c._true_set_node_attributes(attribmap, autocreate)
ConfigManager(tenant).set_node_attributes(attribmap, autocreate)
def _rpc_master_set_group_attributes(tenant, attribmap, autocreate):
ConfigManager(tenant).set_group_attributes(attribmap, autocreate)
def _rpc_master_del_user(tenant, name):
ConfigManager(tenant).del_user(name)
def _rpc_del_user(tenant, name):
ConfigManager(tenant)._true_del_user(name)
def _rpc_master_create_user(tenant, *args):
ConfigManager(tenant).create_user(*args)
def _rpc_create_user(tenant, *args):
ConfigManager(tenant)._true_create_user(*args)
def _rpc_master_del_groups(tenant, groups):
ConfigManager(tenant).del_groups(groups)
def _rpc_del_groups(tenant, groups):
ConfigManager(tenant)._true_del_groups(groups)
def _rpc_master_del_nodes(tenant, nodes):
ConfigManager(tenant).del_nodes(nodes)
def _rpc_del_nodes(tenant, nodes)
ConfigManager(tenant)._true_del_nodes(nodes)
def _rpc_set_node_attributes(tenant, attribmap, autocreate):
ConfigManager(tenant)._true_set_node_attributes(attribmap, autocreate)
def _rpc_set_group_attributes(tenant, attribmap, autocreate):
ConfigManager(tenant)._true_set_group_attributes(attribmap, autocreate)
def exec_on_leader(function, *args):
xid = os.urandom(8)
while xid in _pendingchangesets:
xid = os.urandom(8)
_pendingchangesets[xid] = event.Event()
rpcpayload = cPickle.dumps({'function': function, 'args': args,
'xid': xid})
rpclen = len(rpcpayload)
cfgleader.sendall(struct.pack('!Q', rpclen))
cfgleader.sendall(rpcpayload)
_pendingchangesets[xid].wait()
del _pendingchangesets[xid]
return
def exec_on_followers(fnname, *args):
global _txcount
_txcount += 1
if len(cfgstreams) < (len(_cfgstore['collective']) // 2) :
# the leader counts in addition to registered streams
raise Exception("collective does not have quorum")
pushes = eventlet.GreenPool()
payload = cPickle.dumps({'function': fnname, 'args': args,
'txcount': _txcount})
for res in pushes.starmap(
_push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]):
pass
def logException():
global tracelog
if tracelog is None:
@ -903,6 +973,14 @@ class ConfigManager(object):
:param name: The login name of the user
:param attributemap: A dict of key values to set
"""
if cfgleader:
return exec_on_leader('_rpc_master_set_user', self.tenant, name,
attributemap)
if cfgstreams:
exec_on_followers('_rpc_set_user', self.tenant, name)
self._true_set_user(name, attributemap)
def _true_set_user(self, name, attributemap):
user = self._cfgstore['users'][name]
for attribute in attributemap:
if attribute == 'password':
@ -919,6 +997,13 @@ class ConfigManager(object):
self._bg_sync_to_file()
def del_user(self, name):
if cfgleader:
return exec_on_leader('_rpc_master_del_user', name)
if cfgstreams:
exec_on_followers('_rpc_del_user', name)
self._true_del_user(name)
def _true_del_user(self, name):
if name in self._cfgstore['users']:
del self._cfgstore['users'][name]
_mark_dirtykey('users', name, self.tenant)
@ -936,6 +1021,16 @@ class ConfigManager(object):
:param uid: Custom identifier number if desired. Defaults to random.
:param displayname: Optional long format name for UI consumption
"""
if cfgleader:
return exec_on_leader('_rpc_master_create_user', self.tenant,
name, role, uid, displayname, attributemap)
if cfgstreams:
exec_on_followers('_rpc_set_group_attributes', self.tenant, name,
role, uid, displayname, attributemap)
self._true_create_user(name, role, uid, displayname, attributemap)
def _true_create_user(self, name, role="Administrator", uid=None,
displayname=None, attributemap=None):
if 'idmap' not in _cfgstore['main']:
_cfgstore['main']['idmap'] = {}
if uid is None:
@ -1133,6 +1228,15 @@ class ConfigManager(object):
self.set_group_attributes(attribmap, autocreate=True)
def set_group_attributes(self, attribmap, autocreate=False):
if cfgleader: # currently config slave to another
return exec_on_leader('_rpc_master_set_group_attributes',
self.tenant, attribmap, autocreate)
if cfgstreams:
exec_on_followers('_rpc_set_group_attributes', self.tenant,
attribmap, autocreate)
self._true_set_group_attributes(attribmap, autocreate)
def _true_set_group_attributes(self, attribmap, autocreate=False):
changeset = {}
for group in attribmap:
if group == '':
@ -1326,8 +1430,15 @@ class ConfigManager(object):
callback = watcher['callback']
eventlet.spawn_n(_do_notifier, self, watcher, callback)
def del_nodes(self, nodes):
if cfgleader: # slaved to a collective
return exec_on_loader('_rpc_master_del_nodes', self.tenant,
nodes)
if cfgstreams:
exec_on_followers('_rpc_del_nodes', self.tenant, nodes)
self._true_del_nodes(nodes)
def _true_del_nodes(self, nodes):
if self.tenant in self._nodecollwatchers:
for watcher in self._nodecollwatchers[self.tenant].itervalues():
watcher(added=[], deleting=nodes, configmanager=self)
@ -1346,6 +1457,14 @@ class ConfigManager(object):
self._bg_sync_to_file()
def del_groups(self, groups):
if cfgleader:
return exec_on_leader('_rpc_master_del_groups', self.tenant,
groups)
if cfgstreams:
exec_on_followers('_rpc_del_groups', self.tenant, groups)
self._true_del_groups(groups)
def _true_del_groups(self, groups):
changeset = {}
for group in groups:
if group in self._cfgstore['nodegroups']:
@ -1397,39 +1516,12 @@ class ConfigManager(object):
attribmap[node]['groups'] = []
self.set_node_attributes(attribmap, autocreate=True)
def set_leader_node_attributes(self, attribmap, autocreate):
xid = os.urandom(8)
while xid in _pendingchangesets:
xid = os.urandom(8)
_pendingchangesets[xid] = event.Event()
rpcpayload = cPickle.dumps({'function': '_rpc_master_set_node_attributes',
'args': (self.tenant, attribmap,
autocreate), 'xid': xid})
rpclen = len(rpcpayload)
cfgleader.sendall(struct.pack('!Q', rpclen))
cfgleader.sendall(rpcpayload)
_pendingchangesets[xid].wait()
del _pendingchangesets[xid]
return
def send_to_followers(self, fnname, *args):
global _txcount
_txcount += 1
if len(cfgstreams) < (len(_cfgstore['collective']) // 2) :
# the leader counts in addition to registered streams
raise Exception("collective does not have quorum")
pushes = eventlet.GreenPool()
payload = cPickle.dumps({'function': fnname, 'args': args,
'txcount': _txcount})
for res in pushes.starmap(
_push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]):
pass
def set_node_attributes(self, attribmap, autocreate=False):
if cfgleader: # currently config slave to another
return self.set_leader_node_attributes(attribmap, autocreate)
return exec_on_leader('_rpc_master_set_node_attributes',
self.tenant, attribmap, autocreate)
if cfgstreams:
self.send_to_followers('_rpc_set_node_attributes',
exec_on_followers('_rpc_set_node_attributes',
self.tenant, attribmap, autocreate)
self._true_set_node_attributes(attribmap, autocreate)