#include "lib/birdlib.h"
#include "lib/cbor.h"
+#include "lib/hash.h"
+
+/*
+ * Basic parser bits
+ */
+
+static void
+cbor_parser_init(struct cbor_parser_context *ctx, linpool *lp, uint max_depth)
+{
+ ctx->lp = lp;
+ ctx->flush = lp_save(lp);
+
+ ctx->stack_countdown[0] = 1;
+ ctx->stack_pos = 0;
+ ctx->stack_max = max_depth;
+
+ ctx->target_buf = NULL;
+ ctx->target_len = 0;
+
+ ctx->type = 0xff;
+
+ ctx->partial_state = CPE_TYPE;
+ ctx->partial_countdown = 0;
+}
struct cbor_parser_context *
cbor_parser_new(pool *p, uint stack_max_depth)
struct cbor_parser_context *ctx = lp_allocz(
lp, sizeof *ctx + (stack_max_depth + 1) * sizeof ctx->stack_countdown[0]);
- ctx->lp = lp;
- ctx->flush = lp_save(lp);
-
- ctx->type = 0xff;
- ctx->stack_countdown[0] = 1;
- ctx->stack_max = stack_max_depth;
-
+ cbor_parser_init(ctx, lp, stack_max_depth);
return ctx;
}
return true;
}
+
+/*
+ * CBOR channel multiplexer
+ */
+
+#define CCH_EQ(a,b) (a)->id == (b)->id
+#define CCH_FN(x) (x)->idhash
+#define CCH_KEY(x) (x)
+#define CCH_NEXT(x) (x)->next_hash
+
+struct cbor_channel cbor_channel_parse_error;
+
+#define CSTR_PARSER_ERROR(...) do { \
+ log(L_ERR __VA_ARGS__); \
+ sk_close(s); \
+ return 0; \
+} while (0)
+
+#define CCH_CALL_PARSER(cch, kind) ( \
+ cch->parse ? cch->parse(cch, kind) : \
+ (ctx->stack_pos > 1) ? CPR_MORE : CPR_BLOCK_END \
+ )
+
+#define CCH_PARSE(kind) do { \
+ ASSERT_DIE(cch); \
+ switch (CCH_CALL_PARSER(cch, kind)) { \
+ case CPR_MORE: continue; \
+ case CPR_ERROR: sk_close(s); \
+ return 0; \
+ case CPR_BLOCK_END: stream->state = CSTR_FINISH; \
+ break; \
+ default: bug("Invalid return value from channel parser"); \
+ }} while(0)
+
+static int
+cbor_stream_rx(sock *s, uint sz)
+{
+ struct cbor_stream *stream = s->data;
+ struct cbor_parser_context *ctx = &stream->parser;
+ struct cbor_channel *cch = stream->cur_rx_channel;
+ u64 id;
+
+ for (uint pos = 0; pos < sz; pos++)
+ {
+ switch (cbor_parse_byte(ctx, s->rbuf[pos]))
+ {
+ case CPR_MORE:
+ continue;
+
+ case CPR_ERROR:
+ log(L_ERR "CBOR parser failure: %s", ctx->error);
+ sk_close(s);
+ return 0;
+
+ case CPR_MAJOR:
+ switch (stream->state)
+ {
+ case CSTR_INIT:
+ if (ctx->type != 4)
+ CSTR_PARSER_ERROR("Expected array, got %u", ctx->type);
+
+ if (ctx->value < 2)
+ CSTR_PARSER_ERROR("Expected array of length at least 2");
+
+ stream->state = CSTR_EXPECT_ID;
+ break;
+
+ case CSTR_EXPECT_ID:
+ CBOR_PARSE_ONLY(ctx, POSINT, id);
+ stream->state = CSTR_MSG;
+ stream->cur_rx_channel = cch = (
+ cbor_channel_find(stream, id) ?:
+ cbor_channel_create(stream, id)
+ );
+ break;
+
+ case CSTR_MSG:
+ CCH_PARSE(CPR_MAJOR);
+ break;
+
+ case CSTR_FINISH:
+ case CSTR_CLEANUP:
+ bug("Invalid stream pre-parser state");
+ }
+ break;
+
+ case CPR_STR_END:
+ ASSERT_DIE(stream->state == CSTR_MSG);
+ CCH_PARSE(CPR_STR_END);
+ break;
+
+ case CPR_BLOCK_END:
+ bug("Impossible value returned from cbor_parse_byte()");
+ }
+
+ while (cbor_parse_block_end(ctx))
+ {
+ switch (stream->state)
+ {
+ case CSTR_INIT:
+ case CSTR_EXPECT_ID:
+ case CSTR_CLEANUP:
+// CSTR_PARSER_ERROR("Invalid stream pre-parser state");
+ bug("Invalid stream pre-parser state");
+
+ case CSTR_MSG:
+ CCH_PARSE(CPR_BLOCK_END);
+ break;
+
+ case CSTR_FINISH:
+ stream->state = CSTR_CLEANUP;
+ break;
+ }
+ }
+
+ if (stream->state == CSTR_CLEANUP)
+ {
+ if (ctx->partial_state != CPE_EXIT)
+ CSTR_PARSER_ERROR("Garbled end of message");
+
+ stream->cur_rx_channel = NULL;
+
+ if (!cch->parse)
+ cbor_channel_done(cch);
+
+ ctx->partial_state = CPE_TYPE;
+ stream->state = CSTR_INIT;
+
+ if (pos + 1 < sz)
+ {
+ memmove(s->rbuf, s->rbuf + pos + 1, sz - pos - 1);
+ s->rpos = s->rbuf + sz - pos - 1;
+ }
+
+ return 0;
+ }
+ }
+
+ return 1;
+}
+
+static void
+cbor_stream_err(sock *sk, int err)
+{
+ struct cbor_stream *stream = sk->data;
+ if (err)
+ log(L_INFO "CBOR stream %p error: %d (%M)", sk, err, err);
+ else
+ log(L_INFO "CBOR stream %p hangup", sk);
+
+ stream->cur_rx_channel = NULL;
+
+ HASH_WALK_DELSAFE(stream->channels, next_hash, cch)
+ {
+ cbor_channel_done(cch);
+ }
+ HASH_WALK_DELSAFE_END;
+
+ stream->cancel(stream);
+
+ sk_close(sk);
+}
+
+void
+cbor_stream_init(struct cbor_stream *stream, pool *p, uint parser_depth, uint writer_depth, uint channel_size)
+{
+ stream->p = rp_newf(p, p->domain, "Stream pool");
+ HASH_INIT(stream->channels, stream->p, 4);
+ stream->slab = sl_new(stream->p, channel_size);
+
+ random_bytes(&stream->hmul, sizeof stream->hmul);
+ stream->writer_depth = writer_depth;
+ stream->state = CSTR_INIT;
+
+ cbor_parser_init(&stream->parser, lp_new(p), parser_depth);
+}
+
+void
+cbor_stream_attach(struct cbor_stream *stream, sock *sk)
+{
+ sk->data = stream;
+ sk->rx_hook = cbor_stream_rx;
+ sk->err_hook = cbor_stream_err;
+
+ stream->s = sk;
+ stream->loop = sk->loop;
+}
+
+struct cbor_channel *
+cbor_channel_create(struct cbor_stream *stream, u64 id)
+{
+ struct cbor_channel *cch = sl_allocz(stream->slab);
+ *cch = (struct cbor_channel) {
+ .id = id,
+ .idhash = id * stream->hmul,
+ .p = rp_newf(stream->p, stream->p->domain, "Channel 0x%lx", id),
+ .stream = stream,
+ .parse = stream->parse,
+ };
+
+ log(L_TRACE "CBOR channel create in stream %p, id %lx", stream, id);
+ HASH_INSERT(stream->channels, CCH, cch);
+ return cch;
+}
+
+struct cbor_channel *
+cbor_channel_find(struct cbor_stream *stream, u64 id)
+{
+ struct cbor_channel cchloc;
+ cchloc.id = id;
+ cchloc.idhash = cchloc.id * stream->hmul;
+
+ return HASH_FIND(stream->channels, CCH, &cchloc);
+}
+
+struct cbor_channel *
+cbor_channel_new(struct cbor_stream *stream)
+{
+ u64 id;
+ while (cbor_channel_find(stream, id = random_type(u64)))
+ ;
+
+ return cbor_channel_create(stream, id);
+}
+
+void
+cbor_channel_done(struct cbor_channel *channel)
+{
+ struct cbor_stream *stream = channel->stream;
+ bool active = (stream->cur_rx_channel == channel);
+
+ log(L_TRACE "CBOR channel%s done in stream %p, id %lx",
+ active ? " (active)" : "", stream, channel->id);
+
+ if (active)
+ {
+ channel->parse = NULL;
+ }
+ else
+ {
+ HASH_REMOVE(stream->channels, CCH, channel);
+ rp_free(channel->p);
+ sl_free(channel);
+ }
+}
w->stack_pos--;
+ /* The open mark puts its item counter one level
+ * too deep; fixing this. */
+ items--;
+ w->stack[w->stack_pos].items++;
+
/* Check the original head position */
- ASSERT_DIE((head[0] & 0x1f) == 0x1f);
+ ASSERT_DIE((head[0] & 0x1f) == 0x1b);
ASSERT_DIE(w->data.pos >= w->data.start + 9);
switch (head[0] >> 5)
{
/* Tags: TODO! */
+/* Writer contexts */
+struct cbor_writer *
+cbor_reply_init(struct cbor_channel *cch)
+{
+ ASSERT_DIE(cch->stream->s->tbsize > 16);
+ ASSERT_DIE(cch->stream->s->tbuf);
+ struct cbor_writer *cw = &cch->writer;
+ if (cch->stream->s->tbuf != cch->stream->s->tpos)
+ bug("Not implemented reply to not-fully-flushed buffer");
+
+ cbor_writer_init(cw, cch->stream->writer_depth, cch->stream->s->tbuf, cch->stream->s->tbsize);
+
+ ASSERT_DIE(cbor_open_array(cw));
+ ASSERT_DIE(cbor_put_posint(cw, cch->id));
+ return cw;
+}
+
+void
+cbor_reply_send(struct cbor_channel *cch, struct cbor_writer *cw)
+{
+ ASSERT_DIE(cw == &cch->writer);
+ ASSERT_DIE(cbor_close_array(cw));
+ sk_send(cch->stream->s, cw->data.pos - cw->data.start);
+}
+
#if 0
void cbor_epoch_time(struct cbor_writer *writer, int64_t time, int shift)
return;
}
//log("write item major %i num %i writer->pt %i writer->capacity %i writer %i", major, num, writer->pt, writer->capacity, writer);
- major += num; // we can store the num as additional value
+ major += num; // we can store the num as additional value
writer->cbor[writer->pt] = major;
writer->pt++;
}
void cbor_write_item_with_constant_val_length_4(struct cbor_writer *writer, uint8_t major, uint64_t num)
{
-// this is only for headers which should be constantly long.
+// this is only for headers which should be constantly long.
major = major<<5;
check_memory(writer, 10);
major += 0x1a; // reserving those bytes
#define CBOR_H
#include "nest/bird.h"
+#include "lib/hash.h"
+#include "lib/socket.h"
/**
* CBOR Commonalities
enum cbor_parse_result {
CPR_ERROR = 0,
- CPR_MORE = 1,
- CPR_MAJOR = 2,
- CPR_STR_END = 3,
+ CPR_MORE,
+ CPR_MAJOR,
+ CPR_STR_END,
+ CPR_BLOCK_END,
} cbor_parse_byte(struct cbor_parser_context *, const byte);
bool cbor_parse_block_end(struct cbor_parser_context *);
#define CBOR_STORE_TEXT CBOR_STORE_BYTES
+/*
+ * Message channels
+ */
+
+struct cbor_channel;
+typedef enum cbor_parse_result (*cbor_stream_parse_fn)(struct cbor_channel *, enum cbor_parse_result);
+
+struct cbor_stream {
+ HASH(struct cbor_channel) channels;
+ pool *p;
+ struct birdloop *loop;
+ slab *slab;
+ sock *s;
+ cbor_stream_parse_fn parse;
+ void (*cancel)(struct cbor_stream *);
+ struct cbor_channel *cur_rx_channel;
+ u64 hmul;
+ enum {
+ CSTR_INIT,
+ CSTR_EXPECT_ID,
+ CSTR_MSG,
+ CSTR_FINISH,
+ CSTR_CLEANUP,
+ } state;
+ uint writer_depth;
+ struct cbor_parser_context parser;
+};
+
+#define CBOR_STREAM_EMBED(name, N) struct { \
+ struct cbor_stream name; \
+ u64 _##name##_stack_countdown[N]; \
+}
+
+#define CBOR_STREAM_INIT(up, name, chname, p, T) \
+ cbor_stream_init(&(up)->name, p, \
+ ARRAY_SIZE((up)->_##name##_stack_countdown), \
+ ARRAY_SIZE(((T *) NULL)->_##chname##_writer_stack), \
+ sizeof(T))
+
+/* Init and cleanup of CBOR stream */
+void cbor_stream_init(struct cbor_stream *stream, pool *p, uint parser_depth, uint writer_depth, uint channel_size);
+void cbor_stream_attach(struct cbor_stream *, sock *);
+void cbor_stream_cleanup(struct cbor_stream *);
+
+struct cbor_channel {
+ struct cbor_channel *next_hash;
+ struct cbor_stream *stream;
+ cbor_stream_parse_fn parse;
+ void (*cancel)(struct cbor_channel *);
+ pool *p;
+ u64 id;
+ u64 idhash;
+ struct cbor_writer writer;
+};
+
+#define CBOR_CHANNEL_EMBED(name, N) struct { \
+ struct cbor_channel name; \
+ struct cbor_writer_stack_item _##name##_writer_stack[N]; \
+}
+
+extern struct cbor_channel cbor_channel_parse_error;
+
+/* Locally define a new channel */
+struct cbor_channel *cbor_channel_new(struct cbor_stream *);
+
+/* Create a channel with a pre-determined ID.
+ * You have to check nonexistence manually. */
+struct cbor_channel *cbor_channel_create(struct cbor_stream *stream, u64 id);
+/* Find an existing channel */
+struct cbor_channel *cbor_channel_find(struct cbor_stream *, u64 id);
+
+/* Drop the channel */
+void cbor_channel_done(struct cbor_channel *);
+
+struct cbor_writer *cbor_reply_init(struct cbor_channel *);
+void cbor_reply_send(struct cbor_channel *, struct cbor_writer *);
+#define CBOR_REPLY(ch, cw) for (struct cbor_writer *cw = cbor_reply_init(ch); cw; cbor_reply_send(ch, cw), cw = NULL)
+
+
#endif