mirror of
				https://github.com/xcat2/confluent.git
				synced 2025-10-26 08:55:31 +00:00 
			
		
		
		
	Add nodersync
This provides bulk transfer with status to a noderange.
This commit is contained in:
		
							
								
								
									
										153
									
								
								confluent_client/bin/nodersync
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										153
									
								
								confluent_client/bin/nodersync
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,153 @@ | ||||
| #!/usr/bin/env python | ||||
| # vim: tabstop=4 shiftwidth=4 softtabstop=4 | ||||
|  | ||||
| # Copyright 2016-2017 Lenovo | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
|  | ||||
| from collections import deque | ||||
| import optparse | ||||
| import os | ||||
| import select | ||||
| import shlex | ||||
| import signal | ||||
| import subprocess | ||||
| import sys | ||||
|  | ||||
| try: | ||||
|     signal.signal(signal.SIGPIPE, signal.SIG_DFL) | ||||
| except AttributeError: | ||||
|     pass | ||||
| path = os.path.dirname(os.path.realpath(__file__)) | ||||
| path = os.path.realpath(os.path.join(path, '..', 'lib', 'python')) | ||||
| if path.startswith('/opt'): | ||||
|     sys.path.append(path) | ||||
|  | ||||
| import confluent.client as client | ||||
| import confluent.screensqueeze as sq | ||||
| import confluent.sortutil as sortutil | ||||
|  | ||||
|  | ||||
| def run(): | ||||
|     argparser = optparse.OptionParser( | ||||
|         usage="Usage: %prog location noderange:location", | ||||
|     ) | ||||
|     argparser.add_option('-f', '-c', '--count', type='int', default=168, | ||||
|                          help='Number of nodes to concurrently rsync') | ||||
|     # among other things, FD_SETSIZE limits.  Besides, spawning too many | ||||
|     # processes can be unkind for the unaware on memory pressure and such... | ||||
|     argparser.disable_interspersed_args() | ||||
|     (options, args) = argparser.parse_args() | ||||
|     if len(args) < 2 or ':' not in args[-1]: | ||||
|         argparser.print_help() | ||||
|         sys.exit(1) | ||||
|     concurrentprocs = options.count | ||||
|     noderange, targpath = args[-1].split(':', 1) | ||||
|     client.check_globbing(noderange) | ||||
|     c = client.Command() | ||||
|     cmdstr = " ".join(args[:-1]) | ||||
|     cmdstr = 'rsync -av --info=progress2 ' + cmdstr | ||||
|     cmdstr += ' {node}:' + targpath | ||||
|  | ||||
|     currprocs = 0 | ||||
|     all = set([]) | ||||
|     pipedesc = {} | ||||
|     pendingexecs = deque() | ||||
|     exitcode = 0 | ||||
|  | ||||
|     for exp in c.create('/noderange/{0}/attributes/expression'.format(noderange), | ||||
|                         {'expression': cmdstr}): | ||||
|         if 'error' in exp: | ||||
|             sys.stderr.write(exp['error'] + '\n') | ||||
|             exitcode |= exp.get('errorcode', 1) | ||||
|         ex = exp.get('databynode', ()) | ||||
|         for node in ex: | ||||
|             cmd = ex[node]['value'].encode('utf-8') | ||||
|             cmdv = shlex.split(cmd) | ||||
|             if currprocs < concurrentprocs: | ||||
|                 currprocs += 1 | ||||
|                 run_cmdv(node, cmdv, all, pipedesc) | ||||
|             else: | ||||
|                 pendingexecs.append((node, cmdv)) | ||||
|     if not all or exitcode: | ||||
|         sys.exit(exitcode) | ||||
|     rdy, _, _ = select.select(all, [], [], 10) | ||||
|     nodeerrs = {} | ||||
|     pernodeout = {} | ||||
|     pernodefile = {} | ||||
|     output = sq.ScreenPrinter(noderange, c) | ||||
|  | ||||
|     while all: | ||||
|         for r in rdy: | ||||
|             desc = pipedesc[r] | ||||
|             node = desc['node'] | ||||
|             data = True | ||||
|             while data and select.select([r], [], [], 0)[0]: | ||||
|                 data = r.read(1) | ||||
|                 if data: | ||||
|                     if desc['type'] == 'stdout': | ||||
|                         if node not in pernodeout: | ||||
|                             pernodeout[node] = '' | ||||
|                         pernodeout[node] += data | ||||
|                         if '\n' in pernodeout[node]: | ||||
|                             currout, pernodeout[node] = pernodeout[node].split('\n', 1) | ||||
|                             if currout: | ||||
|                                 pernodefile[node] = os.path.basename(currout) | ||||
|                         if '\r' in pernodeout[node]: | ||||
|                             currout, pernodeout[node] = pernodeout[node].split('\r', 1) | ||||
|                             if currout: | ||||
|                                 currout = currout.split() | ||||
|                                 try: | ||||
|                                     currout = currout[1] | ||||
|                                     output.set_output(node, '{0}:{1}'.format(pernodefile[node], currout)) | ||||
|                                 except IndexError: | ||||
|                                     pernodefile = currout[0] | ||||
|                                     pass | ||||
|                     else: | ||||
|                         output.set_output(node, 'error!') | ||||
|                         if node not in nodeerrs: | ||||
|                             nodeerrs[node] = '' | ||||
|                         nodeerrs[node] += data | ||||
|                 else: | ||||
|                     pop = desc['popen'] | ||||
|                     ret = pop.poll() | ||||
|                     if ret is not None: | ||||
|                         exitcode = exitcode | ret | ||||
|                         all.discard(r) | ||||
|                         r.close() | ||||
|                         if node not in nodeerrs: | ||||
|                             output.set_output(node, 'complete') | ||||
|                         if desc['type'] == 'stdout' and pendingexecs: | ||||
|                             node, cmdv = pendingexecs.popleft() | ||||
|                             run_cmdv(node, cmdv, all, pipedesc) | ||||
|         if all: | ||||
|             rdy, _, _ = select.select(all, [], [], 10) | ||||
|     for node in nodeerrs: | ||||
|         for line in nodeerrs[node].split('\n'): | ||||
|             sys.stderr.write('{0}: {1}\n'.format(node, line)) | ||||
|     sys.exit(exitcode) | ||||
|  | ||||
|  | ||||
| def run_cmdv(node, cmdv, all, pipedesc): | ||||
|     nopen = subprocess.Popen( | ||||
|         cmdv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | ||||
|     pipedesc[nopen.stdout] = {'node': node, 'popen': nopen, | ||||
|                               'type': 'stdout'} | ||||
|     pipedesc[nopen.stderr] = {'node': node, 'popen': nopen, | ||||
|                               'type': 'stderr'} | ||||
|     all.add(nopen.stdout) | ||||
|     all.add(nopen.stderr) | ||||
|  | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     run() | ||||
| @@ -44,6 +44,8 @@ class ScreenPrinter(object): | ||||
|         self.fieldwidth = maxlen + textlen + 1  # 1 for column | ||||
|  | ||||
|     def set_output(self, node, text): | ||||
|         if self.nodeoutput[node] == text: | ||||
|             return | ||||
|         self.nodeoutput[node] = text | ||||
|         if len(text) >= self.textlen: | ||||
|             self.textlen = len(text) + 1 | ||||
|   | ||||
		Reference in New Issue
	
	Block a user