}
if (h2c->wait_list.task)
tasklet_free(h2c->wait_list.task);
+ LIST_DEL(&h2c->wait_list.list);
+ LIST_INIT(&h2c->wait_list.list);
pool_free(pool_head_h2c, h2c);
}
b_free(&h2s->rxbuf);
offer_buffers(NULL, tasks_run_queue);
}
+ LIST_DEL(&h2s->wait_list.list);
+ LIST_INIT(&h2s->wait_list.list);
tasklet_free(h2s->wait_list.task);
pool_free(pool_head_h2s, h2s);
}
}
h2s->cs->flags |= flags;
- h2s->cs->data_cb->wake(h2s->cs);
+ if (h2s->recv_wait_list) {
+ struct wait_list *sw = h2s->recv_wait_list;
+ sw->wait_reason &= ~SUB_CAN_RECV;
+ tasklet_wakeup(sw->task);
+ h2s->recv_wait_list = NULL;
+ }
if (flags & CS_FL_ERROR && h2s->st < H2_SS_ERROR)
h2s->st = H2_SS_ERROR;
if (h2s->cs) {
h2s->cs->flags |= CS_FL_REOS | CS_FL_ERROR;
- h2s->cs->data_cb->wake(h2s->cs);
+ if (h2s->recv_wait_list) {
+ struct wait_list *sw = h2s->recv_wait_list;
+
+ sw->wait_reason &= ~SUB_CAN_RECV;
+ tasklet_wakeup(sw->task);
+ h2s->recv_wait_list = NULL;
+ }
}
h2s->flags |= H2_SF_RST_RCVD;
if (tmp_h2s != h2s && h2s && h2s->cs && b_data(&h2s->rxbuf)) {
/* we may have to signal the upper layers */
h2s->cs->flags |= CS_FL_RCV_MORE;
- if (h2s->cs->data_cb->wake(h2s->cs) < 0) {
- /* cs has just been destroyed, we have to kill h2s. */
- h2s_error(h2s, H2_ERR_STREAM_CLOSED);
- goto strm_err;
+ if (h2s->recv_wait_list) {
+ h2s->recv_wait_list->wait_reason &= ~SUB_CAN_RECV;
+ tasklet_wakeup(h2s->recv_wait_list->task);
+ h2s->recv_wait_list = NULL;
}
-
if (h2c->st0 >= H2_CS_ERROR)
goto strm_err;
if (h2s && h2s->cs && b_data(&h2s->rxbuf)) {
/* we may have to signal the upper layers */
h2s->cs->flags |= CS_FL_RCV_MORE;
- if (h2s->cs->data_cb->wake(h2s->cs) < 0) {
- /* cs has just been destroyed, we have to kill h2s. */
- h2s_error(h2s, H2_ERR_STREAM_CLOSED);
- h2c_send_rst_stream(h2c, h2s);
+ if (h2s->recv_wait_list) {
+ h2s->recv_wait_list->wait_reason &= ~SUB_CAN_RECV;
+ tasklet_wakeup(h2s->recv_wait_list->task);
+ h2s->recv_wait_list = NULL;
}
}
return;
while (node) {
h2s = container_of(node, struct h2s, by_id);
- if (h2s->cs->flags & CS_FL_WAIT_FOR_HS)
- h2s->cs->data_cb->wake(h2s->cs);
+ if ((h2s->cs->flags & CS_FL_WAIT_FOR_HS) &&
+ h2s->recv_wait_list) {
+ struct wait_list *sw = h2s->recv_wait_list;
+ sw->wait_reason &= ~SUB_CAN_RECV;
+ tasklet_wakeup(sw->task);
+ h2s->recv_wait_list = NULL;
+ }
node = eb32_next(node);
}
}
si_f = &s->si[0];
si_b = &s->si[1];
+ /* First, attempd to do I/Os */
+ si_cs_io_cb(NULL, si_f, 0);
+ si_cs_io_cb(NULL, si_b, 0);
+
//DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,
// si_f->state, si_b->state, si_b->err_type, req->flags, res->flags);
#endif
s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
stream_release_buffers(s);
+ /* We may have free'd some space in buffers, or have more to send/recv, try again */
+ si_cs_io_cb(NULL, si_f, 0);
+ si_cs_io_cb(NULL, si_b, 0);
return t; /* nothing more to do */
}
static void stream_int_chk_rcv_applet(struct stream_interface *si);
static void stream_int_chk_snd_applet(struct stream_interface *si);
static int si_cs_recv(struct conn_stream *cs);
-static int si_cs_wake_cb(struct conn_stream *cs);
+static int si_cs_process(struct conn_stream *cs);
static int si_idle_conn_wake_cb(struct conn_stream *cs);
static int si_cs_send(struct conn_stream *cs);
};
struct data_cb si_conn_cb = {
- .wake = si_cs_wake_cb,
.name = "STRM",
};
}
-/* Callback to be used by connection I/O handlers upon completion. It propagates
+/* Called by I/O handlers after completion.. It propagates
* connection flags to the stream interface, updates the stream (which may or
* may not take this opportunity to try to forward data), then update the
* connection's polling based on the channels and stream interface's final
* states. The function always returns 0.
*/
-static int si_cs_wake_cb(struct conn_stream *cs)
+static int si_cs_process(struct conn_stream *cs)
{
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
- /* if the CS's input buffer already has data available, let's try to
- * receive now. The new muxes do this. The CS_FL_REOS is another cause
- * for recv() (received only an empty response).
- */
- if (!(cs->flags & CS_FL_EOS) &&
- (cs->flags & (CS_FL_DATA_RD_ENA)))
- si_cs_recv(cs);
-
/* If we have data to send, try it now */
if (!channel_is_empty(oc) && objt_cs(si->end))
si_cs_send(objt_cs(si->end));
return 0;
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
- return 0;
+ return 1;
if (conn->flags & CO_FL_HANDSHAKE) {
/* a handshake was requested */
/* we might have been called just after an asynchronous shutw */
if (si_oc(si)->flags & CF_SHUTW)
- return 0;
+ return 1;
/* ensure it's only set if a write attempt has succeeded */
oc->flags &= ~CF_WRITE_PARTIAL;
}
}
/* We couldn't send all of our data, let the mux know we'd like to send more */
- if (co_data(oc))
+ if (co_data(oc)) {
+ cs_want_send(cs);
conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_list);
+ }
wake_others:
/* Maybe somebody was waiting for this conn_stream, wake them */
if (!cs)
return NULL;
+redo:
if (!(si->wait_list.wait_reason & SUB_CAN_SEND))
ret = si_cs_send(cs);
if (!(si->wait_list.wait_reason & SUB_CAN_RECV))
ret |= si_cs_recv(cs);
if (ret != 0)
- si_cs_wake_cb(cs);
+ si_cs_process(cs);
return (NULL);
}
if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) {
/* stop reading */
- if (!(ic->flags & CF_DONT_READ)) /* full */
+ if (!(ic->flags & CF_DONT_READ)) /* full */ {
si->flags |= SI_FL_WAIT_ROOM;
+ }
__cs_stop_recv(cs);
}
else {
* which rejects it before reading it all.
*/
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
- return 0;
+ return 1; // We want to make sure si_cs_wake() is called, so that process_strema is woken up, on failure
/* If another call to si_cs_recv() failed, and we subscribed to
* recv events already, give up now.
/* maybe we were called immediately after an asynchronous shutr */
if (ic->flags & CF_SHUTR)
- return 0;
+ return 1;
/* stop here if we reached the end of data */
if (cs->flags & CS_FL_EOS)
goto out_shutdown_r;
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
- return cur_read != 0;
+ return 1;
if (conn->flags & CO_FL_WAIT_ROOM) {
/* the pipe is full or we have read enough data that it
end_recv:
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
- return cur_read != 0;
+ return 1;
if (cs->flags & CS_FL_EOS)
/* connection closed */
if (ic->flags & CF_AUTO_CLOSE)
channel_shutw_now(ic);
stream_sock_read0(si);
- return cur_read != 0;
+ return 1;
}
/*