};
#endif
+struct coroutine;
+
typedef struct birdsock {
resource r;
pool *pool; /* Pool where incoming connections should be allocated (for SK_xxx_PASSIVE) */
char *password; /* Password for MD5 authentication */
const char *err; /* Error message */
struct ssh_sock *ssh; /* Used in SK_SSH */
+
+ struct coroutine *rx_coroutine;
} sock;
sock *sock_new(pool *); /* Allocate new socket */
#include "nest/cli.h"
#include "conf/conf.h"
#include "lib/string.h"
+#include "sysdep/unix/unix.h" // FIXME
pool *cli_pool;
return max;
}
-static void
-cli_command(struct cli *c)
+void
+cli_command(cli *c)
{
struct config f;
int res;
cli_event(void *data)
{
cli *c = data;
- int err;
while (c->ring_read != c->ring_write &&
c->async_msg_size < CLI_MAX_ASYNC_QUEUE)
cli_copy_message(c);
- if (c->tx_pos)
- ;
- else if (c->cont)
- c->cont(c);
- else
- {
- err = cli_get_command(c);
- if (!err)
- return;
- if (err < 0)
- cli_printf(c, 9000, "Command too long");
- else
- cli_command(c);
- }
-
cli_write_trigger(c);
+
+ if (c->sleeping_on_yield)
+ coro_resume(c->coro);
}
cli *
return c;
}
-void
-cli_kick(cli *c)
-{
- if (!c->cont && !c->tx_pos)
- ev_schedule(c->event);
-}
-
static list cli_log_hooks;
static int cli_log_inited;
byte buf[0];
};
+struct coroutine;
+
typedef struct cli {
node n; /* Node in list of all log hooks */
pool *pool;
uint log_mask; /* Mask of allowed message levels */
uint log_threshold; /* When free < log_threshold, store only important messages */
uint async_msg_size; /* Total size of async messages queued in tx_buf */
+ struct coroutine *coro;
+ int sleeping_on_tx;
+ int sleeping_on_yield;
} cli;
extern pool *cli_pool;
void cli_kick(cli *);
void cli_written(cli *);
void cli_echo(uint class, byte *msg);
+void cli_command(cli *c);
static inline int cli_access_restricted(void)
{
die("I found another BIRD running.");
close(fd);
}
+
+/* EXPERIMENTAL: Support for coroutines */
+
+#include <ucontext.h>
+
+struct coroutine {
+ resource r;
+ ucontext_t ctx;
+ void *stack;
+ void (*entry_point)(void *arg);
+ void *arg;
+};
+
+static ucontext_t *main_context;
+static coroutine *coro_current; // NULL for main context
+
+static void
+coro_free(resource *r)
+{
+ coroutine *c = (coroutine *) r;
+ xfree(c->stack);
+}
+
+static void
+coro_dump(resource *r UNUSED)
+{
+ debug("\n");
+}
+
+static struct resclass coro_class = {
+ .name = "Coroutine",
+ .size = sizeof(struct coroutine),
+ .free = coro_free,
+ .dump = coro_dump,
+ // FIXME: Implement memsize
+};
+
+static void
+coro_do_start(void)
+{
+ ASSERT(coro_current);
+ coro_current->entry_point(coro_current->arg);
+ bug("Coroutine returned unexpectedly");
+}
+
+struct coroutine *
+coro_new(pool *p, void (*entry_point)(void *), void *arg)
+{
+ if (!main_context)
+ {
+ main_context = xmalloc(sizeof(*main_context));
+ if (getcontext(main_context) < 0)
+ bug("getcontext() failed");
+ }
+
+ coroutine *c = ralloc(p, &coro_class);
+ c->entry_point = entry_point;
+ c->arg = arg;
+ if (getcontext(&c->ctx) < 0)
+ bug("getcontext() failed");
+ c->stack = xmalloc(65536);
+ c->ctx.uc_stack.ss_sp = c->stack;
+ c->ctx.uc_stack.ss_size = 65536;
+
+ makecontext(&c->ctx, coro_do_start, 0);
+
+ return c;
+}
+
+// Return to main context
+void
+coro_suspend(void)
+{
+ ASSERT(coro_current);
+ ASSERT(main_context);
+ coroutine *c = coro_current;
+ coro_current = NULL;
+ swapcontext(&c->ctx, main_context);
+ ASSERT(coro_current == c);
+}
+
+// Resume context
+void
+coro_resume(coroutine *c)
+{
+ ASSERT(!coro_current);
+ coro_current = c;
+ swapcontext(main_context, &c->ctx);
+ ASSERT(!coro_current);
+}
+
+static int
+coro_sk_rx_hook(sock *sk, uint size UNUSED)
+{
+ ASSERT(sk->rx_coroutine);
+ ASSERT(!coro_current);
+ coro_resume(sk->rx_coroutine);
+ return 0;
+}
+
+int
+coro_sk_read(sock *s)
+{
+ ASSERT(coro_current);
+ s->rx_coroutine = coro_current;
+ s->rx_hook = coro_sk_rx_hook;
+ coro_suspend();
+ s->rx_hook = NULL;
+ return s->rpos - s->rbuf;
+}
/* Everything is written */
s->tbuf = NULL;
cli_written(c);
+
+ if (c->sleeping_on_tx)
+ coro_resume(c->coro);
}
void
cli_write(s->data);
}
-int
-cli_get_command(cli *c)
+static void
+cli_err(sock *s, int err)
+{
+ if (config->cli_debug)
+ {
+ if (err)
+ log(L_INFO "CLI connection dropped: %s", strerror(err));
+ else
+ log(L_INFO "CLI connection closed");
+ }
+ cli_free(s->data);
+}
+
+static int
+cli_getchar(cli *c)
{
sock *s = c->priv;
- byte *t = c->rx_aux ? : s->rbuf;
- byte *tend = s->rpos;
- byte *d = c->rx_pos;
- byte *dend = c->rx_buf + CLI_RX_BUF_SIZE - 2;
- while (t < tend)
+ if (c->rx_aux == s->rpos)
{
- if (*t == '\r')
- t++;
- else if (*t == '\n')
- {
- t++;
- c->rx_pos = c->rx_buf;
- c->rx_aux = t;
- *d = 0;
- return (d < dend) ? 1 : -1;
- }
- else if (d < dend)
- *d++ = *t++;
+ log(L_INFO "CLI wants to read");
+ c->rx_aux = s->rpos = s->rbuf;
+ int n = coro_sk_read(s);
+ log(L_INFO "CLI read %d bytes", n);
+ ASSERT(n);
}
- c->rx_aux = s->rpos = s->rbuf;
- c->rx_pos = d;
- return 0;
+ return *c->rx_aux++;
}
-static int
-cli_rx(sock *s, uint size UNUSED)
+static void
+cli_yield(cli *c)
{
- cli_kick(s->data);
- return 0;
+ log(L_INFO "CLI: Yield");
+ c->sleeping_on_yield = 1;
+ ev_schedule(c->event);
+ coro_suspend();
+ c->sleeping_on_yield = 0;
+ log(L_INFO "CLI: Resumed after yield");
}
static void
-cli_err(sock *s, int err)
+cli_coroutine(void *_c)
{
- if (config->cli_debug)
+ cli *c = _c;
+ sock *s = c->priv;
+
+ log(L_INFO "CLI coroutine reached");
+ c->rx_aux = s->rbuf;
+ for (;;)
{
- if (err)
- log(L_INFO "CLI connection dropped: %s", strerror(err));
- else
- log(L_INFO "CLI connection closed");
+ while (c->tx_pos)
+ {
+ log(L_INFO "CLI sleeps on write");
+ c->sleeping_on_tx = 1;
+ coro_suspend();
+ c->sleeping_on_tx = 0;
+ log(L_INFO "CLI wakeup on write");
+ }
+
+ if (c->cont)
+ {
+ c->cont(c);
+ cli_write_trigger(c);
+ cli_yield(c);
+ continue;
+ }
+
+ byte *d = c->rx_buf;
+ byte *dend = c->rx_buf + CLI_RX_BUF_SIZE - 2;
+ for (;;)
+ {
+ int ch = cli_getchar(c);
+ if (ch == '\r')
+ ;
+ else if (ch == '\n')
+ break;
+ else if (d < dend)
+ *d++ = ch;
+ }
+
+ if (d >= dend)
+ {
+ cli_printf(c, 9000, "Command too long");
+ cli_write_trigger(c);
+ continue;
+ }
+
+ *d = 0;
+ cli_command(c);
+ cli_write_trigger(c);
}
- cli_free(s->data);
}
static int
if (config->cli_debug)
log(L_INFO "CLI connect");
- s->rx_hook = cli_rx;
s->tx_hook = cli_tx;
s->err_hook = cli_err;
s->data = c = cli_new(s);
c->rx_pos = c->rx_buf;
c->rx_aux = NULL;
rmove(s, c->pool);
+ c->coro = coro_new(c->pool, cli_coroutine, c);
+ coro_resume(c->coro);
return 1;
}
void *tracked_fopen(struct pool *, char *name, char *mode);
void test_old_bird(char *path);
+/* Co-routines */
+typedef struct coroutine coroutine;
+coroutine *coro_new(struct pool *pool, void (*entry_point)(void *arg), void *arg);
+void coro_suspend(void);
+void coro_resume(coroutine *c);
+int coro_sk_read(struct birdsock *s);
+
/* krt.c bits */
void krt_io_init(void);