struct rpc_xprt_ops {
void (*set_buffer_size)(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize);
- int (*reserve_xprt)(struct rpc_task *task);
+ int (*reserve_xprt)(struct rpc_xprt *xprt, struct rpc_task *task);
void (*release_xprt)(struct rpc_xprt *xprt, struct rpc_task *task);
void (*rpcbind)(struct rpc_task *task);
void (*set_port)(struct rpc_xprt *xprt, unsigned short port);
struct rpc_xprt *xprt_create_transport(struct xprt_create *args);
void xprt_connect(struct rpc_task *task);
void xprt_reserve(struct rpc_task *task);
-int xprt_reserve_xprt(struct rpc_task *task);
-int xprt_reserve_xprt_cong(struct rpc_task *task);
+int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task);
+int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
int xprt_prepare_transmit(struct rpc_task *task);
void xprt_transmit(struct rpc_task *task);
void xprt_end_transmit(struct rpc_task *task);
* transport connects from colliding with writes. No congestion control
* is provided.
*/
-int xprt_reserve_xprt(struct rpc_task *task)
+int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
- struct rpc_xprt *xprt = req->rq_xprt;
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
if (task == xprt->snd_task)
goto out_sleep;
}
xprt->snd_task = task;
- req->rq_bytes_sent = 0;
- req->rq_ntrans++;
+ if (req != NULL) {
+ req->rq_bytes_sent = 0;
+ req->rq_ntrans++;
+ }
return 1;
task->tk_pid, xprt);
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
- if (req->rq_ntrans)
+ if (req != NULL && req->rq_ntrans)
rpc_sleep_on(&xprt->resend, task, NULL);
else
rpc_sleep_on(&xprt->sending, task, NULL);
* integrated into the decision of whether a request is allowed to be
* woken up and given access to the transport.
*/
-int xprt_reserve_xprt_cong(struct rpc_task *task)
+int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
- struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req = task->tk_rqstp;
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
return 1;
goto out_sleep;
}
+ if (req == NULL) {
+ xprt->snd_task = task;
+ return 1;
+ }
if (__xprt_get_cong(xprt, task)) {
xprt->snd_task = task;
- if (req) {
- req->rq_bytes_sent = 0;
- req->rq_ntrans++;
- }
+ req->rq_bytes_sent = 0;
+ req->rq_ntrans++;
return 1;
}
xprt_clear_locked(xprt);
dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
- if (req && req->rq_ntrans)
+ if (req != NULL && req->rq_ntrans)
rpc_sleep_on(&xprt->resend, task, NULL);
else
rpc_sleep_on(&xprt->sending, task, NULL);
int retval;
spin_lock_bh(&xprt->transport_lock);
- retval = xprt->ops->reserve_xprt(task);
+ retval = xprt->ops->reserve_xprt(xprt, task);
spin_unlock_bh(&xprt->transport_lock);
return retval;
}
task = rpc_wake_up_next(&xprt->resend);
if (!task) {
task = rpc_wake_up_next(&xprt->sending);
- if (!task)
+ if (task == NULL)
goto out_unlock;
}
static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
struct rpc_task *task;
+ struct rpc_rqst *req;
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
return;
task = rpc_wake_up_next(&xprt->resend);
if (!task) {
task = rpc_wake_up_next(&xprt->sending);
- if (!task)
+ if (task == NULL)
goto out_unlock;
}
+
+ req = task->tk_rqstp;
+ if (req == NULL) {
+ xprt->snd_task = task;
+ return;
+ }
if (__xprt_get_cong(xprt, task)) {
- struct rpc_rqst *req = task->tk_rqstp;
xprt->snd_task = task;
- if (req) {
- req->rq_bytes_sent = 0;
- req->rq_ntrans++;
- }
+ req->rq_bytes_sent = 0;
+ req->rq_ntrans++;
return;
}
out_unlock:
err = req->rq_reply_bytes_recvd;
goto out_unlock;
}
- if (!xprt->ops->reserve_xprt(task))
+ if (!xprt->ops->reserve_xprt(xprt, task))
err = -EAGAIN;
out_unlock:
spin_unlock_bh(&xprt->transport_lock);
struct rpc_xprt *xprt = task->tk_xprt;
task->tk_status = 0;
- if (task->tk_rqstp)
- return;
if (!list_empty(&xprt->free)) {
struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
list_del_init(&req->rq_list);
}
dprintk("RPC: waiting for request slot\n");
task->tk_status = -EAGAIN;
- task->tk_timeout = 0;
rpc_sleep_on(&xprt->backlog, task, NULL);
}
{
struct rpc_xprt *xprt = task->tk_xprt;
+ task->tk_status = 0;
+ if (task->tk_rqstp != NULL)
+ return;
+
+ /* Note: grabbing the xprt_lock_write() here is not strictly needed,
+ * but ensures that we throttle new slot allocation if the transport
+ * is congested (e.g. if reconnecting or if we're out of socket
+ * write buffer space).
+ */
+ task->tk_timeout = 0;
+ task->tk_status = -EAGAIN;
+ if (!xprt_lock_write(xprt, task))
+ return;
+
task->tk_status = -EIO;
spin_lock(&xprt->reserve_lock);
xprt_alloc_slot(task);
spin_unlock(&xprt->reserve_lock);
+ xprt_release_write(xprt, task);
}
static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)