diff --git a/include/sys/taskq.h b/include/sys/taskq.h
index 2c437f0e76..a43a86da65 100644
--- a/include/sys/taskq.h
+++ b/include/sys/taskq.h
@@ -40,6 +40,7 @@
 #define TASKQ_DYNAMIC		0x00000004
 #define TASKQ_THREADS_CPU_PCT	0x00000008
 #define TASKQ_DC_BATCH		0x00000010
+#define TASKQ_ACTIVE		0x80000000
 
 /*
  * Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as
@@ -53,7 +54,6 @@
 #define TQ_NOALLOC		0x02000000
 #define TQ_NEW			0x04000000
 #define TQ_FRONT		0x08000000
-#define TQ_ACTIVE		0x80000000
 
 typedef unsigned long taskqid_t;
 typedef void (task_func_t)(void *);
@@ -61,11 +61,13 @@ typedef void (task_func_t)(void *);
 typedef struct taskq {
 	spinlock_t		tq_lock;	/* protects taskq_t */
 	unsigned long		tq_lock_flags;	/* interrupt state */
-	const char		*tq_name;	/* taskq name */
+	char			*tq_name;	/* taskq name */
 	struct list_head	tq_thread_list;	/* list of all threads */
 	struct list_head	tq_active_list;	/* list of active threads */
 	int			tq_nactive;	/* # of active threads */
-	int			tq_nthreads;	/* # of total threads */
+	int			tq_nthreads;	/* # of existing threads */
+	int			tq_nspawn;	/* # of threads being spawned */
+	int			tq_maxthreads;	/* # of threads maximum */
 	int			tq_pri;		/* priority */
 	int			tq_minalloc;	/* min task_t pool size */
 	int			tq_maxalloc;	/* max task_t pool size */
diff --git a/man/man5/spl-module-parameters.5 b/man/man5/spl-module-parameters.5
index 3e7e877fbb..fc38605b25 100644
--- a/man/man5/spl-module-parameters.5
+++ b/man/man5/spl-module-parameters.5
@@ -249,3 +249,37 @@ where a thread should run.
 .sp
 Default value: \fB0\fR
 .RE
+
+.sp
+.ne 2
+.na
+\fBspl_taskq_thread_dynamic\fR (int)
+.ad
+.RS 12n
+Allow dynamic taskqs. When enabled, taskqs which set the TASKQ_DYNAMIC flag
+will by default create only a single thread. New threads will be created on
+demand up to a maximum allowed number to facilitate the completion of
+outstanding tasks. Threads which are no longer needed will be promptly
+destroyed. By default this behavior is enabled but it can be disabled to
+aid performance analysis or troubleshooting.
+.sp
+Default value: \fB1\fR
+.RE
+
+.sp
+.ne 2
+.na
+\fBspl_taskq_thread_sequential\fR (int)
+.ad
+.RS 12n
+The number of items a taskq worker thread must handle without interruption
+before requesting a new worker thread be spawned. This is used to control
+how quickly taskqs ramp up the number of threads processing the queue.
+Because Linux thread creation and destruction are relatively inexpensive, a
+small default value has been selected. This means that normally threads will
+be created aggressively, which is desirable. Increasing this value will
+result in a slower thread creation rate which may be preferable for some
+configurations.
+.sp
+Default value: \fB4\fR
+.RE
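/*
 * Editorial sketch, not part of the patch: how a consumer might use the
 * dynamic taskq behavior documented above.  The taskq name and the
 * example_func()/example_dynamic_taskq() names are hypothetical; the
 * interfaces (taskq_create, taskq_dispatch, taskq_wait, taskq_destroy,
 * TASKQ_DYNAMIC, TQ_SLEEP, maxclsyspri) are those changed by this patch.
 */
#include <sys/taskq.h>

static void
example_func(void *arg)
{
	/* Perform one unit of deferred work. */
}

static void
example_dynamic_taskq(void)
{
	taskq_t *tq;

	/*
	 * With TASKQ_DYNAMIC set (and spl_taskq_thread_dynamic=1) the
	 * taskq starts with a single thread and grows on demand up to
	 * the requested maximum of 16 threads.
	 */
	tq = taskq_create("example_taskq", 16, maxclsyspri,
	    0, INT_MAX, TASKQ_DYNAMIC);

	(void) taskq_dispatch(tq, example_func, NULL, TQ_SLEEP);

	taskq_wait(tq);		/* wait for all dispatched tasks */
	taskq_destroy(tq);
}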
diff --git a/module/spl/spl-taskq.c b/module/spl/spl-taskq.c
index 49bb40a251..9cd193369c 100644
--- a/module/spl/spl-taskq.c
+++ b/module/spl/spl-taskq.c
@@ -31,10 +31,24 @@
 int spl_taskq_thread_bind = 0;
 module_param(spl_taskq_thread_bind, int, 0644);
 MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");
 
+int spl_taskq_thread_dynamic = 1;
+module_param(spl_taskq_thread_dynamic, int, 0644);
+MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads");
+
+int spl_taskq_thread_sequential = 4;
+module_param(spl_taskq_thread_sequential, int, 0644);
+MODULE_PARM_DESC(spl_taskq_thread_sequential,
+	"Create new taskq threads after N sequential tasks");
+
 /* Global system-wide dynamic task queue available for all consumers */
 taskq_t *system_taskq;
 EXPORT_SYMBOL(system_taskq);
 
+/* Private dedicated taskq for creating new taskq threads on demand. */
+static taskq_t *dynamic_taskq;
+static taskq_thread_t *taskq_thread_create(taskq_t *);
+
 static int
 task_km_flags(uint_t flags)
 {
@@ -434,17 +448,22 @@ taskq_member(taskq_t *tq, void *t)
 {
 	struct list_head *l;
 	taskq_thread_t *tqt;
+	int found = 0;
 
 	ASSERT(tq);
 	ASSERT(t);
 
+	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 	list_for_each(l, &tq->tq_thread_list) {
 		tqt = list_entry(l, taskq_thread_t, tqt_thread_list);
-		if (tqt->tqt_thread == (struct task_struct *)t)
-			return (1);
+		if (tqt->tqt_thread == (struct task_struct *)t) {
+			found = 1;
+			break;
+		}
 	}
+	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
-	return (0);
+	return (found);
 }
 EXPORT_SYMBOL(taskq_member);
 
@@ -516,7 +535,7 @@ taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
 	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 
 	/* Taskq being destroyed and all tasks drained */
-	if (!(tq->tq_flags & TQ_ACTIVE))
+	if (!(tq->tq_flags & TASKQ_ACTIVE))
 		goto out;
 
 	/* Do not queue the task unless there is idle thread for it */
@@ -568,7 +587,7 @@ taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
 	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 
 	/* Taskq being destroyed and all tasks drained */
-	if (!(tq->tq_flags & TQ_ACTIVE))
+	if (!(tq->tq_flags & TASKQ_ACTIVE))
 		goto out;
 
 	if ((t = task_alloc(tq, flags)) == NULL)
@@ -604,12 +623,11 @@ taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
 {
 	ASSERT(tq);
 	ASSERT(func);
-	ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
 
 	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 
 	/* Taskq being destroyed and all tasks drained */
-	if (!(tq->tq_flags & TQ_ACTIVE)) {
+	if (!(tq->tq_flags & TASKQ_ACTIVE)) {
 		t->tqent_id = 0;
 		goto out;
 	}
@@ -664,6 +682,97 @@ taskq_init_ent(taskq_ent_t *t)
 }
 EXPORT_SYMBOL(taskq_init_ent);
 
+/*
+ * Return the next pending task; preference is given to tasks on the
+ * priority list which were dispatched with TQ_FRONT.
+ */
+static taskq_ent_t *
+taskq_next_ent(taskq_t *tq)
+{
+	struct list_head *list;
+
+	ASSERT(spin_is_locked(&tq->tq_lock));
+
+	if (!list_empty(&tq->tq_prio_list))
+		list = &tq->tq_prio_list;
+	else if (!list_empty(&tq->tq_pend_list))
+		list = &tq->tq_pend_list;
+	else
+		return (NULL);
+
+	return (list_entry(list->next, taskq_ent_t, tqent_list));
+}
+
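/*
 * Editorial note, not part of the patch: tasks dispatched with the
 * TQ_FRONT flag, e.g. taskq_dispatch(tq, func, arg, TQ_SLEEP | TQ_FRONT),
 * are placed on tq_prio_list and are therefore always picked up by
 * taskq_next_ent() ahead of ordinary tasks waiting on tq_pend_list.
 */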
+ */ +static void +taskq_thread_spawn_task(void *arg) +{ + taskq_t *tq = (taskq_t *)arg; + + (void) taskq_thread_create(tq); + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + tq->tq_nspawn--; + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); +} + +/* + * Spawn addition threads for dynamic taskqs (TASKQ_DYNMAIC) the current + * number of threads is insufficient to handle the pending tasks. These + * new threads must be created by the dedicated dynamic_taskq to avoid + * deadlocks between thread creation and memory reclaim. The system_taskq + * which is also a dynamic taskq cannot be safely used for this. + */ +static int +taskq_thread_spawn(taskq_t *tq, int seq_tasks) +{ + int spawning = 0; + + if (!(tq->tq_flags & TASKQ_DYNAMIC)) + return (0); + + if ((seq_tasks > spl_taskq_thread_sequential) && + (tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads) && + (tq->tq_flags & TASKQ_ACTIVE)) { + spawning = (++tq->tq_nspawn); + taskq_dispatch(dynamic_taskq, taskq_thread_spawn_task, + tq, TQ_NOSLEEP); + } + + return (spawning); +} + +/* + * Threads in a dynamic taskq should only exit once it has been completely + * drained and no other threads are actively servicing tasks. This prevents + * threads from being created and destroyed more than is required. + * + * The first thread is the thread list is treated as the primary thread. + * There is nothing special about the primary thread but in order to avoid + * all the taskq pids from changing we opt to make it long running. + */ +static int +taskq_thread_should_stop(taskq_t *tq, taskq_thread_t *tqt) +{ + ASSERT(spin_is_locked(&tq->tq_lock)); + + if (!(tq->tq_flags & TASKQ_DYNAMIC)) + return (0); + + if (list_first_entry(&(tq->tq_thread_list), taskq_thread_t, + tqt_thread_list) == tqt) + return (0); + + return + ((tq->tq_nspawn == 0) && /* No threads are being spawned */ + (tq->tq_nactive == 0) && /* No threads are handling tasks */ + (tq->tq_nthreads > 1) && /* More than 1 thread is running */ + (!taskq_next_ent(tq)) && /* There are no pending tasks */ + (spl_taskq_thread_dynamic));/* Dynamic taskqs are allowed */ +} + static int taskq_thread(void *args) { @@ -672,7 +781,7 @@ taskq_thread(void *args) taskq_thread_t *tqt = args; taskq_t *tq; taskq_ent_t *t; - struct list_head *pend_list; + int seq_tasks = 0; ASSERT(tqt); tq = tqt->tqt_tq; @@ -683,7 +792,13 @@ taskq_thread(void *args) flush_signals(current); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + + /* Immediately exit if more threads than allowed were created. 
 static int
 taskq_thread(void *args)
 {
@@ -672,7 +781,7 @@ taskq_thread(void *args)
 	taskq_thread_t *tqt = args;
 	taskq_t *tq;
 	taskq_ent_t *t;
-	struct list_head *pend_list;
+	int seq_tasks = 0;
 
 	ASSERT(tqt);
 	tq = tqt->tqt_tq;
@@ -683,7 +792,13 @@
 	flush_signals(current);
 
 	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+
+	/* Immediately exit if more threads than allowed were created. */
+	if (tq->tq_nthreads >= tq->tq_maxthreads)
+		goto error;
+
 	tq->tq_nthreads++;
+	list_add_tail(&tqt->tqt_thread_list, &tq->tq_thread_list);
 	wake_up(&tq->tq_wait_waitq);
 	set_current_state(TASK_INTERRUPTIBLE);
@@ -691,25 +806,25 @@
 
 		if (list_empty(&tq->tq_pend_list) &&
 		    list_empty(&tq->tq_prio_list)) {
+
+			if (taskq_thread_should_stop(tq, tqt)) {
+				wake_up_all(&tq->tq_wait_waitq);
+				break;
+			}
+
 			add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
 			spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+
 			schedule();
+			seq_tasks = 0;
+
 			spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 			remove_wait_queue(&tq->tq_work_waitq, &wait);
 		} else {
 			__set_current_state(TASK_RUNNING);
 		}
-
-		if (!list_empty(&tq->tq_prio_list))
-			pend_list = &tq->tq_prio_list;
-		else if (!list_empty(&tq->tq_pend_list))
-			pend_list = &tq->tq_pend_list;
-		else
-			pend_list = NULL;
-
-		if (pend_list) {
-			t = list_entry(pend_list->next,taskq_ent_t,tqent_list);
+		if ((t = taskq_next_ent(tq)) != NULL) {
 			list_del_init(&t->tqent_list);
 
 			/* In order to support recursively dispatching a
@@ -738,8 +853,7 @@ taskq_thread(void *args)
 			tqt->tqt_task = NULL;
 
 			/* For prealloc'd tasks, we don't free anything. */
-			if ((tq->tq_flags & TASKQ_DYNAMIC) ||
-			    !(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
+			if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
 				task_done(tq, t);
 
 			/* When the current lowest outstanding taskqid is
@@ -749,9 +863,16 @@
 				ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
 			}
 
+			/* Spawn additional taskq threads if required. */
+			if (taskq_thread_spawn(tq, ++seq_tasks))
+				seq_tasks = 0;
+
 			tqt->tqt_id = 0;
 			tqt->tqt_flags = 0;
 			wake_up_all(&tq->tq_wait_waitq);
+		} else {
+			if (taskq_thread_should_stop(tq, tqt))
+				break;
 		}
 
 		set_current_state(TASK_INTERRUPTIBLE);
@@ -761,27 +882,56 @@
 
 	__set_current_state(TASK_RUNNING);
 	tq->tq_nthreads--;
 	list_del_init(&tqt->tqt_thread_list);
-	kmem_free(tqt, sizeof(taskq_thread_t));
-
+error:
+	kmem_free(tqt, sizeof (taskq_thread_t));
 	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
 	return (0);
 }
 
+static taskq_thread_t *
+taskq_thread_create(taskq_t *tq)
+{
+	static int last_used_cpu = 0;
+	taskq_thread_t *tqt;
+
+	tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE);
+	INIT_LIST_HEAD(&tqt->tqt_thread_list);
+	INIT_LIST_HEAD(&tqt->tqt_active_list);
+	tqt->tqt_tq = tq;
+	tqt->tqt_id = 0;
+
+	tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
+	    "%s", tq->tq_name);
+	if (tqt->tqt_thread == NULL) {
+		kmem_free(tqt, sizeof (taskq_thread_t));
+		return (NULL);
+	}
+
+	if (spl_taskq_thread_bind) {
+		last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
+		kthread_bind(tqt->tqt_thread, last_used_cpu);
+	}
+
+	set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri));
+	wake_up_process(tqt->tqt_thread);
+
+	return (tqt);
+}
+
 taskq_t *
 taskq_create(const char *name, int nthreads, pri_t pri,
     int minalloc, int maxalloc, uint_t flags)
 {
-	static int last_used_cpu = 0;
 	taskq_t *tq;
 	taskq_thread_t *tqt;
-	int rc = 0, i, j = 0;
+	int count = 0, rc = 0, i;
 
 	ASSERT(name != NULL);
 	ASSERT(pri <= maxclsyspri);
 	ASSERT(minalloc >= 0);
 	ASSERT(maxalloc <= INT_MAX);
-	ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC)));	/* Unsupported */
+	ASSERT(!(flags & (TASKQ_CPR_SAFE)));	/* Unsupported */
 
 	/* Scale the number of threads using nthreads as a percentage */
 	if (flags & TASKQ_THREADS_CPU_PCT) {
@@ -792,24 +942,25 @@ taskq_create(const char *name, int nthreads, pri_t pri,
 		nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
 	}
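/*
 * Editorial note, not part of the patch: a worked example of the
 * TASKQ_THREADS_CPU_PCT scaling above.  On an 8-CPU system a taskq
 * created with nthreads = 75 receives MAX((8 * 75) / 100, 1) = 6
 * threads, and that result becomes tq_maxthreads for a dynamic taskq.
 */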
-	tq = kmem_alloc(sizeof(*tq), KM_PUSHPAGE);
+	tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE);
 	if (tq == NULL)
 		return (NULL);
 
 	spin_lock_init(&tq->tq_lock);
-	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
 	INIT_LIST_HEAD(&tq->tq_thread_list);
 	INIT_LIST_HEAD(&tq->tq_active_list);
-	tq->tq_name = name;
-	tq->tq_nactive = 0;
-	tq->tq_nthreads = 0;
-	tq->tq_pri = pri;
-	tq->tq_minalloc = minalloc;
-	tq->tq_maxalloc = maxalloc;
-	tq->tq_nalloc = 0;
-	tq->tq_flags = (flags | TQ_ACTIVE);
-	tq->tq_next_id = 1;
-	tq->tq_lowest_id = 1;
+	tq->tq_name = strdup(name);
+	tq->tq_nactive = 0;
+	tq->tq_nthreads = 0;
+	tq->tq_nspawn = 0;
+	tq->tq_maxthreads = nthreads;
+	tq->tq_pri = pri;
+	tq->tq_minalloc = minalloc;
+	tq->tq_maxalloc = maxalloc;
+	tq->tq_nalloc = 0;
+	tq->tq_flags = (flags | TASKQ_ACTIVE);
+	tq->tq_next_id = 1;
+	tq->tq_lowest_id = 1;
 	INIT_LIST_HEAD(&tq->tq_free_list);
 	INIT_LIST_HEAD(&tq->tq_pend_list);
 	INIT_LIST_HEAD(&tq->tq_prio_list);
@@ -817,38 +968,28 @@ taskq_create(const char *name, int nthreads, pri_t pri,
 	init_waitqueue_head(&tq->tq_work_waitq);
 	init_waitqueue_head(&tq->tq_wait_waitq);
 
-	if (flags & TASKQ_PREPOPULATE)
+	if (flags & TASKQ_PREPOPULATE) {
+		spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
+
 		for (i = 0; i < minalloc; i++)
 			task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW));
 
-	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+		spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
+	}
+
+	if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)
+		nthreads = 1;
 
 	for (i = 0; i < nthreads; i++) {
-		tqt = kmem_alloc(sizeof(*tqt), KM_PUSHPAGE);
-		INIT_LIST_HEAD(&tqt->tqt_thread_list);
-		INIT_LIST_HEAD(&tqt->tqt_active_list);
-		tqt->tqt_tq = tq;
-		tqt->tqt_id = 0;
-
-		tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
-		    "%s/%d", name, i);
-		if (tqt->tqt_thread) {
-			list_add(&tqt->tqt_thread_list, &tq->tq_thread_list);
-			if (spl_taskq_thread_bind) {
-				last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
-				kthread_bind(tqt->tqt_thread, last_used_cpu);
-			}
-			set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(pri));
-			wake_up_process(tqt->tqt_thread);
-			j++;
-		} else {
-			kmem_free(tqt, sizeof(taskq_thread_t));
+		tqt = taskq_thread_create(tq);
+		if (tqt == NULL)
 			rc = 1;
-		}
+		else
+			count++;
 	}
 
 	/* Wait for all threads to be started before potential destroy */
-	wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);
+	wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count);
 
 	if (rc) {
 		taskq_destroy(tq);
@@ -868,10 +1009,16 @@ taskq_destroy(taskq_t *tq)
 	ASSERT(tq);
 	spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
-	tq->tq_flags &= ~TQ_ACTIVE;
+	tq->tq_flags &= ~TASKQ_ACTIVE;
 	spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
 
-	/* TQ_ACTIVE cleared prevents new tasks being added to pending */
+	/*
+	 * When TASKQ_ACTIVE is clear, new tasks may not be added and no
+	 * new worker threads may be spawned for a dynamic taskq.
+	 */
+ */ + if (dynamic_taskq != NULL) + taskq_wait_outstanding(dynamic_taskq, 0); + taskq_wait(tq); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); @@ -884,7 +1031,7 @@ taskq_destroy(taskq_t *tq) */ while (!list_empty(&tq->tq_thread_list)) { tqt = list_entry(tq->tq_thread_list.next, - taskq_thread_t, tqt_thread_list); + taskq_thread_t, tqt_thread_list); thread = tqt->tqt_thread; spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); @@ -902,8 +1049,9 @@ taskq_destroy(taskq_t *tq) task_free(tq, t); } - ASSERT(tq->tq_nthreads == 0); - ASSERT(tq->tq_nalloc == 0); + ASSERT0(tq->tq_nthreads); + ASSERT0(tq->tq_nalloc); + ASSERT0(tq->tq_nspawn); ASSERT(list_empty(&tq->tq_thread_list)); ASSERT(list_empty(&tq->tq_active_list)); ASSERT(list_empty(&tq->tq_free_list)); @@ -913,7 +1061,8 @@ taskq_destroy(taskq_t *tq) spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - kmem_free(tq, sizeof(taskq_t)); + strfree(tq->tq_name); + kmem_free(tq, sizeof (taskq_t)); } EXPORT_SYMBOL(taskq_destroy); @@ -927,11 +1076,22 @@ spl_taskq_init(void) if (system_taskq == NULL) return (1); + dynamic_taskq = taskq_create("spl_dynamic_taskq", 1, + minclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE); + if (dynamic_taskq == NULL) { + taskq_destroy(system_taskq); + return (1); + } + return (0); } void spl_taskq_fini(void) { + taskq_destroy(dynamic_taskq); + dynamic_taskq = NULL; + taskq_destroy(system_taskq); + system_taskq = NULL; } diff --git a/module/splat/splat-taskq.c b/module/splat/splat-taskq.c index 7d4ad5b690..645bc91459 100644 --- a/module/splat/splat-taskq.c +++ b/module/splat/splat-taskq.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include "splat-internal.h" @@ -75,6 +76,10 @@ #define SPLAT_TASKQ_TEST10_NAME "cancel" #define SPLAT_TASKQ_TEST10_DESC "Cancel task execution" +#define SPLAT_TASKQ_TEST11_ID 0x020b +#define SPLAT_TASKQ_TEST11_NAME "dynamic" +#define SPLAT_TASKQ_TEST11_DESC "Dynamic task queue thread creation" + #define SPLAT_TASKQ_ORDER_MAX 8 #define SPLAT_TASKQ_DEPTH_MAX 16 @@ -1052,11 +1057,104 @@ splat_taskq_test7(struct file *file, void *arg) rc = splat_taskq_test7_impl(file, arg, B_FALSE); if (rc) - return rc; + return (rc); rc = splat_taskq_test7_impl(file, arg, B_TRUE); - return rc; + return (rc); +} + +static void +splat_taskq_throughput_func(void *arg) +{ + splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg; + ASSERT(tq_arg); + + atomic_inc(tq_arg->count); +} + +static int +splat_taskq_throughput(struct file *file, void *arg, const char *name, + int nthreads, int minalloc, int maxalloc, int flags, int tasks, + struct timespec *delta) +{ + taskq_t *tq; + taskqid_t id; + splat_taskq_arg_t tq_arg; + taskq_ent_t **tqes; + atomic_t count; + struct timespec start, stop; + int i, j, rc = 0; + + tqes = vmalloc(sizeof (*tqes) * tasks); + if (tqes == NULL) + return (-ENOMEM); + + memset(tqes, 0, sizeof (*tqes) * tasks); + + splat_vprint(file, name, "Taskq '%s' creating (%d/%d/%d/%d)\n", + name, nthreads, minalloc, maxalloc, tasks); + if ((tq = taskq_create(name, nthreads, maxclsyspri, + minalloc, maxalloc, flags)) == NULL) { + splat_vprint(file, name, "Taskq '%s' create failed\n", name); + rc = -EINVAL; + goto out_free; + } + + tq_arg.file = file; + tq_arg.name = name; + tq_arg.count = &count; + atomic_set(tq_arg.count, 0); + + getnstimeofday(&start); + + for (i = 0; i < tasks; i++) { + tqes[i] = kmalloc(sizeof (taskq_ent_t), GFP_KERNEL); + if (tqes[i] == NULL) { + rc = -ENOMEM; + goto out; + } + + taskq_init_ent(tqes[i]); + taskq_dispatch_ent(tq, 
diff --git a/module/splat/splat-taskq.c b/module/splat/splat-taskq.c
index 7d4ad5b690..645bc91459 100644
--- a/module/splat/splat-taskq.c
+++ b/module/splat/splat-taskq.c
@@ -28,6 +28,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include "splat-internal.h"
@@ -75,6 +76,10 @@
 #define SPLAT_TASKQ_TEST10_NAME		"cancel"
 #define SPLAT_TASKQ_TEST10_DESC		"Cancel task execution"
 
+#define SPLAT_TASKQ_TEST11_ID		0x020b
+#define SPLAT_TASKQ_TEST11_NAME		"dynamic"
+#define SPLAT_TASKQ_TEST11_DESC		"Dynamic task queue thread creation"
+
 #define SPLAT_TASKQ_ORDER_MAX		8
 #define SPLAT_TASKQ_DEPTH_MAX		16
 
@@ -1052,11 +1057,104 @@ splat_taskq_test7(struct file *file, void *arg)
 
 	rc = splat_taskq_test7_impl(file, arg, B_FALSE);
 	if (rc)
-		return rc;
+		return (rc);
 
 	rc = splat_taskq_test7_impl(file, arg, B_TRUE);
 
-	return rc;
+	return (rc);
+}
+
+static void
+splat_taskq_throughput_func(void *arg)
+{
+	splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg;
+	ASSERT(tq_arg);
+
+	atomic_inc(tq_arg->count);
+}
+
+static int
+splat_taskq_throughput(struct file *file, void *arg, const char *name,
+    int nthreads, int minalloc, int maxalloc, int flags, int tasks,
+    struct timespec *delta)
+{
+	taskq_t *tq;
+	taskqid_t id;
+	splat_taskq_arg_t tq_arg;
+	taskq_ent_t **tqes;
+	atomic_t count;
+	struct timespec start, stop;
+	int i, j, rc = 0;
+
+	tqes = vmalloc(sizeof (*tqes) * tasks);
+	if (tqes == NULL)
+		return (-ENOMEM);
+
+	memset(tqes, 0, sizeof (*tqes) * tasks);
+
+	splat_vprint(file, name, "Taskq '%s' creating (%d/%d/%d/%d)\n",
+	    name, nthreads, minalloc, maxalloc, tasks);
+	if ((tq = taskq_create(name, nthreads, maxclsyspri,
+	    minalloc, maxalloc, flags)) == NULL) {
+		splat_vprint(file, name, "Taskq '%s' create failed\n", name);
+		rc = -EINVAL;
+		goto out_free;
+	}
+
+	tq_arg.file = file;
+	tq_arg.name = name;
+	tq_arg.count = &count;
+	atomic_set(tq_arg.count, 0);
+
+	getnstimeofday(&start);
+
+	for (i = 0; i < tasks; i++) {
+		tqes[i] = kmalloc(sizeof (taskq_ent_t), GFP_KERNEL);
+		if (tqes[i] == NULL) {
+			rc = -ENOMEM;
+			goto out;
+		}
+
+		taskq_init_ent(tqes[i]);
+		taskq_dispatch_ent(tq, splat_taskq_throughput_func,
+		    &tq_arg, TQ_SLEEP, tqes[i]);
+		id = tqes[i]->tqent_id;
+
+		if (id == 0) {
+			splat_vprint(file, name, "Taskq '%s' function '%s' "
+			    "dispatch %d failed\n", tq_arg.name,
+			    sym2str(splat_taskq_throughput_func), i);
+			rc = -EINVAL;
+			goto out;
+		}
+	}
+
+	splat_vprint(file, name, "Taskq '%s' waiting for %d dispatches\n",
+	    tq_arg.name, tasks);
+
+	taskq_wait(tq);
+
+	if (delta != NULL) {
+		getnstimeofday(&stop);
+		*delta = timespec_sub(stop, start);
+	}
+
+	splat_vprint(file, name, "Taskq '%s' %d/%d dispatches finished\n",
+	    tq_arg.name, atomic_read(tq_arg.count), tasks);
+
+	if (atomic_read(tq_arg.count) != tasks)
+		rc = -ERANGE;
+
+out:
+	splat_vprint(file, name, "Taskq '%s' destroying\n", tq_arg.name);
+	taskq_destroy(tq);
+out_free:
+	for (j = 0; j < tasks && tqes[j] != NULL; j++)
+		kfree(tqes[j]);
+
+	vfree(tqes);
+
+	return (rc);
 }
 
 /*
@@ -1065,107 +1163,15 @@ splat_taskq_test7(struct file *file, void *arg)
  * pass. The purpose is to provide a benchmark for measuring the
  * effectiveness of taskq optimizations.
  */
-static void
-splat_taskq_test8_func(void *arg)
-{
-	splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg;
-	ASSERT(tq_arg);
-
-	atomic_inc(tq_arg->count);
-}
-
-#define TEST8_NUM_TASKS			0x20000
-#define TEST8_THREADS_PER_TASKQ		100
-
-static int
-splat_taskq_test8_common(struct file *file, void *arg, int minalloc,
-    int maxalloc)
-{
-	taskq_t *tq;
-	taskqid_t id;
-	splat_taskq_arg_t tq_arg;
-	taskq_ent_t **tqes;
-	atomic_t count;
-	int i, j, rc = 0;
-
-	tqes = vmalloc(sizeof(*tqes) * TEST8_NUM_TASKS);
-	if (tqes == NULL)
-		return -ENOMEM;
-	memset(tqes, 0, sizeof(*tqes) * TEST8_NUM_TASKS);
-
-	splat_vprint(file, SPLAT_TASKQ_TEST8_NAME,
-	    "Taskq '%s' creating (%d/%d/%d)\n",
-	    SPLAT_TASKQ_TEST8_NAME,
-	    minalloc, maxalloc, TEST8_NUM_TASKS);
-	if ((tq = taskq_create(SPLAT_TASKQ_TEST8_NAME, TEST8_THREADS_PER_TASKQ,
-	    maxclsyspri, minalloc, maxalloc,
-	    TASKQ_PREPOPULATE)) == NULL) {
-		splat_vprint(file, SPLAT_TASKQ_TEST8_NAME,
-		    "Taskq '%s' create failed\n",
-		    SPLAT_TASKQ_TEST8_NAME);
-		rc = -EINVAL;
-		goto out_free;
-	}
-
-	tq_arg.file = file;
-	tq_arg.name = SPLAT_TASKQ_TEST8_NAME;
-	tq_arg.count = &count;
-	atomic_set(tq_arg.count, 0);
-
-	for (i = 0; i < TEST8_NUM_TASKS; i++) {
-		tqes[i] = kmalloc(sizeof(taskq_ent_t), GFP_KERNEL);
-		if (tqes[i] == NULL) {
-			rc = -ENOMEM;
-			goto out;
-		}
-		taskq_init_ent(tqes[i]);
-
-		taskq_dispatch_ent(tq, splat_taskq_test8_func,
-		    &tq_arg, TQ_SLEEP, tqes[i]);
-
-		id = tqes[i]->tqent_id;
-
-		if (id == 0) {
-			splat_vprint(file, SPLAT_TASKQ_TEST8_NAME,
-			    "Taskq '%s' function '%s' dispatch "
-			    "%d failed\n", tq_arg.name,
-			    sym2str(splat_taskq_test8_func), i);
-			rc = -EINVAL;
-			goto out;
-		}
-	}
-
-	splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, "Taskq '%s' "
-	    "waiting for %d dispatches\n", tq_arg.name,
-	    TEST8_NUM_TASKS);
-	taskq_wait(tq);
-	splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, "Taskq '%s' "
-	    "%d/%d dispatches finished\n", tq_arg.name,
-	    atomic_read(tq_arg.count), TEST8_NUM_TASKS);
-
-	if (atomic_read(tq_arg.count) != TEST8_NUM_TASKS)
-		rc = -ERANGE;
-
-out:
-	splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, "Taskq '%s' destroying\n",
-	    tq_arg.name);
-	taskq_destroy(tq);
-out_free:
-	for (j = 0; j < TEST8_NUM_TASKS && tqes[j] != NULL; j++)
-		kfree(tqes[j]);
-	vfree(tqes);
-
-	return rc;
-}
+#define TEST8_NUM_TASKS			0x20000
+#define TEST8_THREADS_PER_TASKQ		100
 
 static int
 splat_taskq_test8(struct file *file, void *arg)
 {
-	int rc;
-
-	rc = splat_taskq_test8_common(file, arg, 1, 100);
-
-	return rc;
+	return (splat_taskq_throughput(file, arg,
+	    SPLAT_TASKQ_TEST8_NAME, TEST8_THREADS_PER_TASKQ,
+	    1, INT_MAX, TASKQ_PREPOPULATE, TEST8_NUM_TASKS, NULL));
 }
 
 /*
@@ -1433,6 +1439,46 @@ out_free:
 	return rc;
 }
 
+/*
+ * Create a dynamic taskq with 100 threads and dispatch a huge number of
+ * trivial tasks. This will cause the taskq to grow quickly to its max
+ * thread count. This test should always pass. The purpose is to provide
+ * a benchmark for measuring the performance of dynamic taskqs.
+ */
+#define TEST11_NUM_TASKS		100000
+#define TEST11_THREADS_PER_TASKQ	100
+
+static int
+splat_taskq_test11(struct file *file, void *arg)
+{
+	struct timespec normal, dynamic;
+	int error;
+
+	error = splat_taskq_throughput(file, arg, SPLAT_TASKQ_TEST11_NAME,
+	    TEST11_THREADS_PER_TASKQ, 1, INT_MAX,
+	    TASKQ_PREPOPULATE, TEST11_NUM_TASKS, &normal);
+	if (error)
+		return (error);
+
+	error = splat_taskq_throughput(file, arg, SPLAT_TASKQ_TEST11_NAME,
+	    TEST11_THREADS_PER_TASKQ, 1, INT_MAX,
+	    TASKQ_PREPOPULATE | TASKQ_DYNAMIC, TEST11_NUM_TASKS, &dynamic);
+	if (error)
+		return (error);
+
+	splat_vprint(file, SPLAT_TASKQ_TEST11_NAME,
+	    "Timing taskq_wait(): normal=%ld.%09lds, dynamic=%ld.%09lds\n",
+	    normal.tv_sec, normal.tv_nsec,
+	    dynamic.tv_sec, dynamic.tv_nsec);
+
+	/* A 10x increase in runtime is used to indicate a core problem. */
+	if ((dynamic.tv_sec * NANOSEC + dynamic.tv_nsec) >
+	    ((normal.tv_sec * NANOSEC + normal.tv_nsec) * 10))
+		error = -ETIME;
+
+	return (error);
+}
+
 splat_subsystem_t *
 splat_taskq_init(void)
 {
@@ -1470,6 +1516,8 @@ splat_taskq_init(void)
 	    SPLAT_TASKQ_TEST9_ID, splat_taskq_test9);
 	SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST10_NAME, SPLAT_TASKQ_TEST10_DESC,
 	    SPLAT_TASKQ_TEST10_ID, splat_taskq_test10);
+	SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST11_NAME, SPLAT_TASKQ_TEST11_DESC,
+	    SPLAT_TASKQ_TEST11_ID, splat_taskq_test11);
 
 	return sub;
 }
@@ -1478,6 +1526,7 @@ void
 splat_taskq_fini(splat_subsystem_t *sub)
 {
 	ASSERT(sub);
+	SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST11_ID);
 	SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST10_ID);
 	SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST9_ID);
 	SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST8_ID);
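/*
 * Editorial note, not part of the patch: assuming the splat(1) utility's
 * usual subsystem:test invocation syntax, the benchmark registered above
 * can be run with a command along the lines of "splat -t taskq:dynamic",
 * which reports the normal and dynamic taskq_wait() timings side by side.
 */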