Merge branch 'taskq'

Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
Closes #199
This commit is contained in:
Brian Behlendorf 2012-12-12 10:45:11 -08:00
commit 18e0c500a7
8 changed files with 978 additions and 487 deletions

View File

@ -26,7 +26,6 @@ AC_DEFUN([SPL_AC_CONFIG_KERNEL], [
SPL_AC_TYPE_ATOMIC64_CMPXCHG
SPL_AC_TYPE_ATOMIC64_XCHG
SPL_AC_TYPE_UINTPTR_T
SPL_AC_3ARGS_INIT_WORK
SPL_AC_2ARGS_REGISTER_SYSCTL
SPL_AC_SET_SHRINKER
SPL_AC_3ARGS_SHRINKER_CALLBACK
@ -870,26 +869,6 @@ AC_DEFUN([SPL_AC_TYPE_UINTPTR_T],
])
])
dnl #
dnl # 2.6.20 API change,
dnl # INIT_WORK use 2 args and not store data inside
dnl #
AC_DEFUN([SPL_AC_3ARGS_INIT_WORK],
[AC_MSG_CHECKING([whether INIT_WORK wants 3 args])
SPL_LINUX_TRY_COMPILE([
#include <linux/workqueue.h>
],[
struct work_struct work __attribute__ ((unused));
INIT_WORK(&work, NULL, NULL);
],[
AC_MSG_RESULT(yes)
AC_DEFINE(HAVE_3ARGS_INIT_WORK, 1,
[INIT_WORK wants 3 args])
],[
AC_MSG_RESULT(no)
])
])
dnl #
dnl # 2.6.21 API change,
dnl # 'register_sysctl_table' use only one argument instead of two

View File

@ -1,49 +0,0 @@
/*****************************************************************************\
* Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
* Copyright (C) 2007 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Brian Behlendorf <behlendorf1@llnl.gov>.
* UCRL-CODE-235197
*
* This file is part of the SPL, Solaris Porting Layer.
* For details, see <http://github.com/behlendorf/spl/>.
*
* The SPL is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* The SPL is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*
* You should have received a copy of the GNU General Public License along
* with the SPL. If not, see <http://www.gnu.org/licenses/>.
\*****************************************************************************/
#ifndef _SPL_WORKQUEUE_COMPAT_H
#define _SPL_WORKQUEUE_COMPAT_H
#include <linux/workqueue.h>
#include <sys/types.h>
#ifdef HAVE_3ARGS_INIT_WORK
#define delayed_work work_struct
#define spl_init_work(wq, cb, d) INIT_WORK((wq), (void *)(cb), \
(void *)(d))
#define spl_init_delayed_work(wq,cb,d) INIT_WORK((wq), (void *)(cb), \
(void *)(d))
#define spl_get_work_data(d, t, f) (t *)(d)
#else
#define spl_init_work(wq, cb, d) INIT_WORK((wq), (void *)(cb));
#define spl_init_delayed_work(wq,cb,d) INIT_DELAYED_WORK((wq), (void *)(cb));
#define spl_get_work_data(d, t, f) (t *)container_of(d, t, f)
#endif /* HAVE_3ARGS_INIT_WORK */
#endif /* _SPL_WORKQUEUE_COMPAT_H */

View File

@ -37,6 +37,7 @@
#include <sys/types.h>
#include <sys/vmsystm.h>
#include <sys/kstat.h>
#include <sys/taskq.h>
/*
* Memory allocation interfaces
@ -406,7 +407,6 @@ typedef struct spl_kmem_magazine {
uint32_t skm_size; /* Magazine size */
uint32_t skm_refill; /* Batch refill size */
struct spl_kmem_cache *skm_cache; /* Owned by cache */
struct delayed_work skm_work; /* Magazine reclaim work */
unsigned long skm_age; /* Last cache access */
unsigned int skm_cpu; /* Owned by cpu */
void *skm_objs[0]; /* Object pointers */
@ -432,7 +432,7 @@ typedef struct spl_kmem_slab {
typedef struct spl_kmem_alloc {
struct spl_kmem_cache *ska_cache; /* Owned by cache */
int ska_flags; /* Allocation flags */
struct delayed_work ska_work; /* Allocation work */
taskq_ent_t ska_tqe; /* Task queue entry */
} spl_kmem_alloc_t;
typedef struct spl_kmem_emergency {
@ -460,7 +460,7 @@ typedef struct spl_kmem_cache {
uint32_t skc_delay; /* Slab reclaim interval */
uint32_t skc_reap; /* Slab reclaim count */
atomic_t skc_ref; /* Ref count callers */
struct delayed_work skc_work; /* Slab reclaim work */
taskqid_t skc_taskqid; /* Slab reclaim task */
struct list_head skc_list; /* List of caches linkage */
struct list_head skc_complete_list;/* Completely alloc'ed */
struct list_head skc_partial_list; /* Partially alloc'ed */

View File

@ -41,20 +41,6 @@
#define TASKQ_THREADS_CPU_PCT 0x00000008
#define TASKQ_DC_BATCH 0x00000010
typedef unsigned long taskqid_t;
typedef void (task_func_t)(void *);
typedef struct taskq_ent {
spinlock_t tqent_lock;
struct list_head tqent_list;
taskqid_t tqent_id;
task_func_t *tqent_func;
void *tqent_arg;
uintptr_t tqent_flags;
} taskq_ent_t;
#define TQENT_FLAG_PREALLOC 0x1
/*
* Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as
* KM_SLEEP/KM_NOSLEEP. TQ_NOQUEUE/TQ_NOALLOC are set particularly
@ -69,6 +55,9 @@ typedef struct taskq_ent {
#define TQ_FRONT 0x08000000
#define TQ_ACTIVE 0x80000000
typedef unsigned long taskqid_t;
typedef void (task_func_t)(void *);
typedef struct taskq {
spinlock_t tq_lock; /* protects taskq_t */
unsigned long tq_lock_flags; /* interrupt state */
@ -87,47 +76,60 @@ typedef struct taskq {
struct list_head tq_free_list; /* free task_t's */
struct list_head tq_pend_list; /* pending task_t's */
struct list_head tq_prio_list; /* priority pending task_t's */
struct list_head tq_delay_list; /* delayed task_t's */
wait_queue_head_t tq_work_waitq; /* new work waitq */
wait_queue_head_t tq_wait_waitq; /* wait waitq */
} taskq_t;
typedef struct taskq_ent {
spinlock_t tqent_lock;
wait_queue_head_t tqent_waitq;
struct timer_list tqent_timer;
struct list_head tqent_list;
taskqid_t tqent_id;
task_func_t *tqent_func;
void *tqent_arg;
taskq_t *tqent_taskq;
uintptr_t tqent_flags;
} taskq_ent_t;
#define TQENT_FLAG_PREALLOC 0x1
#define TQENT_FLAG_CANCEL 0x2
typedef struct taskq_thread {
struct list_head tqt_thread_list;
struct list_head tqt_active_list;
struct task_struct *tqt_thread;
taskq_t *tqt_tq;
taskqid_t tqt_id;
taskq_ent_t *tqt_task;
uintptr_t tqt_flags;
} taskq_thread_t;
/* Global system-wide dynamic task queue available for all consumers */
extern taskq_t *system_taskq;
extern taskqid_t __taskq_dispatch(taskq_t *, task_func_t, void *, uint_t);
extern void __taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t, taskq_ent_t *);
extern int __taskq_empty_ent(taskq_ent_t *);
extern void __taskq_init_ent(taskq_ent_t *);
extern taskq_t *__taskq_create(const char *, int, pri_t, int, int, uint_t);
extern void __taskq_destroy(taskq_t *);
extern void __taskq_wait_id(taskq_t *, taskqid_t);
extern void __taskq_wait(taskq_t *);
extern int __taskq_member(taskq_t *, void *);
extern taskqid_t taskq_dispatch(taskq_t *, task_func_t, void *, uint_t);
extern taskqid_t taskq_dispatch_delay(taskq_t *, task_func_t, void *,
uint_t, clock_t);
extern void taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t,
taskq_ent_t *);
extern int taskq_empty_ent(taskq_ent_t *);
extern void taskq_init_ent(taskq_ent_t *);
extern taskq_t *taskq_create(const char *, int, pri_t, int, int, uint_t);
extern void taskq_destroy(taskq_t *);
extern void taskq_wait_id(taskq_t *, taskqid_t);
extern void taskq_wait_all(taskq_t *, taskqid_t);
extern void taskq_wait(taskq_t *);
extern int taskq_cancel_id(taskq_t *, taskqid_t);
extern int taskq_member(taskq_t *, void *);
#define taskq_create_proc(name, nthreads, pri, min, max, proc, flags) \
taskq_create(name, nthreads, pri, min, max, flags)
#define taskq_create_sysdc(name, nthreads, min, max, proc, dc, flags) \
taskq_create(name, nthreads, maxclsyspri, min, max, flags)
int spl_taskq_init(void);
void spl_taskq_fini(void);
#define taskq_member(tq, t) __taskq_member(tq, t)
#define taskq_wait_id(tq, id) __taskq_wait_id(tq, id)
#define taskq_wait(tq) __taskq_wait(tq)
#define taskq_dispatch(tq, f, p, fl) __taskq_dispatch(tq, f, p, fl)
#define taskq_dispatch_ent(tq, f, p, fl, t) __taskq_dispatch_ent(tq, f, p, fl, t)
#define taskq_empty_ent(t) __taskq_empty_ent(t)
#define taskq_init_ent(t) __taskq_init_ent(t)
#define taskq_create(n, th, p, mi, ma, fl) __taskq_create(n, th, p, mi, ma, fl)
#define taskq_create_proc(n, th, p, mi, ma, pr, fl) \
__taskq_create(n, th, p, mi, ma, fl)
#define taskq_create_sysdc(n, th, mi, ma, pr, dc, fl) \
__taskq_create(n, th, maxclsyspri, mi, ma, fl)
#define taskq_destroy(tq) __taskq_destroy(tq)
#endif /* _SPL_TASKQ_H */

View File

@ -34,7 +34,6 @@
#include <linux/time_compat.h>
#include <linux/bitops_compat.h>
#include <linux/smp_compat.h>
#include <linux/workqueue_compat.h>
#include <linux/kallsyms_compat.h>
#include <linux/mutex_compat.h>
#include <linux/module_compat.h>

View File

@ -825,6 +825,7 @@ EXPORT_SYMBOL(vmem_free_debug);
struct list_head spl_kmem_cache_list; /* List of caches */
struct rw_semaphore spl_kmem_cache_sem; /* Cache list lock */
taskq_t *spl_kmem_cache_taskq; /* Task queue for ageing / reclaim */
static int spl_cache_flush(spl_kmem_cache_t *skc,
spl_kmem_magazine_t *skm, int flush);
@ -1243,50 +1244,59 @@ spl_emergency_free(spl_kmem_cache_t *skc, void *obj)
SRETURN(0);
}
/*
* Called regularly on all caches to age objects out of the magazines
* which have not been access in skc->skc_delay seconds. This prevents
* idle magazines from holding memory which might be better used by
* other caches or parts of the system. The delay is present to
* prevent thrashing the magazine.
*/
static void
spl_magazine_age(void *data)
{
spl_kmem_magazine_t *skm =
spl_get_work_data(data, spl_kmem_magazine_t, skm_work.work);
spl_kmem_cache_t *skc = skm->skm_cache;
spl_kmem_cache_t *skc = (spl_kmem_cache_t *)data;
spl_kmem_magazine_t *skm = skc->skc_mag[smp_processor_id()];
ASSERT(skm->skm_magic == SKM_MAGIC);
ASSERT(skc->skc_magic == SKC_MAGIC);
ASSERT(skc->skc_mag[skm->skm_cpu] == skm);
ASSERT(skm->skm_cpu == smp_processor_id());
if (skm->skm_avail > 0 &&
time_after(jiffies, skm->skm_age + skc->skc_delay * HZ))
if (skm->skm_avail > 0)
if (time_after(jiffies, skm->skm_age + skc->skc_delay * HZ))
(void) spl_cache_flush(skc, skm, skm->skm_refill);
if (!test_bit(KMC_BIT_DESTROY, &skc->skc_flags))
schedule_delayed_work_on(skm->skm_cpu, &skm->skm_work,
skc->skc_delay / 3 * HZ);
}
/*
* Called regularly to keep a downward pressure on the size of idle
* magazines and to release free slabs from the cache. This function
* never calls the registered reclaim function, that only occurs
* under memory pressure or with a direct call to spl_kmem_reap().
* Called regularly to keep a downward pressure on the cache.
*
* Objects older than skc->skc_delay seconds in the per-cpu magazines will
* be returned to the caches. This is done to prevent idle magazines from
* holding memory which could be better used elsewhere. The delay is
* present to prevent thrashing the magazine.
*
* The newly released objects may result in empty partial slabs. Those
* slabs should be released to the system. Otherwise moving the objects
* out of the magazines is just wasted work.
*/
static void
spl_cache_age(void *data)
{
spl_kmem_cache_t *skc =
spl_get_work_data(data, spl_kmem_cache_t, skc_work.work);
spl_kmem_cache_t *skc = (spl_kmem_cache_t *)data;
taskqid_t id = 0;
ASSERT(skc->skc_magic == SKC_MAGIC);
atomic_inc(&skc->skc_ref);
spl_on_each_cpu(spl_magazine_age, skc, 1);
spl_slab_reclaim(skc, skc->skc_reap, 0);
if (!test_bit(KMC_BIT_DESTROY, &skc->skc_flags))
schedule_delayed_work(&skc->skc_work, skc->skc_delay / 3 * HZ);
while (!test_bit(KMC_BIT_DESTROY, &skc->skc_flags) && !id) {
id = taskq_dispatch_delay(
spl_kmem_cache_taskq, spl_cache_age, skc, TQ_SLEEP,
ddi_get_lbolt() + skc->skc_delay / 3 * HZ);
/* Destroy issued after dispatch immediately cancel it */
if (test_bit(KMC_BIT_DESTROY, &skc->skc_flags) && id)
taskq_cancel_id(spl_kmem_cache_taskq, id);
}
spin_lock(&skc->skc_lock);
skc->skc_taskqid = id;
spin_unlock(&skc->skc_lock);
atomic_dec(&skc->skc_ref);
}
/*
@ -1380,7 +1390,6 @@ spl_magazine_alloc(spl_kmem_cache_t *skc, int cpu)
skm->skm_size = skc->skc_mag_size;
skm->skm_refill = skc->skc_mag_refill;
skm->skm_cache = skc;
spl_init_delayed_work(&skm->skm_work, spl_magazine_age, skm);
skm->skm_age = jiffies;
skm->skm_cpu = cpu;
}
@ -1427,11 +1436,6 @@ spl_magazine_create(spl_kmem_cache_t *skc)
}
}
/* Only after everything is allocated schedule magazine work */
for_each_online_cpu(i)
schedule_delayed_work_on(i, &skc->skc_mag[i]->skm_work,
skc->skc_delay / 3 * HZ);
SRETURN(0);
}
@ -1482,7 +1486,7 @@ spl_kmem_cache_create(char *name, size_t size, size_t align,
void *priv, void *vmp, int flags)
{
spl_kmem_cache_t *skc;
int rc, kmem_flags = KM_SLEEP;
int rc;
SENTRY;
ASSERTF(!(flags & KMC_NOMAGAZINE), "Bad KMC_NOMAGAZINE (%x)\n", flags);
@ -1490,25 +1494,22 @@ spl_kmem_cache_create(char *name, size_t size, size_t align,
ASSERTF(!(flags & KMC_QCACHE), "Bad KMC_QCACHE (%x)\n", flags);
ASSERT(vmp == NULL);
/* We may be called when there is a non-zero preempt_count or
* interrupts are disabled is which case we must not sleep.
*/
if (current_thread_info()->preempt_count || irqs_disabled())
kmem_flags = KM_NOSLEEP;
might_sleep();
/* Allocate memory for a new cache an initialize it. Unfortunately,
/*
* Allocate memory for a new cache an initialize it. Unfortunately,
* this usually ends up being a large allocation of ~32k because
* we need to allocate enough memory for the worst case number of
* cpus in the magazine, skc_mag[NR_CPUS]. Because of this we
* explicitly pass KM_NODEBUG to suppress the kmem warning */
skc = (spl_kmem_cache_t *)kmem_zalloc(sizeof(*skc),
kmem_flags | KM_NODEBUG);
* explicitly pass KM_NODEBUG to suppress the kmem warning
*/
skc = kmem_zalloc(sizeof(*skc), KM_SLEEP| KM_NODEBUG);
if (skc == NULL)
SRETURN(NULL);
skc->skc_magic = SKC_MAGIC;
skc->skc_name_size = strlen(name) + 1;
skc->skc_name = (char *)kmem_alloc(skc->skc_name_size, kmem_flags);
skc->skc_name = (char *)kmem_alloc(skc->skc_name_size, KM_SLEEP);
if (skc->skc_name == NULL) {
kmem_free(skc, sizeof(*skc));
SRETURN(NULL);
@ -1569,8 +1570,9 @@ spl_kmem_cache_create(char *name, size_t size, size_t align,
if (rc)
SGOTO(out, rc);
spl_init_delayed_work(&skc->skc_work, spl_cache_age, skc);
schedule_delayed_work(&skc->skc_work, skc->skc_delay / 3 * HZ);
skc->skc_taskqid = taskq_dispatch_delay(spl_kmem_cache_taskq,
spl_cache_age, skc, TQ_SLEEP,
ddi_get_lbolt() + skc->skc_delay / 3 * HZ);
down_write(&spl_kmem_cache_sem);
list_add_tail(&skc->skc_list, &spl_kmem_cache_list);
@ -1603,7 +1605,7 @@ void
spl_kmem_cache_destroy(spl_kmem_cache_t *skc)
{
DECLARE_WAIT_QUEUE_HEAD(wq);
int i;
taskqid_t id;
SENTRY;
ASSERT(skc->skc_magic == SKC_MAGIC);
@ -1612,13 +1614,14 @@ spl_kmem_cache_destroy(spl_kmem_cache_t *skc)
list_del_init(&skc->skc_list);
up_write(&spl_kmem_cache_sem);
/* Cancel any and wait for any pending delayed work */
/* Cancel any and wait for any pending delayed tasks */
VERIFY(!test_and_set_bit(KMC_BIT_DESTROY, &skc->skc_flags));
cancel_delayed_work_sync(&skc->skc_work);
for_each_online_cpu(i)
cancel_delayed_work_sync(&skc->skc_mag[i]->skm_work);
flush_scheduled_work();
spin_lock(&skc->skc_lock);
id = skc->skc_taskqid;
spin_unlock(&skc->skc_lock);
taskq_cancel_id(spl_kmem_cache_taskq, id);
/* Wait until all current callers complete, this is mainly
* to catch the case where a low memory situation triggers a
@ -1694,8 +1697,7 @@ spl_cache_obj(spl_kmem_cache_t *skc, spl_kmem_slab_t *sks)
static void
spl_cache_grow_work(void *data)
{
spl_kmem_alloc_t *ska =
spl_get_work_data(data, spl_kmem_alloc_t, ska_work.work);
spl_kmem_alloc_t *ska = (spl_kmem_alloc_t *)data;
spl_kmem_cache_t *skc = ska->ska_cache;
spl_kmem_slab_t *sks;
@ -1774,8 +1776,9 @@ spl_cache_grow(spl_kmem_cache_t *skc, int flags, void **obj)
atomic_inc(&skc->skc_ref);
ska->ska_cache = skc;
ska->ska_flags = flags & ~__GFP_FS;
spl_init_delayed_work(&ska->ska_work, spl_cache_grow_work, ska);
schedule_delayed_work(&ska->ska_work, 0);
taskq_init_ent(&ska->ska_tqe);
taskq_dispatch_ent(spl_kmem_cache_taskq,
spl_cache_grow_work, ska, 0, &ska->ska_tqe);
}
/*
@ -2397,6 +2400,8 @@ spl_kmem_init(void)
init_rwsem(&spl_kmem_cache_sem);
INIT_LIST_HEAD(&spl_kmem_cache_list);
spl_kmem_cache_taskq = taskq_create("spl_kmem_cache",
1, maxclsyspri, 1, 32, TASKQ_PREPOPULATE);
spl_register_shrinker(&spl_kmem_cache_shrinker);
@ -2435,6 +2440,7 @@ spl_kmem_fini(void)
SENTRY;
spl_unregister_shrinker(&spl_kmem_cache_shrinker);
taskq_destroy(spl_kmem_cache_taskq);
SEXIT;
}

View File

@ -69,6 +69,8 @@ retry:
t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);
ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
ASSERT(!timer_pending(&t->tqent_timer));
list_del_init(&t->tqent_list);
SRETURN(t);
@ -86,7 +88,7 @@ retry:
/*
* Sleep periodically polling the free list for an available
* taskq_ent_t. Dispatching with TQ_SLEEP should always succeed
* but we cannot block forever waiting for an taskq_entq_t to
* but we cannot block forever waiting for an taskq_ent_t to
* show up in the free list, otherwise a deadlock can happen.
*
* Therefore, we need to allocate a new task even if the number
@ -126,6 +128,7 @@ task_free(taskq_t *tq, taskq_ent_t *t)
ASSERT(t);
ASSERT(spin_is_locked(&tq->tq_lock));
ASSERT(list_empty(&t->tqent_list));
ASSERT(!timer_pending(&t->tqent_timer));
kmem_free(t, sizeof(taskq_ent_t));
tq->tq_nalloc--;
@ -145,6 +148,9 @@ task_done(taskq_t *tq, taskq_ent_t *t)
ASSERT(t);
ASSERT(spin_is_locked(&tq->tq_lock));
/* Wake tasks blocked in taskq_wait_id() */
wake_up_all(&t->tqent_waitq);
list_del_init(&t->tqent_list);
if (tq->tq_nalloc <= tq->tq_minalloc) {
@ -162,31 +168,241 @@ task_done(taskq_t *tq, taskq_ent_t *t)
}
/*
* As tasks are submitted to the task queue they are assigned a
* monotonically increasing taskqid and added to the tail of the pending
* list. As worker threads become available the tasks are removed from
* the head of the pending or priority list, giving preference to the
* priority list. The tasks are then removed from their respective
* list, and the taskq_thread servicing the task is added to the active
* list, preserving the order using the serviced task's taskqid.
* Finally, as tasks complete the taskq_thread servicing the task is
* removed from the active list. This means that the pending task and
* active taskq_thread lists are always kept sorted by taskqid. Thus the
* lowest outstanding incomplete taskqid can be determined simply by
* checking the min taskqid for each head item on the pending, priority,
* and active taskq_thread list. This value is stored in
* tq->tq_lowest_id and only updated to the new lowest id when the
* previous lowest id completes. All taskqids lower than
* tq->tq_lowest_id must have completed. It is also possible larger
* taskqid's have completed because they may be processed in parallel by
* several worker threads. However, this is not a problem because the
* behavior of taskq_wait_id() is to block until all previously
* submitted taskqid's have completed.
* When a delayed task timer expires remove it from the delay list and
* add it to the priority list in order for immediate processing.
*/
static void
task_expire(unsigned long data)
{
taskq_ent_t *w, *t = (taskq_ent_t *)data;
taskq_t *tq = t->tqent_taskq;
struct list_head *l;
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
if (t->tqent_flags & TQENT_FLAG_CANCEL) {
ASSERT(list_empty(&t->tqent_list));
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
return;
}
/*
* The priority list must be maintained in strict task id order
* from lowest to highest for lowest_id to be easily calculable.
*/
list_del(&t->tqent_list);
list_for_each_prev(l, &tq->tq_prio_list) {
w = list_entry(l, taskq_ent_t, tqent_list);
if (w->tqent_id < t->tqent_id) {
list_add(&t->tqent_list, l);
break;
}
}
if (l == &tq->tq_prio_list)
list_add(&t->tqent_list, &tq->tq_prio_list);
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
wake_up(&tq->tq_work_waitq);
}
/*
* Returns the lowest incomplete taskqid_t. The taskqid_t may
* be queued on the pending list, on the priority list, on the
* delay list, or on the work list currently being handled, but
* it is not 100% complete yet.
*/
static taskqid_t
taskq_lowest_id(taskq_t *tq)
{
taskqid_t lowest_id = tq->tq_next_id;
taskq_ent_t *t;
taskq_thread_t *tqt;
SENTRY;
ASSERT(tq);
ASSERT(spin_is_locked(&tq->tq_lock));
if (!list_empty(&tq->tq_pend_list)) {
t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
lowest_id = MIN(lowest_id, t->tqent_id);
}
if (!list_empty(&tq->tq_prio_list)) {
t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
lowest_id = MIN(lowest_id, t->tqent_id);
}
if (!list_empty(&tq->tq_delay_list)) {
t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
lowest_id = MIN(lowest_id, t->tqent_id);
}
if (!list_empty(&tq->tq_active_list)) {
tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
tqt_active_list);
ASSERT(tqt->tqt_id != 0);
lowest_id = MIN(lowest_id, tqt->tqt_id);
}
SRETURN(lowest_id);
}
/*
* Insert a task into a list keeping the list sorted by increasing taskqid.
*/
static void
taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
{
taskq_thread_t *w;
struct list_head *l;
SENTRY;
ASSERT(tq);
ASSERT(tqt);
ASSERT(spin_is_locked(&tq->tq_lock));
list_for_each_prev(l, &tq->tq_active_list) {
w = list_entry(l, taskq_thread_t, tqt_active_list);
if (w->tqt_id < tqt->tqt_id) {
list_add(&tqt->tqt_active_list, l);
break;
}
}
if (l == &tq->tq_active_list)
list_add(&tqt->tqt_active_list, &tq->tq_active_list);
SEXIT;
}
/*
* Find and return a task from the given list if it exists. The list
* must be in lowest to highest task id order.
*/
static taskq_ent_t *
taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
{
struct list_head *l;
taskq_ent_t *t;
SENTRY;
ASSERT(spin_is_locked(&tq->tq_lock));
list_for_each(l, lh) {
t = list_entry(l, taskq_ent_t, tqent_list);
if (t->tqent_id == id)
SRETURN(t);
if (t->tqent_id > id)
break;
}
SRETURN(NULL);
}
/*
* Find an already dispatched task given the task id regardless of what
* state it is in. If a task is still pending or executing it will be
* returned and 'active' set appropriately. If the task has already
* been run then NULL is returned.
*/
static taskq_ent_t *
taskq_find(taskq_t *tq, taskqid_t id, int *active)
{
taskq_thread_t *tqt;
struct list_head *l;
taskq_ent_t *t;
SENTRY;
ASSERT(spin_is_locked(&tq->tq_lock));
*active = 0;
t = taskq_find_list(tq, &tq->tq_delay_list, id);
if (t)
SRETURN(t);
t = taskq_find_list(tq, &tq->tq_prio_list, id);
if (t)
SRETURN(t);
t = taskq_find_list(tq, &tq->tq_pend_list, id);
if (t)
SRETURN(t);
list_for_each(l, &tq->tq_active_list) {
tqt = list_entry(l, taskq_thread_t, tqt_active_list);
if (tqt->tqt_id == id) {
t = tqt->tqt_task;
*active = 1;
SRETURN(t);
}
}
SRETURN(NULL);
}
/*
* The taskq_wait_id() function blocks until the passed task id completes.
* This does not guarantee that all lower task id's have completed.
*/
void
taskq_wait_id(taskq_t *tq, taskqid_t id)
{
DEFINE_WAIT(wait);
taskq_ent_t *t;
int active = 0;
SENTRY;
ASSERT(tq);
ASSERT(id > 0);
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
t = taskq_find(tq, id, &active);
if (t)
prepare_to_wait(&t->tqent_waitq, &wait, TASK_UNINTERRUPTIBLE);
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
/*
* We rely on the kernels autoremove_wake_function() function to
* remove us from the wait queue in the context of wake_up().
* Once woken the taskq_ent_t pointer must never be accessed.
*/
if (t) {
t = NULL;
schedule();
__set_current_state(TASK_RUNNING);
}
SEXIT;
}
EXPORT_SYMBOL(taskq_wait_id);
/*
* The taskq_wait() function will block until all previously submitted
* tasks have been completed. A previously submitted task is defined as
* a task with a lower task id than the current task queue id. Note that
* all task id's are assigned monotonically at dispatch time.
*
* XXX: Taskqid_t wrapping is not handled. However, taskqid_t's are
* 64-bit values so even if a taskq is processing 2^24 (16,777,216)
* taskqid_ts per second it will still take 2^40 seconds, 34,865 years,
* before the wrap occurs. I can live with that for now.
* Waiting for all previous tasks to complete is accomplished by tracking
* the lowest outstanding task id. As tasks are dispatched they are added
* added to the tail of the pending, priority, or delay lists. And as
* worker threads become available the tasks are removed from the heads
* of these lists and linked to the worker threads. This ensures the
* lists are kept in lowest to highest task id order.
*
* Therefore the lowest outstanding task id can be quickly determined by
* checking the head item from all of these lists. This value is stored
* with the task queue as the lowest id. It only needs to be recalculated
* when either the task with the current lowest id completes or is canceled.
*
* By blocking until the lowest task id exceeds the current task id when
* the function was called we ensure all previous tasks have completed.
*
* NOTE: When there are multiple worked threads it is possible for larger
* task ids to complete before smaller ones. Conversely when the task
* queue contains delay tasks with small task ids, you may block for a
* considerable length of time waiting for them to expire and execute.
*/
static int
taskq_wait_check(taskq_t *tq, taskqid_t id)
@ -201,19 +417,14 @@ taskq_wait_check(taskq_t *tq, taskqid_t id)
}
void
__taskq_wait_id(taskq_t *tq, taskqid_t id)
taskq_wait_all(taskq_t *tq, taskqid_t id)
{
SENTRY;
ASSERT(tq);
wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id));
SEXIT;
}
EXPORT_SYMBOL(__taskq_wait_id);
EXPORT_SYMBOL(taskq_wait_all);
void
__taskq_wait(taskq_t *tq)
taskq_wait(taskq_t *tq)
{
taskqid_t id;
SENTRY;
@ -224,15 +435,15 @@ __taskq_wait(taskq_t *tq)
id = tq->tq_next_id - 1;
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
__taskq_wait_id(tq, id);
taskq_wait_all(tq, id);
SEXIT;
}
EXPORT_SYMBOL(__taskq_wait);
EXPORT_SYMBOL(taskq_wait);
int
__taskq_member(taskq_t *tq, void *t)
taskq_member(taskq_t *tq, void *t)
{
struct list_head *l;
taskq_thread_t *tqt;
@ -249,10 +460,67 @@ __taskq_member(taskq_t *tq, void *t)
SRETURN(0);
}
EXPORT_SYMBOL(__taskq_member);
EXPORT_SYMBOL(taskq_member);
/*
* Cancel an already dispatched task given the task id. Still pending tasks
* will be immediately canceled, and if the task is active the function will
* block until it completes. Preallocated tasks which are canceled must be
* freed by the caller.
*/
int
taskq_cancel_id(taskq_t *tq, taskqid_t id)
{
taskq_ent_t *t;
int active = 0;
int rc = ENOENT;
SENTRY;
ASSERT(tq);
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
t = taskq_find(tq, id, &active);
if (t && !active) {
list_del_init(&t->tqent_list);
t->tqent_flags |= TQENT_FLAG_CANCEL;
/*
* When canceling the lowest outstanding task id we
* must recalculate the new lowest outstanding id.
*/
if (tq->tq_lowest_id == t->tqent_id) {
tq->tq_lowest_id = taskq_lowest_id(tq);
ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
}
/*
* The task_expire() function takes the tq->tq_lock so drop
* drop the lock before synchronously cancelling the timer.
*/
if (timer_pending(&t->tqent_timer)) {
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
del_timer_sync(&t->tqent_timer);
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
}
if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
task_done(tq, t);
rc = 0;
}
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
if (active) {
taskq_wait_id(tq, id);
rc = EBUSY;
}
SRETURN(rc);
}
EXPORT_SYMBOL(taskq_cancel_id);
taskqid_t
__taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
taskq_ent_t *t;
taskqid_t rc = 0;
@ -287,6 +555,10 @@ __taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
tq->tq_next_id++;
t->tqent_func = func;
t->tqent_arg = arg;
t->tqent_taskq = tq;
t->tqent_timer.data = 0;
t->tqent_timer.function = NULL;
t->tqent_timer.expires = 0;
ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
@ -297,10 +569,54 @@ out:
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
SRETURN(rc);
}
EXPORT_SYMBOL(__taskq_dispatch);
EXPORT_SYMBOL(taskq_dispatch);
taskqid_t
taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
uint_t flags, clock_t expire_time)
{
taskq_ent_t *t;
taskqid_t rc = 0;
SENTRY;
ASSERT(tq);
ASSERT(func);
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
/* Taskq being destroyed and all tasks drained */
if (!(tq->tq_flags & TQ_ACTIVE))
SGOTO(out, rc = 0);
if ((t = task_alloc(tq, flags)) == NULL)
SGOTO(out, rc = 0);
spin_lock(&t->tqent_lock);
/* Queue to the delay list for subsequent execution */
list_add_tail(&t->tqent_list, &tq->tq_delay_list);
t->tqent_id = rc = tq->tq_next_id;
tq->tq_next_id++;
t->tqent_func = func;
t->tqent_arg = arg;
t->tqent_taskq = tq;
t->tqent_timer.data = (unsigned long)t;
t->tqent_timer.function = task_expire;
t->tqent_timer.expires = (unsigned long)expire_time;
add_timer(&t->tqent_timer);
ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
spin_unlock(&t->tqent_lock);
out:
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
SRETURN(rc);
}
EXPORT_SYMBOL(taskq_dispatch_delay);
void
__taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
taskq_ent_t *t)
{
SENTRY;
@ -335,6 +651,7 @@ __taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
tq->tq_next_id++;
t->tqent_func = func;
t->tqent_arg = arg;
t->tqent_taskq = tq;
spin_unlock(&t->tqent_lock);
@ -343,91 +660,29 @@ out:
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
SEXIT;
}
EXPORT_SYMBOL(__taskq_dispatch_ent);
EXPORT_SYMBOL(taskq_dispatch_ent);
int
__taskq_empty_ent(taskq_ent_t *t)
taskq_empty_ent(taskq_ent_t *t)
{
return list_empty(&t->tqent_list);
}
EXPORT_SYMBOL(__taskq_empty_ent);
EXPORT_SYMBOL(taskq_empty_ent);
void
__taskq_init_ent(taskq_ent_t *t)
taskq_init_ent(taskq_ent_t *t)
{
spin_lock_init(&t->tqent_lock);
init_waitqueue_head(&t->tqent_waitq);
init_timer(&t->tqent_timer);
INIT_LIST_HEAD(&t->tqent_list);
t->tqent_id = 0;
t->tqent_func = NULL;
t->tqent_arg = NULL;
t->tqent_flags = 0;
t->tqent_taskq = NULL;
}
EXPORT_SYMBOL(__taskq_init_ent);
/*
* Returns the lowest incomplete taskqid_t. The taskqid_t may
* be queued on the pending list, on the priority list, or on
* the work list currently being handled, but it is not 100%
* complete yet.
*/
static taskqid_t
taskq_lowest_id(taskq_t *tq)
{
taskqid_t lowest_id = tq->tq_next_id;
taskq_ent_t *t;
taskq_thread_t *tqt;
SENTRY;
ASSERT(tq);
ASSERT(spin_is_locked(&tq->tq_lock));
if (!list_empty(&tq->tq_pend_list)) {
t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
lowest_id = MIN(lowest_id, t->tqent_id);
}
if (!list_empty(&tq->tq_prio_list)) {
t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
lowest_id = MIN(lowest_id, t->tqent_id);
}
if (!list_empty(&tq->tq_active_list)) {
tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
tqt_active_list);
ASSERT(tqt->tqt_id != 0);
lowest_id = MIN(lowest_id, tqt->tqt_id);
}
SRETURN(lowest_id);
}
/*
* Insert a task into a list keeping the list sorted by increasing
* taskqid.
*/
static void
taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
{
taskq_thread_t *w;
struct list_head *l;
SENTRY;
ASSERT(tq);
ASSERT(tqt);
ASSERT(spin_is_locked(&tq->tq_lock));
list_for_each_prev(l, &tq->tq_active_list) {
w = list_entry(l, taskq_thread_t, tqt_active_list);
if (w->tqt_id < tqt->tqt_id) {
list_add(&tqt->tqt_active_list, l);
break;
}
}
if (l == &tq->tq_active_list)
list_add(&tqt->tqt_active_list, &tq->tq_active_list);
SEXIT;
}
EXPORT_SYMBOL(taskq_init_ent);
static int
taskq_thread(void *args)
@ -482,6 +737,7 @@ taskq_thread(void *args)
* preallocated taskq_ent_t, tqent_id must be
* stored prior to executing tqent_func. */
tqt->tqt_id = t->tqent_id;
tqt->tqt_task = t;
/* We must store a copy of the flags prior to
* servicing the task (servicing a prealloc'd task
@ -500,6 +756,7 @@ taskq_thread(void *args)
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
tq->tq_nactive--;
list_del_init(&tqt->tqt_active_list);
tqt->tqt_task = NULL;
/* For prealloc'd tasks, we don't free anything. */
if ((tq->tq_flags & TASKQ_DYNAMIC) ||
@ -533,7 +790,7 @@ taskq_thread(void *args)
}
taskq_t *
__taskq_create(const char *name, int nthreads, pri_t pri,
taskq_create(const char *name, int nthreads, pri_t pri,
int minalloc, int maxalloc, uint_t flags)
{
taskq_t *tq;
@ -577,6 +834,7 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
INIT_LIST_HEAD(&tq->tq_free_list);
INIT_LIST_HEAD(&tq->tq_pend_list);
INIT_LIST_HEAD(&tq->tq_prio_list);
INIT_LIST_HEAD(&tq->tq_delay_list);
init_waitqueue_head(&tq->tq_work_waitq);
init_waitqueue_head(&tq->tq_wait_waitq);
@ -611,16 +869,16 @@ __taskq_create(const char *name, int nthreads, pri_t pri,
wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j);
if (rc) {
__taskq_destroy(tq);
taskq_destroy(tq);
tq = NULL;
}
SRETURN(tq);
}
EXPORT_SYMBOL(__taskq_create);
EXPORT_SYMBOL(taskq_create);
void
__taskq_destroy(taskq_t *tq)
taskq_destroy(taskq_t *tq)
{
struct task_struct *thread;
taskq_thread_t *tqt;
@ -633,7 +891,7 @@ __taskq_destroy(taskq_t *tq)
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
/* TQ_ACTIVE cleared prevents new tasks being added to pending */
__taskq_wait(tq);
taskq_wait(tq);
spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags);
@ -670,6 +928,7 @@ __taskq_destroy(taskq_t *tq)
ASSERT(list_empty(&tq->tq_free_list));
ASSERT(list_empty(&tq->tq_pend_list));
ASSERT(list_empty(&tq->tq_prio_list));
ASSERT(list_empty(&tq->tq_delay_list));
spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags);
@ -677,7 +936,7 @@ __taskq_destroy(taskq_t *tq)
SEXIT;
}
EXPORT_SYMBOL(__taskq_destroy);
EXPORT_SYMBOL(taskq_destroy);
int
spl_taskq_init(void)

View File

@ -25,6 +25,7 @@
\*****************************************************************************/
#include <sys/taskq.h>
#include <sys/random.h>
#include <sys/kmem.h>
#include "splat-internal.h"
@ -63,6 +64,14 @@
#define SPLAT_TASKQ_TEST8_NAME "contention"
#define SPLAT_TASKQ_TEST8_DESC "1 queue, 100 threads, 131072 tasks"
#define SPLAT_TASKQ_TEST9_ID 0x0209
#define SPLAT_TASKQ_TEST9_NAME "delay"
#define SPLAT_TASKQ_TEST9_DESC "Delayed task execution"
#define SPLAT_TASKQ_TEST10_ID 0x020a
#define SPLAT_TASKQ_TEST10_NAME "cancel"
#define SPLAT_TASKQ_TEST10_DESC "Cancel task execution"
#define SPLAT_TASKQ_ORDER_MAX 8
#define SPLAT_TASKQ_DEPTH_MAX 16
@ -70,9 +79,10 @@
typedef struct splat_taskq_arg {
int flag;
int id;
atomic_t count;
atomic_t *count;
int order[SPLAT_TASKQ_ORDER_MAX];
unsigned int depth;
unsigned long expire;
taskq_t *tq;
taskq_ent_t *tqe;
spinlock_t lock;
@ -415,8 +425,8 @@ splat_taskq_test3(struct file *file, void *arg)
/*
* Create a taskq and dispatch a large number of tasks to the queue.
* Then use taskq_wait() to block until all the tasks complete, then
* cross check that all the tasks ran by checking tg_arg->count which
* is incremented in the task function. Finally cleanup the taskq.
* cross check that all the tasks ran by checking the shared atomic
* counter which is incremented in the task function.
*
* First we try with a large 'maxalloc' value, then we try with a small one.
* We should not drop tasks when TQ_SLEEP is used in taskq_dispatch(), even
@ -428,7 +438,7 @@ splat_taskq_test4_func(void *arg)
splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg;
ASSERT(tq_arg);
atomic_inc(&tq_arg->count);
atomic_inc(tq_arg->count);
}
static int
@ -439,6 +449,7 @@ splat_taskq_test4_common(struct file *file, void *arg, int minalloc,
taskqid_t id;
splat_taskq_arg_t tq_arg;
taskq_ent_t *tqes;
atomic_t count;
int i, j, rc = 0;
tqes = kmalloc(sizeof(*tqes) * nr_tasks, GFP_KERNEL);
@ -461,9 +472,10 @@ splat_taskq_test4_common(struct file *file, void *arg, int minalloc,
tq_arg.file = file;
tq_arg.name = SPLAT_TASKQ_TEST4_NAME;
tq_arg.count = &count;
for (i = 1; i <= nr_tasks; i *= 2) {
atomic_set(&tq_arg.count, 0);
atomic_set(tq_arg.count, 0);
splat_vprint(file, SPLAT_TASKQ_TEST4_NAME,
"Taskq '%s' function '%s' dispatched %d times\n",
tq_arg.name, sym2str(splat_taskq_test4_func), i);
@ -495,8 +507,8 @@ splat_taskq_test4_common(struct file *file, void *arg, int minalloc,
taskq_wait(tq);
splat_vprint(file, SPLAT_TASKQ_TEST4_NAME, "Taskq '%s' "
"%d/%d dispatches finished\n", tq_arg.name,
atomic_read(&tq_arg.count), i);
if (atomic_read(&tq_arg.count) != i) {
atomic_read(&count), i);
if (atomic_read(&count) != i) {
rc = -ERANGE;
goto out;
@ -548,10 +560,10 @@ splat_taskq_test4(struct file *file, void *arg)
* next pending task as soon as it completes its current task. This
* means that tasks do not strictly complete in order in which they
* were dispatched (increasing task id). This is fine but we need to
* verify that taskq_wait_id() blocks until the passed task id and all
* verify that taskq_wait_all() blocks until the passed task id and all
* lower task ids complete. We do this by dispatching the following
* specific sequence of tasks each of which block for N time units.
* We then use taskq_wait_id() to unblock at specific task id and
* We then use taskq_wait_all() to unblock at specific task id and
* verify the only the expected task ids have completed and in the
* correct order. The two cases of interest are:
*
@ -562,17 +574,17 @@ splat_taskq_test4(struct file *file, void *arg)
*
* The following table shows each task id and how they will be
* scheduled. Each rows represent one time unit and each column
* one of the three worker threads. The places taskq_wait_id()
* one of the three worker threads. The places taskq_wait_all()
* must unblock for a specific id are identified as well as the
* task ids which must have completed and their order.
*
* +-----+ <--- taskq_wait_id(tq, 8) unblocks
* +-----+ <--- taskq_wait_all(tq, 8) unblocks
* | | Required Completion Order: 1,2,4,5,3,8,6,7
* +-----+ |
* | | |
* | | +-----+
* | | | 8 |
* | | +-----+ <--- taskq_wait_id(tq, 3) unblocks
* | | +-----+ <--- taskq_wait_all(tq, 3) unblocks
* | | 7 | | Required Completion Order: 1,2,4,5,3
* | +-----+ |
* | 6 | | |
@ -657,9 +669,12 @@ splat_taskq_test5_impl(struct file *file, void *arg, boolean_t prealloc)
splat_taskq_arg_t tq_arg;
int order1[SPLAT_TASKQ_ORDER_MAX] = { 1,2,4,5,3,0,0,0 };
int order2[SPLAT_TASKQ_ORDER_MAX] = { 1,2,4,5,3,8,6,7 };
taskq_ent_t tqes[SPLAT_TASKQ_ORDER_MAX];
taskq_ent_t *tqes;
int i, rc = 0;
tqes = kmem_alloc(sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX, KM_SLEEP);
memset(tqes, 0, sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX);
splat_vprint(file, SPLAT_TASKQ_TEST5_NAME,
"Taskq '%s' creating (%s dispatch)\n",
SPLAT_TASKQ_TEST5_NAME,
@ -712,13 +727,13 @@ splat_taskq_test5_impl(struct file *file, void *arg, boolean_t prealloc)
splat_vprint(file, SPLAT_TASKQ_TEST5_NAME, "Taskq '%s' "
"waiting for taskqid %d completion\n", tq_arg.name, 3);
taskq_wait_id(tq, 3);
taskq_wait_all(tq, 3);
if ((rc = splat_taskq_test_order(&tq_arg, order1)))
goto out;
splat_vprint(file, SPLAT_TASKQ_TEST5_NAME, "Taskq '%s' "
"waiting for taskqid %d completion\n", tq_arg.name, 8);
taskq_wait_id(tq, 8);
taskq_wait_all(tq, 8);
rc = splat_taskq_test_order(&tq_arg, order2);
out:
@ -726,6 +741,8 @@ out:
"Taskq '%s' destroying\n", tq_arg.name);
taskq_destroy(tq);
kmem_free(tqes, sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX);
return rc;
}
@ -811,10 +828,13 @@ splat_taskq_test6_impl(struct file *file, void *arg, boolean_t prealloc)
splat_taskq_id_t tq_id[SPLAT_TASKQ_ORDER_MAX];
splat_taskq_arg_t tq_arg;
int order[SPLAT_TASKQ_ORDER_MAX] = { 1,2,3,6,7,8,4,5 };
taskq_ent_t tqes[SPLAT_TASKQ_ORDER_MAX];
taskq_ent_t *tqes;
int i, rc = 0;
uint_t tflags;
tqes = kmem_alloc(sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX, KM_SLEEP);
memset(tqes, 0, sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX);
splat_vprint(file, SPLAT_TASKQ_TEST6_NAME,
"Taskq '%s' creating (%s dispatch)\n",
SPLAT_TASKQ_TEST6_NAME,
@ -874,7 +894,7 @@ splat_taskq_test6_impl(struct file *file, void *arg, boolean_t prealloc)
splat_vprint(file, SPLAT_TASKQ_TEST6_NAME, "Taskq '%s' "
"waiting for taskqid %d completion\n", tq_arg.name,
SPLAT_TASKQ_ORDER_MAX);
taskq_wait_id(tq, SPLAT_TASKQ_ORDER_MAX);
taskq_wait_all(tq, SPLAT_TASKQ_ORDER_MAX);
rc = splat_taskq_test_order(&tq_arg, order);
out:
@ -882,6 +902,8 @@ out:
"Taskq '%s' destroying\n", tq_arg.name);
taskq_destroy(tq);
kmem_free(tqes, sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX);
return rc;
}
@ -975,7 +997,7 @@ splat_taskq_test7_impl(struct file *file, void *arg, boolean_t prealloc)
if (tq_arg.flag == 0) {
splat_vprint(file, SPLAT_TASKQ_TEST7_NAME,
"Taskq '%s' waiting\n", tq_arg.name);
taskq_wait_id(tq, SPLAT_TASKQ_DEPTH_MAX);
taskq_wait_all(tq, SPLAT_TASKQ_DEPTH_MAX);
}
splat_vprint(file, SPLAT_TASKQ_TEST7_NAME,
@ -1011,7 +1033,7 @@ splat_taskq_test8_func(void *arg)
splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg;
ASSERT(tq_arg);
atomic_inc(&tq_arg->count);
atomic_inc(tq_arg->count);
}
#define TEST8_NUM_TASKS 0x20000
@ -1025,6 +1047,7 @@ splat_taskq_test8_common(struct file *file, void *arg, int minalloc,
taskqid_t id;
splat_taskq_arg_t tq_arg;
taskq_ent_t **tqes;
atomic_t count;
int i, j, rc = 0;
tqes = vmalloc(sizeof(*tqes) * TEST8_NUM_TASKS);
@ -1048,8 +1071,9 @@ splat_taskq_test8_common(struct file *file, void *arg, int minalloc,
tq_arg.file = file;
tq_arg.name = SPLAT_TASKQ_TEST8_NAME;
tq_arg.count = &count;
atomic_set(tq_arg.count, 0);
atomic_set(&tq_arg.count, 0);
for (i = 0; i < TEST8_NUM_TASKS; i++) {
tqes[i] = kmalloc(sizeof(taskq_ent_t), GFP_KERNEL);
if (tqes[i] == NULL) {
@ -1079,9 +1103,9 @@ splat_taskq_test8_common(struct file *file, void *arg, int minalloc,
taskq_wait(tq);
splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, "Taskq '%s' "
"%d/%d dispatches finished\n", tq_arg.name,
atomic_read(&tq_arg.count), TEST8_NUM_TASKS);
atomic_read(tq_arg.count), TEST8_NUM_TASKS);
if (atomic_read(&tq_arg.count) != TEST8_NUM_TASKS)
if (atomic_read(tq_arg.count) != TEST8_NUM_TASKS)
rc = -ERANGE;
out:
@ -1106,6 +1130,271 @@ splat_taskq_test8(struct file *file, void *arg)
return rc;
}
/*
* Create a taskq and dispatch a number of delayed tasks to the queue.
* For each task verify that it was run no early than requested.
*/
static void
splat_taskq_test9_func(void *arg)
{
splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg;
ASSERT(tq_arg);
if (ddi_get_lbolt() >= tq_arg->expire)
atomic_inc(tq_arg->count);
kmem_free(tq_arg, sizeof(splat_taskq_arg_t));
}
static int
splat_taskq_test9(struct file *file, void *arg)
{
taskq_t *tq;
atomic_t count;
int i, rc = 0;
int minalloc = 1;
int maxalloc = 10;
int nr_tasks = 100;
splat_vprint(file, SPLAT_TASKQ_TEST9_NAME,
"Taskq '%s' creating (%s dispatch) (%d/%d/%d)\n",
SPLAT_TASKQ_TEST9_NAME, "delay", minalloc, maxalloc, nr_tasks);
if ((tq = taskq_create(SPLAT_TASKQ_TEST9_NAME, 3, maxclsyspri,
minalloc, maxalloc, TASKQ_PREPOPULATE)) == NULL) {
splat_vprint(file, SPLAT_TASKQ_TEST9_NAME,
"Taskq '%s' create failed\n", SPLAT_TASKQ_TEST9_NAME);
return -EINVAL;
}
atomic_set(&count, 0);
for (i = 1; i <= nr_tasks; i++) {
splat_taskq_arg_t *tq_arg;
taskqid_t id;
uint32_t rnd;
/* A random timeout in jiffies of at most 5 seconds */
get_random_bytes((void *)&rnd, 4);
rnd = rnd % (5 * HZ);
tq_arg = kmem_alloc(sizeof(splat_taskq_arg_t), KM_SLEEP);
tq_arg->file = file;
tq_arg->name = SPLAT_TASKQ_TEST9_NAME;
tq_arg->expire = ddi_get_lbolt() + rnd;
tq_arg->count = &count;
splat_vprint(file, SPLAT_TASKQ_TEST9_NAME,
"Taskq '%s' delay dispatch %u jiffies\n",
SPLAT_TASKQ_TEST9_NAME, rnd);
id = taskq_dispatch_delay(tq, splat_taskq_test9_func,
tq_arg, TQ_SLEEP, ddi_get_lbolt() + rnd);
if (id == 0) {
splat_vprint(file, SPLAT_TASKQ_TEST9_NAME,
"Taskq '%s' delay dispatch failed\n",
SPLAT_TASKQ_TEST9_NAME);
kmem_free(tq_arg, sizeof(splat_taskq_arg_t));
taskq_wait(tq);
rc = -EINVAL;
goto out;
}
}
splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, "Taskq '%s' waiting for "
"%d delay dispatches\n", SPLAT_TASKQ_TEST9_NAME, nr_tasks);
taskq_wait(tq);
if (atomic_read(&count) != nr_tasks)
rc = -ERANGE;
splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, "Taskq '%s' %d/%d delay "
"dispatches finished on time\n", SPLAT_TASKQ_TEST9_NAME,
atomic_read(&count), nr_tasks);
splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, "Taskq '%s' destroying\n",
SPLAT_TASKQ_TEST9_NAME);
out:
taskq_destroy(tq);
return rc;
}
/*
* Create a taskq and dispatch then cancel tasks in the queue.
*/
static void
splat_taskq_test10_func(void *arg)
{
splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg;
uint8_t rnd;
if (ddi_get_lbolt() >= tq_arg->expire)
atomic_inc(tq_arg->count);
/* Randomly sleep to further perturb the system */
get_random_bytes((void *)&rnd, 1);
msleep(1 + (rnd % 9));
}
static int
splat_taskq_test10(struct file *file, void *arg)
{
taskq_t *tq;
splat_taskq_arg_t **tqas;
atomic_t count;
int i, j, rc = 0;
int minalloc = 1;
int maxalloc = 10;
int nr_tasks = 100;
int canceled = 0;
int completed = 0;
int blocked = 0;
unsigned long start, cancel;
tqas = vmalloc(sizeof(*tqas) * nr_tasks);
if (tqas == NULL)
return -ENOMEM;
memset(tqas, 0, sizeof(*tqas) * nr_tasks);
splat_vprint(file, SPLAT_TASKQ_TEST10_NAME,
"Taskq '%s' creating (%s dispatch) (%d/%d/%d)\n",
SPLAT_TASKQ_TEST10_NAME, "delay", minalloc, maxalloc, nr_tasks);
if ((tq = taskq_create(SPLAT_TASKQ_TEST10_NAME, 3, maxclsyspri,
minalloc, maxalloc, TASKQ_PREPOPULATE)) == NULL) {
splat_vprint(file, SPLAT_TASKQ_TEST10_NAME,
"Taskq '%s' create failed\n", SPLAT_TASKQ_TEST10_NAME);
rc = -EINVAL;
goto out_free;
}
atomic_set(&count, 0);
for (i = 0; i < nr_tasks; i++) {
splat_taskq_arg_t *tq_arg;
uint32_t rnd;
/* A random timeout in jiffies of at most 5 seconds */
get_random_bytes((void *)&rnd, 4);
rnd = rnd % (5 * HZ);
tq_arg = kmem_alloc(sizeof(splat_taskq_arg_t), KM_SLEEP);
tq_arg->file = file;
tq_arg->name = SPLAT_TASKQ_TEST10_NAME;
tq_arg->count = &count;
tqas[i] = tq_arg;
/*
* Dispatch every 1/3 one immediately to mix it up, the cancel
* code is inherently racy and we want to try and provoke any
* subtle concurrently issues.
*/
if ((i % 3) == 0) {
tq_arg->expire = ddi_get_lbolt();
tq_arg->id = taskq_dispatch(tq, splat_taskq_test10_func,
tq_arg, TQ_SLEEP);
} else {
tq_arg->expire = ddi_get_lbolt() + rnd;
tq_arg->id = taskq_dispatch_delay(tq,
splat_taskq_test10_func,
tq_arg, TQ_SLEEP, ddi_get_lbolt() + rnd);
}
if (tq_arg->id == 0) {
splat_vprint(file, SPLAT_TASKQ_TEST10_NAME,
"Taskq '%s' dispatch failed\n",
SPLAT_TASKQ_TEST10_NAME);
kmem_free(tq_arg, sizeof(splat_taskq_arg_t));
taskq_wait(tq);
rc = -EINVAL;
goto out;
} else {
splat_vprint(file, SPLAT_TASKQ_TEST10_NAME,
"Taskq '%s' dispatch %lu in %lu jiffies\n",
SPLAT_TASKQ_TEST10_NAME, (unsigned long)tq_arg->id,
!(i % 3) ? 0 : tq_arg->expire - ddi_get_lbolt());
}
}
/*
* Start randomly canceling tasks for the duration of the test. We
* happen to know the valid task id's will be in the range 1..nr_tasks
* because the taskq is private and was just created. However, we
* have no idea of a particular task has already executed or not.
*/
splat_vprint(file, SPLAT_TASKQ_TEST10_NAME, "Taskq '%s' randomly "
"canceling task ids\n", SPLAT_TASKQ_TEST10_NAME);
start = ddi_get_lbolt();
i = 0;
while (ddi_get_lbolt() < start + 5 * HZ) {
taskqid_t id;
uint32_t rnd;
i++;
cancel = ddi_get_lbolt();
get_random_bytes((void *)&rnd, 4);
id = 1 + (rnd % nr_tasks);
rc = taskq_cancel_id(tq, id);
/*
* Keep track of the results of the random cancels.
*/
if (rc == 0) {
canceled++;
} else if (rc == ENOENT) {
completed++;
} else if (rc == EBUSY) {
blocked++;
} else {
rc = -EINVAL;
break;
}
/*
* Verify we never get blocked to long in taskq_cancel_id().
* The worst case is 10ms if we happen to cancel the task
* which is currently executing. We allow a factor of 2x.
*/
if (ddi_get_lbolt() - cancel > HZ / 50) {
splat_vprint(file, SPLAT_TASKQ_TEST10_NAME,
"Taskq '%s' cancel for %lu took %lu\n",
SPLAT_TASKQ_TEST10_NAME, (unsigned long)id,
ddi_get_lbolt() - cancel);
rc = -ETIMEDOUT;
break;
}
get_random_bytes((void *)&rnd, 4);
msleep(1 + (rnd % 100));
rc = 0;
}
taskq_wait(tq);
/*
* Cross check the results of taskq_cancel_id() with the number of
* times the dispatched function actually ran successfully.
*/
if ((rc == 0) && (nr_tasks - canceled != atomic_read(&count)))
rc = -EDOM;
splat_vprint(file, SPLAT_TASKQ_TEST10_NAME, "Taskq '%s' %d attempts, "
"%d canceled, %d completed, %d blocked, %d/%d tasks run\n",
SPLAT_TASKQ_TEST10_NAME, i, canceled, completed, blocked,
atomic_read(&count), nr_tasks);
splat_vprint(file, SPLAT_TASKQ_TEST10_NAME, "Taskq '%s' destroying %d\n",
SPLAT_TASKQ_TEST10_NAME, rc);
out:
taskq_destroy(tq);
out_free:
for (j = 0; j < nr_tasks && tqas[j] != NULL; j++)
kmem_free(tqas[j], sizeof(splat_taskq_arg_t));
vfree(tqas);
return rc;
}
splat_subsystem_t *
splat_taskq_init(void)
{
@ -1139,6 +1428,10 @@ splat_taskq_init(void)
SPLAT_TASKQ_TEST7_ID, splat_taskq_test7);
SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST8_NAME, SPLAT_TASKQ_TEST8_DESC,
SPLAT_TASKQ_TEST8_ID, splat_taskq_test8);
SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST9_NAME, SPLAT_TASKQ_TEST9_DESC,
SPLAT_TASKQ_TEST9_ID, splat_taskq_test9);
SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST10_NAME, SPLAT_TASKQ_TEST10_DESC,
SPLAT_TASKQ_TEST10_ID, splat_taskq_test10);
return sub;
}
@ -1147,6 +1440,8 @@ void
splat_taskq_fini(splat_subsystem_t *sub)
{
ASSERT(sub);
SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST10_ID);
SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST9_ID);
SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST8_ID);
SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST7_ID);
SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST6_ID);