netvsc: optimize receive completions
author stephen hemminger <stephen@networkplumber.org>
Fri, 28 Jul 2017 15:59:45 +0000 (08:59 -0700)
committer David S. Miller <davem@davemloft.net>
Sat, 29 Jul 2017 22:25:43 +0000 (15:25 -0700)
Optimize how the receive completion rings are managed.
   * Allocate only as many slots as needed for all buffers from host
   * Allocate before setting up sub channel for better error detection
   * Don't need to keep copy of initial receive section message
   * Precompute the watermark for when receive flushing is needed
   * Replace division with conditional test
   * Replace atomic per-device variable with per-channel check.
   * Handle the corner case where sending a receive completion
     fails because the ring buffer to the host is full.

Signed-off-by: Stephen Hemminger <sthemmin@microsoft.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
drivers/net/hyperv/hyperv_net.h
drivers/net/hyperv/netvsc.c
drivers/net/hyperv/rndis_filter.c

index 9ca3ed692d7322a0d6f372e42a921f5a1fccfa25..f2cef5aaed1f4fdc47b9b88d03ee754ef81108f2 100644 (file)
@@ -186,6 +186,7 @@ struct net_device_context;
 
 struct netvsc_device *netvsc_device_add(struct hv_device *device,
                                        const struct netvsc_device_info *info);
+int netvsc_alloc_recv_comp_ring(struct netvsc_device *net_device, u32 q_idx);
 void netvsc_device_remove(struct hv_device *device);
 int netvsc_send(struct net_device_context *ndc,
                struct hv_netvsc_packet *packet,
@@ -657,13 +658,10 @@ struct recv_comp_data {
        u32 status;
 };
 
-/* Netvsc Receive Slots Max */
-#define NETVSC_RECVSLOT_MAX (NETVSC_RECEIVE_BUFFER_SIZE / ETH_DATA_LEN + 1)
-
 struct multi_recv_comp {
-       void *buf; /* queued receive completions */
-       u32 first; /* first data entry */
-       u32 next; /* next entry for writing */
+       struct recv_comp_data *slots;
+       u32 first;      /* first data entry */
+       u32 next;       /* next entry for writing */
 };
 
 struct netvsc_stats {
@@ -750,7 +748,7 @@ struct netvsc_device {
        u32 recv_buf_size;
        u32 recv_buf_gpadl_handle;
        u32 recv_section_cnt;
-       struct nvsp_1_receive_buffer_section *recv_section;
+       u32 recv_completion_cnt;
 
        /* Send buffer allocated by us */
        void *send_buf;
@@ -778,8 +776,6 @@ struct netvsc_device {
        u32 max_pkt; /* max number of pkt in one send, e.g. 8 */
        u32 pkt_align; /* alignment bytes, e.g. 8 */
 
-       atomic_t num_outstanding_recvs;
-
        atomic_t open_cnt;
 
        struct netvsc_channel chan_table[VRSS_CHANNEL_MAX];
index d3c0b19f6d346aff833cffe3c66852ec37a26be0..4c709b454d3481a28e0d9cf80f5e88ec46d02f6b 100644 (file)
@@ -72,9 +72,6 @@ static struct netvsc_device *alloc_net_device(void)
        if (!net_device)
                return NULL;
 
-       net_device->chan_table[0].mrc.buf
-               = vzalloc(NETVSC_RECVSLOT_MAX * sizeof(struct recv_comp_data));
-
        init_waitqueue_head(&net_device->wait_drain);
        net_device->destroy = false;
        atomic_set(&net_device->open_cnt, 0);
@@ -92,7 +89,7 @@ static void free_netvsc_device(struct rcu_head *head)
        int i;
 
        for (i = 0; i < VRSS_CHANNEL_MAX; i++)
-               vfree(nvdev->chan_table[i].mrc.buf);
+               vfree(nvdev->chan_table[i].mrc.slots);
 
        kfree(nvdev);
 }
@@ -171,12 +168,6 @@ static void netvsc_destroy_buf(struct hv_device *device)
                net_device->recv_buf = NULL;
        }
 
-       if (net_device->recv_section) {
-               net_device->recv_section_cnt = 0;
-               kfree(net_device->recv_section);
-               net_device->recv_section = NULL;
-       }
-
        /* Deal with the send buffer we may have setup.
         * If we got a  send section size, it means we received a
         * NVSP_MSG1_TYPE_SEND_SEND_BUF_COMPLETE msg (ie sent
@@ -239,11 +230,26 @@ static void netvsc_destroy_buf(struct hv_device *device)
        kfree(net_device->send_section_map);
 }
 
+int netvsc_alloc_recv_comp_ring(struct netvsc_device *net_device, u32 q_idx)
+{
+       struct netvsc_channel *nvchan = &net_device->chan_table[q_idx];
+       int node = cpu_to_node(nvchan->channel->target_cpu);
+       size_t size;
+
+       size = net_device->recv_completion_cnt * sizeof(struct recv_comp_data);
+       nvchan->mrc.slots = vzalloc_node(size, node);
+       if (!nvchan->mrc.slots)
+               nvchan->mrc.slots = vzalloc(size);
+
+       return nvchan->mrc.slots ? 0 : -ENOMEM;
+}
+
 static int netvsc_init_buf(struct hv_device *device,
                           struct netvsc_device *net_device)
 {
        int ret = 0;
        struct nvsp_message *init_packet;
+       struct nvsp_1_message_send_receive_buffer_complete *resp;
        struct net_device *ndev;
        size_t map_words;
        int node;
@@ -300,43 +306,41 @@ static int netvsc_init_buf(struct hv_device *device,
        wait_for_completion(&net_device->channel_init_wait);
 
        /* Check the response */
-       if (init_packet->msg.v1_msg.
-           send_recv_buf_complete.status != NVSP_STAT_SUCCESS) {
-               netdev_err(ndev, "Unable to complete receive buffer "
-                          "initialization with NetVsp - status %d\n",
-                          init_packet->msg.v1_msg.
-                          send_recv_buf_complete.status);
+       resp = &init_packet->msg.v1_msg.send_recv_buf_complete;
+       if (resp->status != NVSP_STAT_SUCCESS) {
+               netdev_err(ndev,
+                          "Unable to complete receive buffer initialization with NetVsp - status %d\n",
+                          resp->status);
                ret = -EINVAL;
                goto cleanup;
        }
 
        /* Parse the response */
+       netdev_dbg(ndev, "Receive sections: %u sub_allocs: size %u count: %u\n",
+                  resp->num_sections, resp->sections[0].sub_alloc_size,
+                  resp->sections[0].num_sub_allocs);
 
-       net_device->recv_section_cnt = init_packet->msg.
-               v1_msg.send_recv_buf_complete.num_sections;
-
-       net_device->recv_section = kmemdup(
-               init_packet->msg.v1_msg.send_recv_buf_complete.sections,
-               net_device->recv_section_cnt *
-               sizeof(struct nvsp_1_receive_buffer_section),
-               GFP_KERNEL);
-       if (net_device->recv_section == NULL) {
-               ret = -EINVAL;
-               goto cleanup;
-       }
+       net_device->recv_section_cnt = resp->num_sections;
 
        /*
         * For 1st release, there should only be 1 section that represents the
         * entire receive buffer
         */
        if (net_device->recv_section_cnt != 1 ||
-           net_device->recv_section->offset != 0) {
+           resp->sections[0].offset != 0) {
                ret = -EINVAL;
                goto cleanup;
        }
 
-       /* Now setup the send buffer.
-        */
+       /* Setup receive completion ring */
+       net_device->recv_completion_cnt
+               = round_up(resp->sections[0].num_sub_allocs + 1,
+                          PAGE_SIZE / sizeof(u64));
+       ret = netvsc_alloc_recv_comp_ring(net_device, 0);
+       if (ret)
+               goto cleanup;
+
+       /* Now setup the send buffer. */
        net_device->send_buf = vzalloc_node(net_device->send_buf_size, node);
        if (!net_device->send_buf)
                net_device->send_buf = vzalloc(net_device->send_buf_size);
@@ -951,130 +955,94 @@ send_now:
        return ret;
 }
 
-static int netvsc_send_recv_completion(struct vmbus_channel *channel,
-                                      u64 transaction_id, u32 status)
+/* Send pending recv completions */
+static int send_recv_completions(struct netvsc_channel *nvchan)
 {
-       struct nvsp_message recvcompMessage;
+       struct netvsc_device *nvdev = nvchan->net_device;
+       struct multi_recv_comp *mrc = &nvchan->mrc;
+       struct recv_comp_msg {
+               struct nvsp_message_header hdr;
+               u32 status;
+       }  __packed;
+       struct recv_comp_msg msg = {
+               .hdr.msg_type = NVSP_MSG1_TYPE_SEND_RNDIS_PKT_COMPLETE,
+       };
        int ret;
 
-       recvcompMessage.hdr.msg_type =
-                               NVSP_MSG1_TYPE_SEND_RNDIS_PKT_COMPLETE;
-
-       recvcompMessage.msg.v1_msg.send_rndis_pkt_complete.status = status;
-
-       /* Send the completion */
-       ret = vmbus_sendpacket(channel, &recvcompMessage,
-                              sizeof(struct nvsp_message_header) + sizeof(u32),
-                              transaction_id, VM_PKT_COMP, 0);
+       while (mrc->first != mrc->next) {
+               const struct recv_comp_data *rcd
+                       = mrc->slots + mrc->first;
 
-       return ret;
-}
-
-static inline void count_recv_comp_slot(struct netvsc_device *nvdev, u16 q_idx,
-                                       u32 *filled, u32 *avail)
-{
-       struct multi_recv_comp *mrc = &nvdev->chan_table[q_idx].mrc;
-       u32 first = mrc->first;
-       u32 next = mrc->next;
+               msg.status = rcd->status;
+               ret = vmbus_sendpacket(nvchan->channel, &msg, sizeof(msg),
+                                      rcd->tid, VM_PKT_COMP, 0);
+               if (unlikely(ret))
+                       return ret;
 
-       *filled = (first > next) ? NETVSC_RECVSLOT_MAX - first + next :
-                 next - first;
-
-       *avail = NETVSC_RECVSLOT_MAX - *filled - 1;
-}
-
-/* Read the first filled slot, no change to index */
-static inline struct recv_comp_data *read_recv_comp_slot(struct netvsc_device
-                                                        *nvdev, u16 q_idx)
-{
-       struct multi_recv_comp *mrc = &nvdev->chan_table[q_idx].mrc;
-       u32 filled, avail;
-
-       if (unlikely(!mrc->buf))
-               return NULL;
+               if (++mrc->first == nvdev->recv_completion_cnt)
+                       mrc->first = 0;
+       }
 
-       count_recv_comp_slot(nvdev, q_idx, &filled, &avail);
-       if (!filled)
-               return NULL;
+       /* receive completion ring has been emptied */
+       if (unlikely(nvdev->destroy))
+               wake_up(&nvdev->wait_drain);
 
-       return mrc->buf + mrc->first * sizeof(struct recv_comp_data);
+       return 0;
 }
 
-/* Put the first filled slot back to available pool */
-static inline void put_recv_comp_slot(struct netvsc_device *nvdev, u16 q_idx)
+/* Count how many receive completions are outstanding */
+static void recv_comp_slot_avail(const struct netvsc_device *nvdev,
+                                const struct multi_recv_comp *mrc,
+                                u32 *filled, u32 *avail)
 {
-       struct multi_recv_comp *mrc = &nvdev->chan_table[q_idx].mrc;
-       int num_recv;
+       u32 count = nvdev->recv_completion_cnt;
 
-       mrc->first = (mrc->first + 1) % NETVSC_RECVSLOT_MAX;
-
-       num_recv = atomic_dec_return(&nvdev->num_outstanding_recvs);
+       if (mrc->next >= mrc->first)
+               *filled = mrc->next - mrc->first;
+       else
+               *filled = (count - mrc->first) + mrc->next;
 
-       if (nvdev->destroy && num_recv == 0)
-               wake_up(&nvdev->wait_drain);
+       *avail = count - *filled - 1;
 }
 
-/* Check and send pending recv completions */
-static void netvsc_chk_recv_comp(struct netvsc_device *nvdev,
-                                struct vmbus_channel *channel, u16 q_idx)
+/* Add receive complete to ring to send to host. */
+static void enq_receive_complete(struct net_device *ndev,
+                                struct netvsc_device *nvdev, u16 q_idx,
+                                u64 tid, u32 status)
 {
+       struct netvsc_channel *nvchan = &nvdev->chan_table[q_idx];
+       struct multi_recv_comp *mrc = &nvchan->mrc;
        struct recv_comp_data *rcd;
-       int ret;
-
-       while (true) {
-               rcd = read_recv_comp_slot(nvdev, q_idx);
-               if (!rcd)
-                       break;
+       u32 filled, avail;
 
-               ret = netvsc_send_recv_completion(channel, rcd->tid,
-                                                 rcd->status);
-               if (ret)
-                       break;
+       recv_comp_slot_avail(nvdev, mrc, &filled, &avail);
 
-               put_recv_comp_slot(nvdev, q_idx);
+       if (unlikely(filled > NAPI_POLL_WEIGHT)) {
+               send_recv_completions(nvchan);
+               recv_comp_slot_avail(nvdev, mrc, &filled, &avail);
        }
-}
-
-#define NETVSC_RCD_WATERMARK 80
 
-/* Get next available slot */
-static inline struct recv_comp_data *get_recv_comp_slot(
-       struct netvsc_device *nvdev, struct vmbus_channel *channel, u16 q_idx)
-{
-       struct multi_recv_comp *mrc = &nvdev->chan_table[q_idx].mrc;
-       u32 filled, avail, next;
-       struct recv_comp_data *rcd;
-
-       if (unlikely(!nvdev->recv_section))
-               return NULL;
-
-       if (unlikely(!mrc->buf))
-               return NULL;
-
-       if (atomic_read(&nvdev->num_outstanding_recvs) >
-           nvdev->recv_section->num_sub_allocs * NETVSC_RCD_WATERMARK / 100)
-               netvsc_chk_recv_comp(nvdev, channel, q_idx);
-
-       count_recv_comp_slot(nvdev, q_idx, &filled, &avail);
-       if (!avail)
-               return NULL;
-
-       next = mrc->next;
-       rcd = mrc->buf + next * sizeof(struct recv_comp_data);
-       mrc->next = (next + 1) % NETVSC_RECVSLOT_MAX;
+       if (unlikely(!avail)) {
+               netdev_err(ndev, "Recv_comp full buf q:%hd, tid:%llx\n",
+                          q_idx, tid);
+               return;
+       }
 
-       atomic_inc(&nvdev->num_outstanding_recvs);
+       rcd = mrc->slots + mrc->next;
+       rcd->tid = tid;
+       rcd->status = status;
 
-       return rcd;
+       if (++mrc->next == nvdev->recv_completion_cnt)
+               mrc->next = 0;
 }
 
 static int netvsc_receive(struct net_device *ndev,
-                  struct netvsc_device *net_device,
-                  struct net_device_context *net_device_ctx,
-                  struct hv_device *device,
-                  struct vmbus_channel *channel,
-                  const struct vmpacket_descriptor *desc,
-                  struct nvsp_message *nvsp)
+                         struct netvsc_device *net_device,
+                         struct net_device_context *net_device_ctx,
+                         struct hv_device *device,
+                         struct vmbus_channel *channel,
+                         const struct vmpacket_descriptor *desc,
+                         struct nvsp_message *nvsp)
 {
        const struct vmtransfer_page_packet_header *vmxferpage_packet
                = container_of(desc, const struct vmtransfer_page_packet_header, d);
@@ -1083,7 +1051,6 @@ static int netvsc_receive(struct net_device *ndev,
        u32 status = NVSP_STAT_SUCCESS;
        int i;
        int count = 0;
-       int ret;
 
        /* Make sure this is a valid nvsp packet */
        if (unlikely(nvsp->hdr.msg_type != NVSP_MSG1_TYPE_SEND_RNDIS_PKT)) {
@@ -1114,25 +1081,9 @@ static int netvsc_receive(struct net_device *ndev,
                                              channel, data, buflen);
        }
 
-       if (net_device->chan_table[q_idx].mrc.buf) {
-               struct recv_comp_data *rcd;
+       enq_receive_complete(ndev, net_device, q_idx,
+                            vmxferpage_packet->d.trans_id, status);
 
-               rcd = get_recv_comp_slot(net_device, channel, q_idx);
-               if (rcd) {
-                       rcd->tid = vmxferpage_packet->d.trans_id;
-                       rcd->status = status;
-               } else {
-                       netdev_err(ndev, "Recv_comp full buf q:%hd, tid:%llx\n",
-                                  q_idx, vmxferpage_packet->d.trans_id);
-               }
-       } else {
-               ret = netvsc_send_recv_completion(channel,
-                                                 vmxferpage_packet->d.trans_id,
-                                                 status);
-               if (ret)
-                       netdev_err(ndev, "Recv_comp q:%hd, tid:%llx, err:%d\n",
-                                  q_idx, vmxferpage_packet->d.trans_id, ret);
-       }
        return count;
 }
 
@@ -1231,7 +1182,6 @@ int netvsc_poll(struct napi_struct *napi, int budget)
        struct netvsc_device *net_device = nvchan->net_device;
        struct vmbus_channel *channel = nvchan->channel;
        struct hv_device *device = netvsc_channel_to_device(channel);
-       u16 q_idx = channel->offermsg.offer.sub_channel_index;
        struct net_device *ndev = hv_get_drvdata(device);
        int work_done = 0;
 
@@ -1245,17 +1195,18 @@ int netvsc_poll(struct napi_struct *napi, int budget)
                nvchan->desc = hv_pkt_iter_next(channel, nvchan->desc);
        }
 
-       /* If receive ring was exhausted
-        * and not doing busy poll
-        * then re-enable host interrupts
-        *  and reschedule if ring is not empty.
+       /* If send of pending receive completions succeeded
+        *   and did not exhaust NAPI budget
+        *   and not doing busy poll
+        * then reschedule if more data has arrived from host
         */
-       if (work_done < budget &&
+       if (send_recv_completions(nvchan) == 0 &&
+           work_done < budget &&
            napi_complete_done(napi, work_done) &&
-           hv_end_read(&channel->inbound) != 0)
+           hv_end_read(&channel->inbound)) {
+               hv_begin_read(&channel->inbound);
                napi_reschedule(napi);
-
-       netvsc_chk_recv_comp(net_device, channel, q_idx);
+       }
 
        /* Driver may overshoot since multiple packets per descriptor */
        return min(work_done, budget);
index d80e9e3f433e2c2fe05a255dee7b5de584c4803a..44165fe328a498f970ebf26eeb2eb083cf5c7547 100644 (file)
@@ -928,12 +928,12 @@ static bool netvsc_device_idle(const struct netvsc_device *nvdev)
 {
        int i;
 
-       if (atomic_read(&nvdev->num_outstanding_recvs) > 0)
-               return false;
-
        for (i = 0; i < nvdev->num_chn; i++) {
                const struct netvsc_channel *nvchan = &nvdev->chan_table[i];
 
+               if (nvchan->mrc.first != nvchan->mrc.next)
+                       return false;
+
                if (atomic_read(&nvchan->queue_sends) > 0)
                        return false;
        }
@@ -1031,11 +1031,6 @@ static void netvsc_sc_open(struct vmbus_channel *new_sc)
                return;
 
        nvchan = nvscdev->chan_table + chn_index;
-       nvchan->mrc.buf
-               = vzalloc(NETVSC_RECVSLOT_MAX * sizeof(struct recv_comp_data));
-
-       if (!nvchan->mrc.buf)
-               return;
 
        /* Because the device uses NAPI, all the interrupt batching and
         * control is done via Net softirq, not the channel handling
@@ -1225,6 +1220,15 @@ struct netvsc_device *rndis_filter_device_add(struct hv_device *dev,
        if (num_rss_qs == 0)
                return net_device;
 
+       for (i = 1; i < net_device->num_chn; i++) {
+               ret = netvsc_alloc_recv_comp_ring(net_device, i);
+               if (ret) {
+                       while (--i != 0)
+                               vfree(net_device->chan_table[i].mrc.slots);
+                       goto out;
+               }
+       }
+
        refcount_set(&net_device->sc_offered, num_rss_qs);
        vmbus_set_sc_create_callback(dev->channel, netvsc_sc_open);