2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 17:43:14 +00:00

Implement attempt to assimilate

This commit is contained in:
Jarrod Johnson 2018-06-25 17:30:29 -04:00
parent f6342dd31f
commit 4de1fea7aa

View File

@ -37,21 +37,9 @@ def connect_to_leader(cert=None, name=None, leader=None):
if leader is None:
leader = currentleader
try:
remote = socket.create_connection((leader, 13001))
remote = connect_to_collective(cert, leader)
except socket.error:
return
# TLS cert validation is custom and will not pass normal CA vetting
# to override completely in the right place requires enormous effort, so just defer until after connect
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem',
certfile='/etc/confluent/srvcert.pem')
if cert:
fprint = util.get_fingerprint(cert)
else:
collent = cfm.get_collective_member_by_address(leader)
fprint = collent['fingerprint']
if not util.cert_matches(fprint, remote.getpeercert(binary_form=True)):
# probably Janeway up to something
raise Exception("Certificate mismatch in the collective")
tlvdata.recv(remote) # the banner
tlvdata.recv(remote) # authpassed... 0..
if name is None:
@ -92,6 +80,24 @@ def connect_to_leader(cert=None, name=None, leader=None):
# The leader has folded, time to startup again...
eventlet.spawn_n(start_collective)
def connect_to_collective(cert, member):
remote = socket.create_connection((member, 13001))
# TLS cert validation is custom and will not pass normal CA vetting
# to override completely in the right place requires enormous effort, so just defer until after connect
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem',
certfile='/etc/confluent/srvcert.pem')
if cert:
fprint = util.get_fingerprint(cert)
else:
collent = cfm.get_collective_member_by_address(member)
fprint = collent['fingerprint']
if not util.cert_matches(fprint, remote.getpeercert(binary_form=True)):
# probably Janeway up to something
raise Exception("Certificate mismatch in the collective")
return remote
def get_myname():
try:
with open('/etc/confluent/cfg/myname', 'r') as f:
@ -226,7 +232,11 @@ def handle_connection(connection, cert, request, local=False):
# He needs to bootstrap his configuration and subscribe it to updates
def try_assimilate(drone):
pass
remote = connect_to_collective(None, drone)
tlvdata.send(remote, {'operation': 'assimilate', 'name': get_myname(), 'txcount': cfm._txcount})
answer = tlvdata.recv(remote)
if 'error' in answer:
connect_to_leader(None, None, leader=remote.getpeername()[0])
def get_leader(connection):
if currentleader is None or connection.getpeername()[0] == currentleader: