X-Git-Url: https://git.stricted.de/?a=blobdiff_plain;f=kernel%2Fworkqueue.c;h=56e47c59d73b8ff58b7412bd8a8ddeef01f44025;hb=73f53c4aa732eced5fcb1844d3d452c30905f20f;hp=77dabbf64b8fd45acdab9eb33cde54bad83cba62;hpb=7466a38478a30d5f7248134c9bdcb4e1c01fe4d9;p=GitHub%2Fmt8127%2Fandroid_kernel_alcatel_ttab.git diff --git a/kernel/workqueue.c b/kernel/workqueue.c index 77dabbf64b8f..56e47c59d73b 100644 --- a/kernel/workqueue.c +++ b/kernel/workqueue.c @@ -33,12 +33,24 @@ #include #include #include -#define CREATE_TRACE_POINTS -#include + +/* + * Structure fields follow one of the following exclusion rules. + * + * I: Set during initialization and read-only afterwards. + * + * L: cwq->lock protected. Access with cwq->lock held. + * + * F: wq->flush_mutex protected. + * + * W: workqueue_lock protected. + */ /* * The per-CPU workqueue (if single thread, we always use the first - * possible cpu). + * possible cpu). The lower WORK_STRUCT_FLAG_BITS of + * work_struct->data are used for flags and thus cwqs need to be + * aligned at two's power of the number of flag bits. */ struct cpu_workqueue_struct { @@ -47,24 +59,45 @@ struct cpu_workqueue_struct { struct list_head worklist; wait_queue_head_t more_work; struct work_struct *current_work; + unsigned int cpu; + + struct workqueue_struct *wq; /* I: the owning workqueue */ + int work_color; /* L: current color */ + int flush_color; /* L: flushing color */ + int nr_in_flight[WORK_NR_COLORS]; + /* L: nr of in_flight works */ + struct task_struct *thread; +}; - struct workqueue_struct *wq; - struct task_struct *thread; -} ____cacheline_aligned; +/* + * Structure used to wait for workqueue flush. + */ +struct wq_flusher { + struct list_head list; /* F: list of flushers */ + int flush_color; /* F: flush color waiting for */ + struct completion done; /* flush completion */ +}; /* * The externally visible workqueue abstraction is an array of * per-CPU workqueues: */ struct workqueue_struct { - struct cpu_workqueue_struct *cpu_wq; - struct list_head list; - const char *name; - int singlethread; - int freezeable; /* Freeze threads during suspend */ - int rt; + unsigned int flags; /* I: WQ_* flags */ + struct cpu_workqueue_struct *cpu_wq; /* I: cwq's */ + struct list_head list; /* W: list of all workqueues */ + + struct mutex flush_mutex; /* protects wq flushing */ + int work_color; /* F: current work color */ + int flush_color; /* F: current flush color */ + atomic_t nr_cwqs_to_flush; /* flush in progress */ + struct wq_flusher *first_flusher; /* F: first flusher */ + struct list_head flusher_queue; /* F: flush waiters */ + struct list_head flusher_overflow; /* F: flush overflow list */ + + const char *name; /* I: workqueue name */ #ifdef CONFIG_LOCKDEP - struct lockdep_map lockdep_map; + struct lockdep_map lockdep_map; #endif }; @@ -107,7 +140,7 @@ static int work_fixup_activate(void *addr, enum debug_obj_state state) * statically initialized. We just make sure that it * is tracked in the object tracker. */ - if (test_bit(WORK_STRUCT_STATIC, work_data_bits(work))) { + if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) { debug_object_init(work, &work_debug_descr); debug_object_activate(work, &work_debug_descr); return 0; @@ -183,34 +216,35 @@ static DEFINE_SPINLOCK(workqueue_lock); static LIST_HEAD(workqueues); static int singlethread_cpu __read_mostly; -static const struct cpumask *cpu_singlethread_map __read_mostly; -/* - * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD - * flushes cwq->worklist. This means that flush_workqueue/wait_on_work - * which comes in between can't use for_each_online_cpu(). We could - * use cpu_possible_map, the cpumask below is more a documentation - * than optimization. - */ -static cpumask_var_t cpu_populated_map __read_mostly; -/* If it's single threaded, it isn't in the list of workqueues. */ -static inline int is_wq_single_threaded(struct workqueue_struct *wq) +static struct cpu_workqueue_struct *get_cwq(unsigned int cpu, + struct workqueue_struct *wq) { - return wq->singlethread; + return per_cpu_ptr(wq->cpu_wq, cpu); } -static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq) +static struct cpu_workqueue_struct *target_cwq(unsigned int cpu, + struct workqueue_struct *wq) { - return is_wq_single_threaded(wq) - ? cpu_singlethread_map : cpu_populated_map; + if (unlikely(wq->flags & WQ_SINGLE_THREAD)) + cpu = singlethread_cpu; + return get_cwq(cpu, wq); } -static -struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu) +static unsigned int work_color_to_flags(int color) { - if (unlikely(is_wq_single_threaded(wq))) - cpu = singlethread_cpu; - return per_cpu_ptr(wq->cpu_wq, cpu); + return color << WORK_STRUCT_COLOR_SHIFT; +} + +static int get_work_color(struct work_struct *work) +{ + return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) & + ((1 << WORK_STRUCT_COLOR_BITS) - 1); +} + +static int work_next_color(int color) +{ + return (color + 1) % WORK_NR_COLORS; } /* @@ -218,15 +252,13 @@ struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu) * - Must *only* be called if the pending flag is set */ static inline void set_wq_data(struct work_struct *work, - struct cpu_workqueue_struct *cwq) + struct cpu_workqueue_struct *cwq, + unsigned long extra_flags) { - unsigned long new; - BUG_ON(!work_pending(work)); - new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING); - new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work); - atomic_long_set(&work->data, new); + atomic_long_set(&work->data, (unsigned long)cwq | work_static(work) | + WORK_STRUCT_PENDING | extra_flags); } /* @@ -234,40 +266,56 @@ static inline void set_wq_data(struct work_struct *work, */ static inline void clear_wq_data(struct work_struct *work) { - unsigned long flags = *work_data_bits(work) & - (1UL << WORK_STRUCT_STATIC); - atomic_long_set(&work->data, flags); + atomic_long_set(&work->data, work_static(work)); } -static inline -struct cpu_workqueue_struct *get_wq_data(struct work_struct *work) +static inline struct cpu_workqueue_struct *get_wq_data(struct work_struct *work) { - return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK); + return (void *)(atomic_long_read(&work->data) & + WORK_STRUCT_WQ_DATA_MASK); } +/** + * insert_work - insert a work into cwq + * @cwq: cwq @work belongs to + * @work: work to insert + * @head: insertion point + * @extra_flags: extra WORK_STRUCT_* flags to set + * + * Insert @work into @cwq after @head. + * + * CONTEXT: + * spin_lock_irq(cwq->lock). + */ static void insert_work(struct cpu_workqueue_struct *cwq, - struct work_struct *work, struct list_head *head) + struct work_struct *work, struct list_head *head, + unsigned int extra_flags) { - trace_workqueue_insertion(cwq->thread, work); + /* we own @work, set data and link */ + set_wq_data(work, cwq, extra_flags); - set_wq_data(work, cwq); /* * Ensure that we get the right work->data if we see the * result of list_add() below, see try_to_grab_pending(). */ smp_wmb(); + list_add_tail(&work->entry, head); wake_up(&cwq->more_work); } -static void __queue_work(struct cpu_workqueue_struct *cwq, +static void __queue_work(unsigned int cpu, struct workqueue_struct *wq, struct work_struct *work) { + struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq); unsigned long flags; debug_work_activate(work); spin_lock_irqsave(&cwq->lock, flags); - insert_work(cwq, work, &cwq->worklist); + BUG_ON(!list_empty(&work->entry)); + cwq->nr_in_flight[cwq->work_color]++; + insert_work(cwq, work, &cwq->worklist, + work_color_to_flags(cwq->work_color)); spin_unlock_irqrestore(&cwq->lock, flags); } @@ -308,9 +356,8 @@ queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work) { int ret = 0; - if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { - BUG_ON(!list_empty(&work->entry)); - __queue_work(wq_per_cpu(wq, cpu), work); + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { + __queue_work(cpu, wq, work); ret = 1; } return ret; @@ -321,9 +368,8 @@ static void delayed_work_timer_fn(unsigned long __data) { struct delayed_work *dwork = (struct delayed_work *)__data; struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work); - struct workqueue_struct *wq = cwq->wq; - __queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work); + __queue_work(smp_processor_id(), cwq->wq, &dwork->work); } /** @@ -360,14 +406,14 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, struct timer_list *timer = &dwork->timer; struct work_struct *work = &dwork->work; - if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { BUG_ON(timer_pending(timer)); BUG_ON(!list_empty(&work->entry)); timer_stats_timer_set_start_info(&dwork->timer); /* This stores cwq for the moment, for the timer_fn */ - set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id())); + set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0); timer->expires = jiffies + delay; timer->data = (unsigned long)dwork; timer->function = delayed_work_timer_fn; @@ -382,61 +428,129 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, } EXPORT_SYMBOL_GPL(queue_delayed_work_on); +/** + * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight + * @cwq: cwq of interest + * @color: color of work which left the queue + * + * A work either has completed or is removed from pending queue, + * decrement nr_in_flight of its cwq and handle workqueue flushing. + * + * CONTEXT: + * spin_lock_irq(cwq->lock). + */ +static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color) +{ + /* ignore uncolored works */ + if (color == WORK_NO_COLOR) + return; + + cwq->nr_in_flight[color]--; + + /* is flush in progress and are we at the flushing tip? */ + if (likely(cwq->flush_color != color)) + return; + + /* are there still in-flight works? */ + if (cwq->nr_in_flight[color]) + return; + + /* this cwq is done, clear flush_color */ + cwq->flush_color = -1; + + /* + * If this was the last cwq, wake up the first flusher. It + * will handle the rest. + */ + if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush)) + complete(&cwq->wq->first_flusher->done); +} + +/** + * process_one_work - process single work + * @cwq: cwq to process work for + * @work: work to process + * + * Process @work. This function contains all the logics necessary to + * process a single work including synchronization against and + * interaction with other workers on the same cpu, queueing and + * flushing. As long as context requirement is met, any worker can + * call this function to process a work. + * + * CONTEXT: + * spin_lock_irq(cwq->lock) which is released and regrabbed. + */ +static void process_one_work(struct cpu_workqueue_struct *cwq, + struct work_struct *work) +{ + work_func_t f = work->func; + int work_color; +#ifdef CONFIG_LOCKDEP + /* + * It is permissible to free the struct work_struct from + * inside the function that is called from it, this we need to + * take into account for lockdep too. To avoid bogus "held + * lock freed" warnings as well as problems when looking into + * work->lockdep_map, make a copy and use that here. + */ + struct lockdep_map lockdep_map = work->lockdep_map; +#endif + /* claim and process */ + debug_work_deactivate(work); + cwq->current_work = work; + work_color = get_work_color(work); + list_del_init(&work->entry); + + spin_unlock_irq(&cwq->lock); + + BUG_ON(get_wq_data(work) != cwq); + work_clear_pending(work); + lock_map_acquire(&cwq->wq->lockdep_map); + lock_map_acquire(&lockdep_map); + f(work); + lock_map_release(&lockdep_map); + lock_map_release(&cwq->wq->lockdep_map); + + if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { + printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " + "%s/0x%08x/%d\n", + current->comm, preempt_count(), task_pid_nr(current)); + printk(KERN_ERR " last function: "); + print_symbol("%s\n", (unsigned long)f); + debug_show_held_locks(current); + dump_stack(); + } + + spin_lock_irq(&cwq->lock); + + /* we're done with it, release */ + cwq->current_work = NULL; + cwq_dec_nr_in_flight(cwq, work_color); +} + static void run_workqueue(struct cpu_workqueue_struct *cwq) { spin_lock_irq(&cwq->lock); while (!list_empty(&cwq->worklist)) { struct work_struct *work = list_entry(cwq->worklist.next, struct work_struct, entry); - work_func_t f = work->func; -#ifdef CONFIG_LOCKDEP - /* - * It is permissible to free the struct work_struct - * from inside the function that is called from it, - * this we need to take into account for lockdep too. - * To avoid bogus "held lock freed" warnings as well - * as problems when looking into work->lockdep_map, - * make a copy and use that here. - */ - struct lockdep_map lockdep_map = work->lockdep_map; -#endif - trace_workqueue_execution(cwq->thread, work); - debug_work_deactivate(work); - cwq->current_work = work; - list_del_init(cwq->worklist.next); - spin_unlock_irq(&cwq->lock); - - BUG_ON(get_wq_data(work) != cwq); - work_clear_pending(work); - lock_map_acquire(&cwq->wq->lockdep_map); - lock_map_acquire(&lockdep_map); - f(work); - lock_map_release(&lockdep_map); - lock_map_release(&cwq->wq->lockdep_map); - - if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { - printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " - "%s/0x%08x/%d\n", - current->comm, preempt_count(), - task_pid_nr(current)); - printk(KERN_ERR " last function: "); - print_symbol("%s\n", (unsigned long)f); - debug_show_held_locks(current); - dump_stack(); - } - - spin_lock_irq(&cwq->lock); - cwq->current_work = NULL; + process_one_work(cwq, work); } spin_unlock_irq(&cwq->lock); } +/** + * worker_thread - the worker thread function + * @__cwq: cwq to serve + * + * The cwq worker thread function. + */ static int worker_thread(void *__cwq) { struct cpu_workqueue_struct *cwq = __cwq; DEFINE_WAIT(wait); - if (cwq->wq->freezeable) + if (cwq->wq->flags & WQ_FREEZEABLE) set_freezable(); for (;;) { @@ -452,6 +566,10 @@ static int worker_thread(void *__cwq) if (kthread_should_stop()) break; + if (unlikely(!cpumask_equal(&cwq->thread->cpus_allowed, + get_cpu_mask(cwq->cpu)))) + set_cpus_allowed_ptr(cwq->thread, + get_cpu_mask(cwq->cpu)); run_workqueue(cwq); } @@ -469,6 +587,17 @@ static void wq_barrier_func(struct work_struct *work) complete(&barr->done); } +/** + * insert_wq_barrier - insert a barrier work + * @cwq: cwq to insert barrier into + * @barr: wq_barrier to insert + * @head: insertion point + * + * Insert barrier @barr into @cwq before @head. + * + * CONTEXT: + * spin_lock_irq(cwq->lock). + */ static void insert_wq_barrier(struct cpu_workqueue_struct *cwq, struct wq_barrier *barr, struct list_head *head) { @@ -479,34 +608,82 @@ static void insert_wq_barrier(struct cpu_workqueue_struct *cwq, * might deadlock. */ INIT_WORK_ON_STACK(&barr->work, wq_barrier_func); - __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work)); - + __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work)); init_completion(&barr->done); debug_work_activate(&barr->work); - insert_work(cwq, &barr->work, head); + insert_work(cwq, &barr->work, head, work_color_to_flags(WORK_NO_COLOR)); } -static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) +/** + * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing + * @wq: workqueue being flushed + * @flush_color: new flush color, < 0 for no-op + * @work_color: new work color, < 0 for no-op + * + * Prepare cwqs for workqueue flushing. + * + * If @flush_color is non-negative, flush_color on all cwqs should be + * -1. If no cwq has in-flight commands at the specified color, all + * cwq->flush_color's stay at -1 and %false is returned. If any cwq + * has in flight commands, its cwq->flush_color is set to + * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq + * wakeup logic is armed and %true is returned. + * + * The caller should have initialized @wq->first_flusher prior to + * calling this function with non-negative @flush_color. If + * @flush_color is negative, no flush color update is done and %false + * is returned. + * + * If @work_color is non-negative, all cwqs should have the same + * work_color which is previous to @work_color and all will be + * advanced to @work_color. + * + * CONTEXT: + * mutex_lock(wq->flush_mutex). + * + * RETURNS: + * %true if @flush_color >= 0 and there's something to flush. %false + * otherwise. + */ +static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq, + int flush_color, int work_color) { - int active = 0; - struct wq_barrier barr; + bool wait = false; + unsigned int cpu; - WARN_ON(cwq->thread == current); - - spin_lock_irq(&cwq->lock); - if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) { - insert_wq_barrier(cwq, &barr, &cwq->worklist); - active = 1; + if (flush_color >= 0) { + BUG_ON(atomic_read(&wq->nr_cwqs_to_flush)); + atomic_set(&wq->nr_cwqs_to_flush, 1); } - spin_unlock_irq(&cwq->lock); - if (active) { - wait_for_completion(&barr.done); - destroy_work_on_stack(&barr.work); + for_each_possible_cpu(cpu) { + struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); + + spin_lock_irq(&cwq->lock); + + if (flush_color >= 0) { + BUG_ON(cwq->flush_color != -1); + + if (cwq->nr_in_flight[flush_color]) { + cwq->flush_color = flush_color; + atomic_inc(&wq->nr_cwqs_to_flush); + wait = true; + } + } + + if (work_color >= 0) { + BUG_ON(work_color != work_next_color(cwq->work_color)); + cwq->work_color = work_color; + } + + spin_unlock_irq(&cwq->lock); } - return active; + if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush)) + complete(&wq->first_flusher->done); + + return wait; } /** @@ -518,20 +695,146 @@ static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) * * We sleep until all works which were queued on entry have been handled, * but we are not livelocked by new incoming ones. - * - * This function used to run the workqueues itself. Now we just wait for the - * helper threads to do it. */ void flush_workqueue(struct workqueue_struct *wq) { - const struct cpumask *cpu_map = wq_cpu_map(wq); - int cpu; + struct wq_flusher this_flusher = { + .list = LIST_HEAD_INIT(this_flusher.list), + .flush_color = -1, + .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done), + }; + int next_color; - might_sleep(); lock_map_acquire(&wq->lockdep_map); lock_map_release(&wq->lockdep_map); - for_each_cpu(cpu, cpu_map) - flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); + + mutex_lock(&wq->flush_mutex); + + /* + * Start-to-wait phase + */ + next_color = work_next_color(wq->work_color); + + if (next_color != wq->flush_color) { + /* + * Color space is not full. The current work_color + * becomes our flush_color and work_color is advanced + * by one. + */ + BUG_ON(!list_empty(&wq->flusher_overflow)); + this_flusher.flush_color = wq->work_color; + wq->work_color = next_color; + + if (!wq->first_flusher) { + /* no flush in progress, become the first flusher */ + BUG_ON(wq->flush_color != this_flusher.flush_color); + + wq->first_flusher = &this_flusher; + + if (!flush_workqueue_prep_cwqs(wq, wq->flush_color, + wq->work_color)) { + /* nothing to flush, done */ + wq->flush_color = next_color; + wq->first_flusher = NULL; + goto out_unlock; + } + } else { + /* wait in queue */ + BUG_ON(wq->flush_color == this_flusher.flush_color); + list_add_tail(&this_flusher.list, &wq->flusher_queue); + flush_workqueue_prep_cwqs(wq, -1, wq->work_color); + } + } else { + /* + * Oops, color space is full, wait on overflow queue. + * The next flush completion will assign us + * flush_color and transfer to flusher_queue. + */ + list_add_tail(&this_flusher.list, &wq->flusher_overflow); + } + + mutex_unlock(&wq->flush_mutex); + + wait_for_completion(&this_flusher.done); + + /* + * Wake-up-and-cascade phase + * + * First flushers are responsible for cascading flushes and + * handling overflow. Non-first flushers can simply return. + */ + if (wq->first_flusher != &this_flusher) + return; + + mutex_lock(&wq->flush_mutex); + + wq->first_flusher = NULL; + + BUG_ON(!list_empty(&this_flusher.list)); + BUG_ON(wq->flush_color != this_flusher.flush_color); + + while (true) { + struct wq_flusher *next, *tmp; + + /* complete all the flushers sharing the current flush color */ + list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) { + if (next->flush_color != wq->flush_color) + break; + list_del_init(&next->list); + complete(&next->done); + } + + BUG_ON(!list_empty(&wq->flusher_overflow) && + wq->flush_color != work_next_color(wq->work_color)); + + /* this flush_color is finished, advance by one */ + wq->flush_color = work_next_color(wq->flush_color); + + /* one color has been freed, handle overflow queue */ + if (!list_empty(&wq->flusher_overflow)) { + /* + * Assign the same color to all overflowed + * flushers, advance work_color and append to + * flusher_queue. This is the start-to-wait + * phase for these overflowed flushers. + */ + list_for_each_entry(tmp, &wq->flusher_overflow, list) + tmp->flush_color = wq->work_color; + + wq->work_color = work_next_color(wq->work_color); + + list_splice_tail_init(&wq->flusher_overflow, + &wq->flusher_queue); + flush_workqueue_prep_cwqs(wq, -1, wq->work_color); + } + + if (list_empty(&wq->flusher_queue)) { + BUG_ON(wq->flush_color != wq->work_color); + break; + } + + /* + * Need to flush more colors. Make the next flusher + * the new first flusher and arm cwqs. + */ + BUG_ON(wq->flush_color == wq->work_color); + BUG_ON(wq->flush_color != next->flush_color); + + list_del_init(&next->list); + wq->first_flusher = next; + + if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1)) + break; + + /* + * Meh... this color is already done, clear first + * flusher and repeat cascading. + */ + wq->first_flusher = NULL; + } + +out_unlock: + mutex_unlock(&wq->flush_mutex); } EXPORT_SYMBOL_GPL(flush_workqueue); @@ -559,7 +862,6 @@ int flush_work(struct work_struct *work) lock_map_acquire(&cwq->wq->lockdep_map); lock_map_release(&cwq->wq->lockdep_map); - prev = NULL; spin_lock_irq(&cwq->lock); if (!list_empty(&work->entry)) { /* @@ -568,22 +870,22 @@ int flush_work(struct work_struct *work) */ smp_rmb(); if (unlikely(cwq != get_wq_data(work))) - goto out; + goto already_gone; prev = &work->entry; } else { if (cwq->current_work != work) - goto out; + goto already_gone; prev = &cwq->worklist; } insert_wq_barrier(cwq, &barr, prev->next); -out: - spin_unlock_irq(&cwq->lock); - if (!prev) - return 0; + spin_unlock_irq(&cwq->lock); wait_for_completion(&barr.done); destroy_work_on_stack(&barr.work); return 1; +already_gone: + spin_unlock_irq(&cwq->lock); + return 0; } EXPORT_SYMBOL_GPL(flush_work); @@ -596,7 +898,7 @@ static int try_to_grab_pending(struct work_struct *work) struct cpu_workqueue_struct *cwq; int ret = -1; - if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) return 0; /* @@ -619,6 +921,7 @@ static int try_to_grab_pending(struct work_struct *work) if (cwq == get_wq_data(work)) { debug_work_deactivate(work); list_del_init(&work->entry); + cwq_dec_nr_in_flight(cwq, get_work_color(work)); ret = 1; } } @@ -650,7 +953,6 @@ static void wait_on_work(struct work_struct *work) { struct cpu_workqueue_struct *cwq; struct workqueue_struct *wq; - const struct cpumask *cpu_map; int cpu; might_sleep(); @@ -663,10 +965,9 @@ static void wait_on_work(struct work_struct *work) return; wq = cwq->wq; - cpu_map = wq_cpu_map(wq); - for_each_cpu(cpu, cpu_map) - wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work); + for_each_possible_cpu(cpu) + wait_on_cpu_work(get_cwq(cpu, wq), work); } static int __cancel_work_timer(struct work_struct *work, @@ -783,9 +1084,8 @@ EXPORT_SYMBOL(schedule_delayed_work); void flush_delayed_work(struct delayed_work *dwork) { if (del_timer_sync(&dwork->timer)) { - struct cpu_workqueue_struct *cwq; - cwq = wq_per_cpu(get_wq_data(&dwork->work)->wq, get_cpu()); - __queue_work(cwq, &dwork->work); + __queue_work(get_cpu(), get_wq_data(&dwork->work)->wq, + &dwork->work); put_cpu(); } flush_work(&dwork->work); @@ -924,7 +1224,7 @@ int current_is_keventd(void) BUG_ON(!keventd_wq); - cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu); + cwq = get_cwq(cpu, keventd_wq); if (current == cwq->thread) ret = 1; @@ -932,27 +1232,59 @@ int current_is_keventd(void) } -static struct cpu_workqueue_struct * -init_cpu_workqueue(struct workqueue_struct *wq, int cpu) +static struct cpu_workqueue_struct *alloc_cwqs(void) { - struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); + /* + * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS. + * Make sure that the alignment isn't lower than that of + * unsigned long long. + */ + const size_t size = sizeof(struct cpu_workqueue_struct); + const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS, + __alignof__(unsigned long long)); + struct cpu_workqueue_struct *cwqs; +#ifndef CONFIG_SMP + void *ptr; - cwq->wq = wq; - spin_lock_init(&cwq->lock); - INIT_LIST_HEAD(&cwq->worklist); - init_waitqueue_head(&cwq->more_work); + /* + * On UP, percpu allocator doesn't honor alignment parameter + * and simply uses arch-dependent default. Allocate enough + * room to align cwq and put an extra pointer at the end + * pointing back to the originally allocated pointer which + * will be used for free. + * + * FIXME: This really belongs to UP percpu code. Update UP + * percpu code to honor alignment and remove this ugliness. + */ + ptr = __alloc_percpu(size + align + sizeof(void *), 1); + cwqs = PTR_ALIGN(ptr, align); + *(void **)per_cpu_ptr(cwqs + 1, 0) = ptr; +#else + /* On SMP, percpu allocator can do it itself */ + cwqs = __alloc_percpu(size, align); +#endif + /* just in case, make sure it's actually aligned */ + BUG_ON(!IS_ALIGNED((unsigned long)cwqs, align)); + return cwqs; +} - return cwq; +static void free_cwqs(struct cpu_workqueue_struct *cwqs) +{ +#ifndef CONFIG_SMP + /* on UP, the pointer to free is stored right after the cwq */ + if (cwqs) + free_percpu(*(void **)per_cpu_ptr(cwqs + 1, 0)); +#else + free_percpu(cwqs); +#endif } static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) { - struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 }; struct workqueue_struct *wq = cwq->wq; - const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d"; struct task_struct *p; - p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu); + p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu); /* * Nobody can add the work_struct to this cwq, * if (caller is __create_workqueue) @@ -963,12 +1295,8 @@ static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) */ if (IS_ERR(p)) return PTR_ERR(p); - if (cwq->wq->rt) - sched_setscheduler_nocheck(p, SCHED_FIFO, ¶m); cwq->thread = p; - trace_workqueue_creation(cwq->thread, cpu); - return 0; } @@ -984,100 +1312,78 @@ static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) } struct workqueue_struct *__create_workqueue_key(const char *name, - int singlethread, - int freezeable, - int rt, + unsigned int flags, struct lock_class_key *key, const char *lock_name) { + bool singlethread = flags & WQ_SINGLE_THREAD; struct workqueue_struct *wq; - struct cpu_workqueue_struct *cwq; int err = 0, cpu; wq = kzalloc(sizeof(*wq), GFP_KERNEL); if (!wq) - return NULL; + goto err; - wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); - if (!wq->cpu_wq) { - kfree(wq); - return NULL; - } + wq->cpu_wq = alloc_cwqs(); + if (!wq->cpu_wq) + goto err; + wq->flags = flags; + mutex_init(&wq->flush_mutex); + atomic_set(&wq->nr_cwqs_to_flush, 0); + INIT_LIST_HEAD(&wq->flusher_queue); + INIT_LIST_HEAD(&wq->flusher_overflow); wq->name = name; lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); - wq->singlethread = singlethread; - wq->freezeable = freezeable; - wq->rt = rt; INIT_LIST_HEAD(&wq->list); - if (singlethread) { - cwq = init_cpu_workqueue(wq, singlethread_cpu); - err = create_workqueue_thread(cwq, singlethread_cpu); - start_workqueue_thread(cwq, -1); - } else { - cpu_maps_update_begin(); - /* - * We must place this wq on list even if the code below fails. - * cpu_down(cpu) can remove cpu from cpu_populated_map before - * destroy_workqueue() takes the lock, in that case we leak - * cwq[cpu]->thread. - */ - spin_lock(&workqueue_lock); - list_add(&wq->list, &workqueues); - spin_unlock(&workqueue_lock); - /* - * We must initialize cwqs for each possible cpu even if we - * are going to call destroy_workqueue() finally. Otherwise - * cpu_up() can hit the uninitialized cwq once we drop the - * lock. - */ - for_each_possible_cpu(cpu) { - cwq = init_cpu_workqueue(wq, cpu); - if (err || !cpu_online(cpu)) - continue; - err = create_workqueue_thread(cwq, cpu); + cpu_maps_update_begin(); + /* + * We must initialize cwqs for each possible cpu even if we + * are going to call destroy_workqueue() finally. Otherwise + * cpu_up() can hit the uninitialized cwq once we drop the + * lock. + */ + for_each_possible_cpu(cpu) { + struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); + + BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK); + cwq->wq = wq; + cwq->cpu = cpu; + cwq->flush_color = -1; + spin_lock_init(&cwq->lock); + INIT_LIST_HEAD(&cwq->worklist); + init_waitqueue_head(&cwq->more_work); + + if (err) + continue; + err = create_workqueue_thread(cwq, cpu); + if (cpu_online(cpu) && !singlethread) start_workqueue_thread(cwq, cpu); - } - cpu_maps_update_done(); + else + start_workqueue_thread(cwq, -1); } + spin_lock(&workqueue_lock); + list_add(&wq->list, &workqueues); + spin_unlock(&workqueue_lock); + + cpu_maps_update_done(); + if (err) { destroy_workqueue(wq); wq = NULL; } return wq; +err: + if (wq) { + free_cwqs(wq->cpu_wq); + kfree(wq); + } + return NULL; } EXPORT_SYMBOL_GPL(__create_workqueue_key); -static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq) -{ - /* - * Our caller is either destroy_workqueue() or CPU_POST_DEAD, - * cpu_add_remove_lock protects cwq->thread. - */ - if (cwq->thread == NULL) - return; - - lock_map_acquire(&cwq->wq->lockdep_map); - lock_map_release(&cwq->wq->lockdep_map); - - flush_cpu_workqueue(cwq); - /* - * If the caller is CPU_POST_DEAD and cwq->worklist was not empty, - * a concurrent flush_workqueue() can insert a barrier after us. - * However, in that case run_workqueue() won't return and check - * kthread_should_stop() until it flushes all work_struct's. - * When ->worklist becomes empty it is safe to exit because no - * more work_structs can be queued on this cwq: flush_workqueue - * checks list_empty(), and a "normal" queue_work() can't use - * a dead CPU. - */ - trace_workqueue_destruction(cwq->thread); - kthread_stop(cwq->thread); - cwq->thread = NULL; -} - /** * destroy_workqueue - safely terminate a workqueue * @wq: target workqueue @@ -1086,19 +1392,30 @@ static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq) */ void destroy_workqueue(struct workqueue_struct *wq) { - const struct cpumask *cpu_map = wq_cpu_map(wq); int cpu; cpu_maps_update_begin(); spin_lock(&workqueue_lock); list_del(&wq->list); spin_unlock(&workqueue_lock); + cpu_maps_update_done(); + + flush_workqueue(wq); + + for_each_possible_cpu(cpu) { + struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); + int i; + + if (cwq->thread) { + kthread_stop(cwq->thread); + cwq->thread = NULL; + } - for_each_cpu(cpu, cpu_map) - cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu)); - cpu_maps_update_done(); + for (i = 0; i < WORK_NR_COLORS; i++) + BUG_ON(cwq->nr_in_flight[i]); + } - free_percpu(wq->cpu_wq); + free_cwqs(wq->cpu_wq); kfree(wq); } EXPORT_SYMBOL_GPL(destroy_workqueue); @@ -1110,47 +1427,23 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, unsigned int cpu = (unsigned long)hcpu; struct cpu_workqueue_struct *cwq; struct workqueue_struct *wq; - int ret = NOTIFY_OK; action &= ~CPU_TASKS_FROZEN; - switch (action) { - case CPU_UP_PREPARE: - cpumask_set_cpu(cpu, cpu_populated_map); - } -undo: list_for_each_entry(wq, &workqueues, list) { - cwq = per_cpu_ptr(wq->cpu_wq, cpu); + if (wq->flags & WQ_SINGLE_THREAD) + continue; - switch (action) { - case CPU_UP_PREPARE: - if (!create_workqueue_thread(cwq, cpu)) - break; - printk(KERN_ERR "workqueue [%s] for %i failed\n", - wq->name, cpu); - action = CPU_UP_CANCELED; - ret = NOTIFY_BAD; - goto undo; - - case CPU_ONLINE: - start_workqueue_thread(cwq, cpu); - break; + cwq = get_cwq(cpu, wq); - case CPU_UP_CANCELED: - start_workqueue_thread(cwq, -1); + switch (action) { case CPU_POST_DEAD: - cleanup_workqueue_thread(cwq); + flush_workqueue(wq); break; } } - switch (action) { - case CPU_UP_CANCELED: - case CPU_POST_DEAD: - cpumask_clear_cpu(cpu, cpu_populated_map); - } - - return ret; + return notifier_from_errno(0); } #ifdef CONFIG_SMP @@ -1202,11 +1495,7 @@ EXPORT_SYMBOL_GPL(work_on_cpu); void __init init_workqueues(void) { - alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL); - - cpumask_copy(cpu_populated_map, cpu_online_mask); singlethread_cpu = cpumask_first(cpu_possible_mask); - cpu_singlethread_map = cpumask_of(singlethread_cpu); hotcpu_notifier(workqueue_cpu_callback, 0); keventd_wq = create_workqueue("events"); BUG_ON(!keventd_wq);