rxrpc: Rewrite the data and ack handling code
[GitHub/moto-9609/android_kernel_motorola_exynos9610.git] / net / rxrpc / call_accept.c
index cc7194e05a151e163ed8a9386326b82cc8234bca..b8acec0d596e3b2e71ff77524cdf8cdf2b81d3d0 100644 (file)
@@ -129,6 +129,8 @@ static int rxrpc_service_prealloc_one(struct rxrpc_sock *rx,
                set_bit(RXRPC_CALL_HAS_USERID, &call->flags);
        }
 
+       list_add(&call->sock_link, &rx->sock_calls);
+
        write_unlock(&rx->call_lock);
 
        write_lock(&rxrpc_call_lock);
@@ -186,6 +188,12 @@ void rxrpc_discard_prealloc(struct rxrpc_sock *rx)
                return;
        rx->backlog = NULL;
 
+       /* Make sure that there aren't any incoming calls in progress before we
+        * clear the preallocation buffers.
+        */
+       spin_lock_bh(&rx->incoming_lock);
+       spin_unlock_bh(&rx->incoming_lock);
+
        head = b->peer_backlog_head;
        tail = b->peer_backlog_tail;
        while (CIRC_CNT(head, tail, size) > 0) {
@@ -224,251 +232,179 @@ void rxrpc_discard_prealloc(struct rxrpc_sock *rx)
 }
 
 /*
- * generate a connection-level abort
+ * Allocate a new incoming call from the prealloc pool, along with a connection
+ * and a peer as necessary.
  */
-static int rxrpc_busy(struct rxrpc_local *local, struct sockaddr_rxrpc *srx,
-                     struct rxrpc_wire_header *whdr)
+static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
+                                                   struct rxrpc_local *local,
+                                                   struct rxrpc_connection *conn,
+                                                   struct sk_buff *skb)
 {
-       struct msghdr msg;
-       struct kvec iov[1];
-       size_t len;
-       int ret;
-
-       _enter("%d,,", local->debug_id);
-
-       whdr->type      = RXRPC_PACKET_TYPE_BUSY;
-       whdr->serial    = htonl(1);
-
-       msg.msg_name    = &srx->transport.sin;
-       msg.msg_namelen = sizeof(srx->transport.sin);
-       msg.msg_control = NULL;
-       msg.msg_controllen = 0;
-       msg.msg_flags   = 0;
-
-       iov[0].iov_base = whdr;
-       iov[0].iov_len  = sizeof(*whdr);
-
-       len = iov[0].iov_len;
-
-       _proto("Tx BUSY %%1");
+       struct rxrpc_backlog *b = rx->backlog;
+       struct rxrpc_peer *peer, *xpeer;
+       struct rxrpc_call *call;
+       unsigned short call_head, conn_head, peer_head;
+       unsigned short call_tail, conn_tail, peer_tail;
+       unsigned short call_count, conn_count;
+
+       /* #calls >= #conns >= #peers must hold true. */
+       call_head = smp_load_acquire(&b->call_backlog_head);
+       call_tail = b->call_backlog_tail;
+       call_count = CIRC_CNT(call_head, call_tail, RXRPC_BACKLOG_MAX);
+       conn_head = smp_load_acquire(&b->conn_backlog_head);
+       conn_tail = b->conn_backlog_tail;
+       conn_count = CIRC_CNT(conn_head, conn_tail, RXRPC_BACKLOG_MAX);
+       ASSERTCMP(conn_count, >=, call_count);
+       peer_head = smp_load_acquire(&b->peer_backlog_head);
+       peer_tail = b->peer_backlog_tail;
+       ASSERTCMP(CIRC_CNT(peer_head, peer_tail, RXRPC_BACKLOG_MAX), >=,
+                 conn_count);
+
+       if (call_count == 0)
+               return NULL;
+
+       if (!conn) {
+               /* No connection.  We're going to need a peer to start off
+                * with.  If one doesn't yet exist, use a spare from the
+                * preallocation set.  We dump the address into the spare in
+                * anticipation - and to save on stack space.
+                */
+               xpeer = b->peer_backlog[peer_tail];
+               if (rxrpc_extract_addr_from_skb(&xpeer->srx, skb) < 0)
+                       return NULL;
+
+               peer = rxrpc_lookup_incoming_peer(local, xpeer);
+               if (peer == xpeer) {
+                       b->peer_backlog[peer_tail] = NULL;
+                       smp_store_release(&b->peer_backlog_tail,
+                                         (peer_tail + 1) &
+                                         (RXRPC_BACKLOG_MAX - 1));
+               }
 
-       ret = kernel_sendmsg(local->socket, &msg, iov, 1, len);
-       if (ret < 0) {
-               _leave(" = -EAGAIN [sendmsg failed: %d]", ret);
-               return -EAGAIN;
+               /* Now allocate and set up the connection */
+               conn = b->conn_backlog[conn_tail];
+               b->conn_backlog[conn_tail] = NULL;
+               smp_store_release(&b->conn_backlog_tail,
+                                 (conn_tail + 1) & (RXRPC_BACKLOG_MAX - 1));
+               rxrpc_get_local(local);
+               conn->params.local = local;
+               conn->params.peer = peer;
+               rxrpc_new_incoming_connection(conn, skb);
+       } else {
+               rxrpc_get_connection(conn);
        }
 
-       _leave(" = 0");
-       return 0;
+       /* And now we can allocate and set up a new call */
+       call = b->call_backlog[call_tail];
+       b->call_backlog[call_tail] = NULL;
+       smp_store_release(&b->call_backlog_tail,
+                         (call_tail + 1) & (RXRPC_BACKLOG_MAX - 1));
+
+       call->conn = conn;
+       call->peer = rxrpc_get_peer(conn->params.peer);
+       return call;
 }
 
 /*
- * accept an incoming call that needs peer, transport and/or connection setting
- * up
+ * Set up a new incoming call.  Called in BH context with the RCU read lock
+ * held.
+ *
+ * If this is for a kernel service, when we allocate the call, it will have
+ * three refs on it: (1) the kernel service, (2) the user_call_ID tree, (3) the
+ * retainer ref obtained from the backlog buffer.  Prealloc calls for userspace
+ * services only have the ref from the backlog buffer.  We want to pass this
+ * ref to non-BH context to dispose of.
+ *
+ * If we want to report an error, we mark the skb with the packet type and
+ * abort code and return NULL.
  */
-static int rxrpc_accept_incoming_call(struct rxrpc_local *local,
-                                     struct rxrpc_sock *rx,
-                                     struct sk_buff *skb,
-                                     struct sockaddr_rxrpc *srx)
+struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
+                                          struct rxrpc_connection *conn,
+                                          struct sk_buff *skb)
 {
-       struct rxrpc_connection *conn;
-       struct rxrpc_skb_priv *sp, *nsp;
+       struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+       struct rxrpc_sock *rx;
        struct rxrpc_call *call;
-       struct sk_buff *notification;
-       int ret;
 
        _enter("");
 
-       sp = rxrpc_skb(skb);
-
-       /* get a notification message to send to the server app */
-       notification = alloc_skb(0, GFP_NOFS);
-       if (!notification) {
-               _debug("no memory");
-               ret = -ENOMEM;
-               goto error_nofree;
-       }
-       rxrpc_new_skb(notification);
-       notification->mark = RXRPC_SKB_MARK_NEW_CALL;
-
-       conn = rxrpc_incoming_connection(local, srx, skb);
-       if (IS_ERR(conn)) {
-               _debug("no conn");
-               ret = PTR_ERR(conn);
-               goto error;
-       }
-
-       call = rxrpc_incoming_call(rx, conn, skb);
-       rxrpc_put_connection(conn);
-       if (IS_ERR(call)) {
-               _debug("no call");
-               ret = PTR_ERR(call);
-               goto error;
+       /* Get the socket providing the service */
+       hlist_for_each_entry_rcu_bh(rx, &local->services, listen_link) {
+               if (rx->srx.srx_service == sp->hdr.serviceId)
+                       goto found_service;
        }
 
-       /* attach the call to the socket */
-       read_lock_bh(&local->services_lock);
-       if (rx->sk.sk_state == RXRPC_CLOSE)
-               goto invalid_service;
-
-       write_lock(&rx->call_lock);
-       if (!test_and_set_bit(RXRPC_CALL_INIT_ACCEPT, &call->flags)) {
-               rxrpc_get_call(call, rxrpc_call_got);
-
-               spin_lock(&call->conn->state_lock);
-               if (sp->hdr.securityIndex > 0 &&
-                   call->conn->state == RXRPC_CONN_SERVICE_UNSECURED) {
-                       _debug("await conn sec");
-                       list_add_tail(&call->accept_link, &rx->secureq);
-                       call->conn->state = RXRPC_CONN_SERVICE_CHALLENGING;
-                       set_bit(RXRPC_CONN_EV_CHALLENGE, &call->conn->events);
-                       rxrpc_queue_conn(call->conn);
-               } else {
-                       _debug("conn ready");
-                       call->state = RXRPC_CALL_SERVER_ACCEPTING;
-                       list_add_tail(&call->accept_link, &rx->acceptq);
-                       rxrpc_get_call_for_skb(call, notification);
-                       nsp = rxrpc_skb(notification);
-                       nsp->call = call;
-
-                       ASSERTCMP(atomic_read(&call->usage), >=, 3);
-
-                       _debug("notify");
-                       spin_lock(&call->lock);
-                       ret = rxrpc_queue_rcv_skb(call, notification, true,
-                                                 false);
-                       spin_unlock(&call->lock);
-                       notification = NULL;
-                       BUG_ON(ret < 0);
-               }
-               spin_unlock(&call->conn->state_lock);
+       trace_rxrpc_abort("INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
+                         RX_INVALID_OPERATION, EOPNOTSUPP);
+       skb->mark = RXRPC_SKB_MARK_LOCAL_ABORT;
+       skb->priority = RX_INVALID_OPERATION;
+       _leave(" = NULL [service]");
+       return NULL;
 
-               _debug("queued");
+found_service:
+       spin_lock(&rx->incoming_lock);
+       if (rx->sk.sk_state == RXRPC_CLOSE) {
+               trace_rxrpc_abort("CLS", sp->hdr.cid, sp->hdr.callNumber,
+                                 sp->hdr.seq, RX_INVALID_OPERATION, ESHUTDOWN);
+               skb->mark = RXRPC_SKB_MARK_LOCAL_ABORT;
+               skb->priority = RX_INVALID_OPERATION;
+               _leave(" = NULL [close]");
+               call = NULL;
+               goto out;
        }
-       write_unlock(&rx->call_lock);
 
-       _debug("process");
-       rxrpc_fast_process_packet(call, skb);
-
-       _debug("done");
-       read_unlock_bh(&local->services_lock);
-       rxrpc_free_skb(notification);
-       rxrpc_put_call(call, rxrpc_call_put);
-       _leave(" = 0");
-       return 0;
-
-invalid_service:
-       _debug("invalid");
-       read_unlock_bh(&local->services_lock);
-
-       rxrpc_release_call(rx, call);
-       rxrpc_put_call(call, rxrpc_call_put);
-       ret = -ECONNREFUSED;
-error:
-       rxrpc_free_skb(notification);
-error_nofree:
-       _leave(" = %d", ret);
-       return ret;
-}
+       call = rxrpc_alloc_incoming_call(rx, local, conn, skb);
+       if (!call) {
+               skb->mark = RXRPC_SKB_MARK_BUSY;
+               _leave(" = NULL [busy]");
+               call = NULL;
+               goto out;
+       }
 
-/*
- * accept incoming calls that need peer, transport and/or connection setting up
- * - the packets we get are all incoming client DATA packets that have seq == 1
- */
-void rxrpc_accept_incoming_calls(struct rxrpc_local *local)
-{
-       struct rxrpc_skb_priv *sp;
-       struct sockaddr_rxrpc srx;
-       struct rxrpc_sock *rx;
-       struct rxrpc_wire_header whdr;
-       struct sk_buff *skb;
-       int ret;
+       /* Make the call live. */
+       rxrpc_incoming_call(rx, call, skb);
+       conn = call->conn;
 
-       _enter("%d", local->debug_id);
+       if (rx->notify_new_call)
+               rx->notify_new_call(&rx->sk, call, call->user_call_ID);
 
-       skb = skb_dequeue(&local->accept_queue);
-       if (!skb) {
-               _leave("\n");
-               return;
-       }
+       spin_lock(&conn->state_lock);
+       switch (conn->state) {
+       case RXRPC_CONN_SERVICE_UNSECURED:
+               conn->state = RXRPC_CONN_SERVICE_CHALLENGING;
+               set_bit(RXRPC_CONN_EV_CHALLENGE, &call->conn->events);
+               rxrpc_queue_conn(call->conn);
+               break;
 
-       _net("incoming call skb %p", skb);
-
-       rxrpc_see_skb(skb);
-       sp = rxrpc_skb(skb);
-
-       /* Set up a response packet header in case we need it */
-       whdr.epoch      = htonl(sp->hdr.epoch);
-       whdr.cid        = htonl(sp->hdr.cid);
-       whdr.callNumber = htonl(sp->hdr.callNumber);
-       whdr.seq        = htonl(sp->hdr.seq);
-       whdr.serial     = 0;
-       whdr.flags      = 0;
-       whdr.type       = 0;
-       whdr.userStatus = 0;
-       whdr.securityIndex = sp->hdr.securityIndex;
-       whdr._rsvd      = 0;
-       whdr.serviceId  = htons(sp->hdr.serviceId);
-
-       if (rxrpc_extract_addr_from_skb(&srx, skb) < 0)
-               goto drop;
-
-       /* get the socket providing the service */
-       read_lock_bh(&local->services_lock);
-       hlist_for_each_entry(rx, &local->services, listen_link) {
-               if (rx->srx.srx_service == sp->hdr.serviceId &&
-                   rx->sk.sk_state != RXRPC_CLOSE)
-                       goto found_service;
-       }
-       read_unlock_bh(&local->services_lock);
-       goto invalid_service;
+       case RXRPC_CONN_SERVICE:
+               write_lock(&call->state_lock);
+               if (rx->discard_new_call)
+                       call->state = RXRPC_CALL_SERVER_RECV_REQUEST;
+               else
+                       call->state = RXRPC_CALL_SERVER_ACCEPTING;
+               write_unlock(&call->state_lock);
+               break;
 
-found_service:
-       _debug("found service %hd", rx->srx.srx_service);
-       if (sk_acceptq_is_full(&rx->sk))
-               goto backlog_full;
-       sk_acceptq_added(&rx->sk);
-       read_unlock_bh(&local->services_lock);
-
-       ret = rxrpc_accept_incoming_call(local, rx, skb, &srx);
-       if (ret < 0)
-               sk_acceptq_removed(&rx->sk);
-       switch (ret) {
-       case -ECONNRESET: /* old calls are ignored */
-       case -ECONNABORTED: /* aborted calls are reaborted or ignored */
-       case 0:
-               return;
-       case -ECONNREFUSED:
-               goto invalid_service;
-       case -EBUSY:
-               goto busy;
-       case -EKEYREJECTED:
-               goto security_mismatch;
+       case RXRPC_CONN_REMOTELY_ABORTED:
+               rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
+                                         conn->remote_abort, ECONNABORTED);
+               break;
+       case RXRPC_CONN_LOCALLY_ABORTED:
+               rxrpc_abort_call("CON", call, sp->hdr.seq,
+                                conn->local_abort, ECONNABORTED);
+               break;
        default:
                BUG();
        }
+       spin_unlock(&conn->state_lock);
 
-backlog_full:
-       read_unlock_bh(&local->services_lock);
-busy:
-       rxrpc_busy(local, &srx, &whdr);
-       rxrpc_free_skb(skb);
-       return;
-
-drop:
-       rxrpc_free_skb(skb);
-       return;
+       if (call->state == RXRPC_CALL_SERVER_ACCEPTING)
+               rxrpc_notify_socket(call);
 
-invalid_service:
-       skb->priority = RX_INVALID_OPERATION;
-       rxrpc_reject_packet(local, skb);
-       return;
-
-       /* can't change connection security type mid-flow */
-security_mismatch:
-       skb->priority = RX_PROTOCOL_ERROR;
-       rxrpc_reject_packet(local, skb);
-       return;
+       _leave(" = %p{%d}", call, call->debug_id);
+out:
+       spin_unlock(&rx->incoming_lock);
+       return call;
 }
 
 /*
@@ -490,11 +426,10 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
        write_lock(&rx->call_lock);
 
        ret = -ENODATA;
-       if (list_empty(&rx->acceptq))
+       if (list_empty(&rx->to_be_accepted))
                goto out;
 
        /* check the user ID isn't already in use */
-       ret = -EBADSLT;
        pp = &rx->calls.rb_node;
        parent = NULL;
        while (*pp) {
@@ -506,11 +441,14 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
                else if (user_call_ID > call->user_call_ID)
                        pp = &(*pp)->rb_right;
                else
-                       goto out;
+                       goto id_in_use;
        }
 
-       /* dequeue the first call and check it's still valid */
-       call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link);
+       /* Dequeue the first call and check it's still valid.  We gain
+        * responsibility for the queue's reference.
+        */
+       call = list_entry(rx->to_be_accepted.next,
+                         struct rxrpc_call, accept_link);
        list_del_init(&call->accept_link);
        sk_acceptq_removed(&rx->sk);
        rxrpc_see_call(call);
@@ -528,31 +466,35 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
        }
 
        /* formalise the acceptance */
-       rxrpc_get_call(call, rxrpc_call_got_userid);
+       rxrpc_get_call(call, rxrpc_call_got);
        call->notify_rx = notify_rx;
        call->user_call_ID = user_call_ID;
+       rxrpc_get_call(call, rxrpc_call_got_userid);
        rb_link_node(&call->sock_node, parent, pp);
        rb_insert_color(&call->sock_node, &rx->calls);
        if (test_and_set_bit(RXRPC_CALL_HAS_USERID, &call->flags))
                BUG();
-       if (test_and_set_bit(RXRPC_CALL_EV_ACCEPTED, &call->events))
-               BUG();
 
        write_unlock_bh(&call->state_lock);
        write_unlock(&rx->call_lock);
-       rxrpc_queue_call(call);
+       rxrpc_notify_socket(call);
+       rxrpc_service_prealloc(rx, GFP_KERNEL);
        _leave(" = %p{%d}", call, call->debug_id);
        return call;
 
 out_release:
+       _debug("release %p", call);
        write_unlock_bh(&call->state_lock);
        write_unlock(&rx->call_lock);
-       _debug("release %p", call);
        rxrpc_release_call(rx, call);
-       _leave(" = %d", ret);
-       return ERR_PTR(ret);
-out:
+       rxrpc_put_call(call, rxrpc_call_put);
+       goto out;
+
+id_in_use:
+       ret = -EBADSLT;
        write_unlock(&rx->call_lock);
+out:
+       rxrpc_service_prealloc(rx, GFP_KERNEL);
        _leave(" = %d", ret);
        return ERR_PTR(ret);
 }
@@ -564,6 +506,7 @@ out:
 int rxrpc_reject_call(struct rxrpc_sock *rx)
 {
        struct rxrpc_call *call;
+       bool abort = false;
        int ret;
 
        _enter("");
@@ -572,15 +515,16 @@ int rxrpc_reject_call(struct rxrpc_sock *rx)
 
        write_lock(&rx->call_lock);
 
-       ret = -ENODATA;
-       if (list_empty(&rx->acceptq)) {
+       if (list_empty(&rx->to_be_accepted)) {
                write_unlock(&rx->call_lock);
-               _leave(" = -ENODATA");
                return -ENODATA;
        }
 
-       /* dequeue the first call and check it's still valid */
-       call = list_entry(rx->acceptq.next, struct rxrpc_call, accept_link);
+       /* Dequeue the first call and check it's still valid.  We gain
+        * responsibility for the queue's reference.
+        */
+       call = list_entry(rx->to_be_accepted.next,
+                         struct rxrpc_call, accept_link);
        list_del_init(&call->accept_link);
        sk_acceptq_removed(&rx->sk);
        rxrpc_see_call(call);
@@ -588,66 +532,28 @@ int rxrpc_reject_call(struct rxrpc_sock *rx)
        write_lock_bh(&call->state_lock);
        switch (call->state) {
        case RXRPC_CALL_SERVER_ACCEPTING:
-               __rxrpc_set_call_completion(call, RXRPC_CALL_SERVER_BUSY,
-                                           0, ECONNABORTED);
-               if (test_and_set_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events))
-                       rxrpc_queue_call(call);
-               ret = 0;
-               break;
+               __rxrpc_abort_call("REJ", call, 1, RX_USER_ABORT, ECONNABORTED);
+               abort = true;
+               /* fall through */
        case RXRPC_CALL_COMPLETE:
                ret = call->error;
-               break;
+               goto out_discard;
        default:
                BUG();
        }
 
+out_discard:
        write_unlock_bh(&call->state_lock);
        write_unlock(&rx->call_lock);
-       rxrpc_release_call(rx, call);
-       _leave(" = %d", ret);
-       return ret;
-}
-
-/**
- * rxrpc_kernel_accept_call - Allow a kernel service to accept an incoming call
- * @sock: The socket on which the impending call is waiting
- * @user_call_ID: The tag to attach to the call
- * @notify_rx: Where to send notifications instead of socket queue
- *
- * Allow a kernel service to accept an incoming call, assuming the incoming
- * call is still valid.  The caller should immediately trigger their own
- * notification as there must be data waiting.
- */
-struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *sock,
-                                           unsigned long user_call_ID,
-                                           rxrpc_notify_rx_t notify_rx)
-{
-       struct rxrpc_call *call;
-
-       _enter(",%lx", user_call_ID);
-       call = rxrpc_accept_call(rxrpc_sk(sock->sk), user_call_ID, notify_rx);
-       _leave(" = %p", call);
-       return call;
-}
-EXPORT_SYMBOL(rxrpc_kernel_accept_call);
-
-/**
- * rxrpc_kernel_reject_call - Allow a kernel service to reject an incoming call
- * @sock: The socket on which the impending call is waiting
- *
- * Allow a kernel service to reject an incoming call with a BUSY message,
- * assuming the incoming call is still valid.
- */
-int rxrpc_kernel_reject_call(struct socket *sock)
-{
-       int ret;
-
-       _enter("");
-       ret = rxrpc_reject_call(rxrpc_sk(sock->sk));
+       if (abort) {
+               rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ABORT);
+               rxrpc_release_call(rx, call);
+               rxrpc_put_call(call, rxrpc_call_put);
+       }
+       rxrpc_service_prealloc(rx, GFP_KERNEL);
        _leave(" = %d", ret);
        return ret;
 }
-EXPORT_SYMBOL(rxrpc_kernel_reject_call);
 
 /*
  * rxrpc_kernel_charge_accept - Charge up socket with preallocated calls