]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: stream: implement stream_buf_available()
authorWilly Tarreau <w@1wt.eu>
Tue, 6 Nov 2018 14:50:21 +0000 (15:50 +0100)
committerWilly Tarreau <w@1wt.eu>
Sun, 11 Nov 2018 09:18:37 +0000 (10:18 +0100)
This function replaces stream_res_available(), which is used as a callback
for the buffer allocator. It now carefully checks which stream interface
was blocked on a buffer allocation, tries to allocate the input buffer to
this stream interface, and wakes the task up once such a buffer was found.
It will automatically remove the SI_FL_WAIT_ROOM flag upon success since
the info this flag indicates becomes wrong as soon as the buffer is
allocated.

The code is still far from being perfect because if a call to si_cs_recv()
fails to allocate a buffer, we'll still end up passing via process_stream()
again, but this could be improved in the future by using finer-grained
wake-up notifications.

include/proto/stream.h
src/stream.c

index 0e1cf66e0bbee0cc6bf8dfda10519b7af3069dbb..8c6773eeb44e160f390585f35fd10b05114ad07f 100644 (file)
@@ -57,6 +57,7 @@ int parse_track_counters(char **args, int *arg,
 /* Update the stream's backend and server time stats */
 void stream_update_time_stats(struct stream *s);
 void stream_release_buffers(struct stream *s);
+int stream_buf_available(void *arg);
 
 /* returns the session this stream belongs to */
 static inline struct session *strm_sess(const struct stream *strm)
@@ -347,18 +348,6 @@ static void inline stream_init_srv_conn(struct stream *sess)
        LIST_INIT(&sess->by_srv);
 }
 
-/* Callback used to wake up a stream when a buffer is available. The stream <s>
- * is woken up is if it is not already running and if it is not already in the
- * task run queue. This functions returns 1 is the stream is woken up, otherwise
- * it returns 0. */
-static int inline stream_res_wakeup(struct stream *s)
-{
-       if (s->task->state & TASK_RUNNING)
-               return 0;
-       task_wakeup(s->task, TASK_WOKEN_RES);
-       return 1;
-}
-
 void service_keywords_register(struct action_kw_list *kw_list);
 
 #endif /* _PROTO_STREAM_H */
index a3a65e17f42255c4d2fc49742bd4d007438d2734..2dfb2f9be02f3071f40a2e726482b153a9bc25a6 100644 (file)
@@ -85,6 +85,31 @@ int stream_create_from_cs(struct conn_stream *cs)
        return 0;
 }
 
+/* Callback used to wake up a stream when an input buffer is available. The
+ * stream <s>'s stream interfaces are checked for a failed buffer allocation
+ * as indicated by the presence of the SI_FL_WAIT_ROOM flag and the lack of a
+ * buffer, and and input buffer is assigned there (at most one). The function
+ * returns 1 and wakes the stream up if a buffer was taken, otherwise zero.
+ * It's designed to be called from __offer_buffer().
+ */
+int stream_buf_available(void *arg)
+{
+       struct stream *s = arg;
+
+       if (!s->req.buf.size && !s->req.pipe && (s->si[0].flags & SI_FL_WAIT_ROOM) &&
+           b_alloc_margin(&s->req.buf, global.tune.reserved_bufs))
+               s->si[0].flags &= ~SI_FL_WAIT_ROOM;
+       else if (!s->res.buf.size && !s->res.pipe && (s->si[1].flags & SI_FL_WAIT_ROOM) &&
+                b_alloc_margin(&s->res.buf, 0))
+               s->si[1].flags &= ~SI_FL_WAIT_ROOM;
+       else
+               return 0;
+
+       task_wakeup(s->task, TASK_WOKEN_RES);
+       return 1;
+
+}
+
 /* This function is called from the session handler which detects the end of
  * handshake, in order to complete initialization of a valid stream. It must be
  * called with a completley initialized session. It returns the pointer to
@@ -157,7 +182,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
 
        LIST_INIT(&s->buffer_wait.list);
        s->buffer_wait.target = s;
-       s->buffer_wait.wakeup_cb = (int (*)(void *))stream_res_wakeup;
+       s->buffer_wait.wakeup_cb = stream_buf_available;
 
        s->flags |= SF_INITIALIZED;
        s->pcli_next_pid = 0;