mirror of
https://github.com/xcat2/confluent.git
synced 2024-12-25 12:41:39 +00:00
Fixes for attribute clear warning behavior
Correct collective behavior for failing to clear on followers. Also, connect the warnings from the leader to the member issuing the RPC.
This commit is contained in:
parent
8e5ee6c9d8
commit
a6a2f2f2de
@ -252,10 +252,12 @@ def _rpc_master_rename_nodegroups(tenant, renamemap):
|
||||
|
||||
|
||||
def _rpc_master_clear_node_attributes(tenant, nodes, attributes):
|
||||
ConfigManager(tenant).clear_node_attributes(nodes, attributes)
|
||||
warnings = []
|
||||
ConfigManager(tenant).clear_node_attributes(nodes, attributes, warnings)
|
||||
return warnings
|
||||
|
||||
|
||||
def _rpc_clear_node_attributes(tenant, nodes, attributes):
|
||||
def _rpc_clear_node_attributes(tenant, nodes, attributes): # master has to do the warnings
|
||||
ConfigManager(tenant)._true_clear_node_attributes(nodes, attributes)
|
||||
|
||||
|
||||
@ -348,9 +350,9 @@ def exec_on_leader(function, *args):
|
||||
rpclen = len(rpcpayload)
|
||||
cfgleader.sendall(struct.pack('!Q', rpclen))
|
||||
cfgleader.sendall(rpcpayload)
|
||||
_pendingchangesets[xid].wait()
|
||||
retv = _pendingchangesets[xid].wait()
|
||||
del _pendingchangesets[xid]
|
||||
return
|
||||
return retv
|
||||
|
||||
|
||||
def exec_on_followers(fnname, *args):
|
||||
@ -714,8 +716,9 @@ def relay_slaved_requests(name, listener):
|
||||
exc = None
|
||||
if not (rpc['function'].startswith('_rpc_') or rpc['function'].endswith('_collective_member')):
|
||||
raise Exception('Unsupported function {0} called'.format(rpc['function']))
|
||||
retv = None
|
||||
try:
|
||||
globals()[rpc['function']](*rpc['args'])
|
||||
retv = globals()[rpc['function']](*rpc['args'])
|
||||
except ValueError as ve:
|
||||
exc = ['ValueError', str(ve)]
|
||||
except Exception as e:
|
||||
@ -723,7 +726,7 @@ def relay_slaved_requests(name, listener):
|
||||
exc = ['Exception', str(e)]
|
||||
if 'xid' in rpc:
|
||||
res = _push_rpc(listener, msgpack.packb({'xid': rpc['xid'],
|
||||
'exc': exc}, use_bin_type=False))
|
||||
'exc': exc, 'ret': retv}, use_bin_type=False))
|
||||
if not res:
|
||||
break
|
||||
try:
|
||||
@ -929,7 +932,7 @@ def follow_channel(channel):
|
||||
exc = Exception(excstr)
|
||||
_pendingchangesets[rpc['xid']].send_exception(exc)
|
||||
else:
|
||||
_pendingchangesets[rpc['xid']].send()
|
||||
_pendingchangesets[rpc['xid']].send(rpc.get('ret', None))
|
||||
if 'quorum' in rpc:
|
||||
_hasquorum = rpc['quorum']
|
||||
res = _push_rpc(channel, b'') # use null as ACK
|
||||
@ -2204,14 +2207,17 @@ class ConfigManager(object):
|
||||
|
||||
def clear_node_attributes(self, nodes, attributes, warnings=None):
|
||||
if cfgleader:
|
||||
return exec_on_leader('_rpc_master_clear_node_attributes',
|
||||
mywarnings = exec_on_leader('_rpc_master_clear_node_attributes',
|
||||
self.tenant, nodes, attributes)
|
||||
if warnings is not None:
|
||||
warnings.extend(mywarnings)
|
||||
return
|
||||
if cfgstreams:
|
||||
exec_on_followers('_rpc_clear_node_attributes', self.tenant,
|
||||
nodes, attributes)
|
||||
self._true_clear_node_attributes(nodes, attributes, warnings)
|
||||
|
||||
def _true_clear_node_attributes(self, nodes, attributes, warnings):
|
||||
def _true_clear_node_attributes(self, nodes, attributes, warnings=None):
|
||||
# accumulate all changes into a changeset and push in one go
|
||||
changeset = {}
|
||||
realattributes = []
|
||||
@ -2234,16 +2240,16 @@ class ConfigManager(object):
|
||||
# delete it and check for inheritence to backfil data
|
||||
del nodek[attrib]
|
||||
self._do_inheritance(nodek, attrib, node, changeset)
|
||||
if not warnings is None:
|
||||
if warnings is not None:
|
||||
if attrib in nodek:
|
||||
warnings.append('The attribute "{}" was defined specifically for the node and clearing now has a value inherited from the group "{}"'.format(attrib, nodek[attrib]['inheritedfrom']))
|
||||
_addchange(changeset, node, attrib)
|
||||
_mark_dirtykey('nodes', node, self.tenant)
|
||||
elif attrib in nodek:
|
||||
if not warnings is None:
|
||||
if warnings is not None:
|
||||
warnings.append('The attribute "{0}" is inherited from group "{1}", leaving the inherited value alone (use "{0}=" with no value to explicitly blank the value if desired)'.format(attrib, nodek[attrib]['inheritedfrom']))
|
||||
else:
|
||||
if not warnings is None:
|
||||
if warnings is not None:
|
||||
warnings.append('Attribute "{}" is either already cleared, or does not match a defined attribute (if referencing an attribute group, try a wildcard)'.format(attrib))
|
||||
if ('_expressionkeys' in nodek and
|
||||
attrib in nodek['_expressionkeys']):
|
||||
|
Loading…
Reference in New Issue
Block a user