2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-21 17:11:58 +00:00

Have runansible complete execution

It now completes runs and stores results for retrieval by deploying node.
This commit is contained in:
Jarrod Johnson 2021-03-17 16:54:37 -04:00
parent a149b0bcd0
commit 98d14344ce

View File

@ -1,8 +1,7 @@
#!/usr/bin/python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 IBM Corporation
# Copyright 2015-2018 Lenovo
# Copyright 2021 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -24,6 +23,7 @@ import eventlet
import eventlet.green.subprocess as subprocess
import msgpack
import os
import struct
import sys
running_status = {}
@ -32,23 +32,34 @@ class PlayRunner(object):
self.playfiles = playfiles
self.nodes = nodes
self.worker = None
self.stdout = None
self.results = []
self.complete = False
def _start_playbooks(self):
self.worker = eventlet.spawn(self._really_run_playbooks)
def get_available_results(self):
avail = self.results
self.results = []
return avail
def _really_run_playbooks(self):
with os.open(os.devnull, os.O_RDWR) as devnull:
with open(os.devnull, 'w+') as devnull:
targnodes = ','.join(self.nodes)
for playfilename in self.playfiles:
worker = subprocess.Popen(
[sys.executable, __file__, targnodes, playfilename],
stdin=devnull, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.stdout, self.stderr = worker.communicate()
print(repr(self.stdout))
stdout, self.stderr = worker.communicate()
current = memoryview(stdout)
while len(current):
sz = struct.unpack('=q', current[:8])[0]
result = msgpack.unpackb(current[8:8+sz], raw=False)
self.results.append(result)
current = current[8+sz:]
self.complete = True
def run_playbooks(playfiles, nodes):
sshutil.prep_ssh_key('/etc/confluent/ssh/automation')
@ -58,6 +69,24 @@ def run_playbooks(playfiles, nodes):
runner._start_playbooks()
def print_result(result, state, collector):
output = {
'task_name': result.task_name,
'changed': result._result['changed'],
}
output['state'] = state
output['warnings'] = result._result.get('warnings', [])
try:
del result._result['warnings']
except KeyError:
pass
if collector:
output['errorinfo'] = collector._dump_results(result._result)
msg = msgpack.packb(output, use_bin_type=True)
msglen = len(msg)
sys.stdout.buffer.write(struct.pack('=q', msglen))
sys.stdout.buffer.write(msg)
if __name__ == '__main__':
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
@ -70,14 +99,15 @@ if __name__ == '__main__':
import yaml
class ResultsCollector(CallbackBase):
def v2_runner_on_unreachable(self, result):
sys.stdout.write(msgpack.packb(result._result, use_bin_type=True))
print_result(result, 'UNREACHABLE', self)
def v2_runner_on_ok(self, result, *args, **kwargs):
sys.stdout.write(msgpack.packb(result._result, use_bin_type=True))
print_result(result, 'ok', self)
def v2_runner_on_failed(self, result, *args, **kwargs):
sys.stdout.write(msgpack.packb(result._result, use_bin_type=True))
print_result(result, 'FAILED', self)
context.CLIARGS = ImmutableDict(
connection='smart', module_path=['/usr/share/ansible'], forks=10,