diff --git a/include/sys/spa_impl.h b/include/sys/spa_impl.h index 1b12b4e3a8..47dfe432ee 100644 --- a/include/sys/spa_impl.h +++ b/include/sys/spa_impl.h @@ -250,6 +250,9 @@ extern char *spa_config_path; extern void spa_taskq_dispatch_ent(spa_t *spa, zio_type_t t, zio_taskq_type_t q, task_func_t *func, void *arg, uint_t flags, taskq_ent_t *ent); +extern void spa_taskq_dispatch_sync(spa_t *, zio_type_t t, zio_taskq_type_t q, + task_func_t *func, void *arg, uint_t flags); + #ifdef __cplusplus } diff --git a/include/sys/zfs_context.h b/include/sys/zfs_context.h index a23bfdcf82..0b24216b59 100644 --- a/include/sys/zfs_context.h +++ b/include/sys/zfs_context.h @@ -409,6 +409,7 @@ extern int taskq_empty_ent(taskq_ent_t *); extern void taskq_init_ent(taskq_ent_t *); extern void taskq_destroy(taskq_t *); extern void taskq_wait(taskq_t *); +extern void taskq_wait_id(taskq_t *, taskqid_t); extern int taskq_member(taskq_t *, kthread_t *); extern int taskq_cancel_id(taskq_t *, taskqid_t); extern void system_taskq_init(void); diff --git a/lib/libzpool/taskq.c b/lib/libzpool/taskq.c index 64e214205e..96c0d5c2be 100644 --- a/lib/libzpool/taskq.c +++ b/lib/libzpool/taskq.c @@ -211,6 +211,12 @@ taskq_wait(taskq_t *tq) mutex_exit(&tq->tq_lock); } +void +taskq_wait_id(taskq_t *tq, taskqid_t id) +{ + taskq_wait(tq); +} + static void taskq_thread(void *arg) { diff --git a/module/zfs/dmu_send.c b/module/zfs/dmu_send.c index 921c3d76f4..b2c6bfe2b8 100644 --- a/module/zfs/dmu_send.c +++ b/module/zfs/dmu_send.c @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -53,21 +54,48 @@ int zfs_send_corrupt_data = B_FALSE; static char *dmu_recv_tag = "dmu_recv_tag"; -static int -dump_bytes(dmu_sendarg_t *dsp, void *buf, int len) +typedef struct dump_bytes_io { + dmu_sendarg_t *dbi_dsp; + void *dbi_buf; + int dbi_len; +} dump_bytes_io_t; + +static void +dump_bytes_strategy(void *arg) { + dump_bytes_io_t *dbi = (dump_bytes_io_t *)arg; + dmu_sendarg_t *dsp = dbi->dbi_dsp; dsl_dataset_t *ds = dsp->dsa_os->os_dsl_dataset; ssize_t resid; /* have to get resid to get detailed errno */ - ASSERT3U(len % 8, ==, 0); + ASSERT3U(dbi->dbi_len % 8, ==, 0); - fletcher_4_incremental_native(buf, len, &dsp->dsa_zc); + fletcher_4_incremental_native(dbi->dbi_buf, dbi->dbi_len, &dsp->dsa_zc); dsp->dsa_err = vn_rdwr(UIO_WRITE, dsp->dsa_vp, - (caddr_t)buf, len, + (caddr_t)dbi->dbi_buf, dbi->dbi_len, 0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid); mutex_enter(&ds->ds_sendstream_lock); - *dsp->dsa_off += len; + *dsp->dsa_off += dbi->dbi_len; mutex_exit(&ds->ds_sendstream_lock); +} + +static int +dump_bytes(dmu_sendarg_t *dsp, void *buf, int len) +{ + dump_bytes_io_t dbi; + + dbi.dbi_dsp = dsp; + dbi.dbi_buf = buf; + dbi.dbi_len = len; + + /* + * The vn_rdwr() call is performed in a taskq to ensure that there is + * always enough stack space to write safely to the target filesystem. + * The ZIO_TYPE_FREE threads are used because there can be a lot of + * them and they are used in vdev_file.c for a similar purpose. + */ + spa_taskq_dispatch_sync(dmu_objset_spa(dsp->dsa_os), ZIO_TYPE_FREE, + ZIO_TASKQ_ISSUE, dump_bytes_strategy, &dbi, TQ_SLEEP); return (dsp->dsa_err); } diff --git a/module/zfs/spa.c b/module/zfs/spa.c index e022c32586..82ee445ab4 100644 --- a/module/zfs/spa.c +++ b/module/zfs/spa.c @@ -926,6 +926,31 @@ spa_taskq_dispatch_ent(spa_t *spa, zio_type_t t, zio_taskq_type_t q, taskq_dispatch_ent(tq, func, arg, flags, ent); } +/* + * Same as spa_taskq_dispatch_ent() but block on the task until completion. + */ +void +spa_taskq_dispatch_sync(spa_t *spa, zio_type_t t, zio_taskq_type_t q, + task_func_t *func, void *arg, uint_t flags) +{ + spa_taskqs_t *tqs = &spa->spa_zio_taskq[t][q]; + taskq_t *tq; + taskqid_t id; + + ASSERT3P(tqs->stqs_taskq, !=, NULL); + ASSERT3U(tqs->stqs_count, !=, 0); + + if (tqs->stqs_count == 1) { + tq = tqs->stqs_taskq[0]; + } else { + tq = tqs->stqs_taskq[gethrtime() % tqs->stqs_count]; + } + + id = taskq_dispatch(tq, func, arg, flags); + if (id) + taskq_wait_id(tq, id); +} + static void spa_create_zio_taskqs(spa_t *spa) {