diff --git a/confluent_server/confluent/runansible.py b/confluent_server/confluent/runansible.py index 444444c3..bd608c49 100644 --- a/confluent_server/confluent/runansible.py +++ b/confluent_server/confluent/runansible.py @@ -1,8 +1,7 @@ #!/usr/bin/python # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2014 IBM Corporation -# Copyright 2015-2018 Lenovo +# Copyright 2021 Lenovo # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,6 +23,7 @@ import eventlet import eventlet.green.subprocess as subprocess import msgpack import os +import struct import sys running_status = {} @@ -32,23 +32,34 @@ class PlayRunner(object): self.playfiles = playfiles self.nodes = nodes self.worker = None - self.stdout = None + self.results = [] + self.complete = False def _start_playbooks(self): self.worker = eventlet.spawn(self._really_run_playbooks) + def get_available_results(self): + avail = self.results + self.results = [] + return avail + def _really_run_playbooks(self): - with os.open(os.devnull, os.O_RDWR) as devnull: + with open(os.devnull, 'w+') as devnull: targnodes = ','.join(self.nodes) for playfilename in self.playfiles: worker = subprocess.Popen( [sys.executable, __file__, targnodes, playfilename], stdin=devnull, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - self.stdout, self.stderr = worker.communicate() - print(repr(self.stdout)) - - + stdout, self.stderr = worker.communicate() + current = memoryview(stdout) + while len(current): + sz = struct.unpack('=q', current[:8])[0] + result = msgpack.unpackb(current[8:8+sz], raw=False) + self.results.append(result) + current = current[8+sz:] + self.complete = True + def run_playbooks(playfiles, nodes): sshutil.prep_ssh_key('/etc/confluent/ssh/automation') @@ -58,6 +69,24 @@ def run_playbooks(playfiles, nodes): runner._start_playbooks() +def print_result(result, state, collector): + output = { + 'task_name': result.task_name, + 'changed': result._result['changed'], + } + output['state'] = state + output['warnings'] = result._result.get('warnings', []) + try: + del result._result['warnings'] + except KeyError: + pass + if collector: + output['errorinfo'] = collector._dump_results(result._result) + msg = msgpack.packb(output, use_bin_type=True) + msglen = len(msg) + sys.stdout.buffer.write(struct.pack('=q', msglen)) + sys.stdout.buffer.write(msg) + if __name__ == '__main__': from ansible.inventory.manager import InventoryManager from ansible.parsing.dataloader import DataLoader @@ -70,14 +99,15 @@ if __name__ == '__main__': import yaml class ResultsCollector(CallbackBase): + def v2_runner_on_unreachable(self, result): - sys.stdout.write(msgpack.packb(result._result, use_bin_type=True)) + print_result(result, 'UNREACHABLE', self) def v2_runner_on_ok(self, result, *args, **kwargs): - sys.stdout.write(msgpack.packb(result._result, use_bin_type=True)) + print_result(result, 'ok', self) def v2_runner_on_failed(self, result, *args, **kwargs): - sys.stdout.write(msgpack.packb(result._result, use_bin_type=True)) + print_result(result, 'FAILED', self) context.CLIARGS = ImmutableDict( connection='smart', module_path=['/usr/share/ansible'], forks=10,