From 39bf446319d5b91e395aa8afb23c6874acda6675 Mon Sep 17 00:00:00 2001 From: Maria Matejka Date: Mon, 7 Oct 2024 09:45:16 +0200 Subject: [PATCH] Lib: CBOR message streams and channels --- lib/birdlib.h | 2 +- lib/cbor-parser.c | 277 ++++++++++++++++++++++++++++++++++++++++++++-- lib/cbor.c | 36 +++++- lib/cbor.h | 88 ++++++++++++++- 4 files changed, 389 insertions(+), 14 deletions(-) diff --git a/lib/birdlib.h b/lib/birdlib.h index eac0fc4d8..f21a7a32d 100644 --- a/lib/birdlib.h +++ b/lib/birdlib.h @@ -283,7 +283,7 @@ asm( u32 random_u32(void); void random_init(void); void random_bytes(void *buf, size_t size); - +#define random_type(T) ({ T out; random_bytes(&out, sizeof out); out; }) /* Hashing */ diff --git a/lib/cbor-parser.c b/lib/cbor-parser.c index caa3c7284..86959a947 100644 --- a/lib/cbor-parser.c +++ b/lib/cbor-parser.c @@ -8,6 +8,30 @@ #include "lib/birdlib.h" #include "lib/cbor.h" +#include "lib/hash.h" + +/* + * Basic parser bits + */ + +static void +cbor_parser_init(struct cbor_parser_context *ctx, linpool *lp, uint max_depth) +{ + ctx->lp = lp; + ctx->flush = lp_save(lp); + + ctx->stack_countdown[0] = 1; + ctx->stack_pos = 0; + ctx->stack_max = max_depth; + + ctx->target_buf = NULL; + ctx->target_len = 0; + + ctx->type = 0xff; + + ctx->partial_state = CPE_TYPE; + ctx->partial_countdown = 0; +} struct cbor_parser_context * cbor_parser_new(pool *p, uint stack_max_depth) @@ -16,13 +40,7 @@ cbor_parser_new(pool *p, uint stack_max_depth) struct cbor_parser_context *ctx = lp_allocz( lp, sizeof *ctx + (stack_max_depth + 1) * sizeof ctx->stack_countdown[0]); - ctx->lp = lp; - ctx->flush = lp_save(lp); - - ctx->type = 0xff; - ctx->stack_countdown[0] = 1; - ctx->stack_max = stack_max_depth; - + cbor_parser_init(ctx, lp, stack_max_depth); return ctx; } @@ -178,3 +196,248 @@ cbor_parse_block_end(struct cbor_parser_context *ctx) return true; } + +/* + * CBOR channel multiplexer + */ + +#define CCH_EQ(a,b) (a)->id == (b)->id +#define CCH_FN(x) (x)->idhash +#define CCH_KEY(x) (x) +#define CCH_NEXT(x) (x)->next_hash + +struct cbor_channel cbor_channel_parse_error; + +#define CSTR_PARSER_ERROR(...) do { \ + log(L_ERR __VA_ARGS__); \ + sk_close(s); \ + return 0; \ +} while (0) + +#define CCH_CALL_PARSER(cch, kind) ( \ + cch->parse ? cch->parse(cch, kind) : \ + (ctx->stack_pos > 1) ? CPR_MORE : CPR_BLOCK_END \ + ) + +#define CCH_PARSE(kind) do { \ + ASSERT_DIE(cch); \ + switch (CCH_CALL_PARSER(cch, kind)) { \ + case CPR_MORE: continue; \ + case CPR_ERROR: sk_close(s); \ + return 0; \ + case CPR_BLOCK_END: stream->state = CSTR_FINISH; \ + break; \ + default: bug("Invalid return value from channel parser"); \ + }} while(0) + +static int +cbor_stream_rx(sock *s, uint sz) +{ + struct cbor_stream *stream = s->data; + struct cbor_parser_context *ctx = &stream->parser; + struct cbor_channel *cch = stream->cur_rx_channel; + u64 id; + + for (uint pos = 0; pos < sz; pos++) + { + switch (cbor_parse_byte(ctx, s->rbuf[pos])) + { + case CPR_MORE: + continue; + + case CPR_ERROR: + log(L_ERR "CBOR parser failure: %s", ctx->error); + sk_close(s); + return 0; + + case CPR_MAJOR: + switch (stream->state) + { + case CSTR_INIT: + if (ctx->type != 4) + CSTR_PARSER_ERROR("Expected array, got %u", ctx->type); + + if (ctx->value < 2) + CSTR_PARSER_ERROR("Expected array of length at least 2"); + + stream->state = CSTR_EXPECT_ID; + break; + + case CSTR_EXPECT_ID: + CBOR_PARSE_ONLY(ctx, POSINT, id); + stream->state = CSTR_MSG; + stream->cur_rx_channel = cch = ( + cbor_channel_find(stream, id) ?: + cbor_channel_create(stream, id) + ); + break; + + case CSTR_MSG: + CCH_PARSE(CPR_MAJOR); + break; + + case CSTR_FINISH: + case CSTR_CLEANUP: + bug("Invalid stream pre-parser state"); + } + break; + + case CPR_STR_END: + ASSERT_DIE(stream->state == CSTR_MSG); + CCH_PARSE(CPR_STR_END); + break; + + case CPR_BLOCK_END: + bug("Impossible value returned from cbor_parse_byte()"); + } + + while (cbor_parse_block_end(ctx)) + { + switch (stream->state) + { + case CSTR_INIT: + case CSTR_EXPECT_ID: + case CSTR_CLEANUP: +// CSTR_PARSER_ERROR("Invalid stream pre-parser state"); + bug("Invalid stream pre-parser state"); + + case CSTR_MSG: + CCH_PARSE(CPR_BLOCK_END); + break; + + case CSTR_FINISH: + stream->state = CSTR_CLEANUP; + break; + } + } + + if (stream->state == CSTR_CLEANUP) + { + if (ctx->partial_state != CPE_EXIT) + CSTR_PARSER_ERROR("Garbled end of message"); + + stream->cur_rx_channel = NULL; + + if (!cch->parse) + cbor_channel_done(cch); + + ctx->partial_state = CPE_TYPE; + stream->state = CSTR_INIT; + + if (pos + 1 < sz) + { + memmove(s->rbuf, s->rbuf + pos + 1, sz - pos - 1); + s->rpos = s->rbuf + sz - pos - 1; + } + + return 0; + } + } + + return 1; +} + +static void +cbor_stream_err(sock *sk, int err) +{ + struct cbor_stream *stream = sk->data; + if (err) + log(L_INFO "CBOR stream %p error: %d (%M)", sk, err, err); + else + log(L_INFO "CBOR stream %p hangup", sk); + + stream->cur_rx_channel = NULL; + + HASH_WALK_DELSAFE(stream->channels, next_hash, cch) + { + cbor_channel_done(cch); + } + HASH_WALK_DELSAFE_END; + + stream->cancel(stream); + + sk_close(sk); +} + +void +cbor_stream_init(struct cbor_stream *stream, pool *p, uint parser_depth, uint writer_depth, uint channel_size) +{ + stream->p = rp_newf(p, p->domain, "Stream pool"); + HASH_INIT(stream->channels, stream->p, 4); + stream->slab = sl_new(stream->p, channel_size); + + random_bytes(&stream->hmul, sizeof stream->hmul); + stream->writer_depth = writer_depth; + stream->state = CSTR_INIT; + + cbor_parser_init(&stream->parser, lp_new(p), parser_depth); +} + +void +cbor_stream_attach(struct cbor_stream *stream, sock *sk) +{ + sk->data = stream; + sk->rx_hook = cbor_stream_rx; + sk->err_hook = cbor_stream_err; + + stream->s = sk; + stream->loop = sk->loop; +} + +struct cbor_channel * +cbor_channel_create(struct cbor_stream *stream, u64 id) +{ + struct cbor_channel *cch = sl_allocz(stream->slab); + *cch = (struct cbor_channel) { + .id = id, + .idhash = id * stream->hmul, + .p = rp_newf(stream->p, stream->p->domain, "Channel 0x%lx", id), + .stream = stream, + .parse = stream->parse, + }; + + log(L_TRACE "CBOR channel create in stream %p, id %lx", stream, id); + HASH_INSERT(stream->channels, CCH, cch); + return cch; +} + +struct cbor_channel * +cbor_channel_find(struct cbor_stream *stream, u64 id) +{ + struct cbor_channel cchloc; + cchloc.id = id; + cchloc.idhash = cchloc.id * stream->hmul; + + return HASH_FIND(stream->channels, CCH, &cchloc); +} + +struct cbor_channel * +cbor_channel_new(struct cbor_stream *stream) +{ + u64 id; + while (cbor_channel_find(stream, id = random_type(u64))) + ; + + return cbor_channel_create(stream, id); +} + +void +cbor_channel_done(struct cbor_channel *channel) +{ + struct cbor_stream *stream = channel->stream; + bool active = (stream->cur_rx_channel == channel); + + log(L_TRACE "CBOR channel%s done in stream %p, id %lx", + active ? " (active)" : "", stream, channel->id); + + if (active) + { + channel->parse = NULL; + } + else + { + HASH_REMOVE(stream->channels, CCH, channel); + rp_free(channel->p); + sl_free(channel); + } +} diff --git a/lib/cbor.c b/lib/cbor.c index 233758ec2..4f21273fa 100644 --- a/lib/cbor.c +++ b/lib/cbor.c @@ -130,8 +130,13 @@ bool cbor_put_close(struct cbor_writer *w, u64 actual_size, bool strict) w->stack_pos--; + /* The open mark puts its item counter one level + * too deep; fixing this. */ + items--; + w->stack[w->stack_pos].items++; + /* Check the original head position */ - ASSERT_DIE((head[0] & 0x1f) == 0x1f); + ASSERT_DIE((head[0] & 0x1f) == 0x1b); ASSERT_DIE(w->data.pos >= w->data.start + 9); switch (head[0] >> 5) { @@ -194,6 +199,31 @@ bool cbor_put_close(struct cbor_writer *w, u64 actual_size, bool strict) /* Tags: TODO! */ +/* Writer contexts */ +struct cbor_writer * +cbor_reply_init(struct cbor_channel *cch) +{ + ASSERT_DIE(cch->stream->s->tbsize > 16); + ASSERT_DIE(cch->stream->s->tbuf); + struct cbor_writer *cw = &cch->writer; + if (cch->stream->s->tbuf != cch->stream->s->tpos) + bug("Not implemented reply to not-fully-flushed buffer"); + + cbor_writer_init(cw, cch->stream->writer_depth, cch->stream->s->tbuf, cch->stream->s->tbsize); + + ASSERT_DIE(cbor_open_array(cw)); + ASSERT_DIE(cbor_put_posint(cw, cch->id)); + return cw; +} + +void +cbor_reply_send(struct cbor_channel *cch, struct cbor_writer *cw) +{ + ASSERT_DIE(cw == &cch->writer); + ASSERT_DIE(cbor_close_array(cw)); + sk_send(cch->stream->s, cw->data.pos - cw->data.start); +} + #if 0 void cbor_epoch_time(struct cbor_writer *writer, int64_t time, int shift) @@ -332,14 +362,14 @@ void write_item(struct cbor_writer *writer, uint8_t major, uint64_t num) return; } //log("write item major %i num %i writer->pt %i writer->capacity %i writer %i", major, num, writer->pt, writer->capacity, writer); - major += num; // we can store the num as additional value + major += num; // we can store the num as additional value writer->cbor[writer->pt] = major; writer->pt++; } void cbor_write_item_with_constant_val_length_4(struct cbor_writer *writer, uint8_t major, uint64_t num) { -// this is only for headers which should be constantly long. +// this is only for headers which should be constantly long. major = major<<5; check_memory(writer, 10); major += 0x1a; // reserving those bytes diff --git a/lib/cbor.h b/lib/cbor.h index c3d3b32a4..ed273f6af 100644 --- a/lib/cbor.h +++ b/lib/cbor.h @@ -2,6 +2,8 @@ #define CBOR_H #include "nest/bird.h" +#include "lib/hash.h" +#include "lib/socket.h" /** * CBOR Commonalities @@ -167,9 +169,10 @@ void cbor_parser_reset(struct cbor_parser_context *ctx); enum cbor_parse_result { CPR_ERROR = 0, - CPR_MORE = 1, - CPR_MAJOR = 2, - CPR_STR_END = 3, + CPR_MORE, + CPR_MAJOR, + CPR_STR_END, + CPR_BLOCK_END, } cbor_parse_byte(struct cbor_parser_context *, const byte); bool cbor_parse_block_end(struct cbor_parser_context *); @@ -187,4 +190,83 @@ bool cbor_parse_block_end(struct cbor_parser_context *); #define CBOR_STORE_TEXT CBOR_STORE_BYTES +/* + * Message channels + */ + +struct cbor_channel; +typedef enum cbor_parse_result (*cbor_stream_parse_fn)(struct cbor_channel *, enum cbor_parse_result); + +struct cbor_stream { + HASH(struct cbor_channel) channels; + pool *p; + struct birdloop *loop; + slab *slab; + sock *s; + cbor_stream_parse_fn parse; + void (*cancel)(struct cbor_stream *); + struct cbor_channel *cur_rx_channel; + u64 hmul; + enum { + CSTR_INIT, + CSTR_EXPECT_ID, + CSTR_MSG, + CSTR_FINISH, + CSTR_CLEANUP, + } state; + uint writer_depth; + struct cbor_parser_context parser; +}; + +#define CBOR_STREAM_EMBED(name, N) struct { \ + struct cbor_stream name; \ + u64 _##name##_stack_countdown[N]; \ +} + +#define CBOR_STREAM_INIT(up, name, chname, p, T) \ + cbor_stream_init(&(up)->name, p, \ + ARRAY_SIZE((up)->_##name##_stack_countdown), \ + ARRAY_SIZE(((T *) NULL)->_##chname##_writer_stack), \ + sizeof(T)) + +/* Init and cleanup of CBOR stream */ +void cbor_stream_init(struct cbor_stream *stream, pool *p, uint parser_depth, uint writer_depth, uint channel_size); +void cbor_stream_attach(struct cbor_stream *, sock *); +void cbor_stream_cleanup(struct cbor_stream *); + +struct cbor_channel { + struct cbor_channel *next_hash; + struct cbor_stream *stream; + cbor_stream_parse_fn parse; + void (*cancel)(struct cbor_channel *); + pool *p; + u64 id; + u64 idhash; + struct cbor_writer writer; +}; + +#define CBOR_CHANNEL_EMBED(name, N) struct { \ + struct cbor_channel name; \ + struct cbor_writer_stack_item _##name##_writer_stack[N]; \ +} + +extern struct cbor_channel cbor_channel_parse_error; + +/* Locally define a new channel */ +struct cbor_channel *cbor_channel_new(struct cbor_stream *); + +/* Create a channel with a pre-determined ID. + * You have to check nonexistence manually. */ +struct cbor_channel *cbor_channel_create(struct cbor_stream *stream, u64 id); +/* Find an existing channel */ +struct cbor_channel *cbor_channel_find(struct cbor_stream *, u64 id); + +/* Drop the channel */ +void cbor_channel_done(struct cbor_channel *); + +struct cbor_writer *cbor_reply_init(struct cbor_channel *); +void cbor_reply_send(struct cbor_channel *, struct cbor_writer *); +#define CBOR_REPLY(ch, cw) for (struct cbor_writer *cw = cbor_reply_init(ch); cw; cbor_reply_send(ch, cw), cw = NULL) + + #endif -- 2.47.2