sunrpc: allow svc_recv() to return -ETIMEDOUT and -EBUSY

To dynamically adjust the thread count, nfsd requires some information
about how busy things are.

Change svc_recv() to take a timeout value (in jiffies), and allow the
wait for work to time out when one is set. If no timeout is given, the
sleep uses MAX_SCHEDULE_TIMEOUT and never times out. If the task waits
out the full timeout, have it return -ETIMEDOUT to the caller.
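
For illustration, a couple of possible call sites (the values here are
examples only, not taken from this patch):

	svc_recv(rqstp, 0);       /* sleep until work arrives; never times out */
	svc_recv(rqstp, 5 * HZ);  /* may return -ETIMEDOUT after ~5s of idling */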

If the task wakes up and finds that there is more work but no idle
threads available, attempt to set SP_TASK_STARTING. If it wasn't
already set, have the task return -EBUSY to cue the caller that the
service could use more threads.
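
As a hedged sketch of the intended calling convention (the loop shape
mirrors nfsd()'s main loop; svc_pool_spawn_thread() and
svc_pool_try_exit() are hypothetical helpers, not part of this patch):

	while (!svc_thread_should_stop(rqstp)) {
		int err = svc_recv(rqstp, 5 * HZ);

		if (err == -EBUSY) {
			/* More work than idle threads, and this thread
			 * won the SP_TASK_STARTING race: ask for a
			 * sibling; the spawner clears the flag.
			 */
			svc_pool_spawn_thread(rqstp->rq_pool);
		} else if (err == -ETIMEDOUT) {
			/* Idle for the whole timeout while the pool is
			 * above its floor: consider exiting.
			 */
			svc_pool_try_exit(rqstp);
		}
	}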

Signed-off-by: Jeff Layton <jlayton@kernel.org>
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Author: Jeff Layton, 2026-01-06 13:59:48 -05:00 (committed by Chuck Lever)
parent 7f221b340d
commit a0022a38be
6 changed files with 47 additions and 13 deletions

fs/lockd/svc.c

@@ -141,7 +141,7 @@ lockd(void *vrqstp)
	 */
	while (!svc_thread_should_stop(rqstp)) {
		nlmsvc_retry_blocked(rqstp);
-		svc_recv(rqstp);
+		svc_recv(rqstp, 0);
	}
	if (nlmsvc_ops)
		nlmsvc_invalidate_all();

fs/nfs/callback.c

@@ -81,7 +81,7 @@ nfs4_callback_svc(void *vrqstp)
	set_freezable();

	while (!svc_thread_should_stop(rqstp))
-		svc_recv(rqstp);
+		svc_recv(rqstp, 0);

	svc_exit_thread(rqstp);
	return 0;

fs/nfsd/nfssvc.c

@@ -902,7 +902,7 @@ nfsd(void *vrqstp)
	 * The main request loop
	 */
	while (!svc_thread_should_stop(rqstp)) {
-		svc_recv(rqstp);
+		svc_recv(rqstp, 0);
		nfsd_file_net_dispose(nn);
	}

include/linux/sunrpc/svc.h

@@ -55,6 +55,7 @@ enum {
	SP_TASK_PENDING,	/* still work to do even if no xprt is queued */
	SP_NEED_VICTIM,		/* One thread needs to agree to exit */
	SP_VICTIM_REMAINS,	/* One thread needs to actually exit */
+	SP_TASK_STARTING,	/* Task has started but not added to idle yet */
 };

include/linux/sunrpc/svcsock.h

@@ -61,7 +61,7 @@ static inline u32 svc_sock_final_rec(struct svc_sock *svsk)
 /*
  * Function prototypes.
  */
-void		svc_recv(struct svc_rqst *rqstp);
+int		svc_recv(struct svc_rqst *rqstp, long timeo);
 void		svc_send(struct svc_rqst *rqstp);
 int		svc_addsock(struct svc_serv *serv, struct net *net,
			    const int fd, char *name_return, const size_t len,

net/sunrpc/svc_xprt.c

@@ -714,15 +714,21 @@ svc_thread_should_sleep(struct svc_rqst *rqstp)
	return true;
 }

-static void svc_thread_wait_for_work(struct svc_rqst *rqstp)
+static bool svc_schedule_timeout(long timeo)
 {
+	return schedule_timeout(timeo ? timeo : MAX_SCHEDULE_TIMEOUT) == 0;
+}
+
+static bool svc_thread_wait_for_work(struct svc_rqst *rqstp, long timeo)
+{
	struct svc_pool *pool = rqstp->rq_pool;
+	bool did_timeout = false;

	if (svc_thread_should_sleep(rqstp)) {
		set_current_state(TASK_IDLE | TASK_FREEZABLE);
		llist_add(&rqstp->rq_idle, &pool->sp_idle_threads);
		if (likely(svc_thread_should_sleep(rqstp)))
-			schedule();
+			did_timeout = svc_schedule_timeout(timeo);

		while (!llist_del_first_this(&pool->sp_idle_threads,
					     &rqstp->rq_idle)) {
@@ -734,7 +740,7 @@ static void svc_thread_wait_for_work(struct svc_rqst *rqstp)
			 * for this new work. This thread can safely sleep
			 * until woken again.
			 */
-			schedule();
+			did_timeout = svc_schedule_timeout(timeo);
			set_current_state(TASK_IDLE | TASK_FREEZABLE);
		}
		__set_current_state(TASK_RUNNING);
@@ -742,6 +748,7 @@ static void svc_thread_wait_for_work(struct svc_rqst *rqstp)
		cond_resched();
	}
	try_to_freeze();
+	return did_timeout;
 }

 static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt)
@@ -835,25 +842,38 @@ static void svc_thread_wake_next(struct svc_rqst *rqstp)
 /**
  * svc_recv - Receive and process the next request on any transport
  * @rqstp: an idle RPC service thread
+ * @timeo: timeout (in jiffies) (0 means infinite timeout)
  *
  * This code is carefully organised not to touch any cachelines in
  * the shared svc_serv structure, only cachelines in the local
  * svc_pool.
+ *
+ * If the timeout is 0, then the sleep will never time out.
+ *
+ * Returns -ETIMEDOUT if idle for an extended period
+ *	   -EBUSY if there is more work to do than available threads
+ *	   0 otherwise.
  */
-void svc_recv(struct svc_rqst *rqstp)
+int svc_recv(struct svc_rqst *rqstp, long timeo)
 {
	struct svc_pool *pool = rqstp->rq_pool;
+	bool did_timeout;
+	int ret = 0;

	if (!svc_alloc_arg(rqstp))
-		return;
+		return ret;

-	svc_thread_wait_for_work(rqstp);
+	did_timeout = svc_thread_wait_for_work(rqstp, timeo);
+
+	if (did_timeout && svc_thread_should_sleep(rqstp) &&
+	    pool->sp_nrthrmin && pool->sp_nrthreads > pool->sp_nrthrmin)
+		ret = -ETIMEDOUT;

	clear_bit(SP_TASK_PENDING, &pool->sp_flags);

	if (svc_thread_should_stop(rqstp)) {
		svc_thread_wake_next(rqstp);
-		return;
+		return ret;
	}

	rqstp->rq_xprt = svc_xprt_dequeue(pool);
@@ -865,10 +885,22 @@ void svc_recv(struct svc_rqst *rqstp)
	 * cache information to be provided. When there are no
	 * idle threads, we reduce the wait time.
	 */
-	if (pool->sp_idle_threads.first)
+	if (pool->sp_idle_threads.first) {
		rqstp->rq_chandle.thread_wait = 5 * HZ;
-	else
+	} else {
		rqstp->rq_chandle.thread_wait = 1 * HZ;
+		/*
+		 * No idle threads: signal -EBUSY so the caller
+		 * can consider spawning another thread. Use
+		 * SP_TASK_STARTING to limit this signal to one
+		 * thread at a time; the caller clears this flag
+		 * after starting a new thread.
+		 */
+		if (!did_timeout && timeo &&
+		    !test_and_set_bit(SP_TASK_STARTING,
+				      &pool->sp_flags))
+			ret = -EBUSY;
+	}

	trace_svc_xprt_dequeue(rqstp);
	svc_handle_xprt(rqstp, xprt);
@@ -887,6 +919,7 @@ void svc_recv(struct svc_rqst *rqstp)
		}
	}
 #endif
+	return ret;
 }
 EXPORT_SYMBOL_GPL(svc_recv);