{
if (c_size(chn) && c_empty(chn)) {
b_free(&chn->buf);
- offer_buffers(wait->target, tasks_run_queue);
+ offer_buffers(wait->target, 1);
}
}
}
-/* Offer a buffer currently belonging to target <from> to whoever needs one.
- * Any pointer is valid for <from>, including NULL. Its purpose is to avoid
- * passing a buffer to oneself in case of failed allocations (e.g. need two
- * buffers, get one, fail, release it and wake up self again). In case of
- * normal buffer release where it is expected that the caller is not waiting
+/* Offer one or multiple buffer currently belonging to target <from> to whoever
+ * needs one. Any pointer is valid for <from>, including NULL. Its purpose is
+ * to avoid passing a buffer to oneself in case of failed allocations (e.g.
+ * need two buffers, get one, fail, release it and wake up self again). In case
+ * of normal buffer release where it is expected that the caller is not waiting
* for a buffer, NULL is fine. It will wake waiters on the current thread only.
*/
-void __offer_buffer(void *from, unsigned int threshold);
+void __offer_buffers(void *from, unsigned int count);
-static inline void offer_buffers(void *from, unsigned int threshold)
+static inline void offer_buffers(void *from, unsigned int count)
{
if (!LIST_ISEMPTY(&ti->buffer_wq))
- __offer_buffer(from, threshold);
+ __offer_buffers(from, count);
}
{
if (bptr->size) {
b_free(bptr);
- offer_buffers(check->buf_wait.target, tasks_run_queue);
+ offer_buffers(check->buf_wait.target, 1);
}
}
fflush(o);
}
-/* see offer_buffer() for details */
-void __offer_buffer(void *from, unsigned int threshold)
+/* see offer_buffers() for details */
+void __offer_buffers(void *from, unsigned int count)
{
struct buffer_wait *wait, *wait_back;
- int avail;
/* For now, we consider that all objects need 1 buffer, so we can stop
* waking up them once we have enough of them to eat all the available
* buffers. Note that we don't really know if they are streams or just
* other tasks, but that's a rough estimate. Similarly, for each cached
- * event we'll need 1 buffer. If no buffer is currently used, always
- * wake up the number of tasks we can offer a buffer based on what is
- * allocated, and in any case at least one task per two reserved
- * buffers.
+ * event we'll need 1 buffer.
*/
- avail = pool_head_buffer->allocated - pool_head_buffer->used - global.tune.reserved_bufs / 2;
-
list_for_each_entry_safe(wait, wait_back, &ti->buffer_wq, list) {
- if (avail <= threshold)
+ if (!count)
break;
if (wait->target == from || !wait->wakeup_cb(wait->target))
continue;
LIST_DEL_INIT(&wait->list);
- avail--;
+ count--;
}
}
/* Release the buffer if needed */
if (buf->size) {
b_free(buf);
- offer_buffers(buffer_wait->target, tasks_run_queue);
+ offer_buffers(buffer_wait->target, 1);
}
}
{
if (bptr->size) {
b_free(bptr);
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, 1);
}
}
count++;
}
if (count)
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, count);
}
/* Returns the number of allocatable outgoing streams for the connection taking
eb32_delete(&fstrm->by_id);
if (b_size(&fstrm->rxbuf)) {
b_free(&fstrm->rxbuf);
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, 1);
}
if (fstrm->subs)
fstrm->subs->events = 0;
}
if (released)
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, released);
/* wrote at least one byte, the buffer is not full anymore */
if (fconn->flags & (FCGI_CF_MUX_MFULL | FCGI_CF_DEM_MROOM))
}
if (released)
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, released);
}
end:
{
if (bptr->size) {
b_free(bptr);
- offer_buffers(h1c->buf_wait.target, tasks_run_queue);
+ offer_buffers(h1c->buf_wait.target, 1);
}
}
{
if (bptr->size) {
b_free(bptr);
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, 1);
}
}
count++;
}
if (count)
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, count);
}
/* returns the number of allocatable outgoing streams for the connection taking
eb32_delete(&h2s->by_id);
if (b_size(&h2s->rxbuf)) {
b_free(&h2s->rxbuf);
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, 1);
}
if (h2s->subs)
}
if (released)
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, released);
/* wrote at least one byte, the buffer is not full anymore */
if (sent)
}
if (released)
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, released);
}
/* in any case this connection must not be considered idle anymore */
cs->flags |= CS_FL_ERROR;
if (b_size(&h2s->rxbuf)) {
b_free(&h2s->rxbuf);
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, 1);
}
}
LIST_DEL_INIT(&s->buffer_wait.list);
if (s->req.buf.size || s->res.buf.size) {
+ int count = !!s->req.buf.size + !!s->res.buf.size;
+
b_free(&s->req.buf);
b_free(&s->res.buf);
- offer_buffers(NULL, tasks_run_queue);
+ offer_buffers(NULL, count);
}
pool_free(pool_head_uniqueid, s->unique_id.ptr);
int offer = 0;
if (c_size(&s->req) && c_empty(&s->req)) {
- offer = 1;
+ offer++;
b_free(&s->req.buf);
}
if (c_size(&s->res) && c_empty(&s->res)) {
- offer = 1;
+ offer++;
b_free(&s->res.buf);
}
* someone waiting, we can wake up a waiter and offer them.
*/
if (offer)
- offer_buffers(s, tasks_run_queue);
+ offer_buffers(s, offer);
}
void stream_process_counters(struct stream *s)