selftests/ublk: add shared memory zero-copy support in kublk

Add infrastructure for UBLK_F_SHMEM_ZC shared memory zero-copy:

- kublk.h: struct ublk_shmem_entry and table for tracking registered
  shared memory buffers
- kublk.c: per-device unix socket listener that accepts memfd
  registrations from clients via SCM_RIGHTS fd passing. The listener
  mmaps the memfd and registers the VA range with the kernel for PFN
  matching. Also adds --shmem_zc command line option.
- kublk.c: --htlb <path> option to open a pre-allocated hugetlbfs
  file, mmap it with MAP_SHARED|MAP_POPULATE, and register it with
  the kernel via ublk_ctrl_reg_buf(). Any process that mmaps the same
  hugetlbfs file shares the same physical pages, enabling zero-copy
  without socket-based fd passing.

Signed-off-by: Ming Lei <ming.lei@redhat.com>
Link: https://patch.msgid.link/20260331153207.3635125-6-ming.lei@redhat.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
This commit is contained in:
Ming Lei 2026-03-31 23:31:56 +08:00 committed by Jens Axboe
parent 8a34e88769
commit 166b476b8d
2 changed files with 352 additions and 2 deletions

View File

@ -4,6 +4,7 @@
*/
#include <linux/fs.h>
#include <sys/un.h>
#include "kublk.h"
#define MAX_NR_TGT_ARG 64
@ -1092,13 +1093,312 @@ static int ublk_send_dev_event(const struct dev_ctx *ctx, struct ublk_dev *dev,
}
/*
* Shared memory registration socket listener.
*
* The parent daemon context listens on a per-device unix socket at
* /run/ublk/ublkb<dev_id>.sock for shared memory registration requests
* from clients. Clients send a memfd via SCM_RIGHTS; the server
* registers it with the kernel, mmaps it, and returns the assigned index.
*/
#define UBLK_SHMEM_SOCK_DIR "/run/ublk"
/* defined in kublk.h, shared with file_backed.c (loop target) */
struct ublk_shmem_entry shmem_table[UBLK_BUF_MAX];
int shmem_count;
static void ublk_shmem_sock_path(int dev_id, char *buf, size_t len)
{
snprintf(buf, len, "%s/ublkb%d.sock", UBLK_SHMEM_SOCK_DIR, dev_id);
}
static int ublk_shmem_sock_create(int dev_id)
{
struct sockaddr_un addr = { .sun_family = AF_UNIX };
char path[108];
int fd;
mkdir(UBLK_SHMEM_SOCK_DIR, 0755);
ublk_shmem_sock_path(dev_id, path, sizeof(path));
unlink(path);
fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (fd < 0)
return -1;
snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", path);
if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
close(fd);
return -1;
}
listen(fd, 4);
ublk_dbg(UBLK_DBG_DEV, "shmem socket created: %s\n", path);
return fd;
}
static void ublk_shmem_sock_destroy(int dev_id, int sock_fd)
{
char path[108];
if (sock_fd >= 0)
close(sock_fd);
ublk_shmem_sock_path(dev_id, path, sizeof(path));
unlink(path);
}
/* Receive a memfd from a client via SCM_RIGHTS */
static int ublk_shmem_recv_fd(int client_fd)
{
char buf[1];
struct iovec iov = { .iov_base = buf, .iov_len = sizeof(buf) };
union {
char cmsg_buf[CMSG_SPACE(sizeof(int))];
struct cmsghdr align;
} u;
struct msghdr msg = {
.msg_iov = &iov,
.msg_iovlen = 1,
.msg_control = u.cmsg_buf,
.msg_controllen = sizeof(u.cmsg_buf),
};
struct cmsghdr *cmsg;
if (recvmsg(client_fd, &msg, 0) <= 0)
return -1;
cmsg = CMSG_FIRSTHDR(&msg);
if (!cmsg || cmsg->cmsg_level != SOL_SOCKET ||
cmsg->cmsg_type != SCM_RIGHTS)
return -1;
return *(int *)CMSG_DATA(cmsg);
}
/* Register a shared memory buffer: store fd, mmap it, return index */
static int ublk_shmem_register(int shmem_fd)
{
off_t size;
void *base;
int idx;
if (shmem_count >= UBLK_BUF_MAX)
return -1;
size = lseek(shmem_fd, 0, SEEK_END);
if (size <= 0)
return -1;
base = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED,
shmem_fd, 0);
if (base == MAP_FAILED)
return -1;
idx = shmem_count++;
shmem_table[idx].fd = shmem_fd;
shmem_table[idx].mmap_base = base;
shmem_table[idx].size = size;
ublk_dbg(UBLK_DBG_DEV, "shmem registered: index=%d fd=%d size=%zu\n",
idx, shmem_fd, (size_t)size);
return idx;
}
static void ublk_shmem_unregister_all(void)
{
int i;
for (i = 0; i < shmem_count; i++) {
if (shmem_table[i].mmap_base) {
munmap(shmem_table[i].mmap_base,
shmem_table[i].size);
close(shmem_table[i].fd);
shmem_table[i].mmap_base = NULL;
}
}
shmem_count = 0;
}
static int ublk_ctrl_reg_buf(struct ublk_dev *dev, void *addr, size_t size)
{
struct ublk_shmem_buf_reg buf_reg = {
.addr = (unsigned long)addr,
.len = size,
};
struct ublk_ctrl_cmd_data data = {
.cmd_op = UBLK_U_CMD_REG_BUF,
.flags = CTRL_CMD_HAS_BUF,
.addr = (unsigned long)&buf_reg,
.len = sizeof(buf_reg),
};
return __ublk_ctrl_cmd(dev, &data);
}
/*
* Handle one client connection: receive memfd, mmap it, register
* the VA range with kernel, send back the assigned index.
*/
static void ublk_shmem_handle_client(int sock_fd, struct ublk_dev *dev)
{
int client_fd, memfd, idx, ret;
int32_t reply;
off_t size;
void *base;
client_fd = accept(sock_fd, NULL, NULL);
if (client_fd < 0)
return;
memfd = ublk_shmem_recv_fd(client_fd);
if (memfd < 0) {
reply = -1;
goto out;
}
/* mmap the memfd in server address space */
size = lseek(memfd, 0, SEEK_END);
if (size <= 0) {
reply = -1;
close(memfd);
goto out;
}
base = mmap(NULL, size, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, memfd, 0);
if (base == MAP_FAILED) {
reply = -1;
close(memfd);
goto out;
}
/* Register server's VA range with kernel for PFN matching */
ret = ublk_ctrl_reg_buf(dev, base, size);
if (ret < 0) {
ublk_dbg(UBLK_DBG_DEV,
"shmem_zc: kernel reg failed %d\n", ret);
munmap(base, size);
close(memfd);
reply = ret;
goto out;
}
/* Store in table for I/O handling */
idx = ublk_shmem_register(memfd);
if (idx >= 0) {
shmem_table[idx].mmap_base = base;
shmem_table[idx].size = size;
}
reply = idx;
out:
send(client_fd, &reply, sizeof(reply), 0);
close(client_fd);
}
struct shmem_listener_info {
int dev_id;
int stop_efd; /* eventfd to signal listener to stop */
int sock_fd; /* listener socket fd (output) */
struct ublk_dev *dev;
};
/*
* Socket listener thread: runs in the parent daemon context alongside
* the I/O threads. Accepts shared memory registration requests from
* clients via SCM_RIGHTS. Exits when stop_efd is signaled.
*/
static void *ublk_shmem_listener_fn(void *data)
{
struct shmem_listener_info *info = data;
struct pollfd pfds[2];
info->sock_fd = ublk_shmem_sock_create(info->dev_id);
if (info->sock_fd < 0)
return NULL;
pfds[0].fd = info->sock_fd;
pfds[0].events = POLLIN;
pfds[1].fd = info->stop_efd;
pfds[1].events = POLLIN;
while (1) {
int ret = poll(pfds, 2, -1);
if (ret < 0)
break;
/* Stop signal from parent */
if (pfds[1].revents & POLLIN)
break;
/* Client connection */
if (pfds[0].revents & POLLIN)
ublk_shmem_handle_client(info->sock_fd, info->dev);
}
return NULL;
}
static int ublk_shmem_htlb_setup(const struct dev_ctx *ctx,
struct ublk_dev *dev)
{
int fd, idx, ret;
struct stat st;
void *base;
fd = open(ctx->htlb_path, O_RDWR);
if (fd < 0) {
ublk_err("htlb: can't open %s\n", ctx->htlb_path);
return -errno;
}
if (fstat(fd, &st) < 0 || st.st_size <= 0) {
ublk_err("htlb: invalid file size\n");
close(fd);
return -EINVAL;
}
base = mmap(NULL, st.st_size, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd, 0);
if (base == MAP_FAILED) {
ublk_err("htlb: mmap failed\n");
close(fd);
return -ENOMEM;
}
ret = ublk_ctrl_reg_buf(dev, base, st.st_size);
if (ret < 0) {
ublk_err("htlb: reg_buf failed: %d\n", ret);
munmap(base, st.st_size);
close(fd);
return ret;
}
if (shmem_count >= UBLK_BUF_MAX) {
munmap(base, st.st_size);
close(fd);
return -ENOMEM;
}
idx = shmem_count++;
shmem_table[idx].fd = fd;
shmem_table[idx].mmap_base = base;
shmem_table[idx].size = st.st_size;
ublk_dbg(UBLK_DBG_DEV, "htlb registered: index=%d size=%zu\n",
idx, (size_t)st.st_size);
return 0;
}
static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
{
const struct ublksrv_ctrl_dev_info *dinfo = &dev->dev_info;
struct shmem_listener_info linfo = {};
struct ublk_thread_info *tinfo;
unsigned long long extra_flags = 0;
cpu_set_t *affinity_buf;
unsigned char (*q_thread_map)[UBLK_MAX_QUEUES] = NULL;
uint64_t stop_val = 1;
pthread_t listener;
void *thread_ret;
sem_t ready;
int ret, i;
@ -1187,15 +1487,44 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
goto fail_start;
}
if (ctx->htlb_path) {
ret = ublk_shmem_htlb_setup(ctx, dev);
if (ret < 0) {
ublk_err("htlb setup failed: %d\n", ret);
ublk_ctrl_stop_dev(dev);
goto fail_start;
}
}
ublk_ctrl_get_info(dev);
if (ctx->fg)
ublk_ctrl_dump(dev);
else
ublk_send_dev_event(ctx, dev, dev->dev_info.dev_id);
fail_start:
/* wait until we are terminated */
for (i = 0; i < dev->nthreads; i++)
/*
* Wait for I/O threads to exit. While waiting, a listener
* thread accepts shared memory registration requests from
* clients via a per-device unix socket (SCM_RIGHTS fd passing).
*/
linfo.dev_id = dinfo->dev_id;
linfo.dev = dev;
linfo.stop_efd = eventfd(0, 0);
if (linfo.stop_efd >= 0)
pthread_create(&listener, NULL,
ublk_shmem_listener_fn, &linfo);
for (i = 0; i < (int)dev->nthreads; i++)
pthread_join(tinfo[i].thread, &thread_ret);
/* Signal listener thread to stop and wait for it */
if (linfo.stop_efd >= 0) {
write(linfo.stop_efd, &stop_val, sizeof(stop_val));
pthread_join(listener, NULL);
close(linfo.stop_efd);
ublk_shmem_sock_destroy(dinfo->dev_id, linfo.sock_fd);
}
ublk_shmem_unregister_all();
free(tinfo);
fail:
for (i = 0; i < dinfo->nr_hw_queues; i++)
@ -1625,6 +1954,7 @@ static int cmd_dev_get_features(void)
FEAT_NAME(UBLK_F_SAFE_STOP_DEV),
FEAT_NAME(UBLK_F_BATCH_IO),
FEAT_NAME(UBLK_F_NO_AUTO_PART_SCAN),
FEAT_NAME(UBLK_F_SHMEM_ZC),
};
struct ublk_dev *dev;
__u64 features = 0;
@ -1797,6 +2127,8 @@ int main(int argc, char *argv[])
{ "safe", 0, NULL, 0 },
{ "batch", 0, NULL, 'b'},
{ "no_auto_part_scan", 0, NULL, 0 },
{ "shmem_zc", 0, NULL, 0 },
{ "htlb", 1, NULL, 0 },
{ 0, 0, 0, 0 }
};
const struct ublk_tgt_ops *ops = NULL;
@ -1912,6 +2244,10 @@ int main(int argc, char *argv[])
ctx.safe_stop = 1;
if (!strcmp(longopts[option_idx].name, "no_auto_part_scan"))
ctx.flags |= UBLK_F_NO_AUTO_PART_SCAN;
if (!strcmp(longopts[option_idx].name, "shmem_zc"))
ctx.flags |= UBLK_F_SHMEM_ZC;
if (!strcmp(longopts[option_idx].name, "htlb"))
ctx.htlb_path = strdup(optarg);
break;
case '?':
/*

View File

@ -96,6 +96,8 @@ struct dev_ctx {
/* for 'update_size' command */
unsigned long long size;
char *htlb_path;
union {
struct stripe_ctx stripe;
struct fault_inject_ctx fault_inject;
@ -602,6 +604,18 @@ static inline void ublk_queued_tgt_io(struct ublk_thread *t, struct ublk_queue *
}
}
/* shared memory zero-copy support */
#define UBLK_BUF_MAX 256
struct ublk_shmem_entry {
int fd;
void *mmap_base;
size_t size;
};
extern struct ublk_shmem_entry shmem_table[UBLK_BUF_MAX];
extern int shmem_count;
extern const struct ublk_tgt_ops null_tgt_ops;
extern const struct ublk_tgt_ops loop_tgt_ops;
extern const struct ublk_tgt_ops stripe_tgt_ops;