l->mtu = mtu;
l->priority = priority;
tipc_link_set_queue_limits(l, window);
+ l->ackers = 1;
l->inputq = inputq;
l->namedq = namedq;
l->state = LINK_RESETTING;
l->rcv_unacked = 0;
l->snd_nxt = 1;
l->rcv_nxt = 1;
+ l->acked = 0;
l->silent_intv_cnt = 0;
l->stats.recv_info = 0;
l->stale_count = 0;
__skb_dequeue(list);
__skb_queue_tail(transmq, skb);
__skb_queue_tail(xmitq, _skb);
+ TIPC_SKB_CB(skb)->ackers = l->ackers;
l->rcv_unacked = 0;
seqno++;
continue;
skb = __skb_dequeue(&link->backlogq);
if (!skb)
break;
+ TIPC_SKB_CB(skb)->ackers = link->ackers;
msg = buf_msg(skb);
link->backlog[msg_importance(msg)].len--;
msg_set_ack(msg, ack);
l->backlog[msg_importance(hdr)].len--;
__skb_queue_tail(&l->transmq, skb);
__skb_queue_tail(xmitq, _skb);
+ TIPC_SKB_CB(skb)->ackers = l->ackers;
msg_set_ack(hdr, ack);
msg_set_seqno(hdr, seqno);
msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
}
}
-static int tipc_link_retransm(struct tipc_link *l, int retransm,
- struct sk_buff_head *xmitq)
+int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
+ struct sk_buff_head *xmitq)
{
struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
struct tipc_msg *hdr;
+ u16 ack = l->rcv_nxt - 1;
+ u16 bc_ack = l->owner->bclink.last_in;
if (!skb)
return 0;
link_retransmit_failure(l, skb);
return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
}
+
+ /* Move forward to where retransmission should start */
skb_queue_walk(&l->transmq, skb) {
- if (!retransm)
- return 0;
+ if (!less(buf_seqno(skb), from))
+ break;
+ }
+
+ skb_queue_walk_from(&l->transmq, skb) {
+ if (more(buf_seqno(skb), to))
+ break;
hdr = buf_msg(skb);
_skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
if (!_skb)
return 0;
hdr = buf_msg(_skb);
- msg_set_ack(hdr, l->rcv_nxt - 1);
- msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
+ msg_set_ack(hdr, ack);
+ msg_set_bcast_ack(hdr, bc_ack);
_skb->priority = TC_PRIO_CONTROL;
__skb_queue_tail(xmitq, _skb);
- retransm--;
l->stats.retransmitted++;
}
return 0;
{
struct tipc_msg *hdr = buf_msg(skb);
u16 rcvgap = 0;
- u16 nacked_gap = msg_seq_gap(hdr);
+ u16 ack = msg_ack(hdr);
+ u16 gap = msg_seq_gap(hdr);
u16 peers_snd_nxt = msg_next_sent(hdr);
u16 peers_tol = msg_link_tolerance(hdr);
u16 peers_prio = msg_linkprio(hdr);
if (rcvgap || (msg_probe(hdr)))
tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap,
0, 0, xmitq);
- tipc_link_release_pkts(l, msg_ack(hdr));
+ tipc_link_release_pkts(l, ack);
/* If NACK, retransmit will now start at right position */
- if (nacked_gap) {
- rc = tipc_link_retransm(l, nacked_gap, xmitq);
+ if (gap) {
+ rc = tipc_link_retrans(l, ack + 1, ack + gap, xmitq);
l->stats.recv_nacks++;
}
static void link_print(struct tipc_link *l, const char *str)
{
struct sk_buff *hskb = skb_peek(&l->transmq);
- u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt;
+ u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt - 1;
u16 tail = l->snd_nxt - 1;
pr_info("%s Link <%s> state %x\n", str, l->name, l->state);
* @snt_nxt: next sequence number to use for outbound messages
* @last_retransmitted: sequence number of most recently retransmitted message
* @stale_count: # of identical retransmit requests made by peer
+ * @ackers: # of peers that needs to ack each packet before it can be released
+ * @acked: # last packet acked by a certain peer. Used for broadcast.
* @rcv_nxt: next sequence number to expect for inbound messages
* @deferred_queue: deferred queue saved OOS b'cast message received from node
* @unacked_window: # of inbound messages rx'd without ack'ing back to peer
* @wakeupq: linked list of wakeup msgs waiting for link congestion to abate
* @long_msg_seq_no: next identifier to use for outbound fragmented messages
* @reasm_buf: head of partially reassembled inbound message fragments
+ * @bc_rcvr: marks that this is a broadcast receiver link
* @stats: collects statistics regarding link activity
*/
struct tipc_link {
/* Fragmentation/reassembly */
struct sk_buff *reasm_buf;
+ /* Broadcast */
+ u16 ackers;
+ u16 acked;
+
/* Statistics */
struct tipc_stats stats;
};