mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-27 19:37:57 +00:00
Provide collective show on all members
This commit is contained in:
parent
96671ace4e
commit
9bcca6bfad
@ -191,20 +191,31 @@ def handle_connection(connection, cert, request, local=False):
|
||||
{'collective':
|
||||
{'error': 'Collective does not have quorum'}})
|
||||
return
|
||||
collinfo = {}
|
||||
if follower:
|
||||
collinfo['leader'] = cfm.get_collective_member_by_address(
|
||||
currentleader)['name']
|
||||
linfo = cfm.get_collective_member_by_address(currentleader)
|
||||
remote = socket.create_connection((currentleader, 13001))
|
||||
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
|
||||
keyfile='/etc/confluent/privkey.pem',
|
||||
certfile='/etc/confluent/srvcert.pem')
|
||||
cert = remote.getpeercert(binary_form=True)
|
||||
if not (linfo and util.cert_matches(
|
||||
linfo['fingerprint'],
|
||||
cert)):
|
||||
remote.close()
|
||||
tlvdata.send(connection,
|
||||
{'error': 'Invalid certificate, '
|
||||
'redo invitation process'})
|
||||
connection.close()
|
||||
return
|
||||
tlvdata.recv(remote) # ignore banner
|
||||
tlvdata.recv(remote) # ignore authpassed: 0
|
||||
tlvdata.send(remote,
|
||||
{'collective': {'operation': 'getinfo',
|
||||
'name': get_myname()}})
|
||||
collinfo = tlvdata.recv(remote)
|
||||
else:
|
||||
iam = get_myname()
|
||||
collinfo['leader'] = iam
|
||||
collinfo['active'] = list(cfm.cfgstreams)
|
||||
activemembers = set(cfm.cfgstreams)
|
||||
activemembers.add(iam)
|
||||
collinfo['offline'] = []
|
||||
for member in cfm.list_collective():
|
||||
if member not in activemembers:
|
||||
collinfo['offline'].append(member)
|
||||
collinfo = {}
|
||||
populate_collinfo(collinfo)
|
||||
tlvdata.send(connection, {'collective': collinfo})
|
||||
return
|
||||
if 'invite' == operation:
|
||||
@ -325,6 +336,19 @@ def handle_connection(connection, cert, request, local=False):
|
||||
leader=connection.getpeername()[0])
|
||||
tlvdata.send(connection, {'status': 0})
|
||||
connection.close()
|
||||
if 'getinfo' == operation:
|
||||
drone = request['name']
|
||||
droneinfo = cfm.get_collective_member(drone)
|
||||
if not (droneinfo and util.cert_matches(droneinfo['fingerprint'],
|
||||
cert)):
|
||||
tlvdata.send(connection,
|
||||
{'error': 'Invalid certificate, '
|
||||
'redo invitation process'})
|
||||
connection.close()
|
||||
return
|
||||
collinfo = {}
|
||||
populate_collinfo(collinfo)
|
||||
tlvdata.send(connection, collinfo)
|
||||
if 'connect' == operation:
|
||||
myself = connection.getsockname()[0]
|
||||
if myself != get_leader(connection):
|
||||
@ -376,6 +400,19 @@ def handle_connection(connection, cert, request, local=False):
|
||||
# ok, we have a connecting member whose certificate checks out
|
||||
# He needs to bootstrap his configuration and subscribe it to updates
|
||||
|
||||
|
||||
def populate_collinfo(collinfo):
|
||||
iam = get_myname()
|
||||
collinfo['leader'] = iam
|
||||
collinfo['active'] = list(cfm.cfgstreams)
|
||||
activemembers = set(cfm.cfgstreams)
|
||||
activemembers.add(iam)
|
||||
collinfo['offline'] = []
|
||||
for member in cfm.list_collective():
|
||||
if member not in activemembers:
|
||||
collinfo['offline'].append(member)
|
||||
|
||||
|
||||
def try_assimilate(drone):
|
||||
try:
|
||||
remote = connect_to_collective(None, drone)
|
||||
|
Loading…
x
Reference in New Issue
Block a user