From 2f566fb81ddfdd14b3b623ee6d1ff48d67e636b4 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 10 Oct 2018 09:41:25 -0400 Subject: [PATCH] Provide fallback for unexpected reply in collective show --- confluent_server/bin/collective | 2 +- .../confluent/config/configmanager.py | 21 ++++++++++--------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/confluent_server/bin/collective b/confluent_server/bin/collective index 5f0c78f6..c8d31e62 100644 --- a/confluent_server/bin/collective +++ b/confluent_server/bin/collective @@ -68,7 +68,7 @@ def join_collective(server, invitation): res = tlvdata.recv(s) res = res.get('collective', {'status': 'Unknown response: ' + repr(res)}) - print(res.get('status', res['error'])) + print(res.get('status', res.get('error', repr(res)))) def show_collective(): diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index bc114426..cf01c73e 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -63,6 +63,7 @@ import eventlet import eventlet.event as event import eventlet.green.select as select import eventlet.green.threading as gthread +import eventlet.greenpool as gpool import fnmatch import json import operator @@ -947,7 +948,7 @@ class ConfigManager(object): os.getenv('SystemDrive'), '\\ProgramData', 'confluent', 'cfg') else: _cfgdir = "/etc/confluent/cfg" - _cfgwriter = None + _cfgwriter = gpool.GreenPool(1) _writepending = False _syncrunning = False _syncstate = threading.RLock() @@ -2045,11 +2046,9 @@ class ConfigManager(object): @classmethod def wait_for_sync(cls, fullsync=False): - if cls._cfgwriter is not None: - cls._cfgwriter.join() + cls._cfgwriter.waitall() cls._bg_sync_to_file(fullsync) - if cls._cfgwriter is not None: - cls._cfgwriter.join() + cls._cfgwriter.waitall() @classmethod def shutdown(cls): @@ -2065,11 +2064,13 @@ class ConfigManager(object): cls._writepending = True return cls._syncrunning = True - # if the thread is exiting, join it to let it close, just in case - if cls._cfgwriter is not None: - cls._cfgwriter.join() - cls._cfgwriter = threading.Thread(target=cls._sync_to_file, args=(fullsync,)) - cls._cfgwriter.start() + cls._cfgwriter.spawn_n(cls._g_sync_to_file, fullsync) + + @classmethod + def _g_sync_to_file(cls, fullsync): + cls._sync_to_file(fullsync) + + @classmethod def _sync_to_file(cls, fullsync=False):