diff --git a/confluent_server/confluent/forwarder.py b/confluent_server/confluent/forwarder.py index 6d72c1f1..38153cdd 100644 --- a/confluent_server/confluent/forwarder.py +++ b/confluent_server/confluent/forwarder.py @@ -21,6 +21,9 @@ import eventlet import eventlet.green.select as select import eventlet.green.socket as socket forwardersbyclient = {} +relaysbysession = {} +sessionsbyip = {} +ipsbysession = {} sockhandler = {} vidtargetbypeer = {} vidforwarder = None @@ -38,7 +41,7 @@ def handle_connection(incoming, outgoing): incoming.sendall(data) -def forward_port(sock, target, clientip): +def forward_port(sock, target, clientip, sessionid): while True: conn, cli = sock.accept() if cli[0] != clientip: @@ -49,14 +52,17 @@ def forward_port(sock, target, clientip): except Exception: conn.close() continue - eventlet.spawn_n(handle_connection, conn, client) + if sessionid not in relaysbysession: + relaysbysession[sessionid] = {} + relaysbysession[sessionid][eventlet.spawn( + handle_connection, conn, client)] = conn def forward_video(): sock = eventlet.listen(('::', 3900, 0, 0), family=socket.AF_INET6) while True: conn, cli = sock.accept() - if cli[0] not in vidtargetbypeer: + if cli[0] not in vidtargetbypeer or not sessionsbyip.get(cli[0], None): conn.close() continue try: @@ -67,20 +73,47 @@ def forward_video(): continue eventlet.spawn_n(handle_connection, conn, vidclient) -def get_port(addr, clientip): + +def close_session(sessionid): + for addr in forwardersbyclient.get(sessionid, []): + killsock = forwardersbyclient[sessionid][addr] + sockhandler[killsock].kill() + del sockhandler[killsock] + killsock.close() + if sessionid in forwardersbyclient: + del forwardersbyclient[sessionid] + for clip in ipsbysession[sessionid]: + sessionsbyip[clip].discard(sessionid) + if sessionid in ipsbysession: + del ipsbysession[sessionid] + for relay in list(relaysbysession[sessionid]): + conn = relaysbysession[sessionid][relay] + relay.kill() + conn.close() + if sessionid in relaysbysession: + del relaysbysession[sessionid] + + +def get_port(addr, clientip, sessionid): global vidforwarder if socket.getaddrinfo(clientip, 0)[0][0] == socket.AF_INET: clientip = '::ffff:' + clientip - if clientip not in forwardersbyclient: - forwardersbyclient[clientip] = {} - if addr not in forwardersbyclient[clientip]: + if sessionid not in ipsbysession: + ipsbysession[sessionid] = set([]) + if clientip not in sessionsbyip: + sessionsbyip[clientip] = set([]) + sessionsbyip[clientip].add(sessionid) + ipsbysession[sessionid].add(clientip) + if sessionid not in forwardersbyclient: + forwardersbyclient[sessionid] = {} + if addr not in forwardersbyclient[sessionid]: newsock = eventlet.listen(('::', 0, 0, 0), family=socket.AF_INET6) - forwardersbyclient[clientip][addr] = newsock + forwardersbyclient[sessionid][addr] = newsock sockhandler[newsock] = eventlet.spawn(forward_port, newsock, addr, - clientip) + clientip, sessionid) if not vidforwarder: vidforwarder = eventlet.spawn(forward_video) vidtargetbypeer[clientip] = addr - return forwardersbyclient[clientip][addr].getsockname()[1] + return forwardersbyclient[sessionid][addr].getsockname()[1] diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index 1239fbfd..cfe19ec2 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -152,6 +152,7 @@ def _sessioncleaner(): if httpsessions[session]['expiry'] < currtime: targsessions.append(session) for session in targsessions: + forwarder.close_session(session) del httpsessions[session] targsessions = [] for session in consolesessions: @@ -283,6 +284,7 @@ def _authorize_request(env, operation): targets.append(mythread) for mythread in targets: eventlet.greenthread.kill(mythread) + forwarder.close_session(sessionid) del httpsessions[sessionid] return ('logout',) httpsessions[sessionid]['expiry'] = time.time() + 90 @@ -464,7 +466,8 @@ def resourcehandler_backend(env, start_response): start_response('404 Not Found', headers) yield 'No hardwaremanagemnet.manager defined for node' return - funport = forwarder.get_port(targip, env['HTTP_X_FORWARDED_FOR']) + funport = forwarder.get_port(targip, env['HTTP_X_FORWARDED_FOR'], + authorized['sessionid']) host = env['HTTP_X_FORWARDED_HOST'] url = 'https://{0}:{1}/'.format(host, funport) start_response('302', [('Location', url)])