free(t->commit_buf);
}
allocator_deinit(&t->commit_buf_alloc);
+ free(t->commit);
}
static int alloc_batch_commit_buf(struct ublk_thread *t)
unsigned int total = buf_size * t->nr_commit_buf;
unsigned int page_sz = getpagesize();
void *buf = NULL;
- int ret;
+ int i, ret, j = 0;
+
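+	/*
+	 * One commit buffer state per queue handled by this thread; record
+	 * which ublk queue each slot serves.
+	 */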
+	t->commit = calloc(t->nr_queues, sizeof(*t->commit));
+	if (!t->commit)
+		return -ENOMEM;
+ for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++) {
+ if (t->q_map[i])
+ t->commit[j++].q_id = i;
+ }
allocator_init(&t->commit_buf_alloc, t->nr_commit_buf);
return ret;
}
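+/* count how many queues are mapped to this thread */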
+static unsigned int ublk_thread_nr_queues(const struct ublk_thread *t)
+{
+ int i;
+ int ret = 0;
+
+ for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++)
+ ret += !!t->q_map[i];
+
+ return ret;
+}
+
void ublk_batch_prepare(struct ublk_thread *t)
{
/*
*/
struct ublk_queue *q = &t->dev->q[0];
+	/* cache nr_queues since dynamic load balancing isn't supported yet */
+ t->nr_queues = ublk_thread_nr_queues(t);
+
t->commit_buf_elem_size = ublk_commit_elem_buf_size(t->dev);
t->commit_buf_size = ublk_commit_buf_size(t);
t->commit_buf_start = t->nr_bufs;
- t->nr_commit_buf = 2;
+ t->nr_commit_buf = 2 * t->nr_queues;
t->nr_bufs += t->nr_commit_buf;
t->cmd_flags = 0;
{
int i;
- for (i = 0; i < UBLKS_T_NR_FETCH_BUF; i++) {
+ for (i = 0; i < t->nr_fetch_bufs; i++) {
io_uring_free_buf_ring(&t->ring, t->fetch[i].br, 1, i);
munlock(t->fetch[i].fetch_buf, t->fetch[i].fetch_buf_size);
free(t->fetch[i].fetch_buf);
}
+ free(t->fetch);
}
static int alloc_batch_fetch_buf(struct ublk_thread *t)
int ret;
int i = 0;
- for (i = 0; i < UBLKS_T_NR_FETCH_BUF; i++) {
+	/* use two fetch buffers per queue */
+	t->nr_fetch_bufs = t->nr_queues * 2;
+	t->fetch = calloc(t->nr_fetch_bufs, sizeof(*t->fetch));
+	if (!t->fetch)
+		return -ENOMEM;
+
+	/* allocate every fetch buffer, two per queue */
+	for (i = 0; i < t->nr_fetch_bufs; i++) {
t->fetch[i].fetch_buf_size = buf_size;
if (posix_memalign((void **)&t->fetch[i].fetch_buf, pg_sz,
{
int ret;
- ublk_assert(t->nr_commit_buf < 16);
+ ublk_assert(t->nr_commit_buf < 2 * UBLK_MAX_QUEUES);
ret = alloc_batch_commit_buf(t);
if (ret)
t->fetch[buf_idx].fetch_buf_off = 0;
}
-void ublk_batch_start_fetch(struct ublk_thread *t,
- struct ublk_queue *q)
+void ublk_batch_start_fetch(struct ublk_thread *t)
{
int i;
-	for (i = 0; i < UBLKS_T_NR_FETCH_BUF; i++)
-		ublk_batch_queue_fetch(t, q, i);
+	int j = 0;
+
+	for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++) {
+		if (t->q_map[i]) {
+			struct ublk_queue *q = &t->dev->q[i];
+
+			/* submit two fetch commands for each queue */
+			ublk_batch_queue_fetch(t, q, j++);
+			ublk_batch_queue_fetch(t, q, j++);
+		}
+	}
}
static unsigned short ublk_compl_batch_fetch(struct ublk_thread *t,
return buf_idx;
}
-int ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q)
+static int __ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q)
{
unsigned short nr_elem = q->q_depth;
unsigned short buf_idx = ublk_alloc_commit_buf(t);
return 0;
}
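+/*
+ * Multiple threads may share one queue, so serialize PREP_IO_CMDS and make
+ * sure each queue is prepared exactly once.
+ */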
+int ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q)
+{
+ int ret = 0;
+
+ pthread_spin_lock(&q->lock);
+ if (q->flags & UBLKS_Q_PREPARED)
+ goto unlock;
+ ret = __ublk_batch_queue_prep_io_cmds(t, q);
+ if (!ret)
+ q->flags |= UBLKS_Q_PREPARED;
+unlock:
+ pthread_spin_unlock(&q->lock);
+
+ return ret;
+}
+
static void ublk_batch_compl_commit_cmd(struct ublk_thread *t,
const struct io_uring_cqe *cqe,
unsigned op)
}
}
-void ublk_batch_commit_io_cmds(struct ublk_thread *t)
+static void __ublk_batch_commit_io_cmds(struct ublk_thread *t,
+ struct batch_commit_buf *cb)
{
struct io_uring_sqe *sqe;
unsigned short buf_idx;
- unsigned short nr_elem = t->commit.done;
+ unsigned short nr_elem = cb->done;
/* nothing to commit */
if (!nr_elem) {
- ublk_free_commit_buf(t, t->commit.buf_idx);
+ ublk_free_commit_buf(t, cb->buf_idx);
return;
}
ublk_io_alloc_sqes(t, &sqe, 1);
- buf_idx = t->commit.buf_idx;
- sqe->addr = (__u64)t->commit.elem;
+ buf_idx = cb->buf_idx;
+ sqe->addr = (__u64)cb->elem;
sqe->len = nr_elem * t->commit_buf_elem_size;
-	/* commit isn't per-queue command */
+	/* COMMIT_IO_CMDS is a per-queue command; target this buffer's queue */
- ublk_init_batch_cmd(t, t->commit.q_id, sqe, UBLK_U_IO_COMMIT_IO_CMDS,
+ ublk_init_batch_cmd(t, cb->q_id, sqe, UBLK_U_IO_COMMIT_IO_CMDS,
t->commit_buf_elem_size, nr_elem, buf_idx);
ublk_setup_commit_sqe(t, sqe, buf_idx);
}
-static void ublk_batch_init_commit(struct ublk_thread *t,
- unsigned short buf_idx)
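+/* flush every per-queue commit buffer prepared in this round */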
+void ublk_batch_commit_io_cmds(struct ublk_thread *t)
+{
+ int i;
+
+ for (i = 0; i < t->nr_queues; i++) {
+ struct batch_commit_buf *cb = &t->commit[i];
+
+		if (ublk_batch_commit_prepared(cb))
+			__ublk_batch_commit_io_cmds(t, cb);
+	}
+}
+
+static void __ublk_batch_init_commit(struct ublk_thread *t,
+ struct batch_commit_buf *cb,
+ unsigned short buf_idx)
{
-	/* so far only support 1:1 queue/thread mapping */
- t->commit.q_id = t->idx;
- t->commit.buf_idx = buf_idx;
- t->commit.elem = ublk_get_commit_buf(t, buf_idx);
- t->commit.done = 0;
- t->commit.count = t->commit_buf_size /
+ cb->buf_idx = buf_idx;
+ cb->elem = ublk_get_commit_buf(t, buf_idx);
+ cb->done = 0;
+ cb->count = t->commit_buf_size /
t->commit_buf_elem_size;
}
-void ublk_batch_prep_commit(struct ublk_thread *t)
+/* COMMIT_IO_CMDS is a per-queue command, so each queue uses its own commit buffer */
+static void ublk_batch_init_commit(struct ublk_thread *t,
+ struct batch_commit_buf *cb)
{
unsigned short buf_idx = ublk_alloc_commit_buf(t);
ublk_assert(buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX);
- ublk_batch_init_commit(t, buf_idx);
+ ublk_assert(!ublk_batch_commit_prepared(cb));
+
+ __ublk_batch_init_commit(t, cb, buf_idx);
+}
+
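+/*
+ * Invalidate all per-queue commit buffers; each one is set up lazily in
+ * ublk_batch_complete_io() when its queue sees the first completion.
+ */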
+void ublk_batch_prep_commit(struct ublk_thread *t)
+{
+ int i;
+
+ for (i = 0; i < t->nr_queues; i++)
+ t->commit[i].buf_idx = UBLKS_T_COMMIT_BUF_INV_IDX;
}
void ublk_batch_complete_io(struct ublk_thread *t, struct ublk_queue *q,
unsigned tag, int res)
{
- struct batch_commit_buf *cb = &t->commit;
- struct ublk_batch_elem *elem = (struct ublk_batch_elem *)(cb->elem +
- cb->done * t->commit_buf_elem_size);
+ unsigned q_t_idx = ublk_queue_idx_in_thread(t, q);
+ struct batch_commit_buf *cb = &t->commit[q_t_idx];
+ struct ublk_batch_elem *elem;
struct ublk_io *io = &q->ios[tag];
- ublk_assert(q->q_id == t->commit.q_id);
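+	/* lazily set up this queue's commit buffer on its first completion */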
+ if (!ublk_batch_commit_prepared(cb))
+ ublk_batch_init_commit(t, cb);
+
+ ublk_assert(q->q_id == cb->q_id);
+ elem = (struct ublk_batch_elem *)(cb->elem + cb->done * t->commit_buf_elem_size);
elem->tag = tag;
elem->buf_index = ublk_batch_io_buf_idx(t, q, tag);
elem->result = res;
cb->done += 1;
ublk_assert(cb->done <= cb->count);
}
+
+void ublk_batch_setup_map(unsigned char (*q_thread_map)[UBLK_MAX_QUEUES],
+ int nthreads, int queues)
+{
+ int i, j;
+
+ /*
+	 * Set up a round-robin queue-to-thread mapping for arbitrary N:M combinations.
+ *
+ * This algorithm distributes queues across threads (and threads across queues)
+ * in a balanced round-robin fashion to ensure even load distribution.
+ *
+ * Examples:
+ * - 2 threads, 4 queues: T0=[Q0,Q2], T1=[Q1,Q3]
+ * - 4 threads, 2 queues: T0=[Q0], T1=[Q1], T2=[Q0], T3=[Q1]
+ * - 3 threads, 3 queues: T0=[Q0], T1=[Q1], T2=[Q2] (1:1 mapping)
+ *
+ * Phase 1: Mark which queues each thread handles (boolean mapping)
+ */
+	for (i = 0, j = 0; i < queues || j < nthreads; i++, j++)
+		q_thread_map[j % nthreads][i % queues] = 1;
+
+ /*
+ * Phase 2: Convert boolean mapping to sequential indices within each thread.
+ *
+ * Transform from: q_thread_map[thread][queue] = 1 (handles queue)
+ * To: q_thread_map[thread][queue] = N (queue index within thread)
+ *
+ * This allows each thread to know the local index of each queue it handles,
+ * which is essential for buffer allocation and management. For example:
+ * - Thread 0 handling queues [0,2] becomes: q_thread_map[0][0]=1, q_thread_map[0][2]=2
+ * - Thread 1 handling queues [1,3] becomes: q_thread_map[1][1]=1, q_thread_map[1][3]=2
+ */
+ for (j = 0; j < nthreads; j++) {
+ unsigned char seq = 1;
+
+ for (i = 0; i < queues; i++) {
+ if (q_thread_map[j][i])
+ q_thread_map[j][i] = seq++;
+ }
+ }
+}
int cmd_buf_size, io_buf_size, integrity_size;
unsigned long off;
+ pthread_spin_init(&q->lock, PTHREAD_PROCESS_PRIVATE);
q->tgt_ops = dev->tgt.ops;
q->flags = 0;
q->q_depth = depth;
/* FETCH_IO_CMDS is multishot, so increase cq depth for BATCH_IO */
if (ublk_dev_batch_io(dev))
- cq_depth += dev->dev_info.queue_depth;
+ cq_depth += dev->dev_info.queue_depth * 2;
ret = ublk_setup_ring(&t->ring, ring_depth, cq_depth,
IORING_SETUP_COOP_TASKRUN |
sem_t *ready;
cpu_set_t *affinity;
unsigned long long extra_flags;
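+	/* shared queue-to-thread map, indexed by [thread_idx][q_id] */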
+ unsigned char (*q_thread_map)[UBLK_MAX_QUEUES];
};
static void ublk_thread_set_sched_affinity(const struct ublk_thread_info *info)
{
int i;
- /* setup all queues in the 1st thread */
for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++) {
struct ublk_queue *q = &t->dev->q[i];
int ret;
+		/*
+		 * Only prepare io commands from a thread this queue is
+		 * mapped to, otherwise the io command buffer index may not
+		 * work as expected.
+		 */
+ if (t->q_map[i] == 0)
+ continue;
+
ret = ublk_batch_queue_prep_io_cmds(t, q);
- ublk_assert(ret == 0);
- ret = ublk_process_io(t);
ublk_assert(ret >= 0);
}
}
int dev_id = info->dev->dev_info.dev_id;
int ret;
+ /* Copy per-thread queue mapping into thread-local variable */
+ if (info->q_thread_map)
+ memcpy(t.q_map, info->q_thread_map[info->idx], sizeof(t.q_map));
+
ret = ublk_thread_init(&t, info->extra_flags);
if (ret) {
ublk_err("ublk dev %d thread %u init failed\n",
/* submit all io commands to ublk driver */
ublk_submit_fetch_commands(&t);
} else {
- struct ublk_queue *q = &t.dev->q[t.idx];
-
- /* prepare all io commands in the 1st thread context */
- if (!t.idx)
- ublk_batch_setup_queues(&t);
- ublk_batch_start_fetch(&t, q);
+ ublk_batch_setup_queues(&t);
+ ublk_batch_start_fetch(&t);
}
do {
struct ublk_thread_info *tinfo;
unsigned long long extra_flags = 0;
cpu_set_t *affinity_buf;
+ unsigned char (*q_thread_map)[UBLK_MAX_QUEUES] = NULL;
void *thread_ret;
sem_t ready;
int ret, i;
if (ret)
return ret;
+ if (ublk_dev_batch_io(dev)) {
+ q_thread_map = calloc(dev->nthreads, sizeof(*q_thread_map));
+ if (!q_thread_map) {
+ ret = -ENOMEM;
+ goto fail;
+ }
+ ublk_batch_setup_map(q_thread_map, dev->nthreads,
+ dinfo->nr_hw_queues);
+ }
+
if (ctx->auto_zc_fallback)
extra_flags = UBLKS_Q_AUTO_BUF_REG_FALLBACK;
if (ctx->no_ublk_fixed_fd)
tinfo[i].idx = i;
tinfo[i].ready = &ready;
tinfo[i].extra_flags = extra_flags;
+ tinfo[i].q_thread_map = q_thread_map;
/*
* If threads are not tied 1:1 to queues, setting thread
for (i = 0; i < dev->nthreads; i++)
sem_wait(&ready);
free(affinity_buf);
+ free(q_thread_map);
/* everything is fine now, start us */
if (ctx->recovery)
goto fail;
}
- if (nthreads != nr_queues && !ctx->per_io_tasks) {
+ if (nthreads != nr_queues && (!ctx->per_io_tasks &&
+ !(ctx->flags & UBLK_F_BATCH_IO))) {
ublk_err("%s: threads %u must be same as queues %u if "
"not using per_io_tasks\n",
__func__, nthreads, nr_queues);
return -EINVAL;
}
+ if ((ctx.flags & UBLK_F_AUTO_BUF_REG) &&
+ (ctx.flags & UBLK_F_BATCH_IO) &&
+ (ctx.nthreads > ctx.nr_hw_queues)) {
+ ublk_err("too many threads for F_AUTO_BUF_REG & F_BATCH_IO\n");
+ return -EINVAL;
+ }
+
i = optind;
while (i < argc && ctx.nr_files < MAX_BACK_FILES) {
ctx.files[ctx.nr_files++] = argv[i++];
const struct ublk_tgt_ops *tgt_ops;
struct ublksrv_io_desc *io_cmd_buf;
-/* borrow one bit of ublk uapi flags, which may never be used */
+/* borrow three bits of ublk uapi flags, which may never be used */
#define UBLKS_Q_AUTO_BUF_REG_FALLBACK (1ULL << 63)
#define UBLKS_Q_NO_UBLK_FIXED_FD (1ULL << 62)
+#define UBLKS_Q_PREPARED (1ULL << 61)
__u64 flags;
int ublk_fd; /* cached ublk char device fd */
__u8 metadata_size;
struct ublk_io ios[UBLK_QUEUE_DEPTH];
+
+	/* serialize PREP_IO_CMDS when multiple threads share this queue */
+ pthread_spinlock_t lock;
};
/* align with `ublk_elem_header` */
};
struct ublk_thread {
+	/*
+	 * Per-thread copy of the queue-to-thread mapping: q_map[q_id] is the
+	 * 1-based local index of the queue inside this thread, or 0 if the
+	 * queue isn't handled by this thread.
+	 */
+	unsigned char q_map[UBLK_MAX_QUEUES];
+
struct ublk_dev *dev;
- unsigned idx;
+ unsigned short idx;
+ unsigned short nr_queues;
#define UBLKS_T_STOPPING (1U << 0)
#define UBLKS_T_IDLE (1U << 1)
void *commit_buf;
#define UBLKS_T_COMMIT_BUF_INV_IDX ((unsigned short)-1)
struct allocator commit_buf_alloc;
- struct batch_commit_buf commit;
+ struct batch_commit_buf *commit;
/* FETCH_IO_CMDS buffer */
-#define UBLKS_T_NR_FETCH_BUF 2
- struct batch_fetch_buf fetch[UBLKS_T_NR_FETCH_BUF];
+ unsigned short nr_fetch_bufs;
+ struct batch_fetch_buf *fetch;
struct io_uring ring;
};
return ublk_queue_use_zc(q) || ublk_queue_use_auto_zc(q);
}
+static inline int ublk_batch_commit_prepared(struct batch_commit_buf *cb)
+{
+ return cb->buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX;
+}
+
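+/* map @q to its 0-based local index among the queues handled by @t */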
+static inline unsigned ublk_queue_idx_in_thread(const struct ublk_thread *t,
+ const struct ublk_queue *q)
+{
+ unsigned char idx;
+
+ idx = t->q_map[q->q_id];
+ ublk_assert(idx != 0);
+ return idx - 1;
+}
+
/*
* Each IO's buffer index has to be calculated by this helper for
* UBLKS_T_BATCH_IO
const struct ublk_thread *t, const struct ublk_queue *q,
unsigned tag)
{
- return tag;
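+	/* each queue owns a contiguous q_depth-sized range of buffer indexes */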
+ return ublk_queue_idx_in_thread(t, q) * q->q_depth + tag;
}
/* Queue UBLK_U_IO_PREP_IO_CMDS for a specific queue with batch elements */
int ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q);
/* Start fetching I/O commands using multishot UBLK_U_IO_FETCH_IO_CMDS */
-void ublk_batch_start_fetch(struct ublk_thread *t,
- struct ublk_queue *q);
+void ublk_batch_start_fetch(struct ublk_thread *t);
/* Handle completion of batch I/O commands (prep/commit) */
void ublk_batch_compl_cmd(struct ublk_thread *t,
const struct io_uring_cqe *cqe);
/* Add a completed I/O operation to the current batch commit buffer */
void ublk_batch_complete_io(struct ublk_thread *t, struct ublk_queue *q,
unsigned tag, int res);
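+/* Build round-robin queue-to-thread mapping for arbitrary N:M combinations */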
+void ublk_batch_setup_map(unsigned char (*q_thread_map)[UBLK_MAX_QUEUES],
+ int nthreads, int queues);
static inline int ublk_complete_io(struct ublk_thread *t, struct ublk_queue *q,
unsigned tag, int res)