[DLM] Use workqueues for dlm lowcomms
authorPatrick Caulfield <pcaulfie@redhat.com>
Mon, 15 Jan 2007 14:33:34 +0000 (14:33 +0000)
committerSteven Whitehouse <swhiteho@redhat.com>
Mon, 5 Feb 2007 18:36:52 +0000 (13:36 -0500)
This patch converts the DLM TCP lowcomms to use workqueues rather than using its
own daemon functions. Simultaneously removing a lot of code and making it more
scalable on multi-processor machines.

Signed-Off-By: Patrick Caulfield <pcaulfie@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
fs/dlm/lowcomms-sctp.c
fs/dlm/lowcomms-tcp.c

index 5aeadadd8afd406d26701b29bb43d1af4c1670c3..dc83a9d979b5285bb5e9704a9d32cef199a5c129 100644 (file)
@@ -72,6 +72,8 @@ struct nodeinfo {
        struct list_head        writequeue; /* outgoing writequeue_entries */
        spinlock_t              writequeue_lock;
        int                     nodeid;
+       struct work_struct      swork; /* Send workqueue */
+       struct work_struct      lwork; /* Locking workqueue */
 };
 
 static DEFINE_IDR(nodeinfo_idr);
@@ -96,6 +98,7 @@ struct connection {
        atomic_t                waiting_requests;
        struct cbuf             cb;
        int                     eagain_flag;
+       struct work_struct      work; /* Send workqueue */
 };
 
 /* An entry waiting to be sent */
@@ -137,19 +140,23 @@ static void cbuf_eat(struct cbuf *cb, int n)
 static LIST_HEAD(write_nodes);
 static DEFINE_SPINLOCK(write_nodes_lock);
 
+
 /* Maximum number of incoming messages to process before
  * doing a schedule()
  */
 #define MAX_RX_MSG_COUNT 25
 
-/* Manage daemons */
-static struct task_struct *recv_task;
-static struct task_struct *send_task;
-static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait);
+/* Work queues */
+static struct workqueue_struct *recv_workqueue;
+static struct workqueue_struct *send_workqueue;
+static struct workqueue_struct *lock_workqueue;
 
 /* The SCTP connection */
 static struct connection sctp_con;
 
+static void process_send_sockets(struct work_struct *work);
+static void process_recv_sockets(struct work_struct *work);
+static void process_lock_request(struct work_struct *work);
 
 static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
 {
@@ -222,6 +229,8 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
        spin_lock_init(&ni->lock);
        INIT_LIST_HEAD(&ni->writequeue);
        spin_lock_init(&ni->writequeue_lock);
+       INIT_WORK(&ni->lwork, process_lock_request);
+       INIT_WORK(&ni->swork, process_send_sockets);
        ni->nodeid = nodeid;
 
        if (nodeid > max_nodeid)
@@ -249,11 +258,8 @@ static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
 /* Data or notification available on socket */
 static void lowcomms_data_ready(struct sock *sk, int count_unused)
 {
-       atomic_inc(&sctp_con.waiting_requests);
        if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
-               return;
-
-       wake_up_interruptible(&lowcomms_recv_wait);
+               queue_work(recv_workqueue, &sctp_con.work);
 }
 
 
@@ -361,10 +367,10 @@ static void init_failed(void)
                                spin_lock_bh(&write_nodes_lock);
                                list_add_tail(&ni->write_list, &write_nodes);
                                spin_unlock_bh(&write_nodes_lock);
+                               queue_work(send_workqueue, &ni->swork);
                        }
                }
        }
-       wake_up_process(send_task);
 }
 
 /* Something happened to an association */
@@ -446,8 +452,8 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
                                spin_lock_bh(&write_nodes_lock);
                                list_add_tail(&ni->write_list, &write_nodes);
                                spin_unlock_bh(&write_nodes_lock);
+                               queue_work(send_workqueue, &ni->swork);
                        }
-                       wake_up_process(send_task);
                }
                break;
 
@@ -580,8 +586,8 @@ static int receive_from_sock(void)
                                spin_lock_bh(&write_nodes_lock);
                                list_add_tail(&ni->write_list, &write_nodes);
                                spin_unlock_bh(&write_nodes_lock);
+                               queue_work(send_workqueue, &ni->swork);
                        }
-                       wake_up_process(send_task);
                }
        }
 
@@ -590,6 +596,7 @@ static int receive_from_sock(void)
                return 0;
 
        cbuf_add(&sctp_con.cb, ret);
+       // PJC: TODO: Add to node's workqueue....can we ??
        ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
                                          page_address(sctp_con.rx_page),
                                          sctp_con.cb.base, sctp_con.cb.len,
@@ -820,7 +827,8 @@ void dlm_lowcomms_commit_buffer(void *arg)
                spin_lock_bh(&write_nodes_lock);
                list_add_tail(&ni->write_list, &write_nodes);
                spin_unlock_bh(&write_nodes_lock);
-               wake_up_process(send_task);
+
+               queue_work(send_workqueue, &ni->swork);
        }
        return;
 
@@ -1088,101 +1096,75 @@ int dlm_lowcomms_close(int nodeid)
        return 0;
 }
 
-static int write_list_empty(void)
+// PJC: The work queue function for receiving.
+static void process_recv_sockets(struct work_struct *work)
 {
-       int status;
+       if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
+               int ret;
+               int count = 0;
 
-       spin_lock_bh(&write_nodes_lock);
-       status = list_empty(&write_nodes);
-       spin_unlock_bh(&write_nodes_lock);
+               do {
+                       ret = receive_from_sock();
 
-       return status;
+                       /* Don't starve out everyone else */
+                       if (++count >= MAX_RX_MSG_COUNT) {
+                               cond_resched();
+                               count = 0;
+                       }
+               } while (!kthread_should_stop() && ret >=0);
+       }
+       cond_resched();
 }
 
-static int dlm_recvd(void *data)
+// PJC: the work queue function for sending
+static void process_send_sockets(struct work_struct *work)
 {
-       DECLARE_WAITQUEUE(wait, current);
-
-       while (!kthread_should_stop()) {
-               int count = 0;
-
-               set_current_state(TASK_INTERRUPTIBLE);
-               add_wait_queue(&lowcomms_recv_wait, &wait);
-               if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
-                       schedule();
-               remove_wait_queue(&lowcomms_recv_wait, &wait);
-               set_current_state(TASK_RUNNING);
-
-               if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
-                       int ret;
-
-                       do {
-                               ret = receive_from_sock();
-
-                               /* Don't starve out everyone else */
-                               if (++count >= MAX_RX_MSG_COUNT) {
-                                       cond_resched();
-                                       count = 0;
-                               }
-                       } while (!kthread_should_stop() && ret >=0);
-               }
-               cond_resched();
+       if (sctp_con.eagain_flag) {
+               sctp_con.eagain_flag = 0;
+               refill_write_queue();
        }
-
-       return 0;
+       process_output_queue();
 }
 
-static int dlm_sendd(void *data)
+// PJC: Process lock requests from a particular node.
+// TODO: can we optimise this out on UP ??
+static void process_lock_request(struct work_struct *work)
 {
-       DECLARE_WAITQUEUE(wait, current);
-
-       add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);
-
-       while (!kthread_should_stop()) {
-               set_current_state(TASK_INTERRUPTIBLE);
-               if (write_list_empty())
-                       schedule();
-               set_current_state(TASK_RUNNING);
-
-               if (sctp_con.eagain_flag) {
-                       sctp_con.eagain_flag = 0;
-                       refill_write_queue();
-               }
-               process_output_queue();
-       }
-
-       remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);
-
-       return 0;
 }
 
 static void daemons_stop(void)
 {
-       kthread_stop(recv_task);
-       kthread_stop(send_task);
+       destroy_workqueue(recv_workqueue);
+       destroy_workqueue(send_workqueue);
+       destroy_workqueue(lock_workqueue);
 }
 
 static int daemons_start(void)
 {
-       struct task_struct *p;
        int error;
+       recv_workqueue = create_workqueue("dlm_recv");
+       error = IS_ERR(recv_workqueue);
+       if (error) {
+               log_print("can't start dlm_recv %d", error);
+               return error;
+       }
 
-       p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
-       error = IS_ERR(p);
+       send_workqueue = create_singlethread_workqueue("dlm_send");
+       error = IS_ERR(send_workqueue);
        if (error) {
-               log_print("can't start dlm_recvd %d", error);
+               log_print("can't start dlm_send %d", error);
+               destroy_workqueue(recv_workqueue);
                return error;
        }
-       recv_task = p;
 
-       p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
-       error = IS_ERR(p);
+       lock_workqueue = create_workqueue("dlm_rlock");
+       error = IS_ERR(lock_workqueue);
        if (error) {
-               log_print("can't start dlm_sendd %d", error);
-               kthread_stop(recv_task);
+               log_print("can't start dlm_rlock %d", error);
+               destroy_workqueue(send_workqueue);
+               destroy_workqueue(recv_workqueue);
                return error;
        }
-       send_task = p;
 
        return 0;
 }
@@ -1194,6 +1176,8 @@ int dlm_lowcomms_start(void)
 {
        int error;
 
+       INIT_WORK(&sctp_con.work, process_recv_sockets);
+
        error = init_sock();
        if (error)
                goto fail_sock;
@@ -1224,4 +1208,3 @@ void dlm_lowcomms_stop(void)
        for (i = 0; i < dlm_local_count; i++)
                kfree(dlm_local_addr[i]);
 }
-
index b4fb5783ef8acf8d69b6a3915a2bd2e9149fc477..86e5f81da7cbf0edcf4fc55a33d8cc9a70b93294 100644 (file)
@@ -115,6 +115,8 @@ struct connection {
        atomic_t waiting_requests;
 #define MAX_CONNECT_RETRIES 3
        struct connection *othercon;
+       struct work_struct rwork; /* Receive workqueue */
+       struct work_struct swork; /* Send workqueue */
 };
 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
 
@@ -131,14 +133,9 @@ struct writequeue_entry {
 
 static struct sockaddr_storage dlm_local_addr;
 
-/* Manage daemons */
-static struct task_struct *recv_task;
-static struct task_struct *send_task;
-
-static wait_queue_t lowcomms_send_waitq_head;
-static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
-static wait_queue_t lowcomms_recv_waitq_head;
-static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);
+/* Work queues */
+static struct workqueue_struct *recv_workqueue;
+static struct workqueue_struct *send_workqueue;
 
 /* An array of pointers to connections, indexed by NODEID */
 static struct connection **connections;
@@ -146,17 +143,8 @@ static DECLARE_MUTEX(connections_lock);
 static struct kmem_cache *con_cache;
 static int conn_array_size;
 
-/* List of sockets that have reads pending */
-static LIST_HEAD(read_sockets);
-static DEFINE_SPINLOCK(read_sockets_lock);
-
-/* List of sockets which have writes pending */
-static LIST_HEAD(write_sockets);
-static DEFINE_SPINLOCK(write_sockets_lock);
-
-/* List of sockets which have connects pending */
-static LIST_HEAD(state_sockets);
-static DEFINE_SPINLOCK(state_sockets_lock);
+static void process_recv_sockets(struct work_struct *work);
+static void process_send_sockets(struct work_struct *work);
 
 static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 {
@@ -189,6 +177,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
                init_rwsem(&con->sock_sem);
                INIT_LIST_HEAD(&con->writequeue);
                spin_lock_init(&con->writequeue_lock);
+               INIT_WORK(&con->swork, process_send_sockets);
+               INIT_WORK(&con->rwork, process_recv_sockets);
 
                connections[nodeid] = con;
        }
@@ -203,41 +193,22 @@ static void lowcomms_data_ready(struct sock *sk, int count_unused)
 {
        struct connection *con = sock2con(sk);
 
-       atomic_inc(&con->waiting_requests);
-       if (test_and_set_bit(CF_READ_PENDING, &con->flags))
-               return;
-
-       spin_lock_bh(&read_sockets_lock);
-       list_add_tail(&con->read_list, &read_sockets);
-       spin_unlock_bh(&read_sockets_lock);
-
-       wake_up_interruptible(&lowcomms_recv_waitq);
+       if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
+               queue_work(recv_workqueue, &con->rwork);
 }
 
 static void lowcomms_write_space(struct sock *sk)
 {
        struct connection *con = sock2con(sk);
 
-       if (test_and_set_bit(CF_WRITE_PENDING, &con->flags))
-               return;
-
-       spin_lock_bh(&write_sockets_lock);
-       list_add_tail(&con->write_list, &write_sockets);
-       spin_unlock_bh(&write_sockets_lock);
-
-       wake_up_interruptible(&lowcomms_send_waitq);
+       if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
+               queue_work(send_workqueue, &con->swork);
 }
 
 static inline void lowcomms_connect_sock(struct connection *con)
 {
-       if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
-               return;
-
-       spin_lock_bh(&state_sockets_lock);
-       list_add_tail(&con->state_list, &state_sockets);
-       spin_unlock_bh(&state_sockets_lock);
-
-       wake_up_interruptible(&lowcomms_send_waitq);
+       if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
+               queue_work(send_workqueue, &con->swork);
 }
 
 static void lowcomms_state_change(struct sock *sk)
@@ -388,7 +359,8 @@ out:
        return 0;
 
 out_resched:
-       lowcomms_data_ready(con->sock->sk, 0);
+       if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
+               queue_work(recv_workqueue, &con->rwork);
        up_read(&con->sock_sem);
        cond_resched();
        return 0;
@@ -477,6 +449,8 @@ static int accept_from_sock(struct connection *con)
                        othercon->nodeid = nodeid;
                        othercon->rx_action = receive_from_sock;
                        init_rwsem(&othercon->sock_sem);
+                       INIT_WORK(&othercon->swork, process_send_sockets);
+                       INIT_WORK(&othercon->rwork, process_recv_sockets);
                        set_bit(CF_IS_OTHERCON, &othercon->flags);
                        newcon->othercon = othercon;
                }
@@ -498,7 +472,8 @@ static int accept_from_sock(struct connection *con)
         * beween processing the accept adding the socket
         * to the read_sockets list
         */
-       lowcomms_data_ready(newsock->sk, 0);
+       if (!test_and_set_bit(CF_READ_PENDING, &newcon->flags))
+               queue_work(recv_workqueue, &newcon->rwork);
        up_read(&con->sock_sem);
 
        return 0;
@@ -757,12 +732,8 @@ void dlm_lowcomms_commit_buffer(void *mh)
        kunmap(e->page);
        spin_unlock(&con->writequeue_lock);
 
-       if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) {
-               spin_lock_bh(&write_sockets_lock);
-               list_add_tail(&con->write_list, &write_sockets);
-               spin_unlock_bh(&write_sockets_lock);
-
-               wake_up_interruptible(&lowcomms_send_waitq);
+       if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
+               queue_work(send_workqueue, &con->swork);
        }
        return;
 
@@ -803,6 +774,7 @@ static void send_to_sock(struct connection *con)
                offset = e->offset;
                BUG_ON(len == 0 && e->users == 0);
                spin_unlock(&con->writequeue_lock);
+               kmap(e->page);
 
                ret = 0;
                if (len) {
@@ -884,85 +856,29 @@ out:
 }
 
 /* Look for activity on active sockets */
-static void process_sockets(void)
+static void process_recv_sockets(struct work_struct *work)
 {
-       struct list_head *list;
-       struct list_head *temp;
-       int count = 0;
-
-       spin_lock_bh(&read_sockets_lock);
-       list_for_each_safe(list, temp, &read_sockets) {
-
-               struct connection *con =
-                       list_entry(list, struct connection, read_list);
-               list_del(&con->read_list);
-               clear_bit(CF_READ_PENDING, &con->flags);
-
-               spin_unlock_bh(&read_sockets_lock);
+       struct connection *con = container_of(work, struct connection, rwork);
+       int err;
 
-               /* This can reach zero if we are processing requests
-                * as they come in.
-                */
-               if (atomic_read(&con->waiting_requests) == 0) {
-                       spin_lock_bh(&read_sockets_lock);
-                       continue;
-               }
-
-               do {
-                       con->rx_action(con);
-
-                       /* Don't starve out everyone else */
-                       if (++count >= MAX_RX_MSG_COUNT) {
-                               cond_resched();
-                               count = 0;
-                       }
-
-               } while (!atomic_dec_and_test(&con->waiting_requests) &&
-                        !kthread_should_stop());
-
-               spin_lock_bh(&read_sockets_lock);
-       }
-       spin_unlock_bh(&read_sockets_lock);
+       clear_bit(CF_READ_PENDING, &con->flags);
+       do {
+               err = con->rx_action(con);
+       } while (!err);
 }
 
-/* Try to send any messages that are pending
- */
-static void process_output_queue(void)
-{
-       struct list_head *list;
-       struct list_head *temp;
-
-       spin_lock_bh(&write_sockets_lock);
-       list_for_each_safe(list, temp, &write_sockets) {
-               struct connection *con =
-                       list_entry(list, struct connection, write_list);
-               clear_bit(CF_WRITE_PENDING, &con->flags);
-               list_del(&con->write_list);
-
-               spin_unlock_bh(&write_sockets_lock);
-               send_to_sock(con);
-               spin_lock_bh(&write_sockets_lock);
-       }
-       spin_unlock_bh(&write_sockets_lock);
-}
 
-static void process_state_queue(void)
+static void process_send_sockets(struct work_struct *work)
 {
-       struct list_head *list;
-       struct list_head *temp;
-
-       spin_lock_bh(&state_sockets_lock);
-       list_for_each_safe(list, temp, &state_sockets) {
-               struct connection *con =
-                       list_entry(list, struct connection, state_list);
-               list_del(&con->state_list);
-               clear_bit(CF_CONNECT_PENDING, &con->flags);
-               spin_unlock_bh(&state_sockets_lock);
+       struct connection *con = container_of(work, struct connection, swork);
 
+       if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
                connect_to_sock(con);
-               spin_lock_bh(&state_sockets_lock);
        }
-       spin_unlock_bh(&state_sockets_lock);
+
+       if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags)) {
+               send_to_sock(con);
+       }
 }
 
 
@@ -979,97 +895,29 @@ static void clean_writequeues(void)
        }
 }
 
-static int read_list_empty(void)
-{
-       int status;
-
-       spin_lock_bh(&read_sockets_lock);
-       status = list_empty(&read_sockets);
-       spin_unlock_bh(&read_sockets_lock);
-
-       return status;
-}
-
-/* DLM Transport comms receive daemon */
-static int dlm_recvd(void *data)
+static void work_stop(void)
 {
-       init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
-       add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);
-
-       while (!kthread_should_stop()) {
-               set_current_state(TASK_INTERRUPTIBLE);
-               if (read_list_empty())
-                       schedule();
-               set_current_state(TASK_RUNNING);
-
-               process_sockets();
-       }
-
-       return 0;
+       destroy_workqueue(recv_workqueue);
+       destroy_workqueue(send_workqueue);
 }
 
-static int write_and_state_lists_empty(void)
+static int work_start(void)
 {
-       int status;
-
-       spin_lock_bh(&write_sockets_lock);
-       status = list_empty(&write_sockets);
-       spin_unlock_bh(&write_sockets_lock);
-
-       spin_lock_bh(&state_sockets_lock);
-       if (list_empty(&state_sockets) == 0)
-               status = 0;
-       spin_unlock_bh(&state_sockets_lock);
-
-       return status;
-}
-
-/* DLM Transport send daemon */
-static int dlm_sendd(void *data)
-{
-       init_waitqueue_entry(&lowcomms_send_waitq_head, current);
-       add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
-
-       while (!kthread_should_stop()) {
-               set_current_state(TASK_INTERRUPTIBLE);
-               if (write_and_state_lists_empty())
-                       schedule();
-               set_current_state(TASK_RUNNING);
-
-               process_state_queue();
-               process_output_queue();
-       }
-
-       return 0;
-}
-
-static void daemons_stop(void)
-{
-       kthread_stop(recv_task);
-       kthread_stop(send_task);
-}
-
-static int daemons_start(void)
-{
-       struct task_struct *p;
        int error;
-
-       p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
-       error = IS_ERR(p);
+       recv_workqueue = create_workqueue("dlm_recv");
+       error = IS_ERR(recv_workqueue);
        if (error) {
-               log_print("can't start dlm_recvd %d", error);
+               log_print("can't start dlm_recv %d", error);
                return error;
        }
-       recv_task = p;
 
-       p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
-       error = IS_ERR(p);
+       send_workqueue = create_singlethread_workqueue("dlm_send");
+       error = IS_ERR(send_workqueue);
        if (error) {
-               log_print("can't start dlm_sendd %d", error);
-               kthread_stop(recv_task);
+               log_print("can't start dlm_send %d", error);
+               destroy_workqueue(recv_workqueue);
                return error;
        }
-       send_task = p;
 
        return 0;
 }
@@ -1086,7 +934,7 @@ void dlm_lowcomms_stop(void)
                        connections[i]->flags |= 0xFF;
        }
 
-       daemons_stop();
+       work_stop();
        clean_writequeues();
 
        for (i = 0; i < conn_array_size; i++) {
@@ -1138,7 +986,7 @@ int dlm_lowcomms_start(void)
        if (error)
                goto fail_unlisten;
 
-       error = daemons_start();
+       error = work_start();
        if (error)
                goto fail_unlisten;