]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
Lib: CBOR message streams and channels mq-splitlib 35/head
authorMaria Matejka <mq@ucw.cz>
Mon, 7 Oct 2024 07:45:16 +0000 (09:45 +0200)
committerMaria Matejka <mq@ucw.cz>
Sun, 23 Feb 2025 18:01:48 +0000 (19:01 +0100)
lib/birdlib.h
lib/cbor-parser.c
lib/cbor.c
lib/cbor.h

index eac0fc4d8c331ad15916b81bd98e7ff3aab994d4..f21a7a32dcfe8e1c59b628b342a4ce419c5773be 100644 (file)
@@ -283,7 +283,7 @@ asm(
 u32 random_u32(void);
 void random_init(void);
 void random_bytes(void *buf, size_t size);
-
+#define random_type(T) ({ T out; random_bytes(&out, sizeof out); out; })
 
 /* Hashing */
 
index caa3c728434afd5fad7335dea37f7d49a94b17c9..86959a947a7211205a9e3b444bcf1850f6a30373 100644 (file)
@@ -8,6 +8,30 @@
 
 #include "lib/birdlib.h"
 #include "lib/cbor.h"
+#include "lib/hash.h"
+
+/*
+ * Basic parser bits
+ */
+
+static void
+cbor_parser_init(struct cbor_parser_context *ctx, linpool *lp, uint max_depth)
+{
+  ctx->lp = lp;
+  ctx->flush = lp_save(lp);
+
+  ctx->stack_countdown[0] = 1;
+  ctx->stack_pos = 0;
+  ctx->stack_max = max_depth;
+
+  ctx->target_buf = NULL;
+  ctx->target_len = 0;
+
+  ctx->type = 0xff;
+
+  ctx->partial_state = CPE_TYPE;
+  ctx->partial_countdown = 0;
+}
 
 struct cbor_parser_context *
 cbor_parser_new(pool *p, uint stack_max_depth)
@@ -16,13 +40,7 @@ cbor_parser_new(pool *p, uint stack_max_depth)
   struct cbor_parser_context *ctx = lp_allocz(
       lp, sizeof *ctx + (stack_max_depth + 1) * sizeof ctx->stack_countdown[0]);
 
-  ctx->lp = lp;
-  ctx->flush = lp_save(lp);
-
-  ctx->type = 0xff;
-  ctx->stack_countdown[0] = 1;
-  ctx->stack_max = stack_max_depth;
-
+  cbor_parser_init(ctx, lp, stack_max_depth);
   return ctx;
 }
 
@@ -178,3 +196,248 @@ cbor_parse_block_end(struct cbor_parser_context *ctx)
 
   return true;
 }
+
+/*
+ * CBOR channel multiplexer
+ */
+
+#define CCH_EQ(a,b)    (a)->id == (b)->id
+#define CCH_FN(x)      (x)->idhash
+#define CCH_KEY(x)     (x)
+#define CCH_NEXT(x)    (x)->next_hash
+
+struct cbor_channel cbor_channel_parse_error;
+
+#define CSTR_PARSER_ERROR(...) do {            \
+  log(L_ERR __VA_ARGS__);                      \
+  sk_close(s);                                 \
+  return 0;                                    \
+} while (0)
+
+#define CCH_CALL_PARSER(cch, kind)  (                  \
+    cch->parse ? cch->parse(cch, kind) :               \
+    (ctx->stack_pos > 1) ? CPR_MORE : CPR_BLOCK_END    \
+    )
+
+#define CCH_PARSE(kind)  do {                          \
+  ASSERT_DIE(cch);                                     \
+  switch (CCH_CALL_PARSER(cch, kind)) {                        \
+    case CPR_MORE:     continue;                       \
+    case CPR_ERROR:    sk_close(s);                    \
+                       return 0;                       \
+    case CPR_BLOCK_END: stream->state = CSTR_FINISH;   \
+                       break;                          \
+    default: bug("Invalid return value from channel parser");  \
+  }} while(0)
+
+static int
+cbor_stream_rx(sock *s, uint sz)
+{
+  struct cbor_stream *stream = s->data;
+  struct cbor_parser_context *ctx = &stream->parser;
+  struct cbor_channel *cch = stream->cur_rx_channel;
+  u64 id;
+
+  for (uint pos = 0; pos < sz; pos++)
+  {
+    switch (cbor_parse_byte(ctx, s->rbuf[pos]))
+    {
+      case CPR_MORE:
+       continue;
+
+      case CPR_ERROR:
+       log(L_ERR "CBOR parser failure: %s", ctx->error);
+       sk_close(s);
+       return 0;
+
+      case CPR_MAJOR:
+       switch (stream->state)
+       {
+         case CSTR_INIT:
+           if (ctx->type != 4)
+             CSTR_PARSER_ERROR("Expected array, got %u", ctx->type);
+
+           if (ctx->value < 2)
+             CSTR_PARSER_ERROR("Expected array of length at least 2");
+
+           stream->state = CSTR_EXPECT_ID;
+           break;
+
+         case CSTR_EXPECT_ID:
+           CBOR_PARSE_ONLY(ctx, POSINT, id);
+           stream->state = CSTR_MSG;
+           stream->cur_rx_channel = cch = (
+               cbor_channel_find(stream, id) ?:
+               cbor_channel_create(stream, id)
+               );
+           break;
+
+         case CSTR_MSG:
+           CCH_PARSE(CPR_MAJOR);
+           break;
+
+         case CSTR_FINISH:
+         case CSTR_CLEANUP:
+           bug("Invalid stream pre-parser state");
+       }
+       break;
+
+      case CPR_STR_END:
+       ASSERT_DIE(stream->state == CSTR_MSG);
+       CCH_PARSE(CPR_STR_END);
+       break;
+
+      case CPR_BLOCK_END:
+       bug("Impossible value returned from cbor_parse_byte()");
+    }
+
+    while (cbor_parse_block_end(ctx))
+    {
+      switch (stream->state)
+      {
+       case CSTR_INIT:
+       case CSTR_EXPECT_ID:
+       case CSTR_CLEANUP:
+//       CSTR_PARSER_ERROR("Invalid stream pre-parser state");
+         bug("Invalid stream pre-parser state");
+
+       case CSTR_MSG:
+         CCH_PARSE(CPR_BLOCK_END);
+         break;
+
+       case CSTR_FINISH:
+         stream->state = CSTR_CLEANUP;
+         break;
+      }
+    }
+
+    if (stream->state == CSTR_CLEANUP)
+    {
+      if (ctx->partial_state != CPE_EXIT)
+       CSTR_PARSER_ERROR("Garbled end of message");
+
+      stream->cur_rx_channel = NULL;
+
+      if (!cch->parse)
+       cbor_channel_done(cch);
+
+      ctx->partial_state = CPE_TYPE;
+      stream->state = CSTR_INIT;
+
+      if (pos + 1 < sz)
+      {
+       memmove(s->rbuf, s->rbuf + pos + 1, sz - pos - 1);
+       s->rpos = s->rbuf + sz - pos - 1;
+      }
+
+      return 0;
+    }
+  }
+
+  return 1;
+}
+
+static void
+cbor_stream_err(sock *sk, int err)
+{
+  struct cbor_stream *stream = sk->data;
+  if (err)
+    log(L_INFO "CBOR stream %p error: %d (%M)", sk, err, err);
+  else
+    log(L_INFO "CBOR stream %p hangup", sk);
+
+  stream->cur_rx_channel = NULL;
+
+  HASH_WALK_DELSAFE(stream->channels, next_hash, cch)
+  {
+    cbor_channel_done(cch);
+  }
+  HASH_WALK_DELSAFE_END;
+
+  stream->cancel(stream);
+
+  sk_close(sk);
+}
+
+void
+cbor_stream_init(struct cbor_stream *stream, pool *p, uint parser_depth, uint writer_depth, uint channel_size)
+{
+  stream->p = rp_newf(p, p->domain, "Stream pool");
+  HASH_INIT(stream->channels, stream->p, 4);
+  stream->slab = sl_new(stream->p, channel_size);
+
+  random_bytes(&stream->hmul, sizeof stream->hmul);
+  stream->writer_depth = writer_depth;
+  stream->state = CSTR_INIT;
+
+  cbor_parser_init(&stream->parser, lp_new(p), parser_depth);
+}
+
+void
+cbor_stream_attach(struct cbor_stream *stream, sock *sk)
+{
+  sk->data = stream;
+  sk->rx_hook = cbor_stream_rx;
+  sk->err_hook = cbor_stream_err;
+
+  stream->s = sk;
+  stream->loop = sk->loop;
+}
+
+struct cbor_channel *
+cbor_channel_create(struct cbor_stream *stream, u64 id)
+{
+  struct cbor_channel *cch = sl_allocz(stream->slab);
+  *cch = (struct cbor_channel) {
+    .id = id,
+    .idhash = id * stream->hmul,
+    .p = rp_newf(stream->p, stream->p->domain, "Channel 0x%lx", id),
+    .stream = stream,
+    .parse = stream->parse,
+  };
+
+  log(L_TRACE "CBOR channel create in stream %p, id %lx", stream, id);
+  HASH_INSERT(stream->channels, CCH, cch);
+  return cch;
+}
+
+struct cbor_channel *
+cbor_channel_find(struct cbor_stream *stream, u64 id)
+{
+  struct cbor_channel cchloc;
+  cchloc.id = id;
+  cchloc.idhash = cchloc.id * stream->hmul;
+
+  return HASH_FIND(stream->channels, CCH, &cchloc);
+}
+
+struct cbor_channel *
+cbor_channel_new(struct cbor_stream *stream)
+{
+  u64 id;
+  while (cbor_channel_find(stream, id = random_type(u64)))
+    ;
+
+  return cbor_channel_create(stream, id);
+}
+
+void
+cbor_channel_done(struct cbor_channel *channel)
+{
+  struct cbor_stream *stream = channel->stream;
+  bool active = (stream->cur_rx_channel == channel);
+
+  log(L_TRACE "CBOR channel%s done in stream %p, id %lx",
+      active ? " (active)" : "", stream, channel->id);
+
+  if (active)
+  {
+    channel->parse = NULL;
+  }
+  else
+  {
+    HASH_REMOVE(stream->channels, CCH, channel);
+    rp_free(channel->p);
+    sl_free(channel);
+  }
+}
index 233758ec2e0cf6665085c0e7fef6c284687c69dd..4f21273fadefa66764a0c485794062cac8b4858f 100644 (file)
@@ -130,8 +130,13 @@ bool cbor_put_close(struct cbor_writer *w, u64 actual_size, bool strict)
 
   w->stack_pos--;
 
+  /* The open mark puts its item counter one level
+   * too deep; fixing this. */
+  items--;
+  w->stack[w->stack_pos].items++;
+
   /* Check the original head position */
-  ASSERT_DIE((head[0] & 0x1f) == 0x1f);
+  ASSERT_DIE((head[0] & 0x1f) == 0x1b);
   ASSERT_DIE(w->data.pos >= w->data.start + 9);
   switch (head[0] >> 5)
   {
@@ -194,6 +199,31 @@ bool cbor_put_close(struct cbor_writer *w, u64 actual_size, bool strict)
 /* Tags: TODO! */
 
 
+/* Writer contexts */
+struct cbor_writer *
+cbor_reply_init(struct cbor_channel *cch)
+{
+  ASSERT_DIE(cch->stream->s->tbsize > 16);
+  ASSERT_DIE(cch->stream->s->tbuf);
+  struct cbor_writer *cw = &cch->writer;
+  if (cch->stream->s->tbuf != cch->stream->s->tpos)
+    bug("Not implemented reply to not-fully-flushed buffer");
+
+  cbor_writer_init(cw, cch->stream->writer_depth, cch->stream->s->tbuf, cch->stream->s->tbsize);
+
+  ASSERT_DIE(cbor_open_array(cw));
+  ASSERT_DIE(cbor_put_posint(cw, cch->id));
+  return cw;
+}
+
+void
+cbor_reply_send(struct cbor_channel *cch, struct cbor_writer *cw)
+{
+  ASSERT_DIE(cw == &cch->writer);
+  ASSERT_DIE(cbor_close_array(cw));
+  sk_send(cch->stream->s, cw->data.pos - cw->data.start);
+}
+
 #if 0
 
 void cbor_epoch_time(struct cbor_writer *writer, int64_t time, int shift)
@@ -332,14 +362,14 @@ void write_item(struct cbor_writer *writer, uint8_t major, uint64_t num)
     return;
   }
   //log("write item major %i num %i writer->pt %i writer->capacity %i writer %i", major, num, writer->pt, writer->capacity, writer);
-  major += num;  // we can store the num as additional value 
+  major += num;  // we can store the num as additional value
   writer->cbor[writer->pt] = major;
   writer->pt++;
 }
 
 void cbor_write_item_with_constant_val_length_4(struct cbor_writer *writer, uint8_t major, uint64_t num)
 {
-// this is only for headers which should be constantly long. 
+// this is only for headers which should be constantly long.
   major = major<<5;
   check_memory(writer, 10);
   major += 0x1a; // reserving those bytes
index c3d3b32a4ff1b70d96573a87839787e92fe604ff..ed273f6af0656116f53b5330396b929ad746a976 100644 (file)
@@ -2,6 +2,8 @@
 #define CBOR_H
 
 #include "nest/bird.h"
+#include "lib/hash.h"
+#include "lib/socket.h"
 
 /**
  * CBOR Commonalities
@@ -167,9 +169,10 @@ void cbor_parser_reset(struct cbor_parser_context *ctx);
 
 enum cbor_parse_result {
   CPR_ERROR    = 0,
-  CPR_MORE     = 1,
-  CPR_MAJOR    = 2,
-  CPR_STR_END  = 3,
+  CPR_MORE,
+  CPR_MAJOR,
+  CPR_STR_END,
+  CPR_BLOCK_END,
 } cbor_parse_byte(struct cbor_parser_context *, const byte);
 bool cbor_parse_block_end(struct cbor_parser_context *);
 
@@ -187,4 +190,83 @@ bool cbor_parse_block_end(struct cbor_parser_context *);
 #define CBOR_STORE_TEXT CBOR_STORE_BYTES
 
 
+/*
+ * Message channels
+ */
+
+struct cbor_channel;
+typedef enum cbor_parse_result (*cbor_stream_parse_fn)(struct cbor_channel *, enum cbor_parse_result);
+
+struct cbor_stream {
+  HASH(struct cbor_channel) channels;
+  pool *p;
+  struct birdloop *loop;
+  slab *slab;
+  sock *s;
+  cbor_stream_parse_fn parse;
+  void (*cancel)(struct cbor_stream *);
+  struct cbor_channel *cur_rx_channel;
+  u64 hmul;
+  enum {
+    CSTR_INIT,
+    CSTR_EXPECT_ID,
+    CSTR_MSG,
+    CSTR_FINISH,
+    CSTR_CLEANUP,
+  } state;
+  uint writer_depth;
+  struct cbor_parser_context parser;
+};
+
+#define CBOR_STREAM_EMBED(name, N)  struct {   \
+  struct cbor_stream name;                     \
+  u64 _##name##_stack_countdown[N];            \
+}
+
+#define CBOR_STREAM_INIT(up, name, chname, p, T)               \
+  cbor_stream_init(&(up)->name, p,                             \
+      ARRAY_SIZE((up)->_##name##_stack_countdown),             \
+      ARRAY_SIZE(((T *) NULL)->_##chname##_writer_stack),      \
+      sizeof(T))
+
+/* Init and cleanup of CBOR stream */
+void cbor_stream_init(struct cbor_stream *stream, pool *p, uint parser_depth, uint writer_depth, uint channel_size);
+void cbor_stream_attach(struct cbor_stream *, sock *);
+void cbor_stream_cleanup(struct cbor_stream *);
+
+struct cbor_channel {
+  struct cbor_channel *next_hash;
+  struct cbor_stream *stream;
+  cbor_stream_parse_fn parse;
+  void (*cancel)(struct cbor_channel *);
+  pool *p;
+  u64 id;
+  u64 idhash;
+  struct cbor_writer writer;
+};
+
+#define CBOR_CHANNEL_EMBED(name, N)  struct {                  \
+  struct cbor_channel name;                                    \
+  struct cbor_writer_stack_item _##name##_writer_stack[N];     \
+}
+
+extern struct cbor_channel cbor_channel_parse_error;
+
+/* Locally define a new channel */
+struct cbor_channel *cbor_channel_new(struct cbor_stream *);
+
+/* Create a channel with a pre-determined ID.
+ * You have to check nonexistence manually. */
+struct cbor_channel *cbor_channel_create(struct cbor_stream *stream, u64 id);
+/* Find an existing channel */
+struct cbor_channel *cbor_channel_find(struct cbor_stream *, u64 id);
+
+/* Drop the channel */
+void cbor_channel_done(struct cbor_channel *);
+
+struct cbor_writer *cbor_reply_init(struct cbor_channel *);
+void cbor_reply_send(struct cbor_channel *, struct cbor_writer *);
+#define CBOR_REPLY(ch, cw) for (struct cbor_writer *cw = cbor_reply_init(ch); cw; cbor_reply_send(ch, cw), cw = NULL)
+
+
 #endif