mirror of
https://github.com/xcat2/confluent.git
synced 2025-08-24 20:20:36 +00:00
Reap stale sync runners after a minute dead
If the client never claims the result, delete the sync task.
This commit is contained in:
@@ -285,9 +285,10 @@ def mkpathorlink(source, destination, appendexist=False):
|
||||
|
||||
|
||||
syncrunners = {}
|
||||
|
||||
cleaner = None
|
||||
|
||||
def start_syncfiles(nodename, cfg, suffixes, principals=[]):
|
||||
global cleaner
|
||||
peerip = None
|
||||
if nodename in syncrunners:
|
||||
return '503 Synchronization already in progress', 'Synchronization already in progress for {}'.format(nodename)
|
||||
@@ -319,7 +320,26 @@ def start_syncfiles(nodename, cfg, suffixes, principals=[]):
|
||||
return '200 OK', 'Empty synclist' # the synclist has no actual entries
|
||||
syncrunners[nodename] = eventlet.spawn(
|
||||
sync_list_to_node, sl, nodename, suffixes, peerip)
|
||||
return '202 Queued', 'Background synchronization initiated' # backgrounded
|
||||
if not cleaner:
|
||||
cleaner = eventlet.spawn(cleanit)
|
||||
return '202 Queued', 'Background synchronization initiated' # backgrounded
|
||||
|
||||
|
||||
def cleanit():
|
||||
toreap = {}
|
||||
while True:
|
||||
for nn in list(syncrunners):
|
||||
if syncrunners[nn].dead:
|
||||
if nn in toreap:
|
||||
syncrunners[nn].wait()
|
||||
del syncrunners[nn]
|
||||
del toreap[nn]
|
||||
else:
|
||||
toreap[nn] = 1
|
||||
elif nn is in toreap:
|
||||
del toreap[nn]
|
||||
eventlet.sleep(30)
|
||||
|
||||
|
||||
def get_syncresult(nodename):
|
||||
if nodename not in syncrunners:
|
||||
|
Reference in New Issue
Block a user