Implementation of the TQ_FRONT flag.
Adds a task queue to receive tasks dispatched with TQ_FRONT. Worker
threads pull tasks from this high priority queue before the default
pending queue. Executing tasks out of FIFO order potentially breaks
taskq_lowest_id() if we do not preserve the ordering of the work list
by taskqid. Therefore, instead of always appending to the work list,
we search for the appropriate place to insert a task. The common case
is to append to the list, so we make this operation efficient by
searching the work list in reverse order.

Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
commit f0d8bb26b4
parent c2688979a4
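The flag is consumed at dispatch time. As a caller-side sketch (illustrative only, not part of this commit; flush_cb and example are hypothetical names), a task is pushed ahead of the default pending queue by OR-ing TQ_FRONT into the dispatch flags:

	/*
	 * Hypothetical caller-side sketch: dispatch an urgent task ahead
	 * of already-pending work with TQ_FRONT.
	 */
	#include <sys/taskq.h>

	static void
	flush_cb(void *arg)
	{
		/* urgent work runs here */
	}

	static void
	example(taskq_t *tq, void *arg)
	{
		taskqid_t id;

		/* TQ_FRONT queues to tq_prio_list, which workers drain first */
		id = taskq_dispatch(tq, flush_cb, arg, TQ_SLEEP | TQ_FRONT);
		if (id == 0)
			return;		/* dispatch failed, 0 indicates failure */

		taskq_wait_id(tq, id);	/* block until the task completes */
	}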
@@ -74,6 +74,7 @@ typedef struct taskq {
 	struct list_head	tq_free_list;	/* free task_t's */
 	struct list_head	tq_work_list;	/* work task_t's */
 	struct list_head	tq_pend_list;	/* pending task_t's */
+	struct list_head	tq_prio_list;	/* priority pending task_t's */
 	wait_queue_head_t	tq_work_waitq;	/* new work waitq */
 	wait_queue_head_t	tq_wait_waitq;	/* wait waitq */
 } taskq_t;
@@ -158,21 +158,22 @@ task_done(taskq_t *tq, spl_task_t *t)
 
 /*
  * As tasks are submitted to the task queue they are assigned a
- * monotonically increasing taskqid and added to the tail of the
- * pending list. As worker threads become available the tasks are
- * removed from the head of the pending list and added to the tail
- * of the work list. Finally, as tasks complete they are removed
- * from the work list. This means that the pending and work lists
- * are always kept sorted by taskqid. Thus the lowest outstanding
+ * monotonically increasing taskqid and added to the tail of the pending
+ * list. As worker threads become available the tasks are removed from
+ * the head of the pending or priority list, giving preference to the
+ * priority list. The tasks are then added to the work list, preserving
+ * the ordering by taskqid. Finally, as tasks complete they are removed
+ * from the work list. This means that the pending and work lists are
+ * always kept sorted by taskqid. Thus the lowest outstanding
  * incomplete taskqid can be determined simply by checking the min
- * taskqid for each head item on the pending and work list. This
- * value is stored in tq->tq_lowest_id and only updated to the new
- * lowest id when the previous lowest id completes. All taskqids
- * lower than tq->tq_lowest_id must have completed. It is also
- * possible larger taskqid's have completed because they may be
- * processed in parallel by several worker threads. However, this
- * is not a problem because the behavior of taskq_wait_id() is to
- * block until all previously submitted taskqid's have completed.
+ * taskqid for each head item on the pending, priority, and work list.
+ * This value is stored in tq->tq_lowest_id and only updated to the new
+ * lowest id when the previous lowest id completes. All taskqids lower
+ * than tq->tq_lowest_id must have completed. It is also possible
+ * larger taskqid's have completed because they may be processed in
+ * parallel by several worker threads. However, this is not a problem
+ * because the behavior of taskq_wait_id() is to block until all
+ * previously submitted taskqid's have completed.
  *
  * XXX: Taskqid_t wrapping is not handled. However, taskqid_t's are
  * 64-bit values so even if a taskq is processing 2^24 (16,777,216)
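As a concrete trace of this invariant (illustrative, not from the commit): suppose tasks 1 and 2 sit on the pending list when task 3 is dispatched with TQ_FRONT. A worker wakes, prefers the priority list, and moves task 3 to the work list by sorted insert. The lowest outstanding id is still found by comparing only the three list heads, min(pending head 1, empty priority list, work head 3) = 1, and tq_lowest_id advances past 1 only once task 1 completes.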
@@ -274,7 +275,13 @@ __taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
 		GOTO(out, rc = 0);
 
 	spin_lock(&t->t_lock);
-	list_add_tail(&t->t_list, &tq->tq_pend_list);
+
+	/* Queue to the priority list instead of the pending list */
+	if (flags & TQ_FRONT)
+		list_add_tail(&t->t_list, &tq->tq_prio_list);
+	else
+		list_add_tail(&t->t_list, &tq->tq_pend_list);
+
 	t->t_id = rc = tq->tq_next_id;
 	tq->tq_next_id++;
 	t->t_func = func;
@@ -290,8 +297,9 @@ EXPORT_SYMBOL(__taskq_dispatch);
 
 /*
  * Returns the lowest incomplete taskqid_t. The taskqid_t may
- * be queued on the pending list or may be on the work list
- * currently being handled, but it is not 100% complete yet.
+ * be queued on the pending list, on the priority list, or on
+ * the work list currently being handled, but it is not 100%
+ * complete yet.
  */
 static taskqid_t
 taskq_lowest_id(taskq_t *tq)
@@ -308,6 +316,11 @@ taskq_lowest_id(taskq_t *tq)
 		lowest_id = MIN(lowest_id, t->t_id);
 	}
 
+	if (!list_empty(&tq->tq_prio_list)) {
+		t = list_entry(tq->tq_prio_list.next, spl_task_t, t_list);
+		lowest_id = MIN(lowest_id, t->t_id);
+	}
+
 	if (!list_empty(&tq->tq_work_list)) {
 		t = list_entry(tq->tq_work_list.next, spl_task_t, t_list);
 		lowest_id = MIN(lowest_id, t->t_id);
@@ -316,6 +329,34 @@ taskq_lowest_id(taskq_t *tq)
 	RETURN(lowest_id);
 }
 
+/*
+ * Insert a task into a list keeping the list sorted by increasing
+ * taskqid.
+ */
+static void
+taskq_insert_in_order(taskq_t *tq, spl_task_t *t)
+{
+	spl_task_t *w;
+	struct list_head *l;
+
+	ENTRY;
+	ASSERT(tq);
+	ASSERT(t);
+	ASSERT(spin_is_locked(&tq->tq_lock));
+
+	list_for_each_prev(l, &tq->tq_work_list) {
+		w = list_entry(l, spl_task_t, t_list);
+		if (w->t_id < t->t_id) {
+			list_add(&t->t_list, l);
+			break;
+		}
+	}
+	if (l == &tq->tq_work_list)
+		list_add(&t->t_list, &tq->tq_work_list);
+
+	EXIT;
+}
+
 static int
 taskq_thread(void *args)
 {
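The reverse search makes the common append case cheap, since workers normally pull tasks in roughly increasing taskqid order and the tail comparison succeeds immediately. A minimal userspace sketch of the same idea, assuming a sentinel-headed circular doubly linked list in place of the kernel's list_head (all names hypothetical, illustration only):

	#include <stdio.h>

	/* Hypothetical stand-ins for spl_task_t and the kernel list API. */
	struct node {
		unsigned long id;
		struct node *prev, *next;
	};

	static void
	insert_in_order(struct node *head, struct node *t)
	{
		struct node *w;

		/* Search tail-to-head: the common case is an append, so
		 * this usually terminates after a single comparison. */
		for (w = head->prev; w != head; w = w->prev)
			if (w->id < t->id)
				break;

		/* Insert t immediately after w; w is the sentinel head
		 * when the list is empty or t has the smallest id. */
		t->prev = w;
		t->next = w->next;
		w->next->prev = t;
		w->next = t;
	}

	int
	main(void)
	{
		struct node head = { 0, &head, &head };
		struct node a = { 3 }, b = { 1 }, c = { 2 };

		insert_in_order(&head, &a);
		insert_in_order(&head, &b);
		insert_in_order(&head, &c);

		for (struct node *n = head.next; n != &head; n = n->next)
			printf("%lu\n", n->id);	/* prints 1 2 3 */
		return 0;
	}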
@@ -324,6 +365,7 @@ taskq_thread(void *args)
 	taskqid_t id;
 	taskq_t *tq = args;
 	spl_task_t *t;
+	struct list_head *pend_list;
 	ENTRY;
 
 	ASSERT(tq);
@@ -341,7 +383,8 @@ taskq_thread(void *args)
 	while (!kthread_should_stop()) {
 
 		add_wait_queue(&tq->tq_work_waitq, &wait);
-		if (list_empty(&tq->tq_pend_list)) {
+		if (list_empty(&tq->tq_pend_list) &&
+		    list_empty(&tq->tq_prio_list)) {
 			spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 			schedule();
 			spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
@@ -350,10 +393,18 @@ taskq_thread(void *args)
 		}
 
 		remove_wait_queue(&tq->tq_work_waitq, &wait);
-		if (!list_empty(&tq->tq_pend_list)) {
-			t = list_entry(tq->tq_pend_list.next,spl_task_t,t_list);
+
+		if (!list_empty(&tq->tq_prio_list))
+			pend_list = &tq->tq_prio_list;
+		else if (!list_empty(&tq->tq_pend_list))
+			pend_list = &tq->tq_pend_list;
+		else
+			pend_list = NULL;
+
+		if (pend_list) {
+			t = list_entry(pend_list->next, spl_task_t, t_list);
 			list_del_init(&t->t_list);
-			list_add_tail(&t->t_list, &tq->tq_work_list);
+			taskq_insert_in_order(tq, t);
 			tq->tq_nactive++;
 			spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
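A design note worth stating (an inference from the selection order above, not a claim from the commit): the priority list is drained completely before the pending list is even consulted, so a sustained stream of TQ_FRONT dispatches can starve default-priority tasks. TQ_FRONT is therefore best reserved for occasional urgent work.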
@@ -435,6 +486,7 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
 	INIT_LIST_HEAD(&tq->tq_free_list);
 	INIT_LIST_HEAD(&tq->tq_work_list);
 	INIT_LIST_HEAD(&tq->tq_pend_list);
+	INIT_LIST_HEAD(&tq->tq_prio_list);
 	init_waitqueue_head(&tq->tq_work_waitq);
 	init_waitqueue_head(&tq->tq_wait_waitq);
@@ -503,6 +555,7 @@ __taskq_destroy(taskq_t *tq)
 	ASSERT(list_empty(&tq->tq_free_list));
 	ASSERT(list_empty(&tq->tq_work_list));
 	ASSERT(list_empty(&tq->tq_pend_list));
+	ASSERT(list_empty(&tq->tq_prio_list));
 
 	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 	kmem_free(tq->tq_threads, nthreads * sizeof(spl_task_t *));