diff --git a/confluent_client/bin/nodeping b/confluent_client/bin/nodeping new file mode 100755 index 00000000..b5f2e225 --- /dev/null +++ b/confluent_client/bin/nodeping @@ -0,0 +1,141 @@ +#!/usr/bin/python2 +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2016-2017 Lenovo +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import deque +import optparse +import os +import select +import signal +import subprocess +import sys + +try: + signal.signal(signal.SIGPIPE, signal.SIG_DFL) +except AttributeError: + pass +path = os.path.dirname(os.path.realpath(__file__)) +path = os.path.realpath(os.path.join(path, '..', 'lib', 'python')) +if path.startswith('/opt'): + sys.path.append(path) + +import confluent.client as client +import confluent.sortutil as sortutil + + +def run(): + + argparser = optparse.OptionParser( + usage="Usage: %prog [options] noderange") + argparser.add_option('-f', '-c', '--count', type='int', default=168, + help='Number of commands to run at a time') + argparser.add_option('-s', '--substitutename', + help='Use a different name other than the nodename for ping') + # among other things, FD_SETSIZE limits. Besides, spawning too many + # processes can be unkind for the unaware on memory pressure and such... + argparser.disable_interspersed_args() + (options, args) = argparser.parse_args() + if len(args) < 1: + argparser.print_help() + sys.exit(1) + client.check_globbing(args[0]) + concurrentprocs = options.count + c = client.Command() + + currprocs = 0 + all = set([]) + pipedesc = {} + pendingexecs = deque() + exitcode = 0 + + nodemap = {} + if options.substitutename: + subname = options.substitutename + if '{' not in subname: + subname = '{node}' + subname + for exp in c.create('/noderange/{0}/attributes/expression'.format(args[0]), + {'expression': subname}): + if 'error' in exp: + sys.stderr.write(exp['error'] + '\n') + exitcode |= exp.get('errorcode', 1) + ex = exp.get('databynode', ()) + for node in ex: + nodemap[node] = ex[node]['value'] + for rsp in c.read('/noderange/{0}/nodes/'.format(args[0])): + if 'error' in rsp: + sys.stderr.write(rsp['error'] + '\n') + exitcode |= rsp.get('errorcode', 1) + if rsp.get('item', {}).get('href', None): + node = rsp['item']['href'][:-1] + pingnode = nodemap.get(node, node) + cmdv = ['ping', '-c', '1', '-W', '1', pingnode] + if currprocs < concurrentprocs: + currprocs += 1 + run_cmdv(node, cmdv, all, pipedesc) + else: + pendingexecs.append((node, cmdv)) + if not all or exitcode: + sys.exit(exitcode) + rdy, _, _ = select.select(all, [], [], 10) + while all: + pernodeout = {} + for r in rdy: + desc = pipedesc[r] + node = desc['node'] + data = True + while data and select.select([r], [], [], 0)[0]: + data = r.readline() + if not data: + pop = desc['popen'] + ret = pop.poll() + if ret is not None: + exitcode = exitcode | ret + all.discard(r) + r.close() + if desc['type'] == 'stdout': + if ret: + print('{0}: no_ping'.format(node)) + else: + print('{0}: ping'.format(node)) + if pendingexecs: + node, cmdv = pendingexecs.popleft() + run_cmdv(node, cmdv, all, pipedesc) + for node in sortutil.natural_sort(pernodeout): + for line in pernodeout[node]: + line = client.stringify(line) + if options.nonodeprefix: + sys.stdout.write(line) + else: + sys.stdout.write('{0}: {1}'.format(node, line)) + sys.stdout.flush() + if all: + rdy, _, _ = select.select(all, [], [], 10) + sys.exit(exitcode) + + +def run_cmdv(node, cmdv, all, pipedesc): + nopen = subprocess.Popen( + cmdv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + pipedesc[nopen.stdout] = {'node': node, 'popen': nopen, + 'type': 'stdout'} + pipedesc[nopen.stderr] = {'node': node, 'popen': nopen, + 'type': 'stderr'} + all.add(nopen.stdout) + all.add(nopen.stderr) + + +if __name__ == '__main__': + run()