mirror of
https://github.com/xcat2/confluent.git
synced 2024-11-26 11:30:23 +00:00
Address several collective issues
When a stream has already been deleted from cfgstreams, continue exception handling, since that is the desired result. For connections to a manager, institute a 15-second socket-level timeout. This prevents an abandoned server conversation from locking up a collective member's startup. When scheduling the failover check, first block any redundant attempts to schedule. Wrap the collective startup in an exception handler, so that a retry is more reliably guaranteed.
This commit is contained in:
parent
3be3d4a588
commit
670fc87e1d
@ -181,7 +181,7 @@ def follow_leader(remote, leader):
|
||||
|
||||
|
||||
def connect_to_collective(cert, member):
|
||||
remote = socket.create_connection((member, 13001))
|
||||
remote = socket.create_connection((member, 13001), 15)
|
||||
# TLS cert validation is custom and will not pass normal CA vetting
|
||||
# to override completely in the right place requires enormous effort, so just defer until after connect
|
||||
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem',
|
||||
@ -231,7 +231,7 @@ def handle_connection(connection, cert, request, local=False):
|
||||
return
|
||||
if follower:
|
||||
linfo = cfm.get_collective_member_by_address(currentleader)
|
||||
remote = socket.create_connection((currentleader, 13001))
|
||||
remote = socket.create_connection((currentleader, 13001), 15)
|
||||
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
|
||||
keyfile='/etc/confluent/privkey.pem',
|
||||
certfile='/etc/confluent/srvcert.pem')
|
||||
@ -307,7 +307,7 @@ def handle_connection(connection, cert, request, local=False):
|
||||
return
|
||||
host = request['server']
|
||||
try:
|
||||
remote = socket.create_connection((host, 13001))
|
||||
remote = socket.create_connection((host, 13001), 15)
|
||||
# This isn't what it looks like. We do CERT_NONE to disable
|
||||
# openssl verification, but then use the invitation as a
|
||||
# shared secret to validate the certs as part of the join
|
||||
@ -483,7 +483,7 @@ def handle_connection(connection, cert, request, local=False):
|
||||
if not cfm.relay_slaved_requests(drone, connection):
|
||||
if not retrythread: # start a recovery if everyone else seems
|
||||
# to have disappeared
|
||||
retrythread = eventlet.spawn_after(30 + random.random(),
|
||||
retrythread = eventlet.spawn_after(5 + random.random(),
|
||||
start_collective)
|
||||
# ok, we have a connecting member whose certificate checks out
|
||||
# He needs to bootstrap his configuration and subscribe it to updates
|
||||
@ -629,39 +629,41 @@ def check_managers():
|
||||
def schedule_rebalance():
|
||||
global failovercheck
|
||||
if not failovercheck:
|
||||
failovercheck = True
|
||||
failovercheck = eventlet.spawn_after(10, check_managers)
|
||||
|
||||
def start_collective():
|
||||
cfm.membership_callback = schedule_rebalance
|
||||
global follower
|
||||
global retrythread
|
||||
if follower:
|
||||
follower.kill()
|
||||
cfm.stop_following()
|
||||
follower = None
|
||||
try:
|
||||
if cfm.cfgstreams:
|
||||
cfm.check_quorum()
|
||||
# Do not start if we have quorum and are leader
|
||||
cfm.membership_callback = schedule_rebalance
|
||||
if follower:
|
||||
follower.kill()
|
||||
cfm.stop_following()
|
||||
follower = None
|
||||
try:
|
||||
if cfm.cfgstreams:
|
||||
cfm.check_quorum()
|
||||
# Do not start if we have quorum and are leader
|
||||
return
|
||||
except exc.DegradedCollective:
|
||||
pass
|
||||
if leader_init.active: # do not start trying to connect if we are
|
||||
# xmitting data to a follower
|
||||
return
|
||||
except exc.DegradedCollective:
|
||||
pass
|
||||
if leader_init.active: # do not start trying to connect if we are
|
||||
# xmitting data to a follower
|
||||
return
|
||||
myname = get_myname()
|
||||
for member in sorted(list(cfm.list_collective())):
|
||||
if member == myname:
|
||||
continue
|
||||
if cfm.cfgleader is None:
|
||||
cfm.stop_following(True)
|
||||
ldrcandidate = cfm.get_collective_member(member)['address']
|
||||
log.log({'info': 'Performing startup attempt to {0}'.format(
|
||||
ldrcandidate), 'subsystem': 'collective'})
|
||||
if connect_to_leader(name=myname, leader=ldrcandidate):
|
||||
break
|
||||
else:
|
||||
retrythread = eventlet.spawn_after(30 + random.random(),
|
||||
start_collective)
|
||||
|
||||
|
||||
myname = get_myname()
|
||||
for member in sorted(list(cfm.list_collective())):
|
||||
if member == myname:
|
||||
continue
|
||||
if cfm.cfgleader is None:
|
||||
cfm.stop_following(True)
|
||||
ldrcandidate = cfm.get_collective_member(member)['address']
|
||||
log.log({'info': 'Performing startup attempt to {0}'.format(
|
||||
ldrcandidate), 'subsystem': 'collective'})
|
||||
if connect_to_leader(name=myname, leader=ldrcandidate):
|
||||
break
|
||||
else:
|
||||
retrythread = eventlet.spawn_after(5 + random.random(),
|
||||
start_collective)
|
||||
except Exception as e:
|
||||
retrythread = eventlet.spawn_after(random.random(), start_collective)
|
||||
|
@ -409,7 +409,10 @@ def _push_rpc(stream, payload):
|
||||
return True
|
||||
except Exception:
|
||||
logException()
|
||||
del cfgstreams[stream]
|
||||
try:
|
||||
del cfgstreams[stream]
|
||||
except KeyError:
|
||||
pass
|
||||
if membership_callback:
|
||||
membership_callback()
|
||||
stream.close()
|
||||
|
Loading…
Reference in New Issue
Block a user