2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-23 01:53:28 +00:00

Have socket API shrug off client disconnect

If a client fails to stick around for data, shrug it
off rather than adding to stderr log.
This commit is contained in:
Jarrod Johnson 2015-03-26 10:59:51 -04:00
parent 2fe0425191
commit c98d2e32d3

View File

@ -20,6 +20,7 @@
#
import atexit
import errno
import os
import pwd
import stat
@ -62,15 +63,23 @@ class ClientConsole(object):
if not self.xmit:
self.pendingdata.append(data)
return
tlvdata.send(self.client, data)
send_data(self.client, data)
def startsending(self):
self.xmit = True
for datum in self.pendingdata:
tlvdata.send(self.client, datum)
send_data(self.client, datum)
self.pendingdata = None
def send_data(connection, data):
try:
tlvdata.send(connection, data)
except IOError as ie:
if ie.errno != errno.EPIPE:
raise
def sessionhdl(connection, authname, skipauth=False):
# For now, trying to test the console stuff, so let's just do n4.
authenticated = False
@ -84,9 +93,9 @@ def sessionhdl(connection, authname, skipauth=False):
if authdata is not None:
cfm = authdata[1]
authenticated = True
tlvdata.send(connection, "Confluent -- v0 --")
send_data(connection, "Confluent -- v0 --")
while not authenticated: # prompt for name and passphrase
tlvdata.send(connection, {'authpassed': 0})
send_data(connection, {'authpassed': 0})
response = tlvdata.recv(connection)
authname = response['username']
passphrase = response['password']
@ -101,45 +110,45 @@ def sessionhdl(connection, authname, skipauth=False):
else:
authenticated = True
cfm = authdata[1]
tlvdata.send(connection, {'authpassed': 1})
send_data(connection, {'authpassed': 1})
request = tlvdata.recv(connection)
while request is not None:
try:
process_request(
connection, request, cfm, authdata, authname, skipauth)
except exc.ForbiddenRequest:
tlvdata.send(connection, {'errorcode': 403,
send_data(connection, {'errorcode': 403,
'error': 'Forbidden'})
tlvdata.send(connection, {'_requestdone': 1})
send_data(connection, {'_requestdone': 1})
except exc.TargetEndpointBadCredentials:
tlvdata.send(connection, {'errorcode': 502,
send_data(connection, {'errorcode': 502,
'error': 'Bad Credentials'})
tlvdata.send(connection, {'_requestdone': 1})
send_data(connection, {'_requestdone': 1})
except exc.TargetEndpointUnreachable as tu:
tlvdata.send(connection, {'errorcode': 504,
send_data(connection, {'errorcode': 504,
'error': 'Unreachable Target - ' + str(
tu)})
tlvdata.send(connection, {'_requestdone': 1})
send_data(connection, {'_requestdone': 1})
except exc.NotImplementedException:
tlvdata.send(connection, {'errorcode': 501,
send_data(connection, {'errorcode': 501,
'error': 'Not Implemented'})
tlvdata.send(connection, {'_requestdone': 1})
send_data(connection, {'_requestdone': 1})
except exc.NotFoundException as nfe:
tlvdata.send(connection, {'errorcode': 404,
send_data(connection, {'errorcode': 404,
'error': str(nfe)})
tlvdata.send(connection, {'_requestdone': 1})
send_data(connection, {'_requestdone': 1})
except exc.InvalidArgumentException as iae:
tlvdata.send(connection, {'errorcode': 400,
send_data(connection, {'errorcode': 400,
'error': 'Bad Request - ' + str(iae)})
tlvdata.send(connection, {'_requestdone': 1})
send_data(connection, {'_requestdone': 1})
except SystemExit:
sys.exit(0)
except:
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
tlvdata.send(connection, {'errorcode': 500,
send_data(connection, {'errorcode': 500,
'error': 'Unexpected error'})
tlvdata.send(connection, {'_requestdone': 1})
send_data(connection, {'_requestdone': 1})
request = tlvdata.recv(connection)
@ -147,8 +156,8 @@ def send_response(responses, connection):
if responses is None:
return
for rsp in responses:
tlvdata.send(connection, rsp.raw())
tlvdata.send(connection, {'_requestdone': 1})
send_data(connection, rsp.raw())
send_data(connection, {'_requestdone': 1})
def process_request(connection, request, cfm, authdata, authname, skipauth):
@ -188,11 +197,11 @@ def process_request(connection, request, cfm, authdata, authname, skipauth):
datacallback=ccons.sendall, skipreplay=skipreplay)
if consession is None:
raise Exception("TODO")
tlvdata.send(connection, {'started': 1})
send_data(connection, {'started': 1})
ccons.startsending()
bufferage = consession.get_buffer_age()
if bufferage is not False:
tlvdata.send(connection, {'bufferage': bufferage})
send_data(connection, {'bufferage': bufferage})
while consession is not None:
data = tlvdata.recv(connection)
if type(data) == dict:
@ -216,13 +225,13 @@ def process_request(connection, request, cfm, authdata, authname, skipauth):
else:
hdlr = pluginapi.handle_path(path, operation, cfm, params)
except exc.NotFoundException as e:
tlvdata.send(connection, {"errorcode": 404,
send_data(connection, {"errorcode": 404,
"error": "Target not found - " + str(e)})
tlvdata.send(connection, {"_requestdone": 1})
send_data(connection, {"_requestdone": 1})
except exc.InvalidArgumentException as e:
tlvdata.send(connection, {"errorcode": 400,
send_data(connection, {"errorcode": 400,
"error": "Bad Request - " + str(e)})
tlvdata.send(connection, {"_requestdone": 1})
send_data(connection, {"_requestdone": 1})
send_response(hdlr, connection)
return