diff --git a/config/spl-build.m4 b/config/spl-build.m4 index e8ecbc6547..ea25e206f1 100644 --- a/config/spl-build.m4 +++ b/config/spl-build.m4 @@ -26,7 +26,6 @@ AC_DEFUN([SPL_AC_CONFIG_KERNEL], [ SPL_AC_TYPE_ATOMIC64_CMPXCHG SPL_AC_TYPE_ATOMIC64_XCHG SPL_AC_TYPE_UINTPTR_T - SPL_AC_3ARGS_INIT_WORK SPL_AC_2ARGS_REGISTER_SYSCTL SPL_AC_SET_SHRINKER SPL_AC_3ARGS_SHRINKER_CALLBACK @@ -870,26 +869,6 @@ AC_DEFUN([SPL_AC_TYPE_UINTPTR_T], ]) ]) -dnl # -dnl # 2.6.20 API change, -dnl # INIT_WORK use 2 args and not store data inside -dnl # -AC_DEFUN([SPL_AC_3ARGS_INIT_WORK], - [AC_MSG_CHECKING([whether INIT_WORK wants 3 args]) - SPL_LINUX_TRY_COMPILE([ - #include - ],[ - struct work_struct work __attribute__ ((unused)); - INIT_WORK(&work, NULL, NULL); - ],[ - AC_MSG_RESULT(yes) - AC_DEFINE(HAVE_3ARGS_INIT_WORK, 1, - [INIT_WORK wants 3 args]) - ],[ - AC_MSG_RESULT(no) - ]) -]) - dnl # dnl # 2.6.21 API change, dnl # 'register_sysctl_table' use only one argument instead of two diff --git a/include/linux/workqueue_compat.h b/include/linux/workqueue_compat.h deleted file mode 100644 index a92800ce51..0000000000 --- a/include/linux/workqueue_compat.h +++ /dev/null @@ -1,49 +0,0 @@ -/*****************************************************************************\ - * Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC. - * Copyright (C) 2007 The Regents of the University of California. - * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). - * Written by Brian Behlendorf . - * UCRL-CODE-235197 - * - * This file is part of the SPL, Solaris Porting Layer. - * For details, see . - * - * The SPL is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * The SPL is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * for more details. - * - * You should have received a copy of the GNU General Public License along - * with the SPL. If not, see . 
-\*****************************************************************************/ - -#ifndef _SPL_WORKQUEUE_COMPAT_H -#define _SPL_WORKQUEUE_COMPAT_H - -#include -#include - -#ifdef HAVE_3ARGS_INIT_WORK - -#define delayed_work work_struct - -#define spl_init_work(wq, cb, d) INIT_WORK((wq), (void *)(cb), \ - (void *)(d)) -#define spl_init_delayed_work(wq,cb,d) INIT_WORK((wq), (void *)(cb), \ - (void *)(d)) -#define spl_get_work_data(d, t, f) (t *)(d) - -#else - -#define spl_init_work(wq, cb, d) INIT_WORK((wq), (void *)(cb)); -#define spl_init_delayed_work(wq,cb,d) INIT_DELAYED_WORK((wq), (void *)(cb)); -#define spl_get_work_data(d, t, f) (t *)container_of(d, t, f) - -#endif /* HAVE_3ARGS_INIT_WORK */ - -#endif /* _SPL_WORKQUEUE_COMPAT_H */ diff --git a/include/sys/kmem.h b/include/sys/kmem.h index 83adc8d2a3..6904bec3f1 100644 --- a/include/sys/kmem.h +++ b/include/sys/kmem.h @@ -37,6 +37,7 @@ #include #include #include +#include /* * Memory allocation interfaces @@ -406,7 +407,6 @@ typedef struct spl_kmem_magazine { uint32_t skm_size; /* Magazine size */ uint32_t skm_refill; /* Batch refill size */ struct spl_kmem_cache *skm_cache; /* Owned by cache */ - struct delayed_work skm_work; /* Magazine reclaim work */ unsigned long skm_age; /* Last cache access */ unsigned int skm_cpu; /* Owned by cpu */ void *skm_objs[0]; /* Object pointers */ @@ -432,7 +432,7 @@ typedef struct spl_kmem_slab { typedef struct spl_kmem_alloc { struct spl_kmem_cache *ska_cache; /* Owned by cache */ int ska_flags; /* Allocation flags */ - struct delayed_work ska_work; /* Allocation work */ + taskq_ent_t ska_tqe; /* Task queue entry */ } spl_kmem_alloc_t; typedef struct spl_kmem_emergency { @@ -460,7 +460,7 @@ typedef struct spl_kmem_cache { uint32_t skc_delay; /* Slab reclaim interval */ uint32_t skc_reap; /* Slab reclaim count */ atomic_t skc_ref; /* Ref count callers */ - struct delayed_work skc_work; /* Slab reclaim work */ + taskqid_t skc_taskqid; /* Slab reclaim task */ struct list_head skc_list; /* List of caches linkage */ struct list_head skc_complete_list;/* Completely alloc'ed */ struct list_head skc_partial_list; /* Partially alloc'ed */ diff --git a/include/sys/taskq.h b/include/sys/taskq.h index 6b09bdf1b2..3839de2885 100644 --- a/include/sys/taskq.h +++ b/include/sys/taskq.h @@ -33,101 +33,103 @@ #include #include -#define TASKQ_NAMELEN 31 +#define TASKQ_NAMELEN 31 -#define TASKQ_PREPOPULATE 0x00000001 -#define TASKQ_CPR_SAFE 0x00000002 -#define TASKQ_DYNAMIC 0x00000004 -#define TASKQ_THREADS_CPU_PCT 0x00000008 -#define TASKQ_DC_BATCH 0x00000010 - -typedef unsigned long taskqid_t; -typedef void (task_func_t)(void *); - -typedef struct taskq_ent { - spinlock_t tqent_lock; - struct list_head tqent_list; - taskqid_t tqent_id; - task_func_t *tqent_func; - void *tqent_arg; - uintptr_t tqent_flags; -} taskq_ent_t; - -#define TQENT_FLAG_PREALLOC 0x1 +#define TASKQ_PREPOPULATE 0x00000001 +#define TASKQ_CPR_SAFE 0x00000002 +#define TASKQ_DYNAMIC 0x00000004 +#define TASKQ_THREADS_CPU_PCT 0x00000008 +#define TASKQ_DC_BATCH 0x00000010 /* * Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as * KM_SLEEP/KM_NOSLEEP. TQ_NOQUEUE/TQ_NOALLOC are set particularly * large so as not to conflict with already used GFP_* defines. 
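/*
 * For reference, a minimal sketch of the post-2.6.20 workqueue pattern that
 * the spl_init_work()/spl_get_work_data() macros deleted above emulated on
 * older kernels: with the two argument INIT_WORK() the callback receives only
 * the work_struct and recovers the enclosing object with container_of(), the
 * same recovery the non-HAVE_3ARGS_INIT_WORK branch performed.  The names
 * my_obj, my_obj_work_fn and my_obj_setup are hypothetical.
 */
#include <linux/workqueue.h>
#include <linux/kernel.h>

struct my_obj {
	int state;
	struct delayed_work work;	/* embedded, no private data pointer */
};

static void
my_obj_work_fn(struct work_struct *w)
{
	/* Recover the enclosing object from the embedded work_struct. */
	struct my_obj *obj = container_of(w, struct my_obj, work.work);

	obj->state++;
}

static void
my_obj_setup(struct my_obj *obj)
{
	INIT_DELAYED_WORK(&obj->work, my_obj_work_fn);
	schedule_delayed_work(&obj->work, HZ);
}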
*/ -#define TQ_SLEEP 0x00000000 -#define TQ_NOSLEEP 0x00000001 -#define TQ_PUSHPAGE 0x00000002 -#define TQ_NOQUEUE 0x01000000 -#define TQ_NOALLOC 0x02000000 -#define TQ_NEW 0x04000000 -#define TQ_FRONT 0x08000000 -#define TQ_ACTIVE 0x80000000 +#define TQ_SLEEP 0x00000000 +#define TQ_NOSLEEP 0x00000001 +#define TQ_PUSHPAGE 0x00000002 +#define TQ_NOQUEUE 0x01000000 +#define TQ_NOALLOC 0x02000000 +#define TQ_NEW 0x04000000 +#define TQ_FRONT 0x08000000 +#define TQ_ACTIVE 0x80000000 + +typedef unsigned long taskqid_t; +typedef void (task_func_t)(void *); typedef struct taskq { - spinlock_t tq_lock; /* protects taskq_t */ - unsigned long tq_lock_flags; /* interrupt state */ - const char *tq_name; /* taskq name */ - struct list_head tq_thread_list;/* list of all threads */ - struct list_head tq_active_list;/* list of active threads */ - int tq_nactive; /* # of active threads */ - int tq_nthreads; /* # of total threads */ - int tq_pri; /* priority */ - int tq_minalloc; /* min task_t pool size */ - int tq_maxalloc; /* max task_t pool size */ - int tq_nalloc; /* cur task_t pool size */ - uint_t tq_flags; /* flags */ - taskqid_t tq_next_id; /* next pend/work id */ - taskqid_t tq_lowest_id; /* lowest pend/work id */ - struct list_head tq_free_list; /* free task_t's */ - struct list_head tq_pend_list; /* pending task_t's */ - struct list_head tq_prio_list; /* priority pending task_t's */ - wait_queue_head_t tq_work_waitq; /* new work waitq */ - wait_queue_head_t tq_wait_waitq; /* wait waitq */ + spinlock_t tq_lock; /* protects taskq_t */ + unsigned long tq_lock_flags; /* interrupt state */ + const char *tq_name; /* taskq name */ + struct list_head tq_thread_list;/* list of all threads */ + struct list_head tq_active_list;/* list of active threads */ + int tq_nactive; /* # of active threads */ + int tq_nthreads; /* # of total threads */ + int tq_pri; /* priority */ + int tq_minalloc; /* min task_t pool size */ + int tq_maxalloc; /* max task_t pool size */ + int tq_nalloc; /* cur task_t pool size */ + uint_t tq_flags; /* flags */ + taskqid_t tq_next_id; /* next pend/work id */ + taskqid_t tq_lowest_id; /* lowest pend/work id */ + struct list_head tq_free_list; /* free task_t's */ + struct list_head tq_pend_list; /* pending task_t's */ + struct list_head tq_prio_list; /* priority pending task_t's */ + struct list_head tq_delay_list; /* delayed task_t's */ + wait_queue_head_t tq_work_waitq; /* new work waitq */ + wait_queue_head_t tq_wait_waitq; /* wait waitq */ } taskq_t; +typedef struct taskq_ent { + spinlock_t tqent_lock; + wait_queue_head_t tqent_waitq; + struct timer_list tqent_timer; + struct list_head tqent_list; + taskqid_t tqent_id; + task_func_t *tqent_func; + void *tqent_arg; + taskq_t *tqent_taskq; + uintptr_t tqent_flags; +} taskq_ent_t; + +#define TQENT_FLAG_PREALLOC 0x1 +#define TQENT_FLAG_CANCEL 0x2 + typedef struct taskq_thread { - struct list_head tqt_thread_list; - struct list_head tqt_active_list; - struct task_struct *tqt_thread; - taskq_t *tqt_tq; - taskqid_t tqt_id; - uintptr_t tqt_flags; + struct list_head tqt_thread_list; + struct list_head tqt_active_list; + struct task_struct *tqt_thread; + taskq_t *tqt_tq; + taskqid_t tqt_id; + taskq_ent_t *tqt_task; + uintptr_t tqt_flags; } taskq_thread_t; /* Global system-wide dynamic task queue available for all consumers */ extern taskq_t *system_taskq; -extern taskqid_t __taskq_dispatch(taskq_t *, task_func_t, void *, uint_t); -extern void __taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t, taskq_ent_t *); -extern int 
__taskq_empty_ent(taskq_ent_t *); -extern void __taskq_init_ent(taskq_ent_t *); -extern taskq_t *__taskq_create(const char *, int, pri_t, int, int, uint_t); -extern void __taskq_destroy(taskq_t *); -extern void __taskq_wait_id(taskq_t *, taskqid_t); -extern void __taskq_wait(taskq_t *); -extern int __taskq_member(taskq_t *, void *); +extern taskqid_t taskq_dispatch(taskq_t *, task_func_t, void *, uint_t); +extern taskqid_t taskq_dispatch_delay(taskq_t *, task_func_t, void *, + uint_t, clock_t); +extern void taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t, + taskq_ent_t *); +extern int taskq_empty_ent(taskq_ent_t *); +extern void taskq_init_ent(taskq_ent_t *); +extern taskq_t *taskq_create(const char *, int, pri_t, int, int, uint_t); +extern void taskq_destroy(taskq_t *); +extern void taskq_wait_id(taskq_t *, taskqid_t); +extern void taskq_wait_all(taskq_t *, taskqid_t); +extern void taskq_wait(taskq_t *); +extern int taskq_cancel_id(taskq_t *, taskqid_t); +extern int taskq_member(taskq_t *, void *); + +#define taskq_create_proc(name, nthreads, pri, min, max, proc, flags) \ + taskq_create(name, nthreads, pri, min, max, flags) +#define taskq_create_sysdc(name, nthreads, min, max, proc, dc, flags) \ + taskq_create(name, nthreads, maxclsyspri, min, max, flags) int spl_taskq_init(void); void spl_taskq_fini(void); -#define taskq_member(tq, t) __taskq_member(tq, t) -#define taskq_wait_id(tq, id) __taskq_wait_id(tq, id) -#define taskq_wait(tq) __taskq_wait(tq) -#define taskq_dispatch(tq, f, p, fl) __taskq_dispatch(tq, f, p, fl) -#define taskq_dispatch_ent(tq, f, p, fl, t) __taskq_dispatch_ent(tq, f, p, fl, t) -#define taskq_empty_ent(t) __taskq_empty_ent(t) -#define taskq_init_ent(t) __taskq_init_ent(t) -#define taskq_create(n, th, p, mi, ma, fl) __taskq_create(n, th, p, mi, ma, fl) -#define taskq_create_proc(n, th, p, mi, ma, pr, fl) \ - __taskq_create(n, th, p, mi, ma, fl) -#define taskq_create_sysdc(n, th, mi, ma, pr, dc, fl) \ - __taskq_create(n, th, maxclsyspri, mi, ma, fl) -#define taskq_destroy(tq) __taskq_destroy(tq) - #endif /* _SPL_TASKQ_H */ diff --git a/include/sys/types.h b/include/sys/types.h index 35905eb973..b867be111f 100644 --- a/include/sys/types.h +++ b/include/sys/types.h @@ -34,7 +34,6 @@ #include #include #include -#include #include #include #include diff --git a/module/spl/spl-kmem.c b/module/spl/spl-kmem.c index b171d446ae..bc08a55982 100644 --- a/module/spl/spl-kmem.c +++ b/module/spl/spl-kmem.c @@ -825,6 +825,7 @@ EXPORT_SYMBOL(vmem_free_debug); struct list_head spl_kmem_cache_list; /* List of caches */ struct rw_semaphore spl_kmem_cache_sem; /* Cache list lock */ +taskq_t *spl_kmem_cache_taskq; /* Task queue for ageing / reclaim */ static int spl_cache_flush(spl_kmem_cache_t *skc, spl_kmem_magazine_t *skm, int flush); @@ -1243,50 +1244,59 @@ spl_emergency_free(spl_kmem_cache_t *skc, void *obj) SRETURN(0); } -/* - * Called regularly on all caches to age objects out of the magazines - * which have not been access in skc->skc_delay seconds. This prevents - * idle magazines from holding memory which might be better used by - * other caches or parts of the system. The delay is present to - * prevent thrashing the magazine. 
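/*
 * A hypothetical consumer of the renamed interfaces declared above (header
 * locations are approximate).  It shows that taskq_dispatch_delay() takes an
 * absolute expiration time, ddi_get_lbolt() plus a delay in jiffies, rather
 * than a relative delay, and that taskq_cancel_id() reports its outcome with
 * positive errno values.  my_tq_func and the three second delay are
 * placeholders.
 */
#include <sys/taskq.h>
#include <sys/timer.h>		/* ddi_get_lbolt() */
#include <sys/sysmacros.h>	/* maxclsyspri */

static void
my_tq_func(void *arg)
{
	/* placeholder work item */
}

static int
my_tq_example(void)
{
	taskq_t *tq;
	taskqid_t id;

	tq = taskq_create("my_tq", 4, maxclsyspri, 4, 32, TASKQ_PREPOPULATE);
	if (tq == NULL)
		return (-ENOMEM);

	/* Run my_tq_func() roughly three seconds from now. */
	id = taskq_dispatch_delay(tq, my_tq_func, NULL, TQ_SLEEP,
	    ddi_get_lbolt() + 3 * HZ);

	if (id != 0) {
		/* 0: canceled before it ran, ENOENT: it already finished,
		 * EBUSY: it was running and the call blocked until done. */
		(void) taskq_cancel_id(tq, id);
	}

	taskq_wait(tq);
	taskq_destroy(tq);

	return (0);
}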
- */ static void spl_magazine_age(void *data) { - spl_kmem_magazine_t *skm = - spl_get_work_data(data, spl_kmem_magazine_t, skm_work.work); - spl_kmem_cache_t *skc = skm->skm_cache; + spl_kmem_cache_t *skc = (spl_kmem_cache_t *)data; + spl_kmem_magazine_t *skm = skc->skc_mag[smp_processor_id()]; ASSERT(skm->skm_magic == SKM_MAGIC); - ASSERT(skc->skc_magic == SKC_MAGIC); - ASSERT(skc->skc_mag[skm->skm_cpu] == skm); + ASSERT(skm->skm_cpu == smp_processor_id()); - if (skm->skm_avail > 0 && - time_after(jiffies, skm->skm_age + skc->skc_delay * HZ)) - (void)spl_cache_flush(skc, skm, skm->skm_refill); - - if (!test_bit(KMC_BIT_DESTROY, &skc->skc_flags)) - schedule_delayed_work_on(skm->skm_cpu, &skm->skm_work, - skc->skc_delay / 3 * HZ); + if (skm->skm_avail > 0) + if (time_after(jiffies, skm->skm_age + skc->skc_delay * HZ)) + (void) spl_cache_flush(skc, skm, skm->skm_refill); } /* - * Called regularly to keep a downward pressure on the size of idle - * magazines and to release free slabs from the cache. This function - * never calls the registered reclaim function, that only occurs - * under memory pressure or with a direct call to spl_kmem_reap(). + * Called regularly to keep a downward pressure on the cache. + * + * Objects older than skc->skc_delay seconds in the per-cpu magazines will + * be returned to the caches. This is done to prevent idle magazines from + * holding memory which could be better used elsewhere. The delay is + * present to prevent thrashing the magazine. + * + * The newly released objects may result in empty partial slabs. Those + * slabs should be released to the system. Otherwise moving the objects + * out of the magazines is just wasted work. */ static void spl_cache_age(void *data) { - spl_kmem_cache_t *skc = - spl_get_work_data(data, spl_kmem_cache_t, skc_work.work); + spl_kmem_cache_t *skc = (spl_kmem_cache_t *)data; + taskqid_t id = 0; ASSERT(skc->skc_magic == SKC_MAGIC); + + atomic_inc(&skc->skc_ref); + spl_on_each_cpu(spl_magazine_age, skc, 1); spl_slab_reclaim(skc, skc->skc_reap, 0); - if (!test_bit(KMC_BIT_DESTROY, &skc->skc_flags)) - schedule_delayed_work(&skc->skc_work, skc->skc_delay / 3 * HZ); + while (!test_bit(KMC_BIT_DESTROY, &skc->skc_flags) && !id) { + id = taskq_dispatch_delay( + spl_kmem_cache_taskq, spl_cache_age, skc, TQ_SLEEP, + ddi_get_lbolt() + skc->skc_delay / 3 * HZ); + + /* Destroy issued after dispatch immediately cancel it */ + if (test_bit(KMC_BIT_DESTROY, &skc->skc_flags) && id) + taskq_cancel_id(spl_kmem_cache_taskq, id); + } + + spin_lock(&skc->skc_lock); + skc->skc_taskqid = id; + spin_unlock(&skc->skc_lock); + + atomic_dec(&skc->skc_ref); } /* @@ -1380,7 +1390,6 @@ spl_magazine_alloc(spl_kmem_cache_t *skc, int cpu) skm->skm_size = skc->skc_mag_size; skm->skm_refill = skc->skc_mag_refill; skm->skm_cache = skc; - spl_init_delayed_work(&skm->skm_work, spl_magazine_age, skm); skm->skm_age = jiffies; skm->skm_cpu = cpu; } @@ -1427,11 +1436,6 @@ spl_magazine_create(spl_kmem_cache_t *skc) } } - /* Only after everything is allocated schedule magazine work */ - for_each_online_cpu(i) - schedule_delayed_work_on(i, &skc->skc_mag[i]->skm_work, - skc->skc_delay / 3 * HZ); - SRETURN(0); } @@ -1482,7 +1486,7 @@ spl_kmem_cache_create(char *name, size_t size, size_t align, void *priv, void *vmp, int flags) { spl_kmem_cache_t *skc; - int rc, kmem_flags = KM_SLEEP; + int rc; SENTRY; ASSERTF(!(flags & KMC_NOMAGAZINE), "Bad KMC_NOMAGAZINE (%x)\n", flags); @@ -1490,25 +1494,22 @@ spl_kmem_cache_create(char *name, size_t size, size_t align, ASSERTF(!(flags & 
KMC_QCACHE), "Bad KMC_QCACHE (%x)\n", flags); ASSERT(vmp == NULL); - /* We may be called when there is a non-zero preempt_count or - * interrupts are disabled is which case we must not sleep. - */ - if (current_thread_info()->preempt_count || irqs_disabled()) - kmem_flags = KM_NOSLEEP; + might_sleep(); - /* Allocate memory for a new cache an initialize it. Unfortunately, + /* + * Allocate memory for a new cache an initialize it. Unfortunately, * this usually ends up being a large allocation of ~32k because * we need to allocate enough memory for the worst case number of * cpus in the magazine, skc_mag[NR_CPUS]. Because of this we - * explicitly pass KM_NODEBUG to suppress the kmem warning */ - skc = (spl_kmem_cache_t *)kmem_zalloc(sizeof(*skc), - kmem_flags | KM_NODEBUG); + * explicitly pass KM_NODEBUG to suppress the kmem warning + */ + skc = kmem_zalloc(sizeof(*skc), KM_SLEEP| KM_NODEBUG); if (skc == NULL) SRETURN(NULL); skc->skc_magic = SKC_MAGIC; skc->skc_name_size = strlen(name) + 1; - skc->skc_name = (char *)kmem_alloc(skc->skc_name_size, kmem_flags); + skc->skc_name = (char *)kmem_alloc(skc->skc_name_size, KM_SLEEP); if (skc->skc_name == NULL) { kmem_free(skc, sizeof(*skc)); SRETURN(NULL); @@ -1569,8 +1570,9 @@ spl_kmem_cache_create(char *name, size_t size, size_t align, if (rc) SGOTO(out, rc); - spl_init_delayed_work(&skc->skc_work, spl_cache_age, skc); - schedule_delayed_work(&skc->skc_work, skc->skc_delay / 3 * HZ); + skc->skc_taskqid = taskq_dispatch_delay(spl_kmem_cache_taskq, + spl_cache_age, skc, TQ_SLEEP, + ddi_get_lbolt() + skc->skc_delay / 3 * HZ); down_write(&spl_kmem_cache_sem); list_add_tail(&skc->skc_list, &spl_kmem_cache_list); @@ -1603,7 +1605,7 @@ void spl_kmem_cache_destroy(spl_kmem_cache_t *skc) { DECLARE_WAIT_QUEUE_HEAD(wq); - int i; + taskqid_t id; SENTRY; ASSERT(skc->skc_magic == SKC_MAGIC); @@ -1612,13 +1614,14 @@ spl_kmem_cache_destroy(spl_kmem_cache_t *skc) list_del_init(&skc->skc_list); up_write(&spl_kmem_cache_sem); - /* Cancel any and wait for any pending delayed work */ + /* Cancel any and wait for any pending delayed tasks */ VERIFY(!test_and_set_bit(KMC_BIT_DESTROY, &skc->skc_flags)); - cancel_delayed_work_sync(&skc->skc_work); - for_each_online_cpu(i) - cancel_delayed_work_sync(&skc->skc_mag[i]->skm_work); - flush_scheduled_work(); + spin_lock(&skc->skc_lock); + id = skc->skc_taskqid; + spin_unlock(&skc->skc_lock); + + taskq_cancel_id(spl_kmem_cache_taskq, id); /* Wait until all current callers complete, this is mainly * to catch the case where a low memory situation triggers a @@ -1694,8 +1697,7 @@ spl_cache_obj(spl_kmem_cache_t *skc, spl_kmem_slab_t *sks) static void spl_cache_grow_work(void *data) { - spl_kmem_alloc_t *ska = - spl_get_work_data(data, spl_kmem_alloc_t, ska_work.work); + spl_kmem_alloc_t *ska = (spl_kmem_alloc_t *)data; spl_kmem_cache_t *skc = ska->ska_cache; spl_kmem_slab_t *sks; @@ -1774,8 +1776,9 @@ spl_cache_grow(spl_kmem_cache_t *skc, int flags, void **obj) atomic_inc(&skc->skc_ref); ska->ska_cache = skc; ska->ska_flags = flags & ~__GFP_FS; - spl_init_delayed_work(&ska->ska_work, spl_cache_grow_work, ska); - schedule_delayed_work(&ska->ska_work, 0); + taskq_init_ent(&ska->ska_tqe); + taskq_dispatch_ent(spl_kmem_cache_taskq, + spl_cache_grow_work, ska, 0, &ska->ska_tqe); } /* @@ -2397,6 +2400,8 @@ spl_kmem_init(void) init_rwsem(&spl_kmem_cache_sem); INIT_LIST_HEAD(&spl_kmem_cache_list); + spl_kmem_cache_taskq = taskq_create("spl_kmem_cache", + 1, maxclsyspri, 1, 32, TASKQ_PREPOPULATE); 
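/*
 * The reworked spl_cache_age() above is an instance of a self-rearming
 * delayed task: the task re-dispatches itself on the cache taskq and closes
 * the race with a concurrent destroy by re-checking the destroy bit after
 * dispatching and canceling the id it just obtained.  A condensed sketch of
 * that pattern with hypothetical names (my_cache, my_taskq, MY_BIT_DESTROY);
 * header locations are approximate.
 */
#include <sys/taskq.h>
#include <sys/timer.h>		/* ddi_get_lbolt() */
#include <linux/spinlock.h>
#include <linux/bitops.h>

struct my_cache {
	unsigned long	flags;		/* MY_BIT_DESTROY lives here    */
	spinlock_t	lock;		/* protects age_taskqid         */
	taskqid_t	age_taskqid;	/* canceled by the destroy path */
	unsigned int	delay;		/* rearm interval in seconds    */
};

#define	MY_BIT_DESTROY	0

extern taskq_t *my_taskq;

static void
my_cache_age(void *data)
{
	struct my_cache *c = data;
	taskqid_t id = 0;

	/* ... age magazines and reclaim empty slabs here ... */

	while (!test_bit(MY_BIT_DESTROY, &c->flags) && !id) {
		id = taskq_dispatch_delay(my_taskq, my_cache_age, c,
		    TQ_SLEEP, ddi_get_lbolt() + c->delay / 3 * HZ);

		/* A destroy raced with the dispatch: cancel what was queued. */
		if (test_bit(MY_BIT_DESTROY, &c->flags) && id)
			taskq_cancel_id(my_taskq, id);
	}

	spin_lock(&c->lock);
	c->age_taskqid = id;	/* the destroy path cancels this id */
	spin_unlock(&c->lock);
}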
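/*
 * The ska_tqe field added in sys/kmem.h and the spl_cache_grow() hunk above
 * rely on a preallocated, embedded taskq_ent_t: because the entry lives
 * inside the caller's structure, taskq_dispatch_ent() never has to allocate
 * one, so a low-memory dispatch cannot fail for lack of memory.  A minimal
 * sketch with hypothetical names (my_request, my_handler, my_submit):
 */
#include <sys/taskq.h>

struct my_request {
	int		my_payload;
	taskq_ent_t	my_tqe;		/* embedded, nothing allocated at dispatch */
};

static void
my_handler(void *arg)
{
	struct my_request *req = arg;

	req->my_payload++;
}

static void
my_submit(taskq_t *tq, struct my_request *req)
{
	taskq_init_ent(&req->my_tqe);
	taskq_dispatch_ent(tq, my_handler, req, 0, &req->my_tqe);
}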
spl_register_shrinker(&spl_kmem_cache_shrinker); @@ -2435,6 +2440,7 @@ spl_kmem_fini(void) SENTRY; spl_unregister_shrinker(&spl_kmem_cache_shrinker); + taskq_destroy(spl_kmem_cache_taskq); SEXIT; } diff --git a/module/spl/spl-taskq.c b/module/spl/spl-taskq.c index 7ea20461b1..c9ae0a50b6 100644 --- a/module/spl/spl-taskq.c +++ b/module/spl/spl-taskq.c @@ -57,60 +57,62 @@ task_km_flags(uint_t flags) static taskq_ent_t * task_alloc(taskq_t *tq, uint_t flags) { - taskq_ent_t *t; - int count = 0; - SENTRY; + taskq_ent_t *t; + int count = 0; + SENTRY; - ASSERT(tq); - ASSERT(spin_is_locked(&tq->tq_lock)); + ASSERT(tq); + ASSERT(spin_is_locked(&tq->tq_lock)); retry: - /* Acquire taskq_ent_t's from free list if available */ - if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) { - t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); + /* Acquire taskq_ent_t's from free list if available */ + if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) { + t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); - ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); + ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); + ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL)); + ASSERT(!timer_pending(&t->tqent_timer)); - list_del_init(&t->tqent_list); - SRETURN(t); - } + list_del_init(&t->tqent_list); + SRETURN(t); + } - /* Free list is empty and memory allocations are prohibited */ - if (flags & TQ_NOALLOC) - SRETURN(NULL); + /* Free list is empty and memory allocations are prohibited */ + if (flags & TQ_NOALLOC) + SRETURN(NULL); - /* Hit maximum taskq_ent_t pool size */ - if (tq->tq_nalloc >= tq->tq_maxalloc) { - if (flags & TQ_NOSLEEP) - SRETURN(NULL); + /* Hit maximum taskq_ent_t pool size */ + if (tq->tq_nalloc >= tq->tq_maxalloc) { + if (flags & TQ_NOSLEEP) + SRETURN(NULL); - /* - * Sleep periodically polling the free list for an available - * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed - * but we cannot block forever waiting for an taskq_entq_t to - * show up in the free list, otherwise a deadlock can happen. - * - * Therefore, we need to allocate a new task even if the number - * of allocated tasks is above tq->tq_maxalloc, but we still - * end up delaying the task allocation by one second, thereby - * throttling the task dispatch rate. - */ - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - schedule_timeout(HZ / 100); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - if (count < 100) - SGOTO(retry, count++); - } + /* + * Sleep periodically polling the free list for an available + * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed + * but we cannot block forever waiting for an taskq_ent_t to + * show up in the free list, otherwise a deadlock can happen. + * + * Therefore, we need to allocate a new task even if the number + * of allocated tasks is above tq->tq_maxalloc, but we still + * end up delaying the task allocation by one second, thereby + * throttling the task dispatch rate. 
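/*
 * The retry bound above works out to roughly one second: at most 100 passes,
 * each sleeping HZ / 100 jiffies (about 10ms), before task_alloc() gives up
 * waiting and deliberately allocates past tq_maxalloc.  A condensed sketch of
 * that shape, with the locking and fast paths trimmed and relying on the
 * taskq_t/taskq_ent_t definitions in this file; my_alloc_entry() itself is
 * hypothetical.
 */
static taskq_ent_t *
my_alloc_entry(taskq_t *tq, uint_t flags)
{
	taskq_ent_t *t;
	int count;

	/* Poll the free list for up to 100 * HZ/100 jiffies (~1 second). */
	for (count = 0; count < 100 && tq->tq_nalloc >= tq->tq_maxalloc;
	    count++) {
		if (!list_empty(&tq->tq_free_list)) {
			t = list_entry(tq->tq_free_list.next,
			    taskq_ent_t, tqent_list);
			list_del_init(&t->tqent_list);
			return (t);
		}
		schedule_timeout_uninterruptible(HZ / 100);
	}

	/* Still exhausted after ~1s: allocate beyond tq_maxalloc rather
	 * than risk deadlocking a TQ_SLEEP dispatcher. */
	return (kmem_alloc(sizeof (taskq_ent_t), task_km_flags(flags)));
}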
+ */ + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + schedule_timeout(HZ / 100); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + if (count < 100) + SGOTO(retry, count++); + } - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - t = kmem_alloc(sizeof(taskq_ent_t), task_km_flags(flags)); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + t = kmem_alloc(sizeof(taskq_ent_t), task_km_flags(flags)); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - if (t) { - taskq_init_ent(t); - tq->tq_nalloc++; - } + if (t) { + taskq_init_ent(t); + tq->tq_nalloc++; + } - SRETURN(t); + SRETURN(t); } /* @@ -120,15 +122,16 @@ retry: static void task_free(taskq_t *tq, taskq_ent_t *t) { - SENTRY; + SENTRY; - ASSERT(tq); - ASSERT(t); + ASSERT(tq); + ASSERT(t); ASSERT(spin_is_locked(&tq->tq_lock)); ASSERT(list_empty(&t->tqent_list)); + ASSERT(!timer_pending(&t->tqent_timer)); - kmem_free(t, sizeof(taskq_ent_t)); - tq->tq_nalloc--; + kmem_free(t, sizeof(taskq_ent_t)); + tq->tq_nalloc--; SEXIT; } @@ -145,48 +148,261 @@ task_done(taskq_t *tq, taskq_ent_t *t) ASSERT(t); ASSERT(spin_is_locked(&tq->tq_lock)); + /* Wake tasks blocked in taskq_wait_id() */ + wake_up_all(&t->tqent_waitq); + list_del_init(&t->tqent_list); - if (tq->tq_nalloc <= tq->tq_minalloc) { + if (tq->tq_nalloc <= tq->tq_minalloc) { t->tqent_id = 0; t->tqent_func = NULL; t->tqent_arg = NULL; t->tqent_flags = 0; - list_add_tail(&t->tqent_list, &tq->tq_free_list); + list_add_tail(&t->tqent_list, &tq->tq_free_list); } else { task_free(tq, t); } - SEXIT; + SEXIT; } /* - * As tasks are submitted to the task queue they are assigned a - * monotonically increasing taskqid and added to the tail of the pending - * list. As worker threads become available the tasks are removed from - * the head of the pending or priority list, giving preference to the - * priority list. The tasks are then removed from their respective - * list, and the taskq_thread servicing the task is added to the active - * list, preserving the order using the serviced task's taskqid. - * Finally, as tasks complete the taskq_thread servicing the task is - * removed from the active list. This means that the pending task and - * active taskq_thread lists are always kept sorted by taskqid. Thus the - * lowest outstanding incomplete taskqid can be determined simply by - * checking the min taskqid for each head item on the pending, priority, - * and active taskq_thread list. This value is stored in - * tq->tq_lowest_id and only updated to the new lowest id when the - * previous lowest id completes. All taskqids lower than - * tq->tq_lowest_id must have completed. It is also possible larger - * taskqid's have completed because they may be processed in parallel by - * several worker threads. However, this is not a problem because the - * behavior of taskq_wait_id() is to block until all previously - * submitted taskqid's have completed. + * When a delayed task timer expires remove it from the delay list and + * add it to the priority list in order for immediate processing. 
+ */ +static void +task_expire(unsigned long data) +{ + taskq_ent_t *w, *t = (taskq_ent_t *)data; + taskq_t *tq = t->tqent_taskq; + struct list_head *l; + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + + if (t->tqent_flags & TQENT_FLAG_CANCEL) { + ASSERT(list_empty(&t->tqent_list)); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + return; + } + + /* + * The priority list must be maintained in strict task id order + * from lowest to highest for lowest_id to be easily calculable. + */ + list_del(&t->tqent_list); + list_for_each_prev(l, &tq->tq_prio_list) { + w = list_entry(l, taskq_ent_t, tqent_list); + if (w->tqent_id < t->tqent_id) { + list_add(&t->tqent_list, l); + break; + } + } + if (l == &tq->tq_prio_list) + list_add(&t->tqent_list, &tq->tq_prio_list); + + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + + wake_up(&tq->tq_work_waitq); +} + +/* + * Returns the lowest incomplete taskqid_t. The taskqid_t may + * be queued on the pending list, on the priority list, on the + * delay list, or on the work list currently being handled, but + * it is not 100% complete yet. + */ +static taskqid_t +taskq_lowest_id(taskq_t *tq) +{ + taskqid_t lowest_id = tq->tq_next_id; + taskq_ent_t *t; + taskq_thread_t *tqt; + SENTRY; + + ASSERT(tq); + ASSERT(spin_is_locked(&tq->tq_lock)); + + if (!list_empty(&tq->tq_pend_list)) { + t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list); + lowest_id = MIN(lowest_id, t->tqent_id); + } + + if (!list_empty(&tq->tq_prio_list)) { + t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list); + lowest_id = MIN(lowest_id, t->tqent_id); + } + + if (!list_empty(&tq->tq_delay_list)) { + t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list); + lowest_id = MIN(lowest_id, t->tqent_id); + } + + if (!list_empty(&tq->tq_active_list)) { + tqt = list_entry(tq->tq_active_list.next, taskq_thread_t, + tqt_active_list); + ASSERT(tqt->tqt_id != 0); + lowest_id = MIN(lowest_id, tqt->tqt_id); + } + + SRETURN(lowest_id); +} + +/* + * Insert a task into a list keeping the list sorted by increasing taskqid. + */ +static void +taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt) +{ + taskq_thread_t *w; + struct list_head *l; + + SENTRY; + ASSERT(tq); + ASSERT(tqt); + ASSERT(spin_is_locked(&tq->tq_lock)); + + list_for_each_prev(l, &tq->tq_active_list) { + w = list_entry(l, taskq_thread_t, tqt_active_list); + if (w->tqt_id < tqt->tqt_id) { + list_add(&tqt->tqt_active_list, l); + break; + } + } + if (l == &tq->tq_active_list) + list_add(&tqt->tqt_active_list, &tq->tq_active_list); + + SEXIT; +} + +/* + * Find and return a task from the given list if it exists. The list + * must be in lowest to highest task id order. + */ +static taskq_ent_t * +taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id) +{ + struct list_head *l; + taskq_ent_t *t; + SENTRY; + + ASSERT(spin_is_locked(&tq->tq_lock)); + + list_for_each(l, lh) { + t = list_entry(l, taskq_ent_t, tqent_list); + + if (t->tqent_id == id) + SRETURN(t); + + if (t->tqent_id > id) + break; + } + + SRETURN(NULL); +} + +/* + * Find an already dispatched task given the task id regardless of what + * state it is in. If a task is still pending or executing it will be + * returned and 'active' set appropriately. If the task has already + * been run then NULL is returned. 
+ */ +static taskq_ent_t * +taskq_find(taskq_t *tq, taskqid_t id, int *active) +{ + taskq_thread_t *tqt; + struct list_head *l; + taskq_ent_t *t; + SENTRY; + + ASSERT(spin_is_locked(&tq->tq_lock)); + *active = 0; + + t = taskq_find_list(tq, &tq->tq_delay_list, id); + if (t) + SRETURN(t); + + t = taskq_find_list(tq, &tq->tq_prio_list, id); + if (t) + SRETURN(t); + + t = taskq_find_list(tq, &tq->tq_pend_list, id); + if (t) + SRETURN(t); + + list_for_each(l, &tq->tq_active_list) { + tqt = list_entry(l, taskq_thread_t, tqt_active_list); + if (tqt->tqt_id == id) { + t = tqt->tqt_task; + *active = 1; + SRETURN(t); + } + } + + SRETURN(NULL); +} + +/* + * The taskq_wait_id() function blocks until the passed task id completes. + * This does not guarantee that all lower task id's have completed. + */ +void +taskq_wait_id(taskq_t *tq, taskqid_t id) +{ + DEFINE_WAIT(wait); + taskq_ent_t *t; + int active = 0; + SENTRY; + + ASSERT(tq); + ASSERT(id > 0); + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + t = taskq_find(tq, id, &active); + if (t) + prepare_to_wait(&t->tqent_waitq, &wait, TASK_UNINTERRUPTIBLE); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + + /* + * We rely on the kernels autoremove_wake_function() function to + * remove us from the wait queue in the context of wake_up(). + * Once woken the taskq_ent_t pointer must never be accessed. + */ + if (t) { + t = NULL; + schedule(); + __set_current_state(TASK_RUNNING); + } + + SEXIT; +} +EXPORT_SYMBOL(taskq_wait_id); + +/* + * The taskq_wait() function will block until all previously submitted + * tasks have been completed. A previously submitted task is defined as + * a task with a lower task id than the current task queue id. Note that + * all task id's are assigned monotonically at dispatch time. * - * XXX: Taskqid_t wrapping is not handled. However, taskqid_t's are - * 64-bit values so even if a taskq is processing 2^24 (16,777,216) - * taskqid_ts per second it will still take 2^40 seconds, 34,865 years, - * before the wrap occurs. I can live with that for now. + * Waiting for all previous tasks to complete is accomplished by tracking + * the lowest outstanding task id. As tasks are dispatched they are added + * added to the tail of the pending, priority, or delay lists. And as + * worker threads become available the tasks are removed from the heads + * of these lists and linked to the worker threads. This ensures the + * lists are kept in lowest to highest task id order. + * + * Therefore the lowest outstanding task id can be quickly determined by + * checking the head item from all of these lists. This value is stored + * with the task queue as the lowest id. It only needs to be recalculated + * when either the task with the current lowest id completes or is canceled. + * + * By blocking until the lowest task id exceeds the current task id when + * the function was called we ensure all previous tasks have completed. + * + * NOTE: When there are multiple worked threads it is possible for larger + * task ids to complete before smaller ones. Conversely when the task + * queue contains delay tasks with small task ids, you may block for a + * considerable length of time waiting for them to expire and execute. 
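/*
 * A hypothetical caller contrasting the three wait primitives: taskq_wait_id()
 * above waits for one specific id only, taskq_wait_all() (the behaviour the
 * old __taskq_wait_id() provided) waits for the given id and every lower id,
 * and taskq_wait() waits for everything dispatched before the call.  my_func
 * is a placeholder and error handling is omitted.
 */
#include <sys/taskq.h>

static void
my_func(void *arg)
{
	/* placeholder work item */
}

static void
my_wait_example(taskq_t *tq)
{
	taskqid_t a = taskq_dispatch(tq, my_func, NULL, TQ_SLEEP);
	taskqid_t b = taskq_dispatch(tq, my_func, NULL, TQ_SLEEP);

	taskq_wait_id(tq, b);	/* b has finished; a may still be running   */
	taskq_wait_all(tq, b);	/* a and b have both finished (a < b)       */
	taskq_wait(tq);		/* everything dispatched above has finished */
}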
*/ static int taskq_wait_check(taskq_t *tq, taskqid_t id) @@ -201,19 +417,14 @@ taskq_wait_check(taskq_t *tq, taskqid_t id) } void -__taskq_wait_id(taskq_t *tq, taskqid_t id) +taskq_wait_all(taskq_t *tq, taskqid_t id) { - SENTRY; - ASSERT(tq); - wait_event(tq->tq_wait_waitq, taskq_wait_check(tq, id)); - - SEXIT; } -EXPORT_SYMBOL(__taskq_wait_id); +EXPORT_SYMBOL(taskq_wait_all); void -__taskq_wait(taskq_t *tq) +taskq_wait(taskq_t *tq) { taskqid_t id; SENTRY; @@ -224,22 +435,22 @@ __taskq_wait(taskq_t *tq) id = tq->tq_next_id - 1; spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - __taskq_wait_id(tq, id); + taskq_wait_all(tq, id); SEXIT; } -EXPORT_SYMBOL(__taskq_wait); +EXPORT_SYMBOL(taskq_wait); int -__taskq_member(taskq_t *tq, void *t) +taskq_member(taskq_t *tq, void *t) { struct list_head *l; taskq_thread_t *tqt; - SENTRY; + SENTRY; ASSERT(tq); - ASSERT(t); + ASSERT(t); list_for_each(l, &tq->tq_thread_list) { tqt = list_entry(l, taskq_thread_t, tqt_thread_list); @@ -247,21 +458,78 @@ __taskq_member(taskq_t *tq, void *t) SRETURN(1); } - SRETURN(0); + SRETURN(0); } -EXPORT_SYMBOL(__taskq_member); +EXPORT_SYMBOL(taskq_member); + +/* + * Cancel an already dispatched task given the task id. Still pending tasks + * will be immediately canceled, and if the task is active the function will + * block until it completes. Preallocated tasks which are canceled must be + * freed by the caller. + */ +int +taskq_cancel_id(taskq_t *tq, taskqid_t id) +{ + taskq_ent_t *t; + int active = 0; + int rc = ENOENT; + SENTRY; + + ASSERT(tq); + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + t = taskq_find(tq, id, &active); + if (t && !active) { + list_del_init(&t->tqent_list); + t->tqent_flags |= TQENT_FLAG_CANCEL; + + /* + * When canceling the lowest outstanding task id we + * must recalculate the new lowest outstanding id. + */ + if (tq->tq_lowest_id == t->tqent_id) { + tq->tq_lowest_id = taskq_lowest_id(tq); + ASSERT3S(tq->tq_lowest_id, >, t->tqent_id); + } + + /* + * The task_expire() function takes the tq->tq_lock so drop + * drop the lock before synchronously cancelling the timer. 
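/*
 * A generic illustration, not SPL code, of why taskq_cancel_id() drops
 * tq_lock around del_timer_sync() as described above: del_timer_sync() waits
 * for a running timer callback to return, and task_expire() itself takes
 * tq_lock, so holding the lock across that wait could deadlock.  The
 * structure and names here are hypothetical.
 */
#include <linux/timer.h>
#include <linux/spinlock.h>

struct my_timed_obj {
	spinlock_t lock;
	struct timer_list timer;	/* the callback takes my_timed_obj->lock */
};

static void
my_cancel_timer(struct my_timed_obj *o)
{
	unsigned long flags;

	spin_lock_irqsave(&o->lock, flags);
	/* ... decide the pending timer must be canceled ... */
	spin_unlock_irqrestore(&o->lock, flags);	/* drop before waiting */

	del_timer_sync(&o->timer);	/* may wait for the callback, which
					 * would otherwise spin on o->lock */

	spin_lock_irqsave(&o->lock, flags);
	/* ... continue with the timer guaranteed idle ... */
	spin_unlock_irqrestore(&o->lock, flags);
}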
+ */ + if (timer_pending(&t->tqent_timer)) { + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + del_timer_sync(&t->tqent_timer); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + } + + if (!(t->tqent_flags & TQENT_FLAG_PREALLOC)) + task_done(tq, t); + + rc = 0; + } + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + + if (active) { + taskq_wait_id(tq, id); + rc = EBUSY; + } + + SRETURN(rc); +} +EXPORT_SYMBOL(taskq_cancel_id); taskqid_t -__taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) +taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) { - taskq_ent_t *t; + taskq_ent_t *t; taskqid_t rc = 0; - SENTRY; + SENTRY; - ASSERT(tq); - ASSERT(func); + ASSERT(tq); + ASSERT(func); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); /* Taskq being destroyed and all tasks drained */ if (!(tq->tq_flags & TQ_ACTIVE)) @@ -272,7 +540,7 @@ __taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) SGOTO(out, rc = 0); - if ((t = task_alloc(tq, flags)) == NULL) + if ((t = task_alloc(tq, flags)) == NULL) SGOTO(out, rc = 0); spin_lock(&t->tqent_lock); @@ -285,8 +553,12 @@ __taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) t->tqent_id = rc = tq->tq_next_id; tq->tq_next_id++; - t->tqent_func = func; - t->tqent_arg = arg; + t->tqent_func = func; + t->tqent_arg = arg; + t->tqent_taskq = tq; + t->tqent_timer.data = 0; + t->tqent_timer.function = NULL; + t->tqent_timer.expires = 0; ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); @@ -297,10 +569,54 @@ out: spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); SRETURN(rc); } -EXPORT_SYMBOL(__taskq_dispatch); +EXPORT_SYMBOL(taskq_dispatch); + +taskqid_t +taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, + uint_t flags, clock_t expire_time) +{ + taskq_ent_t *t; + taskqid_t rc = 0; + SENTRY; + + ASSERT(tq); + ASSERT(func); + + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + + /* Taskq being destroyed and all tasks drained */ + if (!(tq->tq_flags & TQ_ACTIVE)) + SGOTO(out, rc = 0); + + if ((t = task_alloc(tq, flags)) == NULL) + SGOTO(out, rc = 0); + + spin_lock(&t->tqent_lock); + + /* Queue to the delay list for subsequent execution */ + list_add_tail(&t->tqent_list, &tq->tq_delay_list); + + t->tqent_id = rc = tq->tq_next_id; + tq->tq_next_id++; + t->tqent_func = func; + t->tqent_arg = arg; + t->tqent_taskq = tq; + t->tqent_timer.data = (unsigned long)t; + t->tqent_timer.function = task_expire; + t->tqent_timer.expires = (unsigned long)expire_time; + add_timer(&t->tqent_timer); + + ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); + + spin_unlock(&t->tqent_lock); +out: + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + SRETURN(rc); +} +EXPORT_SYMBOL(taskq_dispatch_delay); void -__taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, +taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, taskq_ent_t *t) { SENTRY; @@ -335,6 +651,7 @@ __taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, tq->tq_next_id++; t->tqent_func = func; t->tqent_arg = arg; + t->tqent_taskq = tq; spin_unlock(&t->tqent_lock); @@ -343,117 +660,55 @@ out: spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); SEXIT; } -EXPORT_SYMBOL(__taskq_dispatch_ent); +EXPORT_SYMBOL(taskq_dispatch_ent); int -__taskq_empty_ent(taskq_ent_t *t) +taskq_empty_ent(taskq_ent_t *t) { return 
list_empty(&t->tqent_list); } -EXPORT_SYMBOL(__taskq_empty_ent); +EXPORT_SYMBOL(taskq_empty_ent); void -__taskq_init_ent(taskq_ent_t *t) +taskq_init_ent(taskq_ent_t *t) { spin_lock_init(&t->tqent_lock); + init_waitqueue_head(&t->tqent_waitq); + init_timer(&t->tqent_timer); INIT_LIST_HEAD(&t->tqent_list); t->tqent_id = 0; t->tqent_func = NULL; t->tqent_arg = NULL; t->tqent_flags = 0; + t->tqent_taskq = NULL; } -EXPORT_SYMBOL(__taskq_init_ent); - -/* - * Returns the lowest incomplete taskqid_t. The taskqid_t may - * be queued on the pending list, on the priority list, or on - * the work list currently being handled, but it is not 100% - * complete yet. - */ -static taskqid_t -taskq_lowest_id(taskq_t *tq) -{ - taskqid_t lowest_id = tq->tq_next_id; - taskq_ent_t *t; - taskq_thread_t *tqt; - SENTRY; - - ASSERT(tq); - ASSERT(spin_is_locked(&tq->tq_lock)); - - if (!list_empty(&tq->tq_pend_list)) { - t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list); - lowest_id = MIN(lowest_id, t->tqent_id); - } - - if (!list_empty(&tq->tq_prio_list)) { - t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list); - lowest_id = MIN(lowest_id, t->tqent_id); - } - - if (!list_empty(&tq->tq_active_list)) { - tqt = list_entry(tq->tq_active_list.next, taskq_thread_t, - tqt_active_list); - ASSERT(tqt->tqt_id != 0); - lowest_id = MIN(lowest_id, tqt->tqt_id); - } - - SRETURN(lowest_id); -} - -/* - * Insert a task into a list keeping the list sorted by increasing - * taskqid. - */ -static void -taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt) -{ - taskq_thread_t *w; - struct list_head *l; - - SENTRY; - ASSERT(tq); - ASSERT(tqt); - ASSERT(spin_is_locked(&tq->tq_lock)); - - list_for_each_prev(l, &tq->tq_active_list) { - w = list_entry(l, taskq_thread_t, tqt_active_list); - if (w->tqt_id < tqt->tqt_id) { - list_add(&tqt->tqt_active_list, l); - break; - } - } - if (l == &tq->tq_active_list) - list_add(&tqt->tqt_active_list, &tq->tq_active_list); - - SEXIT; -} +EXPORT_SYMBOL(taskq_init_ent); static int taskq_thread(void *args) { - DECLARE_WAITQUEUE(wait, current); - sigset_t blocked; + DECLARE_WAITQUEUE(wait, current); + sigset_t blocked; taskq_thread_t *tqt = args; - taskq_t *tq; - taskq_ent_t *t; + taskq_t *tq; + taskq_ent_t *t; struct list_head *pend_list; SENTRY; - ASSERT(tqt); + ASSERT(tqt); tq = tqt->tqt_tq; - current->flags |= PF_NOFREEZE; + current->flags |= PF_NOFREEZE; - sigfillset(&blocked); - sigprocmask(SIG_BLOCK, &blocked, NULL); - flush_signals(current); + sigfillset(&blocked); + sigprocmask(SIG_BLOCK, &blocked, NULL); + flush_signals(current); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - tq->tq_nthreads++; - wake_up(&tq->tq_wait_waitq); - set_current_state(TASK_INTERRUPTIBLE); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + tq->tq_nthreads++; + wake_up(&tq->tq_wait_waitq); + set_current_state(TASK_INTERRUPTIBLE); - while (!kthread_should_stop()) { + while (!kthread_should_stop()) { if (list_empty(&tq->tq_pend_list) && list_empty(&tq->tq_prio_list)) { @@ -475,13 +730,14 @@ taskq_thread(void *args) pend_list = NULL; if (pend_list) { - t = list_entry(pend_list->next, taskq_ent_t, tqent_list); - list_del_init(&t->tqent_list); + t = list_entry(pend_list->next,taskq_ent_t,tqent_list); + list_del_init(&t->tqent_list); /* In order to support recursively dispatching a * preallocated taskq_ent_t, tqent_id must be * stored prior to executing tqent_func. 
*/ tqt->tqt_id = t->tqent_id; + tqt->tqt_task = t; /* We must store a copy of the flags prior to * servicing the task (servicing a prealloc'd task @@ -491,15 +747,16 @@ taskq_thread(void *args) tqt->tqt_flags = t->tqent_flags; taskq_insert_in_order(tq, tqt); - tq->tq_nactive++; + tq->tq_nactive++; spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); /* Perform the requested task */ - t->tqent_func(t->tqent_arg); + t->tqent_func(t->tqent_arg); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - tq->tq_nactive--; + tq->tq_nactive--; list_del_init(&tqt->tqt_active_list); + tqt->tqt_task = NULL; /* For prealloc'd tasks, we don't free anything. */ if ((tq->tq_flags & TASKQ_DYNAMIC) || @@ -515,37 +772,37 @@ taskq_thread(void *args) tqt->tqt_id = 0; tqt->tqt_flags = 0; - wake_up_all(&tq->tq_wait_waitq); + wake_up_all(&tq->tq_wait_waitq); } set_current_state(TASK_INTERRUPTIBLE); - } + } __set_current_state(TASK_RUNNING); - tq->tq_nthreads--; + tq->tq_nthreads--; list_del_init(&tqt->tqt_thread_list); kmem_free(tqt, sizeof(taskq_thread_t)); - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); SRETURN(0); } taskq_t * -__taskq_create(const char *name, int nthreads, pri_t pri, - int minalloc, int maxalloc, uint_t flags) +taskq_create(const char *name, int nthreads, pri_t pri, + int minalloc, int maxalloc, uint_t flags) { - taskq_t *tq; + taskq_t *tq; taskq_thread_t *tqt; - int rc = 0, i, j = 0; - SENTRY; + int rc = 0, i, j = 0; + SENTRY; - ASSERT(name != NULL); - ASSERT(pri <= maxclsyspri); - ASSERT(minalloc >= 0); - ASSERT(maxalloc <= INT_MAX); - ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */ + ASSERT(name != NULL); + ASSERT(pri <= maxclsyspri); + ASSERT(minalloc >= 0); + ASSERT(maxalloc <= INT_MAX); + ASSERT(!(flags & (TASKQ_CPR_SAFE | TASKQ_DYNAMIC))); /* Unsupported */ /* Scale the number of threads using nthreads as a percentage */ if (flags & TASKQ_THREADS_CPU_PCT) { @@ -556,35 +813,36 @@ __taskq_create(const char *name, int nthreads, pri_t pri, nthreads = MAX((num_online_cpus() * nthreads) / 100, 1); } - tq = kmem_alloc(sizeof(*tq), KM_PUSHPAGE); - if (tq == NULL) - SRETURN(NULL); + tq = kmem_alloc(sizeof(*tq), KM_PUSHPAGE); + if (tq == NULL) + SRETURN(NULL); - spin_lock_init(&tq->tq_lock); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - INIT_LIST_HEAD(&tq->tq_thread_list); - INIT_LIST_HEAD(&tq->tq_active_list); - tq->tq_name = name; - tq->tq_nactive = 0; + spin_lock_init(&tq->tq_lock); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + INIT_LIST_HEAD(&tq->tq_thread_list); + INIT_LIST_HEAD(&tq->tq_active_list); + tq->tq_name = name; + tq->tq_nactive = 0; tq->tq_nthreads = 0; - tq->tq_pri = pri; - tq->tq_minalloc = minalloc; - tq->tq_maxalloc = maxalloc; + tq->tq_pri = pri; + tq->tq_minalloc = minalloc; + tq->tq_maxalloc = maxalloc; tq->tq_nalloc = 0; - tq->tq_flags = (flags | TQ_ACTIVE); + tq->tq_flags = (flags | TQ_ACTIVE); tq->tq_next_id = 1; tq->tq_lowest_id = 1; - INIT_LIST_HEAD(&tq->tq_free_list); - INIT_LIST_HEAD(&tq->tq_pend_list); - INIT_LIST_HEAD(&tq->tq_prio_list); - init_waitqueue_head(&tq->tq_work_waitq); - init_waitqueue_head(&tq->tq_wait_waitq); + INIT_LIST_HEAD(&tq->tq_free_list); + INIT_LIST_HEAD(&tq->tq_pend_list); + INIT_LIST_HEAD(&tq->tq_prio_list); + INIT_LIST_HEAD(&tq->tq_delay_list); + init_waitqueue_head(&tq->tq_work_waitq); + init_waitqueue_head(&tq->tq_wait_waitq); - if (flags & TASKQ_PREPOPULATE) - for (i = 0; i < minalloc; i++) - task_done(tq, task_alloc(tq, 
TQ_PUSHPAGE | TQ_NEW)); + if (flags & TASKQ_PREPOPULATE) + for (i = 0; i < minalloc; i++) + task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW)); - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); for (i = 0; i < nthreads; i++) { tqt = kmem_alloc(sizeof(*tqt), KM_PUSHPAGE); @@ -594,7 +852,7 @@ __taskq_create(const char *name, int nthreads, pri_t pri, tqt->tqt_id = 0; tqt->tqt_thread = kthread_create(taskq_thread, tqt, - "%s/%d", name, i); + "%s/%d", name, i); if (tqt->tqt_thread) { list_add(&tqt->tqt_thread_list, &tq->tq_thread_list); kthread_bind(tqt->tqt_thread, i % num_online_cpus()); @@ -607,20 +865,20 @@ __taskq_create(const char *name, int nthreads, pri_t pri, } } - /* Wait for all threads to be started before potential destroy */ + /* Wait for all threads to be started before potential destroy */ wait_event(tq->tq_wait_waitq, tq->tq_nthreads == j); - if (rc) { - __taskq_destroy(tq); - tq = NULL; - } + if (rc) { + taskq_destroy(tq); + tq = NULL; + } - SRETURN(tq); + SRETURN(tq); } -EXPORT_SYMBOL(__taskq_create); +EXPORT_SYMBOL(taskq_create); void -__taskq_destroy(taskq_t *tq) +taskq_destroy(taskq_t *tq) { struct task_struct *thread; taskq_thread_t *tqt; @@ -629,13 +887,13 @@ __taskq_destroy(taskq_t *tq) ASSERT(tq); spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); - tq->tq_flags &= ~TQ_ACTIVE; + tq->tq_flags &= ~TQ_ACTIVE; spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); /* TQ_ACTIVE cleared prevents new tasks being added to pending */ - __taskq_wait(tq); + taskq_wait(tq); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); /* * Signal each thread to exit and block until it does. Each thread @@ -651,53 +909,54 @@ __taskq_destroy(taskq_t *tq) kthread_stop(thread); - spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); + spin_lock_irqsave(&tq->tq_lock, tq->tq_lock_flags); } - while (!list_empty(&tq->tq_free_list)) { + while (!list_empty(&tq->tq_free_list)) { t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); - list_del_init(&t->tqent_list); - task_free(tq, t); - } + list_del_init(&t->tqent_list); + task_free(tq, t); + } - ASSERT(tq->tq_nthreads == 0); - ASSERT(tq->tq_nalloc == 0); - ASSERT(list_empty(&tq->tq_thread_list)); - ASSERT(list_empty(&tq->tq_active_list)); - ASSERT(list_empty(&tq->tq_free_list)); - ASSERT(list_empty(&tq->tq_pend_list)); - ASSERT(list_empty(&tq->tq_prio_list)); + ASSERT(tq->tq_nthreads == 0); + ASSERT(tq->tq_nalloc == 0); + ASSERT(list_empty(&tq->tq_thread_list)); + ASSERT(list_empty(&tq->tq_active_list)); + ASSERT(list_empty(&tq->tq_free_list)); + ASSERT(list_empty(&tq->tq_pend_list)); + ASSERT(list_empty(&tq->tq_prio_list)); + ASSERT(list_empty(&tq->tq_delay_list)); - spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); + spin_unlock_irqrestore(&tq->tq_lock, tq->tq_lock_flags); - kmem_free(tq, sizeof(taskq_t)); + kmem_free(tq, sizeof(taskq_t)); SEXIT; } -EXPORT_SYMBOL(__taskq_destroy); +EXPORT_SYMBOL(taskq_destroy); int spl_taskq_init(void) { - SENTRY; + SENTRY; /* Solaris creates a dynamic taskq of up to 64 threads, however in * a Linux environment 1 thread per-core is usually about right */ - system_taskq = taskq_create("spl_system_taskq", num_online_cpus(), + system_taskq = taskq_create("spl_system_taskq", num_online_cpus(), minclsyspri, 4, 512, TASKQ_PREPOPULATE); if (system_taskq == NULL) SRETURN(1); - SRETURN(0); + SRETURN(0); } void spl_taskq_fini(void) { - 
SENTRY; + SENTRY; taskq_destroy(system_taskq); - SEXIT; + SEXIT; } diff --git a/module/splat/splat-taskq.c b/module/splat/splat-taskq.c index b94930cc9b..5a9681e21a 100644 --- a/module/splat/splat-taskq.c +++ b/module/splat/splat-taskq.c @@ -25,6 +25,7 @@ \*****************************************************************************/ #include +#include #include #include "splat-internal.h" @@ -63,6 +64,14 @@ #define SPLAT_TASKQ_TEST8_NAME "contention" #define SPLAT_TASKQ_TEST8_DESC "1 queue, 100 threads, 131072 tasks" +#define SPLAT_TASKQ_TEST9_ID 0x0209 +#define SPLAT_TASKQ_TEST9_NAME "delay" +#define SPLAT_TASKQ_TEST9_DESC "Delayed task execution" + +#define SPLAT_TASKQ_TEST10_ID 0x020a +#define SPLAT_TASKQ_TEST10_NAME "cancel" +#define SPLAT_TASKQ_TEST10_DESC "Cancel task execution" + #define SPLAT_TASKQ_ORDER_MAX 8 #define SPLAT_TASKQ_DEPTH_MAX 16 @@ -70,9 +79,10 @@ typedef struct splat_taskq_arg { int flag; int id; - atomic_t count; + atomic_t *count; int order[SPLAT_TASKQ_ORDER_MAX]; unsigned int depth; + unsigned long expire; taskq_t *tq; taskq_ent_t *tqe; spinlock_t lock; @@ -415,8 +425,8 @@ splat_taskq_test3(struct file *file, void *arg) /* * Create a taskq and dispatch a large number of tasks to the queue. * Then use taskq_wait() to block until all the tasks complete, then - * cross check that all the tasks ran by checking tg_arg->count which - * is incremented in the task function. Finally cleanup the taskq. + * cross check that all the tasks ran by checking the shared atomic + * counter which is incremented in the task function. * * First we try with a large 'maxalloc' value, then we try with a small one. * We should not drop tasks when TQ_SLEEP is used in taskq_dispatch(), even @@ -428,7 +438,7 @@ splat_taskq_test4_func(void *arg) splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg; ASSERT(tq_arg); - atomic_inc(&tq_arg->count); + atomic_inc(tq_arg->count); } static int @@ -439,6 +449,7 @@ splat_taskq_test4_common(struct file *file, void *arg, int minalloc, taskqid_t id; splat_taskq_arg_t tq_arg; taskq_ent_t *tqes; + atomic_t count; int i, j, rc = 0; tqes = kmalloc(sizeof(*tqes) * nr_tasks, GFP_KERNEL); @@ -461,9 +472,10 @@ splat_taskq_test4_common(struct file *file, void *arg, int minalloc, tq_arg.file = file; tq_arg.name = SPLAT_TASKQ_TEST4_NAME; + tq_arg.count = &count; for (i = 1; i <= nr_tasks; i *= 2) { - atomic_set(&tq_arg.count, 0); + atomic_set(tq_arg.count, 0); splat_vprint(file, SPLAT_TASKQ_TEST4_NAME, "Taskq '%s' function '%s' dispatched %d times\n", tq_arg.name, sym2str(splat_taskq_test4_func), i); @@ -495,8 +507,8 @@ splat_taskq_test4_common(struct file *file, void *arg, int minalloc, taskq_wait(tq); splat_vprint(file, SPLAT_TASKQ_TEST4_NAME, "Taskq '%s' " "%d/%d dispatches finished\n", tq_arg.name, - atomic_read(&tq_arg.count), i); - if (atomic_read(&tq_arg.count) != i) { + atomic_read(&count), i); + if (atomic_read(&count) != i) { rc = -ERANGE; goto out; @@ -548,10 +560,10 @@ splat_taskq_test4(struct file *file, void *arg) * next pending task as soon as it completes its current task. This * means that tasks do not strictly complete in order in which they * were dispatched (increasing task id). This is fine but we need to - * verify that taskq_wait_id() blocks until the passed task id and all + * verify that taskq_wait_all() blocks until the passed task id and all * lower task ids complete. We do this by dispatching the following * specific sequence of tasks each of which block for N time units. 
- * We then use taskq_wait_id() to unblock at specific task id and + * We then use taskq_wait_all() to unblock at specific task id and * verify the only the expected task ids have completed and in the * correct order. The two cases of interest are: * @@ -562,17 +574,17 @@ splat_taskq_test4(struct file *file, void *arg) * * The following table shows each task id and how they will be * scheduled. Each rows represent one time unit and each column - * one of the three worker threads. The places taskq_wait_id() + * one of the three worker threads. The places taskq_wait_all() * must unblock for a specific id are identified as well as the * task ids which must have completed and their order. * - * +-----+ <--- taskq_wait_id(tq, 8) unblocks + * +-----+ <--- taskq_wait_all(tq, 8) unblocks * | | Required Completion Order: 1,2,4,5,3,8,6,7 * +-----+ | * | | | * | | +-----+ * | | | 8 | - * | | +-----+ <--- taskq_wait_id(tq, 3) unblocks + * | | +-----+ <--- taskq_wait_all(tq, 3) unblocks * | | 7 | | Required Completion Order: 1,2,4,5,3 * | +-----+ | * | 6 | | | @@ -657,9 +669,12 @@ splat_taskq_test5_impl(struct file *file, void *arg, boolean_t prealloc) splat_taskq_arg_t tq_arg; int order1[SPLAT_TASKQ_ORDER_MAX] = { 1,2,4,5,3,0,0,0 }; int order2[SPLAT_TASKQ_ORDER_MAX] = { 1,2,4,5,3,8,6,7 }; - taskq_ent_t tqes[SPLAT_TASKQ_ORDER_MAX]; + taskq_ent_t *tqes; int i, rc = 0; + tqes = kmem_alloc(sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX, KM_SLEEP); + memset(tqes, 0, sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX); + splat_vprint(file, SPLAT_TASKQ_TEST5_NAME, "Taskq '%s' creating (%s dispatch)\n", SPLAT_TASKQ_TEST5_NAME, @@ -712,13 +727,13 @@ splat_taskq_test5_impl(struct file *file, void *arg, boolean_t prealloc) splat_vprint(file, SPLAT_TASKQ_TEST5_NAME, "Taskq '%s' " "waiting for taskqid %d completion\n", tq_arg.name, 3); - taskq_wait_id(tq, 3); + taskq_wait_all(tq, 3); if ((rc = splat_taskq_test_order(&tq_arg, order1))) goto out; splat_vprint(file, SPLAT_TASKQ_TEST5_NAME, "Taskq '%s' " "waiting for taskqid %d completion\n", tq_arg.name, 8); - taskq_wait_id(tq, 8); + taskq_wait_all(tq, 8); rc = splat_taskq_test_order(&tq_arg, order2); out: @@ -726,6 +741,8 @@ out: "Taskq '%s' destroying\n", tq_arg.name); taskq_destroy(tq); + kmem_free(tqes, sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX); + return rc; } @@ -811,10 +828,13 @@ splat_taskq_test6_impl(struct file *file, void *arg, boolean_t prealloc) splat_taskq_id_t tq_id[SPLAT_TASKQ_ORDER_MAX]; splat_taskq_arg_t tq_arg; int order[SPLAT_TASKQ_ORDER_MAX] = { 1,2,3,6,7,8,4,5 }; - taskq_ent_t tqes[SPLAT_TASKQ_ORDER_MAX]; + taskq_ent_t *tqes; int i, rc = 0; uint_t tflags; + tqes = kmem_alloc(sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX, KM_SLEEP); + memset(tqes, 0, sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX); + splat_vprint(file, SPLAT_TASKQ_TEST6_NAME, "Taskq '%s' creating (%s dispatch)\n", SPLAT_TASKQ_TEST6_NAME, @@ -874,7 +894,7 @@ splat_taskq_test6_impl(struct file *file, void *arg, boolean_t prealloc) splat_vprint(file, SPLAT_TASKQ_TEST6_NAME, "Taskq '%s' " "waiting for taskqid %d completion\n", tq_arg.name, SPLAT_TASKQ_ORDER_MAX); - taskq_wait_id(tq, SPLAT_TASKQ_ORDER_MAX); + taskq_wait_all(tq, SPLAT_TASKQ_ORDER_MAX); rc = splat_taskq_test_order(&tq_arg, order); out: @@ -882,6 +902,8 @@ out: "Taskq '%s' destroying\n", tq_arg.name); taskq_destroy(tq); + kmem_free(tqes, sizeof(*tqes) * SPLAT_TASKQ_ORDER_MAX); + return rc; } @@ -975,7 +997,7 @@ splat_taskq_test7_impl(struct file *file, void *arg, boolean_t prealloc) if (tq_arg.flag == 0) { splat_vprint(file, SPLAT_TASKQ_TEST7_NAME, "Taskq '%s' 
waiting\n", tq_arg.name); - taskq_wait_id(tq, SPLAT_TASKQ_DEPTH_MAX); + taskq_wait_all(tq, SPLAT_TASKQ_DEPTH_MAX); } splat_vprint(file, SPLAT_TASKQ_TEST7_NAME, @@ -1011,7 +1033,7 @@ splat_taskq_test8_func(void *arg) splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg; ASSERT(tq_arg); - atomic_inc(&tq_arg->count); + atomic_inc(tq_arg->count); } #define TEST8_NUM_TASKS 0x20000 @@ -1025,6 +1047,7 @@ splat_taskq_test8_common(struct file *file, void *arg, int minalloc, taskqid_t id; splat_taskq_arg_t tq_arg; taskq_ent_t **tqes; + atomic_t count; int i, j, rc = 0; tqes = vmalloc(sizeof(*tqes) * TEST8_NUM_TASKS); @@ -1048,8 +1071,9 @@ splat_taskq_test8_common(struct file *file, void *arg, int minalloc, tq_arg.file = file; tq_arg.name = SPLAT_TASKQ_TEST8_NAME; + tq_arg.count = &count; + atomic_set(tq_arg.count, 0); - atomic_set(&tq_arg.count, 0); for (i = 0; i < TEST8_NUM_TASKS; i++) { tqes[i] = kmalloc(sizeof(taskq_ent_t), GFP_KERNEL); if (tqes[i] == NULL) { @@ -1079,9 +1103,9 @@ splat_taskq_test8_common(struct file *file, void *arg, int minalloc, taskq_wait(tq); splat_vprint(file, SPLAT_TASKQ_TEST8_NAME, "Taskq '%s' " "%d/%d dispatches finished\n", tq_arg.name, - atomic_read(&tq_arg.count), TEST8_NUM_TASKS); + atomic_read(tq_arg.count), TEST8_NUM_TASKS); - if (atomic_read(&tq_arg.count) != TEST8_NUM_TASKS) + if (atomic_read(tq_arg.count) != TEST8_NUM_TASKS) rc = -ERANGE; out: @@ -1106,6 +1130,271 @@ splat_taskq_test8(struct file *file, void *arg) return rc; } +/* + * Create a taskq and dispatch a number of delayed tasks to the queue. + * For each task verify that it was run no early than requested. + */ +static void +splat_taskq_test9_func(void *arg) +{ + splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg; + ASSERT(tq_arg); + + if (ddi_get_lbolt() >= tq_arg->expire) + atomic_inc(tq_arg->count); + + kmem_free(tq_arg, sizeof(splat_taskq_arg_t)); +} + +static int +splat_taskq_test9(struct file *file, void *arg) +{ + taskq_t *tq; + atomic_t count; + int i, rc = 0; + int minalloc = 1; + int maxalloc = 10; + int nr_tasks = 100; + + splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, + "Taskq '%s' creating (%s dispatch) (%d/%d/%d)\n", + SPLAT_TASKQ_TEST9_NAME, "delay", minalloc, maxalloc, nr_tasks); + if ((tq = taskq_create(SPLAT_TASKQ_TEST9_NAME, 3, maxclsyspri, + minalloc, maxalloc, TASKQ_PREPOPULATE)) == NULL) { + splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, + "Taskq '%s' create failed\n", SPLAT_TASKQ_TEST9_NAME); + return -EINVAL; + } + + atomic_set(&count, 0); + + for (i = 1; i <= nr_tasks; i++) { + splat_taskq_arg_t *tq_arg; + taskqid_t id; + uint32_t rnd; + + /* A random timeout in jiffies of at most 5 seconds */ + get_random_bytes((void *)&rnd, 4); + rnd = rnd % (5 * HZ); + + tq_arg = kmem_alloc(sizeof(splat_taskq_arg_t), KM_SLEEP); + tq_arg->file = file; + tq_arg->name = SPLAT_TASKQ_TEST9_NAME; + tq_arg->expire = ddi_get_lbolt() + rnd; + tq_arg->count = &count; + + splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, + "Taskq '%s' delay dispatch %u jiffies\n", + SPLAT_TASKQ_TEST9_NAME, rnd); + + id = taskq_dispatch_delay(tq, splat_taskq_test9_func, + tq_arg, TQ_SLEEP, ddi_get_lbolt() + rnd); + + if (id == 0) { + splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, + "Taskq '%s' delay dispatch failed\n", + SPLAT_TASKQ_TEST9_NAME); + kmem_free(tq_arg, sizeof(splat_taskq_arg_t)); + taskq_wait(tq); + rc = -EINVAL; + goto out; + } + } + + splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, "Taskq '%s' waiting for " + "%d delay dispatches\n", SPLAT_TASKQ_TEST9_NAME, nr_tasks); + + taskq_wait(tq); + if (atomic_read(&count) != nr_tasks) 
+		rc = -ERANGE;
+
+	splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, "Taskq '%s' %d/%d delay "
+	    "dispatches finished on time\n", SPLAT_TASKQ_TEST9_NAME,
+	    atomic_read(&count), nr_tasks);
+	splat_vprint(file, SPLAT_TASKQ_TEST9_NAME, "Taskq '%s' destroying\n",
+	    SPLAT_TASKQ_TEST9_NAME);
+out:
+	taskq_destroy(tq);
+
+	return rc;
+}
+
+/*
+ * Create a taskq and dispatch, then cancel, tasks in the queue.
+ */
+static void
+splat_taskq_test10_func(void *arg)
+{
+	splat_taskq_arg_t *tq_arg = (splat_taskq_arg_t *)arg;
+	uint8_t rnd;
+
+	if (ddi_get_lbolt() >= tq_arg->expire)
+		atomic_inc(tq_arg->count);
+
+	/* Randomly sleep to further perturb the system */
+	get_random_bytes((void *)&rnd, 1);
+	msleep(1 + (rnd % 9));
+}
+
+static int
+splat_taskq_test10(struct file *file, void *arg)
+{
+	taskq_t *tq;
+	splat_taskq_arg_t **tqas;
+	atomic_t count;
+	int i, j, rc = 0;
+	int minalloc = 1;
+	int maxalloc = 10;
+	int nr_tasks = 100;
+	int canceled = 0;
+	int completed = 0;
+	int blocked = 0;
+	unsigned long start, cancel;
+
+	tqas = vmalloc(sizeof(*tqas) * nr_tasks);
+	if (tqas == NULL)
+		return -ENOMEM;
+	memset(tqas, 0, sizeof(*tqas) * nr_tasks);
+
+	splat_vprint(file, SPLAT_TASKQ_TEST10_NAME,
+	    "Taskq '%s' creating (%s dispatch) (%d/%d/%d)\n",
+	    SPLAT_TASKQ_TEST10_NAME, "delay", minalloc, maxalloc, nr_tasks);
+	if ((tq = taskq_create(SPLAT_TASKQ_TEST10_NAME, 3, maxclsyspri,
+	    minalloc, maxalloc, TASKQ_PREPOPULATE)) == NULL) {
+		splat_vprint(file, SPLAT_TASKQ_TEST10_NAME,
+		    "Taskq '%s' create failed\n", SPLAT_TASKQ_TEST10_NAME);
+		rc = -EINVAL;
+		goto out_free;
+	}
+
+	atomic_set(&count, 0);
+
+	for (i = 0; i < nr_tasks; i++) {
+		splat_taskq_arg_t *tq_arg;
+		uint32_t rnd;
+
+		/* A random timeout in jiffies of at most 5 seconds */
+		get_random_bytes((void *)&rnd, 4);
+		rnd = rnd % (5 * HZ);
+
+		tq_arg = kmem_alloc(sizeof(splat_taskq_arg_t), KM_SLEEP);
+		tq_arg->file = file;
+		tq_arg->name = SPLAT_TASKQ_TEST10_NAME;
+		tq_arg->count = &count;
+		tqas[i] = tq_arg;
+
+		/*
+		 * Dispatch every third task immediately to mix things up;
+		 * the cancel code is inherently racy and we want to try to
+		 * provoke any subtle concurrency issues.
+		 */
+		if ((i % 3) == 0) {
+			tq_arg->expire = ddi_get_lbolt();
+			tq_arg->id = taskq_dispatch(tq, splat_taskq_test10_func,
+			    tq_arg, TQ_SLEEP);
+		} else {
+			tq_arg->expire = ddi_get_lbolt() + rnd;
+			tq_arg->id = taskq_dispatch_delay(tq,
+			    splat_taskq_test10_func,
+			    tq_arg, TQ_SLEEP, ddi_get_lbolt() + rnd);
+		}
+
+		if (tq_arg->id == 0) {
+			splat_vprint(file, SPLAT_TASKQ_TEST10_NAME,
+			    "Taskq '%s' dispatch failed\n",
+			    SPLAT_TASKQ_TEST10_NAME);
+			kmem_free(tq_arg, sizeof(splat_taskq_arg_t));
+			taskq_wait(tq);
+			rc = -EINVAL;
+			goto out;
+		} else {
+			splat_vprint(file, SPLAT_TASKQ_TEST10_NAME,
+			    "Taskq '%s' dispatch %lu in %lu jiffies\n",
+			    SPLAT_TASKQ_TEST10_NAME, (unsigned long)tq_arg->id,
+			    !(i % 3) ? 0 : tq_arg->expire - ddi_get_lbolt());
+		}
+	}
+
+	/*
+	 * Start randomly canceling tasks for the duration of the test.  We
+	 * happen to know the valid task ids will be in the range 1..nr_tasks
+	 * because the taskq is private and was just created.  However, we
+	 * have no idea whether a particular task has already executed or not.
+	 */
+	splat_vprint(file, SPLAT_TASKQ_TEST10_NAME, "Taskq '%s' randomly "
+	    "canceling task ids\n", SPLAT_TASKQ_TEST10_NAME);
+
+	start = ddi_get_lbolt();
+	i = 0;
+
+	while (ddi_get_lbolt() < start + 5 * HZ) {
+		taskqid_t id;
+		uint32_t rnd;
+
+		i++;
+		cancel = ddi_get_lbolt();
+		get_random_bytes((void *)&rnd, 4);
+		id = 1 + (rnd % nr_tasks);
+		rc = taskq_cancel_id(tq, id);
+
+		/*
+		 * Keep track of the results of the random cancels.
+		 */
+		if (rc == 0) {
+			canceled++;
+		} else if (rc == ENOENT) {
+			completed++;
+		} else if (rc == EBUSY) {
+			blocked++;
+		} else {
+			rc = -EINVAL;
+			break;
+		}
+
+		/*
+		 * Verify we never get blocked too long in taskq_cancel_id().
+		 * The worst case is 10ms if we happen to cancel the task
+		 * which is currently executing.  We allow a factor of 2x.
+		 */
+		if (ddi_get_lbolt() - cancel > HZ / 50) {
+			splat_vprint(file, SPLAT_TASKQ_TEST10_NAME,
+			    "Taskq '%s' cancel for %lu took %lu\n",
+			    SPLAT_TASKQ_TEST10_NAME, (unsigned long)id,
+			    ddi_get_lbolt() - cancel);
+			rc = -ETIMEDOUT;
+			break;
+		}
+
+		get_random_bytes((void *)&rnd, 4);
+		msleep(1 + (rnd % 100));
+		rc = 0;
+	}
+
+	taskq_wait(tq);
+
+	/*
+	 * Cross check the results of taskq_cancel_id() with the number of
+	 * times the dispatched function actually ran successfully.
+	 */
+	if ((rc == 0) && (nr_tasks - canceled != atomic_read(&count)))
+		rc = -EDOM;
+
+	splat_vprint(file, SPLAT_TASKQ_TEST10_NAME, "Taskq '%s' %d attempts, "
+	    "%d canceled, %d completed, %d blocked, %d/%d tasks run\n",
+	    SPLAT_TASKQ_TEST10_NAME, i, canceled, completed, blocked,
+	    atomic_read(&count), nr_tasks);
+	splat_vprint(file, SPLAT_TASKQ_TEST10_NAME, "Taskq '%s' destroying %d\n",
+	    SPLAT_TASKQ_TEST10_NAME, rc);
+out:
+	taskq_destroy(tq);
+out_free:
+	for (j = 0; j < nr_tasks && tqas[j] != NULL; j++)
+		kmem_free(tqas[j], sizeof(splat_taskq_arg_t));
+	vfree(tqas);
+
+	return rc;
+}
+
 splat_subsystem_t *
 splat_taskq_init(void)
 {
@@ -1139,6 +1428,10 @@ splat_taskq_init(void)
 	    SPLAT_TASKQ_TEST7_ID, splat_taskq_test7);
 	SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST8_NAME, SPLAT_TASKQ_TEST8_DESC,
 	    SPLAT_TASKQ_TEST8_ID, splat_taskq_test8);
+	SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST9_NAME, SPLAT_TASKQ_TEST9_DESC,
+	    SPLAT_TASKQ_TEST9_ID, splat_taskq_test9);
+	SPLAT_TEST_INIT(sub, SPLAT_TASKQ_TEST10_NAME, SPLAT_TASKQ_TEST10_DESC,
+	    SPLAT_TASKQ_TEST10_ID, splat_taskq_test10);
 
 	return sub;
 }
@@ -1147,6 +1440,8 @@ void
 splat_taskq_fini(splat_subsystem_t *sub)
 {
 	ASSERT(sub);
+	SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST10_ID);
+	SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST9_ID);
 	SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST8_ID);
 	SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST7_ID);
 	SPLAT_TEST_FINI(sub, SPLAT_TASKQ_TEST6_ID);
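Editor's note (not part of the patch): the sketch below condenses the delayed-dispatch and cancel pattern that the new test9/test10 cases exercise into a minimal consumer example. It is a sketch under the assumption of the SPL kernel context and only the interfaces already used in the hunks above (taskq_create, taskq_dispatch_delay, taskq_cancel_id, taskq_wait, taskq_destroy); the names example_func() and example_delay_cancel() are hypothetical and do not exist in the tree.

#include <sys/taskq.h>

/* Hypothetical callback; it runs no earlier than the expire time given below. */
static void
example_func(void *arg)
{
}

static int
example_delay_cancel(void)
{
	taskq_t *tq;
	taskqid_t id;
	int rc;

	/* Same shape as the tests above: 3 threads, prepopulated entries. */
	tq = taskq_create("example", 3, maxclsyspri, 1, 10, TASKQ_PREPOPULATE);
	if (tq == NULL)
		return (-EINVAL);

	/* Schedule the callback to run roughly one second from now. */
	id = taskq_dispatch_delay(tq, example_func, NULL, TQ_SLEEP,
	    ddi_get_lbolt() + HZ);
	if (id == 0) {
		taskq_destroy(tq);
		return (-EINVAL);
	}

	/*
	 * Attempt to cancel the pending task.  As exercised by test10:
	 * 0 means it was removed before running, ENOENT means it already
	 * completed, and EBUSY means it is currently executing and could
	 * not be canceled.
	 */
	rc = taskq_cancel_id(tq, id);

	taskq_wait(tq);
	taskq_destroy(tq);

	/* Any of the three outcomes above is acceptable for this sketch. */
	return (rc == 0 || rc == ENOENT || rc == EBUSY) ? 0 : -EINVAL;
}

The three-way return-code check mirrors how test10 tallies canceled, completed, and blocked cancel attempts before cross-checking them against the number of tasks that actually ran.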