diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 9e7818b5..528924e8 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -252,10 +252,12 @@ def _rpc_master_rename_nodegroups(tenant, renamemap): def _rpc_master_clear_node_attributes(tenant, nodes, attributes): - ConfigManager(tenant).clear_node_attributes(nodes, attributes) + warnings = [] + ConfigManager(tenant).clear_node_attributes(nodes, attributes, warnings) + return warnings -def _rpc_clear_node_attributes(tenant, nodes, attributes): +def _rpc_clear_node_attributes(tenant, nodes, attributes): # master has to do the warnings ConfigManager(tenant)._true_clear_node_attributes(nodes, attributes) @@ -348,9 +350,9 @@ def exec_on_leader(function, *args): rpclen = len(rpcpayload) cfgleader.sendall(struct.pack('!Q', rpclen)) cfgleader.sendall(rpcpayload) - _pendingchangesets[xid].wait() + retv = _pendingchangesets[xid].wait() del _pendingchangesets[xid] - return + return retv def exec_on_followers(fnname, *args): @@ -714,8 +716,9 @@ def relay_slaved_requests(name, listener): exc = None if not (rpc['function'].startswith('_rpc_') or rpc['function'].endswith('_collective_member')): raise Exception('Unsupported function {0} called'.format(rpc['function'])) + retv = None try: - globals()[rpc['function']](*rpc['args']) + retv = globals()[rpc['function']](*rpc['args']) except ValueError as ve: exc = ['ValueError', str(ve)] except Exception as e: @@ -723,7 +726,7 @@ def relay_slaved_requests(name, listener): exc = ['Exception', str(e)] if 'xid' in rpc: res = _push_rpc(listener, msgpack.packb({'xid': rpc['xid'], - 'exc': exc}, use_bin_type=False)) + 'exc': exc, 'ret': retv}, use_bin_type=False)) if not res: break try: @@ -929,7 +932,7 @@ def follow_channel(channel): exc = Exception(excstr) _pendingchangesets[rpc['xid']].send_exception(exc) else: - _pendingchangesets[rpc['xid']].send() + _pendingchangesets[rpc['xid']].send(rpc.get('ret', None)) if 'quorum' in rpc: _hasquorum = rpc['quorum'] res = _push_rpc(channel, b'') # use null as ACK @@ -2204,14 +2207,17 @@ class ConfigManager(object): def clear_node_attributes(self, nodes, attributes, warnings=None): if cfgleader: - return exec_on_leader('_rpc_master_clear_node_attributes', + mywarnings = exec_on_leader('_rpc_master_clear_node_attributes', self.tenant, nodes, attributes) + if mywarnings and warnings is not None: + warnings.extend(mywarnings) + return if cfgstreams: exec_on_followers('_rpc_clear_node_attributes', self.tenant, nodes, attributes) self._true_clear_node_attributes(nodes, attributes, warnings) - def _true_clear_node_attributes(self, nodes, attributes, warnings): + def _true_clear_node_attributes(self, nodes, attributes, warnings=None): # accumulate all changes into a changeset and push in one go changeset = {} realattributes = [] @@ -2234,16 +2240,16 @@ class ConfigManager(object): # delete it and check for inheritence to backfil data del nodek[attrib] self._do_inheritance(nodek, attrib, node, changeset) - if not warnings is None: + if warnings is not None: if attrib in nodek: warnings.append('The attribute "{}" was defined specifically for the node and clearing now has a value inherited from the group "{}"'.format(attrib, nodek[attrib]['inheritedfrom'])) _addchange(changeset, node, attrib) _mark_dirtykey('nodes', node, self.tenant) elif attrib in nodek: - if not warnings is None: + if warnings is not None: warnings.append('The attribute "{0}" is inherited from group "{1}", leaving the inherited value alone (use "{0}=" with no value to explicitly blank the value if desired)'.format(attrib, nodek[attrib]['inheritedfrom'])) else: - if not warnings is None: + if warnings is not None: warnings.append('Attribute "{}" is either already cleared, or does not match a defined attribute (if referencing an attribute group, try a wildcard)'.format(attrib)) if ('_expressionkeys' in nodek and attrib in nodek['_expressionkeys']):