2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-26 23:32:10 +00:00

Change configuration sync to use msgpack

This removes use of pickle for config sync over network.
This commit is contained in:
Jarrod Johnson 2020-01-27 15:53:29 -05:00
parent 09582d7597
commit 4c8ba92856
3 changed files with 23 additions and 45 deletions

View File

@ -73,20 +73,12 @@ def connect_to_leader(cert=None, name=None, leader=None):
with cfm._initlock:
banner = tlvdata.recv(remote) # the banner
vers = banner.split()[2]
pvers = 0
reqver = 4
if vers == b'v0':
pvers = 2
elif vers == b'v1':
pvers = 4
if sys.version_info[0] < 3:
pvers = 2
reqver = 2
if vers != b'v2':
raise Exception('This instance only supports protocol 2, synchronize versions between collective members')
tlvdata.recv(remote) # authpassed... 0..
if name is None:
name = get_myname()
tlvdata.send(remote, {'collective': {'operation': 'connect',
'protover': reqver,
'name': name,
'txcount': cfm._txcount}})
keydata = tlvdata.recv(remote)
@ -160,15 +152,15 @@ def connect_to_leader(cert=None, name=None, leader=None):
raise
currentleader = leader
#spawn this as a thread...
follower = eventlet.spawn(follow_leader, remote, pvers, leader)
follower = eventlet.spawn(follow_leader, remote, leader)
return True
def follow_leader(remote, proto, leader):
def follow_leader(remote, leader):
global currentleader
cleanexit = False
try:
cfm.follow_channel(remote, proto)
cfm.follow_channel(remote)
except greenlet.GreenletExit:
cleanexit = True
finally:
@ -430,7 +422,6 @@ def handle_connection(connection, cert, request, local=False):
tlvdata.send(connection, collinfo)
if 'connect' == operation:
drone = request['name']
folver = request.get('protover', 2)
droneinfo = cfm.get_collective_member(drone)
if not (droneinfo and util.cert_matches(droneinfo['fingerprint'],
cert)):
@ -479,7 +470,7 @@ def handle_connection(connection, cert, request, local=False):
connection.sendall(cfgdata)
#tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now,
# so far unused anyway
if not cfm.relay_slaved_requests(drone, connection, folver):
if not cfm.relay_slaved_requests(drone, connection):
if not retrythread: # start a recovery if everyone else seems
# to have disappeared
retrythread = eventlet.spawn_after(30 + random.random(),

View File

@ -71,6 +71,7 @@ import eventlet.green.select as select
import eventlet.green.threading as gthread
import fnmatch
import json
import msgpack
import operator
import os
import random
@ -101,10 +102,6 @@ _cfgstore = None
_pendingchangesets = {}
_txcount = 0
_hasquorum = True
if sys.version_info[0] >= 3:
lowestver = 4
else:
lowestver = 2
_attraliases = {
'bmc': 'hardwaremanagement.manager',
@ -317,8 +314,8 @@ def exec_on_leader(function, *args):
while xid in _pendingchangesets:
xid = confluent.util.stringify(base64.b64encode(os.urandom(8)))
_pendingchangesets[xid] = event.Event()
rpcpayload = cPickle.dumps({'function': function, 'args': args,
'xid': xid}, protocol=cfgproto)
rpcpayload = msgpack.packb({'function': function, 'args': args,
'xid': xid}, use_bin_type=False)
rpclen = len(rpcpayload)
cfgleader.sendall(struct.pack('!Q', rpclen))
cfgleader.sendall(rpcpayload)
@ -343,8 +340,8 @@ def exec_on_followers_unconditional(fnname, *args):
global _txcount
pushes = eventlet.GreenPool()
_txcount += 1
payload = cPickle.dumps({'function': fnname, 'args': args,
'txcount': _txcount}, protocol=lowestver)
payload = msgpack.packb({'function': fnname, 'args': args,
'txcount': _txcount}, use_bin_type=False)
for _ in pushes.starmap(
_push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]):
pass
@ -565,14 +562,9 @@ def set_global(globalname, value, sync=True):
ConfigManager._bg_sync_to_file()
cfgstreams = {}
def relay_slaved_requests(name, listener, vers):
def relay_slaved_requests(name, listener):
global cfgleader
global _hasquorum
global lowestver
if vers > 2 and sys.version_info[0] < 3:
vers = 2
if vers < lowestver:
lowestver = vers
pushes = eventlet.GreenPool()
if name not in _followerlocks:
_followerlocks[name] = gthread.RLock()
@ -593,7 +585,7 @@ def relay_slaved_requests(name, listener, vers):
while _hasquorum != _newquorum:
if _newquorum is not None:
_hasquorum = _newquorum
payload = cPickle.dumps({'quorum': _hasquorum}, protocol=lowestver)
payload = msgpack.packb({'quorum': _hasquorum}, use_bin_type=False)
for _ in pushes.starmap(
_push_rpc,
[(cfgstreams[s], payload) for s in cfgstreams]):
@ -615,15 +607,15 @@ def relay_slaved_requests(name, listener, vers):
if not nrpc:
raise Exception('Truncated client error')
rpc += nrpc
rpc = cPickle.loads(rpc)
rpc = msgpack.unpackb(rpc)
exc = None
try:
globals()[rpc['function']](*rpc['args'])
except Exception as e:
exc = e
if 'xid' in rpc:
res = _push_rpc(listener, cPickle.dumps({'xid': rpc['xid'],
'exc': exc}, protocol=vers))
res = _push_rpc(listener, msgpack.packb({'xid': rpc['xid'],
'exc': exc}, use_bin_type=False))
if not res:
break
try:
@ -642,7 +634,7 @@ def relay_slaved_requests(name, listener, vers):
if cfgstreams:
_hasquorum = len(cfgstreams) >= (
len(_cfgstore['collective']) // 2)
payload = cPickle.dumps({'quorum': _hasquorum}, protocol=lowestver)
payload = msgpack.packb({'quorum': _hasquorum}, use_bin_type=False)
for _ in pushes.starmap(
_push_rpc,
[(cfgstreams[s], payload) for s in cfgstreams]):
@ -684,19 +676,15 @@ class StreamHandler(object):
self.sock = None
def stop_following(replacement=None, proto=2):
def stop_following(replacement=None):
with _leaderlock:
global cfgleader
global cfgproto
if cfgleader and not isinstance(cfgleader, bool):
try:
cfgleader.close()
except Exception:
pass
cfgleader = replacement
if proto > 2 and sys.version_info[0] < 3:
proto = 2
cfgproto = proto
def stop_leading():
for stream in list(cfgstreams):
@ -754,15 +742,14 @@ def commit_clear():
ConfigManager._bg_sync_to_file()
cfgleader = None
cfgproto = 2
def follow_channel(channel, proto=2):
def follow_channel(channel):
global _txcount
global _hasquorum
try:
stop_leading()
stop_following(channel, proto)
stop_following(channel)
lh = StreamHandler(channel)
msg = lh.get_next_msg()
while msg:
@ -774,7 +761,7 @@ def follow_channel(channel, proto=2):
if not nrpc:
raise Exception('Truncated message error')
rpc += nrpc
rpc = cPickle.loads(rpc)
rpc = msgpack.unpackb(rpc)
if 'txcount' in rpc:
_txcount = rpc['txcount']
if 'function' in rpc:

View File

@ -123,8 +123,8 @@ def sessionhdl(connection, authname, skipauth=False, cert=None):
if authdata:
cfm = authdata[1]
authenticated = True
# version 0 == original, version 1 == pickle3 allowed
send_data(connection, "Confluent -- v{0} --".format(sys.version_info[0] - 2))
# version 0 == original, version 1 == pickle3 allowed, 2 = pickle forbidden, msgpack allowed
send_data(connection, "Confluent -- v2 --")
while not authenticated: # prompt for name and passphrase
send_data(connection, {'authpassed': 0})
response = tlvdata.recv(connection)