diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index ebfd8c97..19509eb5 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -62,39 +62,38 @@ def chunk_output(output, n): yield output[i:i + n] def get_buffer_output(nodename): - out = _bufferdaemon.stdin - instream = _bufferdaemon.stdout + out = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + out.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1) + out.connect("\x00confluent-vtbuffer") if not isinstance(nodename, bytes): nodename = nodename.encode('utf8') outdata = bytearray() - with _bufferlock: - out.write(struct.pack('I', len(nodename))) - out.write(nodename) - out.flush() - select.select((instream,), (), (), 30) - while not outdata or outdata[-1]: - try: - chunk = os.read(instream.fileno(), 128) - except IOError: - chunk = None - if chunk: - outdata.extend(chunk) - else: - select.select((instream,), (), (), 0) - return bytes(outdata[:-1]) + out.send(struct.pack('I', len(nodename))) + out.send(nodename) + select.select((out,), (), (), 30) + while not outdata or outdata[-1]: + try: + chunk = os.read(out.fileno(), 128) + except IOError: + chunk = None + if chunk: + outdata.extend(chunk) + else: + select.select((out,), (), (), 0) + return bytes(outdata[:-1]) def send_output(nodename, output): if not isinstance(nodename, bytes): nodename = nodename.encode('utf8') - with _bufferlock: - _bufferdaemon.stdin.write(struct.pack('I', len(nodename) | (1 << 29))) - _bufferdaemon.stdin.write(nodename) - _bufferdaemon.stdin.flush() - for chunk in chunk_output(output, 8192): - _bufferdaemon.stdin.write(struct.pack('I', len(chunk) | (2 << 29))) - _bufferdaemon.stdin.write(chunk) - _bufferdaemon.stdin.flush() + out = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + out.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1) + out.connect("\x00confluent-vtbuffer") + out.send(struct.pack('I', len(nodename) | (1 << 29))) + out.send(nodename) + for chunk in chunk_output(output, 8192): + out.send(struct.pack('I', len(chunk) | (2 << 29))) + out.send(chunk) def _utf8_normalize(data, decoder): # first we give the stateful decoder a crack at the byte stream, @@ -607,11 +606,8 @@ def initialize(): _bufferlock = semaphore.Semaphore() _tracelog = log.Logger('trace') _bufferdaemon = subprocess.Popen( - ['/opt/confluent/bin/vtbufferd'], bufsize=0, stdin=subprocess.PIPE, - stdout=subprocess.PIPE) - fl = fcntl.fcntl(_bufferdaemon.stdout.fileno(), fcntl.F_GETFL) - fcntl.fcntl(_bufferdaemon.stdout.fileno(), - fcntl.F_SETFL, fl | os.O_NONBLOCK) + ['/opt/confluent/bin/vtbufferd', 'confluent-vtbuffer'], bufsize=0, stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL) def start_console_sessions(): configmodule.hook_new_configmanagers(_start_tenant_sessions) diff --git a/confluent_vtbufferd/vtbufferd.c b/confluent_vtbufferd/vtbufferd.c index e89269b4..055a5263 100644 --- a/confluent_vtbufferd/vtbufferd.c +++ b/confluent_vtbufferd/vtbufferd.c @@ -1,8 +1,14 @@ +#include +#define _GNU_SOURCE #include #include #include #include #include +#include +#include +#include +#include #include "tmt.h" #define HASHSIZE 2053 #define MAXNAMELEN 256 @@ -10,13 +16,17 @@ struct terment { struct terment *next; char *name; + int fd; TMT *vt; }; #define SETNODE 1 #define WRITE 2 #define READBUFF 0 +#define CLOSECONN 3 +#define MAXEVTS 16 static struct terment *buffers[HASHSIZE]; +static char* nodenames[HASHSIZE]; unsigned long hash(char *str) /* djb2a */ @@ -37,10 +47,13 @@ TMT *get_termentbyname(char *name) { return NULL; } -TMT *set_termentbyname(char *name) { +TMT *set_termentbyname(char *name, int fd) { struct terment *ret; int idx; + if (nodenames[fd] == NULL) { + nodenames[fd] = strdup(name); + } idx = hash(name); for (ret = buffers[idx]; ret != NULL; ret = ret->next) if (strcmp(name, ret->name) == 0) @@ -48,12 +61,13 @@ TMT *set_termentbyname(char *name) { ret = (struct terment *)malloc(sizeof(*ret)); ret->next = buffers[idx]; ret->name = strdup(name); + ret->fd = fd; ret->vt = tmt_open(31, 100, NULL, NULL, L"→←↑↓■◆▒°±▒┘┐┌└┼⎺───⎽├┤┴┬│≤≥π≠£•"); buffers[idx] = ret; return ret->vt; } -void dump_vt(TMT* outvt) { +void dump_vt(TMT* outvt, int outfd) { const TMTSCREEN *out = tmt_screen(outvt); const TMTPOINT *curs = tmt_cursor(outvt); int line, idx, maxcol, maxrow; @@ -67,9 +81,10 @@ void dump_vt(TMT* outvt) { tmt_color_t fg = TMT_COLOR_DEFAULT; tmt_color_t bg = TMT_COLOR_DEFAULT; wchar_t sgrline[30]; + char strbuffer[128]; size_t srgidx = 0; char colorcode = 0; - wprintf(L"\033c"); + write(outfd, "\033c", 2); maxcol = 0; maxrow = 0; for (line = out->nline - 1; line >= 0; --line) { @@ -148,60 +163,136 @@ void dump_vt(TMT* outvt) { } if (sgrline[0] != 0) { sgrline[wcslen(sgrline) - 1] = 0; // Trim last ; - wprintf(L"\033[%lsm", sgrline); + + snprintf(strbuffer, sizeof(strbuffer), "\033[%lsm", sgrline); + write(outfd, strbuffer, strlen(strbuffer)); + write(outfd, "\033[]", 3); } - wprintf(L"%lc", out->lines[line]->chars[idx].c); + snprintf(strbuffer, sizeof(strbuffer), "%lc", out->lines[line]->chars[idx].c); + write(outfd, strbuffer, strlen(strbuffer)); } if (line < maxrow) - wprintf(L"\r\n"); + write(outfd, "\r\n", 2); } - fflush(stdout); - wprintf(L"\x1b[%ld;%ldH", curs->r + 1, curs->c + 1); - fflush(stdout); + //fflush(stdout); + snprintf(strbuffer, sizeof(strbuffer), "\x1b[%ld;%ldH", curs->r + 1, curs->c + 1); + write(outfd, strbuffer, strlen(strbuffer)); + //fflush(stdout); +} + +int handle_traffic(int fd) { + int cmd, length; + char currnode[MAXNAMELEN]; + char cmdbuf[MAXDATALEN]; + char *nodename; + TMT *currvt = NULL; + TMT *outvt = NULL; + length = read(fd, &cmd, 4); + if (length <= 0) { + return 0; + } + length = cmd & 536870911; + cmd = cmd >> 29; + if (cmd == SETNODE) { + cmd = read(fd, currnode, length); + currnode[length] = 0; + if (cmd < 0) + return 0; + currvt = set_termentbyname(currnode, fd); + } else if (cmd == WRITE) { + if (currvt == NULL) { + nodename = nodenames[fd]; + currvt = set_termentbyname(nodename, fd); + } + cmd = read(fd, cmdbuf, length); + cmdbuf[length] = 0; + if (cmd < 0) + return 0; + tmt_write(currvt, cmdbuf, length); + } else if (cmd == READBUFF) { + cmd = read(fd, cmdbuf, length); + cmdbuf[length] = 0; + if (cmd < 0) + return 0; + outvt = get_termentbyname(cmdbuf); + if (outvt != NULL) + dump_vt(outvt, fd); + length = write(fd, "\x00", 1); + if (length < 0) + return 0; + } else if (cmd == CLOSECONN) { + return 0; + } + return 1; } int main(int argc, char* argv[]) { - int cmd, length; setlocale(LC_ALL, ""); - char cmdbuf[MAXDATALEN]; - char currnode[MAXNAMELEN]; - TMT *currvt = NULL; - TMT *outvt = NULL; + struct sockaddr_un addr; + int numevts; + int status; + int poller; + int n; + socklen_t len; + int ctlsock, currsock; + socklen_t addrlen; + struct ucred ucr; + + struct epoll_event epvt, evts[MAXEVTS]; stdin = freopen(NULL, "rb", stdin); if (stdin == NULL) { exit(1); } + memset(&addr, 0, sizeof(struct sockaddr_un)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path + 1, argv[1], sizeof(addr.sun_path) - 2); // abstract namespace socket + ctlsock = socket(AF_UNIX, SOCK_STREAM, 0); + status = bind(ctlsock, (const struct sockaddr*)&addr, sizeof(sa_family_t) + strlen(argv[1]) + 1); //sizeof(struct sockaddr_un)); + if (status < 0) { + perror("Unable to open unix socket - "); + exit(1); + } + listen(ctlsock, 128); + poller = epoll_create(1); + memset(&epvt, 0, sizeof(struct epoll_event)); + epvt.events = EPOLLIN; + epvt.data.fd = ctlsock; + if (epoll_ctl(poller, EPOLL_CTL_ADD, ctlsock, &epvt) < 0) { + perror("Unable to poll the socket"); + exit(1); + } + // create a unix domain socket for accepting, each connection is only allowed to either read or write, not both while (1) { - length = fread(&cmd, 4, 1, stdin); - if (length < 0) - continue; - length = cmd & 536870911; - cmd = cmd >> 29; - if (cmd == SETNODE) { - cmd = fread(currnode, 1, length, stdin); - currnode[length] = 0; - if (cmd < 0) - continue; - currvt = set_termentbyname(currnode); - } else if (cmd == WRITE) { - if (currvt == NULL) - currvt = set_termentbyname(""); - cmd = fread(cmdbuf, 1, length, stdin); - cmdbuf[length] = 0; - if (cmd < 0) - continue; - tmt_write(currvt, cmdbuf, length); - } else if (cmd == READBUFF) { - cmd = fread(cmdbuf, 1, length, stdin); - cmdbuf[length] = 0; - if (cmd < 0) - continue; - outvt = get_termentbyname(cmdbuf); - if (outvt != NULL) - dump_vt(outvt); - length = write(1, "\x00", 1); - if (length < 0) - continue; + numevts = epoll_wait(poller, evts, MAXEVTS, -1); + if (numevts < 0) { + perror("Failed wait"); + exit(1); + } + for (n = 0; n < numevts; ++n) { + if (evts[n].data.fd == ctlsock) { + currsock = accept(ctlsock, (struct sockaddr *) &addr, &addrlen); + len = sizeof(ucr); + getsockopt(currsock, SOL_SOCKET, SO_PEERCRED, &ucr, &len); + if (ucr.uid != getuid()) { // block access for other users + close(currsock); + continue; + } + memset(&epvt, 0, sizeof(struct epoll_event)); + epvt.events = EPOLLIN; + epvt.data.fd = currsock; + epoll_ctl(poller, EPOLL_CTL_ADD, currsock, &epvt); + } else { + if (!handle_traffic(evts[n].data.fd)) { + epoll_ctl(poller, EPOLL_CTL_DEL, evts[n].data.fd, NULL); + close(evts[n].data.fd); + if (nodenames[evts[n].data.fd] != NULL) { + free(nodenames[evts[n].data.fd]); + nodenames[evts[n].data.fd] = NULL; + } + } + } } } } + +