2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-23 01:53:28 +00:00

Check and try to start collective on startup

Not yet good enough for a leader to rejoin, but enough for a follower
to rejoin automatically.
This commit is contained in:
Jarrod Johnson 2018-05-10 16:19:46 -04:00
parent 1c930eba9d
commit d4babbffa4
3 changed files with 30 additions and 18 deletions

View File

@ -32,8 +32,10 @@ except ImportError:
currentleader = None
def connect_to_leader(cert=None, name=None):
remote = socket.create_connection((currentleader, 13001))
def connect_to_leader(cert=None, name=None, leader=None):
if leader is None:
leader = currentleader
remote = socket.create_connection((leader, 13001))
# TLS cert validation is custom and will not pass normal CA vetting
# to override completely in the right place requires enormous effort, so just defer until after connect
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem',
@ -41,7 +43,7 @@ def connect_to_leader(cert=None, name=None):
if cert:
fprint = util.get_fingerprint(cert)
else:
collent = cfm.get_collective_member_by_address(currentleader)
collent = cfm.get_collective_member_by_address(leader)
fprint = collent['fingerprint']
if not util.cert_matches(fprint, remote.getpeercert(binary_form=True)):
# probably Janeway up to something
@ -51,6 +53,10 @@ def connect_to_leader(cert=None, name=None):
tlvdata.send(remote, {'collective': {'operation': 'connect',
'name': name}})
keydata = tlvdata.recv(remote)
if 'error' in keydata:
if 'leader' in keydata:
return connect_to_leader(name=name, leader=keydata['leader'])
raise Exception(keydata['error'])
colldata = tlvdata.recv(remote)
globaldata = tlvdata.recv(remote)
dbsize = tlvdata.recv(remote)['dbsize']
@ -168,3 +174,19 @@ def get_leader(connection):
if currentleader is None:
currentleader = connection.getsockname()[0]
return currentleader
def startup():
members = list(cfm.list_collective())
if len(members) < 2:
# Not in collective mode, return
return
eventlet.spawn_n(start_collective)
def start_collective():
myname = socket.gethostname()
for member in members:
if member == myname:
continue
ldrcandidate = cfm.get_collective_member(member)['address']
connect_to_leader(name=myname, leader=ldrcandidate)

View File

@ -27,21 +27,6 @@
# encrypted fields do not support expressions, either as a source or
# destination
#TODO: clustered mode
# In clustered case, only one instance is the 'master'. If some 'def set'
# is requested on a slave, it creates a transaction id and an event, firing it
# to master. It then waits on the event. When the master reflects the data
# back and that reflection data goes into memory, the wait will be satisfied
# this means that set on a slave will be much longer.
# the assumption is that only the calls to 'def set' need be pushed to/from
# master and all the implicit activity that ensues will pan out since
# the master is ensuring a strict ordering of transactions
# for missed transactions, transaction log will be used to track transactions
# transaction log can have a constrained size if we want, in which case full
# replication will trigger.
# uuid.uuid4() will be used for transaction ids
# Note on the cryptography. Default behavior is mostly just to pave the
# way to meaningful security. Root all potentially sensitive data in
# one key. That key is in plain sight, so not meaningfully protected
@ -460,6 +445,9 @@ def add_collective_member(name, address, fingerprint):
_cfgstore['collectivedirty'].add(name)
ConfigManager._bg_sync_to_file()
def list_collective():
return iter(_cfgstore['collective'])
def get_collective_member(name):
return _cfgstore['collective'][name]

View File

@ -33,6 +33,7 @@ import confluent.consoleserver as consoleserver
import confluent.core as confluentcore
import confluent.httpapi as httpapi
import confluent.log as log
import confluent.collective.manager as collective
try:
import confluent.sockapi as sockapi
except ImportError:
@ -228,6 +229,7 @@ def run():
_updatepidfile()
signal.signal(signal.SIGINT, terminate)
signal.signal(signal.SIGTERM, terminate)
collective.startup()
if dbgif:
oumask = os.umask(0077)
try: