]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
Flock: now linker errors
authorMaria Matejka <mq@ucw.cz>
Fri, 11 Oct 2024 19:18:25 +0000 (21:18 +0200)
committerMaria Matejka <mq@ucw.cz>
Sun, 23 Feb 2025 18:07:35 +0000 (19:07 +0100)
flock/Makefile
flock/container.c
flock/ctl.c [deleted file]
flock/flock-cli
flock/flock.h
flock/hypervisor.c

index a108abbb8032e86ab3cd7109ffad7af13c802586..503df4da4912d4a415c7b7af2d852b40f6d79483 100644 (file)
@@ -1,4 +1,4 @@
-src := container.c ctl.c flock.c hypervisor.c
+src := container.c flock.c hypervisor.c
 obj := $(src-o-files)
 
 flock=$(exedir)/flock-sim
index 476ef8ffa546b597d2401ec1e568c74c5958022e..bf45be8953606c3f46a20c746cbd9484a528b03a 100644 (file)
@@ -21,7 +21,7 @@
 static struct hypervisor_container_forker {
   sock *s;
   pool *p;
-  struct cbor_stream stream;
+  CBOR_STREAM_EMBED(stream, 4);
   struct birdloop *loop;
   HASH(struct container_runtime) hash;
   struct container_runtime *cur_crt;
@@ -29,7 +29,7 @@ static struct hypervisor_container_forker {
 } hcf;
 
 struct container_fork_request {
-  struct cbor_channel cch;
+  CBOR_CHANNEL_EMBED(cch, 4);
   struct cbor_channel *ctl_ch;
   struct container_runtime *crt;
   int reply_state;
@@ -40,7 +40,7 @@ struct container_runtime {
   uint hash;
   pid_t pid;
   sock *s;
-  struct cbor_stream stream;
+  CBOR_STREAM_EMBED(stream, 4);
   char hostname[];
 };
 
@@ -51,22 +51,18 @@ struct container_runtime {
 #define CRT_EQ(a,h,b,i)        ((h) == (i)) && (!strcmp(a,b))
 #define CRT_FN(a,h)    h
 
-static sig_atomic_t poweroff, zombie;
-
 static void
 container_poweroff_sighandler(int signo)
 {
-  poweroff = signo;
+  ev_send_loop(&main_birdloop, &poweroff_event);
 }
 
 static void
 container_child_sighandler(int signo UNUSED)
 {
-  zombie = 1;
+  ev_send_loop(&main_birdloop, &zombie_event);
 }
 
-static int container_forker_fd = -1;
-
 static void
 container_poweroff(int fd, int sig)
 {
@@ -417,6 +413,18 @@ container_mainloop(int fd, struct flock_machine_container_config *ccf)
   container_init_logger();
 
   /* Run worker threads */
+  struct birdloop *loop = birdloop_new(&root_pool, DOMAIN_ORDER(control), 0, "Container control socket");
+  birdloop_enter(loop);
+  sock *s = sk_new(birdloop_pool(loop));
+  s->type = SK_MAGIC;
+  if (sk_open(s, loop) < 0)
+    bug("Container control socket open failed");
+
+  sk_set_rbsize(s, 128);
+  sk_set_tbsize(s, 128);
+
+  s.type = SK_UNIX_MSG;
+
   struct thread_config tc = {};
   bird_thread_commit(&tc);
 
@@ -527,11 +535,19 @@ container_mainloop(int fd, struct flock_machine_container_config *ccf)
   }
 }
 
+struct container_fork_request_child {
+  CBOR_CHANNEL_EMBED(cch, 4);
+  struct flock_machine_container_config ccf;
+  u64 major_state;
+};
+
 static uint container_counter = 0;
 
 static void
-container_start(struct flock_machine_container_config *ccf)
+container_start(struct container_fork_request_child *ccx)
 {
+  struct flock_machine_container_config *ccf = &ccx->ccf;
+
   log(L_INFO "Requested to start a container, name %s, base %s, work %s",
       ccf->cf.name, ccf->basedir, ccf->workdir);
 
@@ -604,41 +620,15 @@ container_start(struct flock_machine_container_config *ccf)
 
   close(fds[1]);
 
-  struct {
-    struct cbor_writer w;
-    struct cbor_writer_stack_item si[2];
-    byte buf[128];
-  } _cw;
-
-  struct cbor_writer *cw = cbor_writer_init(&_cw.w, 2, _cw.buf, sizeof _cw.buf);
-  CBOR_PUT_MAP(cw)
-  {
-    cbor_put_int(cw, -2);
-    cbor_put_int(cw, pid);
-  }
-
-  struct iovec v = {
-    .iov_base = cw->data.start,
-    .iov_len = cw->data.pos - cw->data.start,
-  };
-  byte cbuf[CMSG_SPACE(sizeof fds[0])];
-  struct msghdr m = {
-    .msg_iov = &v,
-    .msg_iovlen = 1,
-    .msg_control = &cbuf,
-    .msg_controllen = sizeof cbuf,
-  };
-  struct cmsghdr *c = CMSG_FIRSTHDR(&m);
-  c->cmsg_level = SOL_SOCKET;
-  c->cmsg_type = SCM_RIGHTS;
-  c->cmsg_len = CMSG_LEN(sizeof fds[0]);
-  memcpy(CMSG_DATA(c), &fds[0], sizeof fds[0]);
-
   log(L_INFO "Sending socket");
 
-  e = sendmsg(container_forker_fd, &m, 0);
-  if (e < 0)
-    log(L_ERR "Failed to send socket: %m");
+  ccx->cch.stream->s->txfd = fds[0];
+  CBOR_REPLY(&ccx->cch, cw)
+    CBOR_PUT_MAP(cw)
+    {
+      cbor_put_int(cw, -2);
+      cbor_put_int(cw, pid);
+    }
 
   log(L_INFO "Socket sent");
   exit(0);
@@ -646,6 +636,25 @@ container_start(struct flock_machine_container_config *ccf)
 
 /* The Parent */
 
+static struct container_runtime *
+container_find_by_name(const char *name)
+{
+  uint h = mem_hash(name, strlen(name));
+  return HASH_FIND(hcf.hash, CRT, name, h);
+}
+
+struct cbor_channel *
+container_get_channel(const char *name)
+{
+  struct container_runtime *crt = container_find_by_name(name);
+  struct cbor_channel *cch = NULL;
+  if (crt)
+    BIRDLOOP_INSIDE(crt->stream.loop)
+      cch = cbor_channel_new(&crt->stream);
+
+  return cch;
+}
+
 static void
 container_cleanup(struct container_runtime *crt)
 {
@@ -655,12 +664,20 @@ container_cleanup(struct container_runtime *crt)
 }
 
 struct container_ctl_msg {
-  struct cbor_channel cch;
+  CBOR_CHANNEL_EMBED(cch, 4);
   struct cbor_channel *ctl_ch;
   int msg_state;
   int down_signal;
 };
 
+static void
+container_ctl_cancel(struct cbor_stream *stream)
+{
+  SKIP_BACK_DECLARE(struct container_runtime, crt, stream, stream);
+  HASH_REMOVE(hcf.hash, CRT, crt);
+  mb_free(crt);
+}
+
 static enum cbor_parse_result
 container_ctl_parse(struct cbor_channel *cch, enum cbor_parse_result res)
 {
@@ -771,6 +788,10 @@ container_fork_request_reply(struct cbor_channel *cch, enum cbor_parse_result re
       switch (cfr->reply_state) {
        case 3:
          cfr->reply_state = 4;
+         return CPR_MORE;
+
+       case 4:
+         cfr->reply_state = 5;
          break;
 
        default:
@@ -812,7 +833,7 @@ container_fork_request_reply(struct cbor_channel *cch, enum cbor_parse_result re
       cbor_put_string(cw, "OK");
     }
 
-  cbor_done_channel(&cfr->cch);
+  cbor_channel_done(&cfr->cch);
 
   return CPR_BLOCK_END;
 #undef FAIL
@@ -826,7 +847,7 @@ hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_contai
 #define FAIL(id, msg) do { \
   CBOR_REPLY(cch, cw) CBOR_PUT_MAP(cw) { \
     cbor_put_int(cw, id); cbor_put_string(cw, msg);\
-  } cbor_done_channel(cch); \
+  } cbor_channel_done(cch); \
   birdloop_leave(hcf.loop); \
   return; } while (0)
 
@@ -860,7 +881,9 @@ hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_contai
   cfr->cch.parse = container_fork_request_reply;
 
   crt->stream.parse = container_ctl_parse;
-  cbor_stream_init(&crt->stream, 2);
+  crt->stream.cancel = container_ctl_cancel;
+
+  CBOR_STREAM_INIT(crt, stream, cch, hcf.p, struct container_ctl_msg);
 
   CBOR_REPLY(&cfr->cch, cw)
     CBOR_PUT_MAP(cw)
@@ -941,7 +964,7 @@ container_stopped(struct cbor_channel *cch, enum cbor_parse_result res)
       cbor_put_string(cw, "OK");
     }
 
-  cbor_done_channel(&ccc->cch);
+  cbor_channel_done(&ccc->cch);
   return CPR_BLOCK_END;
 #undef FAIL
 }
@@ -964,7 +987,7 @@ hypervisor_container_shutdown(struct cbor_channel *cch, struct flock_machine_con
        cbor_put_string(cw, "BAD: Not found");
       }
 
-    cbor_done_channel(cch);
+    cbor_channel_done(cch);
     birdloop_leave(hcf.loop);
     return;
   }
@@ -983,35 +1006,21 @@ hypervisor_container_shutdown(struct cbor_channel *cch, struct flock_machine_con
   birdloop_leave(hcf.loop);
 }
 
-struct ccs_parser_context {
-  struct cbor_parser_context *ctx;
-
-  u64 bytes_consumed;
-  u64 major_state;
-};
-
 #undef CBOR_PARSER_ERROR
 #define CBOR_PARSER_ERROR bug
 
-static struct ccs_parser_context ccx_, *ccx = &ccx_;
-
-static void
-hcf_parse(byte *buf, int size)
+static enum cbor_parse_result
+hcf_parse(struct cbor_channel *cch, enum cbor_parse_result res)
 {
-  ASSERT_DIE(size > 0);
-  struct cbor_parser_context *ctx = ccx->ctx;
-
-  static struct flock_machine_container_config ccf;
+  SKIP_BACK_DECLARE(struct container_fork_request_child, ccx, cch, cch);
+  struct cbor_parser_context *ctx = &cch->stream->parser;
+  struct flock_machine_container_config *ccf = &ccx->ccf;
 
-  for (int pos = 0; pos < size; pos++)
+  switch (res)
   {
-    switch (cbor_parse_byte(ctx, buf[pos]))
-    {
       case CPR_ERROR:
-       bug("CBOR parser failure: %s", ctx->error);
-
       case CPR_MORE:
-       continue;
+       CBOR_PARSER_ERROR("Invalid input");
 
       case CPR_MAJOR:
        /* Check type acceptance */
@@ -1021,8 +1030,6 @@ hcf_parse(byte *buf, int size)
            if (ctx->type != 5)
              CBOR_PARSER_ERROR("Expected mapping, got %u", ctx->type);
 
-           ccf = (struct flock_machine_container_config) {};
-
            ccx->major_state = 1;
            break;
 
@@ -1043,11 +1050,11 @@ hcf_parse(byte *buf, int size)
            if (ctx->tflags & CPT_VARLEN)
              CBOR_PARSER_ERROR("Variable length string not supported yet");
 
-           if (ccf.cf.name)
+           if (ccf->cf.name)
              CBOR_PARSER_ERROR("Duplicate argument 0 / hostname");
 
            ASSERT_DIE(!ctx->target_buf);
-           ccf.cf.name = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1);
+           ccf->cf.name = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1);
            ctx->target_len = ctx->value;
            break;
 
@@ -1058,11 +1065,11 @@ hcf_parse(byte *buf, int size)
            if (ctx->tflags & CPT_VARLEN)
              CBOR_PARSER_ERROR("Variable length string not supported yet");
 
-           if (ccf.workdir)
+           if (ccf->workdir)
              CBOR_PARSER_ERROR("Duplicate argument 1 / basedir");
 
            ASSERT_DIE(!ctx->target_buf);
-           ccf.basedir = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1);
+           ccf->basedir = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1);
            ctx->target_len = ctx->value;
            break;
 
@@ -1073,11 +1080,11 @@ hcf_parse(byte *buf, int size)
            if (ctx->tflags & CPT_VARLEN)
              CBOR_PARSER_ERROR("Variable length string not supported yet");
 
-           if (ccf.workdir)
+           if (ccf->workdir)
              CBOR_PARSER_ERROR("Duplicate argument 2 / workdir");
 
            ASSERT_DIE(!ctx->target_buf);
-           ccf.workdir = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1);
+           ccf->workdir = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1);
            ctx->target_len = ctx->value;
            break;
 
@@ -1102,37 +1109,23 @@ hcf_parse(byte *buf, int size)
        }
        break;
 
-      case CPR_BLOCK_END:
-       bug("invalid parser state");
-    }
-
-    /* End of array or map */
-    while (cbor_parse_block_end(ctx))
-    {
+    case CPR_BLOCK_END:
       switch (ccx->major_state)
       {
-       /* Code to run at the end of the mapping */
-       case 0: /* toplevel item ended */
-         /* Reinit the parser */
-         ccx->major_state = 0;
-         ccx->bytes_consumed = 0;
-         cbor_parser_reset(ccx->ctx);
-
-         if (size > pos + 1)
-           hcf_parse(buf + pos + 1, size - pos - 1);
-         return;
+       case 0:
+         return CPR_BLOCK_END;
 
        case 1: /* the mapping ended */
-         if (!ccf.cf.name)
+         if (!ccf->cf.name)
            CBOR_PARSER_ERROR("Missing hostname");
 
-         if (!ccf.workdir)
+         if (!ccf->workdir)
            CBOR_PARSER_ERROR("Missing workdir");
 
-         if (!ccf.basedir)
+         if (!ccf->basedir)
            CBOR_PARSER_ERROR("Missing basedir");
 
-         container_start(&ccf);
+         container_start(ccx);
 
          ccx->major_state = 0;
          break;
@@ -1140,10 +1133,15 @@ hcf_parse(byte *buf, int size)
        default:
          bug("Unexpected state to end a mapping in");
       }
-    }
   }
 
-  ccx->bytes_consumed += size;
+  return CPR_MORE;
+}
+
+static void
+hcf_cancel(struct cbor_stream *stream UNUSED)
+{
+  bug("Forker child stream cancelled");
 }
 
 void
@@ -1169,6 +1167,7 @@ hypervisor_container_fork(void)
     hcf.p = rp_new(birdloop_pool(hcf.loop), birdloop_domain(hcf.loop), "Container forker pool");
     hcf.s = sk_new(hcf.p);
     hcf.s->type = SK_MAGIC;
+    hcf.s->flags |= SKF_FD_RX;
     /* Set the hooks and fds according to the side we are at */
     sk_set_tbsize(hcf.s, 16384);
     sk_set_rbsize(hcf.s, 128);
@@ -1182,7 +1181,7 @@ hypervisor_container_fork(void)
 
     hcf.s->type = SK_UNIX_MSG;
     hcf.stream.parse = container_fork_request_reply;
-    cbor_stream_init(&hcf.stream, 3);
+    CBOR_STREAM_INIT(&hcf, stream, cch, hcf.p, struct container_fork_request);
     cbor_stream_attach(&hcf.stream, hcf.s);
 
     birdloop_leave(hcf.loop);
@@ -1192,19 +1191,25 @@ hypervisor_container_fork(void)
   /* noreturn child side */
   close(fds[0]);
   hexp_cleanup_after_fork();
-  container_forker_fd = fds[1];
-
   this_thread_id |= 0xf000;
 
   /* initialize the forker */
-  ccx->ctx = cbor_parser_new(&root_pool, 2);
+  sock *sk = sk_new(&root_pool);
+  sk->type = SK_UNIX_MSG;
+  sk->fd = fds[1];
+  sk->flags |= SKF_FD_TX;
+  sk->rbuf = sk->rpos = alloca(sk->rbsize = 4096);
+  sk->tbuf = sk->tpos = alloca(sk->tbsize = 64);
+
+  CBOR_STREAM_EMBED(s, 4) stream;
+  CBOR_STREAM_INIT(&stream, s, cch, &root_pool, struct container_fork_request_child);
+  cbor_stream_attach(&stream.s, sk);
+  stream.s.parse = hcf_parse;
+  stream.s.cancel = hcf_cancel;
 
   while (true)
   {
-    byte buf[4096];
-
-    ssize_t rx = read(fds[1], buf, sizeof buf);
-
+    ssize_t rx = read(fds[1], sk->rpos, sk->rbsize);
     times_update();
 
     if (rx == 0)
@@ -1216,6 +1221,8 @@ hypervisor_container_fork(void)
     if (rx < 0)
       bug("Container forker child: failed to read: %m");
 
-    hcf_parse(buf, rx);
+    sk->rpos += rx;
+    if (sk->rx_hook(sk, rx))
+      sk->rpos = sk->rbuf;
   }
 }
diff --git a/flock/ctl.c b/flock/ctl.c
deleted file mode 100644 (file)
index c5b4c80..0000000
+++ /dev/null
@@ -1,286 +0,0 @@
-#include "lib/birdlib.h"
-#include "lib/cbor.h"
-#include "lib/string.h"
-#include "lib/io-loop.h"
-
-#include "flock/flock.h"
-
-#include <stdlib.h>
-
-/*
- * Hand-written parser for a very simple CBOR protocol:
- *
- * - on toplevel always array of three elements:
- *   - the ID (u64)
- *   - the command saying what to expect in the third element
- *     - 0 with NULL (7-22) = shutdown the hypervisor
- *     - 1 with NULL = open a telnet listener
- *     - 2 with one string = create a machine of this name
- *     - 3 with array of strings = run the given command inside the hypervisor
- */
-
-struct hcs_parser_stream {
-  struct cbor_parser_context *ctx;
-  struct hcs_parser_channel *channel;
-  sock *sock;
-
-  u64 bytes_consumed;
-  u64 major_state;
-
-  struct cbor_stream stream;
-};
-
-struct hcs_parser_channel {
-  struct cbor_channel cch;
-  struct hcs_parser_stream *htx;
-
-  enum {
-    HCS_CMD_SHUTDOWN = 1,
-    HCS_CMD_TELNET,
-    HCS_CMD_MACHINE_START,
-    HCS_CMD_MACHINE_STOP,
-    HCS_CMD__MAX,
-  } cmd;
-
-  union flock_machine_config cfg;
-};
-
-static void
-hcs_request_poweroff(struct hcs_parser_channel *hpc)
-{
-  log(L_INFO "Requested shutdown via CLI");
-  ev_send_loop(&main_birdloop, &poweroff_event);
-
-  CBOR_REPLY(&hpc->cch, cw)
-    CBOR_PUT_MAP(cw)
-    {
-      cbor_put_int(cw, -1);
-      cbor_put_string(cw, "OK");
-    }
-
-  cbor_done_channel(&hpc->cch);
-}
-
-struct hcs_parser_stream *
-hcs_parser_init(sock *s)
-{
-  struct cbor_parser_context *ctx = cbor_parser_new(s->pool, 4);
-  struct hcs_parser_stream *htx = mb_allocz(s->pool, sizeof *htx);
-
-  htx->ctx = ctx;
-  htx->sock = s;
-  cbor_stream_init(&htx->stream, 3);
-
-  return htx;
-}
-
-#define CBOR_PARSER_ERROR(...) do {                    \
-  log(L_ERR "Hypervisor ctl parse: " __VA_ARGS__);     \
-  return CPR_ERROR;                                    \
-} while (0)
-
-enum cbor_parse_result
-hcs_parse(struct cbor_channel *cch, enum cbor_parse_result res)
-{
-  SKIP_BACK_DECLARE(struct hcs_parser_channel, hpc, cch, cch);
-  SKIP_BACK_DECLARE(struct hcs_parser_stream, htx, stream, cch->stream);
-  struct cbor_parser_context *ctx = &htx->stream.parser;
-
-  switch (res)
-  {
-      case CPR_MAJOR:
-       /* Check type acceptance */
-       switch (htx->major_state)
-       {
-         case 0: /* Command */
-           CBOR_PARSE_ONLY(ctx, POSINT, hpc->cmd);
-           if (hpc->cmd > HCS_CMD__MAX)
-             CBOR_PARSER_ERROR("Command key too high, got %lu", hpc->cmd);
-
-           htx->major_state = hpc->cmd + 10;
-           return CPR_MORE;
-
-         case HCS_CMD_SHUTDOWN + 10: /* shutdown command: expected null */
-           if ((ctx->type != 7) || (ctx->value != 22))
-             CBOR_PARSER_ERROR("Expected null, got %u-%u", ctx->type, ctx->value);
-
-           hcs_request_poweroff(hpc);
-           htx->major_state = 3;
-           return CPR_MORE;
-
-         case HCS_CMD_TELNET + 10: /* telnet listener open */
-           if ((ctx->type == 7) && (ctx->value == 22))
-           {
-             hexp_get_telnet(hpc);
-             htx->major_state = 3;
-             return CPR_MORE;
-           }
-
-           else CBOR_PARSE_IF(ctx, TEXT, hpc->cfg.cf.name)
-             ;
-           else
-             CBOR_PARSER_ERROR("Expected null or string, got %s", cbor_type_str(ctx->type));
-           return CPR_MORE;
-
-         case HCS_CMD_MACHINE_START + 10: /* machine creation request */
-           if (ctx->type != 5)
-             CBOR_PARSER_ERROR("Expected mapping, got %u", ctx->type);
-
-           htx->major_state = 501;
-           return CPR_MORE;
-
-         case HCS_CMD_MACHINE_STOP + 1: /* machine shutdown request */
-           if (ctx->type != 5)
-             CBOR_PARSER_ERROR("Expecting mapping, got %u", ctx->type);
-
-           htx->major_state = 601;
-           return CPR_MORE;
-
-         case 7: /* process spawner */
-           bug("process spawner not implemented");
-
-         case 501: /* machine creation argument */
-           CBOR_PARSE_ONLY(ctx, POSINT, htx->major_state);
-
-           if (ctx->value >= 5)
-             CBOR_PARSER_ERROR("Command key too high, got %lu", ctx->value);
-
-           htx->major_state += 502;
-           return CPR_MORE;
-
-         case 502: /* machine creation argument 0: name */
-           CBOR_PARSE_ONLY(ctx, TEXT, hpc->cfg.cf.name);
-           return CPR_MORE;
-
-         case 503: /* machine creation argument 1: type */
-           CBOR_PARSE_ONLY(ctx, POSINT, hpc->cfg.cf.type);
-
-           if ((ctx->value < 1) && (ctx->value > 1) )
-             CBOR_PARSER_ERROR("Unexpected type, got %lu", ctx->value);
-
-           htx->major_state = 501;
-           return CPR_MORE;
-
-         case 504: /* machine creation argument 2: basedir */
-           CBOR_PARSE_ONLY(ctx, BYTES, hpc->cfg.container.basedir);
-           return CPR_MORE;
-
-         case 505: /* machine creation argument 3: workdir */
-           CBOR_PARSE_ONLY(ctx, BYTES, hpc->cfg.container.workdir);
-           return CPR_MORE;
-
-         case 601: /* machine shutdown argument */
-           CBOR_PARSE_ONLY(ctx, POSINT, htx->major_state);
-
-           if (ctx->value >= 5)
-             CBOR_PARSER_ERROR("Command key too high, got %lu", ctx->value);
-
-           htx->major_state += 602;
-           return CPR_MORE;
-
-         case 602: /* machine creation argument 0: name */
-           CBOR_PARSE_ONLY(ctx, TEXT, hpc->cfg.cf.name);
-           return CPR_MORE;
-
-         default:
-           bug("invalid parser state");
-       }
-       break;
-
-      case CPR_STR_END:
-       /* Bytes read completely! */
-       switch (htx->major_state)
-       {
-         case HCS_CMD_TELNET + 10:
-           hexp_get_telnet(hpc);
-           break;
-
-         case 502:
-         case 504:
-         case 505:
-           htx->major_state = 501;
-           return CPR_MORE;
-
-         case 602:
-           htx->major_state = 601;
-           return CPR_MORE;
-
-         default:
-           bug("Unexpected state to end a (byte)string in");
-         /* Code to run at the end of a (byte)string */
-       }
-       break;
-
-    case CPR_BLOCK_END:
-      switch (htx->major_state)
-      {
-       /* Code to run at the end of the mapping */
-       case 0: /* toplevel item ended */
-         htx->major_state = ~0ULL;
-         return CPR_BLOCK_END;
-
-       case 1:
-         htx->major_state = 0;
-         return CPR_MORE;
-
-       case 5:
-         /* Finalize the command to exec in hypervisor */
-         CBOR_PARSER_ERROR("NOT IMPLEMENTED YET");
-         htx->major_state = 1;
-         return CPR_MORE;
-
-       case 501:
-         switch (hpc->cfg.cf.type)
-         {
-           case 1:
-             hypervisor_container_start(&hpc->cch, &hpc->cfg.container);
-             break;
-           default:
-             CBOR_PARSER_ERROR("Unknown machine type: %d", hpc->cfg.cf.type);
-         }
-         htx->major_state = 1;
-         return CPR_MORE;
-
-       case 601:
-         /*
-         if (!htx->cfg.cf.name)
-           CBOR_PARSER_ERROR("Machine name not specified");
-
-         hypervisor_container_shutdown(htx->sock, htx->cfg.cf.name);
-           */
-
-         hypervisor_container_shutdown(&hpc->cch, &hpc->cfg.container);
-         htx->major_state = 1;
-         return CPR_MORE;
-
-       default:
-         bug("Unexpected state to end a mapping in");
-      }
-      break;
-
-    case CPR_ERROR:
-    case CPR_MORE:
-      CBOR_PARSER_ERROR("Invalid input");
-
-  }
-
-  return CPR_MORE;
-}
-
-bool
-hcs_complete(struct hcs_parser_stream *htx)
-{
-  return htx->major_state == ~0ULL;
-}
-
-const char *
-hcs_error(struct hcs_parser_stream *htx)
-{
-  return htx->ctx->error;
-}
-
-void
-hcs_parser_cleanup(struct hcs_parser_stream *htx)
-{
-  cbor_parser_free(htx->ctx);
-}
index 67e2153b386db48d9056089e6bc4584c2925e06e..dc137dd18436a0111ea2546e8dfd83c2e2d543b1 100755 (executable)
@@ -35,6 +35,9 @@ class HypervisorStaleError(HandlerError):
     def __init__(self, *args, **kwargs):
         return super().__init__("Hypervisor stale", *args, **kwargs)
 
+class GarbledReplyError(HandlerError):
+    pass
+
 def connect(where: pathlib.Path):
     if not where.exists():
         raise HypervisorNonexistentError()
@@ -49,15 +52,20 @@ def connect(where: pathlib.Path):
 def ctl_path(name: str):
     return DEFAULT_RUN_PATH / f"{name}.ctl"
 
-def msg(name: str, data: dict):
+def msg(name: str, cmd: int, data):
     try:
         ctl = connect(ctl_path(name))
     except HypervisorNonexistentError as e:
         e.add_note(f"Failed to send message {data} to {name}")
         raise e
 
-    ctl.sendall(cbor2.dumps(data))
-    return cbor2.loads(ctl.recv(1024))
+    ctl.sendall(cbor2.dumps([ 0, cmd, data]))
+    data = cbor2.loads(ctl.recv(1024))
+    if len(data) != 2:
+        raise GarbledReplyError(f"Expected 2 array items, got {len(data)}")
+    if data[0] != 0:
+        raise GarbledReplyError(f"Expected zero ID, got {data[0]}")
+    return data[1]
 
 @handler
 def start(name: str):
@@ -72,7 +80,7 @@ def start(name: str):
 
 @handler
 def stop(name: str):
-    for k,v in msg(name, { 0: None }).items():
+    for k,v in msg(name, 1, None).items():
         assert(k == -1)
         assert(v == "OK")
 
@@ -86,28 +94,28 @@ def cleanup(name: str):
 
 @handler
 def telnet(name: str):
-    for k,v in msg(name, { 1: None}).items():
+    for k,v in msg(name, 2, None).items():
         assert(k == -2)
         os.execlp("telnet", "telnet", "localhost", str(v))
 
 @handler
 def container_start(hypervisor: str, name: str):
-    for k,v in msg(hypervisor, { 3: {
+    for k,v in msg(hypervisor, 3, {
         0: name,
         1: 1,
         2: b"/",
         3: bytes(DEFAULT_RUN_PATH / hypervisor / name),
-        }}).items():
+        }).items():
         print(k,v)
 
 @handler
 def container_stop(hypervisor: str, name: str):
-    for k,v in msg(hypervisor, { 4: { 0: name, }}).items():
+    for k,v in msg(hypervisor, 4, { 0: name, }).items():
         print(k,v)
 
 @handler
 def container_telnet(hypervisor: str, name: str):
-    for k,v in msg(hypervisor, { 1: name}).items():
+    for k,v in msg(hypervisor, 2, name).items():
         assert(k == -2)
         os.execlp("telnet", "telnet", "localhost", str(v))
 
index 1182fc858e0c0bf11d8a408064c0ccc72502124c..eabef9be9b8dff29979bf4f48da8f19fed6178c1 100644 (file)
@@ -52,7 +52,8 @@ union flock_machine_config {
 
 void hypervisor_container_start(struct cbor_channel *, struct flock_machine_container_config *);
 void hypervisor_container_shutdown(struct cbor_channel *, struct flock_machine_container_config *);
-int container_ctl_fd(const char *name);
+
+struct cbor_channel *container_get_channel(const char *name);
 
 void hexp_cleanup_after_fork(void);
 
index 1310d9747dfc6fb619e72899001265a1a90446f3..5526ec90e5ffbff6745ef7f0116d33491318a5c0 100644 (file)
@@ -5,6 +5,7 @@
 #include "lib/io-loop.h"
 #include "lib/resource.h"
 #include "lib/socket.h"
+#include "lib/string.h"
 
 #include "flock/flock.h"
 
@@ -20,53 +21,12 @@ static pool *hcs_pool;
 
 OBSREF(struct shutdown_placeholder) hcs_shutdown_placeholder;
 
-static int
-hcs_rx(sock *s, uint size)
-{
-  s64 sz = hcs_parse(s->data, s->rbuf, size);
-  if (sz < 0)
-  {
-    log(L_INFO "CLI parser error at position %ld: %s", -sz-1, hcs_error(s->data));
-    sk_close(s);
-    return 0; /* Must return 0 when closed */
-  }
-
-  if (!hcs_complete(s->data))
-  {
-    ASSERT_DIE(sz == size);
-    return 1;
-  }
-
-  log(L_INFO "Parsed command.");
-
-  /* TODO do something more */
-  if (sz < size)
-    memmove(s->rbuf, s->rbuf + sz, size - sz);
-  if (!s->rx_hook)
-    return (sz == size);
-
-  hcs_parser_cleanup(s->data);
-  s->data = hcs_parser_init(s);
-
-  return (sz < size) ? hcs_rx(s, size - sz) : 1;
-}
-
-static void
-hcs_err(sock *s, int err)
-{
-  log(L_INFO "CLI dropped: %s", strerror(err));
-  hcs_parser_cleanup(s->data);
-  sk_close(s);
-}
-
 static int
 hcs_connect(sock *s, uint size UNUSED)
 {
   log(L_INFO "CLI connected: %p", s);
 
-  s->rx_hook = hcs_rx;
-  s->err_hook = hcs_err;
-  s->data = hcs_parser_init(s);
+  hcs_parser_init(s);
   return 1;
 }
 
@@ -127,8 +87,7 @@ static struct hypervisor_exposed {
   pool *p;
   sock *s;
   struct birdloop *loop;
-  const char *port_name;
-  sock *port_sreq;
+  struct hcs_parser_channel *hpc;
 } he;
 
 /**
@@ -281,13 +240,19 @@ hypervisor_exposed_child_rx(sock *sk, uint size)
 
     sk->txfd = sfd;
 
-    linpool *lp = lp_new(sk->pool);
-    struct cbor_writer *cw = cbor_init(sk->tbuf, sk->tbsize, lp);
-    cbor_open_block_with_length(cw, 1);
-    cbor_add_int(cw, -2);
-    cbor_add_int(cw, r);
+    struct {
+      struct cbor_writer cw;
+      struct cbor_writer_stack_item si[2];
+    } cw;
 
-    e = sk_send(sk, cw->pt);
+    cbor_writer_init(&cw.cw, 2, sk->tbuf, sk->tbsize);
+    CBOR_PUT_MAP(&cw.cw)
+    {
+      cbor_put_int(&cw.cw, -2);
+      cbor_put_int(&cw.cw, r);
+    }
+
+    e = sk_send(sk, cw.cw.data.pos - cw.cw.data.start);
     if (e < 0)
       log(L_ERR "Failed to send socket: %m");
 
@@ -385,12 +350,25 @@ hexp_cleanup_after_fork(void)
  * Hypervisor's mapping between external ports and names
  */
 
+struct hcs_parser_channel {
+  CBOR_CHANNEL_EMBED(cch, 4);
+  struct hcs_parser_stream *htx;
+
+  enum {
+    HCS_CMD_SHUTDOWN = 1,
+    HCS_CMD_TELNET,
+    HCS_CMD_MACHINE_START,
+    HCS_CMD_MACHINE_STOP,
+    HCS_CMD__MAX,
+  } cmd;
+
+  union flock_machine_config cfg;
+};
+
 static void
 hexp_sock_err(sock *s, int err UNUSED)
 {
-  ASSERT_DIE(s == he.port_sreq);
-  he.port_name = NULL;
-  he.port_sreq = NULL;
+  he.hpc = NULL;
 }
 
 void
@@ -399,6 +377,7 @@ hexp_get_telnet(struct hcs_parser_channel *hpc)
   if (he.hpc)
     log(L_ERR "Multiple telnet requests not supported yet");
 
+  log(L_INFO "Get telnet: %p name %s", hpc, hpc->cfg.cf.name);
   he.hpc = hpc;
 
   /* TODO: use channels here as well */
@@ -406,24 +385,24 @@ hexp_get_telnet(struct hcs_parser_channel *hpc)
   int e = write(he.s->fd, buf, sizeof buf);
   if (e != sizeof buf)
     bug("write error handling not implemented, got %d (%m)", e);
-
-  s->err_paused = hexp_sock_err;
-  sk_pause_rx(s->loop, s);
 }
 
-static void hexp_received_telnet(struct hexp_received_telnet *hrt)
+static void hexp_received_telnet(void *_hrt)
 {
-  if (hrt->name[0])
+  struct hexp_received_telnet *hrt = _hrt;
+
+  if (he.hpc->cfg.cf.name)
   {
     /* Transferring the received listening socket to the container */
-    struct cbor_channel *ccc = container_get_channel(hrt->name);
+    struct cbor_channel *ccc = container_get_channel(he.hpc->cfg.cf.name);
 
-    CBOR_REPLY(ccc, cw)
-      CBOR_PUT_MAP(cw) {
-       cbor_put_int(cw, -2);
-       cbor_put_null(cw);
-       ccc->stream->s->txfd = hrt->fd;
-      }
+    BIRDLOOP_INSIDE(ccc->stream->loop)
+      CBOR_REPLY(ccc, cw)
+       CBOR_PUT_MAP(cw) {
+         cbor_put_int(cw, -2);
+         cbor_put_null(cw);
+         ccc->stream->s->txfd = hrt->fd;
+       }
 
     close(hrt->fd);
   }
@@ -440,24 +419,271 @@ static void hexp_received_telnet(struct hexp_received_telnet *hrt)
       bug("Telnet listener: sk_open failed");
   }
 
-  if (s)
+  if (he.hpc)
   {
-    linpool *lp = lp_new(hcs_pool);
-    struct cbor_writer *cw = cbor_init(s->tbuf, s->tbsize, lp);
-    cbor_open_block_with_length(cw, 1);
-    cbor_add_int(cw, -2);
-    cbor_add_int(cw, hrt->port);
-
-    sk_send(s, cw->pt);
-    sk_resume_rx(hcs_loop, s);
-
-    hcs_parser_cleanup(s->data);
-    s->data = hcs_parser_init(s);
+    CBOR_REPLY(&he.hpc->cch, cw)
+      CBOR_PUT_MAP(cw)
+      {
+       cbor_put_int(cw, -2);
+       cbor_put_int(cw, hrt->port);
+      }
 
-    rfree(lp);
+    cbor_channel_done(&he.hpc->cch);
   }
 
   birdloop_enter(he.loop);
   mb_free(hrt);
   birdloop_leave(he.loop);
 }
+
+/*
+ * Hand-written parser for a very simple CBOR protocol:
+ *
+ * - on toplevel always array of three elements:
+ *   - the ID (u64)
+ *   - the command saying what to expect in the third element
+ *     - 0 with NULL (7-22) = shutdown the hypervisor
+ *     - 1 with NULL = open a telnet listener
+ *     - 2 with one string = create a machine of this name
+ *     - 3 with array of strings = run the given command inside the hypervisor
+ */
+
+struct hcs_parser_stream {
+  struct cbor_parser_context *ctx;
+  struct hcs_parser_channel *channel;
+  sock *sock;
+
+  u64 bytes_consumed;
+  u64 major_state;
+
+  CBOR_STREAM_EMBED(stream, 4);
+};
+
+static void
+hcs_request_poweroff(struct hcs_parser_channel *hpc)
+{
+  log(L_INFO "Requested shutdown via CLI");
+  ev_send_loop(&main_birdloop, &poweroff_event);
+
+  CBOR_REPLY(&hpc->cch, cw)
+    CBOR_PUT_MAP(cw)
+    {
+      cbor_put_int(cw, -1);
+      cbor_put_string(cw, "OK");
+    }
+
+  cbor_channel_done(&hpc->cch);
+}
+
+struct hcs_parser_stream *
+hcs_parser_init(sock *s)
+{
+  struct hcs_parser_stream *htx = mb_allocz(s->pool, sizeof *htx);
+
+  CBOR_STREAM_INIT(htx, stream, cch, s->pool, struct hcs_parser_channel);
+  cbor_stream_attach(&htx->stream, s);
+  htx->stream.parse = hcs_parse;
+  htx->stream.cancel = hcs_parser_cleanup;
+
+  return htx;
+}
+
+#define CBOR_PARSER_ERROR(...) do {                    \
+  log(L_ERR "Hypervisor ctl parse: " __VA_ARGS__);     \
+  return CPR_ERROR;                                    \
+} while (0)
+
+enum cbor_parse_result
+hcs_parse(struct cbor_channel *cch, enum cbor_parse_result res)
+{
+  SKIP_BACK_DECLARE(struct hcs_parser_channel, hpc, cch, cch);
+  SKIP_BACK_DECLARE(struct hcs_parser_stream, htx, stream, cch->stream);
+  struct cbor_parser_context *ctx = &htx->stream.parser;
+
+  switch (res)
+  {
+      case CPR_MAJOR:
+       /* Check type acceptance */
+       switch (htx->major_state)
+       {
+         case 0: /* Command */
+           CBOR_PARSE_ONLY(ctx, POSINT, hpc->cmd);
+           if (hpc->cmd > HCS_CMD__MAX)
+             CBOR_PARSER_ERROR("Command key too high, got %lu", hpc->cmd);
+
+           htx->major_state = hpc->cmd + 10;
+           return CPR_MORE;
+
+         case HCS_CMD_SHUTDOWN + 10: /* shutdown command: expected null */
+           if ((ctx->type != 7) || (ctx->value != 22))
+             CBOR_PARSER_ERROR("Expected null, got %u-%u", ctx->type, ctx->value);
+
+           hcs_request_poweroff(hpc);
+           htx->major_state = 3;
+           return CPR_MORE;
+
+         case HCS_CMD_TELNET + 10: /* telnet listener open */
+           if ((ctx->type == 7) && (ctx->value == 22))
+           {
+             hexp_get_telnet(hpc);
+             htx->major_state = 3;
+             return CPR_MORE;
+           }
+
+           else CBOR_PARSE_IF(ctx, TEXT, hpc->cfg.cf.name)
+             ;
+           else
+             CBOR_PARSER_ERROR("Expected null or string, got %s", cbor_type_str(ctx->type));
+           return CPR_MORE;
+
+         case HCS_CMD_MACHINE_START + 10: /* machine creation request */
+           if (ctx->type != 5)
+             CBOR_PARSER_ERROR("Expected mapping, got %u", ctx->type);
+
+           htx->major_state = 501;
+           return CPR_MORE;
+
+         case HCS_CMD_MACHINE_STOP + 1: /* machine shutdown request */
+           if (ctx->type != 5)
+             CBOR_PARSER_ERROR("Expecting mapping, got %u", ctx->type);
+
+           htx->major_state = 601;
+           return CPR_MORE;
+
+         case 7: /* process spawner */
+           bug("process spawner not implemented");
+
+         case 501: /* machine creation argument */
+           CBOR_PARSE_ONLY(ctx, POSINT, htx->major_state);
+
+           if (ctx->value >= 5)
+             CBOR_PARSER_ERROR("Command key too high, got %lu", ctx->value);
+
+           htx->major_state += 502;
+           return CPR_MORE;
+
+         case 502: /* machine creation argument 0: name */
+           CBOR_PARSE_ONLY(ctx, TEXT, hpc->cfg.cf.name);
+           return CPR_MORE;
+
+         case 503: /* machine creation argument 1: type */
+           CBOR_PARSE_ONLY(ctx, POSINT, hpc->cfg.cf.type);
+
+           if ((ctx->value < 1) && (ctx->value > 1) )
+             CBOR_PARSER_ERROR("Unexpected type, got %lu", ctx->value);
+
+           htx->major_state = 501;
+           return CPR_MORE;
+
+         case 504: /* machine creation argument 2: basedir */
+           CBOR_PARSE_ONLY(ctx, BYTES, hpc->cfg.container.basedir);
+           return CPR_MORE;
+
+         case 505: /* machine creation argument 3: workdir */
+           CBOR_PARSE_ONLY(ctx, BYTES, hpc->cfg.container.workdir);
+           return CPR_MORE;
+
+         case 601: /* machine shutdown argument */
+           CBOR_PARSE_ONLY(ctx, POSINT, htx->major_state);
+
+           if (ctx->value >= 5)
+             CBOR_PARSER_ERROR("Command key too high, got %lu", ctx->value);
+
+           htx->major_state += 602;
+           return CPR_MORE;
+
+         case 602: /* machine creation argument 0: name */
+           CBOR_PARSE_ONLY(ctx, TEXT, hpc->cfg.cf.name);
+           return CPR_MORE;
+
+         default:
+           bug("invalid parser state");
+       }
+       break;
+
+      case CPR_STR_END:
+       /* Bytes read completely! */
+       switch (htx->major_state)
+       {
+         case HCS_CMD_TELNET + 10:
+           hexp_get_telnet(hpc);
+           htx->major_state = 3;
+           return CPR_MORE;
+
+         case 502:
+         case 504:
+         case 505:
+           htx->major_state = 501;
+           return CPR_MORE;
+
+         case 602:
+           htx->major_state = 601;
+           return CPR_MORE;
+
+         default:
+           bug("Unexpected state to end a (byte)string in");
+         /* Code to run at the end of a (byte)string */
+       }
+       break;
+
+    case CPR_BLOCK_END:
+      switch (htx->major_state)
+      {
+       /* Code to run at the end of the mapping */
+       case 0: /* toplevel item ended */
+         htx->major_state = ~0ULL;
+         return CPR_BLOCK_END;
+
+       case 3:
+         htx->major_state = 0;
+         return CPR_MORE;
+
+       case 501:
+         switch (hpc->cfg.cf.type)
+         {
+           case 1:
+             hypervisor_container_start(&hpc->cch, &hpc->cfg.container);
+             break;
+           default:
+             CBOR_PARSER_ERROR("Unknown machine type: %d", hpc->cfg.cf.type);
+         }
+         htx->major_state = 3;
+         return CPR_MORE;
+
+       case 601:
+         hypervisor_container_shutdown(&hpc->cch, &hpc->cfg.container);
+         htx->major_state = 3;
+         return CPR_MORE;
+
+       default:
+         bug("Unexpected state to end a mapping in");
+      }
+      break;
+
+    case CPR_ERROR:
+    case CPR_MORE:
+      CBOR_PARSER_ERROR("Invalid input");
+
+  }
+
+  return CPR_MORE;
+}
+
+bool
+hcs_complete(struct hcs_parser_stream *htx)
+{
+  return htx->major_state == ~0ULL;
+}
+
+const char *
+hcs_error(struct hcs_parser_stream *htx)
+{
+  return htx->ctx->error;
+}
+
+void
+hcs_parser_cleanup(struct hcs_parser_stream *htx)
+{
+  log(L_INFO "hcs parser cleanup");
+  cbor_parser_free(htx->ctx);
+}