selftests: ublk: add batch buffer management infrastructure

Add the foundational infrastructure for UBLK_F_BATCH_IO buffer
management including:

- Allocator utility functions for small sized per-thread allocation
- Batch buffer allocation and deallocation functions
- Buffer index management for commit buffers
- Thread state management for batch I/O mode
- Buffer size calculation based on device features

This prepares the groundwork for handling batch I/O commands by
establishing the buffer management layer needed for UBLK_U_IO_PREP_IO_CMDS
and UBLK_U_IO_COMMIT_IO_CMDS operations.

The allocator uses CPU sets for efficient per-thread buffer tracking,
and commit buffers are pre-allocated with 2 buffers per thread to handle
overlapping command operations.

Signed-off-by: Ming Lei <ming.lei@redhat.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
This commit is contained in:
Ming Lei 2026-01-16 22:18:51 +08:00 committed by Jens Axboe
parent f1d621b5a0
commit dccbfa9d41
4 changed files with 282 additions and 3 deletions

View File

@ -0,0 +1,152 @@
/* SPDX-License-Identifier: MIT */
/*
* Description: UBLK_F_BATCH_IO buffer management
*/
#include "kublk.h"
/*
 * Map a buffer index onto the address of the matching commit buffer.
 *
 * Valid indexes are [commit_buf_start, commit_buf_start + nr_commit_buf);
 * anything else yields NULL.
 */
static inline void *ublk_get_commit_buf(struct ublk_thread *t,
					unsigned short buf_idx)
{
	unsigned first = t->commit_buf_start;
	unsigned end = first + t->nr_commit_buf;

	if (buf_idx >= first && buf_idx < end)
		return t->commit_buf + (buf_idx - first) * t->commit_buf_size;

	return NULL;
}
/*
 * Reserve one commit buffer for UBLK_U_IO_PREP_IO_CMDS or
 * UBLK_U_IO_COMMIT_IO_CMDS.
 *
 * Returns the buffer index (allocator slot offset by commit_buf_start),
 * or UBLKS_T_COMMIT_BUF_INV_IDX when allocator_get() finds no free slot.
 */
static inline unsigned short ublk_alloc_commit_buf(struct ublk_thread *t)
{
	int slot = allocator_get(&t->commit_buf_alloc);

	if (slot < 0)
		return UBLKS_T_COMMIT_BUF_INV_IDX;

	return slot + t->commit_buf_start;
}
/*
 * Release a commit buffer previously handed out for
 * UBLK_U_IO_PREP_IO_CMDS or UBLK_U_IO_COMMIT_IO_CMDS.
 *
 * @i is the buffer index as returned by ublk_alloc_commit_buf(); it is
 * asserted to be in range and currently marked as allocated.
 */
static inline void ublk_free_commit_buf(struct ublk_thread *t,
					unsigned short i)
{
	/* convert the buffer index back to an allocator slot */
	unsigned short slot = i - t->commit_buf_start;

	ublk_assert(slot < t->nr_commit_buf);
	ublk_assert(allocator_get_val(&t->commit_buf_alloc, slot) != 0);

	allocator_put(&t->commit_buf_alloc, slot);
}
/*
 * Size in bytes of one element in a commit buffer.
 *
 * Devices using zero copy, user copy or auto buffer registration need
 * only 8 bytes per element; every other device needs one extra 8 bytes
 * for carrying the buffer address, i.e. 16 bytes.
 */
static unsigned char ublk_commit_elem_buf_size(struct ublk_dev *dev)
{
	if (!(dev->dev_info.flags & (UBLK_F_SUPPORT_ZERO_COPY |
				     UBLK_F_USER_COPY |
				     UBLK_F_AUTO_BUF_REG)))
		return 16;

	return 8;
}
/*
 * Size in bytes of one commit buffer: room for queue_depth elements,
 * rounded up to a whole number of pages.
 */
static unsigned ublk_commit_buf_size(struct ublk_thread *t)
{
	unsigned per_elem = ublk_commit_elem_buf_size(t->dev);
	unsigned int bytes = per_elem * t->dev->dev_info.queue_depth;
	unsigned int page_bytes = getpagesize();

	return round_up(bytes, page_bytes);
}
/*
 * Unlock and free the thread's commit buffers and tear down the slot
 * allocator.
 *
 * Safe to call when nothing was allocated and safe to call repeatedly:
 * the buffer pointer is cleared after being freed, and
 * allocator_deinit() resets its own state.
 */
static void free_batch_commit_buf(struct ublk_thread *t)
{
	if (t->commit_buf) {
		unsigned buf_size = ublk_commit_buf_size(t);
		unsigned int total = buf_size * t->nr_commit_buf;

		munlock(t->commit_buf, total);
		free(t->commit_buf);
		/* don't leave a dangling pointer for a second teardown */
		t->commit_buf = NULL;
	}
	allocator_deinit(&t->commit_buf_alloc);
}
/*
 * Allocate the thread's commit buffers as one page-aligned region of
 * nr_commit_buf * ublk_commit_buf_size() bytes and set up the allocator
 * that tracks which buffer is in use.
 *
 * Returns 0 on success or a negative errno value on failure.  Failing to
 * mlock() the region is only logged, since locking is an optimisation.
 */
static int alloc_batch_commit_buf(struct ublk_thread *t)
{
	unsigned buf_size = ublk_commit_buf_size(t);
	unsigned int total = buf_size * t->nr_commit_buf;
	unsigned int page_sz = getpagesize();
	void *buf = NULL;
	int ret;

	/* lets the failure path call free_batch_commit_buf() safely */
	t->commit_buf = NULL;

	ret = allocator_init(&t->commit_buf_alloc, t->nr_commit_buf);
	if (ret)
		goto fail;

	/*
	 * posix_memalign() reports errors as a positive errno value;
	 * convert so this function uses negative errno like
	 * allocator_init(), and never reports success without a buffer.
	 */
	ret = posix_memalign(&buf, page_sz, total);
	if (ret || !buf) {
		ret = ret ? -ret : -ENOMEM;
		goto fail;
	}
	t->commit_buf = buf;

	/* lock commit buffer pages for fast access */
	if (mlock(t->commit_buf, total))
		ublk_err("%s: can't lock commit buffer %s\n", __func__,
			strerror(errno));

	return 0;

fail:
	free_batch_commit_buf(t);
	return ret;
}
/*
 * Set up the per-thread batch I/O parameters: commit buffer element and
 * buffer sizes, the range of buffer indexes reserved for commit buffers,
 * the pre-computed command flags, and the UBLKS_T_BATCH_IO state bit.
 *
 * The commit buffers take the nr_commit_buf indexes following the
 * current t->nr_bufs, and t->nr_bufs grows accordingly.
 */
void ublk_batch_prepare(struct ublk_thread *t)
{
	/*
	 * We only handle a single device in this thread context.
	 *
	 * All queues have the same feature flags, so use queue 0's flags
	 * to calculate the uring_cmd flags.
	 *
	 * This isn't elegant, but it works so far.
	 */
	struct ublk_queue *q = &t->dev->q[0];

	t->commit_buf_elem_size = ublk_commit_elem_buf_size(t->dev);
	t->commit_buf_size = ublk_commit_buf_size(t);
	t->commit_buf_start = t->nr_bufs;
	t->nr_commit_buf = 2;
	t->nr_bufs += t->nr_commit_buf;

	t->cmd_flags = 0;
	if (ublk_queue_use_auto_zc(q)) {
		if (ublk_queue_auto_zc_fallback(q))
			t->cmd_flags |= UBLK_BATCH_F_AUTO_BUF_REG_FALLBACK;
	} else if (!ublk_queue_no_buf(q))
		t->cmd_flags |= UBLK_BATCH_F_HAS_BUF_ADDR;

	t->state |= UBLKS_T_BATCH_IO;

	/* "start" is the first commit buffer index, not the new nr_bufs */
	ublk_log("%s: thread %d commit(nr_bufs %u, buf_size %u, start %u)\n",
			__func__, t->idx,
			t->nr_commit_buf, t->commit_buf_size,
			t->commit_buf_start);
}
/*
 * Allocate the commit buffers of a batch I/O thread.
 *
 * Asserts that fewer than 16 commit buffers are configured, then returns
 * whatever alloc_batch_commit_buf() returns (0 on success).
 */
int ublk_batch_alloc_buf(struct ublk_thread *t)
{
	int err;

	ublk_assert(t->nr_commit_buf < 16);

	err = alloc_batch_commit_buf(t);
	return err;
}
/*
 * Public counterpart of ublk_batch_alloc_buf(): releases the thread's
 * commit buffers and the allocator that tracks them.
 */
void ublk_batch_free_buf(struct ublk_thread *t)
{
	free_batch_commit_buf(t);
}

View File

@ -435,6 +435,8 @@ static void ublk_thread_deinit(struct ublk_thread *t)
{
io_uring_unregister_buffers(&t->ring);
ublk_batch_free_buf(t);
io_uring_unregister_ring_fd(&t->ring);
if (t->ring.ring_fd > 0) {
@ -531,15 +533,33 @@ static int ublk_thread_init(struct ublk_thread *t, unsigned long long extra_flag
unsigned nr_ios = dev->dev_info.queue_depth * dev->dev_info.nr_hw_queues;
unsigned max_nr_ios_per_thread = nr_ios / dev->nthreads;
max_nr_ios_per_thread += !!(nr_ios % dev->nthreads);
ret = io_uring_register_buffers_sparse(
&t->ring, max_nr_ios_per_thread);
t->nr_bufs = max_nr_ios_per_thread;
} else {
t->nr_bufs = 0;
}
if (ublk_dev_batch_io(dev))
ublk_batch_prepare(t);
if (t->nr_bufs) {
ret = io_uring_register_buffers_sparse(&t->ring, t->nr_bufs);
if (ret) {
ublk_err("ublk dev %d thread %d register spare buffers failed %d",
ublk_err("ublk dev %d thread %d register spare buffers failed %d\n",
dev->dev_info.dev_id, t->idx, ret);
goto fail;
}
}
if (ublk_dev_batch_io(dev)) {
ret = ublk_batch_alloc_buf(t);
if (ret) {
ublk_err("ublk dev %d thread %d alloc batch buf failed %d\n",
dev->dev_info.dev_id, t->idx, ret);
goto fail;
}
}
io_uring_register_ring_fd(&t->ring);
if (flags & UBLKS_Q_NO_UBLK_FIXED_FD) {

View File

@ -182,15 +182,40 @@ struct ublk_queue {
struct ublk_io ios[UBLK_QUEUE_DEPTH];
};
/*
 * One element of a batch command buffer; align with `ublk_elem_header`.
 *
 * NOTE(review): `ublk_elem_header` is defined outside this file view;
 * confirm the field order and sizes here still match it.
 */
struct ublk_batch_elem {
__u16 tag;
__u16 buf_index;
__s32 result;
/*
 * Presumably the "extra 8 bytes for carrying buffer address" that
 * ublk_commit_elem_buf_size() accounts for (8 vs 16 byte elements);
 * verify against the code that fills elements.
 */
__u64 buf_addr;
};
/*
 * State of one ublk server thread, including its io_uring instance and,
 * for UBLK_F_BATCH_IO devices, the commit buffer bookkeeping.
 */
struct ublk_thread {
/* device handled by this thread */
struct ublk_dev *dev;
/* thread index, used in log messages */
unsigned idx;
/* bits stored in ->state */
#define UBLKS_T_STOPPING (1U << 0)
#define UBLKS_T_IDLE (1U << 1)
#define UBLKS_T_BATCH_IO (1U << 31) /* readonly */
unsigned state;
unsigned int cmd_inflight;
unsigned int io_inflight;
/*
 * Number of buffer indexes registered sparsely with the ring;
 * ublk_batch_prepare() adds nr_commit_buf to it.
 */
unsigned short nr_bufs;
/* the following fields are for BATCH_IO */
/* first buffer index reserved for commit buffers */
unsigned short commit_buf_start;
/* bytes per commit buffer element, see ublk_commit_elem_buf_size() */
unsigned char commit_buf_elem_size;
/*
 * We just support single device, so pre-calculate commit/prep flags
 */
unsigned short cmd_flags;
/* number of commit buffers; ublk_batch_prepare() sets it to 2 */
unsigned int nr_commit_buf;
/* page-rounded size in bytes of a single commit buffer */
unsigned int commit_buf_size;
/* base of the nr_commit_buf contiguous commit buffers, or NULL */
void *commit_buf;
/* returned by ublk_alloc_commit_buf() when no buffer is free */
#define UBLKS_T_COMMIT_BUF_INV_IDX ((unsigned short)-1)
/* tracks which commit buffer slots are currently in use */
struct allocator commit_buf_alloc;
struct io_uring ring;
};
@ -211,6 +236,27 @@ struct ublk_dev {
extern int ublk_queue_io_cmd(struct ublk_thread *t, struct ublk_io *io);
/*
 * Return 1 if @flags has UBLK_F_BATCH_IO set, 0 otherwise.
 *
 * The 64-bit mask result is normalised with !! so the answer cannot be
 * lost when it is narrowed to the int return type.
 */
static inline int __ublk_use_batch_io(__u64 flags)
{
	return !!(flags & UBLK_F_BATCH_IO);
}
/* Non-zero when the queue's feature flags include UBLK_F_BATCH_IO. */
static inline int ublk_queue_batch_io(const struct ublk_queue *q)
{
	return __ublk_use_batch_io(q->flags);
}
/* Non-zero when the device's feature flags include UBLK_F_BATCH_IO. */
static inline int ublk_dev_batch_io(const struct ublk_dev *dev)
{
	return __ublk_use_batch_io(dev->dev_info.flags);
}
/*
 * Return 1 if this thread was prepared for batch I/O, 0 otherwise.
 *
 * Only works when a single device is handled in this pthread context.
 *
 * UBLKS_T_BATCH_IO is bit 31 of an unsigned value; !! keeps the result a
 * plain 0/1 instead of relying on the implementation-defined conversion
 * of 0x80000000 to int.
 */
static inline int ublk_thread_batch_io(const struct ublk_thread *t)
{
	return !!(t->state & UBLKS_T_BATCH_IO);
}
static inline void ublk_set_integrity_params(const struct dev_ctx *ctx,
struct ublk_params *params)
{
@ -465,6 +511,13 @@ static inline int ublk_queue_no_buf(const struct ublk_queue *q)
return ublk_queue_use_zc(q) || ublk_queue_use_auto_zc(q);
}
/*
 * Initialize batch I/O state and calculate buffer parameters.
 * Grows t->nr_bufs by the number of commit buffers, so it has to run
 * before the thread's sparse buffer registration.
 */
void ublk_batch_prepare(struct ublk_thread *t);
/*
 * Allocate (and try to mlock) the commit buffers for batch operations.
 * Returns 0 on success, non-zero on failure.
 */
int ublk_batch_alloc_buf(struct ublk_thread *t);
/* Free commit buffers and cleanup batch allocator */
void ublk_batch_free_buf(struct ublk_thread *t);
extern const struct ublk_tgt_ops null_tgt_ops;
extern const struct ublk_tgt_ops loop_tgt_ops;
extern const struct ublk_tgt_ops stripe_tgt_ops;

View File

@ -21,6 +21,60 @@
/*
 * Round @val up to the next multiple of @rnd.
 *
 * Implemented with a bit mask, so the result is only a multiple of @rnd
 * when @rnd is a power of two.  Both arguments are evaluated more than
 * once; don't pass expressions with side effects.
 */
#define round_up(val, rnd) \
(((val) + ((rnd) - 1)) & ~((rnd) - 1))
/*
 * small sized & per-thread allocator
 *
 * Slots are tracked as bits of a dynamically sized cpu_set_t: a set bit
 * means the slot is allocated.  There is no locking in these helpers.
 */
struct allocator {
/* number of slots */
unsigned int size;
/* bitmap from CPU_ALLOC(size); NULL after allocator_deinit() */
cpu_set_t *set;
};
/*
 * Set up an allocator with @size slots, all initially free.
 *
 * Returns 0 on success or -ENOMEM when the bitmap can't be allocated.
 */
static inline int allocator_init(struct allocator *a, unsigned size)
{
	a->set = CPU_ALLOC(size);
	a->size = size;

	if (!a->set)
		return -ENOMEM;

	/*
	 * CPU_ALLOC() returns uninitialised memory; clear it so that no
	 * slot starts out looking allocated.
	 */
	CPU_ZERO_S(CPU_ALLOC_SIZE(size), a->set);
	return 0;
}
/*
 * Release the allocator's bitmap and reset it to the empty state
 * (no set, zero slots).
 */
static inline void allocator_deinit(struct allocator *a)
{
	CPU_FREE(a->set);

	a->size = 0;
	a->set = NULL;
}
/*
 * Allocate the lowest-numbered free slot.
 *
 * Returns the slot index, or -1 when every slot is taken or the
 * allocator has no bitmap (failed or missing allocator_init()).
 */
static inline int allocator_get(struct allocator *a)
{
	/* loop-invariant, so compute it once */
	size_t set_size = CPU_ALLOC_SIZE(a->size);
	unsigned int i;

	if (!a->set)
		return -1;

	for (i = 0; i < a->size; i++) {
		if (CPU_ISSET_S(i, set_size, a->set))
			continue;

		CPU_SET_S(i, set_size, a->set);
		return i;
	}

	return -1;
}
/*
 * Mark slot @i as free again.  Indexes outside [0, size) are ignored.
 */
static inline void allocator_put(struct allocator *a, int i)
{
	if (i < 0 || i >= a->size)
		return;

	CPU_CLR_S(i, CPU_ALLOC_SIZE(a->size), a->set);
}
/*
 * Report whether slot @i is currently allocated: non-zero if its bit is
 * set in the bitmap, 0 otherwise.
 */
static inline int allocator_get_val(struct allocator *a, int i)
{
	return CPU_ISSET_S(i, CPU_ALLOC_SIZE(a->size), a->set);
}
static inline unsigned int ilog2(unsigned int x)
{
if (x == 0)