diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index a0f43fb4..bcb26c4e 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -236,6 +236,10 @@ class ConsoleHandler(object): def check_collective(self, attrvalue): myc = attrvalue.get(self.node, {}).get('collective.manager', {}).get( 'value', None) + if configmodule.list_collective() and not myc: + self._is_local = False + self._detach() + self._disconnect() if myc and myc != collective.get_myname(): # Do not do console connect for nodes managed by another # confluent collective member diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 5f046358..cb5c579b 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -739,6 +739,7 @@ def handle_node_request(configmanager, inputdata, operation, plugpath = None nodesbymanager = {} nodesbyhandler = {} + badcollnodes = [] for node in nodes: for attrname in plugroute['pluginattrs']: if attrname in nodeattr[node]: @@ -756,6 +757,8 @@ def handle_node_request(configmanager, inputdata, operation, else: nodesbymanager[manager].add(node) continue + elif cfm.list_collective(): + badcollnodes.append(node) if plugpath is not None: try: hfunc = getattr(pluginmap[plugpath], operation) @@ -766,6 +769,11 @@ def handle_node_request(configmanager, inputdata, operation, nodesbyhandler[hfunc].append(node) else: nodesbyhandler[hfunc] = [node] + if badcollnodes: + raise exc.ConfluentException( + 'collective management active, ' + 'collective.manager must by set for {0}'.format( + ','.join(badcollnodes))) workers = greenpool.GreenPool() numworkers = 0 for hfunc in nodesbyhandler: diff --git a/confluent_server/confluent/shellserver.py b/confluent_server/confluent/shellserver.py index c6626227..bb69944e 100644 --- a/confluent_server/confluent/shellserver.py +++ b/confluent_server/confluent/shellserver.py @@ -30,6 +30,9 @@ class _ShellHandler(consoleserver.ConsoleHandler): _genwatchattribs = False _logtobuffer = False + def check_collective(self, attrvalue): + return + def log(self, *args, **kwargs): # suppress logging through proving a stub 'log' function return