]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
Container streaming
authorMaria Matejka <mq@ucw.cz>
Thu, 10 Oct 2024 17:12:43 +0000 (19:12 +0200)
committerMaria Matejka <mq@ucw.cz>
Sun, 23 Feb 2025 18:07:35 +0000 (19:07 +0100)
flock/container.c

index 98f3db32c0f7c65455b76a9cfe8e669851d0d11a..568433ec4c5be19a059f3ec7d06967830640984b 100644 (file)
@@ -21,7 +21,7 @@
 static struct hypervisor_container_forker {
   sock *s;
   pool *p;
-  struct cbor_stream *stream;
+  struct cbor_stream stream;
   struct birdloop *loop;
   HASH(struct container_runtime) hash;
   struct container_runtime *cur_crt;
@@ -654,8 +654,9 @@ container_cleanup(struct container_runtime *crt)
   mb_free(crt);
 }
 
-struct container_ctl_channel {
+struct container_ctl_msg {
   struct cbor_channel cch;
+  struct cbor_channel *ctl_ch;
   int msg_state;
   int down_signal;
 };
@@ -663,7 +664,7 @@ struct container_ctl_channel {
 static enum cbor_parse_result
 container_ctl_parse(struct cbor_channel *cch, enum cbor_parse_result res)
 {
-  SKIP_BACK_DECLARE(struct container_ctl_channel, ccc, cch, cch);
+  SKIP_BACK_DECLARE(struct container_ctl_msg, ccc, cch, cch);
   SKIP_BACK_DECLARE(struct container_runtime, crt, stream, cch->stream);
   struct cbor_parser_context *ctx = &crt->stream.parser;
 
@@ -727,7 +728,7 @@ container_ctl_parse(struct cbor_channel *cch, enum cbor_parse_result res)
 static enum cbor_parse_result
 container_fork_request_reply(struct cbor_channel *cch, enum cbor_parse_result res)
 {
-  ASSERT_DIE(cch->stream == hcf.stream);
+  ASSERT_DIE(cch->stream == &hcf.stream);
 
   SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cch);
   struct container_runtime *crt = cfr->crt;
@@ -817,59 +818,6 @@ container_fork_request_reply(struct cbor_channel *cch, enum cbor_parse_result re
 #undef FAIL
 }
 
-static void
-hypervisor_container_forker_err(sock *sk, int e UNUSED)
-{
-  sk_close(sk);
-}
-
-/* The child */
-
-#if 0
-static void
-crt_err(sock *s, int err UNUSED)
-{
-  struct container_runtime *crt = s->data;
-  s->data = crt->ccc->data;
-  callback_cancel(&crt->ccc->cb);
-  mb_free(crt->ccc);
-  crt->ccc = NULL;
-}
-
-int
-container_ctl_fd(const char *name)
-{
-  uint h = mem_hash(name, strlen(name));
-  struct container_runtime *crt = HASH_FIND(hcf.hash, CRT, name, h);
-  return (crt && crt->s) ? crt->s->fd : -1;
-}
-
-static void
-container_created(callback *cb)
-{
-  SKIP_BACK_DECLARE(struct container_operation_callback, ccc, cb, cb);
-
-  sock *s = ccc->s;
-  struct {
-    struct cbor_writer w;
-    struct cbor_writer_stack_item si[2];
-  } _cw;
-
-  struct cbor_writer *cw = cbor_writer_init(&_cw.w, 2, s->tbuf, s->tbsize);
-  CBOR_PUT_MAP(cw)
-  {
-    cbor_put_int(cw, -1);
-    cbor_put_string(cw, "OK");
-  }
-  sk_send(s, cw->data.pos - cw->data.start);
-
-  s->data = ccc->data;
-  sk_resume_rx(s->loop, s);
-
-  mb_free(ccc);
-}
-#endif
-
 void
 hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_container_config *ccf)
 {
@@ -902,7 +850,7 @@ hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_contai
 
   /* Create a new channel atop the forker stream */
   log(L_INFO "requesting machine creation, name %s", name);
-  SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cbor_channel_new(hcf.stream));
+  SKIP_BACK_DECLARE(struct container_fork_request, cfr, cch, cbor_channel_new(&hcf.stream));
   cfr->ctl_ch = cch;
   cfr->crt = crt;
   cfr->cch.parse = container_fork_request_reply;
@@ -924,30 +872,73 @@ hypervisor_container_start(struct cbor_channel *cch, struct flock_machine_contai
   birdloop_leave(hcf.loop);
 }
 
-static void
-container_stopped(callback *cb)
+static enum cbor_parse_result
+container_stopped(struct cbor_channel *cch, enum cbor_parse_result res)
 {
-  SKIP_BACK_DECLARE(struct container_operation_callback, ccc, cb, cb);
+  SKIP_BACK_DECLARE(struct container_ctl_msg, ccc, cch, cch);
+  SKIP_BACK_DECLARE(struct container_runtime, crt, stream, cch->stream);
+  struct cbor_parser_context *ctx = &crt->stream.parser;
 
-  sock *s = ccc->s;
-  struct {
-    struct cbor_writer w;
-    struct cbor_writer_stack_item si[2];
-  } _cw;
+#define FAIL(...) do { log(L_ERR "Container stopped parse: " __VA_ARGS__); return CPR_ERROR; } while (0)
 
-  struct cbor_writer *cw = cbor_writer_init(&_cw.w, 2, s->tbuf, s->tbsize);
-  CBOR_PUT_MAP(cw)
+  switch (res)
   {
-    cbor_put_int(cw, -1);
-    cbor_put_string(cw, "OK");
-  }
+    case CPR_MAJOR:
+      switch (ccc->msg_state)
+      {
+       case 0:
+         if ((ctx->type != CBOR_MAP) || (ctx->value != 1))
+           FAIL("Expected map of size 1, got %d-%d", ctx->type, ctx->value);
 
-  sk_send(s, cw->data.pos - cw->data.start);
+         ccc->msg_state = 1;
+         return CPR_MORE;
+
+       case 1:
+         if ((ctx->type != CBOR_NEGINT) || (ctx->value != 3))
+           FAIL("Expected key -4, got %d-%d", ctx->type, ctx->value);
+
+         ccc->msg_state = 2;
+         return CPR_MORE;
+
+       case 2:
+         CBOR_PARSE_ONLY(ctx, POSINT, ccc->down_signal);
+         ccc->msg_state = 3;
+         return CPR_MORE;
+
+       default:
+         FAIL("Input overflow to state %d", ccc->msg_state);
+      }
+      bug("Overrun switch");
 
-  s->data = ccc->data;
-  sk_resume_rx(s->loop, s);
+    case CPR_STR_END:
+      FAIL("Unexpected string end");
 
-  mb_free(ccc);
+    case CPR_BLOCK_END:
+      switch (ccc->msg_state) {
+       case 3:
+         ccc->msg_state = 4;
+         break;
+
+       default:
+         FAIL("Unexpected block end in state %d", ccc->msg_state);
+      }
+      break;
+
+    case CPR_ERROR:
+    case CPR_MORE:
+      FAIL("Invalid input");
+  }
+
+  CBOR_REPLY(ccc->ctl_ch, cw)
+    CBOR_PUT_MAP(cw)
+    {
+      cbor_put_int(cw, -1);
+      cbor_put_string(cw, "OK");
+    }
+
+  cbor_done_channel(&ccc->cch);
+  return CPR_BLOCK_END;
+#undef FAIL
 }
 
 void
@@ -973,22 +964,16 @@ hypervisor_container_shutdown(struct cbor_channel *cch, struct flock_machine_con
     return;
   }
 
-  struct cbor_channel *xch = cbor_channel_new(crt->stream);
-  CBOR_REPLY(xch, cw)
+  SKIP_BACK_DECLARE(struct container_ctl_msg, ccr, cch, cbor_channel_new(&crt->stream));
+  CBOR_REPLY(&ccr->cch, cw)
     CBOR_PUT_MAP(cw)
     {
       cbor_put_int(cw, 0);
       cbor_put_null(cw);
     }
 
-  struct container_operation_callback *ccc = mb_alloc(cch->pool, sizeof *ccc);
-  *ccc = (struct container_operation_callback) {
-    .cch = cch,
-    .ccf = ccf,
-    .cancel = container_operation_hangup,
-  };
-  callback_init(&ccc->cb, container_stopped, s->loop);
-  crt->ccc = ccc;
+  ccr->cch.parse = container_stopped;
+  ccr->ctl_ch = cch;
 
   birdloop_leave(hcf.loop);
 }
@@ -1000,6 +985,7 @@ struct ccs_parser_context {
   u64 major_state;
 };
 
+#undef CBOR_PARSER_ERROR
 #define CBOR_PARSER_ERROR bug
 
 static struct ccs_parser_context ccx_, *ccx = &ccx_;
@@ -1010,6 +996,8 @@ hcf_parse(byte *buf, int size)
   ASSERT_DIE(size > 0);
   struct cbor_parser_context *ctx = ccx->ctx;
 
+  static struct flock_machine_container_config ccf;
+
   for (int pos = 0; pos < size; pos++)
   {
     switch (cbor_parse_byte(ctx, buf[pos]))
@@ -1028,7 +1016,7 @@ hcf_parse(byte *buf, int size)
            if (ctx->type != 5)
              CBOR_PARSER_ERROR("Expected mapping, got %u", ctx->type);
 
-           ccf = (struct container_config) {};
+           ccf = (struct flock_machine_container_config) {};
 
            ccx->major_state = 1;
            break;
@@ -1050,11 +1038,11 @@ hcf_parse(byte *buf, int size)
            if (ctx->tflags & CPT_VARLEN)
              CBOR_PARSER_ERROR("Variable length string not supported yet");
 
-           if (ccf.hostname)
+           if (ccf.cf.name)
              CBOR_PARSER_ERROR("Duplicate argument 0 / hostname");
 
            ASSERT_DIE(!ctx->target_buf);
-           ccf.hostname = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1);
+           ccf.cf.name = ctx->target_buf = lp_alloc(ctx->lp, ctx->value + 1);
            ctx->target_len = ctx->value;
            break;
 
@@ -1109,6 +1097,8 @@ hcf_parse(byte *buf, int size)
        }
        break;
 
+      case CPR_BLOCK_END:
+       bug("invalid parser state");
     }
 
     /* End of array or map */
@@ -1128,7 +1118,7 @@ hcf_parse(byte *buf, int size)
          return;
 
        case 1: /* the mapping ended */
-         if (!ccf.hostname)
+         if (!ccf.cf.name)
            CBOR_PARSER_ERROR("Missing hostname");
 
          if (!ccf.workdir)
@@ -1137,7 +1127,7 @@ hcf_parse(byte *buf, int size)
          if (!ccf.basedir)
            CBOR_PARSER_ERROR("Missing basedir");
 
-         container_start();
+         container_start(&ccf);
 
          ccx->major_state = 0;
          break;
@@ -1175,8 +1165,6 @@ hypervisor_container_fork(void)
     hcf.s = sk_new(hcf.p);
     hcf.s->type = SK_MAGIC;
     /* Set the hooks and fds according to the side we are at */
-    hcf.s->rx_hook = hypervisor_container_forker_rx;
-    hcf.s->err_hook = hypervisor_container_forker_err;
     sk_set_tbsize(hcf.s, 16384);
     sk_set_rbsize(hcf.s, 128);
     hcf.s->fd = fds[0];
@@ -1188,6 +1176,9 @@ hypervisor_container_fork(void)
       bug("Container forker parent: sk_open failed");
 
     hcf.s->type = SK_UNIX_MSG;
+    hcf.stream.parse = container_fork_request_reply;
+    cbor_stream_init(&hcf.stream, 3);
+    cbor_stream_attach(&hcf.stream, hcf.s);
 
     birdloop_leave(hcf.loop);
     return;