Improve zfs send performance by bypassing the ARC

When doing a zfs send on a dataset with small recordsize (e.g. 8K),
performance is dominated by the per-block overheads.  This is especially
true with `zfs send --compressed`, which further reduces the amount of
data sent, for the same number of blocks.  Several threads are involved,
but the limiting factor is the `send_prefetch` thread, which is 100% on
CPU.

The main job of the `send_prefetch` thread is to issue zio's for the
data that will be needed by the main thread.  It does this by calling
`arc_read(ARC_FLAG_PREFETCH)`; a sketch of that call follows the list
below.  This has an immediate cost of creating an arc_hdr, which takes
around 14% of one CPU.  It also induces later costs by other threads:

 * Since the data was only prefetched, dmu_send()->dmu_dump_write() will
   need to call arc_read() again to get the data.  This will have to
   look up the arc_hdr in the hash table and copy the data from the
   scatter ABD in the arc_hdr to a linear ABD in arc_buf.  This takes
   27% of one CPU.

 * dmu_dump_write() needs to arc_buf_destroy() the arc_buf.  This takes
   11% of one CPU.

 * arc_adjust() will need to evict this arc_hdr, taking about 50% of one
   CPU.

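To make these costs concrete, the prefetch pattern being removed looks
roughly like this (condensed from the code deleted from enqueue_range()
in dmu_send.c below; error handling omitted):

    /* Old read path: prefetch into the ARC, then re-read it later. */
    zbookmark_phys_t zb = {0};
    zb.zb_objset = dmu_objset_id(dn->dn_objset);
    zb.zb_object = dn->dn_object;
    zb.zb_level = 0;
    zb.zb_blkid = blkid;
    arc_flags_t aflags = ARC_FLAG_NOWAIT | ARC_FLAG_PREFETCH;
    /* Creates an arc_hdr even though nothing keeps a reference to it. */
    (void) arc_read(NULL, dn->dn_objset->os_spa, bp, NULL, NULL,
        ZIO_PRIORITY_ASYNC_READ,
        ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE, &aflags, &zb);
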
All of these costs can be avoided by bypassing the ARC if the data is
not already cached.  This commit changes `zfs send` to check for the
data in the ARC, and if it is not found then we directly call
`zio_read()`, reading the data into a linear ABD which is used by
dmu_dump_write() directly.
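
In code, the new read path looks like the following (condensed from the
issue_data_read() function added to dmu_send.c below):

    /* Check the ARC without instantiating a new header on a miss. */
    arc_flags_t aflags = ARC_FLAG_CACHED_ONLY;
    int arc_err = arc_read(NULL, os->os_spa, bp,
        arc_getbuf_func, &srdp->abuf, ZIO_PRIORITY_ASYNC_READ,
        zioflags, &aflags, &zb);
    if (arc_err != 0) {
        /* Not cached (ENOENT): read straight into a linear ABD. */
        srdp->abd = abd_alloc_linear(srdp->datasz, B_FALSE);
        srdp->io_outstanding = B_TRUE;
        zio_nowait(zio_read(NULL, os->os_spa, bp, srdp->abd,
            srdp->datasz, dmu_send_read_done, range,
            ZIO_PRIORITY_ASYNC_READ, zioflags, &zb));
    }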

The performance improvement is best expressed in terms of how many
blocks can be processed by `zfs send` in one second.  This change
increases the metric by 50%, from ~100,000 to ~150,000.  When the amount
of data per block is small (e.g. 2KB), there is a corresponding
reduction in the elapsed time of `zfs send >/dev/null` (from 86 minutes
to 58 minutes in this test case).

In addition to improving the performance of `zfs send`, this change
keeps `zfs send` from polluting the ARC.  In most cases the sent data
will not be reused, so bypassing the ARC lets us keep caching useful
data in the MRU (hit-once) part of the ARC.

Reviewed-by: Paul Dagnelie <pcd@delphix.com>
Reviewed-by: Serapheim Dimitropoulos <serapheim@delphix.com>
Reviewed-by: Brian Behlendorf <behlendorf1@llnl.gov>
Signed-off-by: Matthew Ahrens <mahrens@delphix.com>
Closes #10067


@@ -146,6 +146,12 @@ typedef enum arc_flags
 	ARC_FLAG_COMPRESSED_ARC		= 1 << 20,
 	ARC_FLAG_SHARED_DATA		= 1 << 21,
 
+	/*
+	 * Fail this arc_read() (with ENOENT) if the data is not already present
+	 * in cache.
+	 */
+	ARC_FLAG_CACHED_ONLY		= 1 << 22,
+
 	/*
 	 * The arc buffer's compression mode is stored in the top 7 bits of the
 	 * flags field, so these dummy flags are included so that MDB can


@@ -554,6 +554,7 @@ typedef struct arc_stats {
 	kstat_named_t arcstat_need_free;
 	kstat_named_t arcstat_sys_free;
 	kstat_named_t arcstat_raw_size;
+	kstat_named_t arcstat_cached_only_in_progress;
 } arc_stats_t;
 
 typedef enum free_memory_reason_t {


@@ -548,7 +548,8 @@ arc_stats_t arc_stats = {
 	{ "demand_hit_prescient_prefetch", KSTAT_DATA_UINT64 },
 	{ "arc_need_free",		KSTAT_DATA_UINT64 },
 	{ "arc_sys_free",		KSTAT_DATA_UINT64 },
-	{ "arc_raw_size",		KSTAT_DATA_UINT64 }
+	{ "arc_raw_size",		KSTAT_DATA_UINT64 },
+	{ "cached_only_in_progress",	KSTAT_DATA_UINT64 },
 };
 
 #define	ARCSTAT_MAX(stat, val) {					\
@@ -5563,6 +5564,13 @@ top:
 	if (HDR_IO_IN_PROGRESS(hdr)) {
 		zio_t *head_zio = hdr->b_l1hdr.b_acb->acb_zio_head;
 
+		if (*arc_flags & ARC_FLAG_CACHED_ONLY) {
+			mutex_exit(hash_lock);
+			ARCSTAT_BUMP(arcstat_cached_only_in_progress);
+			rc = SET_ERROR(ENOENT);
+			goto out;
+		}
+
 		ASSERT3P(head_zio, !=, NULL);
 		if ((hdr->b_flags & ARC_FLAG_PRIO_ASYNC_READ) &&
 		    priority == ZIO_PRIORITY_SYNC_READ) {
@@ -5698,12 +5706,21 @@ top:
 		uint64_t size;
 		abd_t *hdr_abd;
 
+		if (*arc_flags & ARC_FLAG_CACHED_ONLY) {
+			rc = SET_ERROR(ENOENT);
+			if (hash_lock != NULL)
+				mutex_exit(hash_lock);
+			goto out;
+		}
+
 		/*
 		 * Gracefully handle a damaged logical block size as a
 		 * checksum error.
 		 */
 		if (lsize > spa_maxblocksize(spa)) {
 			rc = SET_ERROR(ECKSUM);
 			if (hash_lock != NULL)
 				mutex_exit(hash_lock);
 			goto out;
 		}

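Once this change is in, the new counter above is visible through the
arcstats kstat; e.g. on Linux (illustrative output, values will vary):

    $ grep cached_only_in_progress /proc/spl/kstat/zfs/arcstats
    cached_only_in_progress         4    0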

@@ -156,8 +156,15 @@ struct send_range {
 	union {
 		struct srd {
 			dmu_object_type_t obj_type;
-			uint32_t datablksz;
+			uint32_t datablksz; // logical size
+			uint32_t datasz; // payload size
 			blkptr_t bp;
+			arc_buf_t *abuf;
+			abd_t *abd;
+			kmutex_t lock;
+			kcondvar_t cv;
+			boolean_t io_outstanding;
+			int io_err;
 		} data;
 		struct srh {
 			uint32_t datablksz;
@@ -222,6 +229,20 @@ range_free(struct send_range *range)
 		size_t size = sizeof (dnode_phys_t) *
 		    (range->sru.object.dnp->dn_extra_slots + 1);
 		kmem_free(range->sru.object.dnp, size);
+	} else if (range->type == DATA) {
+		mutex_enter(&range->sru.data.lock);
+		while (range->sru.data.io_outstanding)
+			cv_wait(&range->sru.data.cv, &range->sru.data.lock);
+		if (range->sru.data.abd != NULL)
+			abd_free(range->sru.data.abd);
+		if (range->sru.data.abuf != NULL) {
+			arc_buf_destroy(range->sru.data.abuf,
+			    &range->sru.data.abuf);
+		}
+		mutex_exit(&range->sru.data.lock);
+
+		cv_destroy(&range->sru.data.cv);
+		mutex_destroy(&range->sru.data.lock);
 	}
 	kmem_free(range, sizeof (*range));
 }
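
The io_outstanding/io_err/cv fields added above are the hand-off between
the reader thread and the main thread: dmu_send_read_done() (added below)
clears io_outstanding and broadcasts, and consumers such as do_dump() and
the range_free() above wait like this before touching the data:

    mutex_enter(&srdp->lock);
    while (srdp->io_outstanding)
        cv_wait(&srdp->cv, &srdp->lock);
    err = srdp->io_err;
    mutex_exit(&srdp->lock);
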
@@ -830,7 +851,7 @@ dump_object_range(dmu_send_cookie_t *dscp, const blkptr_t *bp,
 }
 
 static boolean_t
-send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
+send_do_embed(const blkptr_t *bp, uint64_t featureflags)
 {
 	if (!BP_IS_EMBEDDED(bp))
 		return (B_FALSE);
@@ -839,7 +860,7 @@ send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
 	 * Compression function must be legacy, or explicitly enabled.
 	 */
 	if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
-	    !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LZ4)))
+	    !(featureflags & DMU_BACKUP_FEATURE_LZ4)))
 		return (B_FALSE);
 
 	/*
@@ -847,7 +868,7 @@ send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
 	 */
 	switch (BPE_GET_ETYPE(bp)) {
 	case BP_EMBEDDED_TYPE_DATA:
-		if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
+		if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
 			return (B_TRUE);
 		break;
 	default:
@@ -858,8 +879,8 @@ send_do_embed(dmu_send_cookie_t *dscp, const blkptr_t *bp)
 /*
  * This function actually handles figuring out what kind of record needs to be
- * dumped, reading the data (which has hopefully been prefetched), and calling
- * the appropriate helper function.
+ * dumped, and calling the appropriate helper function.  In most cases,
+ * the data has already been read by send_reader_thread().
  */
 static int
 do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
@@ -894,7 +915,6 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
 		blkptr_t *bp = &srdp->bp;
 		spa_t *spa =
 		    dmu_objset_spa(dscp->dsc_os);
-		arc_buf_t *abuf = NULL;
 
 		ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));
 		ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
@@ -914,6 +934,7 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
 			zb.zb_level = 0;
 			zb.zb_blkid = range->start_blkid;
 
+			arc_buf_t *abuf = NULL;
 			if (!dscp->dsc_dso->dso_dryrun && arc_read(NULL, spa,
 			    bp, arc_getbuf_func, &abuf, ZIO_PRIORITY_ASYNC_READ,
 			    zioflags, &aflags, &zb) != 0)
@@ -925,7 +946,7 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
 				arc_buf_destroy(abuf, &abuf);
 			return (err);
 		}
-		if (send_do_embed(dscp, bp)) {
+		if (send_do_embed(bp, dscp->dsc_featureflags)) {
 			err = dump_write_embedded(dscp, range->object,
 			    range->start_blkid * srdp->datablksz,
 			    srdp->datablksz, bp);
@@ -936,70 +957,24 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
 		    range->start_blkid * srdp->datablksz >=
 		    dscp->dsc_resume_offset));
 		/* it's a level-0 block of a regular object */
-		arc_flags_t aflags = ARC_FLAG_WAIT;
-		uint64_t offset;
-
-		/*
-		 * If we have large blocks stored on disk but the send flags
-		 * don't allow us to send large blocks, we split the data from
-		 * the arc buf into chunks.
-		 */
-		boolean_t split_large_blocks =
-		    srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
-		    !(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
-		/*
-		 * Raw sends require that we always get raw data as it exists
-		 * on disk, so we assert that we are not splitting blocks here.
-		 */
-		boolean_t request_raw =
-		    (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) != 0;
-		/*
-		 * We should only request compressed data from the ARC if all
-		 * the following are true:
-		 *  - stream compression was requested
-		 *  - we aren't splitting large blocks into smaller chunks
-		 *  - the data won't need to be byteswapped before sending
-		 *  - this isn't an embedded block
-		 *  - this isn't metadata (if receiving on a different endian
-		 *    system it can be byteswapped more easily)
-		 */
-		boolean_t request_compressed =
-		    (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
-		    !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
-		    !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
-
-		IMPLY(request_raw, !split_large_blocks);
-		IMPLY(request_raw, BP_IS_PROTECTED(bp));
-		if (!dscp->dsc_dso->dso_dryrun) {
-			enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
-
-			ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));
-
-			if (request_raw)
-				zioflags |= ZIO_FLAG_RAW;
-			else if (request_compressed)
-				zioflags |= ZIO_FLAG_RAW_COMPRESS;
-			zbookmark_phys_t zb;
-			zb.zb_objset = dmu_objset_id(dscp->dsc_os);
-			zb.zb_object = range->object;
-			zb.zb_level = 0;
-			zb.zb_blkid = range->start_blkid;
-
-			err = arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
-			    ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, &zb);
-		}
+		mutex_enter(&srdp->lock);
+		while (srdp->io_outstanding)
+			cv_wait(&srdp->cv, &srdp->lock);
+		err = srdp->io_err;
+		mutex_exit(&srdp->lock);
 
 		if (err != 0) {
 			if (zfs_send_corrupt_data &&
 			    !dscp->dsc_dso->dso_dryrun) {
-				/* Send a block filled with 0x"zfs badd bloc" */
-				abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
-				    srdp->datablksz);
+				/*
+				 * Send a block filled with 0x"zfs badd bloc"
+				 */
+				srdp->abuf = arc_alloc_buf(spa, &srdp->abuf,
+				    ARC_BUFC_DATA, srdp->datablksz);
 				uint64_t *ptr;
-				for (ptr = abuf->b_data;
-				    (char *)ptr < (char *)abuf->b_data +
+				for (ptr = srdp->abuf->b_data;
+				    (char *)ptr < (char *)srdp->abuf->b_data +
 				    srdp->datablksz; ptr++)
 					*ptr = 0x2f5baddb10cULL;
 			} else {
@@ -1007,41 +982,47 @@ do_dump(dmu_send_cookie_t *dscp, struct send_range *range)
 			}
 		}
-		offset = range->start_blkid * srdp->datablksz;
+		ASSERT(dscp->dsc_dso->dso_dryrun ||
+		    srdp->abuf != NULL || srdp->abd != NULL);
 
-		if (split_large_blocks) {
-			ASSERT0(arc_is_encrypted(abuf));
-			ASSERT3U(arc_get_compression(abuf), ==,
-			    ZIO_COMPRESS_OFF);
-			char *buf = abuf->b_data;
+		uint64_t offset = range->start_blkid * srdp->datablksz;
+
+		char *data = NULL;
+		if (srdp->abd != NULL) {
+			data = abd_to_buf(srdp->abd);
+			ASSERT3P(srdp->abuf, ==, NULL);
+		} else if (srdp->abuf != NULL) {
+			data = srdp->abuf->b_data;
+		}
+
+		/*
+		 * If we have large blocks stored on disk but the send flags
+		 * don't allow us to send large blocks, we split the data from
+		 * the arc buf into chunks.
+		 */
+		if (srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
+		    !(dscp->dsc_featureflags &
+		    DMU_BACKUP_FEATURE_LARGE_BLOCKS)) {
 			while (srdp->datablksz > 0 && err == 0) {
 				int n = MIN(srdp->datablksz,
 				    SPA_OLD_MAXBLOCKSIZE);
 				err = dmu_dump_write(dscp, srdp->obj_type,
-				    range->object, offset, n, n, NULL, buf);
+				    range->object, offset, n, n, NULL, data);
 				offset += n;
-				buf += n;
+				/*
+				 * When doing dry run, data==NULL is used as a
+				 * sentinel value by
+				 * dmu_dump_write()->dump_record().
+				 */
+				if (data != NULL)
+					data += n;
 				srdp->datablksz -= n;
 			}
 		} else {
-			int psize;
-			if (abuf != NULL) {
-				psize = arc_buf_size(abuf);
-				if (arc_get_compression(abuf) !=
-				    ZIO_COMPRESS_OFF) {
-					ASSERT3S(psize, ==, BP_GET_PSIZE(bp));
-				}
-			} else if (!request_compressed) {
-				psize = srdp->datablksz;
-			} else {
-				psize = BP_GET_PSIZE(bp);
-			}
 			err = dmu_dump_write(dscp, srdp->obj_type,
-			    range->object, offset, srdp->datablksz, psize, bp,
-			    (abuf == NULL ? NULL : abuf->b_data));
+			    range->object, offset,
+			    srdp->datablksz, srdp->datasz, bp, data);
 		}
-		if (abuf != NULL)
-			arc_buf_destroy(abuf, &abuf);
 		return (err);
 	}
 	case HOLE: {
@@ -1086,6 +1067,14 @@ range_alloc(enum type type, uint64_t object, uint64_t start_blkid,
 	range->start_blkid = start_blkid;
 	range->end_blkid = end_blkid;
 	range->eos_marker = eos;
+	if (type == DATA) {
+		range->sru.data.abd = NULL;
+		range->sru.data.abuf = NULL;
+		mutex_init(&range->sru.data.lock, NULL, MUTEX_DEFAULT, NULL);
+		cv_init(&range->sru.data.cv, NULL, CV_DEFAULT, NULL);
+		range->sru.data.io_outstanding = 0;
+		range->sru.data.io_err = 0;
+	}
 	return (range);
 }
@@ -1596,19 +1585,115 @@ send_merge_thread(void *arg)
 	thread_exit();
 }
 
-struct send_prefetch_thread_arg {
+struct send_reader_thread_arg {
 	struct send_merge_thread_arg *smta;
 	bqueue_t q;
 	boolean_t cancel;
-	boolean_t issue_prefetches;
+	boolean_t issue_reads;
+	uint64_t featureflags;
 	int error;
 };
 
+static void
+dmu_send_read_done(zio_t *zio)
+{
+	struct send_range *range = zio->io_private;
+
+	mutex_enter(&range->sru.data.lock);
+	if (zio->io_error != 0) {
+		abd_free(range->sru.data.abd);
+		range->sru.data.abd = NULL;
+		range->sru.data.io_err = zio->io_error;
+	}
+
+	ASSERT(range->sru.data.io_outstanding);
+	range->sru.data.io_outstanding = B_FALSE;
+	cv_broadcast(&range->sru.data.cv);
+	mutex_exit(&range->sru.data.lock);
+}
+
+static void
+issue_data_read(struct send_reader_thread_arg *srta, struct send_range *range)
+{
+	struct srd *srdp = &range->sru.data;
+	blkptr_t *bp = &srdp->bp;
+	objset_t *os = srta->smta->os;
+
+	ASSERT3U(range->type, ==, DATA);
+	ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
+	/*
+	 * If we have large blocks stored on disk but
+	 * the send flags don't allow us to send large
+	 * blocks, we split the data from the arc buf
+	 * into chunks.
+	 */
+	boolean_t split_large_blocks =
+	    srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&
+	    !(srta->featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
+	/*
+	 * We should only request compressed data from the ARC if all
+	 * the following are true:
+	 *  - stream compression was requested
+	 *  - we aren't splitting large blocks into smaller chunks
+	 *  - the data won't need to be byteswapped before sending
+	 *  - this isn't an embedded block
+	 *  - this isn't metadata (if receiving on a different endian
+	 *    system it can be byteswapped more easily)
+	 */
+	boolean_t request_compressed =
+	    (srta->featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
+	    !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
+	    !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
+
+	enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
+
+	if (srta->featureflags & DMU_BACKUP_FEATURE_RAW)
+		zioflags |= ZIO_FLAG_RAW;
+	else if (request_compressed)
+		zioflags |= ZIO_FLAG_RAW_COMPRESS;
+
+	srdp->datasz = (zioflags & ZIO_FLAG_RAW_COMPRESS) ?
+	    BP_GET_PSIZE(bp) : BP_GET_LSIZE(bp);
+
+	if (!srta->issue_reads)
+		return;
+	if (BP_IS_REDACTED(bp))
+		return;
+	if (send_do_embed(bp, srta->featureflags))
+		return;
+
+	zbookmark_phys_t zb = {
+	    .zb_objset = dmu_objset_id(os),
+	    .zb_object = range->object,
+	    .zb_level = 0,
+	    .zb_blkid = range->start_blkid,
+	};
+
+	arc_flags_t aflags = ARC_FLAG_CACHED_ONLY;
+	int arc_err = arc_read(NULL, os->os_spa, bp,
+	    arc_getbuf_func, &srdp->abuf, ZIO_PRIORITY_ASYNC_READ,
+	    zioflags, &aflags, &zb);
+	/*
+	 * If the data is not already cached in the ARC, we read directly
+	 * from zio.  This avoids the performance overhead of adding a new
+	 * entry to the ARC, and we also avoid polluting the ARC cache with
+	 * data that is not likely to be used in the future.
+	 */
+	if (arc_err != 0) {
+		srdp->abd = abd_alloc_linear(srdp->datasz, B_FALSE);
+		srdp->io_outstanding = B_TRUE;
+		zio_nowait(zio_read(NULL, os->os_spa, bp, srdp->abd,
+		    srdp->datasz, dmu_send_read_done, range,
+		    ZIO_PRIORITY_ASYNC_READ, zioflags, &zb));
+	}
+}
+
 /*
  * Create a new record with the given values.
  */
 static void
-enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
+enqueue_range(struct send_reader_thread_arg *srta, bqueue_t *q, dnode_t *dn,
     uint64_t blkid, uint64_t count, const blkptr_t *bp, uint32_t datablksz)
 {
 	enum type range_type = (bp == NULL || BP_IS_HOLE(bp) ? HOLE :
@@ -1629,18 +1714,7 @@ enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
 		range->sru.data.datablksz = datablksz;
 		range->sru.data.obj_type = dn->dn_type;
 		range->sru.data.bp = *bp;
-		if (spta->issue_prefetches) {
-			zbookmark_phys_t zb = {0};
-			zb.zb_objset = dmu_objset_id(dn->dn_objset);
-			zb.zb_object = dn->dn_object;
-			zb.zb_level = 0;
-			zb.zb_blkid = blkid;
-			arc_flags_t aflags = ARC_FLAG_NOWAIT |
-			    ARC_FLAG_PREFETCH;
-			(void) arc_read(NULL, dn->dn_objset->os_spa, bp, NULL,
-			    NULL, ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL |
-			    ZIO_FLAG_SPECULATIVE, &aflags, &zb);
-		}
+		issue_data_read(srta, range);
 		break;
 	case REDACT:
 		range->sru.redact.datablksz = datablksz;
@@ -1659,12 +1733,12 @@ enqueue_range(struct send_prefetch_thread_arg *spta, bqueue_t *q, dnode_t *dn,
  * it issues prefetches for the data we need to send.
  */
 static void
-send_prefetch_thread(void *arg)
+send_reader_thread(void *arg)
 {
-	struct send_prefetch_thread_arg *spta = arg;
-	struct send_merge_thread_arg *smta = spta->smta;
+	struct send_reader_thread_arg *srta = arg;
+	struct send_merge_thread_arg *smta = srta->smta;
 	bqueue_t *inq = &smta->q;
-	bqueue_t *outq = &spta->q;
+	bqueue_t *outq = &srta->q;
 	objset_t *os = smta->os;
 	fstrans_cookie_t cookie = spl_fstrans_mark();
 	struct send_range *range = bqueue_dequeue(inq);
@@ -1680,30 +1754,14 @@ send_prefetch_thread(void *arg)
 	 */
 	uint64_t last_obj = UINT64_MAX;
 	uint64_t last_obj_exists = B_TRUE;
-	while (!range->eos_marker && !spta->cancel && smta->error == 0 &&
+	while (!range->eos_marker && !srta->cancel && smta->error == 0 &&
 	    err == 0) {
 		switch (range->type) {
-		case DATA: {
-			zbookmark_phys_t zb;
-			zb.zb_objset = dmu_objset_id(os);
-			zb.zb_object = range->object;
-			zb.zb_level = 0;
-			zb.zb_blkid = range->start_blkid;
-			ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);
-			if (!BP_IS_REDACTED(&range->sru.data.bp) &&
-			    spta->issue_prefetches &&
-			    !BP_IS_EMBEDDED(&range->sru.data.bp)) {
-				arc_flags_t aflags = ARC_FLAG_NOWAIT |
-				    ARC_FLAG_PREFETCH;
-				(void) arc_read(NULL, os->os_spa,
-				    &range->sru.data.bp, NULL, NULL,
-				    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL |
-				    ZIO_FLAG_SPECULATIVE, &aflags, &zb);
-			}
+		case DATA:
+			issue_data_read(srta, range);
 			bqueue_enqueue(outq, range, range->sru.data.datablksz);
 			range = get_next_range_nofree(inq, range);
 			break;
-		}
 		case HOLE:
 		case OBJECT:
 		case OBJECT_RANGE:
@@ -1805,7 +1863,7 @@ send_prefetch_thread(void *arg)
 				    datablksz);
 				uint64_t nblks = (offset / datablksz) -
 				    blkid;
-				enqueue_range(spta, outq, dn, blkid,
+				enqueue_range(srta, outq, dn, blkid,
 				    nblks, NULL, datablksz);
 				blkid += nblks;
 			}
@@ -1816,7 +1874,7 @@ send_prefetch_thread(void *arg)
 				if (err != 0)
 					break;
 				ASSERT(!BP_IS_HOLE(&bp));
-				enqueue_range(spta, outq, dn, blkid, 1, &bp,
+				enqueue_range(srta, outq, dn, blkid, 1, &bp,
 				    datablksz);
 			}
 			rw_exit(&dn->dn_struct_rwlock);
@@ -1825,11 +1883,11 @@ send_prefetch_thread(void *arg)
 			}
 		}
 	}
-	if (spta->cancel || err != 0) {
+	if (srta->cancel || err != 0) {
 		smta->cancel = B_TRUE;
-		spta->error = err;
+		srta->error = err;
 	} else if (smta->error != 0) {
-		spta->error = smta->error;
+		srta->error = smta->error;
 	}
 	while (!range->eos_marker)
 		range = get_next_range(inq, range);
@@ -2052,15 +2110,17 @@ setup_merge_thread(struct send_merge_thread_arg *smt_arg,
 }
 
 static void
-setup_prefetch_thread(struct send_prefetch_thread_arg *spt_arg,
-    struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg)
+setup_reader_thread(struct send_reader_thread_arg *srt_arg,
+    struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg,
+    uint64_t featureflags)
 {
-	VERIFY0(bqueue_init(&spt_arg->q, zfs_send_queue_ff,
+	VERIFY0(bqueue_init(&srt_arg->q, zfs_send_queue_ff,
 	    MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
 	    offsetof(struct send_range, ln)));
-	spt_arg->smta = smt_arg;
-	spt_arg->issue_prefetches = !dspp->dso->dso_dryrun;
-	(void) thread_create(NULL, 0, send_prefetch_thread, spt_arg, 0,
+	srt_arg->smta = smt_arg;
+	srt_arg->issue_reads = !dspp->dso->dso_dryrun;
+	srt_arg->featureflags = featureflags;
+	(void) thread_create(NULL, 0, send_reader_thread, srt_arg, 0,
 	    curproc, TS_RUN, minclsyspri);
 }
@@ -2265,7 +2325,7 @@ dmu_send_impl(struct dmu_send_params *dspp)
 	struct send_thread_arg *to_arg;
 	struct redact_list_thread_arg *rlt_arg;
 	struct send_merge_thread_arg *smt_arg;
-	struct send_prefetch_thread_arg *spt_arg;
+	struct send_reader_thread_arg *srt_arg;
 	struct send_range *range;
 	redaction_list_t *from_rl = NULL;
 	redaction_list_t *redact_rl = NULL;
@@ -2348,7 +2408,7 @@ dmu_send_impl(struct dmu_send_params *dspp)
 	to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP);
 	rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP);
 	smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP);
-	spt_arg = kmem_zalloc(sizeof (*spt_arg), KM_SLEEP);
+	srt_arg = kmem_zalloc(sizeof (*srt_arg), KM_SLEEP);
 
 	drr = create_begin_record(dspp, os, featureflags);
 	dssp = setup_send_progress(dspp);
@@ -2457,12 +2517,12 @@ dmu_send_impl(struct dmu_send_params *dspp)
 	setup_from_thread(from_arg, from_rl, dssp);
 	setup_redact_list_thread(rlt_arg, dspp, redact_rl, dssp);
 	setup_merge_thread(smt_arg, dspp, from_arg, to_arg, rlt_arg, os);
-	setup_prefetch_thread(spt_arg, dspp, smt_arg);
+	setup_reader_thread(srt_arg, dspp, smt_arg, featureflags);
 
-	range = bqueue_dequeue(&spt_arg->q);
+	range = bqueue_dequeue(&srt_arg->q);
 	while (err == 0 && !range->eos_marker) {
 		err = do_dump(&dsc, range);
-		range = get_next_range(&spt_arg->q, range);
+		range = get_next_range(&srt_arg->q, range);
 		if (issig(JUSTLOOKING) && issig(FORREAL))
 			err = SET_ERROR(EINTR);
 	}
@@ -2474,22 +2534,22 @@ dmu_send_impl(struct dmu_send_params *dspp)
 	 * pending records before exiting.
 	 */
 	if (err != 0) {
-		spt_arg->cancel = B_TRUE;
+		srt_arg->cancel = B_TRUE;
 		while (!range->eos_marker) {
-			range = get_next_range(&spt_arg->q, range);
+			range = get_next_range(&srt_arg->q, range);
 		}
 	}
 	range_free(range);
 
-	bqueue_destroy(&spt_arg->q);
+	bqueue_destroy(&srt_arg->q);
 	bqueue_destroy(&smt_arg->q);
 	if (dspp->redactbook != NULL)
 		bqueue_destroy(&rlt_arg->q);
 	bqueue_destroy(&to_arg->q);
 	bqueue_destroy(&from_arg->q);
 
-	if (err == 0 && spt_arg->error != 0)
-		err = spt_arg->error;
+	if (err == 0 && srt_arg->error != 0)
+		err = srt_arg->error;
 	if (err != 0)
 		goto out;
@@ -2532,7 +2592,7 @@ out:
 	kmem_free(to_arg, sizeof (*to_arg));
 	kmem_free(rlt_arg, sizeof (*rlt_arg));
 	kmem_free(smt_arg, sizeof (*smt_arg));
-	kmem_free(spt_arg, sizeof (*spt_arg));
+	kmem_free(srt_arg, sizeof (*srt_arg));
 
 	dsl_dataset_long_rele(to_ds, FTAG);
 	if (from_rl != NULL) {