2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-19 20:16:04 +00:00

Merge branch 'master' into async

This commit is contained in:
Jarrod Johnson 2024-04-11 08:14:20 -04:00
commit fb8ac158cb
12 changed files with 105 additions and 28 deletions

View File

@ -9,6 +9,7 @@ import os
import shutil
import pwd
import grp
import sys
from importlib.machinery import SourceFileLoader
try:
apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module()
@ -231,12 +232,14 @@ def synchronize():
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data)
if status >= 300:
sys.stderr.write("Error starting syncfiles - {}:\n".format(status))
sys.stderr.write(repr(rsp))
sys.stderr.write(rsp.decode('utf8'))
sys.stderr.write('\n')
sys.stderr.flush()
return status
if status == 202:
lastrsp = ''
while status != 204:
time.sleep(1+(2*random.random(a)))
time.sleep(1+(2*random.random()))
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles')
if not isinstance(rsp, str):
rsp = rsp.decode('utf8')
@ -297,6 +300,8 @@ if __name__ == '__main__':
status = synchronize()
except Exception as e:
sys.stderr.write(str(e))
sys.stderr.write('\n')
sys.stderr.flush()
status = 300
if status not in (204, 200):
time.sleep((random.random()*3)+2)

View File

@ -9,6 +9,7 @@ import os
import shutil
import pwd
import grp
import sys
from importlib.machinery import SourceFileLoader
try:
apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module()
@ -231,12 +232,14 @@ def synchronize():
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data)
if status >= 300:
sys.stderr.write("Error starting syncfiles - {}:\n".format(status))
sys.stderr.write(repr(rsp))
sys.stderr.write(rsp.decode('utf8'))
sys.stderr.write('\n')
sys.stderr.flush()
return status
if status == 202:
lastrsp = ''
while status != 204:
time.sleep(1+(2*random.random(a)))
time.sleep(1+(2*random.random()))
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles')
if not isinstance(rsp, str):
rsp = rsp.decode('utf8')
@ -297,6 +300,8 @@ if __name__ == '__main__':
status = synchronize()
except Exception as e:
sys.stderr.write(str(e))
sys.stderr.write('\n')
sys.stderr.flush()
status = 300
if status not in (204, 200):
time.sleep((random.random()*3)+2)

View File

@ -9,6 +9,7 @@ import os
import shutil
import pwd
import grp
import sys
from importlib.machinery import SourceFileLoader
try:
apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module()
@ -231,12 +232,14 @@ def synchronize():
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data)
if status >= 300:
sys.stderr.write("Error starting syncfiles - {}:\n".format(status))
sys.stderr.write(repr(rsp))
sys.stderr.write(rsp.decode('utf8'))
sys.stderr.write('\n')
sys.stderr.flush()
return status
if status == 202:
lastrsp = ''
while status != 204:
time.sleep(1+(2*random.random(a)))
time.sleep(1+(2*random.random()))
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles')
if not isinstance(rsp, str):
rsp = rsp.decode('utf8')
@ -297,6 +300,8 @@ if __name__ == '__main__':
status = synchronize()
except Exception as e:
sys.stderr.write(str(e))
sys.stderr.write('\n')
sys.stderr.flush()
status = 300
if status not in (204, 200):
time.sleep((random.random()*3)+2)

View File

@ -9,6 +9,7 @@ import os
import shutil
import pwd
import grp
import sys
from importlib.machinery import SourceFileLoader
try:
apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module()
@ -231,12 +232,14 @@ def synchronize():
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data)
if status >= 300:
sys.stderr.write("Error starting syncfiles - {}:\n".format(status))
sys.stderr.write(repr(rsp))
sys.stderr.write(rsp.decode('utf8'))
sys.stderr.write('\n')
sys.stderr.flush()
return status
if status == 202:
lastrsp = ''
while status != 204:
time.sleep(1+(2*random.random(a)))
time.sleep(1+(2*random.random()))
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles')
if not isinstance(rsp, str):
rsp = rsp.decode('utf8')
@ -297,6 +300,8 @@ if __name__ == '__main__':
status = synchronize()
except Exception as e:
sys.stderr.write(str(e))
sys.stderr.write('\n')
sys.stderr.flush()
status = 300
if status not in (204, 200):
time.sleep((random.random()*3)+2)

View File

@ -9,6 +9,7 @@ import os
import shutil
import pwd
import grp
import sys
from importlib.machinery import SourceFileLoader
try:
apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module()
@ -231,12 +232,14 @@ def synchronize():
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data)
if status >= 300:
sys.stderr.write("Error starting syncfiles - {}:\n".format(status))
sys.stderr.write(repr(rsp))
sys.stderr.write(rsp.decode('utf8'))
sys.stderr.write('\n')
sys.stderr.flush()
return status
if status == 202:
lastrsp = ''
while status != 204:
time.sleep(1+(2*random.random(a)))
time.sleep(1+(2*random.random()))
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles')
if not isinstance(rsp, str):
rsp = rsp.decode('utf8')
@ -297,6 +300,8 @@ if __name__ == '__main__':
status = synchronize()
except Exception as e:
sys.stderr.write(str(e))
sys.stderr.write('\n')
sys.stderr.flush()
status = 300
if status not in (204, 200):
time.sleep((random.random()*3)+2)

View File

@ -9,6 +9,7 @@ import os
import shutil
import pwd
import grp
import sys
from importlib.machinery import SourceFileLoader
try:
apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module()
@ -231,12 +232,14 @@ def synchronize():
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data)
if status >= 300:
sys.stderr.write("Error starting syncfiles - {}:\n".format(status))
sys.stderr.write(repr(rsp))
sys.stderr.write(rsp.decode('utf8'))
sys.stderr.write('\n')
sys.stderr.flush()
return status
if status == 202:
lastrsp = ''
while status != 204:
time.sleep(1+(2*random.random(a)))
time.sleep(1+(2*random.random()))
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles')
if not isinstance(rsp, str):
rsp = rsp.decode('utf8')
@ -297,6 +300,8 @@ if __name__ == '__main__':
status = synchronize()
except Exception as e:
sys.stderr.write(str(e))
sys.stderr.write('\n')
sys.stderr.flush()
status = 300
if status not in (204, 200):
time.sleep((random.random()*3)+2)

View File

@ -9,6 +9,7 @@ import os
import shutil
import pwd
import grp
import sys
from importlib.machinery import SourceFileLoader
try:
apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module()
@ -231,12 +232,14 @@ def synchronize():
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data)
if status >= 300:
sys.stderr.write("Error starting syncfiles - {}:\n".format(status))
sys.stderr.write(repr(rsp))
sys.stderr.write(rsp.decode('utf8'))
sys.stderr.write('\n')
sys.stderr.flush()
return status
if status == 202:
lastrsp = ''
while status != 204:
time.sleep(1+(2*random.random(a)))
time.sleep(1+(2*random.random()))
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles')
if not isinstance(rsp, str):
rsp = rsp.decode('utf8')
@ -297,6 +300,8 @@ if __name__ == '__main__':
status = synchronize()
except Exception as e:
sys.stderr.write(str(e))
sys.stderr.write('\n')
sys.stderr.flush()
status = 300
if status not in (204, 200):
time.sleep((random.random()*3)+2)

View File

@ -9,6 +9,7 @@ import os
import shutil
import pwd
import grp
import sys
from importlib.machinery import SourceFileLoader
try:
apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module()
@ -231,12 +232,14 @@ def synchronize():
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data)
if status >= 300:
sys.stderr.write("Error starting syncfiles - {}:\n".format(status))
sys.stderr.write(repr(rsp))
sys.stderr.write(rsp.decode('utf8'))
sys.stderr.write('\n')
sys.stderr.flush()
return status
if status == 202:
lastrsp = ''
while status != 204:
time.sleep(1+(2*random.random(a)))
time.sleep(1+(2*random.random()))
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles')
if not isinstance(rsp, str):
rsp = rsp.decode('utf8')
@ -297,6 +300,8 @@ if __name__ == '__main__':
status = synchronize()
except Exception as e:
sys.stderr.write(str(e))
sys.stderr.write('\n')
sys.stderr.flush()
status = 300
if status not in (204, 200):
time.sleep((random.random()*3)+2)

View File

@ -9,6 +9,7 @@ import os
import shutil
import pwd
import grp
import sys
from importlib.machinery import SourceFileLoader
try:
apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module()
@ -231,12 +232,14 @@ def synchronize():
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data)
if status >= 300:
sys.stderr.write("Error starting syncfiles - {}:\n".format(status))
sys.stderr.write(repr(rsp))
sys.stderr.write(rsp.decode('utf8'))
sys.stderr.write('\n')
sys.stderr.flush()
return status
if status == 202:
lastrsp = ''
while status != 204:
time.sleep(1+(2*random.random(a)))
time.sleep(1+(2*random.random()))
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles')
if not isinstance(rsp, str):
rsp = rsp.decode('utf8')
@ -297,6 +300,8 @@ if __name__ == '__main__':
status = synchronize()
except Exception as e:
sys.stderr.write(str(e))
sys.stderr.write('\n')
sys.stderr.flush()
status = 300
if status not in (204, 200):
time.sleep((random.random()*3)+2)

View File

@ -9,6 +9,7 @@ import os
import shutil
import pwd
import grp
import sys
from importlib.machinery import SourceFileLoader
try:
apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module()
@ -231,12 +232,14 @@ def synchronize():
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data)
if status >= 300:
sys.stderr.write("Error starting syncfiles - {}:\n".format(status))
sys.stderr.write(repr(rsp))
sys.stderr.write(rsp.decode('utf8'))
sys.stderr.write('\n')
sys.stderr.flush()
return status
if status == 202:
lastrsp = ''
while status != 204:
time.sleep(1+(2*random.random(a)))
time.sleep(1+(2*random.random()))
status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles')
if not isinstance(rsp, str):
rsp = rsp.decode('utf8')
@ -297,6 +300,8 @@ if __name__ == '__main__':
status = synchronize()
except Exception as e:
sys.stderr.write(str(e))
sys.stderr.write('\n')
sys.stderr.flush()
status = 300
if status not in (204, 200):
time.sleep((random.random()*3)+2)

View File

@ -517,8 +517,8 @@ def handle_request(req, make_response):
pals = get_extra_names(nodename, cfg, myip)
result = syncfiles.start_syncfiles(
nodename, cfg, json.loads(reqbody), pals)
start_response(result, ())
yield ''
start_response(result[0], ())
yield result[1]
return
if 'GET' == operation:
status, output = syncfiles.get_syncresult(nodename)

View File

@ -285,12 +285,11 @@ def mkpathorlink(source, destination, appendexist=False):
syncrunners = {}
cleaner = None
def start_syncfiles(nodename, cfg, suffixes, principals=[]):
global cleaner
peerip = None
if nodename in syncrunners:
return '503 Synchronization already in progress '
if 'myips' in suffixes:
targips = suffixes['myips']
del suffixes['myips']
@ -313,13 +312,41 @@ def start_syncfiles(nodename, cfg, suffixes, principals=[]):
raise Exception('Cannot perform syncfiles without profile assigned')
synclist = '/var/lib/confluent/public/os/{}/syncfiles'.format(profile)
if not os.path.exists(synclist):
return '200 OK' # not running
return '200 OK', 'No synclist' # not running
sl = SyncList(synclist, nodename, cfg)
if not (sl.appendmap or sl.mergemap or sl.replacemap or sl.appendoncemap):
return '200 OK' # the synclist has no actual entries
return '200 OK', 'Empty synclist' # the synclist has no actual entries
if nodename in syncrunners:
if syncrunners[nodename].dead:
syncrunners[nodename].wait()
else:
return '503 Synchronization already in progress', 'Synchronization already in progress for {}'.format(nodename)
syncrunners[nodename] = eventlet.spawn(
sync_list_to_node, sl, nodename, suffixes, peerip)
return '202 Queued' # backgrounded
if not cleaner:
cleaner = eventlet.spawn(cleanit)
return '202 Queued', 'Background synchronization initiated' # backgrounded
def cleanit():
toreap = {}
while True:
for nn in list(syncrunners):
if syncrunners[nn].dead:
if nn in toreap:
try:
syncrunners[nn].wait()
except Exception as e:
print(repr(e))
pass
del syncrunners[nn]
del toreap[nn]
else:
toreap[nn] = 1
elif nn in toreap:
del toreap[nn]
eventlet.sleep(30)
def get_syncresult(nodename):
if nodename not in syncrunners: