]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: threads/stream: Make streams list thread safe
authorEmeric Brun <ebrun@haproxy.com>
Fri, 30 Jun 2017 14:23:45 +0000 (16:23 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 31 Oct 2017 12:58:32 +0000 (13:58 +0100)
Adds a global lock to protect the full streams list used to dump
sessions on stats socket.

include/common/hathreads.h
src/stream.c

index 1717cc9b78aa16f8a3862834c7f982b95ef8d1c2..81c2aad4a1d1c38fb99a20389f86b827b1c3c5e8 100644 (file)
@@ -157,6 +157,7 @@ enum lock_label {
        APPLETS_LOCK,
        PEER_LOCK,
        BUF_WQ_LOCK,
+       STRMS_LOCK,
        LOCK_LABELS
 };
 struct lock_stat {
@@ -243,7 +244,7 @@ static inline void show_lock_stats()
                                           "TASK_RQ", "TASK_WQ", "POOL",
                                           "LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
                                           "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
-                                          "APPLETS", "PEER", "BUF_WQ" };
+                                          "APPLETS", "PEER", "BUF_WQ", "STREAMS" };
        int lbl;
 
        for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
@@ -524,7 +525,6 @@ static inline void __spin_unlock(enum lock_label lbl, struct ha_spinlock *l,
        l->info.last_location.line     = line;
 
        __RWLOCK_WRUNLOCK(&l->lock);
-
        HA_ATOMIC_ADD(&lock_stats[lbl].num_write_unlocked, 1);
 }
 
index 51d235454ff506b5659230cfde9f87dfe6dea5c4..889908f4e6f788b0629ebde390a10976d4b4f9ed 100644 (file)
@@ -63,6 +63,9 @@
 struct pool_head *pool2_stream;
 struct list streams;
 
+#ifdef USE_THREAD
+HA_SPINLOCK_T streams_lock;
+#endif
 /* List of all use-service keywords. */
 static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
 
@@ -154,7 +157,6 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
        s->uniq_id = global.req_count++;
 
        /* OK, we're keeping the stream, so let's properly initialize the stream */
-       LIST_ADDQ(&streams, &s->list);
        LIST_INIT(&s->back_refs);
 
        LIST_INIT(&s->buffer_wait.list);
@@ -251,6 +253,10 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
        s->txn = NULL;
        s->hlua = NULL;
 
+       SPIN_LOCK(STRMS_LOCK, &streams_lock);
+       LIST_ADDQ(&streams, &s->list);
+       SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
+
        if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0)
                goto out_fail_accept;
 
@@ -369,6 +375,7 @@ static void stream_free(struct stream *s)
 
        stream_store_counters(s);
 
+       SPIN_LOCK(STRMS_LOCK, &streams_lock);
        list_for_each_entry_safe(bref, back, &s->back_refs, users) {
                /* we have to unlink all watchers. We must not relink them if
                 * this stream was the last one in the list.
@@ -380,6 +387,8 @@ static void stream_free(struct stream *s)
                bref->ref = s->list.n;
        }
        LIST_DEL(&s->list);
+       SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
+
        si_release_endpoint(&s->si[1]);
        si_release_endpoint(&s->si[0]);
 
@@ -462,6 +471,7 @@ void stream_release_buffers(struct stream *s)
 int init_stream()
 {
        LIST_INIT(&streams);
+       SPIN_INIT(&streams_lock);
        pool2_stream = create_pool("stream", sizeof(struct stream), MEM_F_SHARED);
        return pool2_stream != NULL;
 }
@@ -3039,11 +3049,14 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
                 * pointer points back to the head of the streams list.
                 */
                LIST_INIT(&appctx->ctx.sess.bref.users);
+               SPIN_LOCK(STRMS_LOCK, &streams_lock);
                appctx->ctx.sess.bref.ref = streams.n;
+               SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
                appctx->st2 = STAT_ST_LIST;
                /* fall through */
 
        case STAT_ST_LIST:
+               SPIN_LOCK(STRMS_LOCK, &streams_lock);
                /* first, let's detach the back-ref from a possible previous stream */
                if (!LIST_ISEMPTY(&appctx->ctx.sess.bref.users)) {
                        LIST_DEL(&appctx->ctx.sess.bref.users);
@@ -3063,8 +3076,10 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
 
                                LIST_ADDQ(&curr_strm->back_refs, &appctx->ctx.sess.bref.users);
                                /* call the proper dump() function and return if we're missing space */
-                               if (!stats_dump_full_strm_to_buffer(si, curr_strm))
+                               if (!stats_dump_full_strm_to_buffer(si, curr_strm)) {
+                                       SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
                                        return 0;
+                               }
 
                                /* stream dump complete */
                                LIST_DEL(&appctx->ctx.sess.bref.users);
@@ -3190,6 +3205,7 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
                                 */
                                si_applet_cant_put(si);
                                LIST_ADDQ(&curr_strm->back_refs, &appctx->ctx.sess.bref.users);
+                               SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
                                return 0;
                        }
 
@@ -3211,9 +3227,11 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
 
                        appctx->ctx.sess.target = NULL;
                        appctx->ctx.sess.uid = 0;
+                       SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
                        return 1;
                }
 
+               SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
                appctx->st2 = STAT_ST_FIN;
                /* fall through */
 
@@ -3226,8 +3244,10 @@ static int cli_io_handler_dump_sess(struct appctx *appctx)
 static void cli_release_show_sess(struct appctx *appctx)
 {
        if (appctx->st2 == STAT_ST_LIST) {
+               SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
                if (!LIST_ISEMPTY(&appctx->ctx.sess.bref.users))
                        LIST_DEL(&appctx->ctx.sess.bref.users);
+               SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
        }
 }