mirror of
https://github.com/xcat2/confluent.git
synced 2025-02-15 18:19:52 +00:00
Advance state of collective in asyncio
Eventlet is nominally removed from collective manager, however the join process still needs to be reworked, and a lot more flows need to be adjusted.
This commit is contained in:
parent
c24da59216
commit
e890276bf6
@ -742,7 +742,7 @@ async def updateattrib(session, updateargs, nodetype, noderange, options, dictas
|
||||
value = os.environ.get(
|
||||
key, os.environ[key.upper()])
|
||||
if (nodetype == "nodegroups"):
|
||||
exitcode = await ession.simple_nodegroups_command(noderange,
|
||||
exitcode = await session.simple_nodegroups_command(noderange,
|
||||
'attributes/all',
|
||||
value, key)
|
||||
else:
|
||||
|
@ -41,7 +41,7 @@ def check_server_proof(invitation, mycert, peercert, proof):
|
||||
def check_client_proof(servername, mycert, peercert, proof):
|
||||
servername = servername.encode('utf-8')
|
||||
if servername not in pending_invites:
|
||||
return False
|
||||
return False, None
|
||||
invitation = pending_invites[servername]
|
||||
role = invitation['role']
|
||||
invitation = invitation['invitation']
|
||||
|
@ -14,6 +14,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import confluent.collective.invites as invites
|
||||
import confluent.config.configmanager as cfm
|
||||
@ -22,12 +23,11 @@ import confluent.log as log
|
||||
import confluent.noderange as noderange
|
||||
import confluent.tlvdata as tlvdata
|
||||
import confluent.util as util
|
||||
import eventlet
|
||||
import eventlet.greenpool as greenpool
|
||||
import eventlet.green.socket as socket
|
||||
import eventlet.green.ssl as ssl
|
||||
import eventlet.green.threading as threading
|
||||
import socket
|
||||
import ssl
|
||||
import confluent.sortutil as sortutil
|
||||
import ctypes
|
||||
import ctypes.util
|
||||
import greenlet
|
||||
import random
|
||||
import time
|
||||
@ -36,6 +36,18 @@ import sys
|
||||
import OpenSSL.crypto as crypto
|
||||
|
||||
|
||||
class PyObject_HEAD(ctypes.Structure):
|
||||
_fields_ = [
|
||||
("ob_refcnt", ctypes.c_ssize_t),
|
||||
("ob_type", ctypes.c_void_p),
|
||||
]
|
||||
|
||||
class PySSLContext(ctypes.Structure):
|
||||
_fields_ = [
|
||||
("ob_base", PyObject_HEAD),
|
||||
("ctx", ctypes.c_void_p),
|
||||
]
|
||||
|
||||
currentleader = None
|
||||
follower = None
|
||||
retrythread = None
|
||||
@ -43,24 +55,34 @@ failovercheck = None
|
||||
initting = True
|
||||
reassimilate = None
|
||||
|
||||
libssl = ctypes.CDLL(ctypes.util.find_library('ssl'))
|
||||
libssl.SSL_CTX_set_cert_verify_callback.argtypes = [
|
||||
ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
|
||||
|
||||
|
||||
@ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p)
|
||||
def verify_stub(store, misc):
|
||||
return 1
|
||||
|
||||
|
||||
class ContextBool(object):
|
||||
def __init__(self):
|
||||
self.active = False
|
||||
self.mylock = threading.RLock()
|
||||
self.mylock = asyncio.Lock()
|
||||
|
||||
def __enter__(self):
|
||||
async def __aenter__(self):
|
||||
self.active = True
|
||||
self.mylock.__enter__()
|
||||
return await self.mylock.__aenter__()
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
self.active = False
|
||||
self.mylock.__exit__(exc_type, exc_val, exc_tb)
|
||||
return await self.mylock.__aexit__(exc_type, exc_val, exc_tb)
|
||||
|
||||
connecting = ContextBool()
|
||||
leader_init = ContextBool()
|
||||
enrolling = ContextBool()
|
||||
|
||||
def connect_to_leader(cert=None, name=None, leader=None, remote=None, isretry=False):
|
||||
async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isretry=False):
|
||||
global currentleader
|
||||
global follower
|
||||
ocert = cert
|
||||
@ -73,27 +95,28 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None, isretry=Fa
|
||||
log.log({'info': 'Attempting connection to leader {0}'.format(leader),
|
||||
'subsystem': 'collective'})
|
||||
try:
|
||||
remote = connect_to_collective(cert, leader, remote)
|
||||
remote = await connect_to_collective(cert, leader, remote)
|
||||
except Exception as e:
|
||||
log.log({'error': 'Collective connection attempt to {0} failed: {1}'
|
||||
''.format(leader, str(e)),
|
||||
'subsystem': 'collective'})
|
||||
return False
|
||||
with connecting:
|
||||
async with connecting:
|
||||
with cfm._initlock:
|
||||
banner = tlvdata.recv(remote) # the banner
|
||||
# remote is a socket...
|
||||
banner = await tlvdata.recv(remote) # the banner
|
||||
if not banner:
|
||||
return
|
||||
vers = banner.split()[2]
|
||||
if vers != b'v4':
|
||||
raise Exception('This instance only supports protocol 4, synchronize versions between collective members')
|
||||
tlvdata.recv(remote) # authpassed... 0..
|
||||
await tlvdata.recv(remote) # authpassed... 0..
|
||||
if name is None:
|
||||
name = get_myname()
|
||||
tlvdata.send(remote, {'collective': {'operation': 'connect',
|
||||
await tlvdata.send(remote, {'collective': {'operation': 'connect',
|
||||
'name': name,
|
||||
'txcount': cfm._txcount}})
|
||||
keydata = tlvdata.recv(remote)
|
||||
keydata = await tlvdata.recv(remote)
|
||||
if not keydata:
|
||||
return False
|
||||
if 'error' in keydata:
|
||||
@ -104,7 +127,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None, isretry=Fa
|
||||
'subsystem': 'collective'})
|
||||
return False
|
||||
if 'waitinline' in keydata:
|
||||
eventlet.sleep(0.3)
|
||||
await asyncio.sleep(0.3)
|
||||
return connect_to_leader(cert, name, leader, None, isretry=True)
|
||||
if 'leader' in keydata:
|
||||
if keydata['leader'] == None:
|
||||
@ -125,7 +148,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None, isretry=Fa
|
||||
'transaction count, becoming leader'
|
||||
''.format(leader), 'subsystem': 'collective',
|
||||
'subsystem': 'collective'})
|
||||
return become_leader(remote)
|
||||
return await become_leader(remote)
|
||||
return False
|
||||
follower.kill()
|
||||
cfm.stop_following()
|
||||
@ -136,9 +159,9 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None, isretry=Fa
|
||||
follower = None
|
||||
log.log({'info': 'Following leader {0}'.format(leader),
|
||||
'subsystem': 'collective'})
|
||||
colldata = tlvdata.recv(remote)
|
||||
globaldata = tlvdata.recv(remote)
|
||||
dbi = tlvdata.recv(remote)
|
||||
colldata = await tlvdata.recv(remote)
|
||||
globaldata = await tlvdata.recv(remote)
|
||||
dbi = await tlvdata.recv(remote)
|
||||
dbsize = dbi['dbsize']
|
||||
dbjson = b''
|
||||
while (len(dbjson) < dbsize):
|
||||
@ -172,7 +195,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None, isretry=Fa
|
||||
currentleader = leader
|
||||
#spawn this as a thread...
|
||||
remote.settimeout(90)
|
||||
follower = eventlet.spawn(follow_leader, remote, leader)
|
||||
follower = util.spawn(follow_leader(remote, leader))
|
||||
return True
|
||||
|
||||
|
||||
@ -208,25 +231,48 @@ def follow_leader(remote, leader):
|
||||
cfm.stop_following()
|
||||
currentleader = None
|
||||
if retrythread is None: # start a recovery
|
||||
retrythread = eventlet.spawn_after(
|
||||
retrythread = util.spawn_after(
|
||||
random.random(), start_collective)
|
||||
|
||||
def create_connection(member):
|
||||
async def _create_tls_connection(host, port):
|
||||
cloop = asyncio.get_event_loop()
|
||||
ainfo = await cloop.getaddrinfo(
|
||||
host, port, family=socket.AF_UNSPEC, type=socket.SOCK_STREAM)
|
||||
for res in ainfo:
|
||||
af, socktype, proto, canonname, sa = res
|
||||
remote = socket.socket(af, socktype, proto)
|
||||
remote.setsockopt(
|
||||
socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
remote.settimeout(0)
|
||||
await cloop.sock_connect(remote, sa)
|
||||
break
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
|
||||
ssl_ctx = PySSLContext.from_address(id(ctx)).ctx
|
||||
libssl.SSL_CTX_set_cert_verify_callback(ssl_ctx, verify_stub, 0)
|
||||
ctx.load_cert_chain('/etc/confluent/srvcert.pem', '/etc/confluent/privkey.pem')
|
||||
sreader = asyncio.StreamReader()
|
||||
sreaderprot = asyncio.StreamReaderProtocol(sreader)
|
||||
tport, _ = await cloop.create_connection(
|
||||
lambda: sreaderprot, sock=remote, ssl=ctx, server_hostname='x')
|
||||
swriter = asyncio.StreamWriter(tport, sreaderprot, sreader, cloop)
|
||||
return (sreader, swriter)
|
||||
|
||||
|
||||
async def create_connection(member):
|
||||
remote = None
|
||||
try:
|
||||
remote = socket.create_connection((member, 13001), 2)
|
||||
remote.settimeout(15)
|
||||
remote = await _create_tls_connection(member, 13001)
|
||||
#remote = socket.create_connection((member, 13001), 2)
|
||||
#remote.settimeout(15)
|
||||
# TLS cert validation is custom and will not pass normal CA vetting
|
||||
# to override completely in the right place requires enormous effort, so just defer until after connect
|
||||
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem',
|
||||
certfile='/etc/confluent/srvcert.pem')
|
||||
except Exception as e:
|
||||
return member, e
|
||||
return member, remote
|
||||
|
||||
def connect_to_collective(cert, member, remote=None):
|
||||
async def connect_to_collective(cert, member, remote=None):
|
||||
if remote is None:
|
||||
_, remote = create_connection(member)
|
||||
_, remote = await create_connection(member)
|
||||
if isinstance(remote, Exception):
|
||||
raise remote
|
||||
if cert:
|
||||
@ -234,7 +280,8 @@ def connect_to_collective(cert, member, remote=None):
|
||||
else:
|
||||
collent = cfm.get_collective_member_by_address(member)
|
||||
fprint = collent['fingerprint']
|
||||
if not util.cert_matches(fprint, remote.getpeercert(binary_form=True)):
|
||||
cnn = remote[1].transport.get_extra_info('ssl_object')
|
||||
if not util.cert_matches(fprint, cnn.getpeercert(binary_form=True)):
|
||||
# probably Janeway up to something
|
||||
raise Exception("Certificate mismatch in the collective")
|
||||
return remote
|
||||
@ -263,11 +310,8 @@ async def handle_connection(connection, cert, request, local=False):
|
||||
global currentleader
|
||||
global retrythread
|
||||
global initting
|
||||
connection.settimeout(5)
|
||||
operation = request['operation']
|
||||
if cert:
|
||||
cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
|
||||
else:
|
||||
if not cert:
|
||||
if not local:
|
||||
return
|
||||
if operation in ('show', 'delete'):
|
||||
@ -295,7 +339,7 @@ async def handle_connection(connection, cert, request, local=False):
|
||||
await tlvdata.send(connection,
|
||||
{'error': 'Invalid certificate, '
|
||||
'redo invitation process'})
|
||||
connection.close()
|
||||
# connection.close()
|
||||
return
|
||||
await tlvdata.recv(remote) # ignore banner
|
||||
await tlvdata.recv(remote) # ignore authpassed: 0
|
||||
@ -333,7 +377,7 @@ async def handle_connection(connection, cert, request, local=False):
|
||||
try:
|
||||
cfm.check_quorum()
|
||||
except exc.DegradedCollective:
|
||||
tlvdata.send(connection,
|
||||
await tlvdata.send(connection,
|
||||
{'collective':
|
||||
{'error': 'Collective does not have quorum'}})
|
||||
return
|
||||
@ -360,24 +404,46 @@ async def handle_connection(connection, cert, request, local=False):
|
||||
return
|
||||
host = request['server']
|
||||
try:
|
||||
remote = socket.create_connection((host, 13001), 15)
|
||||
cloop = asyncio.get_event_loop()
|
||||
ainfo = await cloop.getaddrinfo(
|
||||
host, 13001, family=socket.AF_UNSPEC, type=socket.SOCK_STREAM)
|
||||
for res in ainfo:
|
||||
af, socktype, proto, canonname, sa = res
|
||||
remote = socket.socket(af, socktype, proto)
|
||||
remote.setsockopt(
|
||||
socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
remote.settimeout(0)
|
||||
await cloop.sock_connect(remote, sa)
|
||||
break
|
||||
#remote = socket.create_connection((host, 13001), 15)
|
||||
# This isn't what it looks like. We do CERT_NONE to disable
|
||||
# openssl verification, but then use the invitation as a
|
||||
# shared secret to validate the certs as part of the join
|
||||
# operation
|
||||
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
|
||||
keyfile='/etc/confluent/privkey.pem',
|
||||
certfile='/etc/confluent/srvcert.pem')
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
|
||||
ssl_ctx = PySSLContext.from_address(id(ctx)).ctx
|
||||
libssl.SSL_CTX_set_cert_verify_callback(ssl_ctx, verify_stub, 0)
|
||||
ctx.load_cert_chain('/etc/confluent/srvcert.pem', '/etc/confluent/privkey.pem')
|
||||
sreader = asyncio.StreamReader()
|
||||
sreaderprot = asyncio.StreamReaderProtocol(sreader)
|
||||
tport, _ = await cloop.create_connection(
|
||||
lambda: sreaderprot, sock=remote, ssl=ctx, server_hostname='x')
|
||||
swriter = asyncio.StreamWriter(tport, sreaderprot, sreader, cloop)
|
||||
remote = (sreader, swriter)
|
||||
#remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
|
||||
# keyfile='/etc/confluent/privkey.pem',
|
||||
# certfile='/etc/confluent/srvcert.pem')
|
||||
except Exception:
|
||||
await tlvdata.send(
|
||||
connection,
|
||||
{'collective':
|
||||
{'status': 'Failed to connect to {0}'.format(host)}})
|
||||
connection.close()
|
||||
raise
|
||||
return
|
||||
mycert = util.get_certificate_from_file(
|
||||
'/etc/confluent/srvcert.pem')
|
||||
cert = remote.getpeercert(binary_form=True)
|
||||
cert = tport.get_extra_info('ssl_object').getpeercert(binary_form=True)
|
||||
proof = base64.b64encode(invites.create_client_proof(
|
||||
invitation, mycert, cert))
|
||||
await tlvdata.recv(remote) # ignore banner
|
||||
@ -386,39 +452,37 @@ async def handle_connection(connection, cert, request, local=False):
|
||||
'name': name, 'hmac': proof}})
|
||||
rsp = await tlvdata.recv(remote)
|
||||
if 'error' in rsp:
|
||||
tlvdata.send(connection, {'collective':
|
||||
await tlvdata.send(connection, {'collective':
|
||||
{'status': rsp['error']}})
|
||||
connection.close()
|
||||
return
|
||||
proof = rsp['collective']['approval']
|
||||
proof = base64.b64decode(proof)
|
||||
j = invites.check_server_proof(invitation, mycert, cert, proof)
|
||||
print(repr(j))
|
||||
if not j:
|
||||
remote.close()
|
||||
await tlvdata.send(connection, {'collective':
|
||||
{'status': 'Bad server token'}})
|
||||
connection.close()
|
||||
return
|
||||
tlvdata.send(connection, {'collective': {'status': 'Success'}})
|
||||
connection.close()
|
||||
await tlvdata.send(connection, {'collective': {'status': 'Success'}})
|
||||
# connection.close()
|
||||
currentleader = rsp['collective']['leader']
|
||||
f = open('/etc/confluent/cfg/myname', 'w')
|
||||
f.write(name)
|
||||
f.close()
|
||||
log.log({'info': 'Connecting to collective due to join',
|
||||
'subsystem': 'collective'})
|
||||
eventlet.spawn_n(connect_to_leader, rsp['collective'][
|
||||
'fingerprint'], name)
|
||||
util.spawn(connect_to_leader(rsp['collective'][
|
||||
'fingerprint'], name))
|
||||
if 'enroll' == operation:
|
||||
with enrolling:
|
||||
async with enrolling:
|
||||
cfm.check_quorum()
|
||||
mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem')
|
||||
proof = base64.b64decode(request['hmac'])
|
||||
myrsp, role = invites.check_client_proof(request['name'], mycert,
|
||||
cert, proof)
|
||||
if not myrsp:
|
||||
tlvdata.send(connection, {'error': 'Invalid token'})
|
||||
connection.close()
|
||||
await tlvdata.send(connection, {'error': 'Invalid token'})
|
||||
return
|
||||
if not list(cfm.list_collective()):
|
||||
# First enrollment of a collective, since the collective doesn't
|
||||
@ -431,23 +495,23 @@ async def handle_connection(connection, cert, request, local=False):
|
||||
iam = cfm.get_collective_member(get_myname())
|
||||
if not iam:
|
||||
cfm.add_collective_member(get_myname(),
|
||||
connection.getsockname()[0], myfprint)
|
||||
connection[1].transport.get_extra_info('socket').getsockname()[0], myfprint)
|
||||
cfm.add_collective_member(request['name'],
|
||||
connection.getpeername()[0], fprint, role)
|
||||
myleader = get_leader(connection)
|
||||
connection[1].transport.get_extra_info('socket').getpeername()[0], fprint, role)
|
||||
myleader = await get_leader(connection)
|
||||
ldrfprint = cfm.get_collective_member_by_address(
|
||||
myleader)['fingerprint']
|
||||
tlvdata.send(connection,
|
||||
await tlvdata.send(connection,
|
||||
{'collective': {'approval': myrsp,
|
||||
'fingerprint': ldrfprint,
|
||||
'leader': get_leader(connection)}})
|
||||
'leader': await get_leader(connection)}})
|
||||
havequorum = False
|
||||
while not havequorum:
|
||||
try:
|
||||
cfm.check_quorum()
|
||||
havequorum = True
|
||||
except exc.DegradedCollective:
|
||||
eventlet.sleep(0.1)
|
||||
await asyncio.sleep(0.1)
|
||||
if 'assimilate' == operation:
|
||||
drone = request['name']
|
||||
droneinfo = cfm.get_collective_member(drone)
|
||||
@ -518,44 +582,45 @@ async def handle_connection(connection, cert, request, local=False):
|
||||
connection.close()
|
||||
if not connect_to_leader(None, None, leader=newleader):
|
||||
if retrythread is None:
|
||||
retrythread = eventlet.spawn_after(random.random(),
|
||||
retrythread = util.spawn_after(random.random(),
|
||||
start_collective)
|
||||
if 'getinfo' == operation:
|
||||
drone = request['name']
|
||||
droneinfo = cfm.get_collective_member(drone)
|
||||
if not (droneinfo and util.cert_matches(droneinfo['fingerprint'],
|
||||
cert)):
|
||||
tlvdata.send(connection,
|
||||
await tlvdata.send(connection,
|
||||
{'error': 'Invalid certificate, '
|
||||
'redo invitation process'})
|
||||
connection.close()
|
||||
return
|
||||
collinfo = {}
|
||||
populate_collinfo(collinfo)
|
||||
tlvdata.send(connection, collinfo)
|
||||
await tlvdata.send(connection, collinfo)
|
||||
if 'connect' == operation:
|
||||
drone = request['name']
|
||||
droneinfo = cfm.get_collective_member(drone)
|
||||
if not (droneinfo and util.cert_matches(droneinfo['fingerprint'],
|
||||
cert)):
|
||||
tlvdata.send(connection,
|
||||
await tlvdata.send(connection,
|
||||
{'error': 'Invalid certificate, '
|
||||
'redo invitation process'})
|
||||
connection.close()
|
||||
return
|
||||
myself = connection.getsockname()[0]
|
||||
cnn = connection[1].transport.get_extra_info('socket')
|
||||
myself = cnn.getsockname()[0]
|
||||
if connecting.active or initting:
|
||||
tlvdata.send(connection, {'error': 'Connecting right now',
|
||||
await tlvdata.send(connection, {'error': 'Connecting right now',
|
||||
'backoff': True})
|
||||
connection.close()
|
||||
return
|
||||
if leader_init.active:
|
||||
tlvdata.send(connection, {'error': 'Servicing a connection',
|
||||
await tlvdata.send(connection, {'error': 'Servicing a connection',
|
||||
'waitinline': True})
|
||||
connection.close()
|
||||
return
|
||||
if myself != get_leader(connection):
|
||||
tlvdata.send(
|
||||
if myself != await get_leader(connection):
|
||||
await tlvdata.send(
|
||||
connection,
|
||||
{'error': 'Cannot assimilate, our leader is '
|
||||
'in another castle', 'leader': currentleader})
|
||||
@ -573,22 +638,23 @@ async def handle_connection(connection, cert, request, local=False):
|
||||
if not connect_to_leader(
|
||||
None, None, connection.getpeername()[0]):
|
||||
if retrythread is None:
|
||||
retrythread = eventlet.spawn_after(5 + random.random(),
|
||||
retrythread = util.spawn_after(5 + random.random(),
|
||||
start_collective)
|
||||
return
|
||||
if retrythread is not None:
|
||||
retrythread.cancel()
|
||||
retrythread = None
|
||||
with leader_init:
|
||||
async with leader_init:
|
||||
cnn = connection[1].get_extra_info('socket')
|
||||
cfm.update_collective_address(request['name'],
|
||||
connection.getpeername()[0])
|
||||
tlvdata.send(connection, cfm._dump_keys(None, False))
|
||||
tlvdata.send(connection, cfm._cfgstore['collective'])
|
||||
tlvdata.send(connection, {'confluent_uuid': cfm.get_global('confluent_uuid')}) # cfm.get_globals())
|
||||
cnn.getpeername()[0])
|
||||
await tlvdata.send(connection, cfm._dump_keys(None, False))
|
||||
await tlvdata.send(connection, cfm._cfgstore['collective'])
|
||||
await tlvdata.send(connection, {'confluent_uuid': cfm.get_global('confluent_uuid')}) # cfm.get_globals())
|
||||
cfgdata = cfm.ConfigManager(None)._dump_to_json()
|
||||
try:
|
||||
tlvdata.send(connection, {'txcount': cfm._txcount,
|
||||
'dbsize': len(cfgdata)})
|
||||
await tlvdata.send(connection, {'txcount': cfm._txcount,
|
||||
'dbsize': len(cfgdata)})
|
||||
connection.sendall(cfgdata)
|
||||
except Exception:
|
||||
try:
|
||||
@ -603,7 +669,7 @@ async def handle_connection(connection, cert, request, local=False):
|
||||
'subsystem': 'collective'})
|
||||
if retrythread is None: # start a recovery if everyone else seems
|
||||
# to have disappeared
|
||||
retrythread = eventlet.spawn_after(5 + random.random(),
|
||||
retrythread = util.spawn_after(5 + random.random(),
|
||||
start_collective)
|
||||
# ok, we have a connecting member whose certificate checks out
|
||||
# He needs to bootstrap his configuration and subscribe it to updates
|
||||
@ -624,10 +690,10 @@ def populate_collinfo(collinfo):
|
||||
collinfo['nonvoting'].append(member)
|
||||
|
||||
|
||||
def try_assimilate(drone, followcount, remote):
|
||||
async def try_assimilate(drone, followcount, remote):
|
||||
global retrythread
|
||||
try:
|
||||
remote = connect_to_collective(None, drone, remote)
|
||||
remote = await connect_to_collective(None, drone, remote)
|
||||
except socket.error:
|
||||
# Oh well, unable to connect, hopefully the rest will be
|
||||
# in order
|
||||
@ -652,7 +718,7 @@ def try_assimilate(drone, followcount, remote):
|
||||
retire_as_leader(drone)
|
||||
if not connect_to_leader(None, None, leader=remote.getpeername()[0]):
|
||||
if retrythread is None:
|
||||
retrythread = eventlet.spawn_after(random.random(),
|
||||
retrythread = util.spawn_after(random.random(),
|
||||
start_collective)
|
||||
return False
|
||||
if 'leader' in answer:
|
||||
@ -669,8 +735,9 @@ def try_assimilate(drone, followcount, remote):
|
||||
return True
|
||||
|
||||
|
||||
def get_leader(connection):
|
||||
if currentleader is None or connection.getpeername()[0] == currentleader:
|
||||
async def get_leader(connection):
|
||||
cnn = connection[1].transport.get_extra_info('socket')
|
||||
if currentleader is None or cnn.getpeername()[0] == currentleader:
|
||||
# cancel retry if a retry is pending
|
||||
if currentleader is None:
|
||||
msg = 'Becoming leader as no leader known'
|
||||
@ -678,7 +745,7 @@ def get_leader(connection):
|
||||
msg = 'Becoming leader because {0} attempted to connect and it ' \
|
||||
'is current leader'.format(currentleader)
|
||||
log.log({'info': msg, 'subsystem': 'collective'})
|
||||
become_leader(connection)
|
||||
await become_leader(connection)
|
||||
return currentleader
|
||||
|
||||
def retire_as_leader(newleader=None):
|
||||
@ -690,7 +757,7 @@ def retire_as_leader(newleader=None):
|
||||
reassimilate = None
|
||||
currentleader = None
|
||||
|
||||
def become_leader(connection):
|
||||
async def become_leader(connection):
|
||||
global currentleader
|
||||
global follower
|
||||
global retrythread
|
||||
@ -708,26 +775,27 @@ def become_leader(connection):
|
||||
if retrythread is not None:
|
||||
retrythread.cancel()
|
||||
retrythread = None
|
||||
currentleader = connection.getsockname()[0]
|
||||
skipaddr = connection.getpeername()[0]
|
||||
cnn = connection[1].transport.get_extra_info('socket')
|
||||
currentleader = cnn.getsockname()[0]
|
||||
skipaddr = cnn.getpeername()[0]
|
||||
if reassimilate is not None:
|
||||
reassimilate.kill()
|
||||
reassimilate = eventlet.spawn(reassimilate_missing)
|
||||
reassimilate = util.spawn(reassimilate_missing())
|
||||
cfm._ready = True
|
||||
if _assimilate_missing(skipaddr):
|
||||
if await _assimilate_missing(skipaddr):
|
||||
schedule_rebalance()
|
||||
|
||||
|
||||
def reassimilate_missing():
|
||||
eventlet.sleep(30)
|
||||
async def reassimilate_missing():
|
||||
await asyncio.sleep(30)
|
||||
while True:
|
||||
try:
|
||||
_assimilate_missing()
|
||||
await _assimilate_missing()
|
||||
except Exception as e:
|
||||
cfm.logException()
|
||||
eventlet.sleep(30)
|
||||
await asyncio.sleep(30)
|
||||
|
||||
def _assimilate_missing(skipaddr=None):
|
||||
async def _assimilate_missing(skipaddr=None):
|
||||
connecto = []
|
||||
myname = get_myname()
|
||||
skipem = set(cfm.cfgstreams)
|
||||
@ -742,13 +810,13 @@ def _assimilate_missing(skipaddr=None):
|
||||
connecto.append(dronecandidate)
|
||||
if not connecto:
|
||||
return True
|
||||
conpool = greenpool.GreenPool(64)
|
||||
connections = conpool.imap(create_connection, connecto)
|
||||
for ct in connecto:
|
||||
util.spawn(create_connection(ct))
|
||||
for ent in connections:
|
||||
member, remote = ent
|
||||
if isinstance(remote, Exception):
|
||||
continue
|
||||
if not try_assimilate(member, numfollowers, remote):
|
||||
if not await try_assimilate(member, numfollowers, remote):
|
||||
return False
|
||||
return True
|
||||
|
||||
@ -758,9 +826,9 @@ def startup():
|
||||
if len(members) < 2:
|
||||
# Not in collective mode, return
|
||||
return
|
||||
eventlet.spawn_n(start_collective)
|
||||
util.spawn(start_collective())
|
||||
|
||||
def check_managers():
|
||||
async def check_managers():
|
||||
global failovercheck
|
||||
if not follower:
|
||||
try:
|
||||
@ -803,16 +871,16 @@ def check_managers():
|
||||
continue
|
||||
c.set_node_attributes({node: {'collective.manager': {'value': targets[0]}}})
|
||||
availmanagers[targets[0]] += 1
|
||||
_assimilate_missing()
|
||||
await _assimilate_missing()
|
||||
failovercheck = None
|
||||
|
||||
def schedule_rebalance():
|
||||
global failovercheck
|
||||
if not failovercheck:
|
||||
failovercheck = True
|
||||
failovercheck = eventlet.spawn_after(10, check_managers)
|
||||
failovercheck = util.spawn_after(10, check_managers)
|
||||
|
||||
def start_collective():
|
||||
async def start_collective():
|
||||
global follower
|
||||
global retrythread
|
||||
global initting
|
||||
@ -844,8 +912,8 @@ def start_collective():
|
||||
cfm.stop_following(True)
|
||||
ldrcandidate = cfm.get_collective_member(member)['address']
|
||||
connecto.append(ldrcandidate)
|
||||
conpool = greenpool.GreenPool(64)
|
||||
connections = conpool.imap(create_connection, connecto)
|
||||
for ct in connecto:
|
||||
util.spawn(create_connection(ct))
|
||||
for ent in connections:
|
||||
member, remote = ent
|
||||
if isinstance(remote, Exception):
|
||||
@ -853,7 +921,7 @@ def start_collective():
|
||||
if follower is None:
|
||||
log.log({'info': 'Performing startup attempt to {0}'.format(
|
||||
member), 'subsystem': 'collective'})
|
||||
if not connect_to_leader(name=myname, leader=member, remote=remote):
|
||||
if not await connect_to_leader(name=myname, leader=member, remote=remote):
|
||||
remote.close()
|
||||
else:
|
||||
remote.close()
|
||||
@ -861,6 +929,7 @@ def start_collective():
|
||||
pass
|
||||
finally:
|
||||
if retrythread is None and follower is None:
|
||||
retrythread = eventlet.spawn_after(5 + random.random(),
|
||||
#retrythread = asyncio.create_task(start_collective())
|
||||
retrythread = util.spawn_after(5 + random.random(),
|
||||
start_collective)
|
||||
initting = False
|
||||
|
@ -416,6 +416,7 @@ async def _tlsstartup(cnn):
|
||||
ctx.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
|
||||
ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
|
||||
ctx.options |= ssl.OP_CIPHER_SERVER_PREFERENCE
|
||||
ctx.verify_mode = ssl.CERT_OPTIONAL
|
||||
ctx.set_ciphers(ciphers)
|
||||
ctx.load_cert_chain('/etc/confluent/srvcert.pem',
|
||||
'/etc/confluent/privkey.pem')
|
||||
|
@ -40,10 +40,13 @@ def mkdirp(path, mode=0o777):
|
||||
|
||||
async def _sleep_and_run(sleeptime, func, args):
|
||||
await asyncio.sleep(sleeptime)
|
||||
func(*args)
|
||||
print(repr(func))
|
||||
await func(*args)
|
||||
|
||||
|
||||
def spawn_after(sleeptime, func, *args):
|
||||
if func is None:
|
||||
raise Exception('tf')
|
||||
return spawn(_sleep_and_run(sleeptime, func, args))
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user