2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 09:32:21 +00:00

Advance state of runansible

Begin work to wire the results back to the supervisor and ultimately
back to the node affected.
This commit is contained in:
Jarrod Johnson 2021-03-16 17:19:23 -04:00
parent 7763327a63
commit 4818bd57bb

View File

@ -20,21 +20,42 @@ try:
import confluent.sshutil as sshutil
except ImportError:
pass
import eventlet
import eventlet.green.subprocess as subprocess
import msgpack
import os
import sys
running_status = {}
class PlayRunner(object):
def __init__(self, playfiles, nodes):
self.playfiles = playfiles
self.nodes = nodes
self.worker = None
self.stdout = None
def _start_playbooks(self):
self.worker = eventlet.spawn(self._really_run_playbooks)
def _really_run_playbooks(self):
with os.open(os.devnull, os.O_RDWR) as devnull:
targnodes = ','.join(self.nodes)
for playfilename in self.playfiles:
worker = subprocess.Popen(
[sys.executable, __file__, targnodes, playfilename],
stdin=devnull, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.stdout, self.stderr = worker.communicate()
print(repr(self.stdout))
def run_playbooks(playfiles, nodes):
sshutil.prep_ssh_key('/etc/confluent/ssh/automation')
targnodes = ','.join(nodes)
for playfilename in playfiles:
worker = subprocess.Popen(
[sys.executable, __file__, targnodes, playfilename])
for node in nodes:
running_status[node] = worker
runner = PlayRunner(playfiles, nodes)
for node in nodes:
running_status[node] = runner
runner._start_playbooks()
if __name__ == '__main__':
@ -50,25 +71,13 @@ if __name__ == '__main__':
class ResultsCollector(CallbackBase):
def v2_runner_on_unreachable(self, result):
print(dir(result))
print(repr(result._result))
sys.stdout.write(msgpack.packb(result._result, use_bin_type=True))
def v2_runner_on_ok(self, result, *args, **kwargs):
print(repr(result))
print(repr(result.task_name))
print(repr(result.is_changed()))
print(repr(result.is_skipped()))
print(repr(result.is_failed()))
print(repr(result.is_unreachable()))
print(dir(result))
print(repr(result._result))
print(repr(args))
print(repr(kwargs))
sys.stdout.write(msgpack.packb(result._result, use_bin_type=True))
def v2_runner_on_failed(self, result, *args, **kwargs):
print(repr(result))
print(repr(args))
print(repr(kwargs))
sys.stdout.write(msgpack.packb(result._result, use_bin_type=True))
context.CLIARGS = ImmutableDict(
connection='smart', module_path=['/usr/share/ansible'], forks=10,