zstream: use zio_compress calls for compression

This is updating zstream to use the zio_compress calls rather than using
its own dispatch. Since that was fairly entangled, some refactoring
included.

Sponsored-by: Klara, Inc.
Sponsored-by: Wasabi Technology, Inc.
Signed-off-by: Rob Norris <rob.norris@klarasystems.com>
This commit is contained in:
Rob Norris 2024-07-05 15:01:57 +10:00 committed by Tony Hutter
parent 5eede0d5fd
commit b4d81b1a6a
2 changed files with 102 additions and 122 deletions

View File

@ -22,6 +22,8 @@
/* /*
* Copyright 2022 Axcient. All rights reserved. * Copyright 2022 Axcient. All rights reserved.
* Use is subject to license terms. * Use is subject to license terms.
*
* Copyright (c) 2024, Klara, Inc.
*/ */
#include <err.h> #include <err.h>
@ -257,58 +259,47 @@ zstream_do_decompress(int argc, char *argv[])
ENTRY e = {.key = key}; ENTRY e = {.key = key};
p = hsearch(e, FIND); p = hsearch(e, FIND);
if (p != NULL) { if (p == NULL) {
zio_decompress_func_t *xfunc = NULL; /*
switch ((enum zio_compress)(intptr_t)p->data) { * Read the contents of the block unaltered
case ZIO_COMPRESS_OFF: */
xfunc = NULL; (void) sfread(buf, payload_size, stdin);
break; break;
case ZIO_COMPRESS_LZJB:
xfunc = lzjb_decompress;
break;
case ZIO_COMPRESS_GZIP_1:
xfunc = gzip_decompress;
break;
case ZIO_COMPRESS_ZLE:
xfunc = zle_decompress;
break;
case ZIO_COMPRESS_LZ4:
xfunc = lz4_decompress_zfs;
break;
case ZIO_COMPRESS_ZSTD:
xfunc = zfs_zstd_decompress;
break;
default:
assert(B_FALSE);
} }
/* /*
* Read and decompress the block * Read and decompress the block
*/ */
enum zio_compress c =
(enum zio_compress)(intptr_t)p->data;
if (c == ZIO_COMPRESS_OFF) {
(void) sfread(buf, payload_size, stdin);
drrw->drr_compressiontype = ZIO_COMPRESS_OFF;
if (verbose)
fprintf(stderr,
"Resetting compression type to "
"off for ino %llu offset %llu\n",
(u_longlong_t)drrw->drr_object,
(u_longlong_t)drrw->drr_offset);
break;
}
char *lzbuf = safe_calloc(payload_size); char *lzbuf = safe_calloc(payload_size);
(void) sfread(lzbuf, payload_size, stdin); (void) sfread(lzbuf, payload_size, stdin);
if (xfunc == NULL) {
memcpy(buf, lzbuf, payload_size); abd_t sabd;
drrw->drr_compressiontype = abd_get_from_buf_struct(&sabd, lzbuf, payload_size);
ZIO_COMPRESS_OFF; int err = zio_decompress_data(c, &sabd, buf,
if (verbose) payload_size, payload_size, NULL);
fprintf(stderr, "Resetting " abd_free(&sabd);
"compression type to off "
"for ino %llu offset " if (err != 0) {
"%llu\n",
(u_longlong_t)
drrw->drr_object,
(u_longlong_t)
drrw->drr_offset);
} else if (0 != xfunc(lzbuf, buf,
payload_size, payload_size, 0)) {
/* /*
* The block must not be compressed, * The block must not be compressed, at least
* at least not with this compression * not with this compression type, possibly
* type, possibly because it gets * because it gets written multiple times in
* written multiple times in this * this stream.
* stream.
*/ */
warnx("decompression failed for " warnx("decompression failed for "
"ino %llu offset %llu", "ino %llu offset %llu",
@ -316,24 +307,16 @@ zstream_do_decompress(int argc, char *argv[])
(u_longlong_t)drrw->drr_offset); (u_longlong_t)drrw->drr_offset);
memcpy(buf, lzbuf, payload_size); memcpy(buf, lzbuf, payload_size);
} else if (verbose) { } else if (verbose) {
drrw->drr_compressiontype = drrw->drr_compressiontype = ZIO_COMPRESS_OFF;
ZIO_COMPRESS_OFF; fprintf(stderr, "successfully decompressed "
fprintf(stderr, "successfully " "ino %llu offset %llu\n",
"decompressed ino %llu "
"offset %llu\n",
(u_longlong_t)drrw->drr_object, (u_longlong_t)drrw->drr_object,
(u_longlong_t)drrw->drr_offset); (u_longlong_t)drrw->drr_offset);
} else { } else {
drrw->drr_compressiontype = drrw->drr_compressiontype = ZIO_COMPRESS_OFF;
ZIO_COMPRESS_OFF;
} }
free(lzbuf); free(lzbuf);
} else {
/*
* Read the contents of the block unaltered
*/
(void) sfread(buf, payload_size, stdin);
}
break; break;
} }

View File

@ -22,10 +22,9 @@
/* /*
* Copyright 2022 Axcient. All rights reserved. * Copyright 2022 Axcient. All rights reserved.
* Use is subject to license terms. * Use is subject to license terms.
*/ *
/*
* Copyright (c) 2022 by Delphix. All rights reserved. * Copyright (c) 2022 by Delphix. All rights reserved.
* Copyright (c) 2024, Klara, Inc.
*/ */
#include <err.h> #include <err.h>
@ -72,7 +71,7 @@ zstream_do_recompress(int argc, char *argv[])
dmu_replay_record_t *drr = &thedrr; dmu_replay_record_t *drr = &thedrr;
zio_cksum_t stream_cksum; zio_cksum_t stream_cksum;
int c; int c;
int level = -1; int level = 0;
while ((c = getopt(argc, argv, "l:")) != -1) { while ((c = getopt(argc, argv, "l:")) != -1) {
switch (c) { switch (c) {
@ -97,34 +96,22 @@ zstream_do_recompress(int argc, char *argv[])
if (argc != 1) if (argc != 1)
zstream_usage(); zstream_usage();
int type = 0;
zio_compress_info_t *cinfo = NULL; enum zio_compress ctype;
if (0 == strcmp(argv[0], "off")) { if (strcmp(argv[0], "off") == 0) {
type = ZIO_COMPRESS_OFF; ctype = ZIO_COMPRESS_OFF;
cinfo = &zio_compress_table[type];
} else if (0 == strcmp(argv[0], "inherit") ||
0 == strcmp(argv[0], "empty") ||
0 == strcmp(argv[0], "on")) {
// Fall through to invalid compression type case
} else { } else {
for (int i = 0; i < ZIO_COMPRESS_FUNCTIONS; i++) { for (ctype = 0; ctype < ZIO_COMPRESS_FUNCTIONS; ctype++) {
if (0 == strcmp(zio_compress_table[i].ci_name, if (strcmp(argv[0],
argv[0])) { zio_compress_table[ctype].ci_name) == 0)
cinfo = &zio_compress_table[i];
type = i;
break; break;
} }
} if (ctype == ZIO_COMPRESS_FUNCTIONS ||
} zio_compress_table[ctype].ci_compress == NULL) {
if (cinfo == NULL) {
fprintf(stderr, "Invalid compression type %s.\n", fprintf(stderr, "Invalid compression type %s.\n",
argv[0]); argv[0]);
exit(2); exit(2);
} }
if (cinfo->ci_compress == NULL) {
type = 0;
cinfo = &zio_compress_table[0];
} }
if (isatty(STDIN_FILENO)) { if (isatty(STDIN_FILENO)) {
@ -135,6 +122,7 @@ zstream_do_recompress(int argc, char *argv[])
exit(1); exit(1);
} }
abd_init();
fletcher_4_init(); fletcher_4_init();
zio_init(); zio_init();
zstd_init(); zstd_init();
@ -247,53 +235,60 @@ zstream_do_recompress(int argc, char *argv[])
(void) sfread(buf, payload_size, stdin); (void) sfread(buf, payload_size, stdin);
break; break;
} }
if (drrw->drr_compressiontype >= enum zio_compress dtype = drrw->drr_compressiontype;
ZIO_COMPRESS_FUNCTIONS) { if (dtype >= ZIO_COMPRESS_FUNCTIONS) {
fprintf(stderr, "Invalid compression type in " fprintf(stderr, "Invalid compression type in "
"stream: %d\n", drrw->drr_compressiontype); "stream: %d\n", dtype);
exit(3); exit(3);
} }
zio_compress_info_t *dinfo = if (zio_compress_table[dtype].ci_decompress == NULL)
&zio_compress_table[drrw->drr_compressiontype]; dtype = ZIO_COMPRESS_OFF;
/* Set up buffers to minimize memcpys */ /* Set up buffers to minimize memcpys */
char *cbuf, *dbuf; char *cbuf, *dbuf;
if (cinfo->ci_compress == NULL) if (ctype == ZIO_COMPRESS_OFF)
dbuf = buf; dbuf = buf;
else else
dbuf = safe_calloc(bufsz); dbuf = safe_calloc(bufsz);
if (dinfo->ci_decompress == NULL) if (dtype == ZIO_COMPRESS_OFF)
cbuf = dbuf; cbuf = dbuf;
else else
cbuf = safe_calloc(payload_size); cbuf = safe_calloc(payload_size);
/* Read and decompress the payload */ /* Read and decompress the payload */
(void) sfread(cbuf, payload_size, stdin); (void) sfread(cbuf, payload_size, stdin);
if (dinfo->ci_decompress != NULL) { if (dtype != ZIO_COMPRESS_OFF) {
if (0 != dinfo->ci_decompress(cbuf, dbuf, abd_t cabd;
payload_size, MIN(bufsz, abd_get_from_buf_struct(&cabd,
drrw->drr_logical_size), dinfo->ci_level)) { cbuf, payload_size);
if (zio_decompress_data(dtype, &cabd, dbuf,
payload_size,
MIN(bufsz, drrw->drr_logical_size),
NULL) != 0) {
warnx("decompression type %d failed " warnx("decompression type %d failed "
"for ino %llu offset %llu", "for ino %llu offset %llu",
type, dtype,
(u_longlong_t)drrw->drr_object, (u_longlong_t)drrw->drr_object,
(u_longlong_t)drrw->drr_offset); (u_longlong_t)drrw->drr_offset);
exit(4); exit(4);
} }
payload_size = drrw->drr_logical_size; payload_size = drrw->drr_logical_size;
abd_free(&cabd);
free(cbuf); free(cbuf);
} }
/* Recompress the payload */ /* Recompress the payload */
if (cinfo->ci_compress != NULL) { if (ctype != ZIO_COMPRESS_OFF) {
payload_size = P2ROUNDUP(cinfo->ci_compress( abd_t dabd;
dbuf, buf, drrw->drr_logical_size, abd_get_from_buf_struct(&dabd,
MIN(payload_size, bufsz), (level == -1 ? dbuf, drrw->drr_logical_size);
cinfo->ci_level : level)), payload_size = P2ROUNDUP(zio_compress_data(
ctype, &dabd, (void **)&buf,
drrw->drr_logical_size, level),
SPA_MINBLOCKSIZE); SPA_MINBLOCKSIZE);
if (payload_size != drrw->drr_logical_size) { if (payload_size != drrw->drr_logical_size) {
drrw->drr_compressiontype = type; drrw->drr_compressiontype = ctype;
drrw->drr_compressed_size = drrw->drr_compressed_size =
payload_size; payload_size;
} else { } else {
@ -301,9 +296,10 @@ zstream_do_recompress(int argc, char *argv[])
drrw->drr_compressiontype = 0; drrw->drr_compressiontype = 0;
drrw->drr_compressed_size = 0; drrw->drr_compressed_size = 0;
} }
abd_free(&dabd);
free(dbuf); free(dbuf);
} else { } else {
drrw->drr_compressiontype = type; drrw->drr_compressiontype = ctype;
drrw->drr_compressed_size = 0; drrw->drr_compressed_size = 0;
} }
break; break;
@ -371,6 +367,7 @@ zstream_do_recompress(int argc, char *argv[])
fletcher_4_fini(); fletcher_4_fini();
zio_fini(); zio_fini();
zstd_fini(); zstd_fini();
abd_fini();
return (0); return (0);
} }