diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 1c66158a..00a17d2c 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -69,6 +69,7 @@ def connect_to_leader(cert=None, name=None): cfm.set_global(globvar, globaldata[globvar]) cfm.ConfigManager(tenant=None)._load_from_json(dbjson) cfm.ConfigManager._bg_sync_to_file() + cfm.set_leader_channel(remote) def handle_connection(connection, cert, request, local=False): diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index bd787e97..e2e24a9f 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -75,6 +75,7 @@ import copy import cPickle import errno import eventlet +import eventlet.event as event import fnmatch import json import operator @@ -82,6 +83,7 @@ import os import random import re import string +import struct import sys import threading import traceback @@ -94,6 +96,8 @@ _config_areas = ('nodegroups', 'nodes', 'usergroups', 'users') tracelog = None statelessmode = False _cfgstore = None +_pendingchangesets = {} +_txcount = 0 _attraliases = { 'bmc': 'hardwaremanagement.manager', @@ -372,7 +376,7 @@ def set_global(globalname, value): cfgstreams = {} def register_config_listener(name, listener): - cfgstreams[listener] = name + cfgstreams[name] = listener def clear_configuration(): global _cfgstore @@ -384,6 +388,10 @@ def clear_configuration(): except OSError as oe: pass +cfgleader = None +def set_leader_channel(channel): + global cfgleader + cfgleader = channel def add_collective_member(name, address, fingerprint): try: @@ -1338,7 +1346,46 @@ class ConfigManager(object): attribmap[node]['groups'] = [] self.set_node_attributes(attribmap, autocreate=True) + def set_leader_node_attributes(self, attribmap, autocreate): + xid = os.urandom(8) + while xid in _pendingchangesets: + xid = os.urandom(8) + _pendingchangesets[xid] = event.Event() + rpcpayload = cPickle.dumps({'function': 'set_node_attributes', + 'args': (attribmap, autocreate), + 'xid': xid}) + rpclen = len(rpcpayload) + cfgleader.sendall(struct.pack('!Q', rpclen)) + cfgleader.sendall(rpcpayload) + _pendingchangesets[xid].wait(0) + return + + def _push_rpc(self, stream, payload): + stream.sendall(struct.pack('!Q', len(payload))) + stream.sendall(payload) + + def set_follower_node_attributes(self, attribmap, autocreate): + global _txcount + _txcount += 1 + if len(cfgstreams) < (len(_cfgstore['collective']) // 2) : + # the leader counts in addition to registered streams + raise Exception("collective does not have quorum") + pushes = eventlet.GreenPool() + payload = cPickle.dumps({'function': '_set_node_attributes', + 'args': (attribmap, autocreate), + 'txcount': _txcount}) + for res in pushes.imap(_push_rpc, + [(payload, s) for s in cfgstreams]): + print(repr(res)) + def set_node_attributes(self, attribmap, autocreate=False): + if cfgleader: # currently config slave to another + return self.set_leader_node_attributes(attribmap, autocreate) + if cfgstreams: + self.set_follower_node_attributes(attribmap, autocreate) + self._set_node_attributes(attribmap, autocreate) + + def _set_node_attributes(self, attribmap, autocreate, xid=None): # TODO(jbjohnso): multi mgr support, here if we have peers, # pickle the arguments and fire them off in eventlet # flows to peers, all should have the same result