mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-17 21:23:18 +00:00
Fix race condition on configuration writeback
If the sync thread had moved beyond the point of checking _writepending, but had not quite managed to trigger isAlive() to be false, the sync cfg to file would transition to never work. Address this by adding one lock and one boolean. Inside the lock, the two booleans are manipulated to atomically mark the end of thread sync activity linked with check for pending data to write.
This commit is contained in:
parent
e9ac1617a8
commit
89ca5b412b
@ -405,6 +405,8 @@ class ConfigManager(object):
|
||||
_cfgdir = "/etc/confluent/cfg/"
|
||||
_cfgwriter = None
|
||||
_writepending = False
|
||||
_syncrunning = False
|
||||
_syncstate = threading.RLock()
|
||||
_attribwatchers = {}
|
||||
_nodecollwatchers = {}
|
||||
_notifierids = {}
|
||||
@ -1123,15 +1125,13 @@ class ConfigManager(object):
|
||||
|
||||
@classmethod
|
||||
def _bg_sync_to_file(cls):
|
||||
if cls._writepending:
|
||||
# already have a write scheduled
|
||||
return
|
||||
elif cls._cfgwriter is not None and cls._cfgwriter.isAlive():
|
||||
#write in progress, request write when done
|
||||
cls._writepending = True
|
||||
else:
|
||||
cls._cfgwriter = threading.Thread(target=cls._sync_to_file)
|
||||
cls._cfgwriter.start()
|
||||
with cls._syncstate:
|
||||
if cls._syncrunning:
|
||||
cls._writepending = True
|
||||
return
|
||||
cls._syncrunning = True
|
||||
cls._cfgwriter = threading.Thread(target=cls._sync_to_file)
|
||||
cls._cfgwriter.start()
|
||||
|
||||
@classmethod
|
||||
def _sync_to_file(cls):
|
||||
@ -1170,8 +1170,14 @@ class ConfigManager(object):
|
||||
del dbf[ck]
|
||||
else:
|
||||
dbf[ck] = cPickle.dumps(currdict[category][ck])
|
||||
if cls._writepending:
|
||||
cls._writepending = False
|
||||
willrun = False
|
||||
with cls._syncstate:
|
||||
if cls._writepending:
|
||||
cls._writepending = False
|
||||
willrun = True
|
||||
else:
|
||||
cls._syncrunning = False
|
||||
if willrun:
|
||||
return cls._sync_to_file()
|
||||
|
||||
def _recalculate_expressions(self, cfgobj, formatter, node, changeset):
|
||||
|
Loading…
x
Reference in New Issue
Block a user