#include "daemon/worker.h"
#include "daemon/tls.h"
#include "daemon/session.h"
+#include "lib/utils.h"
#define negotiate_bufsize(func, handle, bufsize_want) do { \
int bufsize = 0; (func)((handle), &bufsize); \
/* Set output streams */
FILE *out = stdout;
- uv_os_fd_t stream_fd = 0;
+ uv_os_fd_t stream_fd = -1;
struct args *args = the_args;
- if (uv_fileno((uv_handle_t *)stream, &stream_fd)) {
+ struct io_stream_data *data = (struct io_stream_data*) stream->data;
+ if (nread < 0 || uv_fileno((uv_handle_t *)stream, &stream_fd)) {
+ mp_delete(data->pool->ctx);
uv_close((uv_handle_t *)stream, (uv_close_cb) free);
free(commands);
return;
}
+ if (nread <= 0) {
+ free(commands);
+ return;
+ }
if (stream_fd != STDIN_FILENO) {
- if (nread < 0) { /* Close if disconnected */
- uv_close((uv_handle_t *)stream, (uv_close_cb) free);
- }
- if (nread <= 0) {
- free(commands);
- return;
- }
uv_os_fd_t dup_fd = dup(stream_fd);
if (dup_fd >= 0) {
out = fdopen(dup_fd, "w");
}
}
- char *cmd = NULL;
+ char *cmd, *cmd_next = NULL;
+ bool incomplete_cmd = false;
+
/* Execute */
if (stream && commands && nread > 0) {
+ if (commands[nread - 1] != '\n') {
+ incomplete_cmd = true;
+ }
/* Ensure commands is 0-terminated */
- if (commands[nread - 1] == '\n') {
- commands[nread - 1] = '\0';
- } else {
- if (nread >= buf->len) { /* only equality should be possible */
- char *newbuf = realloc(commands, nread + 1);
- if (!newbuf)
- goto finish;
- commands = newbuf;
- }
- commands[nread] = '\0';
+ if (nread >= buf->len) { /* only equality should be possible */
+ char *newbuf = realloc(commands, nread + 1);
+ if (!newbuf)
+ goto finish;
+ commands = newbuf;
}
+ commands[nread] = '\0';
const char *delim = args->quiet ? "" : "> ";
/* No command, just new line */
- if (nread == 1 && args->tty_binary_output == false && commands[nread-1] == '\0') {
+ if (nread == 1 && (data->mode == io_mode_text) == false && commands[nread-1] == '\0' && data->blen == 0) {
if (stream_fd != STDIN_FILENO) {
fprintf(out, "%s", delim);
}
}
}
+ char *boundary = "\n\0";
cmd = strtok(commands, "\n");
+ /* strtok skip '\n' but we need process alone '\n' too */
+ if (commands[0] == '\n') {
+ cmd_next = cmd;
+ cmd = boundary;
+ } else {
+ cmd_next = strtok(NULL, "\n");
+ }
+
+ char *pbuf = data->buf + data->blen;
while (cmd != NULL) {
+ /* Last command is incomplete - save it and execute later */
+ if (incomplete_cmd && cmd_next == NULL) {
+ pbuf = mp_append_string(data->pool->ctx, pbuf, cmd);
+ mp_append_char(data->pool->ctx, pbuf, '\0');
+ data->buf = mp_ptr(data->pool->ctx);
+ data->blen = data->blen + strlen(cmd);
+
+ cmd = cmd_next;
+ /* There is new incomplete command */
+ if (commands[nread - 1] == '\n')
+ incomplete_cmd = false;
+ cmd_next = strtok(NULL, "\n");
+ continue;
+ }
+
+ /* Process incomplete command from previously call */
+ if (data->blen > 0) {
+ if (commands[0] != '\n' && commands[0] != '\0') {
+ pbuf = mp_append_string(data->pool->ctx, pbuf, cmd);
+ mp_append_char(data->pool->ctx, pbuf, '\0');
+ data->buf = mp_ptr(data->pool->ctx);
+ cmd = data->buf;
+ } else {
+ cmd = data->buf;
+ }
+ data->blen = 0;
+ pbuf = data->buf;
+ }
+
/* Pseudo-command for switching to "binary output"; */
if (strcmp(cmd, "__binary") == 0) {
- stream->data = (void *)io_mode_binary;
- cmd = strtok(NULL, "\n");
+ data->mode = io_mode_binary;
+ cmd = cmd_next;
+ cmd_next = strtok(NULL, "\n");
continue;
}
}
/* Simpler output in binary mode */
- if (stream->data == (void *)io_mode_binary) {
+ if (data->mode == io_mode_binary) {
size_t len_s = strlen(message);
if (len_s > UINT32_MAX) {
- cmd = strtok(NULL, "\n");
+ cmd = cmd_next;
+ cmd_next = strtok(NULL, "\n");
continue;
}
uint32_t len_n = htonl(len_s);
fwrite(&len_n, sizeof(len_n), 1, out);
fwrite(message, len_s, 1, out);
lua_settop(L, 0);
- cmd = strtok(NULL, "\n");
+ cmd = cmd_next;
+ cmd_next = strtok(NULL, "\n");
continue;
}
/* Log to remote socket if connected */
fprintf(fp_out, "%s", delim);
}
lua_settop(L, 0);
- cmd = strtok(NULL, "\n");
+ cmd = cmd_next;
+ cmd_next = strtok(NULL, "\n");
}
}
finish:
buf->base = malloc(suggested);
}
+struct io_stream_data *io_tty_alloc_data() {
+ knot_mm_t _pool = {
+ .ctx = mp_new(4096),
+ .alloc = (knot_mm_alloc_t) mp_alloc,
+ };
+ knot_mm_t *pool = mm_alloc(&_pool, sizeof(*pool));
+ if (!pool) {
+ return NULL;
+ }
+ memcpy(pool, &_pool, sizeof(*pool));
+
+ struct io_stream_data *data = mm_alloc(pool, sizeof(struct io_stream_data));
+
+ data->buf = mp_start(pool->ctx, 512);
+ data->mode = io_mode_text;
+ data->blen = 0;
+ data->pool = pool;
+
+ return data;
+}
+
void io_tty_accept(uv_stream_t *master, int status)
{
+ struct io_stream_data *data = io_tty_alloc_data();
+ /* We can't use any allocations after mp_start() and it's easier anyway. */
uv_tcp_t *client = malloc(sizeof(*client));
+ client->data = data;
+
struct args *args = the_args;
- if (client) {
+ if (client && client->data) {
uv_tcp_init(master->loop, client);
if (uv_accept(master, (uv_stream_t *)client) != 0) {
- free(client);
+ mp_delete(data->pool->ctx);
return;
}
- client->data = (void *) io_mode_text;
uv_read_start((uv_stream_t *)client, io_tty_alloc, io_tty_process_input);
/* Write command line */
if (!args->quiet) {
}
/* Control sockets or TTY */
- uv_pipe_t pipe;
- uv_pipe_init(loop, &pipe, 0);
- pipe.data = args;
+ uv_pipe_t *pipe = malloc(sizeof(*pipe));
+ uv_pipe_init(loop, pipe, 0);
if (args->interactive) {
if (!args->quiet)
printf("[system] interactive mode\n> ");
- uv_pipe_open(&pipe, 0);
- uv_read_start((uv_stream_t*) &pipe, io_tty_alloc, io_tty_process_input);
- } else if (args->control_fd != -1 && uv_pipe_open(&pipe, args->control_fd) == 0) {
- uv_listen((uv_stream_t *) &pipe, 16, io_tty_accept);
+ pipe->data = io_tty_alloc_data();
+ uv_pipe_open(pipe, 0);
+ uv_read_start((uv_stream_t*)pipe, io_tty_alloc, io_tty_process_input);
+ } else if (args->control_fd != -1 && uv_pipe_open(pipe, args->control_fd) == 0) {
+ uv_listen((uv_stream_t *)pipe, 16, io_tty_accept);
}
/* Watch IPC pipes (or just assign them if leading the pgroup). */
if (!leader) {
#endif
/* Run event loop */
uv_run(loop, UV_RUN_DEFAULT);
- uv_close((uv_handle_t *)&pipe, NULL); /* Seems OK even on the stopped loop. */
+ /* Free pipe's data. Seems OK even on the stopped loop.
+ * In interactive case it may have been done in callbacks already (single leak). */
+ if (!args->interactive) {
+ uv_close((uv_handle_t *)pipe, NULL);
+ free(pipe);
+ }
return EXIT_SUCCESS;
}