Merge branch 'af_unix-introduce-so_inq-scm_inq'

Kuniyuki Iwashima says:

====================
af_unix: Introduce SO_INQ & SCM_INQ.

We have an application that uses almost the same code for TCP and
AF_UNIX (SOCK_STREAM).

The application uses TCP_INQ for TCP, but AF_UNIX doesn't have it
and requires an extra syscall, ioctl(SIOCINQ) or getsockopt(SO_MEMINFO)
as an alternative.

Also, ioctl(SIOCINQ) for AF_UNIX SOCK_STREAM is more expensive because
it needs to iterate all skb in the receive queue.

This series adds a cached field for SIOCINQ to speed it up and introduce
SO_INQ, the generic version of TCP_INQ to get the queue length as cmsg in
each recvmsg().
====================

Link: https://patch.msgid.link/20250702223606.1054680-1-kuniyu@google.com
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
This commit is contained in:
Jakub Kicinski 2025-07-08 18:05:27 -07:00
commit 042ef6aafd
10 changed files with 271 additions and 58 deletions

View File

@ -152,6 +152,9 @@
#define SO_PASSRIGHTS 83
#define SO_INQ 84
#define SCM_INQ SO_INQ
#if !defined(__KERNEL__)
#if __BITS_PER_LONG == 64

View File

@ -163,6 +163,9 @@
#define SO_PASSRIGHTS 83
#define SO_INQ 84
#define SCM_INQ SO_INQ
#if !defined(__KERNEL__)
#if __BITS_PER_LONG == 64

View File

@ -144,6 +144,9 @@
#define SO_PASSRIGHTS 0x4051
#define SO_INQ 0x4052
#define SCM_INQ SO_INQ
#if !defined(__KERNEL__)
#if __BITS_PER_LONG == 64

View File

@ -145,6 +145,9 @@
#define SO_PASSRIGHTS 0x005c
#define SO_INQ 0x005d
#define SCM_INQ SO_INQ
#if !defined(__KERNEL__)

View File

@ -47,6 +47,8 @@ struct unix_sock {
#define peer_wait peer_wq.wait
wait_queue_entry_t peer_wake;
struct scm_stat scm_stat;
int inq_len;
bool recvmsg_inq;
#if IS_ENABLED(CONFIG_AF_UNIX_OOB)
struct sk_buff *oob_skb;
#endif

View File

@ -147,6 +147,9 @@
#define SO_PASSRIGHTS 83
#define SO_INQ 84
#define SCM_INQ SO_INQ
#if !defined(__KERNEL__)
#if __BITS_PER_LONG == 64 || (defined(__x86_64__) && defined(__ILP32__))

View File

@ -934,6 +934,52 @@ static void unix_show_fdinfo(struct seq_file *m, struct socket *sock)
#define unix_show_fdinfo NULL
#endif
static bool unix_custom_sockopt(int optname)
{
switch (optname) {
case SO_INQ:
return true;
default:
return false;
}
}
static int unix_setsockopt(struct socket *sock, int level, int optname,
sockptr_t optval, unsigned int optlen)
{
struct unix_sock *u = unix_sk(sock->sk);
struct sock *sk = sock->sk;
int val;
if (level != SOL_SOCKET)
return -EOPNOTSUPP;
if (!unix_custom_sockopt(optname))
return sock_setsockopt(sock, level, optname, optval, optlen);
if (optlen != sizeof(int))
return -EINVAL;
if (copy_from_sockptr(&val, optval, sizeof(val)))
return -EFAULT;
switch (optname) {
case SO_INQ:
if (sk->sk_type != SOCK_STREAM)
return -EINVAL;
if (val > 1 || val < 0)
return -EINVAL;
WRITE_ONCE(u->recvmsg_inq, val);
break;
default:
return -ENOPROTOOPT;
}
return 0;
}
static const struct proto_ops unix_stream_ops = {
.family = PF_UNIX,
.owner = THIS_MODULE,
@ -950,6 +996,7 @@ static const struct proto_ops unix_stream_ops = {
#endif
.listen = unix_listen,
.shutdown = unix_shutdown,
.setsockopt = unix_setsockopt,
.sendmsg = unix_stream_sendmsg,
.recvmsg = unix_stream_recvmsg,
.read_skb = unix_stream_read_skb,
@ -1116,6 +1163,7 @@ static int unix_create(struct net *net, struct socket *sock, int protocol,
switch (sock->type) {
case SOCK_STREAM:
set_bit(SOCK_CUSTOM_SOCKOPT, &sock->flags);
sock->ops = &unix_stream_ops;
break;
/*
@ -1847,6 +1895,9 @@ static int unix_accept(struct socket *sock, struct socket *newsock,
skb_free_datagram(sk, skb);
wake_up_interruptible(&unix_sk(sk)->peer_wait);
if (tsk->sk_type == SOCK_STREAM)
set_bit(SOCK_CUSTOM_SOCKOPT, &newsock->flags);
/* attach accepted sock to socket */
unix_state_lock(tsk);
unix_update_edges(unix_sk(tsk));
@ -2297,6 +2348,7 @@ static int queue_oob(struct sock *sk, struct msghdr *msg, struct sock *other,
spin_lock(&other->sk_receive_queue.lock);
WRITE_ONCE(ousk->oob_skb, skb);
WRITE_ONCE(ousk->inq_len, ousk->inq_len + 1);
__skb_queue_tail(&other->sk_receive_queue, skb);
spin_unlock(&other->sk_receive_queue.lock);
@ -2319,6 +2371,7 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
struct sock *sk = sock->sk;
struct sk_buff *skb = NULL;
struct sock *other = NULL;
struct unix_sock *otheru;
struct scm_cookie scm;
bool fds_sent = false;
int err, sent = 0;
@ -2342,14 +2395,16 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
if (msg->msg_namelen) {
err = READ_ONCE(sk->sk_state) == TCP_ESTABLISHED ? -EISCONN : -EOPNOTSUPP;
goto out_err;
} else {
other = unix_peer(sk);
if (!other) {
err = -ENOTCONN;
goto out_err;
}
}
other = unix_peer(sk);
if (!other) {
err = -ENOTCONN;
goto out_err;
}
otheru = unix_sk(other);
if (READ_ONCE(sk->sk_shutdown) & SEND_SHUTDOWN)
goto out_pipe;
@ -2417,7 +2472,12 @@ static int unix_stream_sendmsg(struct socket *sock, struct msghdr *msg,
unix_maybe_add_creds(skb, sk, other);
scm_stat_add(other, skb);
skb_queue_tail(&other->sk_receive_queue, skb);
spin_lock(&other->sk_receive_queue.lock);
WRITE_ONCE(otheru->inq_len, otheru->inq_len + skb->len);
__skb_queue_tail(&other->sk_receive_queue, skb);
spin_unlock(&other->sk_receive_queue.lock);
unix_state_unlock(other);
other->sk_data_ready(other);
sent += size;
@ -2527,12 +2587,10 @@ int __unix_dgram_recvmsg(struct sock *sk, struct msghdr *msg, size_t size,
&err, &timeo, last));
if (!skb) { /* implies iolock unlocked */
unix_state_lock(sk);
/* Signal EOF on disconnected non-blocking SEQPACKET socket. */
if (sk->sk_type == SOCK_SEQPACKET && err == -EAGAIN &&
(sk->sk_shutdown & RCV_SHUTDOWN))
(READ_ONCE(sk->sk_shutdown) & RCV_SHUTDOWN))
err = 0;
unix_state_unlock(sk);
goto out;
}
@ -2706,6 +2764,7 @@ static int unix_stream_recv_urg(struct unix_stream_read_state *state)
if (!(state->flags & MSG_PEEK)) {
WRITE_ONCE(u->oob_skb, NULL);
WRITE_ONCE(u->inq_len, u->inq_len - 1);
if (oob_skb->prev != (struct sk_buff *)&sk->sk_receive_queue &&
!unix_skb_len(oob_skb->prev)) {
@ -2788,6 +2847,7 @@ static struct sk_buff *manage_oob(struct sk_buff *skb, struct sock *sk,
static int unix_stream_read_skb(struct sock *sk, skb_read_actor_t recv_actor)
{
struct sk_buff_head *queue = &sk->sk_receive_queue;
struct unix_sock *u = unix_sk(sk);
struct sk_buff *skb;
int err;
@ -2795,60 +2855,57 @@ static int unix_stream_read_skb(struct sock *sk, skb_read_actor_t recv_actor)
if (unlikely(READ_ONCE(sk->sk_state) != TCP_ESTABLISHED))
return -ENOTCONN;
mutex_lock(&u->iolock);
skb = skb_recv_datagram(sk, MSG_DONTWAIT, &err);
mutex_unlock(&u->iolock);
if (!skb)
err = sock_error(sk);
if (err)
return err;
mutex_lock(&u->iolock);
spin_lock(&queue->lock);
skb = __skb_dequeue(queue);
if (!skb) {
spin_unlock(&queue->lock);
mutex_unlock(&u->iolock);
return -EAGAIN;
}
WRITE_ONCE(u->inq_len, u->inq_len - skb->len);
#if IS_ENABLED(CONFIG_AF_UNIX_OOB)
if (unlikely(skb == READ_ONCE(u->oob_skb))) {
bool drop = false;
if (skb == u->oob_skb) {
WRITE_ONCE(u->oob_skb, NULL);
spin_unlock(&queue->lock);
mutex_unlock(&u->iolock);
unix_state_lock(sk);
if (sock_flag(sk, SOCK_DEAD)) {
unix_state_unlock(sk);
kfree_skb_reason(skb, SKB_DROP_REASON_SOCKET_CLOSE);
return -ECONNRESET;
}
spin_lock(&sk->sk_receive_queue.lock);
if (likely(skb == u->oob_skb)) {
WRITE_ONCE(u->oob_skb, NULL);
drop = true;
}
spin_unlock(&sk->sk_receive_queue.lock);
unix_state_unlock(sk);
if (drop) {
kfree_skb_reason(skb, SKB_DROP_REASON_UNIX_SKIP_OOB);
return -EAGAIN;
}
kfree_skb_reason(skb, SKB_DROP_REASON_UNIX_SKIP_OOB);
return -EAGAIN;
}
#endif
spin_unlock(&queue->lock);
mutex_unlock(&u->iolock);
return recv_actor(sk, skb);
}
static int unix_stream_read_generic(struct unix_stream_read_state *state,
bool freezable)
{
struct scm_cookie scm;
int noblock = state->flags & MSG_DONTWAIT;
struct socket *sock = state->socket;
struct msghdr *msg = state->msg;
struct sock *sk = sock->sk;
struct unix_sock *u = unix_sk(sk);
int copied = 0;
size_t size = state->size;
int flags = state->flags;
int noblock = flags & MSG_DONTWAIT;
bool check_creds = false;
int target;
struct scm_cookie scm;
unsigned int last_len;
struct unix_sock *u;
int copied = 0;
int err = 0;
long timeo;
int target;
int skip;
size_t size = state->size;
unsigned int last_len;
if (unlikely(READ_ONCE(sk->sk_state) != TCP_ESTABLISHED)) {
err = -EINVAL;
@ -2868,6 +2925,8 @@ static int unix_stream_read_generic(struct unix_stream_read_state *state,
memset(&scm, 0, sizeof(scm));
u = unix_sk(sk);
/* Lock the socket to prevent queue disordering
* while sleeps in memcpy_tomsg
*/
@ -2959,14 +3018,12 @@ static int unix_stream_read_generic(struct unix_stream_read_state *state,
}
/* Copy address just once */
if (state->msg && state->msg->msg_name) {
DECLARE_SOCKADDR(struct sockaddr_un *, sunaddr,
state->msg->msg_name);
unix_copy_addr(state->msg, skb->sk);
if (msg && msg->msg_name) {
DECLARE_SOCKADDR(struct sockaddr_un *, sunaddr, msg->msg_name);
BPF_CGROUP_RUN_PROG_UNIX_RECVMSG_LOCK(sk,
state->msg->msg_name,
&state->msg->msg_namelen);
unix_copy_addr(msg, skb->sk);
BPF_CGROUP_RUN_PROG_UNIX_RECVMSG_LOCK(sk, msg->msg_name,
&msg->msg_namelen);
sunaddr = NULL;
}
@ -2995,7 +3052,11 @@ static int unix_stream_read_generic(struct unix_stream_read_state *state,
if (unix_skb_len(skb))
break;
skb_unlink(skb, &sk->sk_receive_queue);
spin_lock(&sk->sk_receive_queue.lock);
WRITE_ONCE(u->inq_len, u->inq_len - skb->len);
__skb_unlink(skb, &sk->sk_receive_queue);
spin_unlock(&sk->sk_receive_queue.lock);
consume_skb(skb);
if (scm.fp)
@ -3024,10 +3085,17 @@ static int unix_stream_read_generic(struct unix_stream_read_state *state,
} while (size);
mutex_unlock(&u->iolock);
if (state->msg)
scm_recv_unix(sock, state->msg, &scm, flags);
else
if (msg) {
scm_recv_unix(sock, msg, &scm, flags);
if (READ_ONCE(u->recvmsg_inq) || msg->msg_get_inq) {
msg->msg_inq = READ_ONCE(u->inq_len);
put_cmsg(msg, SOL_SOCKET, SCM_INQ,
sizeof(msg->msg_inq), &msg->msg_inq);
}
} else {
scm_destroy(&scm);
}
out:
return copied ? : err;
}
@ -3166,9 +3234,11 @@ long unix_inq_len(struct sock *sk)
if (READ_ONCE(sk->sk_state) == TCP_LISTEN)
return -EINVAL;
if (sk->sk_type == SOCK_STREAM)
return READ_ONCE(unix_sk(sk)->inq_len);
spin_lock(&sk->sk_receive_queue.lock);
if (sk->sk_type == SOCK_STREAM ||
sk->sk_type == SOCK_SEQPACKET) {
if (sk->sk_type == SOCK_SEQPACKET) {
skb_queue_walk(&sk->sk_receive_queue, skb)
amount += unix_skb_len(skb);
} else {

View File

@ -34,6 +34,7 @@ reuseport_bpf_numa
reuseport_dualstack
rxtimestamp
sctp_hello
scm_inq
scm_pidfd
scm_rights
sk_bind_sendto_listen

View File

@ -1,4 +1,4 @@
CFLAGS += $(KHDR_INCLUDES)
TEST_GEN_PROGS := diag_uid msg_oob scm_pidfd scm_rights unix_connect
TEST_GEN_PROGS := diag_uid msg_oob scm_inq scm_pidfd scm_rights unix_connect
include ../../lib.mk

View File

@ -0,0 +1,125 @@
// SPDX-License-Identifier: GPL-2.0
/* Copyright 2025 Google LLC */
#include <linux/sockios.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include "../../kselftest_harness.h"
#define NR_CHUNKS 100
#define MSG_LEN 256
struct scm_inq {
struct cmsghdr cmsghdr;
int inq;
};
FIXTURE(scm_inq)
{
int fd[2];
};
FIXTURE_VARIANT(scm_inq)
{
int type;
};
FIXTURE_VARIANT_ADD(scm_inq, stream)
{
.type = SOCK_STREAM,
};
FIXTURE_VARIANT_ADD(scm_inq, dgram)
{
.type = SOCK_DGRAM,
};
FIXTURE_VARIANT_ADD(scm_inq, seqpacket)
{
.type = SOCK_SEQPACKET,
};
FIXTURE_SETUP(scm_inq)
{
int err;
err = socketpair(AF_UNIX, variant->type | SOCK_NONBLOCK, 0, self->fd);
ASSERT_EQ(0, err);
}
FIXTURE_TEARDOWN(scm_inq)
{
close(self->fd[0]);
close(self->fd[1]);
}
static void send_chunks(struct __test_metadata *_metadata,
FIXTURE_DATA(scm_inq) *self)
{
char buf[MSG_LEN] = {};
int i, ret;
for (i = 0; i < NR_CHUNKS; i++) {
ret = send(self->fd[0], buf, sizeof(buf), 0);
ASSERT_EQ(sizeof(buf), ret);
}
}
static void recv_chunks(struct __test_metadata *_metadata,
FIXTURE_DATA(scm_inq) *self)
{
struct msghdr msg = {};
struct iovec iov = {};
struct scm_inq cmsg;
char buf[MSG_LEN];
int i, ret;
int inq;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = &cmsg;
msg.msg_controllen = CMSG_SPACE(sizeof(cmsg.inq));
iov.iov_base = buf;
iov.iov_len = sizeof(buf);
for (i = 0; i < NR_CHUNKS; i++) {
memset(buf, 0, sizeof(buf));
memset(&cmsg, 0, sizeof(cmsg));
ret = recvmsg(self->fd[1], &msg, 0);
ASSERT_EQ(MSG_LEN, ret);
ASSERT_NE(NULL, CMSG_FIRSTHDR(&msg));
ASSERT_EQ(CMSG_LEN(sizeof(cmsg.inq)), cmsg.cmsghdr.cmsg_len);
ASSERT_EQ(SOL_SOCKET, cmsg.cmsghdr.cmsg_level);
ASSERT_EQ(SCM_INQ, cmsg.cmsghdr.cmsg_type);
ret = ioctl(self->fd[1], SIOCINQ, &inq);
ASSERT_EQ(0, ret);
ASSERT_EQ(cmsg.inq, inq);
}
}
TEST_F(scm_inq, basic)
{
int err, inq;
err = setsockopt(self->fd[1], SOL_SOCKET, SO_INQ, &(int){1}, sizeof(int));
if (variant->type != SOCK_STREAM) {
ASSERT_EQ(-ENOPROTOOPT, -errno);
return;
}
ASSERT_EQ(0, err);
err = ioctl(self->fd[1], SIOCINQ, &inq);
ASSERT_EQ(0, err);
ASSERT_EQ(0, inq);
send_chunks(_metadata, self);
recv_chunks(_metadata, self);
}
TEST_HARNESS_MAIN