diff --git a/confluent_client/bin/nodefreshen b/confluent_client/bin/nodefreshen new file mode 100755 index 00000000..e39447bc --- /dev/null +++ b/confluent_client/bin/nodefreshen @@ -0,0 +1,166 @@ +#!/usr/bin/python3 +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2016-2017 Lenovo +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import deque +import optparse +import os +import select +import signal +import subprocess +import sys + +try: + signal.signal(signal.SIGPIPE, signal.SIG_DFL) +except AttributeError: + pass +path = os.path.dirname(os.path.realpath(__file__)) +path = os.path.realpath(os.path.join(path, '..', 'lib', 'python')) +if path.startswith('/opt'): + sys.path.append(path) + +import confluent.client as client +import confluent.sortutil as sortutil +devnull = None + +def run(): + global devnull + devnull = open(os.devnull, 'rb') + argparser = optparse.OptionParser( + usage="Usage: %prog [options] noderange commandexpression", + epilog="Expressions are the same as in attributes, e.g. " + "'ipmitool -H {hardwaremanagement.manager}' will be expanded.") + argparser.add_option('-f', '-c', '--count', type='int', default=168, + help='Number of commands to run at a time') + argparser.add_option('-k', '--security', action='store_true', + help='Update SSH setup') + argparser.add_option('-F', '--sync', action='store_true', + help='Run the syncfiles associated with the currently completed OS profile on the noderange') + argparser.add_option('-P', '--scripts', + help='Re-run specified scripts, with full path under scripts, e.g. post.d/first,firstboot.d/second') + argparser.add_option('-m', '--maxnodes', type='int', + help='Specify a maximum number of ' + 'nodes to run remote ssh command to, ' + 'prompting if over the threshold') + # among other things, FD_SETSIZE limits. Besides, spawning too many + # processes can be unkind for the unaware on memory pressure and such... + #argparser.disable_interspersed_args() + (options, args) = argparser.parse_args() + if len(args) < 1: + argparser.print_help() + sys.exit(1) + client.check_globbing(args[0]) + concurrentprocs = options.count + c = client.Command() + + currprocs = 0 + all = set([]) + pipedesc = {} + pendingexecs = deque() + exitcode = 0 + + c.stop_if_noderange_over(args[0], options.maxnodes) + nodemap = {} + cmdparms = [] + nodes = [] + for res in c.read('/noderange/{0}/nodes/'.format(args[0])): + if 'error' in res: + sys.stderr.write(res['error'] + '\n') + exitcode |= res.get('errorcode', 1) + break + node = res['item']['href'][:-1] + nodes.append(node) + + cmdstorun = [] + if options.security: + cmdstorun.append(['run_remote', 'setupssh']) + if options.sync: + cmdstorun.append(['run_remote_python', 'syncfileclient']) + if options.scripts: + for script in options.scripts.split(','): + cmdstorun.append(['run_remote', script]) + if not cmdstorun: + argparser.print_help() + sys.exit(1) + idxbynode = {} + cmdvbase = ['bash', '/etc/confluent/functions'] + for sshnode in nodes: + idxbynode[sshnode] = 1 + cmdv = ['ssh', sshnode] + cmdvbase + cmdstorun[0] + if currprocs < concurrentprocs: + currprocs += 1 + run_cmdv(node, cmdv, all, pipedesc) + else: + pendingexecs.append((node, cmdv)) + if not all or exitcode: + sys.exit(exitcode) + rdy, _, _ = select.select(all, [], [], 10) + while all: + pernodeout = {} + for r in rdy: + desc = pipedesc[r] + node = desc['node'] + data = True + while data and select.select([r], [], [], 0)[0]: + data = r.readline() + if data: + if desc['type'] == 'stdout': + if node not in pernodeout: + pernodeout[node] = [] + pernodeout[node].append(data) + else: + data = client.stringify(data) + sys.stderr.write('{0}: {1}'.format(node, data)) + sys.stderr.flush() + else: + pop = desc['popen'] + ret = pop.poll() + if ret is not None: + exitcode = exitcode | ret + all.discard(r) + r.close() + if desc['type'] == 'stdout': + if idxbynode[node] < len(cmdstorun): + cmdv = ['ssh', sshnode] + cmdvbase + cmdstorun[idxbynode[node]] + idxbynode[node] += 1 + run_cmdv(node, cmdv, all, pipedesc) + elif pendingexecs: + node, cmdv = pendingexecs.popleft() + run_cmdv(node, cmdv, all, pipedesc) + for node in sortutil.natural_sort(pernodeout): + for line in pernodeout[node]: + line = client.stringify(line) + line = line.lstrip('\x08') + sys.stdout.write('{0}: {1}'.format(node, line)) + sys.stdout.flush() + if all: + rdy, _, _ = select.select(all, [], [], 10) + sys.exit(exitcode) + + +def run_cmdv(node, cmdv, all, pipedesc): + nopen = subprocess.Popen( + cmdv, stdin=devnull, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + pipedesc[nopen.stdout] = {'node': node, 'popen': nopen, + 'type': 'stdout'} + pipedesc[nopen.stderr] = {'node': node, 'popen': nopen, + 'type': 'stderr'} + all.add(nopen.stdout) + all.add(nopen.stderr) + + +if __name__ == '__main__': + run()