struct pool_head *pool2_session;
struct list sessions;
+/* list of sessions waiting for at least one buffer */
+struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
+
static int conn_session_complete(struct connection *conn);
static int conn_session_update(struct connection *conn);
static struct task *expire_mini_session(struct task *t);
/* OK, we're keeping the session, so let's properly initialize the session */
LIST_ADDQ(&sessions, &s->list);
LIST_INIT(&s->back_refs);
+ LIST_INIT(&s->buffer_wait);
s->flags |= SN_INITIALIZED;
s->unique_id = NULL;
if (s->rep->pipe)
put_pipe(s->rep->pipe);
- b_free(&s->req->buf);
- b_free(&s->rep->buf);
+ /* We may still be present in the buffer wait queue */
+ if (!LIST_ISEMPTY(&s->buffer_wait)) {
+ LIST_DEL(&s->buffer_wait);
+ LIST_INIT(&s->buffer_wait);
+ }
+
+ b_drop(&s->req->buf);
+ b_drop(&s->rep->buf);
+ if (!LIST_ISEMPTY(&buffer_wq))
+ session_offer_buffers(1);
pool_free2(pool2_channel, s->req);
pool_free2(pool2_channel, s->rep);
if (b)
return 1;
- /* FIXME: normally we're supposed to subscribe to a list of waiters
- * for buffers. We release what we failed to allocate.
- */
+ if (LIST_ISEMPTY(&s->buffer_wait))
+ LIST_ADDQ(&buffer_wq, &s->buffer_wait);
return 0;
}
/* Allocates up to two buffers for session <s>. Only succeeds if both buffers
* are properly allocated. It is meant to be called inside process_session() so
- * that both request and response buffers are allocated. Returns 0 incase of
+ * that both request and response buffers are allocated. Returns 0 in case of
* failure, non-zero otherwise.
*/
int session_alloc_buffers(struct session *s)
{
- if (!s->req->buf->size && !b_alloc(&s->req->buf))
- return 0;
+ if (!LIST_ISEMPTY(&s->buffer_wait)) {
+ LIST_DEL(&s->buffer_wait);
+ LIST_INIT(&s->buffer_wait);
+ }
- if (s->rep->buf->size || b_alloc(&s->rep->buf))
+ if ((s->req->buf->size || b_alloc(&s->req->buf)) &&
+ (s->rep->buf->size || b_alloc(&s->rep->buf)))
return 1;
- if (buffer_empty(s->req->buf)) {
- __b_drop(&s->req->buf);
- s->req->buf = &buf_wanted;
- }
-
- /* FIXME: normally we're supposed to subscribe to a list of waiters
- * for buffers. We release what we failed to allocate.
- */
+ session_release_buffers(s);
+ LIST_ADDQ(&buffer_wq, &s->buffer_wait);
return 0;
}
/* releases unused buffers after processing. Typically used at the end of the
- * update() functions.
+ * update() functions. It will try to wake up as many tasks as the number of
+ * buffers that it releases. In practice, most often sessions are blocked on
+ * a single buffer, so it makes sense to try to wake two up when two buffers
+ * are released at once.
*/
void session_release_buffers(struct session *s)
{
+ int release_count = 0;
+
+ release_count = !!s->req->buf->size + !!s->rep->buf->size;
+
if (s->req->buf->size && buffer_empty(s->req->buf))
b_free(&s->req->buf);
if (s->rep->buf->size && buffer_empty(s->rep->buf))
b_free(&s->rep->buf);
- /* FIXME: normally we want to wake up pending tasks */
+ /* if we're certain to have at least 1 buffer available, and there is
+ * someone waiting, we can wake up a waiter and offer them.
+ */
+ if (release_count >= 1 && !LIST_ISEMPTY(&buffer_wq))
+ session_offer_buffers(release_count);
+}
+
+/* run across the list of pending sessions waiting for a buffer and wake
+ * one up if buffers are available.
+ */
+void session_offer_buffers(int count)
+{
+ struct session *sess, *bak;
+
+ list_for_each_entry_safe(sess, bak, &buffer_wq, buffer_wait) {
+ if (sess->task->state & TASK_RUNNING)
+ continue;
+
+ LIST_DEL(&sess->buffer_wait);
+ LIST_INIT(&sess->buffer_wait);
+
+ task_wakeup(sess->task, TASK_WOKEN_RES);
+ if (--count <= 0)
+ break;
+ }
}
/* perform minimal intializations, report 0 in case of error, 1 if OK. */