From 365ad353c2564bba8835290061308ba825166b3a Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Tue, 3 Jan 2017 10:55:11 -0500 Subject: [PATCH] tipc: reduce risk of user starvation during link congestion The socket code currently handles link congestion by either blocking and trying to send again when the congestion has abated, or just returning to the user with -EAGAIN and let him re-try later. This mechanism is prone to starvation, because the wakeup algorithm is non-atomic. During the time the link issues a wakeup signal, until the socket wakes up and re-attempts sending, other senders may have come in between and occupied the free buffer space in the link. This in turn may lead to a socket having to make many send attempts before it is successful. In extremely loaded systems we have observed latency times of several seconds before a low-priority socket is able to send out a message. In this commit, we simplify this mechanism and reduce the risk of the described scenario happening. When a message is attempted sent via a congested link, we now let it be added to the link's backlog queue anyway, thus permitting an oversubscription of one message per source socket. We still create a wakeup item and return an error code, hence instructing the sender to block or stop sending. Only when enough space has been freed up in the link's backlog queue do we issue a wakeup event that allows the sender to continue with the next message, if any. The fact that a socket now can consider a message sent even when the link returns a congestion code means that the sending socket code can be simplified. Also, since this is a good opportunity to get rid of the obsolete 'mtu change' condition in the three socket send functions, we now choose to refactor those functions completely. Signed-off-by: Parthasarathy Bhuvaragan Acked-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/bcast.c | 6 +- net/tipc/link.c | 75 +++++----- net/tipc/msg.h | 2 - net/tipc/node.c | 15 +- net/tipc/socket.c | 347 ++++++++++++++++++++-------------------------- 5 files changed, 194 insertions(+), 251 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index aa1babbea385..c35fad3e08e8 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -174,7 +174,7 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq) * and to identified node local sockets * @net: the applicable net namespace * @list: chain of buffers containing message - * Consumes the buffer chain, except when returning -ELINKCONG + * Consumes the buffer chain. * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE */ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) @@ -197,7 +197,7 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) tipc_bcast_unlock(net); /* Don't send to local node if adding to link failed */ - if (unlikely(rc)) { + if (unlikely(rc && (rc != -ELINKCONG))) { __skb_queue_purge(&rcvq); return rc; } @@ -206,7 +206,7 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) tipc_bcbase_xmit(net, &xmitq); tipc_sk_mcast_rcv(net, &rcvq, &inputq); __skb_queue_purge(list); - return 0; + return rc; } /* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link diff --git a/net/tipc/link.c b/net/tipc/link.c index bda89bf9f4ff..b758ca8b2f79 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -776,60 +776,47 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) /** * link_schedule_user - schedule a message sender for wakeup after congestion - * @link: congested link - * @list: message that was attempted sent + * @l: congested link + * @hdr: header of message that is being sent * Create pseudo msg to send back to user when congestion abates - * Does not consume buffer list */ -static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list) +static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr) { - struct tipc_msg *msg = buf_msg(skb_peek(list)); - int imp = msg_importance(msg); - u32 oport = msg_origport(msg); - u32 addr = tipc_own_addr(link->net); + u32 dnode = tipc_own_addr(l->net); + u32 dport = msg_origport(hdr); struct sk_buff *skb; - /* This really cannot happen... */ - if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) { - pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); - return -ENOBUFS; - } - /* Non-blocking sender: */ - if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending) - return -ELINKCONG; - /* Create and schedule wakeup pseudo message */ skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, - addr, addr, oport, 0, 0); + dnode, l->addr, dport, 0, 0); if (!skb) return -ENOBUFS; - TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list); - TIPC_SKB_CB(skb)->chain_imp = imp; - skb_queue_tail(&link->wakeupq, skb); - link->stats.link_congs++; + msg_set_dest_droppable(buf_msg(skb), true); + TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr); + skb_queue_tail(&l->wakeupq, skb); + l->stats.link_congs++; return -ELINKCONG; } /** * link_prepare_wakeup - prepare users for wakeup after congestion - * @link: congested link - * Move a number of waiting users, as permitted by available space in - * the send queue, from link wait queue to node wait queue for wakeup + * @l: congested link + * Wake up a number of waiting users, as permitted by available space + * in the send queue */ void link_prepare_wakeup(struct tipc_link *l) { - int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,}; - int imp, lim; struct sk_buff *skb, *tmp; + int imp, i = 0; skb_queue_walk_safe(&l->wakeupq, skb, tmp) { imp = TIPC_SKB_CB(skb)->chain_imp; - lim = l->backlog[imp].limit; - pnd[imp] += TIPC_SKB_CB(skb)->chain_sz; - if ((pnd[imp] + l->backlog[imp].len) >= lim) + if (l->backlog[imp].len < l->backlog[imp].limit) { + skb_unlink(skb, &l->wakeupq); + skb_queue_tail(l->inputq, skb); + } else if (i++ > 10) { break; - skb_unlink(skb, &l->wakeupq); - skb_queue_tail(l->inputq, skb); + } } } @@ -869,8 +856,7 @@ void tipc_link_reset(struct tipc_link *l) * @list: chain of buffers containing message * @xmitq: returned list of packets to be sent by caller * - * Consumes the buffer chain, except when returning -ELINKCONG, - * since the caller then may want to make more send attempts. + * Consumes the buffer chain. * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted */ @@ -879,7 +865,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, { struct tipc_msg *hdr = buf_msg(skb_peek(list)); unsigned int maxwin = l->window; - unsigned int i, imp = msg_importance(hdr); + int imp = msg_importance(hdr); unsigned int mtu = l->mtu; u16 ack = l->rcv_nxt - 1; u16 seqno = l->snd_nxt; @@ -888,19 +874,22 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, struct sk_buff_head *backlogq = &l->backlogq; struct sk_buff *skb, *_skb, *bskb; int pkt_cnt = skb_queue_len(list); + int rc = 0; - /* Match msg importance against this and all higher backlog limits: */ - if (!skb_queue_empty(backlogq)) { - for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) { - if (unlikely(l->backlog[i].len >= l->backlog[i].limit)) - return link_schedule_user(l, list); - } - } if (unlikely(msg_size(hdr) > mtu)) { skb_queue_purge(list); return -EMSGSIZE; } + /* Allow oversubscription of one data msg per source at congestion */ + if (unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) { + if (imp == TIPC_SYSTEM_IMPORTANCE) { + pr_warn("%s<%s>, link overflow", link_rst_msg, l->name); + return -ENOBUFS; + } + rc = link_schedule_user(l, hdr); + } + if (pkt_cnt > 1) { l->stats.sent_fragmented++; l->stats.sent_fragments += pkt_cnt; @@ -946,7 +935,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, skb_queue_splice_tail_init(list, backlogq); } l->snd_nxt = seqno; - return 0; + return rc; } void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 8d408612ffa4..850ae0e469f5 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -98,8 +98,6 @@ struct tipc_skb_cb { u32 bytes_read; struct sk_buff *tail; bool validated; - bool wakeup_pending; - u16 chain_sz; u16 chain_imp; u16 ackers; }; diff --git a/net/tipc/node.c b/net/tipc/node.c index 9d2f4c2b08ab..2883f6a0ed98 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1167,7 +1167,7 @@ msg_full: * @list: chain of buffers containing message * @dnode: address of destination node * @selector: a number used for deterministic link selection - * Consumes the buffer chain, except when returning -ELINKCONG + * Consumes the buffer chain. * Returns 0 if success, otherwise: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-ENOBUF */ int tipc_node_xmit(struct net *net, struct sk_buff_head *list, @@ -1206,10 +1206,10 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list, spin_unlock_bh(&le->lock); tipc_node_read_unlock(n); - if (likely(rc == 0)) - tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); - else if (rc == -ENOBUFS) + if (unlikely(rc == -ENOBUFS)) tipc_node_link_down(n, bearer_id, false); + else + tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); tipc_node_put(n); @@ -1221,20 +1221,15 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list, * messages, which will not be rejected * The only exception is datagram messages rerouted after secondary * lookup, which are rare and safe to dispose of anyway. - * TODO: Return real return value, and let callers use - * tipc_wait_for_sendpkt() where applicable */ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, u32 selector) { struct sk_buff_head head; - int rc; skb_queue_head_init(&head); __skb_queue_tail(&head, skb); - rc = tipc_node_xmit(net, &head, dnode, selector); - if (rc == -ELINKCONG) - kfree_skb(skb); + tipc_node_xmit(net, &head, dnode, selector); return 0; } diff --git a/net/tipc/socket.c b/net/tipc/socket.c index fae6a55ef1b0..d2f353934f82 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -67,12 +67,14 @@ enum { * @max_pkt: maximum packet size "hint" used when building messages sent by port * @portid: unique port identity in TIPC socket hash table * @phdr: preformatted message header used when sending messages + * #cong_links: list of congested links * @publications: list of publications for port + * @blocking_link: address of the congested link we are currently sleeping on * @pub_count: total # of publications port has made during its lifetime * @probing_state: * @conn_timeout: the time we can wait for an unresponded setup request * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue - * @link_cong: non-zero if owner must sleep because of link congestion + * @cong_link_cnt: number of congested links * @sent_unacked: # messages sent by socket, and not yet acked by peer * @rcv_unacked: # messages read by user, but not yet acked back to peer * @peer: 'connected' peer for dgram/rdm @@ -87,13 +89,13 @@ struct tipc_sock { u32 max_pkt; u32 portid; struct tipc_msg phdr; - struct list_head sock_list; + struct list_head cong_links; struct list_head publications; u32 pub_count; uint conn_timeout; atomic_t dupl_rcvcnt; bool probe_unacked; - bool link_cong; + u16 cong_link_cnt; u16 snt_unacked; u16 snd_win; u16 peer_caps; @@ -118,8 +120,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); static int tipc_sk_insert(struct tipc_sock *tsk); static void tipc_sk_remove(struct tipc_sock *tsk); -static int __tipc_send_stream(struct socket *sock, struct msghdr *m, - size_t dsz); +static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz); static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz); static const struct proto_ops packet_ops; @@ -424,6 +425,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, tsk = tipc_sk(sk); tsk->max_pkt = MAX_PKT_DEFAULT; INIT_LIST_HEAD(&tsk->publications); + INIT_LIST_HEAD(&tsk->cong_links); msg = &tsk->phdr; tn = net_generic(sock_net(sk), tipc_net_id); tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, @@ -474,9 +476,14 @@ static void __tipc_shutdown(struct socket *sock, int error) struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); struct net *net = sock_net(sk); + long timeout = CONN_TIMEOUT_DEFAULT; u32 dnode = tsk_peer_node(tsk); struct sk_buff *skb; + /* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */ + tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt && + !tsk_conn_cong(tsk))); + /* Reject all unreceived messages, except on an active connection * (which disconnects locally & sends a 'FIN+' to peer). */ @@ -547,7 +554,8 @@ static int tipc_release(struct socket *sock) /* Reject any messages that accumulated in backlog queue */ release_sock(sk); - + u32_list_purge(&tsk->cong_links); + tsk->cong_link_cnt = 0; call_rcu(&tsk->rcu, tipc_sk_callback); sock->sk = NULL; @@ -690,7 +698,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, switch (sk->sk_state) { case TIPC_ESTABLISHED: - if (!tsk->link_cong && !tsk_conn_cong(tsk)) + if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk)) mask |= POLLOUT; /* fall thru' */ case TIPC_LISTEN: @@ -699,7 +707,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, mask |= (POLLIN | POLLRDNORM); break; case TIPC_OPEN: - if (!tsk->link_cong) + if (!tsk->cong_link_cnt) mask |= POLLOUT; if (tipc_sk_type_connectionless(sk) && (!skb_queue_empty(&sk->sk_receive_queue))) @@ -718,63 +726,48 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, * @sock: socket structure * @seq: destination address * @msg: message to send - * @dsz: total length of message data - * @timeo: timeout to wait for wakeup + * @dlen: length of data to send + * @timeout: timeout to wait for wakeup * * Called from function tipc_sendmsg(), which has done all sanity checks * Returns the number of bytes sent on success, or errno */ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, - struct msghdr *msg, size_t dsz, long timeo) + struct msghdr *msg, size_t dlen, long timeout) { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_msg *hdr = &tsk->phdr; struct net *net = sock_net(sk); - struct tipc_msg *mhdr = &tsk->phdr; - struct sk_buff_head pktchain; - struct iov_iter save = msg->msg_iter; - uint mtu; + int mtu = tipc_bcast_get_mtu(net); + struct sk_buff_head pkts; int rc; - if (!timeo && tsk->link_cong) - return -ELINKCONG; - - msg_set_type(mhdr, TIPC_MCAST_MSG); - msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE); - msg_set_destport(mhdr, 0); - msg_set_destnode(mhdr, 0); - msg_set_nametype(mhdr, seq->type); - msg_set_namelower(mhdr, seq->lower); - msg_set_nameupper(mhdr, seq->upper); - msg_set_hdr_sz(mhdr, MCAST_H_SIZE); - - skb_queue_head_init(&pktchain); + rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); + if (unlikely(rc)) + return rc; -new_mtu: - mtu = tipc_bcast_get_mtu(net); - rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain); - if (unlikely(rc < 0)) + msg_set_type(hdr, TIPC_MCAST_MSG); + msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE); + msg_set_destport(hdr, 0); + msg_set_destnode(hdr, 0); + msg_set_nametype(hdr, seq->type); + msg_set_namelower(hdr, seq->lower); + msg_set_nameupper(hdr, seq->upper); + msg_set_hdr_sz(hdr, MCAST_H_SIZE); + + skb_queue_head_init(&pkts); + rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts); + if (unlikely(rc != dlen)) return rc; - do { - rc = tipc_bcast_xmit(net, &pktchain); - if (likely(!rc)) - return dsz; - - if (rc == -ELINKCONG) { - tsk->link_cong = 1; - rc = tipc_wait_for_cond(sock, &timeo, !tsk->link_cong); - if (!rc) - continue; - } - __skb_queue_purge(&pktchain); - if (rc == -EMSGSIZE) { - msg->msg_iter = save; - goto new_mtu; - } - break; - } while (1); - return rc; + rc = tipc_bcast_xmit(net, &pkts); + if (unlikely(rc == -ELINKCONG)) { + tsk->cong_link_cnt = 1; + rc = 0; + } + + return rc ? rc : dlen; } /** @@ -898,35 +891,38 @@ static int tipc_sendmsg(struct socket *sock, return ret; } -static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) +static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) { - DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); struct sock *sk = sock->sk; - struct tipc_sock *tsk = tipc_sk(sk); struct net *net = sock_net(sk); - struct tipc_msg *mhdr = &tsk->phdr; - u32 dnode, dport; - struct sk_buff_head pktchain; - bool is_connectionless = tipc_sk_type_connectionless(sk); - struct sk_buff *skb; + struct tipc_sock *tsk = tipc_sk(sk); + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); + long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); + struct list_head *clinks = &tsk->cong_links; + bool syn = !tipc_sk_type_connectionless(sk); + struct tipc_msg *hdr = &tsk->phdr; struct tipc_name_seq *seq; - struct iov_iter save; - u32 mtu; - long timeo; - int rc; + struct sk_buff_head pkts; + u32 type, inst, domain; + u32 dnode, dport; + int mtu, rc; - if (dsz > TIPC_MAX_USER_MSG_SIZE) + if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) return -EMSGSIZE; + if (unlikely(!dest)) { - if (is_connectionless && tsk->peer.family == AF_TIPC) - dest = &tsk->peer; - else + dest = &tsk->peer; + if (!syn || dest->family != AF_TIPC) return -EDESTADDRREQ; - } else if (unlikely(m->msg_namelen < sizeof(*dest)) || - dest->family != AF_TIPC) { - return -EINVAL; } - if (!is_connectionless) { + + if (unlikely(m->msg_namelen < sizeof(*dest))) + return -EINVAL; + + if (unlikely(dest->family != AF_TIPC)) + return -EINVAL; + + if (unlikely(syn)) { if (sk->sk_state == TIPC_LISTEN) return -EPIPE; if (sk->sk_state != TIPC_OPEN) @@ -938,72 +934,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) tsk->conn_instance = dest->addr.name.name.instance; } } - seq = &dest->addr.nameseq; - timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); - if (dest->addrtype == TIPC_ADDR_MCAST) { - return tipc_sendmcast(sock, seq, m, dsz, timeo); - } else if (dest->addrtype == TIPC_ADDR_NAME) { - u32 type = dest->addr.name.name.type; - u32 inst = dest->addr.name.name.instance; - u32 domain = dest->addr.name.domain; + seq = &dest->addr.nameseq; + if (dest->addrtype == TIPC_ADDR_MCAST) + return tipc_sendmcast(sock, seq, m, dlen, timeout); + if (dest->addrtype == TIPC_ADDR_NAME) { + type = dest->addr.name.name.type; + inst = dest->addr.name.name.instance; + domain = dest->addr.name.domain; dnode = domain; - msg_set_type(mhdr, TIPC_NAMED_MSG); - msg_set_hdr_sz(mhdr, NAMED_H_SIZE); - msg_set_nametype(mhdr, type); - msg_set_nameinst(mhdr, inst); - msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); + msg_set_type(hdr, TIPC_NAMED_MSG); + msg_set_hdr_sz(hdr, NAMED_H_SIZE); + msg_set_nametype(hdr, type); + msg_set_nameinst(hdr, inst); + msg_set_lookup_scope(hdr, tipc_addr_scope(domain)); dport = tipc_nametbl_translate(net, type, inst, &dnode); - msg_set_destnode(mhdr, dnode); - msg_set_destport(mhdr, dport); + msg_set_destnode(hdr, dnode); + msg_set_destport(hdr, dport); if (unlikely(!dport && !dnode)) return -EHOSTUNREACH; + } else if (dest->addrtype == TIPC_ADDR_ID) { dnode = dest->addr.id.node; - msg_set_type(mhdr, TIPC_DIRECT_MSG); - msg_set_lookup_scope(mhdr, 0); - msg_set_destnode(mhdr, dnode); - msg_set_destport(mhdr, dest->addr.id.ref); - msg_set_hdr_sz(mhdr, BASIC_H_SIZE); + msg_set_type(hdr, TIPC_DIRECT_MSG); + msg_set_lookup_scope(hdr, 0); + msg_set_destnode(hdr, dnode); + msg_set_destport(hdr, dest->addr.id.ref); + msg_set_hdr_sz(hdr, BASIC_H_SIZE); } - skb_queue_head_init(&pktchain); - save = m->msg_iter; -new_mtu: + /* Block or return if destination link is congested */ + rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode)); + if (unlikely(rc)) + return rc; + + skb_queue_head_init(&pkts); mtu = tipc_node_get_mtu(net, dnode, tsk->portid); - rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain); - if (rc < 0) + rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); + if (unlikely(rc != dlen)) return rc; - do { - skb = skb_peek(&pktchain); - TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; - rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid); - if (likely(!rc)) { - if (!is_connectionless) - tipc_set_sk_state(sk, TIPC_CONNECTING); - return dsz; - } - if (rc == -ELINKCONG) { - tsk->link_cong = 1; - rc = tipc_wait_for_cond(sock, &timeo, !tsk->link_cong); - if (!rc) - continue; - } - __skb_queue_purge(&pktchain); - if (rc == -EMSGSIZE) { - m->msg_iter = save; - goto new_mtu; - } - break; - } while (1); + rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); + if (unlikely(rc == -ELINKCONG)) { + u32_push(clinks, dnode); + tsk->cong_link_cnt++; + rc = 0; + } - return rc; + if (unlikely(syn && !rc)) + tipc_set_sk_state(sk, TIPC_CONNECTING); + + return rc ? rc : dlen; } /** - * tipc_send_stream - send stream-oriented data + * tipc_sendstream - send stream-oriented data * @sock: socket structure * @m: data to send * @dsz: total length of data to be transmitted @@ -1013,97 +999,69 @@ new_mtu: * Returns the number of bytes sent on success (or partial success), * or errno if no data sent */ -static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) +static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz) { struct sock *sk = sock->sk; int ret; lock_sock(sk); - ret = __tipc_send_stream(sock, m, dsz); + ret = __tipc_sendstream(sock, m, dsz); release_sock(sk); return ret; } -static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) +static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen) { struct sock *sk = sock->sk; - struct net *net = sock_net(sk); - struct tipc_sock *tsk = tipc_sk(sk); - struct tipc_msg *mhdr = &tsk->phdr; - struct sk_buff_head pktchain; DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); - u32 portid = tsk->portid; - int rc = -EINVAL; - long timeo; - u32 dnode; - uint mtu, send, sent = 0; - struct iov_iter save; - int hlen = MIN_H_SIZE; - - /* Handle implied connection establishment */ - if (unlikely(dest)) { - rc = __tipc_sendmsg(sock, m, dsz); - hlen = msg_hdr_sz(mhdr); - if (dsz && (dsz == rc)) - tsk->snt_unacked = tsk_inc(tsk, dsz + hlen); - return rc; - } - if (dsz > (uint)INT_MAX) - return -EMSGSIZE; - - if (unlikely(!tipc_sk_connected(sk))) { - if (sk->sk_state == TIPC_DISCONNECTING) - return -EPIPE; - else - return -ENOTCONN; - } + long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_msg *hdr = &tsk->phdr; + struct net *net = sock_net(sk); + struct sk_buff_head pkts; + u32 dnode = tsk_peer_node(tsk); + int send, sent = 0; + int rc = 0; - timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); - if (!timeo && tsk->link_cong) - return -ELINKCONG; + skb_queue_head_init(&pkts); - dnode = tsk_peer_node(tsk); - skb_queue_head_init(&pktchain); + if (unlikely(dlen > INT_MAX)) + return -EMSGSIZE; -next: - save = m->msg_iter; - mtu = tsk->max_pkt; - send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); - rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain); - if (unlikely(rc < 0)) + /* Handle implicit connection setup */ + if (unlikely(dest)) { + rc = __tipc_sendmsg(sock, m, dlen); + if (dlen && (dlen == rc)) + tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr)); return rc; + } do { - if (likely(!tsk_conn_cong(tsk))) { - rc = tipc_node_xmit(net, &pktchain, dnode, portid); - if (likely(!rc)) { - tsk->snt_unacked += tsk_inc(tsk, send + hlen); - sent += send; - if (sent == dsz) - return dsz; - goto next; - } - if (rc == -EMSGSIZE) { - __skb_queue_purge(&pktchain); - tsk->max_pkt = tipc_node_get_mtu(net, dnode, - portid); - m->msg_iter = save; - goto next; - } - if (rc != -ELINKCONG) - break; - - tsk->link_cong = 1; - } - rc = tipc_wait_for_cond(sock, &timeo, - (!tsk->link_cong && + rc = tipc_wait_for_cond(sock, &timeout, + (!tsk->cong_link_cnt && !tsk_conn_cong(tsk) && tipc_sk_connected(sk))); - } while (!rc); + if (unlikely(rc)) + break; + + send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE); + rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts); + if (unlikely(rc != send)) + break; + + rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); + if (unlikely(rc == -ELINKCONG)) { + tsk->cong_link_cnt = 1; + rc = 0; + } + if (likely(!rc)) { + tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE); + sent += send; + } + } while (sent < dlen && !rc); - __skb_queue_purge(&pktchain); - return sent ? sent : rc; + return rc ? rc : sent; } /** @@ -1121,7 +1079,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz) if (dsz > TIPC_MAX_USER_MSG_SIZE) return -EMSGSIZE; - return tipc_send_stream(sock, m, dsz); + return tipc_sendstream(sock, m, dsz); } /* tipc_sk_finish_conn - complete the setup of a connection @@ -1688,6 +1646,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb, unsigned int limit = rcvbuf_limit(sk, skb); int err = TIPC_OK; int usr = msg_user(hdr); + u32 onode; if (unlikely(msg_user(hdr) == CONN_MANAGER)) { tipc_sk_proto_rcv(tsk, skb, xmitq); @@ -1695,8 +1654,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb, } if (unlikely(usr == SOCK_WAKEUP)) { + onode = msg_orignode(hdr); kfree_skb(skb); - tsk->link_cong = 0; + u32_del(&tsk->cong_links, onode); + tsk->cong_link_cnt--; sk->sk_write_space(sk); return false; } @@ -2104,7 +2065,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags) struct msghdr m = {NULL,}; tsk_advance_rx_queue(sk); - __tipc_send_stream(new_sock, &m, 0); + __tipc_sendstream(new_sock, &m, 0); } else { __skb_dequeue(&sk->sk_receive_queue); __skb_queue_head(&new_sk->sk_receive_queue, buf); @@ -2565,7 +2526,7 @@ static const struct proto_ops stream_ops = { .shutdown = tipc_shutdown, .setsockopt = tipc_setsockopt, .getsockopt = tipc_getsockopt, - .sendmsg = tipc_send_stream, + .sendmsg = tipc_sendstream, .recvmsg = tipc_recv_stream, .mmap = sock_no_mmap, .sendpage = sock_no_sendpage -- 2.20.1