From: Jan Maria Matejka Date: Fri, 14 Sep 2018 12:49:44 +0000 (+0200) Subject: Merge branch 'mq-coro' into mq-config-new X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a0bd04c04a31ee7aa065aea47e244a2177be364b;p=thirdparty%2Fbird.git Merge branch 'mq-coro' into mq-config-new --- a0bd04c04a31ee7aa065aea47e244a2177be364b diff --cc nest/cli.c index ac29677fe,f4b7628ca..6e7e9ecca --- a/nest/cli.c +++ b/nest/cli.c @@@ -39,30 -40,32 +40,32 @@@ * by the same lexical analyzer and parser as used for the configuration, but * it's switched to a special mode by prepending a fake token to the text, * so that it uses only the CLI command rules. Then the parser invokes - * an execution routine corresponding to the command, which either constructs - * the whole reply and returns it back or (in case it expects the reply will be long) - * it prints a partial reply and asks the CLI module (using the @cont hook) - * to call it again when the output is transferred to the user. + * an execution routine corresponding to the command, which constructs the + * reply. * - * The @this_cli variable points to a &cli structure of the session being - * currently parsed, but it's of course available only in command handlers - * not entered using the @cont hook. + * Replies are buffered in memory and then sent asynchronously. Commands + * which produce long outputs must split them to pieces and yield to other + * operations between pieces. To simplify this (and possibly also complex + * parsing of input), the CLI session runs in a coroutine with its own + * execution context. At any time, cli_yield() can be called to interrupt + * the current coroutine and have the buffered output sent. + * + * Alternatively, a long sequence of replies can be split to parts + * using the @cont hook, which translates to yielding internally. * - * TX buffer management works as follows: At cli.tx_buf there is a - * list of TX buffers (struct cli_out), cli.tx_write is the buffer - * currently used by the producer (cli_printf(), cli_alloc_out()) and - * cli.tx_pos is the buffer currently used by the consumer - * (cli_write(), in system dependent code). The producer uses - * cli_out.wpos ptr as the current write position and the consumer - * uses cli_out.outpos ptr as the current read position. When the - * producer produces something, it calls cli_write_trigger(). If there - * is not enough space in the current buffer, the producer allocates - * the new one. When the consumer processes everything in the buffer - * queue, it calls cli_written(), tha frees all buffers (except the - * first one) and schedules cli.event . + * The @this_cli variable points to a &cli structure of the session being + * currently parsed, but it's available only before the first yield. * + * A note on transmit buffer management: cli.tx_buf is a head of a list + * of TX buffers (struct cli_out). A buffer pointed to by cli.tx_write + * is the one currently written to using cli_printf() and cli_alloc_out(), + * its wpos field points to the position of the write head in that buffer. + * On the other side, cli.tx_pos is the buffer being set to the socket + * and its outpos field is the position of the read head. */ -#undef LOCAL_DEBUG ++#define LOCAL_DEBUG 1 + #include "nest/bird.h" #include "nest/cli.h" #include "conf/conf.h" @@@ -407,8 -350,205 +358,204 @@@ cli_echo(uint class, byte *msg } } - /* Hack for scheduled undo notification */ - extern cli *cmd_reconfig_stored_cli; + /* + * Reading of input + */ + + static int + cli_getchar(cli *c) + { + sock *s = c->socket; + + if (c->rx_aux == s->rpos) + { + DBG("CLI: Waiting on read\n"); + c->rx_aux = s->rpos = s->rbuf; + c->state = CLI_STATE_WAIT_RX; + int n = coro_sk_read(s); + c->state = CLI_STATE_RUN; + DBG("CLI: Read returned %d bytes\n", n); + ASSERT(n); + } + return *c->rx_aux++; + } + + static int + cli_read_line(cli *c) + { + byte *d = c->rx_buf; + byte *dend = c->rx_buf + CLI_RX_BUF_SIZE - 2; + for (;;) + { + int ch = cli_getchar(c); + if (ch == '\r') + ; + else if (ch == '\n') + break; + else if (d < dend) + *d++ = ch; + } + + if (d >= dend) + return 0; + + *d = 0; + return 1; + } + + /* + * Execution of commands + */ + -static byte *cli_rh_pos; -static uint cli_rh_len; -static int cli_rh_trick_flag; + struct cli *this_cli; + -static int -cli_cmd_read_hook(byte *buf, uint max, UNUSED int fd) ++struct cli_conf_order { ++ struct conf_order co; ++ struct cli *cli; ++}; ++ ++static void ++cli_cmd_error(struct conf_order *co, const char *msg, va_list args) + { - if (!cli_rh_trick_flag) - { - cli_rh_trick_flag = 1; - buf[0] = '!'; - return 1; - } - if (max > cli_rh_len) - max = cli_rh_len; - memcpy(buf, cli_rh_pos, max); - cli_rh_pos += max; - cli_rh_len -= max; - return max; ++ struct cli_conf_order *cco = (struct cli_conf_order *) co; ++ cli_vprintf(cco->cli, 9001, msg, args); + } + + static void -cli_command(cli *c) ++cli_command(struct cli *c) + { - struct config f; - int res; ++ struct conf_state state = { ++ .name = "", ++ .lino = 1 ++ }; ++ ++ struct cli_conf_order o = { ++ .co = { ++ .ctx = NULL, ++ .state = &state, ++ .buf = c->rx_buf, ++ .len = strlen(c->rx_buf), ++ .cf_include = NULL, ++ .cf_outclude = NULL, ++ .cf_error = cli_cmd_error, ++ .lp = c->parser_pool, ++ .pool = c->pool, ++ }, ++ .cli = c, ++ }; + + if (config->cli_debug > 1) + log(L_TRACE "CLI: %s", c->rx_buf); - bzero(&f, sizeof(f)); - f.mem = c->parser_pool; - f.pool = rp_new(c->pool, "Config"); - cf_read_hook = cli_cmd_read_hook; - cli_rh_pos = c->rx_buf; - cli_rh_len = strlen(c->rx_buf); - cli_rh_trick_flag = 0; - this_cli = c; ++ + lp_flush(c->parser_pool); - res = cli_parse(&f); - if (!res) - cli_printf(c, 9001, f.err_msg); - - config_free(&f); ++ this_cli = c; ++ cli_parse(&(o.co)); + } + + /* + * Session control + */ + + static void + cli_event(void *data) + { + cli *c = data; + DBG("CLI: Event in state %u\n", (int) c->state); + + while (c->ring_read != c->ring_write && + c->async_msg_size < CLI_MAX_ASYNC_QUEUE) + cli_copy_message(c); + + cli_write_trigger(c); + + if (c->state == CLI_STATE_YIELD || + c->state == CLI_STATE_WAIT_TX && !c->tx_pos) + coro_resume(c->coro); + } + + void + cli_yield(cli *c) + { + c->state = CLI_STATE_YIELD; + DBG("CLI: Yielding\n"); + ev_schedule(c->event); + coro_suspend(); + c->state = CLI_STATE_RUN; + DBG("CLI: Yield resumed\n"); + } + + static void + cli_coroutine(void *_c) + { + cli *c = _c; + sock *s = c->socket; + + DBG("CLI: Coroutine started\n"); + c->rx_aux = s->rbuf; + + for (;;) + { + while (c->tx_pos) + { + DBG("CLI: Sleeping on write\n"); + c->state = CLI_STATE_WAIT_TX; + coro_suspend(); + c->state = CLI_STATE_RUN; + DBG("CLI: Woke up on write\n"); + } + + if (c->cont) + { + c->cont(c); + cli_write_trigger(c); + cli_yield(c); + continue; + } + + if (!cli_read_line(c)) + cli_printf(c, 9000, "Command too long"); + else + cli_command(c); + cli_write_trigger(c); + } + } + + cli * + cli_new(sock *s) + { + pool *p = rp_new(cli_pool, "CLI session"); + cli *c = mb_alloc(p, sizeof(cli)); + DBG("CLI: Created new session\n"); + + bzero(c, sizeof(cli)); + c->pool = p; + c->socket = s; + c->event = ev_new(p); + c->event->hook = cli_event; + c->event->data = c; + c->cont = cli_hello; + c->parser_pool = lp_new_default(c->pool); + c->show_pool = lp_new_default(c->pool); + c->rx_buf = mb_alloc(c->pool, CLI_RX_BUF_SIZE); + + s->pool = c->pool; /* We need to have all the socket buffers allocated in the cli pool */ + rmove(s, c->pool); + s->err_hook = cli_err_hook; + s->data = c; + + return c; + } + + void + cli_run(cli *c) + { + DBG("CLI: Running\n"); + c->state = CLI_STATE_RUN; + c->rx_pos = c->rx_buf; + c->rx_aux = NULL; + c->coro = coro_new(c->pool, cli_coroutine, c); + coro_resume(c->coro); + } void cli_free(cli *c) diff --cc sysdep/unix/Makefile index b74cfc47a,9da836133..3bc1806bc --- a/sysdep/unix/Makefile +++ b/sysdep/unix/Makefile @@@ -1,4 -1,4 +1,4 @@@ - src := io.c krt.c log.c main.c random.c conf.c -src := io.c krt.c log.c main.c random.c coroutine.c ++src := conf.c coroutine.c io.c krt.c log.c main.c random.c obj := $(src-o-files) $(all-daemon) $(cf-local)