Mirror of https://github.com/xcat2/confluent.git

Implement a confluent instance uuid
This allows clients to indicate after install whether they belong to the collective or not, making it possible to ascertain a new deployment server.

commit fd8dd03587
parent 6e7b6188dd
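
The server-side half of this handshake is in the snoop() changes below: a node can echo the confluent_uuid it recorded at deploy time in its SSDP M-SEARCH, and a collective member owning that uuid will answer even with no deployment pending. A minimal client-side sketch, assuming a node that saved the uuid from its deployment config (the uuid values and variable names are illustrative, not part of the commit):

import socket

# Hypothetical placeholders; a real client reads these from its saved
# deployment configuration.
SERVER_CONFLUENT_UUID = '4f6e1d2c-9a31-4c44-9f7a-1b2c3d4e5f60'
NODE_UUID = '0b1d9c8e-7654-4321-8abc-def012345678'

# snoop() matches an ST of 'urn:xcat.org:service:confluent:' followed by
# '/'-separated key=value queries, so the client encodes them the same way.
st = 'urn:xcat.org:service:confluent:/confluentuuid={0}/uuid={1}'.format(
    SERVER_CONFLUENT_UUID, NODE_UUID)
msearch = ('M-SEARCH * HTTP/1.1\r\n'
           'HOST: 239.255.255.250:1900\r\n'
           'MAN: "ssdp:discover"\r\n'
           'ST: {0}\r\n'
           'MX: 3\r\n'
           '\r\n').format(st)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(3)
sock.sendto(msearch.encode('utf8'), ('239.255.255.250', 1900))
try:
    reply, peer = sock.recvfrom(9000)  # a matching server replies unicast
except socket.timeout:
    reply = None  # no owning collective member answered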

@@ -129,6 +129,8 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None):
         colldata = tlvdata.recv(remote)
         # the protocol transmits global data, but for now we ignore it
         globaldata = tlvdata.recv(remote)
+        if 'confluent_uuid' in globaldata:
+            cfm.set_global('confluent_uuid', globaldata['confluent_uuid'])
         dbi = tlvdata.recv(remote)
         dbsize = dbi['dbsize']
         dbjson = b''

@@ -550,7 +552,7 @@ def handle_connection(connection, cert, request, local=False):
                              connection.getpeername()[0])
         tlvdata.send(connection, cfm._dump_keys(None, False))
         tlvdata.send(connection, cfm._cfgstore['collective'])
-        tlvdata.send(connection, {}) # cfm.get_globals())
+        tlvdata.send(connection, {'confluent_uuid': cfm.get_global('confluent_uuid')}) # cfm.get_globals())
         cfgdata = cfm.ConfigManager(None)._dump_to_json()
         tlvdata.send(connection, {'txcount': cfm._txcount,
                                   'dbsize': len(cfgdata)})
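
Both hunks above ride on the configmanager global key/value store, which is persisted in the confluent configuration database and, with this commit, handed to joining collective members. A minimal sketch of that API as used here (the import path follows the confluent_server layout and is an assumption, as is the placeholder value):

from confluent.config import configmanager as cfm

# set_global persists the value; get_global returns a false-y value when
# the key has never been set, which is what lets run() further below
# generate the uuid exactly once.
if not cfm.get_global('confluent_uuid'):
    cfm.set_global('confluent_uuid', '4f6e1d2c-9a31-4c44-9f7a-1b2c3d4e5f60')
print(cfm.get_global('confluent_uuid'))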

@@ -145,6 +145,7 @@ def snoop(handler, byehandler=None, protocol=None, uuidlookup=None):
     net4.bind(('', 1900))
     net6.bind(('', 1900))
     peerbymacaddress = {}
+    myuuid = cfm.get_global('confluent_uuid')
     while True:
         try:
             newmacs = set([])

@@ -180,6 +181,7 @@ def snoop(handler, byehandler=None, protocol=None, uuidlookup=None):
                 headline = headline.partition(':')
                 if len(headline) < 3:
                     continue
+                forcereply = False
                 if headline[0] == 'ST' and headline[-1].startswith(' urn:xcat.org:service:confluent:'):
                     try:
                         cfm.check_quorum()

@@ -187,22 +189,28 @@ def snoop(handler, byehandler=None, protocol=None, uuidlookup=None):
                         continue
                     for query in headline[-1].split('/'):
                         node = None
-                        if query.startswith('uuid='):
+                        if query.startswith('confluentuuid='):
+                            curruuid = query.split('=', 1)[1].lower()
+                            if curruuid != myuuid:
+                                break
+                            forcereply = True
+                        elif query.startswith('uuid='):
                             curruuid = query.split('=', 1)[1].lower()
                             node = uuidlookup(curruuid)
                         elif query.startswith('mac='):
                             currmac = query.split('=', 1)[1].lower()
                             node = uuidlookup(currmac)
                         if node:
-                            # Do not bother replying to a node that
-                            # we have no deployment activity
-                            # planned for
-                            cfg = cfm.ConfigManager(None)
-                            cfd = cfg.get_node_attributes(
-                                node, ['deployment.pendingprofile', 'collective.managercandidates'])
-                            if not cfd.get(node, {}).get(
-                                    'deployment.pendingprofile', {}).get('value', None):
-                                break
+                            if not forcereply:
+                                # Do not bother replying to a node that
+                                # we have no deployment activity
+                                # planned for
+                                cfg = cfm.ConfigManager(None)
+                                cfd = cfg.get_node_attributes(
+                                    node, ['deployment.pendingprofile', 'collective.managercandidates'])
+                                if not cfd.get(node, {}).get(
+                                        'deployment.pendingprofile', {}).get('value', None):
+                                    break
                             candmgrs = cfd.get(node, {}).get('collective.managercandidates', {}).get('value', None)
                             if candmgrs:
                                 candmgrs = noderange.NodeRange(candmgrs, cfg).nodes

@@ -225,6 +233,7 @@ def snoop(handler, byehandler=None, protocol=None, uuidlookup=None):
                             if not isinstance(reply, bytes):
                                 reply = reply.encode('utf8')
                             s.sendto(reply, peer)
+                            break
         r, _, _ = select.select((net4, net6), (), (), 0.2)
         if deferrednotifies:
             eventlet.sleep(2.2)
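
Condensed, the query handling above reduces to: a confluentuuid mismatch silences the reply, a match forces one, and otherwise a node is only answered if it has a pending deployment profile. An illustrative distillation of that decision (a standalone sketch, not the actual code path; it omits the managercandidates filtering):

def should_reply(queries, myuuid, uuidlookup, has_pending_profile):
    """Sketch of the reply decision in the snoop() loop above."""
    forcereply = False
    node = None
    for query in queries:  # e.g. ['confluentuuid=...', 'uuid=...']
        key, _, value = query.partition('=')
        if key == 'confluentuuid':
            if value.lower() != myuuid:
                return False, None  # deployed by another collective: stay silent
            forcereply = True       # our own deployment: always answer
        elif key in ('uuid', 'mac'):
            node = uuidlookup(value.lower())
    if node is None:
        return False, None
    if forcereply or has_pending_profile(node):
        return True, node
    return False, None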

@@ -64,6 +64,7 @@ import signal
 import socket
 import time
 import traceback
+import uuid


 def _daemonize():

@@ -267,6 +268,10 @@ def run(args):
     signal.signal(signal.SIGINT, terminate)
     signal.signal(signal.SIGTERM, terminate)
     atexit.register(doexit)
+    confluentuuid = configmanager.get_global('confluent_uuid')
+    if not confluentuuid:
+        confluentuuid = str(uuid.uuid4())
+        configmanager.set_global('confluent_uuid', confluentuuid)
     if dbgif:
         oumask = os.umask(0o077)
         try:

@@ -146,6 +146,7 @@ def handle_request(env, start_response):
         ncfg['profile'] = profile
         protocol = deployinfo.get('deployment.useinsecureprotocols', {}).get(
             'value', 'never')
+        ncfg['confluent_uuid'] = configmanager.get_global('confluent_uuid')
         ncfg['textconsole'] = bool(deployinfo.get(
             'console.method', {}).get('value', None))
         if protocol == 'always':
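
On the node side, the deployment configuration served by handle_request now carries the instance uuid alongside the profile; this is the value a client would persist and later echo in the confluentuuid= query shown earlier. A hypothetical excerpt of that payload (only confluent_uuid is new in this commit, the other keys are context from the hunk above):

ncfg = {
    'profile': 'rhel8-compute',  # illustrative profile name
    'confluent_uuid': '4f6e1d2c-9a31-4c44-9f7a-1b2c3d4e5f60',
    'textconsole': True,
}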