From a78aa6816c51a721d25fa1686b86731896eb3a20 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 4 May 2018 15:16:30 -0400 Subject: [PATCH] Rough draft for ongoing syncronization Putting thoughts down on how xmit will work, will add recv and relay, do some testing, and then decide how much can be done to apply it neatly to the various points. --- .../confluent/collective/manager.py | 1 + .../confluent/config/configmanager.py | 49 ++++++++++++++++++- 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 1c66158a..00a17d2c 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -69,6 +69,7 @@ def connect_to_leader(cert=None, name=None): cfm.set_global(globvar, globaldata[globvar]) cfm.ConfigManager(tenant=None)._load_from_json(dbjson) cfm.ConfigManager._bg_sync_to_file() + cfm.set_leader_channel(remote) def handle_connection(connection, cert, request, local=False): diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index bd787e97..e2e24a9f 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -75,6 +75,7 @@ import copy import cPickle import errno import eventlet +import eventlet.event as event import fnmatch import json import operator @@ -82,6 +83,7 @@ import os import random import re import string +import struct import sys import threading import traceback @@ -94,6 +96,8 @@ _config_areas = ('nodegroups', 'nodes', 'usergroups', 'users') tracelog = None statelessmode = False _cfgstore = None +_pendingchangesets = {} +_txcount = 0 _attraliases = { 'bmc': 'hardwaremanagement.manager', @@ -372,7 +376,7 @@ def set_global(globalname, value): cfgstreams = {} def register_config_listener(name, listener): - cfgstreams[listener] = name + cfgstreams[name] = listener def clear_configuration(): global _cfgstore @@ -384,6 +388,10 @@ def clear_configuration(): except OSError as oe: pass +cfgleader = None +def set_leader_channel(channel): + global cfgleader + cfgleader = channel def add_collective_member(name, address, fingerprint): try: @@ -1338,7 +1346,46 @@ class ConfigManager(object): attribmap[node]['groups'] = [] self.set_node_attributes(attribmap, autocreate=True) + def set_leader_node_attributes(self, attribmap, autocreate): + xid = os.urandom(8) + while xid in _pendingchangesets: + xid = os.urandom(8) + _pendingchangesets[xid] = event.Event() + rpcpayload = cPickle.dumps({'function': 'set_node_attributes', + 'args': (attribmap, autocreate), + 'xid': xid}) + rpclen = len(rpcpayload) + cfgleader.sendall(struct.pack('!Q', rpclen)) + cfgleader.sendall(rpcpayload) + _pendingchangesets[xid].wait(0) + return + + def _push_rpc(self, stream, payload): + stream.sendall(struct.pack('!Q', len(payload))) + stream.sendall(payload) + + def set_follower_node_attributes(self, attribmap, autocreate): + global _txcount + _txcount += 1 + if len(cfgstreams) < (len(_cfgstore['collective']) // 2) : + # the leader counts in addition to registered streams + raise Exception("collective does not have quorum") + pushes = eventlet.GreenPool() + payload = cPickle.dumps({'function': '_set_node_attributes', + 'args': (attribmap, autocreate), + 'txcount': _txcount}) + for res in pushes.imap(_push_rpc, + [(payload, s) for s in cfgstreams]): + print(repr(res)) + def set_node_attributes(self, attribmap, autocreate=False): + if cfgleader: # currently config slave to another + return self.set_leader_node_attributes(attribmap, autocreate) + if cfgstreams: + self.set_follower_node_attributes(attribmap, autocreate) + self._set_node_attributes(attribmap, autocreate) + + def _set_node_attributes(self, attribmap, autocreate, xid=None): # TODO(jbjohnso): multi mgr support, here if we have peers, # pickle the arguments and fire them off in eventlet # flows to peers, all should have the same result