2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 01:22:00 +00:00

Change to unix domain for vtbuffer communication

The semaphore arbitrated single channel sharing
was proving to be too slow.  Make the communication
lockless by having dedicated sockets per request.
This commit is contained in:
Jarrod Johnson 2024-02-22 15:05:56 -05:00
parent 21f691cbd8
commit 72e26caf36
2 changed files with 161 additions and 74 deletions

View File

@ -62,39 +62,38 @@ def chunk_output(output, n):
yield output[i:i + n]
def get_buffer_output(nodename):
out = _bufferdaemon.stdin
instream = _bufferdaemon.stdout
out = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
out.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
out.connect("\x00confluent-vtbuffer")
if not isinstance(nodename, bytes):
nodename = nodename.encode('utf8')
outdata = bytearray()
with _bufferlock:
out.write(struct.pack('I', len(nodename)))
out.write(nodename)
out.flush()
select.select((instream,), (), (), 30)
while not outdata or outdata[-1]:
try:
chunk = os.read(instream.fileno(), 128)
except IOError:
chunk = None
if chunk:
outdata.extend(chunk)
else:
select.select((instream,), (), (), 0)
return bytes(outdata[:-1])
out.send(struct.pack('I', len(nodename)))
out.send(nodename)
select.select((out,), (), (), 30)
while not outdata or outdata[-1]:
try:
chunk = os.read(out.fileno(), 128)
except IOError:
chunk = None
if chunk:
outdata.extend(chunk)
else:
select.select((out,), (), (), 0)
return bytes(outdata[:-1])
def send_output(nodename, output):
if not isinstance(nodename, bytes):
nodename = nodename.encode('utf8')
with _bufferlock:
_bufferdaemon.stdin.write(struct.pack('I', len(nodename) | (1 << 29)))
_bufferdaemon.stdin.write(nodename)
_bufferdaemon.stdin.flush()
for chunk in chunk_output(output, 8192):
_bufferdaemon.stdin.write(struct.pack('I', len(chunk) | (2 << 29)))
_bufferdaemon.stdin.write(chunk)
_bufferdaemon.stdin.flush()
out = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
out.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
out.connect("\x00confluent-vtbuffer")
out.send(struct.pack('I', len(nodename) | (1 << 29)))
out.send(nodename)
for chunk in chunk_output(output, 8192):
out.send(struct.pack('I', len(chunk) | (2 << 29)))
out.send(chunk)
def _utf8_normalize(data, decoder):
# first we give the stateful decoder a crack at the byte stream,
@ -607,11 +606,8 @@ def initialize():
_bufferlock = semaphore.Semaphore()
_tracelog = log.Logger('trace')
_bufferdaemon = subprocess.Popen(
['/opt/confluent/bin/vtbufferd'], bufsize=0, stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
fl = fcntl.fcntl(_bufferdaemon.stdout.fileno(), fcntl.F_GETFL)
fcntl.fcntl(_bufferdaemon.stdout.fileno(),
fcntl.F_SETFL, fl | os.O_NONBLOCK)
['/opt/confluent/bin/vtbufferd', 'confluent-vtbuffer'], bufsize=0, stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL)
def start_console_sessions():
configmodule.hook_new_configmanagers(_start_tenant_sessions)

View File

@ -1,8 +1,14 @@
#include <asm-generic/socket.h>
#define _GNU_SOURCE
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <locale.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/un.h>
#include <fcntl.h>
#include "tmt.h"
#define HASHSIZE 2053
#define MAXNAMELEN 256
@ -10,13 +16,17 @@
struct terment {
struct terment *next;
char *name;
int fd;
TMT *vt;
};
#define SETNODE 1
#define WRITE 2
#define READBUFF 0
#define CLOSECONN 3
#define MAXEVTS 16
static struct terment *buffers[HASHSIZE];
static char* nodenames[HASHSIZE];
unsigned long hash(char *str)
/* djb2a */
@ -37,10 +47,13 @@ TMT *get_termentbyname(char *name) {
return NULL;
}
TMT *set_termentbyname(char *name) {
TMT *set_termentbyname(char *name, int fd) {
struct terment *ret;
int idx;
if (nodenames[fd] == NULL) {
nodenames[fd] = strdup(name);
}
idx = hash(name);
for (ret = buffers[idx]; ret != NULL; ret = ret->next)
if (strcmp(name, ret->name) == 0)
@ -48,12 +61,13 @@ TMT *set_termentbyname(char *name) {
ret = (struct terment *)malloc(sizeof(*ret));
ret->next = buffers[idx];
ret->name = strdup(name);
ret->fd = fd;
ret->vt = tmt_open(31, 100, NULL, NULL, L"→←↑↓■◆▒°±▒┘┐┌└┼⎺───⎽├┤┴┬│≤≥π≠£•");
buffers[idx] = ret;
return ret->vt;
}
void dump_vt(TMT* outvt) {
void dump_vt(TMT* outvt, int outfd) {
const TMTSCREEN *out = tmt_screen(outvt);
const TMTPOINT *curs = tmt_cursor(outvt);
int line, idx, maxcol, maxrow;
@ -67,9 +81,10 @@ void dump_vt(TMT* outvt) {
tmt_color_t fg = TMT_COLOR_DEFAULT;
tmt_color_t bg = TMT_COLOR_DEFAULT;
wchar_t sgrline[30];
char strbuffer[128];
size_t srgidx = 0;
char colorcode = 0;
wprintf(L"\033c");
write(outfd, "\033c", 2);
maxcol = 0;
maxrow = 0;
for (line = out->nline - 1; line >= 0; --line) {
@ -148,60 +163,136 @@ void dump_vt(TMT* outvt) {
}
if (sgrline[0] != 0) {
sgrline[wcslen(sgrline) - 1] = 0; // Trim last ;
wprintf(L"\033[%lsm", sgrline);
snprintf(strbuffer, sizeof(strbuffer), "\033[%lsm", sgrline);
write(outfd, strbuffer, strlen(strbuffer));
write(outfd, "\033[]", 3);
}
wprintf(L"%lc", out->lines[line]->chars[idx].c);
snprintf(strbuffer, sizeof(strbuffer), "%lc", out->lines[line]->chars[idx].c);
write(outfd, strbuffer, strlen(strbuffer));
}
if (line < maxrow)
wprintf(L"\r\n");
write(outfd, "\r\n", 2);
}
fflush(stdout);
wprintf(L"\x1b[%ld;%ldH", curs->r + 1, curs->c + 1);
fflush(stdout);
//fflush(stdout);
snprintf(strbuffer, sizeof(strbuffer), "\x1b[%ld;%ldH", curs->r + 1, curs->c + 1);
write(outfd, strbuffer, strlen(strbuffer));
//fflush(stdout);
}
int handle_traffic(int fd) {
int cmd, length;
char currnode[MAXNAMELEN];
char cmdbuf[MAXDATALEN];
char *nodename;
TMT *currvt = NULL;
TMT *outvt = NULL;
length = read(fd, &cmd, 4);
if (length <= 0) {
return 0;
}
length = cmd & 536870911;
cmd = cmd >> 29;
if (cmd == SETNODE) {
cmd = read(fd, currnode, length);
currnode[length] = 0;
if (cmd < 0)
return 0;
currvt = set_termentbyname(currnode, fd);
} else if (cmd == WRITE) {
if (currvt == NULL) {
nodename = nodenames[fd];
currvt = set_termentbyname(nodename, fd);
}
cmd = read(fd, cmdbuf, length);
cmdbuf[length] = 0;
if (cmd < 0)
return 0;
tmt_write(currvt, cmdbuf, length);
} else if (cmd == READBUFF) {
cmd = read(fd, cmdbuf, length);
cmdbuf[length] = 0;
if (cmd < 0)
return 0;
outvt = get_termentbyname(cmdbuf);
if (outvt != NULL)
dump_vt(outvt, fd);
length = write(fd, "\x00", 1);
if (length < 0)
return 0;
} else if (cmd == CLOSECONN) {
return 0;
}
return 1;
}
int main(int argc, char* argv[]) {
int cmd, length;
setlocale(LC_ALL, "");
char cmdbuf[MAXDATALEN];
char currnode[MAXNAMELEN];
TMT *currvt = NULL;
TMT *outvt = NULL;
struct sockaddr_un addr;
int numevts;
int status;
int poller;
int n;
socklen_t len;
int ctlsock, currsock;
socklen_t addrlen;
struct ucred ucr;
struct epoll_event epvt, evts[MAXEVTS];
stdin = freopen(NULL, "rb", stdin);
if (stdin == NULL) {
exit(1);
}
memset(&addr, 0, sizeof(struct sockaddr_un));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path + 1, argv[1], sizeof(addr.sun_path) - 2); // abstract namespace socket
ctlsock = socket(AF_UNIX, SOCK_STREAM, 0);
status = bind(ctlsock, (const struct sockaddr*)&addr, sizeof(sa_family_t) + strlen(argv[1]) + 1); //sizeof(struct sockaddr_un));
if (status < 0) {
perror("Unable to open unix socket - ");
exit(1);
}
listen(ctlsock, 128);
poller = epoll_create(1);
memset(&epvt, 0, sizeof(struct epoll_event));
epvt.events = EPOLLIN;
epvt.data.fd = ctlsock;
if (epoll_ctl(poller, EPOLL_CTL_ADD, ctlsock, &epvt) < 0) {
perror("Unable to poll the socket");
exit(1);
}
// create a unix domain socket for accepting, each connection is only allowed to either read or write, not both
while (1) {
length = fread(&cmd, 4, 1, stdin);
if (length < 0)
continue;
length = cmd & 536870911;
cmd = cmd >> 29;
if (cmd == SETNODE) {
cmd = fread(currnode, 1, length, stdin);
currnode[length] = 0;
if (cmd < 0)
continue;
currvt = set_termentbyname(currnode);
} else if (cmd == WRITE) {
if (currvt == NULL)
currvt = set_termentbyname("");
cmd = fread(cmdbuf, 1, length, stdin);
cmdbuf[length] = 0;
if (cmd < 0)
continue;
tmt_write(currvt, cmdbuf, length);
} else if (cmd == READBUFF) {
cmd = fread(cmdbuf, 1, length, stdin);
cmdbuf[length] = 0;
if (cmd < 0)
continue;
outvt = get_termentbyname(cmdbuf);
if (outvt != NULL)
dump_vt(outvt);
length = write(1, "\x00", 1);
if (length < 0)
continue;
numevts = epoll_wait(poller, evts, MAXEVTS, -1);
if (numevts < 0) {
perror("Failed wait");
exit(1);
}
for (n = 0; n < numevts; ++n) {
if (evts[n].data.fd == ctlsock) {
currsock = accept(ctlsock, (struct sockaddr *) &addr, &addrlen);
len = sizeof(ucr);
getsockopt(currsock, SOL_SOCKET, SO_PEERCRED, &ucr, &len);
if (ucr.uid != getuid()) { // block access for other users
close(currsock);
continue;
}
memset(&epvt, 0, sizeof(struct epoll_event));
epvt.events = EPOLLIN;
epvt.data.fd = currsock;
epoll_ctl(poller, EPOLL_CTL_ADD, currsock, &epvt);
} else {
if (!handle_traffic(evts[n].data.fd)) {
epoll_ctl(poller, EPOLL_CTL_DEL, evts[n].data.fd, NULL);
close(evts[n].data.fd);
if (nodenames[evts[n].data.fd] != NULL) {
free(nodenames[evts[n].data.fd]);
nodenames[evts[n].data.fd] = NULL;
}
}
}
}
}
}