2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-22 23:53:37 +00:00

Modify clear_commit to use the same thread

Additionally, wrap a lock around the dbm operations, in case something
in the future makes a mistake.
This commit is contained in:
Jarrod Johnson 2018-07-13 15:27:16 -04:00
parent c74fdf5924
commit 042d7ab5cf

View File

@ -79,6 +79,7 @@ _masterkey = None
_masterintegritykey = None
_dirtylock = threading.RLock()
_leaderlock = gthread.RLock()
_synclock = threading.RLock()
_config_areas = ('nodegroups', 'nodes', 'usergroups', 'users')
tracelog = None
statelessmode = False
@ -548,7 +549,7 @@ def commit_clear():
os.remove(os.path.join(ConfigManager._cfgdir, cfg))
except OSError as oe:
pass
ConfigManager._sync_to_file(fullsync=True)
ConfigManager.wait_for_sync(True)
ConfigManager._bg_sync_to_file()
cfgleader = None
@ -1877,8 +1878,8 @@ class ConfigManager(object):
pass
@classmethod
def wait_for_sync(cls):
cls._bg_sync_to_file()
def wait_for_sync(cls, fullsync=False):
cls._bg_sync_to_file(fullsync)
if cls._cfgwriter is not None:
cls._cfgwriter.join()
@ -1888,7 +1889,7 @@ class ConfigManager(object):
sys.exit(0)
@classmethod
def _bg_sync_to_file(cls):
def _bg_sync_to_file(cls, fullsync=False):
if statelessmode:
return
with cls._syncstate:
@ -1899,88 +1900,89 @@ class ConfigManager(object):
# if the thread is exiting, join it to let it close, just in case
if cls._cfgwriter is not None:
cls._cfgwriter.join()
cls._cfgwriter = threading.Thread(target=cls._sync_to_file)
cls._cfgwriter = threading.Thread(target=cls._sync_to_file, args=fullsync)
cls._cfgwriter.start()
@classmethod
def _sync_to_file(cls, fullsync=False):
if statelessmode:
return
with open(os.path.join(cls._cfgdir, 'transactioncount'), 'w') as f:
f.write(struct.pack('!Q', _txcount))
if fullsync or 'dirtyglobals' in _cfgstore:
if fullsync:
dirtyglobals = _cfgstore['globals']
else:
with _dirtylock:
dirtyglobals = copy.deepcopy(_cfgstore['dirtyglobals'])
del _cfgstore['dirtyglobals']
_mkpath(cls._cfgdir)
globalf = dbm.open(os.path.join(cls._cfgdir, "globals"), 'c', 384) # 0600
try:
for globalkey in dirtyglobals:
if globalkey in _cfgstore['globals']:
globalf[globalkey] = \
cPickle.dumps(_cfgstore['globals'][globalkey])
else:
if globalkey in globalf:
del globalf[globalkey]
finally:
globalf.close()
if fullsync or 'collectivedirty' in _cfgstore:
collectivef = dbm.open(os.path.join(cls._cfgdir, "collective"),
'c', 384)
try:
with _synclock:
if statelessmode:
return
with open(os.path.join(cls._cfgdir, 'transactioncount'), 'w') as f:
f.write(struct.pack('!Q', _txcount))
if fullsync or 'dirtyglobals' in _cfgstore:
if fullsync:
colls = _cfgstore['collective']
dirtyglobals = _cfgstore['globals']
else:
with _dirtylock:
colls = copy.deepcopy(_cfgstore['collectivedirty'])
del _cfgstore['collectivedirty']
for coll in colls:
if coll in _cfgstore['collective']:
collectivef[coll] = cPickle.dumps(
_cfgstore['collective'][coll])
else:
if coll in collectivef:
del globalf[coll]
finally:
collectivef.close()
if fullsync:
pathname = cls._cfgdir
currdict = _cfgstore['main']
for category in currdict:
_mkpath(pathname)
dbf = dbm.open(os.path.join(pathname, category), 'c', 384) # 0600
dirtyglobals = copy.deepcopy(_cfgstore['dirtyglobals'])
del _cfgstore['dirtyglobals']
_mkpath(cls._cfgdir)
globalf = dbm.open(os.path.join(cls._cfgdir, "globals"), 'c', 384) # 0600
try:
for ck in currdict[category]:
dbf[ck] = cPickle.dumps(currdict[category][ck])
for globalkey in dirtyglobals:
if globalkey in _cfgstore['globals']:
globalf[globalkey] = \
cPickle.dumps(_cfgstore['globals'][globalkey])
else:
if globalkey in globalf:
del globalf[globalkey]
finally:
dbf.close()
elif 'dirtykeys' in _cfgstore:
with _dirtylock:
currdirt = copy.deepcopy(_cfgstore['dirtykeys'])
del _cfgstore['dirtykeys']
for tenant in currdirt:
dkdict = currdirt[tenant]
if tenant is None:
pathname = cls._cfgdir
currdict = _cfgstore['main']
else:
pathname = os.path.join(cls._cfgdir, 'tenants', tenant)
currdict = _cfgstore['tenant'][tenant]
for category in dkdict:
globalf.close()
if fullsync or 'collectivedirty' in _cfgstore:
collectivef = dbm.open(os.path.join(cls._cfgdir, "collective"),
'c', 384)
try:
if fullsync:
colls = _cfgstore['collective']
else:
with _dirtylock:
colls = copy.deepcopy(_cfgstore['collectivedirty'])
del _cfgstore['collectivedirty']
for coll in colls:
if coll in _cfgstore['collective']:
collectivef[coll] = cPickle.dumps(
_cfgstore['collective'][coll])
else:
if coll in collectivef:
del globalf[coll]
finally:
collectivef.close()
if fullsync:
pathname = cls._cfgdir
currdict = _cfgstore['main']
for category in currdict:
_mkpath(pathname)
dbf = dbm.open(os.path.join(pathname, category), 'c', 384) # 0600
try:
for ck in dkdict[category]:
if ck not in currdict[category]:
if ck in dbf:
del dbf[ck]
else:
dbf[ck] = cPickle.dumps(currdict[category][ck])
for ck in currdict[category]:
dbf[ck] = cPickle.dumps(currdict[category][ck])
finally:
dbf.close()
elif 'dirtykeys' in _cfgstore:
with _dirtylock:
currdirt = copy.deepcopy(_cfgstore['dirtykeys'])
del _cfgstore['dirtykeys']
for tenant in currdirt:
dkdict = currdirt[tenant]
if tenant is None:
pathname = cls._cfgdir
currdict = _cfgstore['main']
else:
pathname = os.path.join(cls._cfgdir, 'tenants', tenant)
currdict = _cfgstore['tenant'][tenant]
for category in dkdict:
_mkpath(pathname)
dbf = dbm.open(os.path.join(pathname, category), 'c', 384) # 0600
try:
for ck in dkdict[category]:
if ck not in currdict[category]:
if ck in dbf:
del dbf[ck]
else:
dbf[ck] = cPickle.dumps(currdict[category][ck])
finally:
dbf.close()
willrun = False
with cls._syncstate:
if cls._writepending: