From 1dad69097b5bf8291dba730e044d1335edf4437b Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 13 Jul 2018 21:52:17 -0400 Subject: [PATCH] Be consistent with sync during load of leader cfg Pass through sync as appropriate. Also changes meant for previous commit --- .../confluent/collective/manager.py | 2 +- .../confluent/config/configmanager.py | 25 ++++++++++++++----- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 7612e4e2..f3205186 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -114,7 +114,7 @@ def connect_to_leader(cert=None, name=None, leader=None): colldata[c]['fingerprint'], sync=False) for globvar in globaldata: - cfm.set_global(globvar, globaldata[globvar]) + cfm.set_global(globvar, globaldata[globvar], False) cfm._txcount = dbi.get('txcount', 0) cfm.ConfigManager(tenant=None)._load_from_json(dbjson, sync=False) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 3d069f1e..c94db976 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -68,6 +68,7 @@ import operator import os import random import re +import socket import string import struct import sys @@ -471,7 +472,10 @@ def relay_slaved_requests(name, listener): except Exception: pass cfgstreams[name] = listener - msg = listener.recv(8) + try: + msg = listener.recv(8) + except socket.error: + msg = None while msg: if name not in cfgstreams: raise Exception("Unexpected loss of node in followers: " + name) @@ -487,7 +491,10 @@ def relay_slaved_requests(name, listener): globals()[rpc['function']](*rpc['args']) if 'xid' in rpc: _push_rpc(listener, cPickle.dumps({'xid': rpc['xid']})) - msg = listener.recv(8) + try: + msg = listener.recv(8) + except socket.error: + msg = None finally: try: listener.close() @@ -572,7 +579,10 @@ def follow_channel(channel): global _txcount stop_leading() stop_following(channel) - msg = channel.recv(8) + try: + msg = channel.recv(8) + except socket.error: + msg = None while msg: sz = struct.unpack('!Q', msg)[0] if sz != 0: @@ -589,7 +599,10 @@ def follow_channel(channel): globals()[rpc['function']](*rpc['args']) if 'xid' in rpc and rpc['xid']: _pendingchangesets[rpc['xid']].send() - msg = channel.recv(8) + try: + msg = channel.recv(8) + except socket.error: + msg = None # mark the connection as broken stop_following(True) @@ -2041,9 +2054,9 @@ def _restore_keys(jsond, password, newpassword=None, sync=True): with open(keyfilename, 'r') as keyfile: newpassword = keyfile.read() set_global('master_privacy_key', _format_key(cryptkey, - password=newpassword)) + password=newpassword), sync) set_global('master_integrity_key', _format_key(integritykey, - password=newpassword)) + password=newpassword), sync) _masterkey = cryptkey _masterintegritykey = integritykey if sync: