From 4818bd57bb81e541c63fee1c77ff14ff78c7d66a Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 16 Mar 2021 17:19:23 -0400 Subject: [PATCH] Advance state of runansible Begin work to wire the results back to the supervisor and ultimately back to the node affected. --- confluent_server/confluent/runansible.py | 51 ++++++++++++++---------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/confluent_server/confluent/runansible.py b/confluent_server/confluent/runansible.py index e234d9ee..444444c3 100644 --- a/confluent_server/confluent/runansible.py +++ b/confluent_server/confluent/runansible.py @@ -20,21 +20,42 @@ try: import confluent.sshutil as sshutil except ImportError: pass +import eventlet import eventlet.green.subprocess as subprocess import msgpack import os import sys running_status = {} +class PlayRunner(object): + def __init__(self, playfiles, nodes): + self.playfiles = playfiles + self.nodes = nodes + self.worker = None + self.stdout = None + + def _start_playbooks(self): + self.worker = eventlet.spawn(self._really_run_playbooks) + + def _really_run_playbooks(self): + with os.open(os.devnull, os.O_RDWR) as devnull: + targnodes = ','.join(self.nodes) + for playfilename in self.playfiles: + worker = subprocess.Popen( + [sys.executable, __file__, targnodes, playfilename], + stdin=devnull, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.stdout, self.stderr = worker.communicate() + print(repr(self.stdout)) + + def run_playbooks(playfiles, nodes): sshutil.prep_ssh_key('/etc/confluent/ssh/automation') - targnodes = ','.join(nodes) - for playfilename in playfiles: - worker = subprocess.Popen( - [sys.executable, __file__, targnodes, playfilename]) - for node in nodes: - running_status[node] = worker + runner = PlayRunner(playfiles, nodes) + for node in nodes: + running_status[node] = runner + runner._start_playbooks() if __name__ == '__main__': @@ -50,25 +71,13 @@ if __name__ == '__main__': class ResultsCollector(CallbackBase): def v2_runner_on_unreachable(self, result): - print(dir(result)) - print(repr(result._result)) + sys.stdout.write(msgpack.packb(result._result, use_bin_type=True)) def v2_runner_on_ok(self, result, *args, **kwargs): - print(repr(result)) - print(repr(result.task_name)) - print(repr(result.is_changed())) - print(repr(result.is_skipped())) - print(repr(result.is_failed())) - print(repr(result.is_unreachable())) - print(dir(result)) - print(repr(result._result)) - print(repr(args)) - print(repr(kwargs)) + sys.stdout.write(msgpack.packb(result._result, use_bin_type=True)) def v2_runner_on_failed(self, result, *args, **kwargs): - print(repr(result)) - print(repr(args)) - print(repr(kwargs)) + sys.stdout.write(msgpack.packb(result._result, use_bin_type=True)) context.CLIARGS = ImmutableDict( connection='smart', module_path=['/usr/share/ansible'], forks=10,