diff --git a/include/sys/taskq.h b/include/sys/taskq.h index 84b5632080..3839de2885 100644 --- a/include/sys/taskq.h +++ b/include/sys/taskq.h @@ -41,20 +41,6 @@ #define TASKQ_THREADS_CPU_PCT 0x00000008 #define TASKQ_DC_BATCH 0x00000010 -typedef unsigned long taskqid_t; -typedef void (task_func_t)(void *); - -typedef struct taskq_ent { - spinlock_t tqent_lock; - struct list_head tqent_list; - taskqid_t tqent_id; - task_func_t *tqent_func; - void *tqent_arg; - uintptr_t tqent_flags; -} taskq_ent_t; - -#define TQENT_FLAG_PREALLOC 0x1 - /* * Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as * KM_SLEEP/KM_NOSLEEP. TQ_NOQUEUE/TQ_NOALLOC are set particularly @@ -69,6 +55,9 @@ typedef struct taskq_ent { #define TQ_FRONT 0x08000000 #define TQ_ACTIVE 0x80000000 +typedef unsigned long taskqid_t; +typedef void (task_func_t)(void *); + typedef struct taskq { spinlock_t tq_lock; /* protects taskq_t */ unsigned long tq_lock_flags; /* interrupt state */ @@ -87,16 +76,33 @@ typedef struct taskq { struct list_head tq_free_list; /* free task_t's */ struct list_head tq_pend_list; /* pending task_t's */ struct list_head tq_prio_list; /* priority pending task_t's */ + struct list_head tq_delay_list; /* delayed task_t's */ wait_queue_head_t tq_work_waitq; /* new work waitq */ wait_queue_head_t tq_wait_waitq; /* wait waitq */ } taskq_t; +typedef struct taskq_ent { + spinlock_t tqent_lock; + wait_queue_head_t tqent_waitq; + struct timer_list tqent_timer; + struct list_head tqent_list; + taskqid_t tqent_id; + task_func_t *tqent_func; + void *tqent_arg; + taskq_t *tqent_taskq; + uintptr_t tqent_flags; +} taskq_ent_t; + +#define TQENT_FLAG_PREALLOC 0x1 +#define TQENT_FLAG_CANCEL 0x2 + typedef struct taskq_thread { struct list_head tqt_thread_list; struct list_head tqt_active_list; struct task_struct *tqt_thread; taskq_t *tqt_tq; taskqid_t tqt_id; + taskq_ent_t *tqt_task; uintptr_t tqt_flags; } taskq_thread_t; @@ -104,6 +110,8 @@ typedef struct taskq_thread { extern taskq_t *system_taskq; extern taskqid_t taskq_dispatch(taskq_t *, task_func_t, void *, uint_t); +extern taskqid_t taskq_dispatch_delay(taskq_t *, task_func_t, void *, + uint_t, clock_t); extern void taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t, taskq_ent_t *); extern int taskq_empty_ent(taskq_ent_t *); @@ -111,7 +119,9 @@ extern void taskq_init_ent(taskq_ent_t *); extern taskq_t *taskq_create(const char *, int, pri_t, int, int, uint_t); extern void taskq_destroy(taskq_t *); extern void taskq_wait_id(taskq_t *, taskqid_t); +extern void taskq_wait_all(taskq_t *, taskqid_t); extern void taskq_wait(taskq_t *); +extern int taskq_cancel_id(taskq_t *, taskqid_t); extern int taskq_member(taskq_t *, void *); #define taskq_create_proc(name, nthreads, pri, min, max, proc, flags) \ diff --git a/module/spl/spl-taskq.c b/module/spl/spl-taskq.c index 2007cf084e..c9ae0a50b6 100644 --- a/module/spl/spl-taskq.c +++ b/module/spl/spl-taskq.c @@ -69,6 +69,8 @@ retry: t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); + ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL)); + ASSERT(!timer_pending(&t->tqent_timer)); list_del_init(&t->tqent_list); SRETURN(t); @@ -126,6 +128,7 @@ task_free(taskq_t *tq, taskq_ent_t *t) ASSERT(t); ASSERT(spin_is_locked(&tq->tq_lock)); ASSERT(list_empty(&t->tqent_list)); + ASSERT(!timer_pending(&t->tqent_timer)); kmem_free(t, sizeof(taskq_ent_t)); tq->tq_nalloc--; @@ -145,6 +148,9 @@ task_done(taskq_t *tq, taskq_ent_t *t) ASSERT(t); 
 	ASSERT(spin_is_locked(&tq->tq_lock));
 
+	/* Wake tasks blocked in taskq_wait_id() */
+	wake_up_all(&t->tqent_waitq);
+
 	list_del_init(&t->tqent_list);
 
 	if (tq->tq_nalloc <= tq->tq_minalloc) {
@@ -162,213 +168,49 @@ task_done(taskq_t *tq, taskq_ent_t *t)
 }
 
 /*
- * As tasks are submitted to the task queue they are assigned a
- * monotonically increasing taskqid and added to the tail of the pending
- * list. As worker threads become available the tasks are removed from
- * the head of the pending or priority list, giving preference to the
- * priority list. The tasks are then removed from their respective
- * list, and the taskq_thread servicing the task is added to the active
- * list, preserving the order using the serviced task's taskqid.
- * Finally, as tasks complete the taskq_thread servicing the task is
- * removed from the active list. This means that the pending task and
- * active taskq_thread lists are always kept sorted by taskqid. Thus the
- * lowest outstanding incomplete taskqid can be determined simply by
- * checking the min taskqid for each head item on the pending, priority,
- * and active taskq_thread list. This value is stored in
- * tq->tq_lowest_id and only updated to the new lowest id when the
- * previous lowest id completes. All taskqids lower than
- * tq->tq_lowest_id must have completed. It is also possible larger
- * taskqid's have completed because they may be processed in parallel by
- * several worker threads. However, this is not a problem because the
- * behavior of taskq_wait_id() is to block until all previously
- * submitted taskqid's have completed.
- *
- * XXX: Taskqid_t wrapping is not handled. However, taskqid_t's are
- * 64-bit values so even if a taskq is processing 2^24 (16,777,216)
- * taskqid_ts per second it will still take 2^40 seconds, 34,865 years,
- * before the wrap occurs. I can live with that for now.
+ * When a delayed task timer expires, remove it from the delay list and
+ * add it, in task id order, to the priority list for immediate
+ * processing.
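+ *
+ * A task canceled by taskq_cancel_id() while its timer is firing is
+ * detected here via TQENT_FLAG_CANCEL and left untouched; the cancel
+ * path has already removed it from the delay list.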
*/ -static int -taskq_wait_check(taskq_t *tq, taskqid_t id) -{ - int rc; - - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - rc = (id < tq->tq_lowest_id); - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - - SRETURN(rc); -} - -void -taskq_wait_id(taskq_t *tq, taskqid_t id) -{ - SENTRY; - ASSERT(tq); - - wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id)); - - SEXIT; -} -EXPORT_SYMBOL(taskq_wait_id); - -void -taskq_wait(taskq_t *tq) -{ - taskqid_t id; - SENTRY; - ASSERT(tq); - - /* Wait for the largest outstanding taskqid */ - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - id = tq->tq_next_id - 1; - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - - taskq_wait_id(tq, id); - - SEXIT; - -} -EXPORT_SYMBOL(taskq_wait); - -int -taskq_member(taskq_t *tq, void *t) +static void +task_expire(unsigned long data) { + taskq_ent_t *w, *t = (taskq_ent_t *)data; + taskq_t *tq = t->tqent_taskq; struct list_head *l; - taskq_thread_t *tqt; - SENTRY; - - ASSERT(tq); - ASSERT(t); - - list_for_each(l, &tq->tq_thread_list) { - tqt = list_entry(l, taskq_thread_t, tqt_thread_list); - if (tqt->tqt_thread == (struct task_struct *)t) - SRETURN(1); - } - - SRETURN(0); -} -EXPORT_SYMBOL(taskq_member); - -taskqid_t -taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) -{ - taskq_ent_t *t; - taskqid_t rc = 0; - SENTRY; - - ASSERT(tq); - ASSERT(func); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - /* Taskq being destroyed and all tasks drained */ - if (!(tq->tq_flags & TQ_ACTIVE)) - SGOTO(out, rc = 0); - - /* Do not queue the task unless there is idle thread for it */ - ASSERT(tq->tq_nactive <= tq->tq_nthreads); - if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) - SGOTO(out, rc = 0); - - if ((t = task_alloc(tq, flags)) == NULL) - SGOTO(out, rc = 0); - - spin_lock(&t->tqent_lock); - - /* Queue to the priority list instead of the pending list */ - if (flags & TQ_FRONT) - list_add_tail(&t->tqent_list, &tq->tq_prio_list); - else - list_add_tail(&t->tqent_list, &tq->tq_pend_list); - - t->tqent_id = rc = tq->tq_next_id; - tq->tq_next_id++; - t->tqent_func = func; - t->tqent_arg = arg; - - ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); - - spin_unlock(&t->tqent_lock); - - wake_up(&tq->tq_work_waitq); -out: - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - SRETURN(rc); -} -EXPORT_SYMBOL(taskq_dispatch); - -void -taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, - taskq_ent_t *t) -{ - SENTRY; - - ASSERT(tq); - ASSERT(func); - ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); - - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - - /* Taskq being destroyed and all tasks drained */ - if (!(tq->tq_flags & TQ_ACTIVE)) { - t->tqent_id = 0; - goto out; + if (t->tqent_flags & TQENT_FLAG_CANCEL) { + ASSERT(list_empty(&t->tqent_list)); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + return; } - spin_lock(&t->tqent_lock); - /* - * Mark it as a prealloc'd task. This is important - * to ensure that we don't free it later. + * The priority list must be maintained in strict task id order + * from lowest to highest for lowest_id to be easily calculable. 
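+	 *
+	 * For example, if tq_prio_list holds ids {2, 5, 9} and the
+	 * delayed task with id 7 expires, the tail-to-head walk below
+	 * stops at id 5, the first lower id, and links 7 after it,
+	 * giving {2, 5, 7, 9}. When no lower id is present the task
+	 * is linked at the head of the list.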
*/ - t->tqent_flags |= TQENT_FLAG_PREALLOC; + list_del(&t->tqent_list); + list_for_each_prev(l, &tq->tq_prio_list) { + w = list_entry(l, taskq_ent_t, tqent_list); + if (w->tqent_id < t->tqent_id) { + list_add(&t->tqent_list, l); + break; + } + } + if (l == &tq->tq_prio_list) + list_add(&t->tqent_list, &tq->tq_prio_list); - /* Queue to the priority list instead of the pending list */ - if (flags & TQ_FRONT) - list_add_tail(&t->tqent_list, &tq->tq_prio_list); - else - list_add_tail(&t->tqent_list, &tq->tq_pend_list); - - t->tqent_id = tq->tq_next_id; - tq->tq_next_id++; - t->tqent_func = func; - t->tqent_arg = arg; - - spin_unlock(&t->tqent_lock); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); wake_up(&tq->tq_work_waitq); -out: - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - SEXIT; } -EXPORT_SYMBOL(taskq_dispatch_ent); - -int -taskq_empty_ent(taskq_ent_t *t) -{ - return list_empty(&t->tqent_list); -} -EXPORT_SYMBOL(taskq_empty_ent); - -void -taskq_init_ent(taskq_ent_t *t) -{ - spin_lock_init(&t->tqent_lock); - INIT_LIST_HEAD(&t->tqent_list); - t->tqent_id = 0; - t->tqent_func = NULL; - t->tqent_arg = NULL; - t->tqent_flags = 0; -} -EXPORT_SYMBOL(taskq_init_ent); /* * Returns the lowest incomplete taskqid_t. The taskqid_t may - * be queued on the pending list, on the priority list, or on - * the work list currently being handled, but it is not 100% - * complete yet. + * be queued on the pending list, on the priority list, on the + * delay list, or on the work list currently being handled, but + * it is not 100% complete yet. */ static taskqid_t taskq_lowest_id(taskq_t *tq) @@ -391,6 +233,11 @@ taskq_lowest_id(taskq_t *tq) lowest_id = MIN(lowest_id, t->tqent_id); } + if (!list_empty(&tq->tq_delay_list)) { + t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list); + lowest_id = MIN(lowest_id, t->tqent_id); + } + if (!list_empty(&tq->tq_active_list)) { tqt = list_entry(tq->tq_active_list.next, taskq_thread_t, tqt_active_list); @@ -428,6 +275,415 @@ taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt) SEXIT; } +/* + * Find and return a task from the given list if it exists. The list + * must be in lowest to highest task id order. + */ +static taskq_ent_t * +taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id) +{ + struct list_head *l; + taskq_ent_t *t; + SENTRY; + + ASSERT(spin_is_locked(&tq->tq_lock)); + + list_for_each(l, lh) { + t = list_entry(l, taskq_ent_t, tqent_list); + + if (t->tqent_id == id) + SRETURN(t); + + if (t->tqent_id > id) + break; + } + + SRETURN(NULL); +} + +/* + * Find an already dispatched task given the task id regardless of what + * state it is in. If a task is still pending or executing it will be + * returned and 'active' set appropriately. If the task has already + * been run then NULL is returned. + */ +static taskq_ent_t * +taskq_find(taskq_t *tq, taskqid_t id, int *active) +{ + taskq_thread_t *tqt; + struct list_head *l; + taskq_ent_t *t; + SENTRY; + + ASSERT(spin_is_locked(&tq->tq_lock)); + *active = 0; + + t = taskq_find_list(tq, &tq->tq_delay_list, id); + if (t) + SRETURN(t); + + t = taskq_find_list(tq, &tq->tq_prio_list, id); + if (t) + SRETURN(t); + + t = taskq_find_list(tq, &tq->tq_pend_list, id); + if (t) + SRETURN(t); + + list_for_each(l, &tq->tq_active_list) { + tqt = list_entry(l, taskq_thread_t, tqt_active_list); + if (tqt->tqt_id == id) { + t = tqt->tqt_task; + *active = 1; + SRETURN(t); + } + } + + SRETURN(NULL); +} + +/* + * The taskq_wait_id() function blocks until the passed task id completes. 
+ * This does not guarantee that all lower task id's have completed.
+ */
+void
+taskq_wait_id(taskq_t *tq, taskqid_t id)
+{
+	DEFINE_WAIT(wait);
+	taskq_ent_t *t;
+	int active = 0;
+	SENTRY;
+
+	ASSERT(tq);
+	ASSERT(id > 0);
+
+	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+	t = taskq_find(tq, id, &active);
+	if (t)
+		prepare_to_wait(&t->tqent_waitq, &wait, TASK_UNINTERRUPTIBLE);
+	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+	/*
+	 * We rely on the kernel's autoremove_wake_function() to
+	 * remove us from the wait queue in the context of wake_up().
+	 * Once woken the taskq_ent_t pointer must never be accessed.
+	 */
+	if (t) {
+		t = NULL;
+		schedule();
+		__set_current_state(TASK_RUNNING);
+	}
+
+	SEXIT;
+}
+EXPORT_SYMBOL(taskq_wait_id);
+
+/*
+ * The taskq_wait() function will block until all previously submitted
+ * tasks have been completed. A previously submitted task is defined as
+ * a task with a lower task id than the current task queue id. Note that
+ * all task id's are assigned monotonically at dispatch time.
+ *
+ * Waiting for all previous tasks to complete is accomplished by tracking
+ * the lowest outstanding task id. As tasks are dispatched they are added
+ * to the tail of the pending, priority, or delay lists. As worker
+ * threads become available the tasks are removed from the heads of
+ * these lists and linked to the worker threads. This ensures the lists
+ * are kept in lowest to highest task id order.
+ *
+ * Therefore the lowest outstanding task id can be quickly determined by
+ * checking the head item from all of these lists. This value is stored
+ * with the task queue as the lowest id. It only needs to be recalculated
+ * when either the task with the current lowest id completes or is canceled.
+ *
+ * By blocking until the lowest task id exceeds the current task id when
+ * the function was called, we ensure all previous tasks have completed.
+ *
+ * NOTE: When there are multiple worker threads it is possible for larger
+ * task ids to complete before smaller ones. Conversely, when the task
+ * queue contains delay tasks with small task ids, you may block for a
+ * considerable length of time waiting for them to expire and execute.
+ */
+static int
+taskq_wait_check(taskq_t *tq, taskqid_t id)
+{
+	int rc;
+
+	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+	rc = (id < tq->tq_lowest_id);
+	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+	SRETURN(rc);
+}
+
+void
+taskq_wait_all(taskq_t *tq, taskqid_t id)
+{
+	wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));
+}
+EXPORT_SYMBOL(taskq_wait_all);
+
+void
+taskq_wait(taskq_t *tq)
+{
+	taskqid_t id;
+	SENTRY;
+	ASSERT(tq);
+
+	/* Wait for the largest outstanding taskqid */
+	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+	id = tq->tq_next_id - 1;
+	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+	taskq_wait_all(tq, id);
+
+	SEXIT;
+}
+EXPORT_SYMBOL(taskq_wait);
+
+int
+taskq_member(taskq_t *tq, void *t)
+{
+	struct list_head *l;
+	taskq_thread_t *tqt;
+	SENTRY;
+
+	ASSERT(tq);
+	ASSERT(t);
+
+	list_for_each(l, &tq->tq_thread_list) {
+		tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
+		if (tqt->tqt_thread == (struct task_struct *)t)
+			SRETURN(1);
+	}
+
+	SRETURN(0);
+}
+EXPORT_SYMBOL(taskq_member);
+
+/*
+ * Cancel an already dispatched task given the task id. Still pending tasks
+ * will be immediately canceled, and if the task is active the function will
+ * block until it completes.
+ * Preallocated tasks which are canceled must be freed by the caller.
+ */
+int
+taskq_cancel_id(taskq_t *tq, taskqid_t id)
+{
+	taskq_ent_t *t;
+	int active = 0;
+	int rc = ENOENT;
+	SENTRY;
+
+	ASSERT(tq);
+
+	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+	t = taskq_find(tq, id, &active);
+	if (t && !active) {
+		list_del_init(&t->tqent_list);
+		t->tqent_flags |= TQENT_FLAG_CANCEL;
+
+		/*
+		 * When canceling the lowest outstanding task id we
+		 * must recalculate the new lowest outstanding id.
+		 */
+		if (tq->tq_lowest_id == t->tqent_id) {
+			tq->tq_lowest_id = taskq_lowest_id(tq);
+			ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
+		}
+
+		/*
+		 * The task_expire() function takes the tq->tq_lock, so drop
+		 * the lock before synchronously canceling the timer.
+		 */
+		if (timer_pending(&t->tqent_timer)) {
+			spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+			del_timer_sync(&t->tqent_timer);
+			spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+		}
+
+		if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
+			task_done(tq, t);
+
+		rc = 0;
+	}
+	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
+	if (active) {
+		taskq_wait_id(tq, id);
+		rc = EBUSY;
+	}
+
+	SRETURN(rc);
+}
+EXPORT_SYMBOL(taskq_cancel_id);
+
+taskqid_t
+taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
+{
+	taskq_ent_t *t;
+	taskqid_t rc = 0;
+	SENTRY;
+
+	ASSERT(tq);
+	ASSERT(func);
+
+	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+
+	/* Taskq being destroyed and all tasks drained */
+	if (!(tq->tq_flags & TQ_ACTIVE))
+		SGOTO(out, rc = 0);
+
+	/* Do not queue the task unless there is an idle thread for it */
+	ASSERT(tq->tq_nactive <= tq->tq_nthreads);
+	if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads))
+		SGOTO(out, rc = 0);
+
+	if ((t = task_alloc(tq, flags)) == NULL)
+		SGOTO(out, rc = 0);
+
+	spin_lock(&t->tqent_lock);
+
+	/* Queue to the priority list instead of the pending list */
+	if (flags & TQ_FRONT)
+		list_add_tail(&t->tqent_list, &tq->tq_prio_list);
+	else
+		list_add_tail(&t->tqent_list, &tq->tq_pend_list);
+
+	t->tqent_id = rc = tq->tq_next_id;
+	tq->tq_next_id++;
+	t->tqent_func = func;
+	t->tqent_arg = arg;
+	t->tqent_taskq = tq;
+	t->tqent_timer.data = 0;
+	t->tqent_timer.function = NULL;
+	t->tqent_timer.expires = 0;
+
+	ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
+
+	spin_unlock(&t->tqent_lock);
+
+	wake_up(&tq->tq_work_waitq);
+out:
+	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+	SRETURN(rc);
+}
+EXPORT_SYMBOL(taskq_dispatch);
+
+taskqid_t
+taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
+    uint_t flags, clock_t expire_time)
+{
+	taskq_ent_t *t;
+	taskqid_t rc = 0;
+	SENTRY;
+
+	ASSERT(tq);
+	ASSERT(func);
+
+	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+
+	/* Taskq being destroyed and all tasks drained */
+	if (!(tq->tq_flags & TQ_ACTIVE))
+		SGOTO(out, rc = 0);
+
+	if ((t = task_alloc(tq, flags)) == NULL)
+		SGOTO(out, rc = 0);
+
+	spin_lock(&t->tqent_lock);
+
+	/* Queue to the delay list for subsequent execution */
+	list_add_tail(&t->tqent_list, &tq->tq_delay_list);
+
+	t->tqent_id = rc = tq->tq_next_id;
+	tq->tq_next_id++;
+	t->tqent_func = func;
+	t->tqent_arg = arg;
+	t->tqent_taskq = tq;
+	t->tqent_timer.data = (unsigned long)t;
+	t->tqent_timer.function = task_expire;
+	t->tqent_timer.expires = (unsigned long)expire_time;
+	add_timer(&t->tqent_timer);
+
+	ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
+
+	spin_unlock(&t->tqent_lock);
+out:
+	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+	SRETURN(rc);
+}
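+
+/*
+ * Usage sketch for the interface above: arm a delayed task, then later
+ * attempt to cancel it. The my_task_func callback is a hypothetical
+ * example, and ddi_get_lbolt() and hz are assumed to be available as
+ * elsewhere in the SPL. The expire_time argument is an absolute
+ * expiration time in jiffies, matching the direct assignment to
+ * tqent_timer.expires above:
+ *
+ *	id = taskq_dispatch_delay(tq, my_task_func, arg, TQ_SLEEP,
+ *	    ddi_get_lbolt() + 5 * hz);
+ *	error = taskq_cancel_id(tq, id);
+ *
+ * taskq_cancel_id() returns 0 when a still pending task was removed,
+ * EBUSY after blocking until an already executing task completed, and
+ * ENOENT when the task already ran or was never dispatched.
+ */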
+EXPORT_SYMBOL(taskq_dispatch_delay); + +void +taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, + taskq_ent_t *t) +{ + SENTRY; + + ASSERT(tq); + ASSERT(func); + ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + + /* Taskq being destroyed and all tasks drained */ + if (!(tq->tq_flags & TQ_ACTIVE)) { + t->tqent_id = 0; + goto out; + } + + spin_lock(&t->tqent_lock); + + /* + * Mark it as a prealloc'd task. This is important + * to ensure that we don't free it later. + */ + t->tqent_flags |= TQENT_FLAG_PREALLOC; + + /* Queue to the priority list instead of the pending list */ + if (flags & TQ_FRONT) + list_add_tail(&t->tqent_list, &tq->tq_prio_list); + else + list_add_tail(&t->tqent_list, &tq->tq_pend_list); + + t->tqent_id = tq->tq_next_id; + tq->tq_next_id++; + t->tqent_func = func; + t->tqent_arg = arg; + t->tqent_taskq = tq; + + spin_unlock(&t->tqent_lock); + + wake_up(&tq->tq_work_waitq); +out: + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + SEXIT; +} +EXPORT_SYMBOL(taskq_dispatch_ent); + +int +taskq_empty_ent(taskq_ent_t *t) +{ + return list_empty(&t->tqent_list); +} +EXPORT_SYMBOL(taskq_empty_ent); + +void +taskq_init_ent(taskq_ent_t *t) +{ + spin_lock_init(&t->tqent_lock); + init_waitqueue_head(&t->tqent_waitq); + init_timer(&t->tqent_timer); + INIT_LIST_HEAD(&t->tqent_list); + t->tqent_id = 0; + t->tqent_func = NULL; + t->tqent_arg = NULL; + t->tqent_flags = 0; + t->tqent_taskq = NULL; +} +EXPORT_SYMBOL(taskq_init_ent); + static int taskq_thread(void *args) { @@ -481,6 +737,7 @@ taskq_thread(void *args) * preallocated taskq_ent_t, tqent_id must be * stored prior to executing tqent_func. */ tqt->tqt_id = t->tqent_id; + tqt->tqt_task = t; /* We must store a copy of the flags prior to * servicing the task (servicing a prealloc'd task @@ -499,6 +756,7 @@ taskq_thread(void *args) spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); tq->tq_nactive--; list_del_init(&tqt->tqt_active_list); + tqt->tqt_task = NULL; /* For prealloc'd tasks, we don't free anything. */ if ((tq->tq_flags & TASKQ_DYNAMIC) || @@ -576,6 +834,7 @@ taskq_create(const char *name, int nthreads, pri_t pri, INIT_LIST_HEAD(&tq->tq_free_list); INIT_LIST_HEAD(&tq->tq_pend_list); INIT_LIST_HEAD(&tq->tq_prio_list); + INIT_LIST_HEAD(&tq->tq_delay_list); init_waitqueue_head(&tq->tq_work_waitq); init_waitqueue_head(&tq->tq_wait_waitq); @@ -669,6 +928,7 @@ taskq_destroy(taskq_t *tq) ASSERT(list_empty(&tq->tq_free_list)); ASSERT(list_empty(&tq->tq_pend_list)); ASSERT(list_empty(&tq->tq_prio_list)); + ASSERT(list_empty(&tq->tq_delay_list)); spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); diff --git a/module/splat/splat-taskq.c b/module/splat/splat-taskq.c index b94930cc9b..7fad4627e5 100644 --- a/module/splat/splat-taskq.c +++ b/module/splat/splat-taskq.c @@ -548,10 +548,10 @@ splat_taskq_test4(struct file *file, void *arg) * next pending task as soon as it completes its current task. This * means that tasks do not strictly complete in order in which they * were dispatched (increasing task id). This is fine but we need to - * verify that taskq_wait_id() blocks until the passed task id and all + * verify that taskq_wait_all() blocks until the passed task id and all * lower task ids complete. We do this by dispatching the following * specific sequence of tasks each of which block for N time units. 
- * We then use taskq_wait_id() to unblock at specific task id and
+ * We then use taskq_wait_all() to unblock at a specific task id and
 * verify the only the expected task ids have completed and in the
 * correct order. The two cases of interest are:
 *
@@ -562,17 +562,17 @@ splat_taskq_test4(struct file *file, void *arg)
 *
 * The following table shows each task id and how they will be
 * scheduled. Each rows represent one time unit and each column
- * one of the three worker threads. The places taskq_wait_id()
+ * one of the three worker threads. The places taskq_wait_all()
 * must unblock for a specific id are identified as well as the
 * task ids which must have completed and their order.
 *
- * +-----+ <--- taskq_wait_id(tq, 8) unblocks
+ * +-----+ <--- taskq_wait_all(tq, 8) unblocks
 * | | Required Completion Order: 1,2,4,5,3,8,6,7
 * +-----+ |
 * | | |
 * | | +-----+
 * | | | 8 |
- * | | +-----+ <--- taskq_wait_id(tq, 3) unblocks
+ * | | +-----+ <--- taskq_wait_all(tq, 3) unblocks
 * | | 7 | | Required Completion Order: 1,2,4,5,3
 * | +-----+ |
 * | 6 | | |
@@ -712,13 +712,13 @@ splat_taskq_test5_impl(struct file *file, void *arg, boolean_t prealloc)
 	splat_vprint(file, SPLAT_TASKQ_TEST5_NAME, "Taskq '%s' "
 	    "waiting for taskqid %d completion\n", tq_arg.name, 3);
-	taskq_wait_id(tq, 3);
+	taskq_wait_all(tq, 3);
 	if ((rc = splat_taskq_test_order(&tq_arg, order1)))
 		goto out;
 
 	splat_vprint(file, SPLAT_TASKQ_TEST5_NAME, "Taskq '%s' "
 	    "waiting for taskqid %d completion\n", tq_arg.name, 8);
-	taskq_wait_id(tq, 8);
+	taskq_wait_all(tq, 8);
 	rc = splat_taskq_test_order(&tq_arg, order2);
 out:
@@ -874,7 +874,7 @@ splat_taskq_test6_impl(struct file *file, void *arg, boolean_t prealloc)
 	splat_vprint(file, SPLAT_TASKQ_TEST6_NAME, "Taskq '%s' "
 	    "waiting for taskqid %d completion\n", tq_arg.name,
 	    SPLAT_TASKQ_ORDER_MAX);
-	taskq_wait_id(tq, SPLAT_TASKQ_ORDER_MAX);
+	taskq_wait_all(tq, SPLAT_TASKQ_ORDER_MAX);
 	rc = splat_taskq_test_order(&tq_arg, order);
 out:
@@ -975,7 +975,7 @@ splat_taskq_test7_impl(struct file *file, void *arg, boolean_t prealloc)
 	if (tq_arg.flag == 0) {
 		splat_vprint(file, SPLAT_TASKQ_TEST7_NAME,
 		    "Taskq '%s' waiting\n", tq_arg.name);
-		taskq_wait_id(tq, SPLAT_TASKQ_DEPTH_MAX);
+		taskq_wait_all(tq, SPLAT_TASKQ_DEPTH_MAX);
 	}
 
 	splat_vprint(file, SPLAT_TASKQ_TEST7_NAME,
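
The splat changes above track the reworked semantics: taskq_wait_all(tq, id)
keeps the old behavior of blocking until every task with an id at or below
the given id has completed, while the new taskq_wait_id(tq, id) waits only
for the single matching task. A minimal sketch of the difference, using
hypothetical callbacks func_a and func_b:

	id_a = taskq_dispatch(tq, func_a, arg, TQ_SLEEP);
	id_b = taskq_dispatch(tq, func_b, arg, TQ_SLEEP);

	taskq_wait_id(tq, id_b);	/* func_b done; func_a may still run */
	taskq_wait_all(tq, id_b);	/* all ids <= id_b done, so both */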