From: Willy Tarreau Date: Tue, 6 Nov 2018 14:50:21 +0000 (+0100) Subject: MEDIUM: stream: implement stream_buf_available() X-Git-Tag: v1.9-dev6~18 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=b882dd88cc5996b2057ee0131bd59436002c1427;p=thirdparty%2Fhaproxy.git MEDIUM: stream: implement stream_buf_available() This function replaces stream_res_available(), which is used as a callback for the buffer allocator. It now carefully checks which stream interface was blocked on a buffer allocation, tries to allocate the input buffer to this stream interface, and wakes the task up once such a buffer was found. It will automatically remove the SI_FL_WAIT_ROOM flag upon success since the info this flag indicates becomes wrong as soon as the buffer is allocated. The code is still far from being perfect because if a call to si_cs_recv() fails to allocate a buffer, we'll still end up passing via process_stream() again, but this could be improved in the future by using finer-grained wake-up notifications. --- diff --git a/include/proto/stream.h b/include/proto/stream.h index 0e1cf66e0b..8c6773eeb4 100644 --- a/include/proto/stream.h +++ b/include/proto/stream.h @@ -57,6 +57,7 @@ int parse_track_counters(char **args, int *arg, /* Update the stream's backend and server time stats */ void stream_update_time_stats(struct stream *s); void stream_release_buffers(struct stream *s); +int stream_buf_available(void *arg); /* returns the session this stream belongs to */ static inline struct session *strm_sess(const struct stream *strm) @@ -347,18 +348,6 @@ static void inline stream_init_srv_conn(struct stream *sess) LIST_INIT(&sess->by_srv); } -/* Callback used to wake up a stream when a buffer is available. The stream - * is woken up is if it is not already running and if it is not already in the - * task run queue. This functions returns 1 is the stream is woken up, otherwise - * it returns 0. */ -static int inline stream_res_wakeup(struct stream *s) -{ - if (s->task->state & TASK_RUNNING) - return 0; - task_wakeup(s->task, TASK_WOKEN_RES); - return 1; -} - void service_keywords_register(struct action_kw_list *kw_list); #endif /* _PROTO_STREAM_H */ diff --git a/src/stream.c b/src/stream.c index a3a65e17f4..2dfb2f9be0 100644 --- a/src/stream.c +++ b/src/stream.c @@ -85,6 +85,31 @@ int stream_create_from_cs(struct conn_stream *cs) return 0; } +/* Callback used to wake up a stream when an input buffer is available. The + * stream 's stream interfaces are checked for a failed buffer allocation + * as indicated by the presence of the SI_FL_WAIT_ROOM flag and the lack of a + * buffer, and and input buffer is assigned there (at most one). The function + * returns 1 and wakes the stream up if a buffer was taken, otherwise zero. + * It's designed to be called from __offer_buffer(). + */ +int stream_buf_available(void *arg) +{ + struct stream *s = arg; + + if (!s->req.buf.size && !s->req.pipe && (s->si[0].flags & SI_FL_WAIT_ROOM) && + b_alloc_margin(&s->req.buf, global.tune.reserved_bufs)) + s->si[0].flags &= ~SI_FL_WAIT_ROOM; + else if (!s->res.buf.size && !s->res.pipe && (s->si[1].flags & SI_FL_WAIT_ROOM) && + b_alloc_margin(&s->res.buf, 0)) + s->si[1].flags &= ~SI_FL_WAIT_ROOM; + else + return 0; + + task_wakeup(s->task, TASK_WOKEN_RES); + return 1; + +} + /* This function is called from the session handler which detects the end of * handshake, in order to complete initialization of a valid stream. It must be * called with a completley initialized session. It returns the pointer to @@ -157,7 +182,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) LIST_INIT(&s->buffer_wait.list); s->buffer_wait.target = s; - s->buffer_wait.wakeup_cb = (int (*)(void *))stream_res_wakeup; + s->buffer_wait.wakeup_cb = stream_buf_available; s->flags |= SF_INITIALIZED; s->pcli_next_pid = 0;