2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-08-19 17:50:21 +00:00

Merge branch 'master' into async

This commit is contained in:
Jarrod Johnson
2024-04-15 10:04:19 -04:00

View File

@@ -252,10 +252,12 @@ def _rpc_master_rename_nodegroups(tenant, renamemap):
def _rpc_master_clear_node_attributes(tenant, nodes, attributes):
ConfigManager(tenant).clear_node_attributes(nodes, attributes)
warnings = []
ConfigManager(tenant).clear_node_attributes(nodes, attributes, warnings)
return warnings
def _rpc_clear_node_attributes(tenant, nodes, attributes):
def _rpc_clear_node_attributes(tenant, nodes, attributes): # master has to do the warnings
ConfigManager(tenant)._true_clear_node_attributes(nodes, attributes)
@@ -348,9 +350,9 @@ def exec_on_leader(function, *args):
rpclen = len(rpcpayload)
cfgleader.sendall(struct.pack('!Q', rpclen))
cfgleader.sendall(rpcpayload)
_pendingchangesets[xid].wait()
retv = _pendingchangesets[xid].wait()
del _pendingchangesets[xid]
return
return retv
def exec_on_followers(fnname, *args):
@@ -714,8 +716,9 @@ def relay_slaved_requests(name, listener):
exc = None
if not (rpc['function'].startswith('_rpc_') or rpc['function'].endswith('_collective_member')):
raise Exception('Unsupported function {0} called'.format(rpc['function']))
retv = None
try:
globals()[rpc['function']](*rpc['args'])
retv = globals()[rpc['function']](*rpc['args'])
except ValueError as ve:
exc = ['ValueError', str(ve)]
except Exception as e:
@@ -723,7 +726,7 @@ def relay_slaved_requests(name, listener):
exc = ['Exception', str(e)]
if 'xid' in rpc:
res = _push_rpc(listener, msgpack.packb({'xid': rpc['xid'],
'exc': exc}, use_bin_type=False))
'exc': exc, 'ret': retv}, use_bin_type=False))
if not res:
break
try:
@@ -929,7 +932,7 @@ def follow_channel(channel):
exc = Exception(excstr)
_pendingchangesets[rpc['xid']].send_exception(exc)
else:
_pendingchangesets[rpc['xid']].send()
_pendingchangesets[rpc['xid']].send(rpc.get('ret', None))
if 'quorum' in rpc:
_hasquorum = rpc['quorum']
res = _push_rpc(channel, b'') # use null as ACK
@@ -2204,14 +2207,17 @@ class ConfigManager(object):
def clear_node_attributes(self, nodes, attributes, warnings=None):
if cfgleader:
return exec_on_leader('_rpc_master_clear_node_attributes',
mywarnings = exec_on_leader('_rpc_master_clear_node_attributes',
self.tenant, nodes, attributes)
if mywarnings and warnings is not None:
warnings.extend(mywarnings)
return
if cfgstreams:
exec_on_followers('_rpc_clear_node_attributes', self.tenant,
nodes, attributes)
self._true_clear_node_attributes(nodes, attributes, warnings)
def _true_clear_node_attributes(self, nodes, attributes, warnings):
def _true_clear_node_attributes(self, nodes, attributes, warnings=None):
# accumulate all changes into a changeset and push in one go
changeset = {}
realattributes = []
@@ -2234,16 +2240,16 @@ class ConfigManager(object):
# delete it and check for inheritence to backfil data
del nodek[attrib]
self._do_inheritance(nodek, attrib, node, changeset)
if not warnings is None:
if warnings is not None:
if attrib in nodek:
warnings.append('The attribute "{}" was defined specifically for the node and clearing now has a value inherited from the group "{}"'.format(attrib, nodek[attrib]['inheritedfrom']))
_addchange(changeset, node, attrib)
_mark_dirtykey('nodes', node, self.tenant)
elif attrib in nodek:
if not warnings is None:
if warnings is not None:
warnings.append('The attribute "{0}" is inherited from group "{1}", leaving the inherited value alone (use "{0}=" with no value to explicitly blank the value if desired)'.format(attrib, nodek[attrib]['inheritedfrom']))
else:
if not warnings is None:
if warnings is not None:
warnings.append('Attribute "{}" is either already cleared, or does not match a defined attribute (if referencing an attribute group, try a wildcard)'.format(attrib))
if ('_expressionkeys' in nodek and
attrib in nodek['_expressionkeys']):