2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-25 19:10:10 +00:00

Restrict lifetime of port relay to session

If a session is closed, also kill off any associated
relays in progress.  One exception, video port relay
in ESTABLISHED is left alone due to limitation, but
at least no new open.
This commit is contained in:
Jarrod Johnson 2017-10-27 14:47:10 -04:00
parent 45b8a18f14
commit d5be1ccf8c
2 changed files with 47 additions and 11 deletions

View File

@ -21,6 +21,9 @@ import eventlet
import eventlet.green.select as select
import eventlet.green.socket as socket
forwardersbyclient = {}
relaysbysession = {}
sessionsbyip = {}
ipsbysession = {}
sockhandler = {}
vidtargetbypeer = {}
vidforwarder = None
@ -38,7 +41,7 @@ def handle_connection(incoming, outgoing):
incoming.sendall(data)
def forward_port(sock, target, clientip):
def forward_port(sock, target, clientip, sessionid):
while True:
conn, cli = sock.accept()
if cli[0] != clientip:
@ -49,14 +52,17 @@ def forward_port(sock, target, clientip):
except Exception:
conn.close()
continue
eventlet.spawn_n(handle_connection, conn, client)
if sessionid not in relaysbysession:
relaysbysession[sessionid] = {}
relaysbysession[sessionid][eventlet.spawn(
handle_connection, conn, client)] = conn
def forward_video():
sock = eventlet.listen(('::', 3900, 0, 0), family=socket.AF_INET6)
while True:
conn, cli = sock.accept()
if cli[0] not in vidtargetbypeer:
if cli[0] not in vidtargetbypeer or not sessionsbyip.get(cli[0], None):
conn.close()
continue
try:
@ -67,20 +73,47 @@ def forward_video():
continue
eventlet.spawn_n(handle_connection, conn, vidclient)
def get_port(addr, clientip):
def close_session(sessionid):
for addr in forwardersbyclient.get(sessionid, []):
killsock = forwardersbyclient[sessionid][addr]
sockhandler[killsock].kill()
del sockhandler[killsock]
killsock.close()
if sessionid in forwardersbyclient:
del forwardersbyclient[sessionid]
for clip in ipsbysession[sessionid]:
sessionsbyip[clip].discard(sessionid)
if sessionid in ipsbysession:
del ipsbysession[sessionid]
for relay in list(relaysbysession[sessionid]):
conn = relaysbysession[sessionid][relay]
relay.kill()
conn.close()
if sessionid in relaysbysession:
del relaysbysession[sessionid]
def get_port(addr, clientip, sessionid):
global vidforwarder
if socket.getaddrinfo(clientip, 0)[0][0] == socket.AF_INET:
clientip = '::ffff:' + clientip
if clientip not in forwardersbyclient:
forwardersbyclient[clientip] = {}
if addr not in forwardersbyclient[clientip]:
if sessionid not in ipsbysession:
ipsbysession[sessionid] = set([])
if clientip not in sessionsbyip:
sessionsbyip[clientip] = set([])
sessionsbyip[clientip].add(sessionid)
ipsbysession[sessionid].add(clientip)
if sessionid not in forwardersbyclient:
forwardersbyclient[sessionid] = {}
if addr not in forwardersbyclient[sessionid]:
newsock = eventlet.listen(('::', 0, 0, 0),
family=socket.AF_INET6)
forwardersbyclient[clientip][addr] = newsock
forwardersbyclient[sessionid][addr] = newsock
sockhandler[newsock] = eventlet.spawn(forward_port, newsock, addr,
clientip)
clientip, sessionid)
if not vidforwarder:
vidforwarder = eventlet.spawn(forward_video)
vidtargetbypeer[clientip] = addr
return forwardersbyclient[clientip][addr].getsockname()[1]
return forwardersbyclient[sessionid][addr].getsockname()[1]

View File

@ -152,6 +152,7 @@ def _sessioncleaner():
if httpsessions[session]['expiry'] < currtime:
targsessions.append(session)
for session in targsessions:
forwarder.close_session(session)
del httpsessions[session]
targsessions = []
for session in consolesessions:
@ -283,6 +284,7 @@ def _authorize_request(env, operation):
targets.append(mythread)
for mythread in targets:
eventlet.greenthread.kill(mythread)
forwarder.close_session(sessionid)
del httpsessions[sessionid]
return ('logout',)
httpsessions[sessionid]['expiry'] = time.time() + 90
@ -464,7 +466,8 @@ def resourcehandler_backend(env, start_response):
start_response('404 Not Found', headers)
yield 'No hardwaremanagemnet.manager defined for node'
return
funport = forwarder.get_port(targip, env['HTTP_X_FORWARDED_FOR'])
funport = forwarder.get_port(targip, env['HTTP_X_FORWARDED_FOR'],
authorized['sessionid'])
host = env['HTTP_X_FORWARDED_HOST']
url = 'https://{0}:{1}/'.format(host, funport)
start_response('302', [('Location', url)])