Use taskq for dump_bytes()

The vn_rdwr() function performs I/O by calling the vfs_write() or
vfs_read() functions.  These functions reside just below the system
call layer and the expectation is they have almost the entire 8k of
stack space to work with.  In fact, certain layered configurations
such as ext+lvm+md+multipath require the majority of this stack to
avoid stack overflows.

To avoid this posibility the vn_rdwr() call in dump_bytes() has been
moved to the ZIO_TYPE_FREE, taskq.  This ensures that all I/O will be
performed with the majority of the stack space available.  This ends
up being very similiar to as if the I/O were issued via sys_write()
or sys_read().

Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
Closes #1399
Closes #1423
This commit is contained in:
Brian Behlendorf 2013-05-03 14:17:21 -07:00
parent 7ef5e54e2e
commit 044baf009a
5 changed files with 69 additions and 6 deletions

View File

@ -250,6 +250,9 @@ extern char *spa_config_path;
extern void spa_taskq_dispatch_ent(spa_t *spa, zio_type_t t, zio_taskq_type_t q,
task_func_t *func, void *arg, uint_t flags, taskq_ent_t *ent);
extern void spa_taskq_dispatch_sync(spa_t *, zio_type_t t, zio_taskq_type_t q,
task_func_t *func, void *arg, uint_t flags);
#ifdef __cplusplus
}

View File

@ -409,6 +409,7 @@ extern int taskq_empty_ent(taskq_ent_t *);
extern void taskq_init_ent(taskq_ent_t *);
extern void taskq_destroy(taskq_t *);
extern void taskq_wait(taskq_t *);
extern void taskq_wait_id(taskq_t *, taskqid_t);
extern int taskq_member(taskq_t *, kthread_t *);
extern int taskq_cancel_id(taskq_t *, taskqid_t);
extern void system_taskq_init(void);

View File

@ -211,6 +211,12 @@ taskq_wait(taskq_t *tq)
mutex_exit(&tq->tq_lock);
}
void
taskq_wait_id(taskq_t *tq, taskqid_t id)
{
taskq_wait(tq);
}
static void
taskq_thread(void *arg)
{

View File

@ -39,6 +39,7 @@
#include <sys/dsl_prop.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/spa_impl.h>
#include <sys/zfs_ioctl.h>
#include <sys/zap.h>
#include <sys/zio_checksum.h>
@ -53,21 +54,48 @@ int zfs_send_corrupt_data = B_FALSE;
static char *dmu_recv_tag = "dmu_recv_tag";
static int
dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
typedef struct dump_bytes_io {
dmu_sendarg_t *dbi_dsp;
void *dbi_buf;
int dbi_len;
} dump_bytes_io_t;
static void
dump_bytes_strategy(void *arg)
{
dump_bytes_io_t *dbi = (dump_bytes_io_t *)arg;
dmu_sendarg_t *dsp = dbi->dbi_dsp;
dsl_dataset_t *ds = dsp->dsa_os->os_dsl_dataset;
ssize_t resid; /* have to get resid to get detailed errno */
ASSERT3U(len % 8, ==, 0);
ASSERT3U(dbi->dbi_len % 8, ==, 0);
fletcher_4_incremental_native(buf, len, &dsp->dsa_zc);
fletcher_4_incremental_native(dbi->dbi_buf, dbi->dbi_len, &dsp->dsa_zc);
dsp->dsa_err = vn_rdwr(UIO_WRITE, dsp->dsa_vp,
(caddr_t)buf, len,
(caddr_t)dbi->dbi_buf, dbi->dbi_len,
0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);
mutex_enter(&ds->ds_sendstream_lock);
*dsp->dsa_off += len;
*dsp->dsa_off += dbi->dbi_len;
mutex_exit(&ds->ds_sendstream_lock);
}
static int
dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
{
dump_bytes_io_t dbi;
dbi.dbi_dsp = dsp;
dbi.dbi_buf = buf;
dbi.dbi_len = len;
/*
* The vn_rdwr() call is performed in a taskq to ensure that there is
* always enough stack space to write safely to the target filesystem.
* The ZIO_TYPE_FREE threads are used because there can be a lot of
* them and they are used in vdev_file.c for a similar purpose.
*/
spa_taskq_dispatch_sync(dmu_objset_spa(dsp->dsa_os), ZIO_TYPE_FREE,
ZIO_TASKQ_ISSUE, dump_bytes_strategy, &dbi, TQ_SLEEP);
return (dsp->dsa_err);
}

View File

@ -926,6 +926,31 @@ spa_taskq_dispatch_ent(spa_t *spa, zio_type_t t, zio_taskq_type_t q,
taskq_dispatch_ent(tq, func, arg, flags, ent);
}
/*
* Same as spa_taskq_dispatch_ent() but block on the task until completion.
*/
void
spa_taskq_dispatch_sync(spa_t *spa, zio_type_t t, zio_taskq_type_t q,
task_func_t *func, void *arg, uint_t flags)
{
spa_taskqs_t *tqs = &spa->spa_zio_taskq[t][q];
taskq_t *tq;
taskqid_t id;
ASSERT3P(tqs->stqs_taskq, !=, NULL);
ASSERT3U(tqs->stqs_count, !=, 0);
if (tqs->stqs_count == 1) {
tq = tqs->stqs_taskq[0];
} else {
tq = tqs->stqs_taskq[gethrtime() % tqs->stqs_count];
}
id = taskq_dispatch(tq, func, arg, flags);
if (id)
taskq_wait_id(tq, id);
}
static void
spa_create_zio_taskqs(spa_t *spa)
{