diff --git a/confluent_client/bin/noderun b/confluent_client/bin/noderun index 19d8f133..8e380d84 100755 --- a/confluent_client/bin/noderun +++ b/confluent_client/bin/noderun @@ -34,6 +34,7 @@ if path.startswith('/opt'): sys.path.append(path) import confluent.client as client +import confluent.sortutil as sortutil def run(): @@ -78,26 +79,34 @@ def run(): sys.exit(exitcode) rdy, _, _ = select.select(all, [], [], 10) while all: + pernodeout = {} for r in rdy: - data = r.readline() desc = pipedesc[r] - if data: - node = desc['node'] - if desc['type'] == 'stdout': - sys.stdout.write('{0}: {1}'.format(node,data)) - sys.stdout.flush() + node = desc['node'] + data = True + while data and select.select([r], [], [], 0): + data = r.readline() + if data: + if desc['type'] == 'stdout': + if node not in pernodeout: + pernodeout[node] = [] + pernodeout[node].append(data) + else: + sys.stderr.write('{0}: {1}'.format(node, data)) + sys.stderr.flush() else: - sys.stderr.write('{0}: {1}'.format(node, data)) - sys.stderr.flush() - else: - pop = desc['popen'] - ret = pop.poll() - if ret is not None: - exitcode = exitcode | ret - all.discard(r) - if desc['type'] == 'stdout' and pendingexecs: - node, cmdv = pendingexecs.popleft() - run_cmdv(node, cmdv, all, pipedesc) + pop = desc['popen'] + ret = pop.poll() + if ret is not None: + exitcode = exitcode | ret + all.discard(r) + if desc['type'] == 'stdout' and pendingexecs: + node, cmdv = pendingexecs.popleft() + run_cmdv(node, cmdv, all, pipedesc) + for node in sortutil.natural_sort(pernodeout): + for line in pernodeout[node]: + sys.stdout.write('{0}: {1}'.format(node, line)) + sys.stdout.flush() if all: rdy, _, _ = select.select(all, [], [], 10) sys.exit(exitcode) diff --git a/confluent_client/bin/nodeshell b/confluent_client/bin/nodeshell index 6b8f6200..95706c55 100755 --- a/confluent_client/bin/nodeshell +++ b/confluent_client/bin/nodeshell @@ -33,6 +33,7 @@ if path.startswith('/opt'): sys.path.append(path) import confluent.client as client +import confluent.sortutil as sortutil def run(): @@ -79,26 +80,34 @@ def run(): sys.exit(exitcode) rdy, _, _ = select.select(all, [], [], 10) while all: + pernodeout = {} for r in rdy: - data = r.readline() desc = pipedesc[r] - if data: - node = desc['node'] - if desc['type'] == 'stdout': - sys.stdout.write('{0}: {1}'.format(node,data)) - sys.stdout.flush() + node = desc['node'] + data = True + while data and select.select([r], [], [], 0): + data = r.readline() + if data: + if desc['type'] == 'stdout': + if node not in pernodeout: + pernodeout[node] = [] + pernodeout[node].append(data) + else: + sys.stderr.write('{0}: {1}'.format(node, data)) + sys.stderr.flush() else: - sys.stderr.write('{0}: {1}'.format(node, data)) - sys.stderr.flush() - else: - pop = desc['popen'] - ret = pop.poll() - if ret is not None: - exitcode = exitcode | ret - all.discard(r) - if desc['type'] == 'stdout' and pendingexecs: - node, cmdv = pendingexecs.popleft() - run_cmdv(node, cmdv, all, pipedesc) + pop = desc['popen'] + ret = pop.poll() + if ret is not None: + exitcode = exitcode | ret + all.discard(r) + if desc['type'] == 'stdout' and pendingexecs: + node, cmdv = pendingexecs.popleft() + run_cmdv(node, cmdv, all, pipedesc) + for node in sortutil.natural_sort(pernodeout): + for line in pernodeout[node]: + sys.stdout.ouwrite('{0}: {1}'.format(node, line)) + sys.stdout.flush() if all: rdy, _, _ = select.select(all, [], [], 10) sys.exit(exitcode) diff --git a/confluent_client/confluent/sortutil.py b/confluent_client/confluent/sortutil.py new file mode 100644 index 00000000..109eacbc --- /dev/null +++ b/confluent_client/confluent/sortutil.py @@ -0,0 +1,43 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2014 IBM Corporation +# Copyright 2015-2016 Lenovo +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re + +numregex = re.compile('([0-9]+)') + + +def naturalize_string(key): + """Analyzes string in a human way to enable natural sort + + :param key: The node name to analyze + :returns: A structure that can be consumed by 'sorted' + """ + return [int(text) if text.isdigit() else text.lower() + for text in re.split(numregex, key)] + + +def natural_sort(iterable): + """Return a sort using natural sort if possible + + :param iterable: + :return: + """ + try: + return sorted(iterable, key=naturalize_string) + except TypeError: + # The natural sort attempt failed, fallback to ascii sort + return sorted(iterable)