int max;
if (conn->flags & CO_FL_ERROR)
- goto error;
+ return;
+
+ if (h2c->flags & H2_CF_DEM_BLOCK_ANY)
+ return;
buf = h2_get_dbuf(h2c);
if (!buf) {
/* note: buf->o == 0 */
max = buf->size - buf->i;
if (!max) {
- /* FIXME: buffer full, add a flag, stop polling and wait */
- __conn_xprt_stop_recv(conn);
+ h2c->flags |= H2_CF_DEM_DFULL;
return;
}
conn->xprt->rcv_buf(conn, buf, max);
if (conn->flags & CO_FL_ERROR)
- goto error;
+ return;
- if (!buf->i)
+ if (!buf->i) {
h2_release_dbuf(h2c);
-
- if (buf->i == buf->size) {
- /* buffer now full */
- __conn_xprt_stop_recv(conn);
return;
}
+ if (buf->i == buf->size)
+ h2c->flags |= H2_CF_DEM_DFULL;
+
/* FIXME: should we try to process streams here instead of doing it in ->wake ? */
- if (conn_xprt_read0_pending(conn))
- __conn_xprt_stop_recv(conn);
+ /* after streams have been processed, we should have made some room */
+ if (buf->i != buf->size)
+ h2c->flags &= ~H2_CF_DEM_DFULL;
return;
-
- error:
- __conn_xprt_stop_recv(conn);
}
/* callback called on send event by the connection handler */
/* FIXME: should we try to process pending streams here instead of doing it in ->wake ? */
if (conn->flags & CO_FL_ERROR)
- goto error;
+ return;
if (conn->flags & (CO_FL_HANDSHAKE|CO_FL_WAIT_L4_CONN|CO_FL_WAIT_L6_CONN)) {
/* a handshake was requested */
return;
}
- if (!h2c->mbuf->o) {
- /* nothing to send */
- goto done;
- }
-
if (conn->flags & CO_FL_SOCK_WR_SH) {
/* output closed, nothing to send, clear the buffer to release it */
h2c->mbuf->o = 0;
- goto done;
}
/* pending response data, we need to try to send or subscribe to
* problematic for ACKs. The latter should possibly not be set
* for now.
*/
- conn->xprt->snd_buf(conn, h2c->mbuf, 0);
+ if (conn->xprt->snd_buf(conn, h2c->mbuf, 0) > 0)
+ h2c->flags &= ~(H2_CF_MUX_MFULL | H2_CF_DEM_MROOM);
if (conn->flags & CO_FL_ERROR)
- goto error;
-
- if (!h2c->mbuf->o)
- h2_release_mbuf(h2c);
-
- if (h2c->mbuf->o) {
- /* incomplete send, the snd_buf callback has already updated
- * the connection flags.
- *
- * FIXME: we should arm a send timeout here
- */
- __conn_xprt_want_send(conn);
return;
- }
-
- done:
- /* FIXME: release the output buffer when empty or do it in ->wake() ? */
- __conn_xprt_stop_send(conn);
- return;
+}
- error:
- /* FIXME: report an error somewhere in the mux */
- __conn_xprt_stop_send(conn);
- return;
+/* call the wake up function of all streams attached to the connection */
+static void h2_wake_all_streams(struct h2c *h2c)
+{
+ struct eb32_node *node;
+ struct h2s *h2s;
+ unsigned int flags = 0;
+
+ if (h2c->st0 >= H2_CS_ERROR || h2c->conn->flags & CO_FL_ERROR)
+ flags |= CS_FL_ERROR;
+
+ if (conn_xprt_read0_pending(h2c->conn))
+ flags |= CS_FL_EOS;
+
+ node = eb32_first(&h2c->streams_by_id);
+ while (node) {
+ h2s = container_of(node, struct h2s, by_id);
+ node = eb32_next(node);
+ if (h2s->cs) {
+ h2s->cs->flags |= flags;
+ /* recv is used to force to detect CS_FL_EOS that wake()
+ * doesn't handle in the stream int code.
+ */
+ h2s->cs->data_cb->recv(h2s->cs);
+ h2s->cs->data_cb->wake(h2s->cs);
+ }
+ }
}
/* callback called on any event by the connection handler.
{
struct h2c *h2c = conn->mux_ctx;
- if ((conn->flags & CO_FL_ERROR) && eb_is_empty(&h2c->streams_by_id)) {
- h2_release(conn);
- return -1;
+ if (conn->flags & CO_FL_ERROR || h2c->st0 == H2_CS_ERROR2) {
+ h2_wake_all_streams(h2c);
+
+ if (eb_is_empty(&h2c->streams_by_id)) {
+ /* no more stream, kill the connection now */
+ h2_release(conn);
+ return -1;
+ }
+ else {
+ /* some streams still there, we need to signal them all and
+ * wait for their departure.
+ */
+ __conn_xprt_stop_recv(conn);
+ __conn_xprt_stop_send(conn);
+ return 0;
+ }
+ }
+
+ if (!h2c->dbuf->i)
+ h2_release_dbuf(h2c);
+
+ /* stop being notified of incoming data if we can't process them */
+ if (h2c->st0 >= H2_CS_ERROR ||
+ (h2c->flags & H2_CF_DEM_BLOCK_ANY) || conn_xprt_read0_pending(conn)) {
+ /* FIXME: we should clear a read timeout here */
+ __conn_xprt_stop_recv(conn);
+ }
+ else {
+ /* FIXME: we should (re-)arm a read timeout here */
+ __conn_xprt_want_recv(conn);
+ }
+
+ /* adjust output polling */
+ if ((h2c->st0 == H2_CS_ERROR || h2c->mbuf->o) &&
+ !(conn->flags & CO_FL_SOCK_WR_SH)) {
+ /* FIXME: we should (re-)arm a send timeout here */
+ __conn_xprt_want_send(conn);
+ }
+ else {
+ /* FIXME: we should clear a send timeout here */
+ h2_release_mbuf(h2c);
+ __conn_xprt_stop_send(conn);
}
return 0;