Add TASKQ_DYNAMIC feature

Setting the TASKQ_DYNAMIC flag will create a taskq with dynamic
semantics.  Initially only a single worker thread will be created
to service tasks dispatched to the queue.  As additional threads
are needed they will be dynamically spawned up to the max number
specified by 'nthreads'.  When the threads are no longer needed,
because the taskq is empty, they will automatically terminate.
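
For example, a consumer might create a dynamic taskq along these lines
(a minimal sketch; the taskq name, worker function, argument, and the
16/50/INT_MAX values are placeholders chosen for illustration):

    /* Up to 16 workers, but only one is created up front. */
    taskq_t *tq = taskq_create("my_taskq", 16, maxclsyspri,
        50, INT_MAX, TASKQ_PREPOPULATE | TASKQ_DYNAMIC);

    /* Additional workers are spawned on demand as tasks queue up. */
    (void) taskq_dispatch(tq, my_worker_func, my_arg, TQ_SLEEP);

    taskq_wait(tq);     /* wait for all outstanding tasks */
    taskq_destroy(tq);  /* remaining worker threads are torn down */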

Due to the low cost of creating and destroying threads under Linux,
by default new threads are spawned and terminated aggressively.
There are two module options which can be tuned to adjust this
behavior if needed (see the sketch after the list below).

* spl_taskq_thread_sequential - The number of sequential tasks,
without interruption, which must be handled by a worker
thread before a new worker thread is spawned.  Default 4.

* spl_taskq_thread_dynamic - Provides the ability to completely
disable the use of dynamic taskqs on the system.  This is intended
for debugging and troubleshooting.  Default 1 (enabled).
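
Condensed, the check these tunables feed into looks roughly like the
following (a sketch restating the logic added by taskq_thread_spawn()
and taskq_thread_should_stop() in this patch; request_new_thread() is a
hypothetical stand-in for dispatching the spawn task on the dedicated
dynamic_taskq):

    /* After each task a worker increments seq_tasks, the number of
     * tasks it has handled back to back without going idle. */
    if (seq_tasks > spl_taskq_thread_sequential &&
        tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads &&
        (tq->tq_flags & TASKQ_ACTIVE))
            request_new_thread(tq);  /* hypothetical helper, see patch */

    /* A worker other than the first exits only when the taskq is fully
     * drained, no other worker is busy, and spl_taskq_thread_dynamic
     * is still enabled. */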

This behavior is fundamentally consistent with the dynamic taskq
implementation found in both illumos and FreeBSD.

Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
Signed-off-by: Tim Chase <tim@chase2k.com>
Closes #458
commit f7a973d99b (parent 5acb2307b2)
Author: Brian Behlendorf, 2015-06-08 14:36:27 -07:00
4 changed files with 413 additions and 168 deletions


@ -40,6 +40,7 @@
#define TASKQ_DYNAMIC 0x00000004 #define TASKQ_DYNAMIC 0x00000004
#define TASKQ_THREADS_CPU_PCT 0x00000008 #define TASKQ_THREADS_CPU_PCT 0x00000008
#define TASKQ_DC_BATCH 0x00000010 #define TASKQ_DC_BATCH 0x00000010
#define TASKQ_ACTIVE 0x80000000
/* /*
* Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as * Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as
@ -53,7 +54,6 @@
#define TQ_NOALLOC 0x02000000 #define TQ_NOALLOC 0x02000000
#define TQ_NEW 0x04000000 #define TQ_NEW 0x04000000
#define TQ_FRONT 0x08000000 #define TQ_FRONT 0x08000000
#define TQ_ACTIVE 0x80000000
typedef unsigned long taskqid_t; typedef unsigned long taskqid_t;
typedef void (task_func_t)(void *); typedef void (task_func_t)(void *);
@ -61,11 +61,13 @@ typedef void (task_func_t)(void *);
typedef struct taskq { typedef struct taskq {
spinlock_t tq_lock; /* protects taskq_t */ spinlock_t tq_lock; /* protects taskq_t */
unsigned long tq_lock_flags; /* interrupt state */ unsigned long tq_lock_flags; /* interrupt state */
const char *tq_name; /* taskq name */ char *tq_name; /* taskq name */
struct list_head tq_thread_list;/* list of all threads */ struct list_head tq_thread_list;/* list of all threads */
struct list_head tq_active_list;/* list of active threads */ struct list_head tq_active_list;/* list of active threads */
int tq_nactive; /* # of active threads */ int tq_nactive; /* # of active threads */
int tq_nthreads; /* # of total threads */ int tq_nthreads; /* # of existing threads */
int tq_nspawn; /* # of threads being spawned */
int tq_maxthreads; /* # of threads maximum */
int tq_pri; /* priority */ int tq_pri; /* priority */
int tq_minalloc; /* min task_t pool size */ int tq_minalloc; /* min task_t pool size */
int tq_maxalloc; /* max task_t pool size */ int tq_maxalloc; /* max task_t pool size */


@ -249,3 +249,37 @@ where a thread should run.
.sp .sp
Default value: \fB0\fR Default value: \fB0\fR
.RE .RE
.sp
.ne 2
.na
\fBspl_taskq_thread_dynamic\fR (int)
.ad
.RS 12n
Allow dynamic taskqs. When enabled, taskqs which set the TASKQ_DYNAMIC flag
will by default create only a single thread. New threads will be created on
demand up to a maximum allowed number to facilitate the completion of
outstanding tasks. Threads which are no longer needed will be promptly
destroyed. By default this behavior is enabled but it can be disabled to
aid performance analysis or troubleshooting.
.sp
Default value: \fB1\fR
.RE
.sp
.ne 2
.na
\fBspl_taskq_thread_sequential\fR (int)
.ad
.RS 12n
The number of items a taskq worker thread must handle without interruption
before requesting a new worker thread be spawned. This is used to control
how quickly taskqs ramp up the number of threads processing the queue.
Because Linux thread creation and destruction are relatively inexpensive, a
small default value has been selected. This means that normally threads will
be created aggressively, which is desirable. Increasing this value will
result in a slower thread creation rate which may be preferable for some
configurations.
.sp
Default value: \fB4\fR
.RE


@ -31,10 +31,24 @@ int spl_taskq_thread_bind = 0;
module_param(spl_taskq_thread_bind, int, 0644); module_param(spl_taskq_thread_bind, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default"); MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");
int spl_taskq_thread_dynamic = 1;
module_param(spl_taskq_thread_dynamic, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads");
int spl_taskq_thread_sequential = 4;
module_param(spl_taskq_thread_sequential, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_sequential,
"Create new taskq threads after N sequential tasks");
/* Global system-wide dynamic task queue available for all consumers */ /* Global system-wide dynamic task queue available for all consumers */
taskq_t *system_taskq; taskq_t *system_taskq;
EXPORT_SYMBOL(system_taskq); EXPORT_SYMBOL(system_taskq);
/* Private dedicated taskq for creating new taskq threads on demand. */
static taskq_t *dynamic_taskq;
static taskq_thread_t *taskq_thread_create(taskq_t *);
static int static int
task_km_flags(uint_t flags) task_km_flags(uint_t flags)
{ {
@ -434,17 +448,22 @@ taskq_member(taskq_t *tq, void *t)
{ {
struct list_head *l; struct list_head *l;
taskq_thread_t *tqt; taskq_thread_t *tqt;
int found = 0;
ASSERT(tq); ASSERT(tq);
ASSERT(t); ASSERT(t);
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
list_for_each(l, &tq->tq_thread_list) { list_for_each(l, &tq->tq_thread_list) {
tqt = list_entry(l, taskq_thread_t, tqt_thread_list); tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
if (tqt->tqt_thread == (struct task_struct *)t) if (tqt->tqt_thread == (struct task_struct *)t) {
return (1); found = 1;
break;
}
} }
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
return (0); return (found);
} }
EXPORT_SYMBOL(taskq_member); EXPORT_SYMBOL(taskq_member);
@ -516,7 +535,7 @@ taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
/* Taskq being destroyed and all tasks drained */ /* Taskq being destroyed and all tasks drained */
if (!(tq->tq_flags & TQ_ACTIVE)) if (!(tq->tq_flags & TASKQ_ACTIVE))
goto out; goto out;
/* Do not queue the task unless there is idle thread for it */ /* Do not queue the task unless there is idle thread for it */
@ -568,7 +587,7 @@ taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
/* Taskq being destroyed and all tasks drained */ /* Taskq being destroyed and all tasks drained */
if (!(tq->tq_flags & TQ_ACTIVE)) if (!(tq->tq_flags & TASKQ_ACTIVE))
goto out; goto out;
if ((t = task_alloc(tq, flags)) == NULL) if ((t = task_alloc(tq, flags)) == NULL)
@ -604,12 +623,11 @@ taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
{ {
ASSERT(tq); ASSERT(tq);
ASSERT(func); ASSERT(func);
ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
/* Taskq being destroyed and all tasks drained */ /* Taskq being destroyed and all tasks drained */
if (!(tq->tq_flags & TQ_ACTIVE)) { if (!(tq->tq_flags & TASKQ_ACTIVE)) {
t->tqent_id = 0; t->tqent_id = 0;
goto out; goto out;
} }
@ -664,6 +682,97 @@ taskq_init_ent(taskq_ent_t *t)
} }
EXPORT_SYMBOL(taskq_init_ent); EXPORT_SYMBOL(taskq_init_ent);
/*
* Return the next pending task; preference is given to tasks on the
* priority list which were dispatched with TQ_FRONT.
*/
static taskq_ent_t *
taskq_next_ent(taskq_t *tq)
{
struct list_head *list;
ASSERT(spin_is_locked(&tq->tq_lock));
if (!list_empty(&tq->tq_prio_list))
list = &tq->tq_prio_list;
else if (!list_empty(&tq->tq_pend_list))
list = &tq->tq_pend_list;
else
return (NULL);
return (list_entry(list->next, taskq_ent_t, tqent_list));
}
/*
* Spawns a new thread for the specified taskq.
*/
static void
taskq_thread_spawn_task(void *arg)
{
taskq_t *tq = (taskq_t *)arg;
(void) taskq_thread_create(tq);
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
tq->tq_nspawn--;
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
}
/*
* Spawn additional threads for dynamic taskqs (TASKQ_DYNAMIC) when the current
* number of threads is insufficient to handle the pending tasks. These
* new threads must be created by the dedicated dynamic_taskq to avoid
* deadlocks between thread creation and memory reclaim. The system_taskq
* which is also a dynamic taskq cannot be safely used for this.
*/
static int
taskq_thread_spawn(taskq_t *tq, int seq_tasks)
{
int spawning = 0;
if (!(tq->tq_flags & TASKQ_DYNAMIC))
return (0);
if ((seq_tasks > spl_taskq_thread_sequential) &&
(tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads) &&
(tq->tq_flags & TASKQ_ACTIVE)) {
spawning = (++tq->tq_nspawn);
taskq_dispatch(dynamic_taskq, taskq_thread_spawn_task,
tq, TQ_NOSLEEP);
}
return (spawning);
}
/*
* Threads in a dynamic taskq should only exit once it has been completely
* drained and no other threads are actively servicing tasks. This prevents
* threads from being created and destroyed more than is required.
*
* The first thread in the thread list is treated as the primary thread.
* There is nothing special about the primary thread but in order to keep
* all of the taskq pids from changing we opt to make it long running.
*/
static int
taskq_thread_should_stop(taskq_t *tq, taskq_thread_t *tqt)
{
ASSERT(spin_is_locked(&tq->tq_lock));
if (!(tq->tq_flags & TASKQ_DYNAMIC))
return (0);
if (list_first_entry(&(tq->tq_thread_list), taskq_thread_t,
tqt_thread_list) == tqt)
return (0);
return
((tq->tq_nspawn == 0) && /* No threads are being spawned */
(tq->tq_nactive == 0) && /* No threads are handling tasks */
(tq->tq_nthreads > 1) && /* More than 1 thread is running */
(!taskq_next_ent(tq)) && /* There are no pending tasks */
(spl_taskq_thread_dynamic));/* Dynamic taskqs are allowed */
}
static int static int
taskq_thread(void *args) taskq_thread(void *args)
{ {
@ -672,7 +781,7 @@ taskq_thread(void *args)
taskq_thread_t *tqt = args; taskq_thread_t *tqt = args;
taskq_t *tq; taskq_t *tq;
taskq_ent_t *t; taskq_ent_t *t;
struct list_head *pend_list; int seq_tasks = 0;
ASSERT(tqt); ASSERT(tqt);
tq = tqt->tqt_tq; tq = tqt->tqt_tq;
@ -683,7 +792,13 @@ taskq_thread(void *args)
flush_signals(current); flush_signals(current);
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
/* Immediately exit if more threads than allowed were created. */
if (tq->tq_nthreads >= tq->tq_maxthreads)
goto error;
tq->tq_nthreads++; tq->tq_nthreads++;
list_add_tail(&tqt->tqt_thread_list, &tq->tq_thread_list);
wake_up(&tq->tq_wait_waitq); wake_up(&tq->tq_wait_waitq);
set_current_state(TASK_INTERRUPTIBLE); set_current_state(TASK_INTERRUPTIBLE);
@ -691,25 +806,25 @@ taskq_thread(void *args)
if (list_empty(&tq->tq_pend_list) && if (list_empty(&tq->tq_pend_list) &&
list_empty(&tq->tq_prio_list)) { list_empty(&tq->tq_prio_list)) {
if (taskq_thread_should_stop(tq, tqt)) {
wake_up_all(&tq->tq_wait_waitq);
break;
}
add_wait_queue_exclusive(&tq->tq_work_waitq, &wait); add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
schedule(); schedule();
seq_tasks = 0;
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
remove_wait_queue(&tq->tq_work_waitq, &wait); remove_wait_queue(&tq->tq_work_waitq, &wait);
} else { } else {
__set_current_state(TASK_RUNNING); __set_current_state(TASK_RUNNING);
} }
if ((t = taskq_next_ent(tq)) != NULL) {
if (!list_empty(&tq->tq_prio_list))
pend_list = &tq->tq_prio_list;
else if (!list_empty(&tq->tq_pend_list))
pend_list = &tq->tq_pend_list;
else
pend_list = NULL;
if (pend_list) {
t = list_entry(pend_list->next,taskq_ent_t,tqent_list);
list_del_init(&t->tqent_list); list_del_init(&t->tqent_list);
/* In order to support recursively dispatching a /* In order to support recursively dispatching a
@ -738,8 +853,7 @@ taskq_thread(void *args)
tqt->tqt_task = NULL; tqt->tqt_task = NULL;
/* For prealloc'd tasks, we don't free anything. */ /* For prealloc'd tasks, we don't free anything. */
if ((tq->tq_flags & TASKQ_DYNAMIC) || if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
task_done(tq, t); task_done(tq, t);
/* When the current lowest outstanding taskqid is /* When the current lowest outstanding taskqid is
@ -749,9 +863,16 @@ taskq_thread(void *args)
ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id); ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
} }
/* Spawn additional taskq threads if required. */
if (taskq_thread_spawn(tq, ++seq_tasks))
seq_tasks = 0;
tqt->tqt_id = 0; tqt->tqt_id = 0;
tqt->tqt_flags = 0; tqt->tqt_flags = 0;
wake_up_all(&tq->tq_wait_waitq); wake_up_all(&tq->tq_wait_waitq);
} else {
if (taskq_thread_should_stop(tq, tqt))
break;
} }
set_current_state(TASK_INTERRUPTIBLE); set_current_state(TASK_INTERRUPTIBLE);
@ -761,27 +882,56 @@ taskq_thread(void *args)
__set_current_state(TASK_RUNNING); __set_current_state(TASK_RUNNING);
tq->tq_nthreads--; tq->tq_nthreads--;
list_del_init(&tqt->tqt_thread_list); list_del_init(&tqt->tqt_thread_list);
kmem_free(tqt, sizeof(taskq_thread_t)); error:
kmem_free(tqt, sizeof (taskq_thread_t));
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
return (0); return (0);
} }
static taskq_thread_t *
taskq_thread_create(taskq_t *tq)
{
static int last_used_cpu = 0;
taskq_thread_t *tqt;
tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE);
INIT_LIST_HEAD(&tqt->tqt_thread_list);
INIT_LIST_HEAD(&tqt->tqt_active_list);
tqt->tqt_tq = tq;
tqt->tqt_id = 0;
tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
"%s", tq->tq_name);
if (tqt->tqt_thread == NULL) {
kmem_free(tqt, sizeof (taskq_thread_t));
return (NULL);
}
if (spl_taskq_thread_bind) {
last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
kthread_bind(tqt->tqt_thread, last_used_cpu);
}
set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri));
wake_up_process(tqt->tqt_thread);
return (tqt);
}
taskq_t * taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri, taskq_create(const char *name, int nthreads, pri_t pri,
int minalloc, int maxalloc, uint_t flags) int minalloc, int maxalloc, uint_t flags)
{ {
static int last_used_cpu = 0;
taskq_t *tq; taskq_t *tq;
taskq_thread_t *tqt; taskq_thread_t *tqt;
int rc = 0, i, j = 0; int count = 0, rc = 0, i;
ASSERT(name != NULL); ASSERT(name != NULL);
ASSERT(pri <= maxclsyspri); ASSERT(pri <= maxclsyspri);
ASSERT(minalloc >= 0); ASSERT(minalloc >= 0);
ASSERT(maxalloc <= INT_MAX); ASSERT(maxalloc <= INT_MAX);
ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */ ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */
/* Scale the number of threads using nthreads as a percentage */ /* Scale the number of threads using nthreads as a percentage */
if (flags & TASKQ_THREADS_CPU_PCT) { if (flags & TASKQ_THREADS_CPU_PCT) {
@ -792,24 +942,25 @@ taskq_create(const char *name, int nthreads, pri_t pri,
nthreads = MAX((num_online_cpus() * nthreads) / 100, 1); nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
} }
tq = kmem_alloc(sizeof(*tq), KM_PUSHPAGE); tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE);
if (tq == NULL) if (tq == NULL)
return (NULL); return (NULL);
spin_lock_init(&tq->tq_lock); spin_lock_init(&tq->tq_lock);
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
INIT_LIST_HEAD(&tq->tq_thread_list); INIT_LIST_HEAD(&tq->tq_thread_list);
INIT_LIST_HEAD(&tq->tq_active_list); INIT_LIST_HEAD(&tq->tq_active_list);
tq->tq_name = name; tq->tq_name = strdup(name);
tq->tq_nactive = 0; tq->tq_nactive = 0;
tq->tq_nthreads = 0; tq->tq_nthreads = 0;
tq->tq_pri = pri; tq->tq_nspawn = 0;
tq->tq_minalloc = minalloc; tq->tq_maxthreads = nthreads;
tq->tq_maxalloc = maxalloc; tq->tq_pri = pri;
tq->tq_nalloc = 0; tq->tq_minalloc = minalloc;
tq->tq_flags = (flags | TQ_ACTIVE); tq->tq_maxalloc = maxalloc;
tq->tq_next_id = 1; tq->tq_nalloc = 0;
tq->tq_lowest_id = 1; tq->tq_flags = (flags | TASKQ_ACTIVE);
tq->tq_next_id = 1;
tq->tq_lowest_id = 1;
INIT_LIST_HEAD(&tq->tq_free_list); INIT_LIST_HEAD(&tq->tq_free_list);
INIT_LIST_HEAD(&tq->tq_pend_list); INIT_LIST_HEAD(&tq->tq_pend_list);
INIT_LIST_HEAD(&tq->tq_prio_list); INIT_LIST_HEAD(&tq->tq_prio_list);
@ -817,38 +968,28 @@ taskq_create(const char *name, int nthreads, pri_t pri,
init_waitqueue_head(&tq->tq_work_waitq); init_waitqueue_head(&tq->tq_work_waitq);
init_waitqueue_head(&tq->tq_wait_waitq); init_waitqueue_head(&tq->tq_wait_waitq);
if (flags & TASKQ_PREPOPULATE) if (flags & TASKQ_PREPOPULATE) {
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
for (i = 0; i < minalloc; i++) for (i = 0; i < minalloc; i++)
task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW)); task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW));
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
}
if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)
nthreads = 1;
for (i = 0; i < nthreads; i++) { for (i = 0; i < nthreads; i++) {
tqt = kmem_alloc(sizeof(*tqt), KM_PUSHPAGE); tqt = taskq_thread_create(tq);
INIT_LIST_HEAD(&tqt->tqt_thread_list); if (tqt == NULL)
INIT_LIST_HEAD(&tqt->tqt_active_list);
tqt->tqt_tq = tq;
tqt->tqt_id = 0;
tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
"%s/%d", name, i);
if (tqt->tqt_thread) {
list_add(&tqt->tqt_thread_list, &tq->tq_thread_list);
if (spl_taskq_thread_bind) {
last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
kthread_bind(tqt->tqt_thread, last_used_cpu);
}
set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(pri));
wake_up_process(tqt->tqt_thread);
j++;
} else {
kmem_free(tqt, sizeof(taskq_thread_t));
rc = 1; rc = 1;
} else
count++;
} }
/* Wait for all threads to be started before potential destroy */ /* Wait for all threads to be started before potential destroy */
wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j); wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count);
if (rc) { if (rc) {
taskq_destroy(tq); taskq_destroy(tq);
@ -868,10 +1009,16 @@ taskq_destroy(taskq_t *tq)
ASSERT(tq); ASSERT(tq);
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
tq->tq_flags &= ~TQ_ACTIVE; tq->tq_flags &= ~TASKQ_ACTIVE;
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
/* TQ_ACTIVE cleared prevents new tasks being added to pending */ /*
* When TASKQ_ACTIVE is clear new tasks may not be added nor may
* new worker threads be spawned for dynamic taskqs.
*/
if (dynamic_taskq != NULL)
taskq_wait_outstanding(dynamic_taskq, 0);
taskq_wait(tq); taskq_wait(tq);
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
@ -884,7 +1031,7 @@ taskq_destroy(taskq_t *tq)
*/ */
while (!list_empty(&tq->tq_thread_list)) { while (!list_empty(&tq->tq_thread_list)) {
tqt = list_entry(tq->tq_thread_list.next, tqt = list_entry(tq->tq_thread_list.next,
taskq_thread_t, tqt_thread_list); taskq_thread_t, tqt_thread_list);
thread = tqt->tqt_thread; thread = tqt->tqt_thread;
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
@ -902,8 +1049,9 @@ taskq_destroy(taskq_t *tq)
task_free(tq, t); task_free(tq, t);
} }
ASSERT(tq->tq_nthreads == 0); ASSERT0(tq->tq_nthreads);
ASSERT(tq->tq_nalloc == 0); ASSERT0(tq->tq_nalloc);
ASSERT0(tq->tq_nspawn);
ASSERT(list_empty(&tq->tq_thread_list)); ASSERT(list_empty(&tq->tq_thread_list));
ASSERT(list_empty(&tq->tq_active_list)); ASSERT(list_empty(&tq->tq_active_list));
ASSERT(list_empty(&tq->tq_free_list)); ASSERT(list_empty(&tq->tq_free_list));
@ -913,7 +1061,8 @@ taskq_destroy(taskq_t *tq)
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
kmem_free(tq, sizeof(taskq_t)); strfree(tq->tq_name);
kmem_free(tq, sizeof (taskq_t));
} }
EXPORT_SYMBOL(taskq_destroy); EXPORT_SYMBOL(taskq_destroy);
@ -927,11 +1076,22 @@ spl_taskq_init(void)
if (system_taskq == NULL) if (system_taskq == NULL)
return (1); return (1);
dynamic_taskq = taskq_create("spl_dynamic_taskq", 1,
minclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE);
if (dynamic_taskq == NULL) {
taskq_destroy(system_taskq);
return (1);
}
return (0); return (0);
} }
void void
spl_taskq_fini(void) spl_taskq_fini(void)
{ {
taskq_destroy(dynamic_taskq);
dynamic_taskq = NULL;
taskq_destroy(system_taskq); taskq_destroy(system_taskq);
system_taskq = NULL;
} }


@ -28,6 +28,7 @@
#include <sys/vmem.h> #include <sys/vmem.h>
#include <sys/random.h> #include <sys/random.h>
#include <sys/taskq.h> #include <sys/taskq.h>
#include <sys/time.h>
#include <sys/timer.h> #include <sys/timer.h>
#include <linux/delay.h> #include <linux/delay.h>
#include "splat-internal.h" #include "splat-internal.h"
@ -75,6 +76,10 @@
#define SPLAT_TASKQ_TEST10_NAME "cancel" #define SPLAT_TASKQ_TEST10_NAME "cancel"
#define SPLAT_TASKQ_TEST10_DESC "Cancel task execution" #define SPLAT_TASKQ_TEST10_DESC "Cancel task execution"
#define SPLAT_TASKQ_TEST11_ID 0x020b
#define SPLAT_TASKQ_TEST11_NAME "dynamic"
#define SPLAT_TASKQ_TEST11_DESC "Dynamic task queue thread creation"
#define SPLAT_TASKQ_ORDER_MAX 8 #define SPLAT_TASKQ_ORDER_MAX 8
#define SPLAT_TASKQ_DEPTH_MAX 16 #define SPLAT_TASKQ_DEPTH_MAX 16
@ -1052,11 +1057,104 @@ splat_taskq_test7(struct file *file, void *arg)
rc = splat_taskq_test7_impl(file, arg, B_FALSE); rc = splat_taskq_test7_impl(file, arg, B_FALSE);
if (rc) if (rc)
return rc; return (rc);
rc = splat_taskq_test7_impl(file, arg, B_TRUE); rc = splat_taskq_test7_impl(file, arg, B_TRUE);
return rc; return (rc);
}
static void
splat_taskq_throughput_func(void *arg)
{
splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg;
ASSERT(tq_arg);
atomic_inc(tq_arg->count);
}
static int
splat_taskq_throughput(struct file *file, void *arg, const char *name,
int nthreads, int minalloc, int maxalloc, int flags, int tasks,
struct timespec *delta)
{
taskq_t *tq;
taskqid_t id;
splat_taskq_arg_t tq_arg;
taskq_ent_t **tqes;
atomic_t count;
struct timespec start, stop;
int i, j, rc = 0;
tqes = vmalloc(sizeof (*tqes) * tasks);
if (tqes == NULL)
return (-ENOMEM);
memset(tqes, 0, sizeof (*tqes) * tasks);
splat_vprint(file, name, "Taskq '%s' creating (%d/%d/%d/%d)\n",
name, nthreads, minalloc, maxalloc, tasks);
if ((tq = taskq_create(name, nthreads, maxclsyspri,
minalloc, maxalloc, flags)) == NULL) {
splat_vprint(file, name, "Taskq '%s' create failed\n", name);
rc = -EINVAL;
goto out_free;
}
tq_arg.file = file;
tq_arg.name = name;
tq_arg.count = &count;
atomic_set(tq_arg.count, 0);
getnstimeofday(&start);
for (i = 0; i < tasks; i++) {
tqes[i] = kmalloc(sizeof (taskq_ent_t), GFP_KERNEL);
if (tqes[i] == NULL) {
rc = -ENOMEM;
goto out;
}
taskq_init_ent(tqes[i]);
taskq_dispatch_ent(tq, splat_taskq_throughput_func,
&tq_arg, TQ_SLEEP, tqes[i]);
id = tqes[i]->tqent_id;
if (id == 0) {
splat_vprint(file, name, "Taskq '%s' function '%s' "
"dispatch %d failed\n", tq_arg.name,
sym2str(splat_taskq_throughput_func), i);
rc = -EINVAL;
goto out;
}
}
splat_vprint(file, name, "Taskq '%s' waiting for %d dispatches\n",
tq_arg.name, tasks);
taskq_wait(tq);
if (delta != NULL) {
getnstimeofday(&stop);
*delta = timespec_sub(stop, start);
}
splat_vprint(file, name, "Taskq '%s' %d/%d dispatches finished\n",
tq_arg.name, atomic_read(tq_arg.count), tasks);
if (atomic_read(tq_arg.count) != tasks)
rc = -ERANGE;
out:
splat_vprint(file, name, "Taskq '%s' destroying\n", tq_arg.name);
taskq_destroy(tq);
out_free:
for (j = 0; j < tasks && tqes[j] != NULL; j++)
kfree(tqes[j]);
vfree(tqes);
return (rc);
} }
/* /*
@ -1065,107 +1163,15 @@ splat_taskq_test7(struct file *file, void *arg)
* pass. The purpose is to provide a benchmark for measuring the * pass. The purpose is to provide a benchmark for measuring the
* effectiveness of taskq optimizations. * effectiveness of taskq optimizations.
*/ */
static void #define TEST8_NUM_TASKS 0x20000
splat_taskq_test8_func(void *arg) #define TEST8_THREADS_PER_TASKQ 100
{
splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg;
ASSERT(tq_arg);
atomic_inc(tq_arg->count);
}
#define TEST8_NUM_TASKS 0x20000
#define TEST8_THREADS_PER_TASKQ 100
static int
splat_taskq_test8_common(struct file *file, void *arg, int minalloc,
int maxalloc)
{
taskq_t *tq;
taskqid_t id;
splat_taskq_arg_t tq_arg;
taskq_ent_t **tqes;
atomic_t count;
int i, j, rc = 0;
tqes = vmalloc(sizeof(*tqes) * TEST8_NUM_TASKS);
if (tqes == NULL)
return -ENOMEM;
memset(tqes, 0, sizeof(*tqes) * TEST8_NUM_TASKS);
splat_vprint(file, SPLAT_TASKQ_TEST8_NAME,
"Taskq '%s' creating (%d/%d/%d)\n",
SPLAT_TASKQ_TEST8_NAME,
minalloc, maxalloc, TEST8_NUM_TASKS);
if ((tq = taskq_create(SPLAT_TASKQ_TEST8_NAME, TEST8_THREADS_PER_TASKQ,
maxclsyspri, minalloc, maxalloc,
TASKQ_PREPOPULATE)) == NULL) {
splat_vprint(file, SPLAT_TASKQ_TEST8_NAME,
"Taskq '%s' create failed\n",
SPLAT_TASKQ_TEST8_NAME);
rc = -EINVAL;
goto out_free;
}
tq_arg.file = file;
tq_arg.name = SPLAT_TASKQ_TEST8_NAME;
tq_arg.count = &count;
atomic_set(tq_arg.count, 0);
for (i = 0; i < TEST8_NUM_TASKS; i++) {
tqes[i] = kmalloc(sizeof(taskq_ent_t), GFP_KERNEL);
if (tqes[i] == NULL) {
rc = -ENOMEM;
goto out;
}
taskq_init_ent(tqes[i]);
taskq_dispatch_ent(tq, splat_taskq_test8_func,
&tq_arg, TQ_SLEEP, tqes[i]);
id = tqes[i]->tqent_id;
if (id == 0) {
splat_vprint(file, SPLAT_TASKQ_TEST8_NAME,
"Taskq '%s' function '%s' dispatch "
"%d failed\n", tq_arg.name,
sym2str(splat_taskq_test8_func), i);
rc = -EINVAL;
goto out;
}
}
splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, "Taskq '%s' "
"waiting for %d dispatches\n", tq_arg.name,
TEST8_NUM_TASKS);
taskq_wait(tq);
splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, "Taskq '%s' "
"%d/%d dispatches finished\n", tq_arg.name,
atomic_read(tq_arg.count), TEST8_NUM_TASKS);
if (atomic_read(tq_arg.count) != TEST8_NUM_TASKS)
rc = -ERANGE;
out:
splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, "Taskq '%s' destroying\n",
tq_arg.name);
taskq_destroy(tq);
out_free:
for (j = 0; j < TEST8_NUM_TASKS && tqes[j] != NULL; j++)
kfree(tqes[j]);
vfree(tqes);
return rc;
}
static int static int
splat_taskq_test8(struct file *file, void *arg) splat_taskq_test8(struct file *file, void *arg)
{ {
int rc; return (splat_taskq_throughput(file, arg,
SPLAT_TASKQ_TEST8_NAME, TEST8_THREADS_PER_TASKQ,
rc = splat_taskq_test8_common(file, arg, 1, 100); 1, INT_MAX, TASKQ_PREPOPULATE, TEST8_NUM_TASKS, NULL));
return rc;
} }
/* /*
@ -1433,6 +1439,46 @@ out_free:
return rc; return rc;
} }
/*
* Create a dynamic taskq with 100 threads and dispatch a huge number of
* trivial tasks. This will cause the taskq to grow quickly to its max
* thread count. This test should always pass. The purpose is to provide
* a benchmark for measuring the performance of dynamic taskqs.
*/
#define TEST11_NUM_TASKS 100000
#define TEST11_THREADS_PER_TASKQ 100
static int
splat_taskq_test11(struct file *file, void *arg)
{
struct timespec normal, dynamic;
int error;
error = splat_taskq_throughput(file, arg, SPLAT_TASKQ_TEST11_NAME,
TEST11_THREADS_PER_TASKQ, 1, INT_MAX,
TASKQ_PREPOPULATE, TEST11_NUM_TASKS, &normal);
if (error)
return (error);
error = splat_taskq_throughput(file, arg, SPLAT_TASKQ_TEST11_NAME,
TEST11_THREADS_PER_TASKQ, 1, INT_MAX,
TASKQ_PREPOPULATE | TASKQ_DYNAMIC, TEST11_NUM_TASKS, &dynamic);
if (error)
return (error);
splat_vprint(file, SPLAT_TASKQ_TEST11_NAME,
"Timing taskq_wait(): normal=%ld.%09lds, dynamic=%ld.%09lds\n",
normal.tv_sec, normal.tv_nsec,
dynamic.tv_sec, dynamic.tv_nsec);
/* A 10x increase in runtime is used to indicate a core problem. */
if ((dynamic.tv_sec * NANOSEC + dynamic.tv_nsec) >
((normal.tv_sec * NANOSEC + normal.tv_nsec) * 10))
error = -ETIME;
return (error);
}
splat_subsystem_t * splat_subsystem_t *
splat_taskq_init(void) splat_taskq_init(void)
{ {
@ -1470,6 +1516,8 @@ splat_taskq_init(void)
SPLAT_TASKQ_TEST9_ID, splat_taskq_test9); SPLAT_TASKQ_TEST9_ID, splat_taskq_test9);
SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST10_NAME, SPLAT_TASKQ_TEST10_DESC, SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST10_NAME, SPLAT_TASKQ_TEST10_DESC,
SPLAT_TASKQ_TEST10_ID, splat_taskq_test10); SPLAT_TASKQ_TEST10_ID, splat_taskq_test10);
SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST11_NAME, SPLAT_TASKQ_TEST11_DESC,
SPLAT_TASKQ_TEST11_ID, splat_taskq_test11);
return sub; return sub;
} }
@ -1478,6 +1526,7 @@ void
splat_taskq_fini(splat_subsystem_t *sub) splat_taskq_fini(splat_subsystem_t *sub)
{ {
ASSERT(sub); ASSERT(sub);
SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST11_ID);
SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST10_ID); SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST10_ID);
SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST9_ID); SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST9_ID);
SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST8_ID); SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST8_ID);