Merge branch 'mptcp-implement-read_sock-and-splice_read'

Matthieu Baerts says:

====================
mptcp: implement .read_sock and .splice_read

This series is a preparation work for future in-kernel MPTCP sockets
usage. Here, two interfaces are implemented: read_sock and splice_read.
As a result of this series, splice() with MPTCP sockets -- which was
already supported -- is now improved.

- Patches 1-2: .read_sock implementation

- Patches 3-4: .splice_read implementation

- Patches 5-6: validate splice() support with MPTCP sockets.
====================

Link: https://patch.msgid.link/20260130-net-next-mptcp-splice-v2-0-31332ba70d7f@kernel.org
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
This commit is contained in:
Jakub Kicinski 2026-02-02 18:15:35 -08:00
commit 84b86025f6
6 changed files with 308 additions and 19 deletions

View File

@ -347,6 +347,15 @@ extern struct proto tcp_prot;
#define TCP_DEC_STATS(net, field) SNMP_DEC_STATS((net)->mib.tcp_statistics, field)
#define TCP_ADD_STATS(net, field, val) SNMP_ADD_STATS((net)->mib.tcp_statistics, field, val)
/*
* TCP splice context
*/
struct tcp_splice_state {
struct pipe_inode_info *pipe;
size_t len;
unsigned int flags;
};
void tcp_tsq_work_init(void);
int tcp_v4_err(struct sk_buff *skb, u32);
@ -378,6 +387,8 @@ void tcp_rcv_space_adjust(struct sock *sk);
int tcp_twsk_unique(struct sock *sk, struct sock *sktw, void *twp);
void tcp_twsk_destructor(struct sock *sk);
void tcp_twsk_purge(struct list_head *net_exit_list);
int tcp_splice_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
unsigned int offset, size_t len);
ssize_t tcp_splice_read(struct socket *sk, loff_t *ppos,
struct pipe_inode_info *pipe, size_t len,
unsigned int flags);

View File

@ -318,15 +318,6 @@ EXPORT_SYMBOL(tcp_have_smc);
struct percpu_counter tcp_sockets_allocated ____cacheline_aligned_in_smp;
EXPORT_IPV6_MOD(tcp_sockets_allocated);
/*
* TCP splice context
*/
struct tcp_splice_state {
struct pipe_inode_info *pipe;
size_t len;
unsigned int flags;
};
/*
* Pressure flag: try to collapse.
* Technical note: it is used by multiple contexts non atomically.
@ -791,8 +782,8 @@ void tcp_push(struct sock *sk, int flags, int mss_now,
__tcp_push_pending_frames(sk, mss_now, nonagle);
}
static int tcp_splice_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
unsigned int offset, size_t len)
int tcp_splice_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
unsigned int offset, size_t len)
{
struct tcp_splice_state *tss = rd_desc->arg.data;
int ret;

View File

@ -1995,6 +1995,17 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
static void mptcp_rcv_space_adjust(struct mptcp_sock *msk, int copied);
static void mptcp_eat_recv_skb(struct sock *sk, struct sk_buff *skb)
{
/* avoid the indirect call, we know the destructor is sock_rfree */
skb->destructor = NULL;
skb->sk = NULL;
atomic_sub(skb->truesize, &sk->sk_rmem_alloc);
sk_mem_uncharge(sk, skb->truesize);
__skb_unlink(skb, &sk->sk_receive_queue);
skb_attempt_defer_free(skb);
}
static int __mptcp_recvmsg_mskq(struct sock *sk, struct msghdr *msg,
size_t len, int flags, int copied_total,
struct scm_timestamping_internal *tss,
@ -2049,13 +2060,7 @@ static int __mptcp_recvmsg_mskq(struct sock *sk, struct msghdr *msg,
break;
}
/* avoid the indirect call, we know the destructor is sock_rfree */
skb->destructor = NULL;
skb->sk = NULL;
atomic_sub(skb->truesize, &sk->sk_rmem_alloc);
sk_mem_uncharge(sk, skb->truesize);
__skb_unlink(skb, &sk->sk_receive_queue);
skb_attempt_defer_free(skb);
mptcp_eat_recv_skb(sk, skb);
}
if (copied >= len)
@ -4312,6 +4317,201 @@ static __poll_t mptcp_poll(struct file *file, struct socket *sock,
return mask;
}
static struct sk_buff *mptcp_recv_skb(struct sock *sk, u32 *off)
{
struct mptcp_sock *msk = mptcp_sk(sk);
struct sk_buff *skb;
u32 offset;
if (!list_empty(&msk->backlog_list))
mptcp_move_skbs(sk);
while ((skb = skb_peek(&sk->sk_receive_queue)) != NULL) {
offset = MPTCP_SKB_CB(skb)->offset;
if (offset < skb->len) {
*off = offset;
return skb;
}
mptcp_eat_recv_skb(sk, skb);
}
return NULL;
}
/*
* Note:
* - It is assumed that the socket was locked by the caller.
*/
static int __mptcp_read_sock(struct sock *sk, read_descriptor_t *desc,
sk_read_actor_t recv_actor, bool noack)
{
struct mptcp_sock *msk = mptcp_sk(sk);
struct sk_buff *skb;
int copied = 0;
u32 offset;
msk_owned_by_me(msk);
if (sk->sk_state == TCP_LISTEN)
return -ENOTCONN;
while ((skb = mptcp_recv_skb(sk, &offset)) != NULL) {
u32 data_len = skb->len - offset;
int count;
u32 size;
size = min_t(size_t, data_len, INT_MAX);
count = recv_actor(desc, skb, offset, size);
if (count <= 0) {
if (!copied)
copied = count;
break;
}
copied += count;
msk->bytes_consumed += count;
if (count < data_len) {
MPTCP_SKB_CB(skb)->offset += count;
MPTCP_SKB_CB(skb)->map_seq += count;
break;
}
mptcp_eat_recv_skb(sk, skb);
}
if (noack)
goto out;
mptcp_rcv_space_adjust(msk, copied);
if (copied > 0) {
mptcp_recv_skb(sk, &offset);
mptcp_cleanup_rbuf(msk, copied);
}
out:
return copied;
}
static int mptcp_read_sock(struct sock *sk, read_descriptor_t *desc,
sk_read_actor_t recv_actor)
{
return __mptcp_read_sock(sk, desc, recv_actor, false);
}
static int __mptcp_splice_read(struct sock *sk, struct tcp_splice_state *tss)
{
/* Store TCP splice context information in read_descriptor_t. */
read_descriptor_t rd_desc = {
.arg.data = tss,
.count = tss->len,
};
return mptcp_read_sock(sk, &rd_desc, tcp_splice_data_recv);
}
/**
* mptcp_splice_read - splice data from MPTCP socket to a pipe
* @sock: socket to splice from
* @ppos: position (not valid)
* @pipe: pipe to splice to
* @len: number of bytes to splice
* @flags: splice modifier flags
*
* Description:
* Will read pages from given socket and fill them into a pipe.
*
* Return:
* Amount of bytes that have been spliced.
*
**/
static ssize_t mptcp_splice_read(struct socket *sock, loff_t *ppos,
struct pipe_inode_info *pipe, size_t len,
unsigned int flags)
{
struct tcp_splice_state tss = {
.pipe = pipe,
.len = len,
.flags = flags,
};
struct sock *sk = sock->sk;
ssize_t spliced = 0;
int ret = 0;
long timeo;
/*
* We can't seek on a socket input
*/
if (unlikely(*ppos))
return -ESPIPE;
lock_sock(sk);
mptcp_rps_record_subflows(mptcp_sk(sk));
timeo = sock_rcvtimeo(sk, sock->file->f_flags & O_NONBLOCK);
while (tss.len) {
ret = __mptcp_splice_read(sk, &tss);
if (ret < 0) {
break;
} else if (!ret) {
if (spliced)
break;
if (sock_flag(sk, SOCK_DONE))
break;
if (sk->sk_err) {
ret = sock_error(sk);
break;
}
if (sk->sk_shutdown & RCV_SHUTDOWN)
break;
if (sk->sk_state == TCP_CLOSE) {
/*
* This occurs when user tries to read
* from never connected socket.
*/
ret = -ENOTCONN;
break;
}
if (!timeo) {
ret = -EAGAIN;
break;
}
/* if __mptcp_splice_read() got nothing while we have
* an skb in receive queue, we do not want to loop.
* This might happen with URG data.
*/
if (!skb_queue_empty(&sk->sk_receive_queue))
break;
ret = sk_wait_data(sk, &timeo, NULL);
if (ret < 0)
break;
if (signal_pending(current)) {
ret = sock_intr_errno(timeo);
break;
}
continue;
}
tss.len -= ret;
spliced += ret;
if (!tss.len || !timeo)
break;
release_sock(sk);
lock_sock(sk);
if (sk->sk_err || sk->sk_state == TCP_CLOSE ||
(sk->sk_shutdown & RCV_SHUTDOWN) ||
signal_pending(current))
break;
}
release_sock(sk);
if (spliced)
return spliced;
return ret;
}
static const struct proto_ops mptcp_stream_ops = {
.family = PF_INET,
.owner = THIS_MODULE,
@ -4332,6 +4532,8 @@ static const struct proto_ops mptcp_stream_ops = {
.recvmsg = inet_recvmsg,
.mmap = sock_no_mmap,
.set_rcvlowat = mptcp_set_rcvlowat,
.read_sock = mptcp_read_sock,
.splice_read = mptcp_splice_read,
};
static struct inet_protosw mptcp_protosw = {
@ -4436,6 +4638,8 @@ static const struct proto_ops mptcp_v6_stream_ops = {
.compat_ioctl = inet6_compat_ioctl,
#endif
.set_rcvlowat = mptcp_set_rcvlowat,
.read_sock = mptcp_read_sock,
.splice_read = mptcp_splice_read,
};
static struct proto mptcp_v6_prot;

View File

@ -11,6 +11,7 @@ TEST_PROGS := \
mptcp_connect_checksum.sh \
mptcp_connect_mmap.sh \
mptcp_connect_sendfile.sh \
mptcp_connect_splice.sh \
mptcp_join.sh \
mptcp_sockopt.sh \
pm_netlink.sh \

View File

@ -52,6 +52,7 @@ enum cfg_mode {
CFG_MODE_POLL,
CFG_MODE_MMAP,
CFG_MODE_SENDFILE,
CFG_MODE_SPLICE,
};
enum cfg_peek {
@ -124,7 +125,7 @@ static void die_usage(void)
fprintf(stderr, "\t-j -- add additional sleep at connection start and tear down "
"-- for MPJ tests\n");
fprintf(stderr, "\t-l -- listens mode, accepts incoming connection\n");
fprintf(stderr, "\t-m [poll|mmap|sendfile] -- use poll(default)/mmap+write/sendfile\n");
fprintf(stderr, "\t-m [poll|mmap|sendfile|splice] -- use poll(default)/mmap+write/sendfile/splice\n");
fprintf(stderr, "\t-M mark -- set socket packet mark\n");
fprintf(stderr, "\t-o option -- test sockopt <option>\n");
fprintf(stderr, "\t-p num -- use port num\n");
@ -935,6 +936,71 @@ static int copyfd_io_sendfile(int infd, int peerfd, int outfd,
return err;
}
static int do_splice(const int infd, const int outfd, const size_t len,
struct wstate *winfo)
{
ssize_t in_bytes, out_bytes;
int pipefd[2];
int err;
err = pipe(pipefd);
if (err) {
perror("pipe");
return 2;
}
again:
in_bytes = splice(infd, NULL, pipefd[1], NULL, len - winfo->total_len,
SPLICE_F_MOVE | SPLICE_F_MORE);
if (in_bytes < 0) {
perror("splice in");
err = 3;
} else if (in_bytes > 0) {
out_bytes = splice(pipefd[0], NULL, outfd, NULL, in_bytes,
SPLICE_F_MOVE | SPLICE_F_MORE);
if (out_bytes < 0) {
perror("splice out");
err = 4;
} else if (in_bytes != out_bytes) {
fprintf(stderr, "Unexpected transfer: %zu vs %zu\n",
in_bytes, out_bytes);
err = 5;
} else {
goto again;
}
}
close(pipefd[0]);
close(pipefd[1]);
return err;
}
static int copyfd_io_splice(int infd, int peerfd, int outfd, unsigned int size,
bool *in_closed_after_out, struct wstate *winfo)
{
int err;
if (listen_mode) {
err = do_splice(peerfd, outfd, size, winfo);
if (err)
return err;
err = do_splice(infd, peerfd, size, winfo);
} else {
err = do_splice(infd, peerfd, size, winfo);
if (err)
return err;
shut_wr(peerfd);
err = do_splice(peerfd, outfd, size, winfo);
*in_closed_after_out = true;
}
return err;
}
static int copyfd_io(int infd, int peerfd, int outfd, bool close_peerfd, struct wstate *winfo)
{
bool in_closed_after_out = false;
@ -967,6 +1033,14 @@ static int copyfd_io(int infd, int peerfd, int outfd, bool close_peerfd, struct
&in_closed_after_out, winfo);
break;
case CFG_MODE_SPLICE:
file_size = get_infd_size(infd);
if (file_size < 0)
return file_size;
ret = copyfd_io_splice(infd, peerfd, outfd, file_size,
&in_closed_after_out, winfo);
break;
default:
fprintf(stderr, "Invalid mode %d\n", cfg_mode);
@ -1380,12 +1454,15 @@ int parse_mode(const char *mode)
return CFG_MODE_MMAP;
if (!strcasecmp(mode, "sendfile"))
return CFG_MODE_SENDFILE;
if (!strcasecmp(mode, "splice"))
return CFG_MODE_SPLICE;
fprintf(stderr, "Unknown test mode: %s\n", mode);
fprintf(stderr, "Supported modes are:\n");
fprintf(stderr, "\t\t\"poll\" - interleaved read/write using poll()\n");
fprintf(stderr, "\t\t\"mmap\" - send entire input file (mmap+write), then read response (-l will read input first)\n");
fprintf(stderr, "\t\t\"sendfile\" - send entire input file (sendfile), then read response (-l will read input first)\n");
fprintf(stderr, "\t\t\"splice\" - send entire input file (splice), then read response (-l will read input first)\n");
die_usage();

View File

@ -0,0 +1,5 @@
#!/bin/bash
# SPDX-License-Identifier: GPL-2.0
MPTCP_LIB_KSFT_TEST="$(basename "${0}" .sh)" \
"$(dirname "${0}")/mptcp_connect.sh" -m splice "${@}"