]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: streams: use one list per stream instead of a global one
authorWilly Tarreau <w@1wt.eu>
Wed, 24 Feb 2021 09:37:01 +0000 (10:37 +0100)
committerWilly Tarreau <w@1wt.eu>
Wed, 24 Feb 2021 12:53:20 +0000 (13:53 +0100)
The global streams list is exclusively used for "show sess", to look up
a stream to shut down, and for the hard-stop. Having all of them in a
single list is extremely expensive in terms of locking when using threads,
with performance losses as high as 7% having been observed just due to
this.

This patch makes the list per-thread, since there's no need to have a
global one in this situation. All call places just iterate over all
threads. The most "invasive" changes was in "show sess" where the end
of list needs to go back to the beginning of next thread's list until
the last thread is seen. For now the lock was maintained to keep the
code auditable but a next commit should get rid of it.

The observed performance gain here with only 4 threads is already 7%
(350krps -> 374krps).

include/haproxy/applet-t.h
include/haproxy/stream-t.h
include/haproxy/stream.h
include/haproxy/tinfo-t.h
src/proxy.c
src/stream.c

index f43f121c20c41d68290f1f6b2eacfc0e771e0a3f..49c8ab499d0895a887dd93b8ddcc81aac100baaa 100644 (file)
@@ -134,6 +134,7 @@ struct appctx {
                struct {
                        struct bref bref;       /* back-reference from the session being dumped */
                        void *target;           /* session we want to dump, or NULL for all */
+                       unsigned int thr;       /* the thread number being explored (0..MAX_THREADS-1) */
                        unsigned int uid;       /* if non-null, the uniq_id of the session being dumped */
                        int section;            /* section of the session being dumped */
                        int pos;                /* last position of the current session's buffer */
index 429d99244ccf3822b529711b8fd0a3616838545f..447dbc71361841264de14b138b72c5982664b4a2 100644 (file)
@@ -139,7 +139,7 @@ struct stream {
        int16_t priority_class;         /* priority class of the stream for the pending queue */
        int32_t priority_offset;        /* priority offset of the stream for the pending queue */
 
-       struct list list;               /* position in global streams list */
+       struct list list;               /* position in the thread's streams list */
        struct mt_list by_srv;          /* position in server stream list */
        struct list back_refs;          /* list of users tracking this stream */
        struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */
index f35ee3dba5664c0778e97544cb26f1032027798c..d4d202b2f87397b9b9c703f2ec615aae44c3bbf5 100644 (file)
@@ -54,7 +54,6 @@ extern struct trace_source trace_strm;
 
 extern struct pool_head *pool_head_stream;
 extern struct pool_head *pool_head_uniqueid;
-extern struct list streams;
 
 extern struct data_cb sess_conn_cb;
 
index 4242a3f7b01bce385ec6894beddecefb565c9ce9..89fde4f1760fb9c53ddc1ef60ddd971e8c6d10da 100644 (file)
@@ -47,6 +47,9 @@ struct thread_info {
 #endif
        struct list buffer_wq;     /* buffer waiters */
 
+       struct list streams;       /* list of streams attached to this thread */
+       __decl_thread(HA_SPINLOCK_T streams_lock); /* shared with "show sess" */
+
        /* pad to cache line (64B) */
        char __pad[0];            /* unused except to check remaining room */
        char __end[0] __attribute__((aligned(64)));
index ea7fc7f6f622e3f3216d078d1649e9a9727430bc..5edbbfe238b42de51d90659f5a9cdf6ec97caac0 100644 (file)
@@ -1639,9 +1639,13 @@ struct task *hard_stop(struct task *t, void *context, unsigned short state)
        }
 
        thread_isolate();
-       list_for_each_entry(s, &streams, list) {
-               stream_shutdown(s, SF_ERR_KILLED);
+
+       for (thr = 0; thr < global.nbthread; thr++) {
+               list_for_each_entry(s, &ha_thread_info[thr].streams, list) {
+                       stream_shutdown(s, SF_ERR_KILLED);
+               }
        }
+
        thread_release();
 
        killed = 1;
index d04581a335921ecc66732d5396623a44beecbb69..83c9345756003a6856edcb87aff410f288c3cf81 100644 (file)
@@ -66,8 +66,6 @@ DECLARE_POOL(pool_head_uniqueid, "uniqueid", UNIQUEID_LEN);
 
 /* incremented by each "show sess" to fix a delimiter between streams */
 unsigned stream_epoch = 0;
-struct list streams = LIST_HEAD_INIT(streams);
-__decl_spinlock(streams_lock);
 
 /* List of all use-service keywords. */
 static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
@@ -542,9 +540,9 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu
 
        s->tunnel_timeout = TICK_ETERNITY;
 
-       HA_SPIN_LOCK(STRMS_LOCK, &streams_lock);
-       LIST_ADDQ(&streams, &s->list);
-       HA_SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
+       HA_SPIN_LOCK(STRMS_LOCK, &ti->streams_lock);
+       LIST_ADDQ(&ti->streams, &s->list);
+       HA_SPIN_UNLOCK(STRMS_LOCK, &ti->streams_lock);
 
        if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0)
                goto out_fail_accept;
@@ -713,19 +711,19 @@ static void stream_free(struct stream *s)
 
        stream_store_counters(s);
 
-       HA_SPIN_LOCK(STRMS_LOCK, &streams_lock);
+       HA_SPIN_LOCK(STRMS_LOCK, &ti->streams_lock);
        list_for_each_entry_safe(bref, back, &s->back_refs, users) {
                /* we have to unlink all watchers. We must not relink them if
                 * this stream was the last one in the list.
                 */
                LIST_DEL(&bref->users);
                LIST_INIT(&bref->users);
-               if (s->list.n != &streams)
+               if (s->list.n != &ti->streams)
                        LIST_ADDQ(&LIST_ELEM(s->list.n, struct stream *, list)->back_refs, &bref->users);
                bref->ref = s->list.n;
        }
        LIST_DEL(&s->list);
-       HA_SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
+       HA_SPIN_UNLOCK(STRMS_LOCK, &ti->streams_lock);
 
        /* applets do not release session yet */
        must_free_sess = objt_appctx(sess->origin) && sess->origin == s->si[0].end;
@@ -2739,6 +2737,16 @@ void stream_dump_and_crash(enum obj_type *obj, int rate)
        abort();
 }
 
+/* initialize the require structures */
+static void init_stream()
+{
+       int thr;
+
+       for (thr = 0; thr < MAX_THREADS; thr++)
+               LIST_INIT(&ha_thread_info[thr].streams);
+}
+INITCALL0(STG_INIT, init_stream);
+
 /* Generates a unique ID based on the given <format>, stores it in the given <strm> and
  * returns the unique ID.
 
@@ -3187,6 +3195,7 @@ static int cli_parse_show_sess(char **args, char *payload, struct appctx *appctx
                appctx->ctx.sess.target = NULL;
        appctx->ctx.sess.section = 0; /* start with stream status */
        appctx->ctx.sess.pos = 0;
+       appctx->ctx.sess.thr = 0;
 
        /* let's set our own stream's epoch to the current one and increment
         * it so that we know which streams were already there before us.
@@ -3232,7 +3241,7 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
                 * pointer points back to the head of the streams list.
                 */
                LIST_INIT(&appctx->ctx.sess.bref.users);
-               appctx->ctx.sess.bref.ref = streams.n;
+               appctx->ctx.sess.bref.ref = ha_thread_info[appctx->ctx.sess.thr].streams.n;
                appctx->st2 = STAT_ST_LIST;
                /* fall through */
 
@@ -3244,15 +3253,27 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
                }
 
                /* and start from where we stopped */
-               while (appctx->ctx.sess.bref.ref != &streams) {
+               while (1) {
                        char pn[INET6_ADDRSTRLEN];
                        struct stream *curr_strm;
+                       int done= 0;
 
-                       curr_strm = LIST_ELEM(appctx->ctx.sess.bref.ref, struct stream *, list);
+                       if (appctx->ctx.sess.bref.ref == &ha_thread_info[appctx->ctx.sess.thr].streams)
+                               done = 1;
+                       else {
+                               /* check if we've found a stream created after issuing the "show sess" */
+                               curr_strm = LIST_ELEM(appctx->ctx.sess.bref.ref, struct stream *, list);
+                               if ((int)(curr_strm->stream_epoch - si_strm(appctx->owner)->stream_epoch) > 0)
+                                       done = 1;
+                       }
 
-                       /* check if we've found a stream created after issuing the "show sess" */
-                       if ((int)(curr_strm->stream_epoch - si_strm(appctx->owner)->stream_epoch) > 0)
-                               break;
+                       if (done) {
+                               appctx->ctx.sess.thr++;
+                               if (appctx->ctx.sess.thr >= global.nbthread)
+                                       break;
+                               appctx->ctx.sess.bref.ref = ha_thread_info[appctx->ctx.sess.thr].streams.n;
+                               continue;
+                       }
 
                        if (appctx->ctx.sess.target) {
                                if (appctx->ctx.sess.target != (void *)-1 && appctx->ctx.sess.target != curr_strm)
@@ -3425,11 +3446,11 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
 
 static void cli_release_show_sess(struct appctx *appctx)
 {
-       if (appctx->st2 == STAT_ST_LIST) {
-               HA_SPIN_LOCK(STRMS_LOCK, &streams_lock);
+       if (appctx->st2 == STAT_ST_LIST && appctx->ctx.sess.thr < global.nbthread) {
+               HA_SPIN_LOCK(STRMS_LOCK, &ha_thread_info[appctx->ctx.sess.thr].streams_lock);
                if (!LIST_ISEMPTY(&appctx->ctx.sess.bref.users))
                        LIST_DEL(&appctx->ctx.sess.bref.users);
-               HA_SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
+               HA_SPIN_UNLOCK(STRMS_LOCK, &ha_thread_info[appctx->ctx.sess.thr].streams_lock);
        }
 }
 
@@ -3437,6 +3458,7 @@ static void cli_release_show_sess(struct appctx *appctx)
 static int cli_parse_shutdown_session(char **args, char *payload, struct appctx *appctx, void *private)
 {
        struct stream *strm, *ptr;
+       int thr;
 
        if (!cli_has_level(appctx, ACCESS_LVL_ADMIN))
                return 1;
@@ -3445,21 +3467,24 @@ static int cli_parse_shutdown_session(char **args, char *payload, struct appctx
                return cli_err(appctx, "Session pointer expected (use 'show sess').\n");
 
        ptr = (void *)strtoul(args[2], NULL, 0);
+       strm = NULL;
 
        thread_isolate();
 
        /* first, look for the requested stream in the stream table */
-       list_for_each_entry(strm, &streams, list) {
-               if (strm == ptr) {
-                       stream_shutdown(strm, SF_ERR_KILLED);
-                       break;
+       for (thr = 0; !strm && thr < global.nbthread; thr++) {
+               list_for_each_entry(strm, &ha_thread_info[thr].streams, list) {
+                       if (strm == ptr) {
+                               stream_shutdown(strm, SF_ERR_KILLED);
+                               break;
+                       }
                }
        }
 
        thread_release();
 
        /* do we have the stream ? */
-       if (strm != ptr)
+       if (!strm)
                return cli_err(appctx, "No such session (use 'show sess').\n");
 
        return 1;