struct ssh_sock *ssh; /* Used in SK_SSH */
struct coroutine *rx_coroutine;
+ struct coroutine *tx_coroutine;
} sock;
sock *sock_new(pool *); /* Allocate new socket */
s->tbuf = o->outpos;
o->outpos = o->wpos;
- if (sk_send(s, len) <= 0)
- return;
+ coro_sk_write(s, len);
c->tx_pos = o->next;
}
cli_write(c);
}
-static void
-cli_tx_hook(sock *s)
-{
- cli_write(s->data);
-}
-
static void
cli_err_hook(sock *s, int err)
{
s->pool = c->pool; /* We need to have all the socket buffers allocated in the cli pool */
rmove(s, c->pool);
- s->tx_hook = cli_tx_hook;
s->err_hook = cli_err_hook;
s->data = c;
coro_free(resource *r)
{
coroutine *c = (coroutine *) r;
+ ASSERT(coro_current != c);
pthread_cancel(c->thread);
pthread_join(c->thread, NULL);
}
return 0;
}
+static void
+coro_sk_tx_hook(sock *sk)
+{
+ ASSERT(sk->tx_coroutine);
+ ASSERT(!coro_current);
+ coro_resume(sk->tx_coroutine);
+}
+
int
coro_sk_read(sock *s)
{
s->rx_hook = NULL;
return s->rpos - s->rbuf;
}
+
+void
+coro_sk_write(sock *s, unsigned len)
+{
+ ASSERT(coro_current);
+ s->tx_coroutine = coro_current;
+ s->tx_hook = coro_sk_tx_hook;
+ s->ttx = s->tbuf;
+ s->tpos = s->tbuf + len;
+ coro_suspend();
+ s->tx_hook = NULL;
+}