From: Maria Matejka Date: Fri, 11 Oct 2024 19:18:25 +0000 (+0200) Subject: Flock: now linker errors X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=58bd724167902d451d85882a6134d143fc065fc3;p=thirdparty%2Fbird.git Flock: now linker errors --- diff --git a/flock/Makefile b/flock/Makefile index a108abbb8..503df4da4 100644 --- a/flock/Makefile +++ b/flock/Makefile @@ -1,4 +1,4 @@ -src := container.c ctl.c flock.c hypervisor.c +src := container.c flock.c hypervisor.c obj := $(src-o-files) flock=$(exedir)/flock-sim diff --git a/flock/container.c b/flock/container.c index 476ef8ffa..bf45be895 100644 --- a/flock/container.c +++ b/flock/container.c @@ -21,7 +21,7 @@ static struct hypervisor_container_forker { sock *s; pool *p; - struct cbor_stream stream; + CBOR_STREAM_EMBED(stream, 4); struct birdloop *loop; HASH(struct container_runtime) hash; struct container_runtime *cur_crt; @@ -29,7 +29,7 @@ static struct hypervisor_container_forker { } hcf; struct container_fork_request { - struct cbor_channel cch; + CBOR_CHANNEL_EMBED(cch, 4); struct cbor_channel *ctl_ch; struct container_runtime *crt; int reply_state; @@ -40,7 +40,7 @@ struct container_runtime { uint hash; pid_t pid; sock *s; - struct cbor_stream stream; + CBOR_STREAM_EMBED(stream, 4); char hostname[]; }; @@ -51,22 +51,18 @@ struct container_runtime { #define CRT_EQ(a,h,b,i) ((h) == (i)) && (!strcmp(a,b)) #define CRT_FN(a,h) h -static sig_atomic_t poweroff, zombie; - static void container_poweroff_sighandler(int signo) { - poweroff = signo; + ev_send_loop(&main_birdloop, &poweroff_event); } static void container_child_sighandler(int signo UNUSED) { - zombie = 1; + ev_send_loop(&main_birdloop, &zombie_event); } -static int container_forker_fd = -1; - static void container_poweroff(int fd, int sig) { @@ -417,6 +413,18 @@ container_mainloop(int fd, struct flock_machine_container_config *ccf) container_init_logger(); /* Run worker threads */ + struct birdloop *loop = birdloop_new(&root_pool, DOMAIN_ORDER(control), 0, "Container control socket"); + birdloop_enter(loop); + sock *s = sk_new(birdloop_pool(loop)); + s->type = SK_MAGIC; + if (sk_open(s, loop) < 0) + bug("Container control socket open failed"); + + sk_set_rbsize(s, 128); + sk_set_tbsize(s, 128); + + s.type = SK_UNIX_MSG; + struct thread_config tc = {}; bird_thread_commit(&tc); @@ -527,11 +535,19 @@ container_mainloop(int fd, struct flock_machine_container_config *ccf) } } +struct container_fork_request_child { + CBOR_CHANNEL_EMBED(cch, 4); + struct flock_machine_container_config ccf; + u64 major_state; +}; + static uint container_counter = 0; static void -container_start(struct flock_machine_container_config *ccf) +container_start(struct container_fork_request_child *ccx) { + struct flock_machine_container_config *ccf = &ccx->ccf; + log(L_INFO "Requested to start a container, name %s, base %s, work %s", ccf->cf.name, ccf->basedir, ccf->workdir); @@ -604,41 +620,15 @@ container_start(struct flock_machine_container_config *ccf) close(fds[1]); - struct { - struct cbor_writer w; - struct cbor_writer_stack_item si[2]; - byte buf[128]; - } _cw; - - struct cbor_writer *cw = cbor_writer_init(&_cw.w, 2, _cw.buf, sizeof _cw.buf); - CBOR_PUT_MAP(cw) - { - cbor_put_int(cw, -2); - cbor_put_int(cw, pid); - } - - struct iovec v = { - .iov_base = cw->data.start, - .iov_len = cw->data.pos - cw->data.start, - }; - byte cbuf[CMSG_SPACE(sizeof fds[0])]; - struct msghdr m = { - .msg_iov = &v, - .msg_iovlen = 1, - .msg_control = &cbuf, - .msg_controllen = sizeof cbuf, - }; - struct cmsghdr *c = CMSG_FIRSTHDR(&m); - c->cmsg_level = SOL_SOCKET; - c->cmsg_type = SCM_RIGHTS; - c->cmsg_len = CMSG_LEN(sizeof fds[0]); - memcpy(CMSG_DATA(c), &fds[0], sizeof fds[0]); - log(L_INFO "Sending socket"); - e = sendmsg(container_forker_fd, &m, 0); - if (e < 0) - log(L_ERR "Failed to send socket: %m"); + ccx->cch.stream->s->txfd = fds[0]; + CBOR_REPLY(&ccx->cch, cw) + CBOR_PUT_MAP(cw) + { + cbor_put_int(cw, -2); + cbor_put_int(cw, pid); + } log(L_INFO "Socket sent"); exit(0); @@ -646,6 +636,25 @@ container_start(struct flock_machine_container_config *ccf) /* The Parent */ +static struct container_runtime * +container_find_by_name(const char *name) +{ + uint h = mem_hash(name, strlen(name)); + return HASH_FIND(hcf.hash, CRT, name, h); +} + +struct cbor_channel * +container_get_channel(const char *name) +{ + struct container_runtime *crt = container_find_by_name(name); + struct cbor_channel *cch = NULL; + if (crt) + BIRDLOOP_INSIDE(crt->stream.loop) + cch = cbor_channel_new(&crt->stream); + + return cch; +} + static void container_cleanup(struct container_runtime *crt) { @@ -655,12 +664,20 @@ container_cleanup(struct container_runtime *crt) } struct container_ctl_msg { - struct cbor_channel cch; + CBOR_CHANNEL_EMBED(cch, 4); struct cbor_channel *ctl_ch; int msg_state; int down_signal; }; +static void +container_ctl_cancel(struct cbor_stream *stream) +{ + SKIP_BACK_DECLARE(struct container_runtime, crt, stream, stream); + HASH_REMOVE(hcf.hash, CRT, crt); + mb_free(crt); +} + static enum cbor_parse_result container_ctl_parse(struct cbor_channel *cch, enum cbor_parse_result res) { @@ -771,6 +788,10 @@ container_fork_request_reply(struct cbor_channel *cch, enum cbor_parse_result re switch (cfr->reply_state) { case 3: cfr->reply_state = 4; + return CPR_MORE; + + case 4: + cfr->reply_state = 5; break; default: @@ -812,7 +833,7 @@ container_fork_request_reply(struct cbor_channel *cch, enum cbor_parse_result re cbor_put_string(cw, "OK"); } - cbor_done_channel(&cfr->cch); + cbor_channel_done(&cfr->cch); return CPR_BLOCK_END; #undef FAIL @@ -826,7 +847,7 @@ hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_contai #define FAIL(id, msg) do { \ CBOR_REPLY(cch, cw) CBOR_PUT_MAP(cw) { \ cbor_put_int(cw, id); cbor_put_string(cw, msg);\ - } cbor_done_channel(cch); \ + } cbor_channel_done(cch); \ birdloop_leave(hcf.loop); \ return; } while (0) @@ -860,7 +881,9 @@ hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_contai cfr->cch.parse = container_fork_request_reply; crt->stream.parse = container_ctl_parse; - cbor_stream_init(&crt->stream, 2); + crt->stream.cancel = container_ctl_cancel; + + CBOR_STREAM_INIT(crt, stream, cch, hcf.p, struct container_ctl_msg); CBOR_REPLY(&cfr->cch, cw) CBOR_PUT_MAP(cw) @@ -941,7 +964,7 @@ container_stopped(struct cbor_channel *cch, enum cbor_parse_result res) cbor_put_string(cw, "OK"); } - cbor_done_channel(&ccc->cch); + cbor_channel_done(&ccc->cch); return CPR_BLOCK_END; #undef FAIL } @@ -964,7 +987,7 @@ hypervisor_container_shutdown(struct cbor_channel *cch, struct flock_machine_con cbor_put_string(cw, "BAD: Not found"); } - cbor_done_channel(cch); + cbor_channel_done(cch); birdloop_leave(hcf.loop); return; } @@ -983,35 +1006,21 @@ hypervisor_container_shutdown(struct cbor_channel *cch, struct flock_machine_con birdloop_leave(hcf.loop); } -struct ccs_parser_context { - struct cbor_parser_context *ctx; - - u64 bytes_consumed; - u64 major_state; -}; - #undef CBOR_PARSER_ERROR #define CBOR_PARSER_ERROR bug -static struct ccs_parser_context ccx_, *ccx = &ccx_; - -static void -hcf_parse(byte *buf, int size) +static enum cbor_parse_result +hcf_parse(struct cbor_channel *cch, enum cbor_parse_result res) { - ASSERT_DIE(size > 0); - struct cbor_parser_context *ctx = ccx->ctx; - - static struct flock_machine_container_config ccf; + SKIP_BACK_DECLARE(struct container_fork_request_child, ccx, cch, cch); + struct cbor_parser_context *ctx = &cch->stream->parser; + struct flock_machine_container_config *ccf = &ccx->ccf; - for (int pos = 0; pos < size; pos++) + switch (res) { - switch (cbor_parse_byte(ctx, buf[pos])) - { case CPR_ERROR: - bug("CBOR parser failure: %s", ctx->error); - case CPR_MORE: - continue; + CBOR_PARSER_ERROR("Invalid input"); case CPR_MAJOR: /* Check type acceptance */ @@ -1021,8 +1030,6 @@ hcf_parse(byte *buf, int size) if (ctx->type != 5) CBOR_PARSER_ERROR("Expected mapping, got %u", ctx->type); - ccf = (struct flock_machine_container_config) {}; - ccx->major_state = 1; break; @@ -1043,11 +1050,11 @@ hcf_parse(byte *buf, int size) if (ctx->tflags & CPT_VARLEN) CBOR_PARSER_ERROR("Variable length string not supported yet"); - if (ccf.cf.name) + if (ccf->cf.name) CBOR_PARSER_ERROR("Duplicate argument 0 / hostname"); ASSERT_DIE(!ctx->target_buf); - ccf.cf.name = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1); + ccf->cf.name = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1); ctx->target_len = ctx->value; break; @@ -1058,11 +1065,11 @@ hcf_parse(byte *buf, int size) if (ctx->tflags & CPT_VARLEN) CBOR_PARSER_ERROR("Variable length string not supported yet"); - if (ccf.workdir) + if (ccf->workdir) CBOR_PARSER_ERROR("Duplicate argument 1 / basedir"); ASSERT_DIE(!ctx->target_buf); - ccf.basedir = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1); + ccf->basedir = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1); ctx->target_len = ctx->value; break; @@ -1073,11 +1080,11 @@ hcf_parse(byte *buf, int size) if (ctx->tflags & CPT_VARLEN) CBOR_PARSER_ERROR("Variable length string not supported yet"); - if (ccf.workdir) + if (ccf->workdir) CBOR_PARSER_ERROR("Duplicate argument 2 / workdir"); ASSERT_DIE(!ctx->target_buf); - ccf.workdir = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1); + ccf->workdir = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1); ctx->target_len = ctx->value; break; @@ -1102,37 +1109,23 @@ hcf_parse(byte *buf, int size) } break; - case CPR_BLOCK_END: - bug("invalid parser state"); - } - - /* End of array or map */ - while (cbor_parse_block_end(ctx)) - { + case CPR_BLOCK_END: switch (ccx->major_state) { - /* Code to run at the end of the mapping */ - case 0: /* toplevel item ended */ - /* Reinit the parser */ - ccx->major_state = 0; - ccx->bytes_consumed = 0; - cbor_parser_reset(ccx->ctx); - - if (size > pos + 1) - hcf_parse(buf + pos + 1, size - pos - 1); - return; + case 0: + return CPR_BLOCK_END; case 1: /* the mapping ended */ - if (!ccf.cf.name) + if (!ccf->cf.name) CBOR_PARSER_ERROR("Missing hostname"); - if (!ccf.workdir) + if (!ccf->workdir) CBOR_PARSER_ERROR("Missing workdir"); - if (!ccf.basedir) + if (!ccf->basedir) CBOR_PARSER_ERROR("Missing basedir"); - container_start(&ccf); + container_start(ccx); ccx->major_state = 0; break; @@ -1140,10 +1133,15 @@ hcf_parse(byte *buf, int size) default: bug("Unexpected state to end a mapping in"); } - } } - ccx->bytes_consumed += size; + return CPR_MORE; +} + +static void +hcf_cancel(struct cbor_stream *stream UNUSED) +{ + bug("Forker child stream cancelled"); } void @@ -1169,6 +1167,7 @@ hypervisor_container_fork(void) hcf.p = rp_new(birdloop_pool(hcf.loop), birdloop_domain(hcf.loop), "Container forker pool"); hcf.s = sk_new(hcf.p); hcf.s->type = SK_MAGIC; + hcf.s->flags |= SKF_FD_RX; /* Set the hooks and fds according to the side we are at */ sk_set_tbsize(hcf.s, 16384); sk_set_rbsize(hcf.s, 128); @@ -1182,7 +1181,7 @@ hypervisor_container_fork(void) hcf.s->type = SK_UNIX_MSG; hcf.stream.parse = container_fork_request_reply; - cbor_stream_init(&hcf.stream, 3); + CBOR_STREAM_INIT(&hcf, stream, cch, hcf.p, struct container_fork_request); cbor_stream_attach(&hcf.stream, hcf.s); birdloop_leave(hcf.loop); @@ -1192,19 +1191,25 @@ hypervisor_container_fork(void) /* noreturn child side */ close(fds[0]); hexp_cleanup_after_fork(); - container_forker_fd = fds[1]; - this_thread_id |= 0xf000; /* initialize the forker */ - ccx->ctx = cbor_parser_new(&root_pool, 2); + sock *sk = sk_new(&root_pool); + sk->type = SK_UNIX_MSG; + sk->fd = fds[1]; + sk->flags |= SKF_FD_TX; + sk->rbuf = sk->rpos = alloca(sk->rbsize = 4096); + sk->tbuf = sk->tpos = alloca(sk->tbsize = 64); + + CBOR_STREAM_EMBED(s, 4) stream; + CBOR_STREAM_INIT(&stream, s, cch, &root_pool, struct container_fork_request_child); + cbor_stream_attach(&stream.s, sk); + stream.s.parse = hcf_parse; + stream.s.cancel = hcf_cancel; while (true) { - byte buf[4096]; - - ssize_t rx = read(fds[1], buf, sizeof buf); - + ssize_t rx = read(fds[1], sk->rpos, sk->rbsize); times_update(); if (rx == 0) @@ -1216,6 +1221,8 @@ hypervisor_container_fork(void) if (rx < 0) bug("Container forker child: failed to read: %m"); - hcf_parse(buf, rx); + sk->rpos += rx; + if (sk->rx_hook(sk, rx)) + sk->rpos = sk->rbuf; } } diff --git a/flock/ctl.c b/flock/ctl.c deleted file mode 100644 index c5b4c801e..000000000 --- a/flock/ctl.c +++ /dev/null @@ -1,286 +0,0 @@ -#include "lib/birdlib.h" -#include "lib/cbor.h" -#include "lib/string.h" -#include "lib/io-loop.h" - -#include "flock/flock.h" - -#include - -/* - * Hand-written parser for a very simple CBOR protocol: - * - * - on toplevel always array of three elements: - * - the ID (u64) - * - the command saying what to expect in the third element - * - 0 with NULL (7-22) = shutdown the hypervisor - * - 1 with NULL = open a telnet listener - * - 2 with one string = create a machine of this name - * - 3 with array of strings = run the given command inside the hypervisor - */ - -struct hcs_parser_stream { - struct cbor_parser_context *ctx; - struct hcs_parser_channel *channel; - sock *sock; - - u64 bytes_consumed; - u64 major_state; - - struct cbor_stream stream; -}; - -struct hcs_parser_channel { - struct cbor_channel cch; - struct hcs_parser_stream *htx; - - enum { - HCS_CMD_SHUTDOWN = 1, - HCS_CMD_TELNET, - HCS_CMD_MACHINE_START, - HCS_CMD_MACHINE_STOP, - HCS_CMD__MAX, - } cmd; - - union flock_machine_config cfg; -}; - -static void -hcs_request_poweroff(struct hcs_parser_channel *hpc) -{ - log(L_INFO "Requested shutdown via CLI"); - ev_send_loop(&main_birdloop, &poweroff_event); - - CBOR_REPLY(&hpc->cch, cw) - CBOR_PUT_MAP(cw) - { - cbor_put_int(cw, -1); - cbor_put_string(cw, "OK"); - } - - cbor_done_channel(&hpc->cch); -} - -struct hcs_parser_stream * -hcs_parser_init(sock *s) -{ - struct cbor_parser_context *ctx = cbor_parser_new(s->pool, 4); - struct hcs_parser_stream *htx = mb_allocz(s->pool, sizeof *htx); - - htx->ctx = ctx; - htx->sock = s; - cbor_stream_init(&htx->stream, 3); - - return htx; -} - -#define CBOR_PARSER_ERROR(...) do { \ - log(L_ERR "Hypervisor ctl parse: " __VA_ARGS__); \ - return CPR_ERROR; \ -} while (0) - -enum cbor_parse_result -hcs_parse(struct cbor_channel *cch, enum cbor_parse_result res) -{ - SKIP_BACK_DECLARE(struct hcs_parser_channel, hpc, cch, cch); - SKIP_BACK_DECLARE(struct hcs_parser_stream, htx, stream, cch->stream); - struct cbor_parser_context *ctx = &htx->stream.parser; - - switch (res) - { - case CPR_MAJOR: - /* Check type acceptance */ - switch (htx->major_state) - { - case 0: /* Command */ - CBOR_PARSE_ONLY(ctx, POSINT, hpc->cmd); - if (hpc->cmd > HCS_CMD__MAX) - CBOR_PARSER_ERROR("Command key too high, got %lu", hpc->cmd); - - htx->major_state = hpc->cmd + 10; - return CPR_MORE; - - case HCS_CMD_SHUTDOWN + 10: /* shutdown command: expected null */ - if ((ctx->type != 7) || (ctx->value != 22)) - CBOR_PARSER_ERROR("Expected null, got %u-%u", ctx->type, ctx->value); - - hcs_request_poweroff(hpc); - htx->major_state = 3; - return CPR_MORE; - - case HCS_CMD_TELNET + 10: /* telnet listener open */ - if ((ctx->type == 7) && (ctx->value == 22)) - { - hexp_get_telnet(hpc); - htx->major_state = 3; - return CPR_MORE; - } - - else CBOR_PARSE_IF(ctx, TEXT, hpc->cfg.cf.name) - ; - else - CBOR_PARSER_ERROR("Expected null or string, got %s", cbor_type_str(ctx->type)); - return CPR_MORE; - - case HCS_CMD_MACHINE_START + 10: /* machine creation request */ - if (ctx->type != 5) - CBOR_PARSER_ERROR("Expected mapping, got %u", ctx->type); - - htx->major_state = 501; - return CPR_MORE; - - case HCS_CMD_MACHINE_STOP + 1: /* machine shutdown request */ - if (ctx->type != 5) - CBOR_PARSER_ERROR("Expecting mapping, got %u", ctx->type); - - htx->major_state = 601; - return CPR_MORE; - - case 7: /* process spawner */ - bug("process spawner not implemented"); - - case 501: /* machine creation argument */ - CBOR_PARSE_ONLY(ctx, POSINT, htx->major_state); - - if (ctx->value >= 5) - CBOR_PARSER_ERROR("Command key too high, got %lu", ctx->value); - - htx->major_state += 502; - return CPR_MORE; - - case 502: /* machine creation argument 0: name */ - CBOR_PARSE_ONLY(ctx, TEXT, hpc->cfg.cf.name); - return CPR_MORE; - - case 503: /* machine creation argument 1: type */ - CBOR_PARSE_ONLY(ctx, POSINT, hpc->cfg.cf.type); - - if ((ctx->value < 1) && (ctx->value > 1) ) - CBOR_PARSER_ERROR("Unexpected type, got %lu", ctx->value); - - htx->major_state = 501; - return CPR_MORE; - - case 504: /* machine creation argument 2: basedir */ - CBOR_PARSE_ONLY(ctx, BYTES, hpc->cfg.container.basedir); - return CPR_MORE; - - case 505: /* machine creation argument 3: workdir */ - CBOR_PARSE_ONLY(ctx, BYTES, hpc->cfg.container.workdir); - return CPR_MORE; - - case 601: /* machine shutdown argument */ - CBOR_PARSE_ONLY(ctx, POSINT, htx->major_state); - - if (ctx->value >= 5) - CBOR_PARSER_ERROR("Command key too high, got %lu", ctx->value); - - htx->major_state += 602; - return CPR_MORE; - - case 602: /* machine creation argument 0: name */ - CBOR_PARSE_ONLY(ctx, TEXT, hpc->cfg.cf.name); - return CPR_MORE; - - default: - bug("invalid parser state"); - } - break; - - case CPR_STR_END: - /* Bytes read completely! */ - switch (htx->major_state) - { - case HCS_CMD_TELNET + 10: - hexp_get_telnet(hpc); - break; - - case 502: - case 504: - case 505: - htx->major_state = 501; - return CPR_MORE; - - case 602: - htx->major_state = 601; - return CPR_MORE; - - default: - bug("Unexpected state to end a (byte)string in"); - /* Code to run at the end of a (byte)string */ - } - break; - - case CPR_BLOCK_END: - switch (htx->major_state) - { - /* Code to run at the end of the mapping */ - case 0: /* toplevel item ended */ - htx->major_state = ~0ULL; - return CPR_BLOCK_END; - - case 1: - htx->major_state = 0; - return CPR_MORE; - - case 5: - /* Finalize the command to exec in hypervisor */ - CBOR_PARSER_ERROR("NOT IMPLEMENTED YET"); - htx->major_state = 1; - return CPR_MORE; - - case 501: - switch (hpc->cfg.cf.type) - { - case 1: - hypervisor_container_start(&hpc->cch, &hpc->cfg.container); - break; - default: - CBOR_PARSER_ERROR("Unknown machine type: %d", hpc->cfg.cf.type); - } - htx->major_state = 1; - return CPR_MORE; - - case 601: - /* - if (!htx->cfg.cf.name) - CBOR_PARSER_ERROR("Machine name not specified"); - - hypervisor_container_shutdown(htx->sock, htx->cfg.cf.name); - */ - - hypervisor_container_shutdown(&hpc->cch, &hpc->cfg.container); - htx->major_state = 1; - return CPR_MORE; - - default: - bug("Unexpected state to end a mapping in"); - } - break; - - case CPR_ERROR: - case CPR_MORE: - CBOR_PARSER_ERROR("Invalid input"); - - } - - return CPR_MORE; -} - -bool -hcs_complete(struct hcs_parser_stream *htx) -{ - return htx->major_state == ~0ULL; -} - -const char * -hcs_error(struct hcs_parser_stream *htx) -{ - return htx->ctx->error; -} - -void -hcs_parser_cleanup(struct hcs_parser_stream *htx) -{ - cbor_parser_free(htx->ctx); -} diff --git a/flock/flock-cli b/flock/flock-cli index 67e2153b3..dc137dd18 100755 --- a/flock/flock-cli +++ b/flock/flock-cli @@ -35,6 +35,9 @@ class HypervisorStaleError(HandlerError): def __init__(self, *args, **kwargs): return super().__init__("Hypervisor stale", *args, **kwargs) +class GarbledReplyError(HandlerError): + pass + def connect(where: pathlib.Path): if not where.exists(): raise HypervisorNonexistentError() @@ -49,15 +52,20 @@ def connect(where: pathlib.Path): def ctl_path(name: str): return DEFAULT_RUN_PATH / f"{name}.ctl" -def msg(name: str, data: dict): +def msg(name: str, cmd: int, data): try: ctl = connect(ctl_path(name)) except HypervisorNonexistentError as e: e.add_note(f"Failed to send message {data} to {name}") raise e - ctl.sendall(cbor2.dumps(data)) - return cbor2.loads(ctl.recv(1024)) + ctl.sendall(cbor2.dumps([ 0, cmd, data])) + data = cbor2.loads(ctl.recv(1024)) + if len(data) != 2: + raise GarbledReplyError(f"Expected 2 array items, got {len(data)}") + if data[0] != 0: + raise GarbledReplyError(f"Expected zero ID, got {data[0]}") + return data[1] @handler def start(name: str): @@ -72,7 +80,7 @@ def start(name: str): @handler def stop(name: str): - for k,v in msg(name, { 0: None }).items(): + for k,v in msg(name, 1, None).items(): assert(k == -1) assert(v == "OK") @@ -86,28 +94,28 @@ def cleanup(name: str): @handler def telnet(name: str): - for k,v in msg(name, { 1: None}).items(): + for k,v in msg(name, 2, None).items(): assert(k == -2) os.execlp("telnet", "telnet", "localhost", str(v)) @handler def container_start(hypervisor: str, name: str): - for k,v in msg(hypervisor, { 3: { + for k,v in msg(hypervisor, 3, { 0: name, 1: 1, 2: b"/", 3: bytes(DEFAULT_RUN_PATH / hypervisor / name), - }}).items(): + }).items(): print(k,v) @handler def container_stop(hypervisor: str, name: str): - for k,v in msg(hypervisor, { 4: { 0: name, }}).items(): + for k,v in msg(hypervisor, 4, { 0: name, }).items(): print(k,v) @handler def container_telnet(hypervisor: str, name: str): - for k,v in msg(hypervisor, { 1: name}).items(): + for k,v in msg(hypervisor, 2, name).items(): assert(k == -2) os.execlp("telnet", "telnet", "localhost", str(v)) diff --git a/flock/flock.h b/flock/flock.h index 1182fc858..eabef9be9 100644 --- a/flock/flock.h +++ b/flock/flock.h @@ -52,7 +52,8 @@ union flock_machine_config { void hypervisor_container_start(struct cbor_channel *, struct flock_machine_container_config *); void hypervisor_container_shutdown(struct cbor_channel *, struct flock_machine_container_config *); -int container_ctl_fd(const char *name); + +struct cbor_channel *container_get_channel(const char *name); void hexp_cleanup_after_fork(void); diff --git a/flock/hypervisor.c b/flock/hypervisor.c index 1310d9747..5526ec90e 100644 --- a/flock/hypervisor.c +++ b/flock/hypervisor.c @@ -5,6 +5,7 @@ #include "lib/io-loop.h" #include "lib/resource.h" #include "lib/socket.h" +#include "lib/string.h" #include "flock/flock.h" @@ -20,53 +21,12 @@ static pool *hcs_pool; OBSREF(struct shutdown_placeholder) hcs_shutdown_placeholder; -static int -hcs_rx(sock *s, uint size) -{ - s64 sz = hcs_parse(s->data, s->rbuf, size); - if (sz < 0) - { - log(L_INFO "CLI parser error at position %ld: %s", -sz-1, hcs_error(s->data)); - sk_close(s); - return 0; /* Must return 0 when closed */ - } - - if (!hcs_complete(s->data)) - { - ASSERT_DIE(sz == size); - return 1; - } - - log(L_INFO "Parsed command."); - - /* TODO do something more */ - if (sz < size) - memmove(s->rbuf, s->rbuf + sz, size - sz); - if (!s->rx_hook) - return (sz == size); - - hcs_parser_cleanup(s->data); - s->data = hcs_parser_init(s); - - return (sz < size) ? hcs_rx(s, size - sz) : 1; -} - -static void -hcs_err(sock *s, int err) -{ - log(L_INFO "CLI dropped: %s", strerror(err)); - hcs_parser_cleanup(s->data); - sk_close(s); -} - static int hcs_connect(sock *s, uint size UNUSED) { log(L_INFO "CLI connected: %p", s); - s->rx_hook = hcs_rx; - s->err_hook = hcs_err; - s->data = hcs_parser_init(s); + hcs_parser_init(s); return 1; } @@ -127,8 +87,7 @@ static struct hypervisor_exposed { pool *p; sock *s; struct birdloop *loop; - const char *port_name; - sock *port_sreq; + struct hcs_parser_channel *hpc; } he; /** @@ -281,13 +240,19 @@ hypervisor_exposed_child_rx(sock *sk, uint size) sk->txfd = sfd; - linpool *lp = lp_new(sk->pool); - struct cbor_writer *cw = cbor_init(sk->tbuf, sk->tbsize, lp); - cbor_open_block_with_length(cw, 1); - cbor_add_int(cw, -2); - cbor_add_int(cw, r); + struct { + struct cbor_writer cw; + struct cbor_writer_stack_item si[2]; + } cw; - e = sk_send(sk, cw->pt); + cbor_writer_init(&cw.cw, 2, sk->tbuf, sk->tbsize); + CBOR_PUT_MAP(&cw.cw) + { + cbor_put_int(&cw.cw, -2); + cbor_put_int(&cw.cw, r); + } + + e = sk_send(sk, cw.cw.data.pos - cw.cw.data.start); if (e < 0) log(L_ERR "Failed to send socket: %m"); @@ -385,12 +350,25 @@ hexp_cleanup_after_fork(void) * Hypervisor's mapping between external ports and names */ +struct hcs_parser_channel { + CBOR_CHANNEL_EMBED(cch, 4); + struct hcs_parser_stream *htx; + + enum { + HCS_CMD_SHUTDOWN = 1, + HCS_CMD_TELNET, + HCS_CMD_MACHINE_START, + HCS_CMD_MACHINE_STOP, + HCS_CMD__MAX, + } cmd; + + union flock_machine_config cfg; +}; + static void hexp_sock_err(sock *s, int err UNUSED) { - ASSERT_DIE(s == he.port_sreq); - he.port_name = NULL; - he.port_sreq = NULL; + he.hpc = NULL; } void @@ -399,6 +377,7 @@ hexp_get_telnet(struct hcs_parser_channel *hpc) if (he.hpc) log(L_ERR "Multiple telnet requests not supported yet"); + log(L_INFO "Get telnet: %p name %s", hpc, hpc->cfg.cf.name); he.hpc = hpc; /* TODO: use channels here as well */ @@ -406,24 +385,24 @@ hexp_get_telnet(struct hcs_parser_channel *hpc) int e = write(he.s->fd, buf, sizeof buf); if (e != sizeof buf) bug("write error handling not implemented, got %d (%m)", e); - - s->err_paused = hexp_sock_err; - sk_pause_rx(s->loop, s); } -static void hexp_received_telnet(struct hexp_received_telnet *hrt) +static void hexp_received_telnet(void *_hrt) { - if (hrt->name[0]) + struct hexp_received_telnet *hrt = _hrt; + + if (he.hpc->cfg.cf.name) { /* Transferring the received listening socket to the container */ - struct cbor_channel *ccc = container_get_channel(hrt->name); + struct cbor_channel *ccc = container_get_channel(he.hpc->cfg.cf.name); - CBOR_REPLY(ccc, cw) - CBOR_PUT_MAP(cw) { - cbor_put_int(cw, -2); - cbor_put_null(cw); - ccc->stream->s->txfd = hrt->fd; - } + BIRDLOOP_INSIDE(ccc->stream->loop) + CBOR_REPLY(ccc, cw) + CBOR_PUT_MAP(cw) { + cbor_put_int(cw, -2); + cbor_put_null(cw); + ccc->stream->s->txfd = hrt->fd; + } close(hrt->fd); } @@ -440,24 +419,271 @@ static void hexp_received_telnet(struct hexp_received_telnet *hrt) bug("Telnet listener: sk_open failed"); } - if (s) + if (he.hpc) { - linpool *lp = lp_new(hcs_pool); - struct cbor_writer *cw = cbor_init(s->tbuf, s->tbsize, lp); - cbor_open_block_with_length(cw, 1); - cbor_add_int(cw, -2); - cbor_add_int(cw, hrt->port); - - sk_send(s, cw->pt); - sk_resume_rx(hcs_loop, s); - - hcs_parser_cleanup(s->data); - s->data = hcs_parser_init(s); + CBOR_REPLY(&he.hpc->cch, cw) + CBOR_PUT_MAP(cw) + { + cbor_put_int(cw, -2); + cbor_put_int(cw, hrt->port); + } - rfree(lp); + cbor_channel_done(&he.hpc->cch); } birdloop_enter(he.loop); mb_free(hrt); birdloop_leave(he.loop); } + +/* + * Hand-written parser for a very simple CBOR protocol: + * + * - on toplevel always array of three elements: + * - the ID (u64) + * - the command saying what to expect in the third element + * - 0 with NULL (7-22) = shutdown the hypervisor + * - 1 with NULL = open a telnet listener + * - 2 with one string = create a machine of this name + * - 3 with array of strings = run the given command inside the hypervisor + */ + +struct hcs_parser_stream { + struct cbor_parser_context *ctx; + struct hcs_parser_channel *channel; + sock *sock; + + u64 bytes_consumed; + u64 major_state; + + CBOR_STREAM_EMBED(stream, 4); +}; + +static void +hcs_request_poweroff(struct hcs_parser_channel *hpc) +{ + log(L_INFO "Requested shutdown via CLI"); + ev_send_loop(&main_birdloop, &poweroff_event); + + CBOR_REPLY(&hpc->cch, cw) + CBOR_PUT_MAP(cw) + { + cbor_put_int(cw, -1); + cbor_put_string(cw, "OK"); + } + + cbor_channel_done(&hpc->cch); +} + +struct hcs_parser_stream * +hcs_parser_init(sock *s) +{ + struct hcs_parser_stream *htx = mb_allocz(s->pool, sizeof *htx); + + CBOR_STREAM_INIT(htx, stream, cch, s->pool, struct hcs_parser_channel); + cbor_stream_attach(&htx->stream, s); + htx->stream.parse = hcs_parse; + htx->stream.cancel = hcs_parser_cleanup; + + return htx; +} + +#define CBOR_PARSER_ERROR(...) do { \ + log(L_ERR "Hypervisor ctl parse: " __VA_ARGS__); \ + return CPR_ERROR; \ +} while (0) + +enum cbor_parse_result +hcs_parse(struct cbor_channel *cch, enum cbor_parse_result res) +{ + SKIP_BACK_DECLARE(struct hcs_parser_channel, hpc, cch, cch); + SKIP_BACK_DECLARE(struct hcs_parser_stream, htx, stream, cch->stream); + struct cbor_parser_context *ctx = &htx->stream.parser; + + switch (res) + { + case CPR_MAJOR: + /* Check type acceptance */ + switch (htx->major_state) + { + case 0: /* Command */ + CBOR_PARSE_ONLY(ctx, POSINT, hpc->cmd); + if (hpc->cmd > HCS_CMD__MAX) + CBOR_PARSER_ERROR("Command key too high, got %lu", hpc->cmd); + + htx->major_state = hpc->cmd + 10; + return CPR_MORE; + + case HCS_CMD_SHUTDOWN + 10: /* shutdown command: expected null */ + if ((ctx->type != 7) || (ctx->value != 22)) + CBOR_PARSER_ERROR("Expected null, got %u-%u", ctx->type, ctx->value); + + hcs_request_poweroff(hpc); + htx->major_state = 3; + return CPR_MORE; + + case HCS_CMD_TELNET + 10: /* telnet listener open */ + if ((ctx->type == 7) && (ctx->value == 22)) + { + hexp_get_telnet(hpc); + htx->major_state = 3; + return CPR_MORE; + } + + else CBOR_PARSE_IF(ctx, TEXT, hpc->cfg.cf.name) + ; + else + CBOR_PARSER_ERROR("Expected null or string, got %s", cbor_type_str(ctx->type)); + return CPR_MORE; + + case HCS_CMD_MACHINE_START + 10: /* machine creation request */ + if (ctx->type != 5) + CBOR_PARSER_ERROR("Expected mapping, got %u", ctx->type); + + htx->major_state = 501; + return CPR_MORE; + + case HCS_CMD_MACHINE_STOP + 1: /* machine shutdown request */ + if (ctx->type != 5) + CBOR_PARSER_ERROR("Expecting mapping, got %u", ctx->type); + + htx->major_state = 601; + return CPR_MORE; + + case 7: /* process spawner */ + bug("process spawner not implemented"); + + case 501: /* machine creation argument */ + CBOR_PARSE_ONLY(ctx, POSINT, htx->major_state); + + if (ctx->value >= 5) + CBOR_PARSER_ERROR("Command key too high, got %lu", ctx->value); + + htx->major_state += 502; + return CPR_MORE; + + case 502: /* machine creation argument 0: name */ + CBOR_PARSE_ONLY(ctx, TEXT, hpc->cfg.cf.name); + return CPR_MORE; + + case 503: /* machine creation argument 1: type */ + CBOR_PARSE_ONLY(ctx, POSINT, hpc->cfg.cf.type); + + if ((ctx->value < 1) && (ctx->value > 1) ) + CBOR_PARSER_ERROR("Unexpected type, got %lu", ctx->value); + + htx->major_state = 501; + return CPR_MORE; + + case 504: /* machine creation argument 2: basedir */ + CBOR_PARSE_ONLY(ctx, BYTES, hpc->cfg.container.basedir); + return CPR_MORE; + + case 505: /* machine creation argument 3: workdir */ + CBOR_PARSE_ONLY(ctx, BYTES, hpc->cfg.container.workdir); + return CPR_MORE; + + case 601: /* machine shutdown argument */ + CBOR_PARSE_ONLY(ctx, POSINT, htx->major_state); + + if (ctx->value >= 5) + CBOR_PARSER_ERROR("Command key too high, got %lu", ctx->value); + + htx->major_state += 602; + return CPR_MORE; + + case 602: /* machine creation argument 0: name */ + CBOR_PARSE_ONLY(ctx, TEXT, hpc->cfg.cf.name); + return CPR_MORE; + + default: + bug("invalid parser state"); + } + break; + + case CPR_STR_END: + /* Bytes read completely! */ + switch (htx->major_state) + { + case HCS_CMD_TELNET + 10: + hexp_get_telnet(hpc); + htx->major_state = 3; + return CPR_MORE; + + case 502: + case 504: + case 505: + htx->major_state = 501; + return CPR_MORE; + + case 602: + htx->major_state = 601; + return CPR_MORE; + + default: + bug("Unexpected state to end a (byte)string in"); + /* Code to run at the end of a (byte)string */ + } + break; + + case CPR_BLOCK_END: + switch (htx->major_state) + { + /* Code to run at the end of the mapping */ + case 0: /* toplevel item ended */ + htx->major_state = ~0ULL; + return CPR_BLOCK_END; + + case 3: + htx->major_state = 0; + return CPR_MORE; + + case 501: + switch (hpc->cfg.cf.type) + { + case 1: + hypervisor_container_start(&hpc->cch, &hpc->cfg.container); + break; + default: + CBOR_PARSER_ERROR("Unknown machine type: %d", hpc->cfg.cf.type); + } + htx->major_state = 3; + return CPR_MORE; + + case 601: + hypervisor_container_shutdown(&hpc->cch, &hpc->cfg.container); + htx->major_state = 3; + return CPR_MORE; + + default: + bug("Unexpected state to end a mapping in"); + } + break; + + case CPR_ERROR: + case CPR_MORE: + CBOR_PARSER_ERROR("Invalid input"); + + } + + return CPR_MORE; +} + +bool +hcs_complete(struct hcs_parser_stream *htx) +{ + return htx->major_state == ~0ULL; +} + +const char * +hcs_error(struct hcs_parser_stream *htx) +{ + return htx->ctx->error; +} + +void +hcs_parser_cleanup(struct hcs_parser_stream *htx) +{ + log(L_INFO "hcs parser cleanup"); + cbor_parser_free(htx->ctx); +}