2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-27 19:37:57 +00:00

Do not restart on intentional kill

Additionally, add some output to help filter events log
This commit is contained in:
Jarrod Johnson 2018-10-01 09:35:02 -04:00
parent e57cdf9a7b
commit 61e7c90ad1

View File

@ -25,6 +25,7 @@ import eventlet
import eventlet.green.socket as socket
import eventlet.green.ssl as ssl
import eventlet.green.threading as threading
import greenlet
import random
try:
import OpenSSL.crypto as crypto
@ -51,7 +52,6 @@ class ContextBool(object):
connecting = ContextBool()
leader_init = ContextBool()
def connect_to_leader(cert=None, name=None, leader=None):
global currentleader
global cfginitlock
@ -60,12 +60,14 @@ def connect_to_leader(cert=None, name=None, leader=None):
cfginitlock = threading.RLock()
if leader is None:
leader = currentleader
log.log({'info': 'Attempting connection to leader {0}'.format(leader)})
log.log({'info': 'Attempting connection to leader {0}'.format(leader),
'subsystem': 'collective'})
try:
remote = connect_to_collective(cert, leader)
except socket.error as e:
log.log({'error': 'Collective connection attempt to {0} failed: {1}'
''.format(leader, str(e))})
''.format(leader, str(e)),
'subsystem': 'collective'})
return False
with connecting:
with cfginitlock:
@ -83,14 +85,16 @@ def connect_to_leader(cert=None, name=None, leader=None):
if 'backoff' in keydata:
log.log({
'info': 'Collective initialization in progress on '
'{0}, will retry connection'.format(leader)})
'{0}, will retry connection'.format(leader),
'subsystem': 'collective'})
eventlet.spawn_after(random.random(), connect_to_leader,
cert, name, leader)
return True
if 'leader' in keydata:
log.log(
{'info': 'Prospective leader {0} has redirected this '
'member to {1}'.format(leader, keydata['leader'])})
'member to {1}'.format(leader, keydata['leader']),
'subsystem': 'collective'})
ldrc = cfm.get_collective_member_by_address(
keydata['leader'])
if ldrc and ldrc['name'] == name:
@ -108,7 +112,8 @@ def connect_to_leader(cert=None, name=None, leader=None):
follower.kill()
cfm.stop_following()
follower = None
log.log({'info': 'Following leader {0}'.format(leader)})
log.log({'info': 'Following leader {0}'.format(leader),
'subsystem': 'collective'})
colldata = tlvdata.recv(remote)
globaldata = tlvdata.recv(remote)
dbi = tlvdata.recv(remote)
@ -148,11 +153,18 @@ def connect_to_leader(cert=None, name=None, leader=None):
def follow_leader(remote):
global currentleader
cleanexit = False
try:
cfm.follow_channel(remote)
except greenlet.GreenletExit:
cleanexit = True
finally:
if cleanexit:
log.log({'info': 'Previous following cleanly closed',
'subsystem': 'collective'})
return
log.log({'info': 'Current leader has disappeared, restarting '
'collective membership'})
'collective membership', 'subsystem': 'collective'})
# The leader has folded, time to startup again...
cfm.stop_following()
currentleader = None
@ -457,19 +469,22 @@ def try_assimilate(drone):
log.log(
{'error':
'No answer from {0} while trying to assimilate'.format(
drone)})
drone),
'subsystem': 'collective'})
return
if 'txcount' in answer:
log.log({'info': 'Deferring to {0} due to transaction count'.format(
drone)})
drone), 'subsystem': 'collective'})
connect_to_leader(None, None, leader=remote.getpeername()[0])
return
if 'error' in answer:
log.log({
'error': 'Error encountered while attempting to '
'assimilate {0}: {1}'.format(drone, answer['error'])})
'assimilate {0}: {1}'.format(drone, answer['error']),
'subsystem': 'collective'})
return
log.log({'Assimilated {0} into collective'.format(drone)})
log.log({'info': 'Assimilated {0} into collective'.format(drone),
'subsystem': 'collective'})
def get_leader(connection):
@ -485,7 +500,8 @@ def retire_as_leader():
def become_leader(connection):
global currentleader
global follower
log.log({'info': 'Becoming leader of collective'})
log.log({'info': 'Becoming leader of collective',
'subsystem': 'collective'})
if follower:
follower.kill()
follower = None