struct eb_root streams_by_id; /* all active streams by their ID */
struct list send_list; /* list of blocked streams requesting to send */
struct list fctl_list; /* list of streams blocked by connection's fctl */
+ struct list sending_list; /* list of h2s scheduled to send data */
struct buffer_wait buf_wait; /* wait list for buffer allocations */
struct wait_event wait_event; /* To be used if we're waiting for I/Os */
};
h2c->streams_by_id = EB_ROOT_UNIQUE;
LIST_INIT(&h2c->send_list);
LIST_INIT(&h2c->fctl_list);
+ LIST_INIT(&h2c->sending_list);
LIST_INIT(&h2c->buf_wait.list);
conn->mux_ctx = h2c;
h2s->flags &= ~H2_SF_BLK_ANY;
h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
+ h2s->send_wait->wait_reason |= SUB_CALL_UNSUBSCRIBE;
tasklet_wakeup(h2s->send_wait->task);
- h2s->send_wait = NULL;
LIST_DEL(&h2s->list);
LIST_INIT(&h2s->list);
+ LIST_ADDQ(&h2c->sending_list, &h2s->list);
}
list_for_each_entry_safe(h2s, h2s_back, &h2c->send_list, list) {
h2s->flags &= ~H2_SF_BLK_ANY;
h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
+ h2s->send_wait->wait_reason |= SUB_CALL_UNSUBSCRIBE;
tasklet_wakeup(h2s->send_wait->task);
- h2s->send_wait = NULL;
LIST_DEL(&h2s->list);
LIST_INIT(&h2s->list);
+ LIST_ADDQ(&h2c->sending_list, &h2s->list);
}
fail:
struct h2s *, list);
LIST_DEL(&h2s->list);
LIST_INIT(&h2s->list);
+ LIST_ADDQ(&h2c->sending_list, &h2s->list);
h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
+ h2s->send_wait->wait_reason |= SUB_CALL_UNSUBSCRIBE;
tasklet_wakeup(h2s->send_wait->task);
- h2s->send_wait = NULL;
}
}
/* We're done, no more to send */
h2s->send_wait = NULL;
}
}
+ if (event_type & SUB_CALL_UNSUBSCRIBE) {
+ sw = param;
+ if (h2s->send_wait == sw) {
+ sw->wait_reason &= ~SUB_CALL_UNSUBSCRIBE;
+ h2s->send_wait = NULL;
+ }
+ }
return 0;
}
return ret;
}
+static void h2_stop_senders(struct h2c *h2c)
+{
+ struct h2s *h2s, *h2s_back;
+
+ list_for_each_entry_safe(h2s, h2s_back, &h2c->sending_list, list) {
+ /* Don't unschedule the stream if the mux is just busy waiting for more data fro mthat stream */
+ if (h2c->msi == h2s_id(h2s))
+ continue;
+ LIST_DEL(&h2s->list);
+ LIST_INIT(&h2s->list);
+ task_remove_from_task_list((struct task *)h2s->send_wait->task);
+ h2s->send_wait->wait_reason |= SUB_CAN_SEND;
+ h2s->send_wait->wait_reason &= ~SUB_CALL_UNSUBSCRIBE;
+ LIST_ADD(&h2c->send_list, &h2s->list);
+ }
+}
+
/* Called from the upper layer, to send data */
static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
size_t total = 0;
size_t ret;
+ if (h2s->send_wait) {
+ h2s->send_wait->wait_reason &= ~SUB_CALL_UNSUBSCRIBE;
+ h2s->send_wait = NULL;
+ LIST_DEL(&h2s->list);
+ LIST_INIT(&h2s->list);
+ }
if (h2s->h2c->st0 < H2_CS_FRAME_H)
return 0;
}
b_del(buf, total);
+
+ /* The mux is full, cancel the pending tasks */
+ if ((h2s->h2c->flags & H2_CF_MUX_BLOCK_ANY) ||
+ (h2s->flags & H2_SF_BLK_MBUSY))
+ h2_stop_senders(h2s->h2c);
if (total > 0) {
if (!(h2s->h2c->wait_event.wait_reason & SUB_CAN_SEND))
tasklet_wakeup(h2s->h2c->wait_event.task);
+
}
return total;
}