mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-16 04:39:16 +00:00
Do not restart on intentional kill
Additionally, add some output to help filter events log
This commit is contained in:
parent
e634817b34
commit
51194b275b
@ -25,6 +25,7 @@ import eventlet
|
||||
import eventlet.green.socket as socket
|
||||
import eventlet.green.ssl as ssl
|
||||
import eventlet.green.threading as threading
|
||||
import greenlet
|
||||
import random
|
||||
try:
|
||||
import OpenSSL.crypto as crypto
|
||||
@ -51,7 +52,6 @@ class ContextBool(object):
|
||||
connecting = ContextBool()
|
||||
leader_init = ContextBool()
|
||||
|
||||
|
||||
def connect_to_leader(cert=None, name=None, leader=None):
|
||||
global currentleader
|
||||
global cfginitlock
|
||||
@ -60,12 +60,14 @@ def connect_to_leader(cert=None, name=None, leader=None):
|
||||
cfginitlock = threading.RLock()
|
||||
if leader is None:
|
||||
leader = currentleader
|
||||
log.log({'info': 'Attempting connection to leader {0}'.format(leader)})
|
||||
log.log({'info': 'Attempting connection to leader {0}'.format(leader),
|
||||
'subsystem': 'collective'})
|
||||
try:
|
||||
remote = connect_to_collective(cert, leader)
|
||||
except socket.error as e:
|
||||
log.log({'error': 'Collective connection attempt to {0} failed: {1}'
|
||||
''.format(leader, str(e))})
|
||||
''.format(leader, str(e)),
|
||||
'subsystem': 'collective'})
|
||||
return False
|
||||
with connecting:
|
||||
with cfginitlock:
|
||||
@ -83,14 +85,16 @@ def connect_to_leader(cert=None, name=None, leader=None):
|
||||
if 'backoff' in keydata:
|
||||
log.log({
|
||||
'info': 'Collective initialization in progress on '
|
||||
'{0}, will retry connection'.format(leader)})
|
||||
'{0}, will retry connection'.format(leader),
|
||||
'subsystem': 'collective'})
|
||||
eventlet.spawn_after(random.random(), connect_to_leader,
|
||||
cert, name, leader)
|
||||
return True
|
||||
if 'leader' in keydata:
|
||||
log.log(
|
||||
{'info': 'Prospective leader {0} has redirected this '
|
||||
'member to {1}'.format(leader, keydata['leader'])})
|
||||
'member to {1}'.format(leader, keydata['leader']),
|
||||
'subsystem': 'collective'})
|
||||
ldrc = cfm.get_collective_member_by_address(
|
||||
keydata['leader'])
|
||||
if ldrc and ldrc['name'] == name:
|
||||
@ -108,7 +112,8 @@ def connect_to_leader(cert=None, name=None, leader=None):
|
||||
follower.kill()
|
||||
cfm.stop_following()
|
||||
follower = None
|
||||
log.log({'info': 'Following leader {0}'.format(leader)})
|
||||
log.log({'info': 'Following leader {0}'.format(leader),
|
||||
'subsystem': 'collective'})
|
||||
colldata = tlvdata.recv(remote)
|
||||
globaldata = tlvdata.recv(remote)
|
||||
dbi = tlvdata.recv(remote)
|
||||
@ -148,11 +153,18 @@ def connect_to_leader(cert=None, name=None, leader=None):
|
||||
|
||||
def follow_leader(remote):
|
||||
global currentleader
|
||||
cleanexit = False
|
||||
try:
|
||||
cfm.follow_channel(remote)
|
||||
except greenlet.GreenletExit:
|
||||
cleanexit = True
|
||||
finally:
|
||||
if cleanexit:
|
||||
log.log({'info': 'Previous following cleanly closed',
|
||||
'subsystem': 'collective'})
|
||||
return
|
||||
log.log({'info': 'Current leader has disappeared, restarting '
|
||||
'collective membership'})
|
||||
'collective membership', 'subsystem': 'collective'})
|
||||
# The leader has folded, time to startup again...
|
||||
cfm.stop_following()
|
||||
currentleader = None
|
||||
@ -457,19 +469,22 @@ def try_assimilate(drone):
|
||||
log.log(
|
||||
{'error':
|
||||
'No answer from {0} while trying to assimilate'.format(
|
||||
drone)})
|
||||
drone),
|
||||
'subsystem': 'collective'})
|
||||
return
|
||||
if 'txcount' in answer:
|
||||
log.log({'info': 'Deferring to {0} due to transaction count'.format(
|
||||
drone)})
|
||||
drone), 'subsystem': 'collective'})
|
||||
connect_to_leader(None, None, leader=remote.getpeername()[0])
|
||||
return
|
||||
if 'error' in answer:
|
||||
log.log({
|
||||
'error': 'Error encountered while attempting to '
|
||||
'assimilate {0}: {1}'.format(drone, answer['error'])})
|
||||
'assimilate {0}: {1}'.format(drone, answer['error']),
|
||||
'subsystem': 'collective'})
|
||||
return
|
||||
log.log({'Assimilated {0} into collective'.format(drone)})
|
||||
log.log({'info': 'Assimilated {0} into collective'.format(drone),
|
||||
'subsystem': 'collective'})
|
||||
|
||||
|
||||
def get_leader(connection):
|
||||
@ -485,7 +500,8 @@ def retire_as_leader():
|
||||
def become_leader(connection):
|
||||
global currentleader
|
||||
global follower
|
||||
log.log({'info': 'Becoming leader of collective'})
|
||||
log.log({'info': 'Becoming leader of collective',
|
||||
'subsystem': 'collective'})
|
||||
if follower:
|
||||
follower.kill()
|
||||
follower = None
|
||||
|
Loading…
x
Reference in New Issue
Block a user