From f68f9f46939ed115b501b125212359777cf045c6 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 9 Apr 2024 11:07:11 -0400 Subject: [PATCH] Make syncfile step robust or pause If syncfiles fails, keep it retrying. Also, slow down sync checking to avoid hammering the system. Further, randomized delay to spread highly synchronized requestors. Block attempts to do multiple concurrent syncfile runs. --- .../profiles/default/scripts/syncfileclient | 2 ++ .../profiles/default/scripts/syncfileclient | 2 ++ .../profiles/default/scripts/syncfileclient | 18 +++++++++++++++++- .../profiles/default/scripts/syncfileclient | 18 +++++++++++++++++- .../profiles/default/scripts/syncfileclient | 18 +++++++++++++++++- .../profiles/default/scripts/syncfileclient | 18 +++++++++++++++++- .../profiles/default/scripts/syncfileclient | 18 +++++++++++++++++- .../suse15/profiles/hpc/scripts/syncfileclient | 18 +++++++++++++++++- .../profiles/server/scripts/syncfileclient | 18 +++++++++++++++++- .../profiles/default/scripts/syncfileclient | 18 +++++++++++++++++- .../profiles/default/scripts/syncfileclient | 18 +++++++++++++++++- .../profiles/default/scripts/syncfileclient | 18 +++++++++++++++++- confluent_server/confluent/syncfiles.py | 2 ++ 13 files changed, 176 insertions(+), 10 deletions(-) diff --git a/confluent_osdeploy/el7-diskless/profiles/default/scripts/syncfileclient b/confluent_osdeploy/el7-diskless/profiles/default/scripts/syncfileclient index 8d37d43a..cca0f57d 100644 --- a/confluent_osdeploy/el7-diskless/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/el7-diskless/profiles/default/scripts/syncfileclient @@ -1,4 +1,5 @@ #!/usr/bin/python +import time import importlib import tempfile import json @@ -223,6 +224,7 @@ def synchronize(): if status == 202: lastrsp = '' while status != 204: + time.sleep(2) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') diff --git a/confluent_osdeploy/el7/profiles/default/scripts/syncfileclient b/confluent_osdeploy/el7/profiles/default/scripts/syncfileclient index 8d37d43a..02dbcc4d 100644 --- a/confluent_osdeploy/el7/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/el7/profiles/default/scripts/syncfileclient @@ -5,6 +5,7 @@ import json import os import shutil import pwd +import time import grp try: from importlib.machinery import SourceFileLoader @@ -223,6 +224,7 @@ def synchronize(): if status == 202: lastrsp = '' while status != 204: + time.sleep(2) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') diff --git a/confluent_osdeploy/el8-diskless/profiles/default/scripts/syncfileclient b/confluent_osdeploy/el8-diskless/profiles/default/scripts/syncfileclient index f7d4c0b4..088fa9f7 100644 --- a/confluent_osdeploy/el8-diskless/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/el8-diskless/profiles/default/scripts/syncfileclient @@ -1,4 +1,6 @@ #!/usr/bin/python3 +import random +import time import subprocess import importlib import tempfile @@ -227,9 +229,14 @@ def synchronize(): myips.append(addr) data = json.dumps({'merge': tmpdir, 'appendonce': appendoncedir, 'myips': myips}) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) + if status >= 300: + sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) + sys.stderr.write(repr(rsp)) + return status if status == 202: lastrsp = '' while status != 204: + time.sleep(1+(2*random.random(a))) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -277,10 +284,19 @@ def synchronize(): os.chmod(fname, int(opts[fname][opt], 8)) if uid != -1 or gid != -1: os.chown(fname, uid, gid) + return status finally: shutil.rmtree(tmpdir) shutil.rmtree(appendoncedir) if __name__ == '__main__': - synchronize() + status = 202 + while status not in (204, 200): + try: + status = synchronize() + except Exception as e: + sys.stderr.write(str(e)) + status = 300 + if status not in (204, 200): + time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/el8/profiles/default/scripts/syncfileclient b/confluent_osdeploy/el8/profiles/default/scripts/syncfileclient index f7d4c0b4..088fa9f7 100644 --- a/confluent_osdeploy/el8/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/el8/profiles/default/scripts/syncfileclient @@ -1,4 +1,6 @@ #!/usr/bin/python3 +import random +import time import subprocess import importlib import tempfile @@ -227,9 +229,14 @@ def synchronize(): myips.append(addr) data = json.dumps({'merge': tmpdir, 'appendonce': appendoncedir, 'myips': myips}) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) + if status >= 300: + sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) + sys.stderr.write(repr(rsp)) + return status if status == 202: lastrsp = '' while status != 204: + time.sleep(1+(2*random.random(a))) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -277,10 +284,19 @@ def synchronize(): os.chmod(fname, int(opts[fname][opt], 8)) if uid != -1 or gid != -1: os.chown(fname, uid, gid) + return status finally: shutil.rmtree(tmpdir) shutil.rmtree(appendoncedir) if __name__ == '__main__': - synchronize() + status = 202 + while status not in (204, 200): + try: + status = synchronize() + except Exception as e: + sys.stderr.write(str(e)) + status = 300 + if status not in (204, 200): + time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/el9-diskless/profiles/default/scripts/syncfileclient b/confluent_osdeploy/el9-diskless/profiles/default/scripts/syncfileclient index f7d4c0b4..088fa9f7 100644 --- a/confluent_osdeploy/el9-diskless/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/el9-diskless/profiles/default/scripts/syncfileclient @@ -1,4 +1,6 @@ #!/usr/bin/python3 +import random +import time import subprocess import importlib import tempfile @@ -227,9 +229,14 @@ def synchronize(): myips.append(addr) data = json.dumps({'merge': tmpdir, 'appendonce': appendoncedir, 'myips': myips}) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) + if status >= 300: + sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) + sys.stderr.write(repr(rsp)) + return status if status == 202: lastrsp = '' while status != 204: + time.sleep(1+(2*random.random(a))) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -277,10 +284,19 @@ def synchronize(): os.chmod(fname, int(opts[fname][opt], 8)) if uid != -1 or gid != -1: os.chown(fname, uid, gid) + return status finally: shutil.rmtree(tmpdir) shutil.rmtree(appendoncedir) if __name__ == '__main__': - synchronize() + status = 202 + while status not in (204, 200): + try: + status = synchronize() + except Exception as e: + sys.stderr.write(str(e)) + status = 300 + if status not in (204, 200): + time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/genesis/profiles/default/scripts/syncfileclient b/confluent_osdeploy/genesis/profiles/default/scripts/syncfileclient index f7d4c0b4..088fa9f7 100644 --- a/confluent_osdeploy/genesis/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/genesis/profiles/default/scripts/syncfileclient @@ -1,4 +1,6 @@ #!/usr/bin/python3 +import random +import time import subprocess import importlib import tempfile @@ -227,9 +229,14 @@ def synchronize(): myips.append(addr) data = json.dumps({'merge': tmpdir, 'appendonce': appendoncedir, 'myips': myips}) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) + if status >= 300: + sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) + sys.stderr.write(repr(rsp)) + return status if status == 202: lastrsp = '' while status != 204: + time.sleep(1+(2*random.random(a))) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -277,10 +284,19 @@ def synchronize(): os.chmod(fname, int(opts[fname][opt], 8)) if uid != -1 or gid != -1: os.chown(fname, uid, gid) + return status finally: shutil.rmtree(tmpdir) shutil.rmtree(appendoncedir) if __name__ == '__main__': - synchronize() + status = 202 + while status not in (204, 200): + try: + status = synchronize() + except Exception as e: + sys.stderr.write(str(e)) + status = 300 + if status not in (204, 200): + time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/suse15-diskless/profiles/default/scripts/syncfileclient b/confluent_osdeploy/suse15-diskless/profiles/default/scripts/syncfileclient index f7d4c0b4..088fa9f7 100644 --- a/confluent_osdeploy/suse15-diskless/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/suse15-diskless/profiles/default/scripts/syncfileclient @@ -1,4 +1,6 @@ #!/usr/bin/python3 +import random +import time import subprocess import importlib import tempfile @@ -227,9 +229,14 @@ def synchronize(): myips.append(addr) data = json.dumps({'merge': tmpdir, 'appendonce': appendoncedir, 'myips': myips}) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) + if status >= 300: + sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) + sys.stderr.write(repr(rsp)) + return status if status == 202: lastrsp = '' while status != 204: + time.sleep(1+(2*random.random(a))) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -277,10 +284,19 @@ def synchronize(): os.chmod(fname, int(opts[fname][opt], 8)) if uid != -1 or gid != -1: os.chown(fname, uid, gid) + return status finally: shutil.rmtree(tmpdir) shutil.rmtree(appendoncedir) if __name__ == '__main__': - synchronize() + status = 202 + while status not in (204, 200): + try: + status = synchronize() + except Exception as e: + sys.stderr.write(str(e)) + status = 300 + if status not in (204, 200): + time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/suse15/profiles/hpc/scripts/syncfileclient b/confluent_osdeploy/suse15/profiles/hpc/scripts/syncfileclient index f7d4c0b4..088fa9f7 100644 --- a/confluent_osdeploy/suse15/profiles/hpc/scripts/syncfileclient +++ b/confluent_osdeploy/suse15/profiles/hpc/scripts/syncfileclient @@ -1,4 +1,6 @@ #!/usr/bin/python3 +import random +import time import subprocess import importlib import tempfile @@ -227,9 +229,14 @@ def synchronize(): myips.append(addr) data = json.dumps({'merge': tmpdir, 'appendonce': appendoncedir, 'myips': myips}) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) + if status >= 300: + sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) + sys.stderr.write(repr(rsp)) + return status if status == 202: lastrsp = '' while status != 204: + time.sleep(1+(2*random.random(a))) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -277,10 +284,19 @@ def synchronize(): os.chmod(fname, int(opts[fname][opt], 8)) if uid != -1 or gid != -1: os.chown(fname, uid, gid) + return status finally: shutil.rmtree(tmpdir) shutil.rmtree(appendoncedir) if __name__ == '__main__': - synchronize() + status = 202 + while status not in (204, 200): + try: + status = synchronize() + except Exception as e: + sys.stderr.write(str(e)) + status = 300 + if status not in (204, 200): + time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/suse15/profiles/server/scripts/syncfileclient b/confluent_osdeploy/suse15/profiles/server/scripts/syncfileclient index f7d4c0b4..088fa9f7 100644 --- a/confluent_osdeploy/suse15/profiles/server/scripts/syncfileclient +++ b/confluent_osdeploy/suse15/profiles/server/scripts/syncfileclient @@ -1,4 +1,6 @@ #!/usr/bin/python3 +import random +import time import subprocess import importlib import tempfile @@ -227,9 +229,14 @@ def synchronize(): myips.append(addr) data = json.dumps({'merge': tmpdir, 'appendonce': appendoncedir, 'myips': myips}) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) + if status >= 300: + sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) + sys.stderr.write(repr(rsp)) + return status if status == 202: lastrsp = '' while status != 204: + time.sleep(1+(2*random.random(a))) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -277,10 +284,19 @@ def synchronize(): os.chmod(fname, int(opts[fname][opt], 8)) if uid != -1 or gid != -1: os.chown(fname, uid, gid) + return status finally: shutil.rmtree(tmpdir) shutil.rmtree(appendoncedir) if __name__ == '__main__': - synchronize() + status = 202 + while status not in (204, 200): + try: + status = synchronize() + except Exception as e: + sys.stderr.write(str(e)) + status = 300 + if status not in (204, 200): + time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/ubuntu20.04-diskless/profiles/default/scripts/syncfileclient b/confluent_osdeploy/ubuntu20.04-diskless/profiles/default/scripts/syncfileclient index f7d4c0b4..088fa9f7 100644 --- a/confluent_osdeploy/ubuntu20.04-diskless/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/ubuntu20.04-diskless/profiles/default/scripts/syncfileclient @@ -1,4 +1,6 @@ #!/usr/bin/python3 +import random +import time import subprocess import importlib import tempfile @@ -227,9 +229,14 @@ def synchronize(): myips.append(addr) data = json.dumps({'merge': tmpdir, 'appendonce': appendoncedir, 'myips': myips}) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) + if status >= 300: + sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) + sys.stderr.write(repr(rsp)) + return status if status == 202: lastrsp = '' while status != 204: + time.sleep(1+(2*random.random(a))) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -277,10 +284,19 @@ def synchronize(): os.chmod(fname, int(opts[fname][opt], 8)) if uid != -1 or gid != -1: os.chown(fname, uid, gid) + return status finally: shutil.rmtree(tmpdir) shutil.rmtree(appendoncedir) if __name__ == '__main__': - synchronize() + status = 202 + while status not in (204, 200): + try: + status = synchronize() + except Exception as e: + sys.stderr.write(str(e)) + status = 300 + if status not in (204, 200): + time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/ubuntu20.04/profiles/default/scripts/syncfileclient b/confluent_osdeploy/ubuntu20.04/profiles/default/scripts/syncfileclient index f7d4c0b4..088fa9f7 100644 --- a/confluent_osdeploy/ubuntu20.04/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/ubuntu20.04/profiles/default/scripts/syncfileclient @@ -1,4 +1,6 @@ #!/usr/bin/python3 +import random +import time import subprocess import importlib import tempfile @@ -227,9 +229,14 @@ def synchronize(): myips.append(addr) data = json.dumps({'merge': tmpdir, 'appendonce': appendoncedir, 'myips': myips}) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) + if status >= 300: + sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) + sys.stderr.write(repr(rsp)) + return status if status == 202: lastrsp = '' while status != 204: + time.sleep(1+(2*random.random(a))) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -277,10 +284,19 @@ def synchronize(): os.chmod(fname, int(opts[fname][opt], 8)) if uid != -1 or gid != -1: os.chown(fname, uid, gid) + return status finally: shutil.rmtree(tmpdir) shutil.rmtree(appendoncedir) if __name__ == '__main__': - synchronize() + status = 202 + while status not in (204, 200): + try: + status = synchronize() + except Exception as e: + sys.stderr.write(str(e)) + status = 300 + if status not in (204, 200): + time.sleep((random.random()*3)+2) diff --git a/confluent_osdeploy/ubuntu22.04/profiles/default/scripts/syncfileclient b/confluent_osdeploy/ubuntu22.04/profiles/default/scripts/syncfileclient index f7d4c0b4..088fa9f7 100644 --- a/confluent_osdeploy/ubuntu22.04/profiles/default/scripts/syncfileclient +++ b/confluent_osdeploy/ubuntu22.04/profiles/default/scripts/syncfileclient @@ -1,4 +1,6 @@ #!/usr/bin/python3 +import random +import time import subprocess import importlib import tempfile @@ -227,9 +229,14 @@ def synchronize(): myips.append(addr) data = json.dumps({'merge': tmpdir, 'appendonce': appendoncedir, 'myips': myips}) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data) + if status >= 300: + sys.stderr.write("Error starting syncfiles - {}:\n".format(status)) + sys.stderr.write(repr(rsp)) + return status if status == 202: lastrsp = '' while status != 204: + time.sleep(1+(2*random.random(a))) status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles') if not isinstance(rsp, str): rsp = rsp.decode('utf8') @@ -277,10 +284,19 @@ def synchronize(): os.chmod(fname, int(opts[fname][opt], 8)) if uid != -1 or gid != -1: os.chown(fname, uid, gid) + return status finally: shutil.rmtree(tmpdir) shutil.rmtree(appendoncedir) if __name__ == '__main__': - synchronize() + status = 202 + while status not in (204, 200): + try: + status = synchronize() + except Exception as e: + sys.stderr.write(str(e)) + status = 300 + if status not in (204, 200): + time.sleep((random.random()*3)+2) diff --git a/confluent_server/confluent/syncfiles.py b/confluent_server/confluent/syncfiles.py index 94b74eea..df5574e3 100644 --- a/confluent_server/confluent/syncfiles.py +++ b/confluent_server/confluent/syncfiles.py @@ -289,6 +289,8 @@ syncrunners = {} def start_syncfiles(nodename, cfg, suffixes, principals=[]): peerip = None + if nodename in syncrunners: + return '503 Synchronization already in progress ' if 'myips' in suffixes: targips = suffixes['myips'] del suffixes['myips']