if (bcbearer->remains_new.count == bcbearer->remains.count)
continue; /* bearer pair doesn't add anything */
- if (p->blocked ||
- p->media->send_msg(buf, p, &p->media->bcast_addr)) {
+ if (!tipc_bearer_blocked(p))
+ tipc_bearer_send(p, buf, &p->media->bcast_addr);
+ else if (s && !tipc_bearer_blocked(s))
/* unable to send on primary bearer */
- if (!s || s->blocked ||
- s->media->send_msg(buf, s,
- &s->media->bcast_addr)) {
- /* unable to send on either bearer */
- continue;
- }
- }
+ tipc_bearer_send(s, buf, &s->media->bcast_addr);
+ else
+ /* unable to send on either bearer */
+ continue;
if (s) {
bcbearer->bpairs[bp_index].primary = s;
" TX naks:%u acks:%u dups:%u\n",
s->sent_nacks, s->sent_acks, s->retransmitted);
ret += tipc_snprintf(buf + ret, buf_size - ret,
- " Congestion bearer:%u link:%u Send queue max:%u avg:%u\n",
- s->bearer_congs, s->link_congs, s->max_queue_sz,
+ " Congestion link:%u Send queue max:%u avg:%u\n",
+ s->link_congs, s->max_queue_sz,
s->queue_sz_counts ?
(s->accu_queue_sz / s->queue_sz_counts) : 0);
void tipc_bclink_init(void)
{
- INIT_LIST_HEAD(&bcbearer->bearer.cong_links);
bcbearer->bearer.media = &bcbearer->media;
bcbearer->media.send_msg = tipc_bcbearer_send;
sprintf(bcbearer->media.name, "tipc-broadcast");
}
/*
- * bearer_push(): Resolve bearer congestion. Force the waiting
- * links to push out their unsent packets, one packet per link
- * per iteration, until all packets are gone or congestion reoccurs.
- * 'tipc_net_lock' is read_locked when this function is called
- * bearer.lock must be taken before calling
- * Returns binary true(1) ore false(0)
- */
-static int bearer_push(struct tipc_bearer *b_ptr)
-{
- u32 res = 0;
- struct tipc_link *ln, *tln;
-
- if (b_ptr->blocked)
- return 0;
-
- while (!list_empty(&b_ptr->cong_links) && (res != PUSH_FAILED)) {
- list_for_each_entry_safe(ln, tln, &b_ptr->cong_links, link_list) {
- res = tipc_link_push_packet(ln);
- if (res == PUSH_FAILED)
- break;
- if (res == PUSH_FINISHED)
- list_move_tail(&ln->link_list, &b_ptr->links);
- }
- }
- return list_empty(&b_ptr->cong_links);
-}
-
-void tipc_bearer_lock_push(struct tipc_bearer *b_ptr)
-{
- spin_lock_bh(&b_ptr->lock);
- bearer_push(b_ptr);
- spin_unlock_bh(&b_ptr->lock);
-}
-
-
-/*
- * Interrupt enabling new requests after bearer congestion or blocking:
+ * Interrupt enabling new requests after bearer blocking:
* See bearer_send().
*/
-void tipc_continue(struct tipc_bearer *b_ptr)
+void tipc_continue(struct tipc_bearer *b)
{
- spin_lock_bh(&b_ptr->lock);
- if (!list_empty(&b_ptr->cong_links))
- tipc_k_signal((Handler)tipc_bearer_lock_push, (unsigned long)b_ptr);
- b_ptr->blocked = 0;
- spin_unlock_bh(&b_ptr->lock);
+ spin_lock_bh(&b->lock);
+ b->blocked = 0;
+ spin_unlock_bh(&b->lock);
}
/*
- * Schedule link for sending of messages after the bearer
- * has been deblocked by 'continue()'. This method is called
- * when somebody tries to send a message via this link while
- * the bearer is congested. 'tipc_net_lock' is in read_lock here
- * bearer.lock is busy
+ * tipc_bearer_blocked - determines if bearer is currently blocked
*/
-static void tipc_bearer_schedule_unlocked(struct tipc_bearer *b_ptr,
- struct tipc_link *l_ptr)
+int tipc_bearer_blocked(struct tipc_bearer *b)
{
- list_move_tail(&l_ptr->link_list, &b_ptr->cong_links);
-}
-
-/*
- * Schedule link for sending of messages after the bearer
- * has been deblocked by 'continue()'. This method is called
- * when somebody tries to send a message via this link while
- * the bearer is congested. 'tipc_net_lock' is in read_lock here,
- * bearer.lock is free
- */
-void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr)
-{
- spin_lock_bh(&b_ptr->lock);
- tipc_bearer_schedule_unlocked(b_ptr, l_ptr);
- spin_unlock_bh(&b_ptr->lock);
-}
-
+ int res;
-/*
- * tipc_bearer_resolve_congestion(): Check if there is bearer congestion,
- * and if there is, try to resolve it before returning.
- * 'tipc_net_lock' is read_locked when this function is called
- */
-int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr,
- struct tipc_link *l_ptr)
-{
- int res = 1;
+ spin_lock_bh(&b->lock);
+ res = b->blocked;
+ spin_unlock_bh(&b->lock);
- if (list_empty(&b_ptr->cong_links))
- return 1;
- spin_lock_bh(&b_ptr->lock);
- if (!bearer_push(b_ptr)) {
- tipc_bearer_schedule_unlocked(b_ptr, l_ptr);
- res = 0;
- }
- spin_unlock_bh(&b_ptr->lock);
return res;
}
-/**
- * tipc_bearer_congested - determines if bearer is currently congested
- */
-int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr)
-{
- if (unlikely(b_ptr->blocked))
- return 1;
- if (likely(list_empty(&b_ptr->cong_links)))
- return 0;
- return !tipc_bearer_resolve_congestion(b_ptr, l_ptr);
-}
-
/**
* tipc_enable_bearer - enable bearer with the given name
*/
b_ptr->net_plane = bearer_id + 'A';
b_ptr->active = 1;
b_ptr->priority = priority;
- INIT_LIST_HEAD(&b_ptr->cong_links);
INIT_LIST_HEAD(&b_ptr->links);
spin_lock_init(&b_ptr->lock);
pr_info("Blocking bearer <%s>\n", name);
spin_lock_bh(&b_ptr->lock);
b_ptr->blocked = 1;
- list_splice_init(&b_ptr->cong_links, &b_ptr->links);
list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
struct tipc_node *n_ptr = l_ptr->owner;
spin_lock_bh(&b_ptr->lock);
b_ptr->blocked = 1;
b_ptr->media->disable_bearer(b_ptr);
- list_splice_init(&b_ptr->cong_links, &b_ptr->links);
list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
tipc_link_delete(l_ptr);
}
* @identity: array index of this bearer within TIPC bearer array
* @link_req: ptr to (optional) structure making periodic link setup requests
* @links: list of non-congested links associated with bearer
- * @cong_links: list of congested links associated with bearer
* @active: non-zero if bearer structure is represents a bearer
* @net_plane: network plane ('A' through 'H') currently associated with bearer
* @nodes: indicates which nodes in cluster can be reached through bearer
u32 identity;
struct tipc_link_req *link_req;
struct list_head links;
- struct list_head cong_links;
int active;
char net_plane;
struct tipc_node_map nodes;
struct sk_buff *tipc_bearer_get_names(void);
void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest);
void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest);
-void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr);
struct tipc_bearer *tipc_bearer_find(const char *name);
struct tipc_bearer *tipc_bearer_find_interface(const char *if_name);
struct tipc_media *tipc_media_find(const char *name);
-int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr,
- struct tipc_link *l_ptr);
-int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr);
+int tipc_bearer_blocked(struct tipc_bearer *b_ptr);
void tipc_bearer_stop(void);
-void tipc_bearer_lock_push(struct tipc_bearer *b_ptr);
-
/**
* tipc_bearer_send- sends buffer to destination over bearer
*
- * Returns true (1) if successful, or false (0) if unable to send
- *
* IMPORTANT:
* The media send routine must not alter the buffer being passed in
* as it may be needed for later retransmission!
- *
- * If the media send routine returns a non-zero value (indicating that
- * it was unable to send the buffer), it must:
- * 1) mark the bearer as blocked,
- * 2) call tipc_continue() once the bearer is able to send again.
- * Media types that are unable to meet these two critera must ensure their
- * send routine always returns success -- even if the buffer was not sent --
- * and let TIPC's link code deal with the undelivered message.
*/
-static inline int tipc_bearer_send(struct tipc_bearer *b_ptr,
- struct sk_buff *buf,
+static inline void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf,
struct tipc_media_addr *dest)
{
- return !b_ptr->media->send_msg(buf, b_ptr, dest);
+ b->media->send_msg(buf, b, dest);
}
#endif /* _TIPC_BEARER_H */
if ((type == DSC_REQ_MSG) && !link_fully_up && !b_ptr->blocked) {
rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr);
if (rbuf) {
- b_ptr->media->send_msg(rbuf, b_ptr, &media_addr);
+ tipc_bearer_send(b_ptr, rbuf, &media_addr);
kfree_skb(rbuf);
}
}
return link_send_long_buf(l_ptr, buf);
/* Packet can be queued or sent. */
- if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
+ if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) &&
!link_congested(l_ptr))) {
link_add_to_outqueue(l_ptr, buf, msg);
- if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
- l_ptr->unacked_window = 0;
- } else {
- tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
- l_ptr->stats.bearer_congs++;
- l_ptr->next_out = buf;
- }
+ tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+ l_ptr->unacked_window = 0;
return dsz;
}
/* Congestion: can message be bundled ? */
/* Try adding message to an existing bundle */
if (l_ptr->next_out &&
- link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
- tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
+ link_bundle_buf(l_ptr, l_ptr->last_out, buf))
return dsz;
- }
/* Try creating a new bundle */
if (size <= max_packet * 2 / 3) {
if (!l_ptr->next_out)
l_ptr->next_out = buf;
link_add_to_outqueue(l_ptr, buf, msg);
- tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
return dsz;
}
if (likely(!link_congested(l_ptr))) {
if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
- if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
+ if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) {
link_add_to_outqueue(l_ptr, buf, msg);
- if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
- &l_ptr->media_addr))) {
- l_ptr->unacked_window = 0;
- return res;
- }
- tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
- l_ptr->stats.bearer_congs++;
- l_ptr->next_out = buf;
+ tipc_bearer_send(l_ptr->b_ptr, buf,
+ &l_ptr->media_addr);
+ l_ptr->unacked_window = 0;
return res;
}
} else
/* Exit if link (or bearer) is congested */
if (link_congested(l_ptr) ||
- !list_empty(&l_ptr->b_ptr->cong_links)) {
+ tipc_bearer_blocked(l_ptr->b_ptr)) {
res = link_schedule_port(l_ptr,
sender->ref, res);
goto exit;
if (r_q_size && buf) {
msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
- if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
- l_ptr->retransm_queue_head = mod(++r_q_head);
- l_ptr->retransm_queue_size = --r_q_size;
- l_ptr->stats.retransmitted++;
- return 0;
- } else {
- l_ptr->stats.bearer_congs++;
- return PUSH_FAILED;
- }
+ tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+ l_ptr->retransm_queue_head = mod(++r_q_head);
+ l_ptr->retransm_queue_size = --r_q_size;
+ l_ptr->stats.retransmitted++;
+ return 0;
}
/* Send deferred protocol message, if any: */
if (buf) {
msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
- if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
- l_ptr->unacked_window = 0;
- kfree_skb(buf);
- l_ptr->proto_msg_queue = NULL;
- return 0;
- } else {
- l_ptr->stats.bearer_congs++;
- return PUSH_FAILED;
- }
+ tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+ l_ptr->unacked_window = 0;
+ kfree_skb(buf);
+ l_ptr->proto_msg_queue = NULL;
+ return 0;
}
/* Send one deferred data message, if send window not full: */
if (mod(next - first) < l_ptr->queue_limit[0]) {
msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
- if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
- if (msg_user(msg) == MSG_BUNDLER)
- msg_set_type(msg, CLOSED_MSG);
- l_ptr->next_out = buf->next;
- return 0;
- } else {
- l_ptr->stats.bearer_congs++;
- return PUSH_FAILED;
- }
+ tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+ if (msg_user(msg) == MSG_BUNDLER)
+ msg_set_type(msg, CLOSED_MSG);
+ l_ptr->next_out = buf->next;
+ return 0;
}
}
- return PUSH_FINISHED;
+ return 1;
}
/*
{
u32 res;
- if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
+ if (tipc_bearer_blocked(l_ptr->b_ptr))
return;
do {
res = tipc_link_push_packet(l_ptr);
} while (!res);
-
- if (res == PUSH_FAILED)
- tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
}
static void link_reset_all(unsigned long addr)
msg = buf_msg(buf);
- if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
+ if (tipc_bearer_blocked(l_ptr->b_ptr)) {
if (l_ptr->retransm_queue_size == 0) {
l_ptr->retransm_queue_head = msg_seqno(msg);
l_ptr->retransm_queue_size = retransmits;
}
return;
} else {
- /* Detect repeated retransmit failures on uncongested bearer */
+ /* Detect repeated retransmit failures on unblocked bearer */
if (l_ptr->last_retransmitted == msg_seqno(msg)) {
if (++l_ptr->stale_count > 100) {
link_retransmit_failure(l_ptr, buf);
msg = buf_msg(buf);
msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
- if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
- buf = buf->next;
- retransmits--;
- l_ptr->stats.retransmitted++;
- } else {
- tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
- l_ptr->stats.bearer_congs++;
- l_ptr->retransm_queue_head = buf_seqno(buf);
- l_ptr->retransm_queue_size = retransmits;
- return;
- }
+ tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+ buf = buf->next;
+ retransmits--;
+ l_ptr->stats.retransmitted++;
}
l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
- /* Defer message if bearer is already congested */
- if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
- l_ptr->proto_msg_queue = buf;
- return;
- }
-
- /* Defer message if attempting to send results in bearer congestion */
- if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
- tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
+ /* Defer message if bearer is already blocked */
+ if (tipc_bearer_blocked(l_ptr->b_ptr)) {
l_ptr->proto_msg_queue = buf;
- l_ptr->stats.bearer_congs++;
return;
}
- /* Discard message if it was sent successfully */
+ tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
l_ptr->unacked_window = 0;
kfree_skb(buf);
}
s->sent_nacks, s->sent_acks, s->retransmitted);
ret += tipc_snprintf(buf + ret, buf_size - ret,
- " Congestion bearer:%u link:%u Send queue"
- " max:%u avg:%u\n", s->bearer_congs, s->link_congs,
+ " Congestion link:%u Send queue"
+ " max:%u avg:%u\n", s->link_congs,
s->max_queue_sz, s->queue_sz_counts ?
(s->accu_queue_sz / s->queue_sz_counts) : 0);
#include "msg.h"
#include "node.h"
-#define PUSH_FAILED 1
-#define PUSH_FINISHED 2
-
/*
* Out-of-range value for link sequence numbers
*/
u32 recv_fragmented;
u32 recv_fragments;
u32 link_congs; /* # port sends blocked by congestion */
- u32 bearer_congs;
u32 deferred_recv;
u32 duplicates;
u32 max_queue_sz; /* send queue size high water mark */