struct cbor_channel cch;
struct cbor_channel *ctl_ch;
struct container_runtime *crt;
+ int reply_state;
};
struct container_runtime {
uint hash;
pid_t pid;
sock *s;
+ struct cbor_stream stream;
char hostname[];
};
-#define CRT_KEY(c) c->ccf.hostname, c->hash
+#define CBOR_PARSER_ERROR FAIL
+
+#define CRT_KEY(c) c->hostname, c->hash
#define CRT_NEXT(c) c->next
#define CRT_EQ(a,h,b,i) ((h) == (i)) && (!strcmp(a,b))
#define CRT_FN(a,h) h
mb_free(crt);
}
-static int
-hypervisor_container_rx(sock *sk, uint _sz UNUSED)
+struct container_ctl_channel {
+ struct cbor_channel cch;
+ int msg_state;
+ int down_signal;
+};
+
+static enum cbor_parse_result
+container_ctl_parse(struct cbor_channel *cch, enum cbor_parse_result res)
{
- byte buf[128];
- ssize_t sz = read(sk->fd, buf, sizeof buf);
- if (sz < 0)
+ SKIP_BACK_DECLARE(struct container_ctl_channel, ccc, cch, cch);
+ SKIP_BACK_DECLARE(struct container_runtime, crt, stream, cch->stream);
+ struct cbor_parser_context *ctx = &crt->stream.parser;
+
+#define FAIL(...) do { log(L_ERR "Container ctl parse: " __VA_ARGS__); return CPR_ERROR; } while (0)
+
+ switch (res)
{
- log(L_ERR "error reading data from %p (container_rx): %m", sk);
- sk_close(sk);
- return 0;
- }
+ case CPR_MAJOR:
+ switch (ccc->msg_state)
+ {
+ case 0:
+ if ((ctx->type != CBOR_MAP) || (ctx->value != 1))
+ FAIL("Expected map of size 1, got %d-%d", ctx->type, ctx->value);
- struct container_runtime *crt = sk->data;
- ASSERT_DIE(crt->s == sk);
+ ccc->msg_state = 1;
+ return CPR_MORE;
- ASSERT_DIE(sz >= 3);
- ASSERT_DIE(buf[0] == 0xa1);
+ case 1:
+ if ((ctx->type != CBOR_NEGINT) || (ctx->value != 3))
+ FAIL("Expected key -4, got %d-%d", ctx->type, ctx->value);
- switch (buf[1]) {
- case 0x23:
- log(L_INFO "container %s ended by signal %d", crt->ccf.hostname, buf[2]);
- if (crt->ccc)
- callback_activate(&crt->ccc->cb);
- container_cleanup(crt);
- break;
+ ccc->msg_state = 2;
+ return CPR_MORE;
+
+ case 2:
+ CBOR_PARSE_ONLY(ctx, POSINT, ccc->down_signal);
+ ccc->msg_state = 3;
+ return CPR_MORE;
+
+ default:
+ FAIL("Input overflow to state %d", ccc->msg_state);
+ }
+ bug("Overrun switch");
+
+ case CPR_STR_END:
+ FAIL("Unexpected string end");
- default:
- log(L_ERR "container %s sent a weird message 0x%02x sz %d", crt->ccf.hostname, buf[1], sz);
+ case CPR_BLOCK_END:
+ switch (ccc->msg_state) {
+ case 3:
+ ccc->msg_state = 4;
+ break;
+
+ default:
+ FAIL("Unexpected block end in state %d", ccc->msg_state);
+ }
break;
+
+ case CPR_ERROR:
+ case CPR_MORE:
+ FAIL("Invalid input");
}
+
+ log(L_INFO "container %s ended by signal %d", crt->hostname, ccc->down_signal);
+ container_cleanup(crt);
- return 0;
+#undef FAIL
+ return CPR_BLOCK_END;
}
-static void
-hypervisor_container_err(sock *sk, int err)
+static enum cbor_parse_result
+container_fork_request_reply(struct cbor_channel *cch, enum cbor_parse_result res)
{
- struct container_runtime *crt = sk->data;
- log(L_ERR "Container %s socket closed unexpectedly: %s", crt->ccf.hostname, strerror(err));
- container_cleanup(crt);
-}
+ ASSERT_DIE(cch->stream == hcf.stream);
-static int
-hypervisor_container_forker_rx(sock *sk, uint sz)
-{
- if ((sz < 3) || (sk->rxfd < 0))
+ SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cch);
+ struct container_runtime *crt = cfr->crt;
+ struct cbor_parser_context *ctx = &cch->stream->parser;
+
+#define FAIL(...) do { log(L_ERR "Container fork request reply: " __VA_ARGS__); return CPR_ERROR; } while (0)
+
+ switch (res)
{
- log(L_ERR "Container forker RX strange behavior, what the hell (sz %d, fd %d)", sz, sk->rxfd);
- sk_close(sk);
- ev_send_loop(&main_birdloop, &poweroff_event);
- return 0;
+ case CPR_MAJOR:
+ switch (cfr->reply_state) {
+ case 0:
+ if ((ctx->type != CBOR_MAP) || (ctx->value != 1))
+ FAIL("Expected map of size 1, got %d-%d", ctx->type, ctx->value);
+
+ cfr->reply_state = 1;
+ return CPR_MORE;
+
+ case 1:
+ if ((ctx->type != CBOR_NEGINT) || (ctx->value != 1))
+ FAIL("Expected key -2, got %d-%d", ctx->type, ctx->value);
+
+ cfr->reply_state = 2;
+ return CPR_MORE;
+
+ case 2:
+ CBOR_PARSE_ONLY(ctx, POSINT, crt->pid);
+ cfr->reply_state = 3;
+ return CPR_MORE;
+
+ default:
+ FAIL("Input overflow to state %d", cfr->reply_state);
+ }
+ bug("Overrun switch");
+
+ case CPR_STR_END:
+ FAIL("Unexpected string end");
+
+ case CPR_BLOCK_END:
+ switch (cfr->reply_state) {
+ case 3:
+ cfr->reply_state = 4;
+ break;
+
+ default:
+ FAIL("Unexpected block end in state %d", cfr->reply_state);
+ }
+ break;
+
+ case CPR_ERROR:
+ case CPR_MORE:
+ FAIL("Invalid input");
}
+
+ log(L_INFO "Machine started with PID %d", crt->pid);
- ASSERT_DIE(sk->rbuf[0] == 0xa1);
- ASSERT_DIE(sk->rbuf[1] == 0x21);
- pid_t pid;
- if (sk->rbuf[2] < 0x18)
- pid = sk->rbuf[2];
- else if (sk->rbuf[2] == 24)
- pid = sk->rbuf[3];
- else if (sk->rbuf[2] == 25)
- pid = sk->rbuf[3] << 8 + sk->rbuf[4];
- else if (sk->rbuf[3] == 26)
- pid = sk->rbuf[3] << 32 + sk->rbuf[4] << 24 + sk->rbuf[5] << 16 + sk->rbuf[6];
- else
- bug("not implemented");
+ if (hcf.s->rxfd < 0)
+ FAIL("No control socket of the new machine");
- log(L_INFO "Machine started with PID %d", pid);
+ ASSERT_DIE(birdloop_inside(hcf.loop));
- sock *skl = sk_new(sk->pool);
+ sock *skl = sk_new(hcf.p);
skl->type = SK_MAGIC;
- skl->rx_hook = hypervisor_container_rx;
- skl->err_hook = hypervisor_container_err;
- skl->fd = sk->rxfd;
- sk_set_tbsize(skl, 1024);
- if (sk_open(skl, sk->loop) < 0)
+ skl->fd = hcf.s->rxfd;
+ hcf.s->rxfd = -1;
+
+ if (sk_open(skl, hcf.loop) < 0)
bug("Machine control socket: sk_open failed");
- ASSERT_DIE(birdloop_inside(hcf.loop));
+ sk_set_tbsize(skl, 1024);
+ skl->type = SK_UNIX_MSG;
- ASSERT_DIE(hcf.cur_crt);
- skl->data = hcf.cur_crt;
+ crt->s = skl;
+ cbor_stream_attach(&crt->stream, skl);
- hcf.cur_crt->pid = pid;
- hcf.cur_crt->s = skl;
- if (hcf.cur_crt->ccc)
- callback_activate(&hcf.cur_crt->ccc->cb);
- hcf.cur_crt->ccc = NULL;
- hcf.cur_crt = NULL;
+ CBOR_REPLY(cfr->ctl_ch, cw)
+ CBOR_PUT_MAP(cw)
+ {
+ cbor_put_int(cw, -1);
+ cbor_put_string(cw, "OK");
+ }
- return 0;
+ cbor_done_channel(&cfr->cch);
+
+ return CPR_BLOCK_END;
+#undef FAIL
}
static void
HASH_INSERT(hcf.hash, CRT, crt);
+ /* Create a new channel atop the forker stream */
log(L_INFO "requesting machine creation, name %s", name);
SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cbor_channel_new(hcf.stream));
- cfr->crl_ch = cch;
+ cfr->ctl_ch = cch;
cfr->crt = crt;
cfr->cch.parse = container_fork_request_reply;
+ crt->stream.parse = container_ctl_parse;
+ cbor_stream_init(&crt->stream, 2);
+
CBOR_REPLY(&cfr->cch, cw)
CBOR_PUT_MAP(cw)
{