mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-28 11:57:37 +00:00
Create a test socket api layer
This commit is contained in:
parent
e9c6c549cb
commit
2644801929
@ -47,15 +47,23 @@ class ConsoleSession(object):
|
||||
:param node: Name of the node for which this session will be created
|
||||
"""
|
||||
|
||||
def __init__(self, node, configmanager):
|
||||
def __init__(self, node, configmanager, datacallback=None):
|
||||
self.databuffer = ""
|
||||
if node not in _handled_consoles:
|
||||
_handled_consoles[node] = _ConsoleHandler(node, configmanager)
|
||||
self.conshdl = _handled_consoles[node]
|
||||
self.write = _handled_consoles[node].write
|
||||
_handled_consoles[node].register_rcpt(self.got_data)
|
||||
if datacallback is None:
|
||||
_handled_consoles[node].register_rcpt(self.got_data)
|
||||
else:
|
||||
_handled_consoles[node].register_rcpt(datacallback)
|
||||
|
||||
def got_data(self, data):
|
||||
"""Receive data from console and buffer
|
||||
|
||||
If the caller does not provide a callback and instead will be polling
|
||||
for data, we must maintain data in a buffer until retrieved
|
||||
"""
|
||||
self.databuffer += data
|
||||
|
||||
def get_next_output(self, timeout=45):
|
||||
|
@ -24,6 +24,8 @@ def run():
|
||||
pluginapi.load_plugins()
|
||||
webservice = httpapi.HttpApi()
|
||||
webservice.start()
|
||||
sockapi = sockapi.SockApi()
|
||||
sockapi.start()
|
||||
while (1):
|
||||
eventlet.sleep(100)
|
||||
|
||||
|
35
confluent/sockapi.py
Normal file
35
confluent/sockapi.py
Normal file
@ -0,0 +1,35 @@
|
||||
# Copyright 2013 IBM Corporation
|
||||
# ALl rights reserved
|
||||
|
||||
# This is the socket api layer.
|
||||
# It implement unix and tls sockets
|
||||
# TODO: SO_PEERCRED for unix socket
|
||||
import confluent.console as console
|
||||
import confluent.config as config
|
||||
import eventlet.green.socket as socket
|
||||
import eventlet.green.ssl as ssl
|
||||
|
||||
def sessionhdl(connection):
|
||||
#TODO: authenticate and authorize peer
|
||||
# For now, trying to test the console stuff, so let's just do n1.
|
||||
cfm = config.ConfigManager(tenant=0)
|
||||
consession = console.ConsoleSession(node='n1', configmanager=cfm,
|
||||
datacallback=connection.write)
|
||||
while (1):
|
||||
data = connection.read()
|
||||
consession.write(data)
|
||||
|
||||
|
||||
def _handler():
|
||||
plainsocket = socket.socket()
|
||||
srv = ssl.wrap_socket(plainsocket, keyfile="/etc/confluent/privkey.pem",
|
||||
cert="/etc/confluent/srvcert.pem", server_side=True)
|
||||
srv.bind(('0.0.0.0', 4001))
|
||||
srv.listen(5)
|
||||
while (1): # TODO: exithook
|
||||
cnn, addr = srv.accept()
|
||||
eventlet.spawn_n(sessionhdl, cnn)
|
||||
|
||||
class SockApi(object):
|
||||
def start(self):
|
||||
self.server = eventlet.spawn(_handler)
|
Loading…
x
Reference in New Issue
Block a user