mirror of
https://github.com/xcat2/confluent.git
synced 2024-11-21 17:11:58 +00:00
Change to unix domain for vtbuffer communication
The semaphore arbitrated single channel sharing was proving to be too slow. Make the communication lockless by having dedicated sockets per request.
This commit is contained in:
parent
ac1f7c57b6
commit
c1afc144cb
@ -62,39 +62,38 @@ def chunk_output(output, n):
|
||||
yield output[i:i + n]
|
||||
|
||||
def get_buffer_output(nodename):
|
||||
out = _bufferdaemon.stdin
|
||||
instream = _bufferdaemon.stdout
|
||||
out = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
out.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
|
||||
out.connect("\x00confluent-vtbuffer")
|
||||
if not isinstance(nodename, bytes):
|
||||
nodename = nodename.encode('utf8')
|
||||
outdata = bytearray()
|
||||
with _bufferlock:
|
||||
out.write(struct.pack('I', len(nodename)))
|
||||
out.write(nodename)
|
||||
out.flush()
|
||||
select.select((instream,), (), (), 30)
|
||||
while not outdata or outdata[-1]:
|
||||
try:
|
||||
chunk = os.read(instream.fileno(), 128)
|
||||
except IOError:
|
||||
chunk = None
|
||||
if chunk:
|
||||
outdata.extend(chunk)
|
||||
else:
|
||||
select.select((instream,), (), (), 0)
|
||||
return bytes(outdata[:-1])
|
||||
out.send(struct.pack('I', len(nodename)))
|
||||
out.send(nodename)
|
||||
select.select((out,), (), (), 30)
|
||||
while not outdata or outdata[-1]:
|
||||
try:
|
||||
chunk = os.read(out.fileno(), 128)
|
||||
except IOError:
|
||||
chunk = None
|
||||
if chunk:
|
||||
outdata.extend(chunk)
|
||||
else:
|
||||
select.select((out,), (), (), 0)
|
||||
return bytes(outdata[:-1])
|
||||
|
||||
|
||||
def send_output(nodename, output):
|
||||
if not isinstance(nodename, bytes):
|
||||
nodename = nodename.encode('utf8')
|
||||
with _bufferlock:
|
||||
_bufferdaemon.stdin.write(struct.pack('I', len(nodename) | (1 << 29)))
|
||||
_bufferdaemon.stdin.write(nodename)
|
||||
_bufferdaemon.stdin.flush()
|
||||
for chunk in chunk_output(output, 8192):
|
||||
_bufferdaemon.stdin.write(struct.pack('I', len(chunk) | (2 << 29)))
|
||||
_bufferdaemon.stdin.write(chunk)
|
||||
_bufferdaemon.stdin.flush()
|
||||
out = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
out.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
|
||||
out.connect("\x00confluent-vtbuffer")
|
||||
out.send(struct.pack('I', len(nodename) | (1 << 29)))
|
||||
out.send(nodename)
|
||||
for chunk in chunk_output(output, 8192):
|
||||
out.send(struct.pack('I', len(chunk) | (2 << 29)))
|
||||
out.send(chunk)
|
||||
|
||||
def _utf8_normalize(data, decoder):
|
||||
# first we give the stateful decoder a crack at the byte stream,
|
||||
@ -604,11 +603,8 @@ def initialize():
|
||||
_bufferlock = semaphore.Semaphore()
|
||||
_tracelog = log.Logger('trace')
|
||||
_bufferdaemon = subprocess.Popen(
|
||||
['/opt/confluent/bin/vtbufferd'], bufsize=0, stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE)
|
||||
fl = fcntl.fcntl(_bufferdaemon.stdout.fileno(), fcntl.F_GETFL)
|
||||
fcntl.fcntl(_bufferdaemon.stdout.fileno(),
|
||||
fcntl.F_SETFL, fl | os.O_NONBLOCK)
|
||||
['/opt/confluent/bin/vtbufferd', 'confluent-vtbuffer'], bufsize=0, stdin=subprocess.DEVNULL,
|
||||
stdout=subprocess.DEVNULL)
|
||||
|
||||
def start_console_sessions():
|
||||
configmodule.hook_new_configmanagers(_start_tenant_sessions)
|
||||
|
@ -1,8 +1,14 @@
|
||||
#include <asm-generic/socket.h>
|
||||
#define _GNU_SOURCE
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
#include <locale.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <sys/un.h>
|
||||
#include <fcntl.h>
|
||||
#include "tmt.h"
|
||||
#define HASHSIZE 2053
|
||||
#define MAXNAMELEN 256
|
||||
@ -10,13 +16,17 @@
|
||||
struct terment {
|
||||
struct terment *next;
|
||||
char *name;
|
||||
int fd;
|
||||
TMT *vt;
|
||||
};
|
||||
|
||||
#define SETNODE 1
|
||||
#define WRITE 2
|
||||
#define READBUFF 0
|
||||
#define CLOSECONN 3
|
||||
#define MAXEVTS 16
|
||||
static struct terment *buffers[HASHSIZE];
|
||||
static char* nodenames[HASHSIZE];
|
||||
|
||||
unsigned long hash(char *str)
|
||||
/* djb2a */
|
||||
@ -37,10 +47,13 @@ TMT *get_termentbyname(char *name) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
TMT *set_termentbyname(char *name) {
|
||||
TMT *set_termentbyname(char *name, int fd) {
|
||||
struct terment *ret;
|
||||
int idx;
|
||||
|
||||
if (nodenames[fd] == NULL) {
|
||||
nodenames[fd] = strdup(name);
|
||||
}
|
||||
idx = hash(name);
|
||||
for (ret = buffers[idx]; ret != NULL; ret = ret->next)
|
||||
if (strcmp(name, ret->name) == 0)
|
||||
@ -48,12 +61,13 @@ TMT *set_termentbyname(char *name) {
|
||||
ret = (struct terment *)malloc(sizeof(*ret));
|
||||
ret->next = buffers[idx];
|
||||
ret->name = strdup(name);
|
||||
ret->fd = fd;
|
||||
ret->vt = tmt_open(31, 100, NULL, NULL, L"→←↑↓■◆▒°±▒┘┐┌└┼⎺───⎽├┤┴┬│≤≥π≠£•");
|
||||
buffers[idx] = ret;
|
||||
return ret->vt;
|
||||
}
|
||||
|
||||
void dump_vt(TMT* outvt) {
|
||||
void dump_vt(TMT* outvt, int outfd) {
|
||||
const TMTSCREEN *out = tmt_screen(outvt);
|
||||
const TMTPOINT *curs = tmt_cursor(outvt);
|
||||
int line, idx, maxcol, maxrow;
|
||||
@ -67,9 +81,10 @@ void dump_vt(TMT* outvt) {
|
||||
tmt_color_t fg = TMT_COLOR_DEFAULT;
|
||||
tmt_color_t bg = TMT_COLOR_DEFAULT;
|
||||
wchar_t sgrline[30];
|
||||
char strbuffer[128];
|
||||
size_t srgidx = 0;
|
||||
char colorcode = 0;
|
||||
wprintf(L"\033c");
|
||||
write(outfd, "\033c", 2);
|
||||
maxcol = 0;
|
||||
maxrow = 0;
|
||||
for (line = out->nline - 1; line >= 0; --line) {
|
||||
@ -148,60 +163,136 @@ void dump_vt(TMT* outvt) {
|
||||
}
|
||||
if (sgrline[0] != 0) {
|
||||
sgrline[wcslen(sgrline) - 1] = 0; // Trim last ;
|
||||
wprintf(L"\033[%lsm", sgrline);
|
||||
|
||||
snprintf(strbuffer, sizeof(strbuffer), "\033[%lsm", sgrline);
|
||||
write(outfd, strbuffer, strlen(strbuffer));
|
||||
write(outfd, "\033[]", 3);
|
||||
}
|
||||
wprintf(L"%lc", out->lines[line]->chars[idx].c);
|
||||
snprintf(strbuffer, sizeof(strbuffer), "%lc", out->lines[line]->chars[idx].c);
|
||||
write(outfd, strbuffer, strlen(strbuffer));
|
||||
}
|
||||
if (line < maxrow)
|
||||
wprintf(L"\r\n");
|
||||
write(outfd, "\r\n", 2);
|
||||
}
|
||||
fflush(stdout);
|
||||
wprintf(L"\x1b[%ld;%ldH", curs->r + 1, curs->c + 1);
|
||||
fflush(stdout);
|
||||
//fflush(stdout);
|
||||
snprintf(strbuffer, sizeof(strbuffer), "\x1b[%ld;%ldH", curs->r + 1, curs->c + 1);
|
||||
write(outfd, strbuffer, strlen(strbuffer));
|
||||
//fflush(stdout);
|
||||
}
|
||||
|
||||
int handle_traffic(int fd) {
|
||||
int cmd, length;
|
||||
char currnode[MAXNAMELEN];
|
||||
char cmdbuf[MAXDATALEN];
|
||||
char *nodename;
|
||||
TMT *currvt = NULL;
|
||||
TMT *outvt = NULL;
|
||||
length = read(fd, &cmd, 4);
|
||||
if (length <= 0) {
|
||||
return 0;
|
||||
}
|
||||
length = cmd & 536870911;
|
||||
cmd = cmd >> 29;
|
||||
if (cmd == SETNODE) {
|
||||
cmd = read(fd, currnode, length);
|
||||
currnode[length] = 0;
|
||||
if (cmd < 0)
|
||||
return 0;
|
||||
currvt = set_termentbyname(currnode, fd);
|
||||
} else if (cmd == WRITE) {
|
||||
if (currvt == NULL) {
|
||||
nodename = nodenames[fd];
|
||||
currvt = set_termentbyname(nodename, fd);
|
||||
}
|
||||
cmd = read(fd, cmdbuf, length);
|
||||
cmdbuf[length] = 0;
|
||||
if (cmd < 0)
|
||||
return 0;
|
||||
tmt_write(currvt, cmdbuf, length);
|
||||
} else if (cmd == READBUFF) {
|
||||
cmd = read(fd, cmdbuf, length);
|
||||
cmdbuf[length] = 0;
|
||||
if (cmd < 0)
|
||||
return 0;
|
||||
outvt = get_termentbyname(cmdbuf);
|
||||
if (outvt != NULL)
|
||||
dump_vt(outvt, fd);
|
||||
length = write(fd, "\x00", 1);
|
||||
if (length < 0)
|
||||
return 0;
|
||||
} else if (cmd == CLOSECONN) {
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
int cmd, length;
|
||||
setlocale(LC_ALL, "");
|
||||
char cmdbuf[MAXDATALEN];
|
||||
char currnode[MAXNAMELEN];
|
||||
TMT *currvt = NULL;
|
||||
TMT *outvt = NULL;
|
||||
struct sockaddr_un addr;
|
||||
int numevts;
|
||||
int status;
|
||||
int poller;
|
||||
int n;
|
||||
socklen_t len;
|
||||
int ctlsock, currsock;
|
||||
socklen_t addrlen;
|
||||
struct ucred ucr;
|
||||
|
||||
struct epoll_event epvt, evts[MAXEVTS];
|
||||
stdin = freopen(NULL, "rb", stdin);
|
||||
if (stdin == NULL) {
|
||||
exit(1);
|
||||
}
|
||||
memset(&addr, 0, sizeof(struct sockaddr_un));
|
||||
addr.sun_family = AF_UNIX;
|
||||
strncpy(addr.sun_path + 1, argv[1], sizeof(addr.sun_path) - 2); // abstract namespace socket
|
||||
ctlsock = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
status = bind(ctlsock, (const struct sockaddr*)&addr, sizeof(sa_family_t) + strlen(argv[1]) + 1); //sizeof(struct sockaddr_un));
|
||||
if (status < 0) {
|
||||
perror("Unable to open unix socket - ");
|
||||
exit(1);
|
||||
}
|
||||
listen(ctlsock, 128);
|
||||
poller = epoll_create(1);
|
||||
memset(&epvt, 0, sizeof(struct epoll_event));
|
||||
epvt.events = EPOLLIN;
|
||||
epvt.data.fd = ctlsock;
|
||||
if (epoll_ctl(poller, EPOLL_CTL_ADD, ctlsock, &epvt) < 0) {
|
||||
perror("Unable to poll the socket");
|
||||
exit(1);
|
||||
}
|
||||
// create a unix domain socket for accepting, each connection is only allowed to either read or write, not both
|
||||
while (1) {
|
||||
length = fread(&cmd, 4, 1, stdin);
|
||||
if (length < 0)
|
||||
continue;
|
||||
length = cmd & 536870911;
|
||||
cmd = cmd >> 29;
|
||||
if (cmd == SETNODE) {
|
||||
cmd = fread(currnode, 1, length, stdin);
|
||||
currnode[length] = 0;
|
||||
if (cmd < 0)
|
||||
continue;
|
||||
currvt = set_termentbyname(currnode);
|
||||
} else if (cmd == WRITE) {
|
||||
if (currvt == NULL)
|
||||
currvt = set_termentbyname("");
|
||||
cmd = fread(cmdbuf, 1, length, stdin);
|
||||
cmdbuf[length] = 0;
|
||||
if (cmd < 0)
|
||||
continue;
|
||||
tmt_write(currvt, cmdbuf, length);
|
||||
} else if (cmd == READBUFF) {
|
||||
cmd = fread(cmdbuf, 1, length, stdin);
|
||||
cmdbuf[length] = 0;
|
||||
if (cmd < 0)
|
||||
continue;
|
||||
outvt = get_termentbyname(cmdbuf);
|
||||
if (outvt != NULL)
|
||||
dump_vt(outvt);
|
||||
length = write(1, "\x00", 1);
|
||||
if (length < 0)
|
||||
continue;
|
||||
numevts = epoll_wait(poller, evts, MAXEVTS, -1);
|
||||
if (numevts < 0) {
|
||||
perror("Failed wait");
|
||||
exit(1);
|
||||
}
|
||||
for (n = 0; n < numevts; ++n) {
|
||||
if (evts[n].data.fd == ctlsock) {
|
||||
currsock = accept(ctlsock, (struct sockaddr *) &addr, &addrlen);
|
||||
len = sizeof(ucr);
|
||||
getsockopt(currsock, SOL_SOCKET, SO_PEERCRED, &ucr, &len);
|
||||
if (ucr.uid != getuid()) { // block access for other users
|
||||
close(currsock);
|
||||
continue;
|
||||
}
|
||||
memset(&epvt, 0, sizeof(struct epoll_event));
|
||||
epvt.events = EPOLLIN;
|
||||
epvt.data.fd = currsock;
|
||||
epoll_ctl(poller, EPOLL_CTL_ADD, currsock, &epvt);
|
||||
} else {
|
||||
if (!handle_traffic(evts[n].data.fd)) {
|
||||
epoll_ctl(poller, EPOLL_CTL_DEL, evts[n].data.fd, NULL);
|
||||
close(evts[n].data.fd);
|
||||
if (nodenames[evts[n].data.fd] != NULL) {
|
||||
free(nodenames[evts[n].data.fd]);
|
||||
nodenames[evts[n].data.fd] = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user