]> git.ipfire.org Git - thirdparty/kernel/linux.git/commitdiff
selftests: ublk: kublk: decouple ublk_queues from ublk server threads
authorUday Shankar <ushankar@purestorage.com>
Thu, 29 May 2025 23:47:15 +0000 (17:47 -0600)
committerJens Axboe <axboe@kernel.dk>
Sat, 31 May 2025 20:38:43 +0000 (14:38 -0600)
Add support in kublk for decoupled ublk_queues and ublk server threads.
kublk now has two modes of operation:

- (preexisting mode) threads and queues are paired 1:1, and each thread
  services all the I/Os of one queue
- (new mode) thread and queue counts are independently configurable.
  threads service I/Os in a way that balances load across threads even
  if load is not balanced over queues.

The default is the preexisting mode. The new mode is activated by
passing the --per_io_tasks flag.

Signed-off-by: Uday Shankar <ushankar@purestorage.com>
Reviewed-by: Ming Lei <ming.lei@redhat.com>
Link: https://lore.kernel.org/r/20250529-ublk_task_per_io-v8-6-e9d3b119336a@purestorage.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
tools/testing/selftests/ublk/file_backed.c
tools/testing/selftests/ublk/kublk.c
tools/testing/selftests/ublk/kublk.h
tools/testing/selftests/ublk/null.c
tools/testing/selftests/ublk/stripe.c

index 922a87108b9f7bae53098e74602c7b1f3e0246bc..cfa59b631693793465f0e6909a6fbe1a364f4523 100644 (file)
@@ -54,7 +54,7 @@ static int loop_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_de
 
        ublk_io_alloc_sqes(ublk_get_io(q, tag), sqe, 3);
 
-       io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, tag);
+       io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
        sqe[0]->flags |= IOSQE_CQE_SKIP_SUCCESS | IOSQE_IO_HARDLINK;
        sqe[0]->user_data = build_user_data(tag,
                        ublk_cmd_op_nr(sqe[0]->cmd_op), 0, q->q_id, 1);
@@ -66,7 +66,7 @@ static int loop_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_de
        sqe[1]->flags |= IOSQE_FIXED_FILE | IOSQE_IO_HARDLINK;
        sqe[1]->user_data = build_user_data(tag, ublk_op, 0, q->q_id, 1);
 
-       io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, tag);
+       io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
        sqe[2]->user_data = build_user_data(tag, ublk_cmd_op_nr(sqe[2]->cmd_op), 0, q->q_id, 1);
 
        return 2;
index 40431a8357a8f74d7d62e271e9090c8708c3ecc5..a98e14e4c245965d817b93843ff9a4011291223b 100644 (file)
@@ -505,8 +505,11 @@ static int ublk_thread_init(struct ublk_thread *t)
        }
 
        if (dev->dev_info.flags & (UBLK_F_SUPPORT_ZERO_COPY | UBLK_F_AUTO_BUF_REG)) {
+               unsigned nr_ios = dev->dev_info.queue_depth * dev->dev_info.nr_hw_queues;
+               unsigned max_nr_ios_per_thread = nr_ios / dev->nthreads;
+               max_nr_ios_per_thread += !!(nr_ios % dev->nthreads);
                ret = io_uring_register_buffers_sparse(
-                       &t->ring, dev->dev_info.queue_depth);
+                       &t->ring, max_nr_ios_per_thread);
                if (ret) {
                        ublk_err("ublk dev %d thread %d register spare buffers failed %d",
                                        dev->dev_info.dev_id, t->idx, ret);
@@ -578,7 +581,7 @@ static void ublk_set_auto_buf_reg(const struct ublk_queue *q,
        if (q->tgt_ops->buf_index)
                buf.index = q->tgt_ops->buf_index(q, tag);
        else
-               buf.index = tag;
+               buf.index = q->ios[tag].buf_index;
 
        if (q->state & UBLKSRV_AUTO_BUF_REG_FALLBACK)
                buf.flags = UBLK_AUTO_BUF_REG_FALLBACK;
@@ -660,18 +663,44 @@ int ublk_queue_io_cmd(struct ublk_io *io)
 
 static void ublk_submit_fetch_commands(struct ublk_thread *t)
 {
-       /*
-        * Service exclusively the queue whose q_id matches our thread
-        * index. This may change in the future.
-        */
-       struct ublk_queue *q = &t->dev->q[t->idx];
+       struct ublk_queue *q;
        struct ublk_io *io;
-       int i = 0;
+       int i = 0, j = 0;
 
-       for (i = 0; i < q->q_depth; i++) {
-               io = &q->ios[i];
-               io->t = t;
-               ublk_queue_io_cmd(io);
+       if (t->dev->per_io_tasks) {
+               /*
+                * Lexicographically order all the (qid,tag) pairs, with
+                * qid taking priority (so (1,0) > (0,1)). Then make
+                * this thread the daemon for every Nth entry in this
+                * list (N is the number of threads), starting at this
+                * thread's index. This ensures that each queue is
+                * handled by as many ublk server threads as possible,
+                * so that load that is concentrated on one or a few
+                * queues can make use of all ublk server threads.
+                */
+               const struct ublksrv_ctrl_dev_info *dinfo = &t->dev->dev_info;
+               int nr_ios = dinfo->nr_hw_queues * dinfo->queue_depth;
+               for (i = t->idx; i < nr_ios; i += t->dev->nthreads) {
+                       int q_id = i / dinfo->queue_depth;
+                       int tag = i % dinfo->queue_depth;
+                       q = &t->dev->q[q_id];
+                       io = &q->ios[tag];
+                       io->t = t;
+                       io->buf_index = j++;
+                       ublk_queue_io_cmd(io);
+               }
+       } else {
+               /*
+                * Service exclusively the queue whose q_id matches our
+                * thread index.
+                */
+               struct ublk_queue *q = &t->dev->q[t->idx];
+               for (i = 0; i < q->q_depth; i++) {
+                       io = &q->ios[i];
+                       io->t = t;
+                       io->buf_index = i;
+                       ublk_queue_io_cmd(io);
+               }
        }
 }
 
@@ -826,7 +855,8 @@ static void *ublk_io_handler_fn(void *data)
                return NULL;
        }
        /* IO perf is sensitive with queue pthread affinity on NUMA machine*/
-       ublk_thread_set_sched_affinity(t, info->affinity);
+       if (info->affinity)
+               ublk_thread_set_sched_affinity(t, info->affinity);
        sem_post(info->ready);
 
        ublk_dbg(UBLK_DBG_THREAD, "tid %d: ublk dev %d thread %u started\n",
@@ -893,7 +923,7 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
 
        ublk_dbg(UBLK_DBG_DEV, "%s enter\n", __func__);
 
-       tinfo = calloc(sizeof(struct ublk_thread_info), dinfo->nr_hw_queues);
+       tinfo = calloc(sizeof(struct ublk_thread_info), dev->nthreads);
        if (!tinfo)
                return -ENOMEM;
 
@@ -919,17 +949,29 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
                                 dinfo->dev_id, i);
                        goto fail;
                }
+       }
 
+       for (i = 0; i < dev->nthreads; i++) {
                tinfo[i].dev = dev;
                tinfo[i].idx = i;
                tinfo[i].ready = &ready;
-               tinfo[i].affinity = &affinity_buf[i];
+
+               /*
+                * If threads are not tied 1:1 to queues, setting thread
+                * affinity based on queue affinity makes little sense.
+                * However, thread CPU affinity has significant impact
+                * on performance, so to compare fairly, we'll still set
+                * thread CPU affinity based on queue affinity where
+                * possible.
+                */
+               if (dev->nthreads == dinfo->nr_hw_queues)
+                       tinfo[i].affinity = &affinity_buf[i];
                pthread_create(&dev->threads[i].thread, NULL,
                                ublk_io_handler_fn,
                                &tinfo[i]);
        }
 
-       for (i = 0; i < dinfo->nr_hw_queues; i++)
+       for (i = 0; i < dev->nthreads; i++)
                sem_wait(&ready);
        free(tinfo);
        free(affinity_buf);
@@ -953,7 +995,7 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
                ublk_send_dev_event(ctx, dev, dev->dev_info.dev_id);
 
        /* wait until we are terminated */
-       for (i = 0; i < dinfo->nr_hw_queues; i++)
+       for (i = 0; i < dev->nthreads; i++)
                pthread_join(dev->threads[i].thread, &thread_ret);
  fail:
        for (i = 0; i < dinfo->nr_hw_queues; i++)
@@ -1063,6 +1105,7 @@ wait:
 
 static int __cmd_dev_add(const struct dev_ctx *ctx)
 {
+       unsigned nthreads = ctx->nthreads;
        unsigned nr_queues = ctx->nr_hw_queues;
        const char *tgt_type = ctx->tgt_type;
        unsigned depth = ctx->queue_depth;
@@ -1086,6 +1129,23 @@ static int __cmd_dev_add(const struct dev_ctx *ctx)
                return -EINVAL;
        }
 
+       /* default to 1:1 threads:queues if nthreads is unspecified */
+       if (!nthreads)
+               nthreads = nr_queues;
+
+       if (nthreads > UBLK_MAX_THREADS) {
+               ublk_err("%s: %u is too many threads (max %u)\n",
+                               __func__, nthreads, UBLK_MAX_THREADS);
+               return -EINVAL;
+       }
+
+       if (nthreads != nr_queues && !ctx->per_io_tasks) {
+               ublk_err("%s: threads %u must be same as queues %u if "
+                       "not using per_io_tasks\n",
+                       __func__, nthreads, nr_queues);
+               return -EINVAL;
+       }
+
        dev = ublk_ctrl_init();
        if (!dev) {
                ublk_err("%s: can't alloc dev id %d, type %s\n",
@@ -1109,6 +1169,8 @@ static int __cmd_dev_add(const struct dev_ctx *ctx)
        if ((features & UBLK_F_QUIESCE) &&
                        (info->flags & UBLK_F_USER_RECOVERY))
                info->flags |= UBLK_F_QUIESCE;
+       dev->nthreads = nthreads;
+       dev->per_io_tasks = ctx->per_io_tasks;
        dev->tgt.ops = ops;
        dev->tgt.sq_depth = depth;
        dev->tgt.cq_depth = depth;
@@ -1307,6 +1369,7 @@ static int cmd_dev_get_features(void)
                [const_ilog2(UBLK_F_UPDATE_SIZE)] = "UPDATE_SIZE",
                [const_ilog2(UBLK_F_AUTO_BUF_REG)] = "AUTO_BUF_REG",
                [const_ilog2(UBLK_F_QUIESCE)] = "QUIESCE",
+               [const_ilog2(UBLK_F_PER_IO_DAEMON)] = "PER_IO_DAEMON",
        };
        struct ublk_dev *dev;
        __u64 features = 0;
@@ -1401,8 +1464,10 @@ static void __cmd_create_help(char *exe, bool recovery)
                        exe, recovery ? "recover" : "add");
        printf("\t[--foreground] [--quiet] [-z] [--auto_zc] [--auto_zc_fallback] [--debug_mask mask] [-r 0|1 ] [-g]\n");
        printf("\t[-e 0|1 ] [-i 0|1]\n");
+       printf("\t[--nthreads threads] [--per_io_tasks]\n");
        printf("\t[target options] [backfile1] [backfile2] ...\n");
        printf("\tdefault: nr_queues=2(max 32), depth=128(max 1024), dev_id=-1(auto allocation)\n");
+       printf("\tdefault: nthreads=nr_queues");
 
        for (i = 0; i < sizeof(tgt_ops_list) / sizeof(tgt_ops_list[0]); i++) {
                const struct ublk_tgt_ops *ops = tgt_ops_list[i];
@@ -1459,6 +1524,8 @@ int main(int argc, char *argv[])
                { "auto_zc",            0,      NULL,  0 },
                { "auto_zc_fallback",   0,      NULL,  0 },
                { "size",               1,      NULL, 's'},
+               { "nthreads",           1,      NULL,  0 },
+               { "per_io_tasks",       0,      NULL,  0 },
                { 0, 0, 0, 0 }
        };
        const struct ublk_tgt_ops *ops = NULL;
@@ -1534,6 +1601,10 @@ int main(int argc, char *argv[])
                                ctx.flags |= UBLK_F_AUTO_BUF_REG;
                        if (!strcmp(longopts[option_idx].name, "auto_zc_fallback"))
                                ctx.auto_zc_fallback = 1;
+                       if (!strcmp(longopts[option_idx].name, "nthreads"))
+                               ctx.nthreads = strtol(optarg, NULL, 10);
+                       if (!strcmp(longopts[option_idx].name, "per_io_tasks"))
+                               ctx.per_io_tasks = 1;
                        break;
                case '?':
                        /*
index 3a2ae095bee18633acd5a9c923cfab2d14fe3bff..6be601536b3d2c095654da5fba0cf13a51142052 100644 (file)
@@ -80,6 +80,7 @@ struct dev_ctx {
        char tgt_type[16];
        unsigned long flags;
        unsigned nr_hw_queues;
+       unsigned short nthreads;
        unsigned queue_depth;
        int dev_id;
        int nr_files;
@@ -89,6 +90,7 @@ struct dev_ctx {
        unsigned int    fg:1;
        unsigned int    recovery:1;
        unsigned int    auto_zc_fallback:1;
+       unsigned int    per_io_tasks:1;
 
        int _evtfd;
        int _shmid;
@@ -131,6 +133,7 @@ struct ublk_io {
 
        int result;
 
+       unsigned short buf_index;
        unsigned short tgt_ios;
        void *private_data;
        struct ublk_thread *t;
@@ -203,6 +206,8 @@ struct ublk_dev {
        struct ublksrv_ctrl_dev_info  dev_info;
        struct ublk_queue q[UBLK_MAX_QUEUES];
        struct ublk_thread threads[UBLK_MAX_THREADS];
+       unsigned nthreads;
+       unsigned per_io_tasks;
 
        int fds[MAX_BACK_FILES + 1];    /* fds[0] points to /dev/ublkcN */
        int nr_fds;
index 9acc7e0d271b5ae52d6d31587cc5bfb63b19778d..afe0b99d77eec74acae04952a9af5348252bc599 100644 (file)
@@ -62,7 +62,7 @@ static int null_queue_zc_io(struct ublk_queue *q, int tag)
 
        ublk_io_alloc_sqes(ublk_get_io(q, tag), sqe, 3);
 
-       io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, tag);
+       io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
        sqe[0]->user_data = build_user_data(tag,
                        ublk_cmd_op_nr(sqe[0]->cmd_op), 0, q->q_id, 1);
        sqe[0]->flags |= IOSQE_CQE_SKIP_SUCCESS | IOSQE_IO_HARDLINK;
@@ -70,7 +70,7 @@ static int null_queue_zc_io(struct ublk_queue *q, int tag)
        __setup_nop_io(tag, iod, sqe[1], q->q_id);
        sqe[1]->flags |= IOSQE_IO_HARDLINK;
 
-       io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, tag);
+       io_uring_prep_buf_unregister(sqe[2], 0, tag, q->q_id, ublk_get_io(q, tag)->buf_index);
        sqe[2]->user_data = build_user_data(tag, ublk_cmd_op_nr(sqe[2]->cmd_op), 0, q->q_id, 1);
 
        // buf register is marked as IOSQE_CQE_SKIP_SUCCESS
@@ -136,7 +136,7 @@ static unsigned short ublk_null_buf_index(const struct ublk_queue *q, int tag)
 {
        if (q->state & UBLKSRV_AUTO_BUF_REG_FALLBACK)
                return (unsigned short)-1;
-       return tag;
+       return q->ios[tag].buf_index;
 }
 
 const struct ublk_tgt_ops null_tgt_ops = {
index 97079c3121ef8d4edc71891a289dd40658ce3f2a..37d50bbf5f5e86a520efedc9228510f8e1273625 100644 (file)
@@ -141,7 +141,7 @@ static int stripe_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_
        ublk_io_alloc_sqes(ublk_get_io(q, tag), sqe, s->nr + extra);
 
        if (zc) {
-               io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, tag);
+               io_uring_prep_buf_register(sqe[0], 0, tag, q->q_id, io->buf_index);
                sqe[0]->flags |= IOSQE_CQE_SKIP_SUCCESS | IOSQE_IO_HARDLINK;
                sqe[0]->user_data = build_user_data(tag,
                        ublk_cmd_op_nr(sqe[0]->cmd_op), 0, q->q_id, 1);
@@ -167,7 +167,7 @@ static int stripe_queue_tgt_rw_io(struct ublk_queue *q, const struct ublksrv_io_
        if (zc) {
                struct io_uring_sqe *unreg = sqe[s->nr + 1];
 
-               io_uring_prep_buf_unregister(unreg, 0, tag, q->q_id, tag);
+               io_uring_prep_buf_unregister(unreg, 0, tag, q->q_id, io->buf_index);
                unreg->user_data = build_user_data(
                        tag, ublk_cmd_op_nr(unreg->cmd_op), 0, q->q_id, 1);
        }