mirror of
https://github.com/xcat2/confluent.git
synced 2024-11-23 18:11:15 +00:00
Be consistent with sync during load of leader cfg
Pass through sync as appropriate. Also changes meant for previous commit
This commit is contained in:
parent
fd7c428d1f
commit
1dad69097b
@ -114,7 +114,7 @@ def connect_to_leader(cert=None, name=None, leader=None):
|
||||
colldata[c]['fingerprint'],
|
||||
sync=False)
|
||||
for globvar in globaldata:
|
||||
cfm.set_global(globvar, globaldata[globvar])
|
||||
cfm.set_global(globvar, globaldata[globvar], False)
|
||||
cfm._txcount = dbi.get('txcount', 0)
|
||||
cfm.ConfigManager(tenant=None)._load_from_json(dbjson,
|
||||
sync=False)
|
||||
|
@ -68,6 +68,7 @@ import operator
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import socket
|
||||
import string
|
||||
import struct
|
||||
import sys
|
||||
@ -471,7 +472,10 @@ def relay_slaved_requests(name, listener):
|
||||
except Exception:
|
||||
pass
|
||||
cfgstreams[name] = listener
|
||||
msg = listener.recv(8)
|
||||
try:
|
||||
msg = listener.recv(8)
|
||||
except socket.error:
|
||||
msg = None
|
||||
while msg:
|
||||
if name not in cfgstreams:
|
||||
raise Exception("Unexpected loss of node in followers: " + name)
|
||||
@ -487,7 +491,10 @@ def relay_slaved_requests(name, listener):
|
||||
globals()[rpc['function']](*rpc['args'])
|
||||
if 'xid' in rpc:
|
||||
_push_rpc(listener, cPickle.dumps({'xid': rpc['xid']}))
|
||||
msg = listener.recv(8)
|
||||
try:
|
||||
msg = listener.recv(8)
|
||||
except socket.error:
|
||||
msg = None
|
||||
finally:
|
||||
try:
|
||||
listener.close()
|
||||
@ -572,7 +579,10 @@ def follow_channel(channel):
|
||||
global _txcount
|
||||
stop_leading()
|
||||
stop_following(channel)
|
||||
msg = channel.recv(8)
|
||||
try:
|
||||
msg = channel.recv(8)
|
||||
except socket.error:
|
||||
msg = None
|
||||
while msg:
|
||||
sz = struct.unpack('!Q', msg)[0]
|
||||
if sz != 0:
|
||||
@ -589,7 +599,10 @@ def follow_channel(channel):
|
||||
globals()[rpc['function']](*rpc['args'])
|
||||
if 'xid' in rpc and rpc['xid']:
|
||||
_pendingchangesets[rpc['xid']].send()
|
||||
msg = channel.recv(8)
|
||||
try:
|
||||
msg = channel.recv(8)
|
||||
except socket.error:
|
||||
msg = None
|
||||
# mark the connection as broken
|
||||
stop_following(True)
|
||||
|
||||
@ -2041,9 +2054,9 @@ def _restore_keys(jsond, password, newpassword=None, sync=True):
|
||||
with open(keyfilename, 'r') as keyfile:
|
||||
newpassword = keyfile.read()
|
||||
set_global('master_privacy_key', _format_key(cryptkey,
|
||||
password=newpassword))
|
||||
password=newpassword), sync)
|
||||
set_global('master_integrity_key', _format_key(integritykey,
|
||||
password=newpassword))
|
||||
password=newpassword), sync)
|
||||
_masterkey = cryptkey
|
||||
_masterintegritykey = integritykey
|
||||
if sync:
|
||||
|
Loading…
Reference in New Issue
Block a user