struct msg_receiver *msr, *t;
list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
- msr->r_msg = NULL;
+ msr->r_msg = NULL; /* initialize expunge ordering */
wake_up_process(msr->r_tsk);
+ /*
+ * Ensure that the wakeup is visible before setting r_msg as
+ * the receiving end depends on it: either spinning on a nil,
+ * or dealing with -EAGAIN cases. See lockless receive part 1
+ * and 2 in do_msgrcv().
+ */
smp_mb();
msr->r_msg = ERR_PTR(res);
}
list_del(&msr->r_list);
if (msr->r_maxsize < msg->m_ts) {
+ /* initialize pipelined send ordering */
msr->r_msg = NULL;
wake_up_process(msr->r_tsk);
- smp_mb();
+ smp_mb(); /* see barrier comment below */
msr->r_msg = ERR_PTR(-E2BIG);
} else {
msr->r_msg = NULL;
msq->q_lrpid = task_pid_vnr(msr->r_tsk);
msq->q_rtime = get_seconds();
wake_up_process(msr->r_tsk);
+ /*
+ * Ensure that the wakeup is visible before
+ * setting r_msg, as the receiving end depends
+ * on it. See lockless receive part 1 and 2 in
+ * do_msgrcv().
+ */
smp_mb();
msr->r_msg = msg;
}
}
}
+
return 0;
}
goto out_unlock0;
}
+ /* enqueue the sender and prepare to block */
ss_add(msq, &s);
if (!ipc_rcu_getref(msq)) {