static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
-static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
-static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
/* tipc_data_input - deliver data and name distributor msgs to upper layer
 *
 * Consumes buffer if message is of right type
 * Node lock must be held
 */
-static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
+static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
+ struct sk_buff_head *inputq)
{
struct tipc_node *node = link->owner;
- struct tipc_msg *msg = buf_msg(skb);
- switch (msg_user(msg)) {
+ switch (msg_user(buf_msg(skb))) {
case TIPC_LOW_IMPORTANCE:
case TIPC_MEDIUM_IMPORTANCE:
case TIPC_HIGH_IMPORTANCE:
case TIPC_CRITICAL_IMPORTANCE:
case CONN_MANAGER:
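+ /* Deliverable message types now go to the caller-supplied input queue */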
- skb_queue_tail(link->inputq, skb);
+ __skb_queue_tail(inputq, skb);
return true;
case NAME_DISTRIBUTOR:
node->bclink.recv_permitted = true;
/* tipc_link_input - process packet that has passed link protocol check
 *
 * Consumes buffer
 */
-static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb)
+static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
+ struct sk_buff_head *inputq)
{
struct tipc_node *node = l->owner;
struct tipc_msg *hdr = buf_msg(skb);
hdr = buf_msg(skb);
if (less(msg_seqno(hdr), l->drop_point))
goto drop;
- if (tipc_data_input(l, skb))
+ if (tipc_data_input(l, skb, inputq))
return rc;
usr = msg_user(hdr);
reasm_skb = &l->failover_reasm_skb;
l->stats.recv_bundles++;
l->stats.recv_bundled += msg_msgcnt(hdr);
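+ /* Unbundle and deliver each inner message via the supplied input queue */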
while (tipc_msg_extract(skb, &iskb, &pos))
- tipc_data_input(l, iskb);
+ tipc_data_input(l, iskb, inputq);
return 0;
} else if (usr == MSG_FRAGMENTER) {
l->stats.recv_fragments++;
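+ /* Append fragment; the reassembled message is delivered once complete */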
if (tipc_buf_append(reasm_skb, &skb)) {
l->stats.recv_fragmented++;
- tipc_data_input(l, skb);
+ tipc_data_input(l, skb, inputq);
} else if (!*reasm_skb) {
return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
}
struct sk_buff_head *xmitq)
{
struct sk_buff_head *arrvq = &l->deferdq;
+ struct sk_buff_head tmpq;
struct tipc_msg *hdr;
u16 seqno, rcv_nxt;
int rc = 0;
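+ /* Collect deliverable messages locally; they are spliced onto l->inputq at exit */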
+ __skb_queue_head_init(&tmpq);
+
if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) {
if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV))
tipc_link_build_proto_msg(l, STATE_MSG, 0,
rc = tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT);
if (!link_is_up(l)) {
kfree_skb(__skb_dequeue(arrvq));
- return rc;
+ goto exit;
}
}
rcv_nxt = l->rcv_nxt;
if (unlikely(less(rcv_nxt, seqno))) {
l->stats.deferred_recv++;
- return rc;
+ goto exit;
}
__skb_dequeue(arrvq);
if (unlikely(more(rcv_nxt, seqno))) {
l->stats.duplicates++;
kfree_skb(skb);
- return rc;
+ goto exit;
}
/* Packet can be delivered */
l->rcv_nxt++;
l->stats.recv_info++;
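+ /* Deliver directly if possible, otherwise hand over to tipc_link_input() */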
- if (unlikely(!tipc_data_input(l, skb)))
- rc = tipc_link_input(l, skb);
+ if (unlikely(!tipc_data_input(l, skb, &tmpq)))
+ rc = tipc_link_input(l, skb, &tmpq);
/* Ack at regular intervals */
if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
0, 0, 0, 0, xmitq);
}
}
+exit:
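+ /* Move everything gathered in tmpq to the link's input queue in one operation */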
+ tipc_skb_queue_splice_tail(&tmpq, l->inputq);
return rc;
}