2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-26 11:30:23 +00:00

Rough draft for ongoing syncronization

Putting thoughts down on how xmit will work, will add recv and relay,
do some testing, and then decide how much can be done to apply it neatly
to the various points.
This commit is contained in:
Jarrod Johnson 2018-05-04 15:16:30 -04:00
parent 830e6bb4e4
commit 035f10e7d7
2 changed files with 49 additions and 1 deletions

View File

@ -69,6 +69,7 @@ def connect_to_leader(cert=None, name=None):
cfm.set_global(globvar, globaldata[globvar])
cfm.ConfigManager(tenant=None)._load_from_json(dbjson)
cfm.ConfigManager._bg_sync_to_file()
cfm.set_leader_channel(remote)
def handle_connection(connection, cert, request, local=False):

View File

@ -75,6 +75,7 @@ import copy
import cPickle
import errno
import eventlet
import eventlet.event as event
import fnmatch
import json
import operator
@ -82,6 +83,7 @@ import os
import random
import re
import string
import struct
import sys
import threading
import traceback
@ -94,6 +96,8 @@ _config_areas = ('nodegroups', 'nodes', 'usergroups', 'users')
tracelog = None
statelessmode = False
_cfgstore = None
_pendingchangesets = {}
_txcount = 0
_attraliases = {
'bmc': 'hardwaremanagement.manager',
@ -372,7 +376,7 @@ def set_global(globalname, value):
cfgstreams = {}
def register_config_listener(name, listener):
cfgstreams[listener] = name
cfgstreams[name] = listener
def clear_configuration():
global _cfgstore
@ -384,6 +388,10 @@ def clear_configuration():
except OSError as oe:
pass
cfgleader = None
def set_leader_channel(channel):
global cfgleader
cfgleader = channel
def add_collective_member(name, address, fingerprint):
try:
@ -1338,7 +1346,46 @@ class ConfigManager(object):
attribmap[node]['groups'] = []
self.set_node_attributes(attribmap, autocreate=True)
def set_leader_node_attributes(self, attribmap, autocreate):
xid = os.urandom(8)
while xid in _pendingchangesets:
xid = os.urandom(8)
_pendingchangesets[xid] = event.Event()
rpcpayload = cPickle.dumps({'function': 'set_node_attributes',
'args': (attribmap, autocreate),
'xid': xid})
rpclen = len(rpcpayload)
cfgleader.sendall(struct.pack('!Q', rpclen))
cfgleader.sendall(rpcpayload)
_pendingchangesets[xid].wait(0)
return
def _push_rpc(self, stream, payload):
stream.sendall(struct.pack('!Q', len(payload)))
stream.sendall(payload)
def set_follower_node_attributes(self, attribmap, autocreate):
global _txcount
_txcount += 1
if len(cfgstreams) < (len(_cfgstore['collective']) // 2) :
# the leader counts in addition to registered streams
raise Exception("collective does not have quorum")
pushes = eventlet.GreenPool()
payload = cPickle.dumps({'function': '_set_node_attributes',
'args': (attribmap, autocreate),
'txcount': _txcount})
for res in pushes.imap(_push_rpc,
[(payload, s) for s in cfgstreams]):
print(repr(res))
def set_node_attributes(self, attribmap, autocreate=False):
if cfgleader: # currently config slave to another
return self.set_leader_node_attributes(attribmap, autocreate)
if cfgstreams:
self.set_follower_node_attributes(attribmap, autocreate)
self._set_node_attributes(attribmap, autocreate)
def _set_node_attributes(self, attribmap, autocreate, xid=None):
# TODO(jbjohnso): multi mgr support, here if we have peers,
# pickle the arguments and fire them off in eventlet
# flows to peers, all should have the same result