From b92487da24fcdcfe29226184607e8f9899ddf1b9 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 28 May 2014 12:25:13 -0400 Subject: [PATCH] Fix handling of children in shellmodule plugins Exit of a shell plugin was not handled correctly. Fix that --- confluent_server/confluent/shellmodule.py | 56 +++++++++++++++++------ 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/confluent_server/confluent/shellmodule.py b/confluent_server/confluent/shellmodule.py index a941111a..6af11682 100644 --- a/confluent_server/confluent/shellmodule.py +++ b/confluent_server/confluent/shellmodule.py @@ -35,6 +35,10 @@ import subprocess class ExecConsole(conapi.Console): def __init__(self, executable, node): + self.subproc = None + self._master = None + self._datacallback = None + self.readerthread = None self.executable = executable self.subenv = { 'TERM': 'xterm', @@ -42,17 +46,34 @@ class ExecConsole(conapi.Console): } def relaydata(self): - while 1: - select.select( + while self.subproc is not None: + rdylist, _, _ = select.select( (self._master, self.subproc.stderr), (), (), 3600 + (random.random() * 120)) - try: - while 1: - self._datacallback(os.read(self._master, 128)) - eventlet.sleep(0) - except OSError as e: - if e.errno == 11: - pass + if self._master in rdylist: + try: + somedata = os.read(self._master, 128) + while somedata: + self._datacallback(somedata) + eventlet.sleep(0) + somedata = os.read(self._master, 128) + except OSError as e: + if e.errno != 11: + raise + if self.subproc.stderr in rdylist: + try: + somedata = self.subproc.stderr.read() + while somedata: + self._datacallback(somedata) + eventlet.sleep(0) + somedata = self.subproc.stderr.read() + except IOError as e: + if e.errno != 11: + raise + childstate = self.subproc.poll() + if childstate is not None: + self._datacallback(conapi.ConsoleEvent.Disconnect) + self.subproc = None def connect(self, callback): self._datacallback = callback @@ -61,18 +82,27 @@ class ExecConsole(conapi.Console): self.subproc = subprocess.Popen( [self.executable], env=self.subenv, stdin=slave, stdout=slave, - stderr=subprocess.PIPE) + stderr=subprocess.PIPE, close_fds=True) + os.close(slave) fcntl.fcntl(master, fcntl.F_SETFL, os.O_NONBLOCK) fcntl.fcntl(self.subproc.stderr.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) self.readerthread = eventlet.spawn(self.relaydata) def write(self, data): - self.subproc.stdin.write(data) + os.write(self._master, data) def close(self): + if self.subproc is None or self.subproc.poll() is not None: + return self.subproc.terminate() - eventlet.sleep(10) - self.subproc.kill() + waittime = 10 + while self.subproc is not None or self.subproc.poll() is None: + eventlet.sleep(1) + waittime -= 1 + if waittime == 0: + break + if self.subproc is not None and self.subproc.poll() is None: + self.subproc.kill() class Plugin(object):