NFSv4: stateful NFSv4 RPC call interface
authorTrond Myklebust <Trond.Myklebust@netapp.com>
Tue, 3 Jan 2006 08:55:06 +0000 (09:55 +0100)
committerTrond Myklebust <Trond.Myklebust@netapp.com>
Fri, 6 Jan 2006 19:58:40 +0000 (14:58 -0500)
 The NFSv4 model requires us to complete all RPC calls that might
 establish state on the server whether or not the user wants to
 interrupt it. We may also need to schedule new work (including
 new RPC calls) in order to cancel the new state.

 The asynchronous RPC model will allow us to ensure that RPC calls
 always complete, but in order to allow for "synchronous" RPC, we
 want to add the ability to wait for completion.
 The waits are, of course, interruptible.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
fs/nfs/direct.c
include/linux/sunrpc/sched.h
net/sunrpc/sched.c

index a834423942c7dabd9a7ba49f9f072cb8fe371a86..ae2be0744191089984d26a206193bea4917a7ae8 100644 (file)
@@ -268,7 +268,6 @@ static void nfs_direct_read_schedule(struct nfs_direct_req *dreq,
                NFS_PROTO(inode)->read_setup(data);
 
                data->task.tk_cookie = (unsigned long) inode;
-               data->task.tk_calldata = data;
                data->complete = nfs_direct_read_result;
 
                lock_kernel();
index ac1326fc3e1aab631c4a2c8b4d984326097c2ef0..94b0afa4ab05c2a38f1d2c30e932deb1cfa2d3af 100644 (file)
@@ -42,6 +42,7 @@ struct rpc_task {
 #ifdef RPC_DEBUG
        unsigned long           tk_magic;       /* 0xf00baa */
 #endif
+       atomic_t                tk_count;       /* Reference count */
        struct list_head        tk_task;        /* global list of tasks */
        struct rpc_clnt *       tk_client;      /* RPC client */
        struct rpc_rqst *       tk_rqstp;       /* RPC request */
@@ -78,7 +79,6 @@ struct rpc_task {
        struct timer_list       tk_timer;       /* kernel timer */
        unsigned long           tk_timeout;     /* timeout for rpc_sleep() */
        unsigned short          tk_flags;       /* misc flags */
-       unsigned char           tk_active   : 1;/* Task has been activated */
        unsigned char           tk_priority : 2;/* Task priority */
        unsigned long           tk_runstate;    /* Task run status */
        struct workqueue_struct *tk_workqueue;  /* Normally rpciod, but could
@@ -136,7 +136,6 @@ struct rpc_call_ops {
 #define RPC_IS_SWAPPER(t)      ((t)->tk_flags & RPC_TASK_SWAPPER)
 #define RPC_DO_ROOTOVERRIDE(t) ((t)->tk_flags & RPC_TASK_ROOTCREDS)
 #define RPC_ASSASSINATED(t)    ((t)->tk_flags & RPC_TASK_KILLED)
-#define RPC_IS_ACTIVATED(t)    ((t)->tk_active)
 #define RPC_DO_CALLBACK(t)     ((t)->tk_callback != NULL)
 #define RPC_IS_SOFT(t)         ((t)->tk_flags & RPC_TASK_SOFT)
 #define RPC_TASK_UNINTERRUPTIBLE(t) ((t)->tk_flags & RPC_TASK_NOINTR)
@@ -145,6 +144,7 @@ struct rpc_call_ops {
 #define RPC_TASK_QUEUED                1
 #define RPC_TASK_WAKEUP                2
 #define RPC_TASK_HAS_TIMER     3
+#define RPC_TASK_ACTIVE                4
 
 #define RPC_IS_RUNNING(t)      (test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate))
 #define rpc_set_running(t)     (set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate))
@@ -175,6 +175,15 @@ struct rpc_call_ops {
                smp_mb__after_clear_bit(); \
        } while (0)
 
+#define RPC_IS_ACTIVATED(t)    (test_bit(RPC_TASK_ACTIVE, &(t)->tk_runstate))
+#define rpc_set_active(t)      (set_bit(RPC_TASK_ACTIVE, &(t)->tk_runstate))
+#define rpc_clear_active(t)    \
+       do { \
+               smp_mb__before_clear_bit(); \
+               clear_bit(RPC_TASK_ACTIVE, &(t)->tk_runstate); \
+               smp_mb__after_clear_bit(); \
+       } while(0)
+
 /*
  * Task priorities.
  * Note: if you change these, you must also change
@@ -237,6 +246,8 @@ struct rpc_wait_queue {
  */
 struct rpc_task *rpc_new_task(struct rpc_clnt *, int flags,
                                const struct rpc_call_ops *ops, void *data);
+struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
+                               const struct rpc_call_ops *ops, void *data);
 struct rpc_task *rpc_new_child(struct rpc_clnt *, struct rpc_task *parent);
 void           rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
                                int flags, const struct rpc_call_ops *ops,
@@ -260,6 +271,7 @@ void *              rpc_malloc(struct rpc_task *, size_t);
 int            rpciod_up(void);
 void           rpciod_down(void);
 void           rpciod_wake_up(void);
+int            __rpc_wait_for_completion_task(struct rpc_task *task, int (*)(void *));
 #ifdef RPC_DEBUG
 void           rpc_show_tasks(void);
 #endif
@@ -272,6 +284,11 @@ static inline void rpc_exit(struct rpc_task *task, int status)
        task->tk_action = rpc_exit_task;
 }
 
+static inline int rpc_wait_for_completion_task(struct rpc_task *task)
+{
+       return __rpc_wait_for_completion_task(task, NULL);
+}
+
 #ifdef RPC_DEBUG
 static inline const char * rpc_qname(struct rpc_wait_queue *q)
 {
index 2d74a167202845c1435c300e648e60c0d25acae8..82d158dad16de78201596c3b3d81fe5fca0dfaf5 100644 (file)
@@ -264,6 +264,35 @@ void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
 }
 EXPORT_SYMBOL(rpc_init_wait_queue);
 
+static int rpc_wait_bit_interruptible(void *word)
+{
+       if (signal_pending(current))
+               return -ERESTARTSYS;
+       schedule();
+       return 0;
+}
+
+/*
+ * Mark an RPC call as having completed by clearing the 'active' bit
+ */
+static inline void rpc_mark_complete_task(struct rpc_task *task)
+{
+       rpc_clear_active(task);
+       wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+}
+
+/*
+ * Allow callers to wait for completion of an RPC call
+ */
+int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
+{
+       if (action == NULL)
+               action = rpc_wait_bit_interruptible;
+       return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+                       action, TASK_INTERRUPTIBLE);
+}
+EXPORT_SYMBOL(__rpc_wait_for_completion_task);
+
 /*
  * Make an RPC task runnable.
  *
@@ -299,10 +328,7 @@ static void rpc_make_runnable(struct rpc_task *task)
 static inline void
 rpc_schedule_run(struct rpc_task *task)
 {
-       /* Don't run a child twice! */
-       if (RPC_IS_ACTIVATED(task))
-               return;
-       task->tk_active = 1;
+       rpc_set_active(task);
        rpc_make_runnable(task);
 }
 
@@ -324,8 +350,7 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
        }
 
        /* Mark the task as being activated if so needed */
-       if (!RPC_IS_ACTIVATED(task))
-               task->tk_active = 1;
+       rpc_set_active(task);
 
        __rpc_add_wait_queue(q, task);
 
@@ -580,14 +605,6 @@ void rpc_exit_task(struct rpc_task *task)
 }
 EXPORT_SYMBOL(rpc_exit_task);
 
-static int rpc_wait_bit_interruptible(void *word)
-{
-       if (signal_pending(current))
-               return -ERESTARTSYS;
-       schedule();
-       return 0;
-}
-
 /*
  * This is the RPC `scheduler' (or rather, the finite state machine).
  */
@@ -680,6 +697,8 @@ static int __rpc_execute(struct rpc_task *task)
        dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
        status = task->tk_status;
 
+       /* Wake up anyone who is waiting for task completion */
+       rpc_mark_complete_task(task);
        /* Release all resources associated with the task */
        rpc_release_task(task);
        return status;
@@ -697,9 +716,7 @@ static int __rpc_execute(struct rpc_task *task)
 int
 rpc_execute(struct rpc_task *task)
 {
-       BUG_ON(task->tk_active);
-
-       task->tk_active = 1;
+       rpc_set_active(task);
        rpc_set_running(task);
        return __rpc_execute(task);
 }
@@ -761,6 +778,7 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, cons
        init_timer(&task->tk_timer);
        task->tk_timer.data     = (unsigned long) task;
        task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
+       atomic_set(&task->tk_count, 1);
        task->tk_client = clnt;
        task->tk_flags  = flags;
        task->tk_ops = tk_ops;
@@ -848,11 +866,13 @@ void rpc_release_task(struct rpc_task *task)
 {
        const struct rpc_call_ops *tk_ops = task->tk_ops;
        void *calldata = task->tk_calldata;
-       dprintk("RPC: %4d release task\n", task->tk_pid);
 
 #ifdef RPC_DEBUG
        BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
 #endif
+       if (!atomic_dec_and_test(&task->tk_count))
+               return;
+       dprintk("RPC: %4d release task\n", task->tk_pid);
 
        /* Remove from global task list */
        spin_lock(&rpc_sched_lock);
@@ -860,7 +880,6 @@ void rpc_release_task(struct rpc_task *task)
        spin_unlock(&rpc_sched_lock);
 
        BUG_ON (RPC_IS_QUEUED(task));
-       task->tk_active = 0;
 
        /* Synchronously delete any running timer */
        rpc_delete_timer(task);
@@ -885,6 +904,27 @@ void rpc_release_task(struct rpc_task *task)
                tk_ops->rpc_release(calldata);
 }
 
+/**
+ * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
+ * @clnt - pointer to RPC client
+ * @flags - RPC flags
+ * @ops - RPC call ops
+ * @data - user call data
+ */
+struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
+                                       const struct rpc_call_ops *ops,
+                                       void *data)
+{
+       struct rpc_task *task;
+       task = rpc_new_task(clnt, flags, ops, data);
+       if (task == NULL)
+               return ERR_PTR(-ENOMEM);
+       atomic_inc(&task->tk_count);
+       rpc_execute(task);
+       return task;
+}
+EXPORT_SYMBOL(rpc_run_task);
+
 /**
  * rpc_find_parent - find the parent of a child task.
  * @child: child task