From 4c8ba92856857a00d7c4b070acfc473b30cf0ecc Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 27 Jan 2020 15:53:29 -0500 Subject: [PATCH] Change configuration sync to use msgpack This removes use of pickle for config sync over network. --- .../confluent/collective/manager.py | 21 +++------ .../confluent/config/configmanager.py | 43 +++++++------------ confluent_server/confluent/sockapi.py | 4 +- 3 files changed, 23 insertions(+), 45 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index efde39ab..7a032203 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -73,20 +73,12 @@ def connect_to_leader(cert=None, name=None, leader=None): with cfm._initlock: banner = tlvdata.recv(remote) # the banner vers = banner.split()[2] - pvers = 0 - reqver = 4 - if vers == b'v0': - pvers = 2 - elif vers == b'v1': - pvers = 4 - if sys.version_info[0] < 3: - pvers = 2 - reqver = 2 + if vers != b'v2': + raise Exception('This instance only supports protocol 2, synchronize versions between collective members') tlvdata.recv(remote) # authpassed... 0.. if name is None: name = get_myname() tlvdata.send(remote, {'collective': {'operation': 'connect', - 'protover': reqver, 'name': name, 'txcount': cfm._txcount}}) keydata = tlvdata.recv(remote) @@ -160,15 +152,15 @@ def connect_to_leader(cert=None, name=None, leader=None): raise currentleader = leader #spawn this as a thread... - follower = eventlet.spawn(follow_leader, remote, pvers, leader) + follower = eventlet.spawn(follow_leader, remote, leader) return True -def follow_leader(remote, proto, leader): +def follow_leader(remote, leader): global currentleader cleanexit = False try: - cfm.follow_channel(remote, proto) + cfm.follow_channel(remote) except greenlet.GreenletExit: cleanexit = True finally: @@ -430,7 +422,6 @@ def handle_connection(connection, cert, request, local=False): tlvdata.send(connection, collinfo) if 'connect' == operation: drone = request['name'] - folver = request.get('protover', 2) droneinfo = cfm.get_collective_member(drone) if not (droneinfo and util.cert_matches(droneinfo['fingerprint'], cert)): @@ -479,7 +470,7 @@ def handle_connection(connection, cert, request, local=False): connection.sendall(cfgdata) #tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, # so far unused anyway - if not cfm.relay_slaved_requests(drone, connection, folver): + if not cfm.relay_slaved_requests(drone, connection): if not retrythread: # start a recovery if everyone else seems # to have disappeared retrythread = eventlet.spawn_after(30 + random.random(), diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 77fd730e..084ddd50 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -71,6 +71,7 @@ import eventlet.green.select as select import eventlet.green.threading as gthread import fnmatch import json +import msgpack import operator import os import random @@ -101,10 +102,6 @@ _cfgstore = None _pendingchangesets = {} _txcount = 0 _hasquorum = True -if sys.version_info[0] >= 3: - lowestver = 4 -else: - lowestver = 2 _attraliases = { 'bmc': 'hardwaremanagement.manager', @@ -317,8 +314,8 @@ def exec_on_leader(function, *args): while xid in _pendingchangesets: xid = confluent.util.stringify(base64.b64encode(os.urandom(8))) _pendingchangesets[xid] = event.Event() - rpcpayload = cPickle.dumps({'function': function, 'args': args, - 'xid': xid}, protocol=cfgproto) + rpcpayload = msgpack.packb({'function': function, 'args': args, + 'xid': xid}, use_bin_type=False) rpclen = len(rpcpayload) cfgleader.sendall(struct.pack('!Q', rpclen)) cfgleader.sendall(rpcpayload) @@ -343,8 +340,8 @@ def exec_on_followers_unconditional(fnname, *args): global _txcount pushes = eventlet.GreenPool() _txcount += 1 - payload = cPickle.dumps({'function': fnname, 'args': args, - 'txcount': _txcount}, protocol=lowestver) + payload = msgpack.packb({'function': fnname, 'args': args, + 'txcount': _txcount}, use_bin_type=False) for _ in pushes.starmap( _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): pass @@ -565,14 +562,9 @@ def set_global(globalname, value, sync=True): ConfigManager._bg_sync_to_file() cfgstreams = {} -def relay_slaved_requests(name, listener, vers): +def relay_slaved_requests(name, listener): global cfgleader global _hasquorum - global lowestver - if vers > 2 and sys.version_info[0] < 3: - vers = 2 - if vers < lowestver: - lowestver = vers pushes = eventlet.GreenPool() if name not in _followerlocks: _followerlocks[name] = gthread.RLock() @@ -593,7 +585,7 @@ def relay_slaved_requests(name, listener, vers): while _hasquorum != _newquorum: if _newquorum is not None: _hasquorum = _newquorum - payload = cPickle.dumps({'quorum': _hasquorum}, protocol=lowestver) + payload = msgpack.packb({'quorum': _hasquorum}, use_bin_type=False) for _ in pushes.starmap( _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): @@ -615,15 +607,15 @@ def relay_slaved_requests(name, listener, vers): if not nrpc: raise Exception('Truncated client error') rpc += nrpc - rpc = cPickle.loads(rpc) + rpc = msgpack.unpackb(rpc) exc = None try: globals()[rpc['function']](*rpc['args']) except Exception as e: exc = e if 'xid' in rpc: - res = _push_rpc(listener, cPickle.dumps({'xid': rpc['xid'], - 'exc': exc}, protocol=vers)) + res = _push_rpc(listener, msgpack.packb({'xid': rpc['xid'], + 'exc': exc}, use_bin_type=False)) if not res: break try: @@ -642,7 +634,7 @@ def relay_slaved_requests(name, listener, vers): if cfgstreams: _hasquorum = len(cfgstreams) >= ( len(_cfgstore['collective']) // 2) - payload = cPickle.dumps({'quorum': _hasquorum}, protocol=lowestver) + payload = msgpack.packb({'quorum': _hasquorum}, use_bin_type=False) for _ in pushes.starmap( _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): @@ -684,19 +676,15 @@ class StreamHandler(object): self.sock = None -def stop_following(replacement=None, proto=2): +def stop_following(replacement=None): with _leaderlock: global cfgleader - global cfgproto if cfgleader and not isinstance(cfgleader, bool): try: cfgleader.close() except Exception: pass cfgleader = replacement - if proto > 2 and sys.version_info[0] < 3: - proto = 2 - cfgproto = proto def stop_leading(): for stream in list(cfgstreams): @@ -754,15 +742,14 @@ def commit_clear(): ConfigManager._bg_sync_to_file() cfgleader = None -cfgproto = 2 -def follow_channel(channel, proto=2): +def follow_channel(channel): global _txcount global _hasquorum try: stop_leading() - stop_following(channel, proto) + stop_following(channel) lh = StreamHandler(channel) msg = lh.get_next_msg() while msg: @@ -774,7 +761,7 @@ def follow_channel(channel, proto=2): if not nrpc: raise Exception('Truncated message error') rpc += nrpc - rpc = cPickle.loads(rpc) + rpc = msgpack.unpackb(rpc) if 'txcount' in rpc: _txcount = rpc['txcount'] if 'function' in rpc: diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index e6a659e9..8ea2ab15 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -123,8 +123,8 @@ def sessionhdl(connection, authname, skipauth=False, cert=None): if authdata: cfm = authdata[1] authenticated = True - # version 0 == original, version 1 == pickle3 allowed - send_data(connection, "Confluent -- v{0} --".format(sys.version_info[0] - 2)) + # version 0 == original, version 1 == pickle3 allowed, 2 = pickle forbidden, msgpack allowed + send_data(connection, "Confluent -- v2 --") while not authenticated: # prompt for name and passphrase send_data(connection, {'authpassed': 0}) response = tlvdata.recv(connection)