#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
-#define CREATE_TRACE_POINTS
-#include <trace/events/workqueue.h>
+
+/*
+ * Structure fields follow one of the following exclusion rules.
+ *
+ * I: Set during initialization and read-only afterwards.
+ *
+ * L: cwq->lock protected. Access with cwq->lock held.
+ *
+ * F: wq->flush_mutex protected.
+ *
+ * W: workqueue_lock protected.
+ */
/*
* The per-CPU workqueue (if single thread, we always use the first
- * possible cpu).
+ * possible cpu). The lower WORK_STRUCT_FLAG_BITS of
+ * work_struct->data are used for flags and thus cwqs need to be
+ * aligned at two's power of the number of flag bits.
*/
struct cpu_workqueue_struct {
struct list_head worklist;
wait_queue_head_t more_work;
struct work_struct *current_work;
+ unsigned int cpu;
+
+ struct workqueue_struct *wq; /* I: the owning workqueue */
+ int work_color; /* L: current color */
+ int flush_color; /* L: flushing color */
+ int nr_in_flight[WORK_NR_COLORS];
+ /* L: nr of in_flight works */
+ struct task_struct *thread;
+};
- struct workqueue_struct *wq;
- struct task_struct *thread;
-} ____cacheline_aligned;
+/*
+ * Structure used to wait for workqueue flush.
+ */
+struct wq_flusher {
+ struct list_head list; /* F: list of flushers */
+ int flush_color; /* F: flush color waiting for */
+ struct completion done; /* flush completion */
+};
/*
* The externally visible workqueue abstraction is an array of
* per-CPU workqueues:
*/
struct workqueue_struct {
- struct cpu_workqueue_struct *cpu_wq;
- struct list_head list;
- const char *name;
- int singlethread;
- int freezeable; /* Freeze threads during suspend */
- int rt;
+ unsigned int flags; /* I: WQ_* flags */
+ struct cpu_workqueue_struct *cpu_wq; /* I: cwq's */
+ struct list_head list; /* W: list of all workqueues */
+
+ struct mutex flush_mutex; /* protects wq flushing */
+ int work_color; /* F: current work color */
+ int flush_color; /* F: current flush color */
+ atomic_t nr_cwqs_to_flush; /* flush in progress */
+ struct wq_flusher *first_flusher; /* F: first flusher */
+ struct list_head flusher_queue; /* F: flush waiters */
+ struct list_head flusher_overflow; /* F: flush overflow list */
+
+ const char *name; /* I: workqueue name */
#ifdef CONFIG_LOCKDEP
- struct lockdep_map lockdep_map;
+ struct lockdep_map lockdep_map;
#endif
};
* statically initialized. We just make sure that it
* is tracked in the object tracker.
*/
- if (test_bit(WORK_STRUCT_STATIC, work_data_bits(work))) {
+ if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
debug_object_init(work, &work_debug_descr);
debug_object_activate(work, &work_debug_descr);
return 0;
static LIST_HEAD(workqueues);
static int singlethread_cpu __read_mostly;
-static const struct cpumask *cpu_singlethread_map __read_mostly;
-/*
- * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
- * flushes cwq->worklist. This means that flush_workqueue/wait_on_work
- * which comes in between can't use for_each_online_cpu(). We could
- * use cpu_possible_map, the cpumask below is more a documentation
- * than optimization.
- */
-static cpumask_var_t cpu_populated_map __read_mostly;
-/* If it's single threaded, it isn't in the list of workqueues. */
-static inline int is_wq_single_threaded(struct workqueue_struct *wq)
+static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
+ struct workqueue_struct *wq)
{
- return wq->singlethread;
+ return per_cpu_ptr(wq->cpu_wq, cpu);
}
-static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
+static struct cpu_workqueue_struct *target_cwq(unsigned int cpu,
+ struct workqueue_struct *wq)
{
- return is_wq_single_threaded(wq)
- ? cpu_singlethread_map : cpu_populated_map;
+ if (unlikely(wq->flags & WQ_SINGLE_THREAD))
+ cpu = singlethread_cpu;
+ return get_cwq(cpu, wq);
}
-static
-struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
+static unsigned int work_color_to_flags(int color)
{
- if (unlikely(is_wq_single_threaded(wq)))
- cpu = singlethread_cpu;
- return per_cpu_ptr(wq->cpu_wq, cpu);
+ return color << WORK_STRUCT_COLOR_SHIFT;
+}
+
+static int get_work_color(struct work_struct *work)
+{
+ return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
+ ((1 << WORK_STRUCT_COLOR_BITS) - 1);
+}
+
+static int work_next_color(int color)
+{
+ return (color + 1) % WORK_NR_COLORS;
}
/*
* - Must *only* be called if the pending flag is set
*/
static inline void set_wq_data(struct work_struct *work,
- struct cpu_workqueue_struct *cwq)
+ struct cpu_workqueue_struct *cwq,
+ unsigned long extra_flags)
{
- unsigned long new;
-
BUG_ON(!work_pending(work));
- new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
- new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
- atomic_long_set(&work->data, new);
+ atomic_long_set(&work->data, (unsigned long)cwq | work_static(work) |
+ WORK_STRUCT_PENDING | extra_flags);
}
/*
*/
static inline void clear_wq_data(struct work_struct *work)
{
- unsigned long flags = *work_data_bits(work) &
- (1UL << WORK_STRUCT_STATIC);
- atomic_long_set(&work->data, flags);
+ atomic_long_set(&work->data, work_static(work));
}
-static inline
-struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
+static inline struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
{
- return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
+ return (void *)(atomic_long_read(&work->data) &
+ WORK_STRUCT_WQ_DATA_MASK);
}
+/**
+ * insert_work - insert a work into cwq
+ * @cwq: cwq @work belongs to
+ * @work: work to insert
+ * @head: insertion point
+ * @extra_flags: extra WORK_STRUCT_* flags to set
+ *
+ * Insert @work into @cwq after @head.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
static void insert_work(struct cpu_workqueue_struct *cwq,
- struct work_struct *work, struct list_head *head)
+ struct work_struct *work, struct list_head *head,
+ unsigned int extra_flags)
{
- trace_workqueue_insertion(cwq->thread, work);
+ /* we own @work, set data and link */
+ set_wq_data(work, cwq, extra_flags);
- set_wq_data(work, cwq);
/*
* Ensure that we get the right work->data if we see the
* result of list_add() below, see try_to_grab_pending().
*/
smp_wmb();
+
list_add_tail(&work->entry, head);
wake_up(&cwq->more_work);
}
-static void __queue_work(struct cpu_workqueue_struct *cwq,
+static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
+ struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq);
unsigned long flags;
debug_work_activate(work);
spin_lock_irqsave(&cwq->lock, flags);
- insert_work(cwq, work, &cwq->worklist);
+ BUG_ON(!list_empty(&work->entry));
+ cwq->nr_in_flight[cwq->work_color]++;
+ insert_work(cwq, work, &cwq->worklist,
+ work_color_to_flags(cwq->work_color));
spin_unlock_irqrestore(&cwq->lock, flags);
}
{
int ret = 0;
- if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
- BUG_ON(!list_empty(&work->entry));
- __queue_work(wq_per_cpu(wq, cpu), work);
+ if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+ __queue_work(cpu, wq, work);
ret = 1;
}
return ret;
{
struct delayed_work *dwork = (struct delayed_work *)__data;
struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
- struct workqueue_struct *wq = cwq->wq;
- __queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);
+ __queue_work(smp_processor_id(), cwq->wq, &dwork->work);
}
/**
struct timer_list *timer = &dwork->timer;
struct work_struct *work = &dwork->work;
- if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
+ if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
BUG_ON(timer_pending(timer));
BUG_ON(!list_empty(&work->entry));
timer_stats_timer_set_start_info(&dwork->timer);
/* This stores cwq for the moment, for the timer_fn */
- set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
+ set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0);
timer->expires = jiffies + delay;
timer->data = (unsigned long)dwork;
timer->function = delayed_work_timer_fn;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
+/**
+ * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
+ * @cwq: cwq of interest
+ * @color: color of work which left the queue
+ *
+ * A work either has completed or is removed from pending queue,
+ * decrement nr_in_flight of its cwq and handle workqueue flushing.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
+static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
+{
+ /* ignore uncolored works */
+ if (color == WORK_NO_COLOR)
+ return;
+
+ cwq->nr_in_flight[color]--;
+
+ /* is flush in progress and are we at the flushing tip? */
+ if (likely(cwq->flush_color != color))
+ return;
+
+ /* are there still in-flight works? */
+ if (cwq->nr_in_flight[color])
+ return;
+
+ /* this cwq is done, clear flush_color */
+ cwq->flush_color = -1;
+
+ /*
+ * If this was the last cwq, wake up the first flusher. It
+ * will handle the rest.
+ */
+ if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
+ complete(&cwq->wq->first_flusher->done);
+}
+
+/**
+ * process_one_work - process single work
+ * @cwq: cwq to process work for
+ * @work: work to process
+ *
+ * Process @work. This function contains all the logics necessary to
+ * process a single work including synchronization against and
+ * interaction with other workers on the same cpu, queueing and
+ * flushing. As long as context requirement is met, any worker can
+ * call this function to process a work.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock) which is released and regrabbed.
+ */
+static void process_one_work(struct cpu_workqueue_struct *cwq,
+ struct work_struct *work)
+{
+ work_func_t f = work->func;
+ int work_color;
+#ifdef CONFIG_LOCKDEP
+ /*
+ * It is permissible to free the struct work_struct from
+ * inside the function that is called from it, this we need to
+ * take into account for lockdep too. To avoid bogus "held
+ * lock freed" warnings as well as problems when looking into
+ * work->lockdep_map, make a copy and use that here.
+ */
+ struct lockdep_map lockdep_map = work->lockdep_map;
+#endif
+ /* claim and process */
+ debug_work_deactivate(work);
+ cwq->current_work = work;
+ work_color = get_work_color(work);
+ list_del_init(&work->entry);
+
+ spin_unlock_irq(&cwq->lock);
+
+ BUG_ON(get_wq_data(work) != cwq);
+ work_clear_pending(work);
+ lock_map_acquire(&cwq->wq->lockdep_map);
+ lock_map_acquire(&lockdep_map);
+ f(work);
+ lock_map_release(&lockdep_map);
+ lock_map_release(&cwq->wq->lockdep_map);
+
+ if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
+ printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
+ "%s/0x%08x/%d\n",
+ current->comm, preempt_count(), task_pid_nr(current));
+ printk(KERN_ERR " last function: ");
+ print_symbol("%s\n", (unsigned long)f);
+ debug_show_held_locks(current);
+ dump_stack();
+ }
+
+ spin_lock_irq(&cwq->lock);
+
+ /* we're done with it, release */
+ cwq->current_work = NULL;
+ cwq_dec_nr_in_flight(cwq, work_color);
+}
+
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
spin_lock_irq(&cwq->lock);
while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
- work_func_t f = work->func;
-#ifdef CONFIG_LOCKDEP
- /*
- * It is permissible to free the struct work_struct
- * from inside the function that is called from it,
- * this we need to take into account for lockdep too.
- * To avoid bogus "held lock freed" warnings as well
- * as problems when looking into work->lockdep_map,
- * make a copy and use that here.
- */
- struct lockdep_map lockdep_map = work->lockdep_map;
-#endif
- trace_workqueue_execution(cwq->thread, work);
- debug_work_deactivate(work);
- cwq->current_work = work;
- list_del_init(cwq->worklist.next);
- spin_unlock_irq(&cwq->lock);
-
- BUG_ON(get_wq_data(work) != cwq);
- work_clear_pending(work);
- lock_map_acquire(&cwq->wq->lockdep_map);
- lock_map_acquire(&lockdep_map);
- f(work);
- lock_map_release(&lockdep_map);
- lock_map_release(&cwq->wq->lockdep_map);
-
- if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
- printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
- "%s/0x%08x/%d\n",
- current->comm, preempt_count(),
- task_pid_nr(current));
- printk(KERN_ERR " last function: ");
- print_symbol("%s\n", (unsigned long)f);
- debug_show_held_locks(current);
- dump_stack();
- }
-
- spin_lock_irq(&cwq->lock);
- cwq->current_work = NULL;
+ process_one_work(cwq, work);
}
spin_unlock_irq(&cwq->lock);
}
+/**
+ * worker_thread - the worker thread function
+ * @__cwq: cwq to serve
+ *
+ * The cwq worker thread function.
+ */
static int worker_thread(void *__cwq)
{
struct cpu_workqueue_struct *cwq = __cwq;
DEFINE_WAIT(wait);
- if (cwq->wq->freezeable)
+ if (cwq->wq->flags & WQ_FREEZEABLE)
set_freezable();
for (;;) {
if (kthread_should_stop())
break;
+ if (unlikely(!cpumask_equal(&cwq->thread->cpus_allowed,
+ get_cpu_mask(cwq->cpu))))
+ set_cpus_allowed_ptr(cwq->thread,
+ get_cpu_mask(cwq->cpu));
run_workqueue(cwq);
}
complete(&barr->done);
}
+/**
+ * insert_wq_barrier - insert a barrier work
+ * @cwq: cwq to insert barrier into
+ * @barr: wq_barrier to insert
+ * @head: insertion point
+ *
+ * Insert barrier @barr into @cwq before @head.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
struct wq_barrier *barr, struct list_head *head)
{
* might deadlock.
*/
INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
- __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));
-
+ __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
init_completion(&barr->done);
debug_work_activate(&barr->work);
- insert_work(cwq, &barr->work, head);
+ insert_work(cwq, &barr->work, head, work_color_to_flags(WORK_NO_COLOR));
}
-static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
+/**
+ * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing
+ * @wq: workqueue being flushed
+ * @flush_color: new flush color, < 0 for no-op
+ * @work_color: new work color, < 0 for no-op
+ *
+ * Prepare cwqs for workqueue flushing.
+ *
+ * If @flush_color is non-negative, flush_color on all cwqs should be
+ * -1. If no cwq has in-flight commands at the specified color, all
+ * cwq->flush_color's stay at -1 and %false is returned. If any cwq
+ * has in flight commands, its cwq->flush_color is set to
+ * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq
+ * wakeup logic is armed and %true is returned.
+ *
+ * The caller should have initialized @wq->first_flusher prior to
+ * calling this function with non-negative @flush_color. If
+ * @flush_color is negative, no flush color update is done and %false
+ * is returned.
+ *
+ * If @work_color is non-negative, all cwqs should have the same
+ * work_color which is previous to @work_color and all will be
+ * advanced to @work_color.
+ *
+ * CONTEXT:
+ * mutex_lock(wq->flush_mutex).
+ *
+ * RETURNS:
+ * %true if @flush_color >= 0 and there's something to flush. %false
+ * otherwise.
+ */
+static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
+ int flush_color, int work_color)
{
- int active = 0;
- struct wq_barrier barr;
+ bool wait = false;
+ unsigned int cpu;
- WARN_ON(cwq->thread == current);
-
- spin_lock_irq(&cwq->lock);
- if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
- insert_wq_barrier(cwq, &barr, &cwq->worklist);
- active = 1;
+ if (flush_color >= 0) {
+ BUG_ON(atomic_read(&wq->nr_cwqs_to_flush));
+ atomic_set(&wq->nr_cwqs_to_flush, 1);
}
- spin_unlock_irq(&cwq->lock);
- if (active) {
- wait_for_completion(&barr.done);
- destroy_work_on_stack(&barr.work);
+ for_each_possible_cpu(cpu) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+ spin_lock_irq(&cwq->lock);
+
+ if (flush_color >= 0) {
+ BUG_ON(cwq->flush_color != -1);
+
+ if (cwq->nr_in_flight[flush_color]) {
+ cwq->flush_color = flush_color;
+ atomic_inc(&wq->nr_cwqs_to_flush);
+ wait = true;
+ }
+ }
+
+ if (work_color >= 0) {
+ BUG_ON(work_color != work_next_color(cwq->work_color));
+ cwq->work_color = work_color;
+ }
+
+ spin_unlock_irq(&cwq->lock);
}
- return active;
+ if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush))
+ complete(&wq->first_flusher->done);
+
+ return wait;
}
/**
*
* We sleep until all works which were queued on entry have been handled,
* but we are not livelocked by new incoming ones.
- *
- * This function used to run the workqueues itself. Now we just wait for the
- * helper threads to do it.
*/
void flush_workqueue(struct workqueue_struct *wq)
{
- const struct cpumask *cpu_map = wq_cpu_map(wq);
- int cpu;
+ struct wq_flusher this_flusher = {
+ .list = LIST_HEAD_INIT(this_flusher.list),
+ .flush_color = -1,
+ .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done),
+ };
+ int next_color;
- might_sleep();
lock_map_acquire(&wq->lockdep_map);
lock_map_release(&wq->lockdep_map);
- for_each_cpu(cpu, cpu_map)
- flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
+
+ mutex_lock(&wq->flush_mutex);
+
+ /*
+ * Start-to-wait phase
+ */
+ next_color = work_next_color(wq->work_color);
+
+ if (next_color != wq->flush_color) {
+ /*
+ * Color space is not full. The current work_color
+ * becomes our flush_color and work_color is advanced
+ * by one.
+ */
+ BUG_ON(!list_empty(&wq->flusher_overflow));
+ this_flusher.flush_color = wq->work_color;
+ wq->work_color = next_color;
+
+ if (!wq->first_flusher) {
+ /* no flush in progress, become the first flusher */
+ BUG_ON(wq->flush_color != this_flusher.flush_color);
+
+ wq->first_flusher = &this_flusher;
+
+ if (!flush_workqueue_prep_cwqs(wq, wq->flush_color,
+ wq->work_color)) {
+ /* nothing to flush, done */
+ wq->flush_color = next_color;
+ wq->first_flusher = NULL;
+ goto out_unlock;
+ }
+ } else {
+ /* wait in queue */
+ BUG_ON(wq->flush_color == this_flusher.flush_color);
+ list_add_tail(&this_flusher.list, &wq->flusher_queue);
+ flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
+ }
+ } else {
+ /*
+ * Oops, color space is full, wait on overflow queue.
+ * The next flush completion will assign us
+ * flush_color and transfer to flusher_queue.
+ */
+ list_add_tail(&this_flusher.list, &wq->flusher_overflow);
+ }
+
+ mutex_unlock(&wq->flush_mutex);
+
+ wait_for_completion(&this_flusher.done);
+
+ /*
+ * Wake-up-and-cascade phase
+ *
+ * First flushers are responsible for cascading flushes and
+ * handling overflow. Non-first flushers can simply return.
+ */
+ if (wq->first_flusher != &this_flusher)
+ return;
+
+ mutex_lock(&wq->flush_mutex);
+
+ wq->first_flusher = NULL;
+
+ BUG_ON(!list_empty(&this_flusher.list));
+ BUG_ON(wq->flush_color != this_flusher.flush_color);
+
+ while (true) {
+ struct wq_flusher *next, *tmp;
+
+ /* complete all the flushers sharing the current flush color */
+ list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
+ if (next->flush_color != wq->flush_color)
+ break;
+ list_del_init(&next->list);
+ complete(&next->done);
+ }
+
+ BUG_ON(!list_empty(&wq->flusher_overflow) &&
+ wq->flush_color != work_next_color(wq->work_color));
+
+ /* this flush_color is finished, advance by one */
+ wq->flush_color = work_next_color(wq->flush_color);
+
+ /* one color has been freed, handle overflow queue */
+ if (!list_empty(&wq->flusher_overflow)) {
+ /*
+ * Assign the same color to all overflowed
+ * flushers, advance work_color and append to
+ * flusher_queue. This is the start-to-wait
+ * phase for these overflowed flushers.
+ */
+ list_for_each_entry(tmp, &wq->flusher_overflow, list)
+ tmp->flush_color = wq->work_color;
+
+ wq->work_color = work_next_color(wq->work_color);
+
+ list_splice_tail_init(&wq->flusher_overflow,
+ &wq->flusher_queue);
+ flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
+ }
+
+ if (list_empty(&wq->flusher_queue)) {
+ BUG_ON(wq->flush_color != wq->work_color);
+ break;
+ }
+
+ /*
+ * Need to flush more colors. Make the next flusher
+ * the new first flusher and arm cwqs.
+ */
+ BUG_ON(wq->flush_color == wq->work_color);
+ BUG_ON(wq->flush_color != next->flush_color);
+
+ list_del_init(&next->list);
+ wq->first_flusher = next;
+
+ if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1))
+ break;
+
+ /*
+ * Meh... this color is already done, clear first
+ * flusher and repeat cascading.
+ */
+ wq->first_flusher = NULL;
+ }
+
+out_unlock:
+ mutex_unlock(&wq->flush_mutex);
}
EXPORT_SYMBOL_GPL(flush_workqueue);
lock_map_acquire(&cwq->wq->lockdep_map);
lock_map_release(&cwq->wq->lockdep_map);
- prev = NULL;
spin_lock_irq(&cwq->lock);
if (!list_empty(&work->entry)) {
/*
*/
smp_rmb();
if (unlikely(cwq != get_wq_data(work)))
- goto out;
+ goto already_gone;
prev = &work->entry;
} else {
if (cwq->current_work != work)
- goto out;
+ goto already_gone;
prev = &cwq->worklist;
}
insert_wq_barrier(cwq, &barr, prev->next);
-out:
- spin_unlock_irq(&cwq->lock);
- if (!prev)
- return 0;
+ spin_unlock_irq(&cwq->lock);
wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);
return 1;
+already_gone:
+ spin_unlock_irq(&cwq->lock);
+ return 0;
}
EXPORT_SYMBOL_GPL(flush_work);
struct cpu_workqueue_struct *cwq;
int ret = -1;
- if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
+ if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
return 0;
/*
if (cwq == get_wq_data(work)) {
debug_work_deactivate(work);
list_del_init(&work->entry);
+ cwq_dec_nr_in_flight(cwq, get_work_color(work));
ret = 1;
}
}
{
struct cpu_workqueue_struct *cwq;
struct workqueue_struct *wq;
- const struct cpumask *cpu_map;
int cpu;
might_sleep();
return;
wq = cwq->wq;
- cpu_map = wq_cpu_map(wq);
- for_each_cpu(cpu, cpu_map)
- wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
+ for_each_possible_cpu(cpu)
+ wait_on_cpu_work(get_cwq(cpu, wq), work);
}
static int __cancel_work_timer(struct work_struct *work,
void flush_delayed_work(struct delayed_work *dwork)
{
if (del_timer_sync(&dwork->timer)) {
- struct cpu_workqueue_struct *cwq;
- cwq = wq_per_cpu(get_wq_data(&dwork->work)->wq, get_cpu());
- __queue_work(cwq, &dwork->work);
+ __queue_work(get_cpu(), get_wq_data(&dwork->work)->wq,
+ &dwork->work);
put_cpu();
}
flush_work(&dwork->work);
BUG_ON(!keventd_wq);
- cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
+ cwq = get_cwq(cpu, keventd_wq);
if (current == cwq->thread)
ret = 1;
}
-static struct cpu_workqueue_struct *
-init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
+static struct cpu_workqueue_struct *alloc_cwqs(void)
{
- struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
+ /*
+ * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
+ * Make sure that the alignment isn't lower than that of
+ * unsigned long long.
+ */
+ const size_t size = sizeof(struct cpu_workqueue_struct);
+ const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
+ __alignof__(unsigned long long));
+ struct cpu_workqueue_struct *cwqs;
+#ifndef CONFIG_SMP
+ void *ptr;
- cwq->wq = wq;
- spin_lock_init(&cwq->lock);
- INIT_LIST_HEAD(&cwq->worklist);
- init_waitqueue_head(&cwq->more_work);
+ /*
+ * On UP, percpu allocator doesn't honor alignment parameter
+ * and simply uses arch-dependent default. Allocate enough
+ * room to align cwq and put an extra pointer at the end
+ * pointing back to the originally allocated pointer which
+ * will be used for free.
+ *
+ * FIXME: This really belongs to UP percpu code. Update UP
+ * percpu code to honor alignment and remove this ugliness.
+ */
+ ptr = __alloc_percpu(size + align + sizeof(void *), 1);
+ cwqs = PTR_ALIGN(ptr, align);
+ *(void **)per_cpu_ptr(cwqs + 1, 0) = ptr;
+#else
+ /* On SMP, percpu allocator can do it itself */
+ cwqs = __alloc_percpu(size, align);
+#endif
+ /* just in case, make sure it's actually aligned */
+ BUG_ON(!IS_ALIGNED((unsigned long)cwqs, align));
+ return cwqs;
+}
- return cwq;
+static void free_cwqs(struct cpu_workqueue_struct *cwqs)
+{
+#ifndef CONFIG_SMP
+ /* on UP, the pointer to free is stored right after the cwq */
+ if (cwqs)
+ free_percpu(*(void **)per_cpu_ptr(cwqs + 1, 0));
+#else
+ free_percpu(cwqs);
+#endif
}
static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
- struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
struct workqueue_struct *wq = cwq->wq;
- const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
struct task_struct *p;
- p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
+ p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
/*
* Nobody can add the work_struct to this cwq,
* if (caller is __create_workqueue)
*/
if (IS_ERR(p))
return PTR_ERR(p);
- if (cwq->wq->rt)
- sched_setscheduler_nocheck(p, SCHED_FIFO, ¶m);
cwq->thread = p;
- trace_workqueue_creation(cwq->thread, cpu);
-
return 0;
}
}
struct workqueue_struct *__create_workqueue_key(const char *name,
- int singlethread,
- int freezeable,
- int rt,
+ unsigned int flags,
struct lock_class_key *key,
const char *lock_name)
{
+ bool singlethread = flags & WQ_SINGLE_THREAD;
struct workqueue_struct *wq;
- struct cpu_workqueue_struct *cwq;
int err = 0, cpu;
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
- return NULL;
+ goto err;
- wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
- if (!wq->cpu_wq) {
- kfree(wq);
- return NULL;
- }
+ wq->cpu_wq = alloc_cwqs();
+ if (!wq->cpu_wq)
+ goto err;
+ wq->flags = flags;
+ mutex_init(&wq->flush_mutex);
+ atomic_set(&wq->nr_cwqs_to_flush, 0);
+ INIT_LIST_HEAD(&wq->flusher_queue);
+ INIT_LIST_HEAD(&wq->flusher_overflow);
wq->name = name;
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
- wq->singlethread = singlethread;
- wq->freezeable = freezeable;
- wq->rt = rt;
INIT_LIST_HEAD(&wq->list);
- if (singlethread) {
- cwq = init_cpu_workqueue(wq, singlethread_cpu);
- err = create_workqueue_thread(cwq, singlethread_cpu);
- start_workqueue_thread(cwq, -1);
- } else {
- cpu_maps_update_begin();
- /*
- * We must place this wq on list even if the code below fails.
- * cpu_down(cpu) can remove cpu from cpu_populated_map before
- * destroy_workqueue() takes the lock, in that case we leak
- * cwq[cpu]->thread.
- */
- spin_lock(&workqueue_lock);
- list_add(&wq->list, &workqueues);
- spin_unlock(&workqueue_lock);
- /*
- * We must initialize cwqs for each possible cpu even if we
- * are going to call destroy_workqueue() finally. Otherwise
- * cpu_up() can hit the uninitialized cwq once we drop the
- * lock.
- */
- for_each_possible_cpu(cpu) {
- cwq = init_cpu_workqueue(wq, cpu);
- if (err || !cpu_online(cpu))
- continue;
- err = create_workqueue_thread(cwq, cpu);
+ cpu_maps_update_begin();
+ /*
+ * We must initialize cwqs for each possible cpu even if we
+ * are going to call destroy_workqueue() finally. Otherwise
+ * cpu_up() can hit the uninitialized cwq once we drop the
+ * lock.
+ */
+ for_each_possible_cpu(cpu) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+ BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
+ cwq->wq = wq;
+ cwq->cpu = cpu;
+ cwq->flush_color = -1;
+ spin_lock_init(&cwq->lock);
+ INIT_LIST_HEAD(&cwq->worklist);
+ init_waitqueue_head(&cwq->more_work);
+
+ if (err)
+ continue;
+ err = create_workqueue_thread(cwq, cpu);
+ if (cpu_online(cpu) && !singlethread)
start_workqueue_thread(cwq, cpu);
- }
- cpu_maps_update_done();
+ else
+ start_workqueue_thread(cwq, -1);
}
+ spin_lock(&workqueue_lock);
+ list_add(&wq->list, &workqueues);
+ spin_unlock(&workqueue_lock);
+
+ cpu_maps_update_done();
+
if (err) {
destroy_workqueue(wq);
wq = NULL;
}
return wq;
+err:
+ if (wq) {
+ free_cwqs(wq->cpu_wq);
+ kfree(wq);
+ }
+ return NULL;
}
EXPORT_SYMBOL_GPL(__create_workqueue_key);
-static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
-{
- /*
- * Our caller is either destroy_workqueue() or CPU_POST_DEAD,
- * cpu_add_remove_lock protects cwq->thread.
- */
- if (cwq->thread == NULL)
- return;
-
- lock_map_acquire(&cwq->wq->lockdep_map);
- lock_map_release(&cwq->wq->lockdep_map);
-
- flush_cpu_workqueue(cwq);
- /*
- * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
- * a concurrent flush_workqueue() can insert a barrier after us.
- * However, in that case run_workqueue() won't return and check
- * kthread_should_stop() until it flushes all work_struct's.
- * When ->worklist becomes empty it is safe to exit because no
- * more work_structs can be queued on this cwq: flush_workqueue
- * checks list_empty(), and a "normal" queue_work() can't use
- * a dead CPU.
- */
- trace_workqueue_destruction(cwq->thread);
- kthread_stop(cwq->thread);
- cwq->thread = NULL;
-}
-
/**
* destroy_workqueue - safely terminate a workqueue
* @wq: target workqueue
*/
void destroy_workqueue(struct workqueue_struct *wq)
{
- const struct cpumask *cpu_map = wq_cpu_map(wq);
int cpu;
cpu_maps_update_begin();
spin_lock(&workqueue_lock);
list_del(&wq->list);
spin_unlock(&workqueue_lock);
+ cpu_maps_update_done();
+
+ flush_workqueue(wq);
+
+ for_each_possible_cpu(cpu) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+ int i;
+
+ if (cwq->thread) {
+ kthread_stop(cwq->thread);
+ cwq->thread = NULL;
+ }
- for_each_cpu(cpu, cpu_map)
- cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
- cpu_maps_update_done();
+ for (i = 0; i < WORK_NR_COLORS; i++)
+ BUG_ON(cwq->nr_in_flight[i]);
+ }
- free_percpu(wq->cpu_wq);
+ free_cwqs(wq->cpu_wq);
kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);
unsigned int cpu = (unsigned long)hcpu;
struct cpu_workqueue_struct *cwq;
struct workqueue_struct *wq;
- int ret = NOTIFY_OK;
action &= ~CPU_TASKS_FROZEN;
- switch (action) {
- case CPU_UP_PREPARE:
- cpumask_set_cpu(cpu, cpu_populated_map);
- }
-undo:
list_for_each_entry(wq, &workqueues, list) {
- cwq = per_cpu_ptr(wq->cpu_wq, cpu);
+ if (wq->flags & WQ_SINGLE_THREAD)
+ continue;
- switch (action) {
- case CPU_UP_PREPARE:
- if (!create_workqueue_thread(cwq, cpu))
- break;
- printk(KERN_ERR "workqueue [%s] for %i failed\n",
- wq->name, cpu);
- action = CPU_UP_CANCELED;
- ret = NOTIFY_BAD;
- goto undo;
-
- case CPU_ONLINE:
- start_workqueue_thread(cwq, cpu);
- break;
+ cwq = get_cwq(cpu, wq);
- case CPU_UP_CANCELED:
- start_workqueue_thread(cwq, -1);
+ switch (action) {
case CPU_POST_DEAD:
- cleanup_workqueue_thread(cwq);
+ flush_workqueue(wq);
break;
}
}
- switch (action) {
- case CPU_UP_CANCELED:
- case CPU_POST_DEAD:
- cpumask_clear_cpu(cpu, cpu_populated_map);
- }
-
- return ret;
+ return notifier_from_errno(0);
}
#ifdef CONFIG_SMP
void __init init_workqueues(void)
{
- alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);
-
- cpumask_copy(cpu_populated_map, cpu_online_mask);
singlethread_cpu = cpumask_first(cpu_possible_mask);
- cpu_singlethread_map = cpumask_of(singlethread_cpu);
hotcpu_notifier(workqueue_cpu_callback, 0);
keventd_wq = create_workqueue("events");
BUG_ON(!keventd_wq);