diff --git a/zfs/lib/libzpool/dmu_tx.c b/zfs/lib/libzpool/dmu_tx.c
index 18a640d6d0..42ce01d23c 100644
--- a/zfs/lib/libzpool/dmu_tx.c
+++ b/zfs/lib/libzpool/dmu_tx.c
@@ -48,6 +48,8 @@ dmu_tx_create_dd(dsl_dir_t *dd)
 	tx->tx_pool = dd->dd_pool;
 	list_create(&tx->tx_holds, sizeof (dmu_tx_hold_t),
 	    offsetof(dmu_tx_hold_t, txh_node));
+	list_create(&tx->tx_callbacks, sizeof (dmu_callback_t),
+	    offsetof(dmu_callback_t, dcb_node));
 #ifdef ZFS_DEBUG
 	refcount_create(&tx->tx_space_written);
 	refcount_create(&tx->tx_space_freed);
@@ -1020,6 +1022,9 @@ dmu_tx_commit(dmu_tx_t *tx)
 	if (tx->tx_tempreserve_cookie)
 		dsl_dir_tempreserve_clear(tx->tx_tempreserve_cookie, tx);
 
+	if (!list_is_empty(&tx->tx_callbacks))
+		txg_rele_commit_cb(&tx->tx_txgh, &tx->tx_callbacks);
+
 	if (tx->tx_anyobj == FALSE)
 		txg_rele_to_sync(&tx->tx_txgh);
 	list_destroy(&tx->tx_holds);
@@ -1032,6 +1037,8 @@ dmu_tx_commit(dmu_tx_t *tx)
 	refcount_destroy_many(&tx->tx_space_freed,
 	    refcount_count(&tx->tx_space_freed));
 #endif
+	ASSERT(list_is_empty(&tx->tx_callbacks));
+	list_destroy(&tx->tx_callbacks);
 	kmem_free(tx, sizeof (dmu_tx_t));
 }
 
@@ -1039,6 +1046,7 @@ void
 dmu_tx_abort(dmu_tx_t *tx)
 {
 	dmu_tx_hold_t *txh;
+	dmu_callback_t *dcb;
 
 	ASSERT(tx->tx_txg == 0);
 
@@ -1050,6 +1058,16 @@ dmu_tx_abort(dmu_tx_t *tx)
 		if (dn != NULL)
 			dnode_rele(dn, tx);
 	}
+
+	while ((dcb = list_head(&tx->tx_callbacks))) {
+		list_remove(&tx->tx_callbacks, dcb);
+
+		/*
+		 * Call the callback with an error code. The callback will
+		 * call dmu_tx_callback_data_destroy to free the memory.
+		 */
+		dcb->dcb_func(dcb->dcb_data, ECANCELED);
+	}
 
 	list_destroy(&tx->tx_holds);
 #ifdef ZFS_DEBUG
 	refcount_destroy_many(&tx->tx_space_written,
@@ -1057,6 +1075,7 @@ dmu_tx_abort(dmu_tx_t *tx)
 	refcount_destroy_many(&tx->tx_space_freed,
 	    refcount_count(&tx->tx_space_freed));
 #endif
+	list_destroy(&tx->tx_callbacks);
 	kmem_free(tx, sizeof (dmu_tx_t));
 }
 
@@ -1066,3 +1085,45 @@ dmu_tx_get_txg(dmu_tx_t *tx)
 	ASSERT(tx->tx_txg != 0);
 	return (tx->tx_txg);
 }
+
+void *
+dmu_tx_callback_data_create(size_t bytes)
+{
+	dmu_callback_t *dcb;
+
+	dcb = kmem_alloc(sizeof (dmu_callback_t) + bytes, KM_SLEEP);
+
+	dcb->dcb_magic = DMU_CALLBACK_MAGIC;
+	dcb->dcb_bytes = bytes;
+
+	return (&dcb->dcb_data);
+}
+
+int
+dmu_tx_callback_commit_add(dmu_tx_t *tx, dmu_callback_func_t *dcb_func,
+    void *dcb_data)
+{
+	dmu_callback_t *dcb = container_of(dcb_data, dmu_callback_t, dcb_data);
+
+	if (dcb->dcb_magic != DMU_CALLBACK_MAGIC)
+		return (EINVAL);
+
+	dcb->dcb_func = dcb_func;
+
+	list_insert_tail(&tx->tx_callbacks, dcb);
+
+	return (0);
+}
+
+int
+dmu_tx_callback_data_destroy(void *dcb_data)
+{
+	dmu_callback_t *dcb = container_of(dcb_data, dmu_callback_t, dcb_data);
+
+	if (dcb->dcb_magic != DMU_CALLBACK_MAGIC)
+		return (EINVAL);
+
+	kmem_free(dcb, sizeof (dmu_callback_t) + dcb->dcb_bytes);
+
+	return (0);
+}
diff --git a/zfs/lib/libzpool/include/sys/dmu.h b/zfs/lib/libzpool/include/sys/dmu.h
index 3b1e5c8fbc..e4bcdfbe03 100644
--- a/zfs/lib/libzpool/include/sys/dmu.h
+++ b/zfs/lib/libzpool/include/sys/dmu.h
@@ -64,6 +64,7 @@ struct objset_impl;
 typedef struct objset objset_t;
 typedef struct dmu_tx dmu_tx_t;
 typedef struct dsl_dir dsl_dir_t;
+typedef void dmu_callback_func_t(void *dcb_data, int error);
 
 typedef enum dmu_object_type {
 	DMU_OT_NONE,
@@ -429,6 +430,32 @@ int dmu_tx_assign(dmu_tx_t *tx, uint64_t txg_how);
 void dmu_tx_wait(dmu_tx_t *tx);
 void dmu_tx_commit(dmu_tx_t *tx);
 
+/*
+ * To add a commit callback, first call dmu_tx_callback_data_create().
+ * It returns a pointer to a memory area of size "bytes" (which may be 0,
+ * or just the size of a pointer when a large or pre-existing external
+ * data structure is referenced instead) that the caller and the callback
+ * use to exchange data.
+ *
+ * The callback is then registered by calling dmu_tx_callback_commit_add()
+ * with the pointer returned by dmu_tx_callback_data_create() passed as the
+ * dcb_data argument. The transaction must already be created, but it must
+ * not yet be committed or aborted; it may or may not be assigned to a txg.
+ *
+ * The callback is invoked after the transaction has been safely written
+ * to stable storage, and is also invoked if the dmu_tx is aborted. If any
+ * error prevents the transaction from being committed to disk, the
+ * callback is invoked with error != 0.
+ *
+ * When the callback data is no longer needed, it must be destroyed by the
+ * caller's code with dmu_tx_callback_data_destroy(). This is typically
+ * done at the end of the callback function.
+ */
+void *dmu_tx_callback_data_create(size_t bytes);
+int dmu_tx_callback_commit_add(dmu_tx_t *tx, dmu_callback_func_t *dcb_func,
+    void *dcb_data);
+int dmu_tx_callback_data_destroy(void *dcb_data);
+
 /*
  * Free up the data blocks for a defined range of a file.  If size is
  * zero, the range from offset to end-of-file is freed.
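As a usage note for the API documented above, a caller-side sketch might look like the following. The state struct, callback, and registration helper (my_cb_state_t, my_commit_cb, my_register) are illustrative names, not part of this patch, and an in-kernel ZFS build environment is assumed:

	typedef struct my_cb_state {
		uint64_t obj;		/* hypothetical per-tx bookkeeping */
	} my_cb_state_t;

	static void
	my_commit_cb(void *dcb_data, int error)
	{
		my_cb_state_t *state = dcb_data;

		if (error != 0) {
			/* tx was aborted or failed to reach stable storage */
			cmn_err(CE_NOTE, "tx for obj %llu did not commit",
			    (u_longlong_t)state->obj);
		}

		/* the callback owns the data area and must free it */
		(void) dmu_tx_callback_data_destroy(dcb_data);
	}

	static int
	my_register(dmu_tx_t *tx, uint64_t obj)
	{
		my_cb_state_t *state;

		state = dmu_tx_callback_data_create(sizeof (my_cb_state_t));
		state->obj = obj;
		return (dmu_tx_callback_commit_add(tx, my_commit_cb, state));
	}

Per the comment above, registration must happen after dmu_tx_create() and before dmu_tx_commit() or dmu_tx_abort().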
diff --git a/zfs/lib/libzpool/include/sys/dmu_impl.h b/zfs/lib/libzpool/include/sys/dmu_impl.h
index 96ce688e15..f32ab6ad71 100644
--- a/zfs/lib/libzpool/include/sys/dmu_impl.h
+++ b/zfs/lib/libzpool/include/sys/dmu_impl.h
@@ -232,6 +232,19 @@ extern "C" {
 struct objset;
 struct dmu_pool;
 
+#define	DMU_CALLBACK_MAGIC	0xca11bac0ca11bacfull
+
+#define	container_of(ptr, type, member) \
+	((type *)((char *)(ptr) - offsetof(type, member)))
+
+typedef struct dmu_callback {
+	list_node_t dcb_node;		/* linked to tx_callbacks list */
+	uint64_t dcb_magic;		/* magic number to verify header */
+	dmu_callback_func_t *dcb_func;	/* caller function pointer */
+	size_t dcb_bytes;		/* caller private data size */
+	char dcb_data[0];		/* caller private data */
+} dmu_callback_t;
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/zfs/lib/libzpool/include/sys/dmu_tx.h b/zfs/lib/libzpool/include/sys/dmu_tx.h
index 2727daaaa7..47b9bcaa57 100644
--- a/zfs/lib/libzpool/include/sys/dmu_tx.h
+++ b/zfs/lib/libzpool/include/sys/dmu_tx.h
@@ -59,6 +59,7 @@ struct dmu_tx {
 	txg_handle_t tx_txgh;
 	void *tx_tempreserve_cookie;
 	struct dmu_tx_hold *tx_needassign_txh;
+	list_t tx_callbacks;	/* list of dmu_callback_t on this dmu_tx */
 	uint8_t tx_anyobj;
 	int tx_err;
 #ifdef ZFS_DEBUG
@@ -109,6 +110,11 @@ void dmu_tx_abort(dmu_tx_t *tx);
 uint64_t dmu_tx_get_txg(dmu_tx_t *tx);
 void dmu_tx_wait(dmu_tx_t *tx);
 
+void *dmu_tx_callback_data_create(size_t bytes);
+int dmu_tx_callback_commit_add(dmu_tx_t *tx, dmu_callback_func_t *dcb_func,
+    void *dcb_data);
+int dmu_tx_callback_data_destroy(void *dcb_data);
+
 /*
  * These routines are defined in dmu_spa.h, and are called by the SPA.
 */
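The dcb_data[0] member above is the old zero-length-array idiom for a header followed by a variable-size payload: callers get a pointer to the payload, and container_of() walks back to the enclosing dmu_callback_t to validate the magic number. A self-contained userland sketch of that round trip (illustrative struct and names; C99 spells the member char data[]):

	#include <stddef.h>
	#include <stdlib.h>
	#include <assert.h>

	#define	container_of(ptr, type, member) \
		((type *)((char *)(ptr) - offsetof(type, member)))

	struct hdr {
		unsigned long magic;
		size_t bytes;
		char data[];		/* payload follows the header */
	};

	int
	main(void)
	{
		struct hdr *h = malloc(sizeof (*h) + 32);

		assert(h != NULL);
		h->magic = 0xca11bac0UL;
		h->bytes = 32;

		void *payload = h->data;	/* what the caller sees */

		/* recover the header from the payload pointer */
		struct hdr *back = container_of(payload, struct hdr, data);
		assert(back == h && back->magic == 0xca11bac0UL);

		free(h);
		return (0);
	}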
diff --git a/zfs/lib/libzpool/include/sys/txg.h b/zfs/lib/libzpool/include/sys/txg.h
index 23bdff211b..1349bd4dcb 100644
--- a/zfs/lib/libzpool/include/sys/txg.h
+++ b/zfs/lib/libzpool/include/sys/txg.h
@@ -71,6 +71,7 @@ extern void txg_sync_stop(struct dsl_pool *dp);
 extern uint64_t txg_hold_open(struct dsl_pool *dp, txg_handle_t *txghp);
 extern void txg_rele_to_quiesce(txg_handle_t *txghp);
 extern void txg_rele_to_sync(txg_handle_t *txghp);
+extern void txg_rele_commit_cb(txg_handle_t *txghp, list_t *tx_callbacks);
 
 extern void txg_suspend(struct dsl_pool *dp);
 extern void txg_resume(struct dsl_pool *dp);
diff --git a/zfs/lib/libzpool/include/sys/txg_impl.h b/zfs/lib/libzpool/include/sys/txg_impl.h
index 7413c662b3..a9a7c358b4 100644
--- a/zfs/lib/libzpool/include/sys/txg_impl.h
+++ b/zfs/lib/libzpool/include/sys/txg_impl.h
@@ -37,6 +37,7 @@ struct tx_cpu {
 	kmutex_t tc_lock;
 	kcondvar_t tc_cv[TXG_SIZE];
 	uint64_t tc_count[TXG_SIZE];
+	list_t tc_callbacks[TXG_SIZE];	/* post-commit callbacks */
 	char tc_pad[16];
 };
diff --git a/zfs/lib/libzpool/include/sys/zap.h b/zfs/lib/libzpool/include/sys/zap.h
index f88cc068bd..687f7fcd75 100644
--- a/zfs/lib/libzpool/include/sys/zap.h
+++ b/zfs/lib/libzpool/include/sys/zap.h
@@ -316,6 +316,11 @@ void zap_cursor_advance(zap_cursor_t *zc);
  */
 uint64_t zap_cursor_serialize(zap_cursor_t *zc);
 
+/*
+ * Advance the cursor to the attribute having the given key.
+ */
+int zap_cursor_move_to_key(zap_cursor_t *zc, const char *name, matchtype_t mt);
+
 /*
  * Initialize a zap cursor pointing to the position recorded by
  * zap_cursor_serialize (in the "serialized" argument).  You can also
diff --git a/zfs/lib/libzpool/include/sys/zap_impl.h b/zfs/lib/libzpool/include/sys/zap_impl.h
index 0dc02ab6b0..159ffaf8b0 100644
--- a/zfs/lib/libzpool/include/sys/zap_impl.h
+++ b/zfs/lib/libzpool/include/sys/zap_impl.h
@@ -210,6 +210,7 @@ int fzap_add_cd(zap_name_t *zn,
     uint64_t integer_size, uint64_t num_integers,
     const void *val, uint32_t cd, dmu_tx_t *tx);
 void fzap_upgrade(zap_t *zap, dmu_tx_t *tx);
+int fzap_cursor_move_to_key(zap_cursor_t *zc, zap_name_t *zn);
 
 #ifdef __cplusplus
 }
diff --git a/zfs/lib/libzpool/txg.c b/zfs/lib/libzpool/txg.c
index 8e99ef7f26..b150ebd3cf 100644
--- a/zfs/lib/libzpool/txg.c
+++ b/zfs/lib/libzpool/txg.c
@@ -26,6 +26,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 
@@ -57,6 +58,9 @@ txg_init(dsl_pool_t *dp, uint64_t txg)
 		for (i = 0; i < TXG_SIZE; i++) {
 			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
 			    NULL);
+			list_create(&tx->tx_cpu[c].tc_callbacks[i],
+			    sizeof (dmu_callback_t), offsetof(dmu_callback_t,
+			    dcb_node));
 		}
 	}
 
@@ -98,8 +102,11 @@ txg_fini(dsl_pool_t *dp)
 		int i;
 
 		mutex_destroy(&tx->tx_cpu[c].tc_lock);
-		for (i = 0; i < TXG_SIZE; i++)
+		for (i = 0; i < TXG_SIZE; i++) {
 			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
+			ASSERT(list_is_empty(&tx->tx_cpu[c].tc_callbacks[i]));
+			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
+		}
 	}
 
 	kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
@@ -245,6 +252,21 @@ txg_rele_to_sync(txg_handle_t *th)
 	th->th_cpu = NULL;	/* defensive */
 }
 
+void
+txg_rele_commit_cb(txg_handle_t *th, list_t *tx_callbacks)
+{
+	dmu_callback_t *dcb;
+	tx_cpu_t *tc = th->th_cpu;
+	int g = th->th_txg & TXG_MASK;
+
+	mutex_enter(&tc->tc_lock);
+	while ((dcb = list_head(tx_callbacks))) {
+		list_remove(tx_callbacks, dcb);
+		list_insert_tail(&tc->tc_callbacks[g], dcb);
+	}
+	mutex_exit(&tc->tc_lock);
+}
+
 static void
 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
 {
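txg_rele_commit_cb() above splices a transaction's callbacks onto the per-CPU list for the slot txg & TXG_MASK. Because TXG_SIZE is a power of two, the mask turns the monotonically increasing txg number into a small ring index, so at most TXG_SIZE transaction groups can have callbacks in flight per CPU. A tiny standalone illustration of the indexing (a TXG_SIZE of 4 is assumed, matching this tree):

	#include <stdio.h>

	#define	TXG_SIZE	4		/* must stay a power of two */
	#define	TXG_MASK	(TXG_SIZE - 1)

	int
	main(void)
	{
		/* consecutive txgs reuse the same four slots round-robin */
		for (unsigned long txg = 10; txg < 18; txg++)
			printf("txg %lu -> slot %lu\n", txg, txg & TXG_MASK);
		return (0);
	}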
@@ -285,7 +307,8 @@ txg_sync_thread(dsl_pool_t *dp)
 {
 	tx_state_t *tx = &dp->dp_tx;
 	callb_cpr_t cpr;
-	uint64_t start, delta;
+	uint64_t timeout, start, delta, timer, written;
+	int c, target;
 
 	txg_thread_enter(tx, &cpr);
 
@@ -347,6 +370,63 @@ txg_sync_thread(dsl_pool_t *dp)
 		spa_sync(dp->dp_spa, txg);
 		delta = lbolt - start;
 
+		/*
+		 * Call all the callbacks for this txg. The callbacks must
+		 * call dmu_tx_callback_data_destroy to free memory.
+		 */
+		for (c = 0; c < max_ncpus; c++) {
+			dmu_callback_t *dcb;
+			tx_cpu_t *tc = &tx->tx_cpu[c];
+			int g = txg & TXG_MASK;
+			/* No need to lock tx_cpu_t */
+
+			while ((dcb = list_head(&tc->tc_callbacks[g]))) {
+				list_remove(&tc->tc_callbacks[g], dcb);
+				dcb->dcb_func(dcb->dcb_data, 0);
+			}
+		}
+
+		written = dp->dp_space_towrite[txg & TXG_MASK];
+		dp->dp_space_towrite[txg & TXG_MASK] = 0;
+		ASSERT(dp->dp_tempreserved[txg & TXG_MASK] == 0);
+
+		/*
+		 * If the write limit max has not been explicitly set, set it
+		 * to a fraction of available physical memory (default 1/8th).
+		 * Note that we must inflate the limit because the spa
+		 * inflates write sizes to account for data replication.
+		 * Check this each sync phase to catch changing memory size.
+		 */
+		if (zfs_write_limit_inflated == 0 ||
+		    (zfs_write_limit_shift && zfs_write_limit_max !=
+		    physmem * PAGESIZE >> zfs_write_limit_shift)) {
+			zfs_write_limit_max =
+			    physmem * PAGESIZE >> zfs_write_limit_shift;
+			zfs_write_limit_inflated =
+			    spa_get_asize(dp->dp_spa, zfs_write_limit_max);
+			if (zfs_write_limit_min > zfs_write_limit_inflated)
+				zfs_write_limit_inflated = zfs_write_limit_min;
+		}
+
+		/*
+		 * Attempt to keep the sync time consistent by adjusting the
+		 * amount of write traffic allowed into each transaction
+		 * group.
+		 */
+		target = zfs_txg_synctime * hz;
+		if (delta > target) {
+			uint64_t old = MIN(dp->dp_write_limit, written);
+
+			dp->dp_write_limit = MAX(zfs_write_limit_min,
+			    old * target / delta);
+		} else if (written >= dp->dp_write_limit &&
+		    delta >> 3 < target >> 3) {
+			uint64_t rescale =
+			    MIN((100 * target) / delta, 200);
+
+			dp->dp_write_limit = MIN(zfs_write_limit_inflated,
+			    written * rescale / 100);
+		}
+
 		mutex_enter(&tx->tx_sync_lock);
 		rw_enter(&tx->tx_suspend, RW_WRITER);
 		tx->tx_synced_txg = txg;
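The throttle added above rescales dp_write_limit by the ratio of the target sync time (zfs_txg_synctime, in seconds, times hz) to the observed sync time. A worked standalone sketch of the slow-sync branch (all numbers illustrative; hz and the zfs_write_limit_* tunables are stand-ins for the kernel values):

	#include <stdio.h>

	#define	MIN(a, b)	((a) < (b) ? (a) : (b))
	#define	MAX(a, b)	((a) > (b) ? (a) : (b))

	int
	main(void)
	{
		unsigned long long target = 5 * 100;		/* 5 s at hz=100 */
		unsigned long long delta = 800;			/* sync took 8 s */
		unsigned long long written = 400ULL << 20;	/* 400 MB this txg */
		unsigned long long limit = 512ULL << 20;	/* dp_write_limit */
		unsigned long long limit_min = 32ULL << 20;	/* floor */

		if (delta > target) {
			/* sync ran long: scale the limit down by target/delta */
			unsigned long long old = MIN(limit, written);

			limit = MAX(limit_min, old * target / delta);
		}

		/* 400 MB * 500 / 800 = 250 MB */
		printf("new write limit: %llu MB\n", limit >> 20);
		return (0);
	}

The fast-sync branch works the same way in the other direction, but caps the rescale factor at 200%, so the limit grows to at most twice the amount actually written.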
diff --git a/zfs/lib/libzpool/zap.c b/zfs/lib/libzpool/zap.c
index 2989c61536..a9bd189f84 100644
--- a/zfs/lib/libzpool/zap.c
+++ b/zfs/lib/libzpool/zap.c
@@ -1080,6 +1080,29 @@ zap_stats_ptrtbl(zap_t *zap, uint64_t *tbl, int len, zap_stats_t *zs)
 	}
 }
 
+int
+fzap_cursor_move_to_key(zap_cursor_t *zc, zap_name_t *zn)
+{
+	int err;
+	zap_leaf_t *l;
+	zap_entry_handle_t zeh;
+
+	if (zn->zn_name_orij && strlen(zn->zn_name_orij) > ZAP_MAXNAMELEN)
+		return (E2BIG);
+
+	err = zap_deref_leaf(zc->zc_zap, zn->zn_hash, NULL, RW_READER, &l);
+	if (err != 0)
+		return (err);
+
+	err = zap_leaf_lookup(l, zn, &zeh);
+	if (err != 0) {
+		/* drop the leaf taken by zap_deref_leaf() */
+		zap_put_leaf(l);
+		return (err);
+	}
+
+	zc->zc_leaf = l;
+	zc->zc_hash = zeh.zeh_hash;
+	zc->zc_cd = zeh.zeh_cd;
+	return (0);
+}
+
 void
 fzap_get_stats(zap_t *zap, zap_stats_t *zs)
 {
diff --git a/zfs/lib/libzpool/zap_micro.c b/zfs/lib/libzpool/zap_micro.c
index 5ac0bb7421..02c13120c4 100644
--- a/zfs/lib/libzpool/zap_micro.c
+++ b/zfs/lib/libzpool/zap_micro.c
@@ -1045,6 +1045,45 @@ zap_cursor_advance(zap_cursor_t *zc)
 	}
 }
 
+int
+zap_cursor_move_to_key(zap_cursor_t *zc, const char *name, matchtype_t mt)
+{
+	int err = 0;
+	mzap_ent_t *mze;
+	zap_name_t *zn;
+
+	if (zc->zc_zap == NULL) {
+		err = zap_lockdir(zc->zc_objset, zc->zc_zapobj, NULL,
+		    RW_READER, TRUE, FALSE, &zc->zc_zap);
+		if (err)
+			return (err);
+	} else {
+		rw_enter(&zc->zc_zap->zap_rwlock, RW_READER);
+	}
+
+	zn = zap_name_alloc(zc->zc_zap, name, mt);
+	if (zn == NULL) {
+		rw_exit(&zc->zc_zap->zap_rwlock);
+		return (ENOTSUP);
+	}
+
+	if (!zc->zc_zap->zap_ismicro) {
+		err = fzap_cursor_move_to_key(zc, zn);
+	} else {
+		mze = mze_find(zn);
+		if (mze == NULL) {
+			err = ENOENT;
+			goto out;
+		}
+		zc->zc_hash = mze->mze_hash;
+		zc->zc_cd = mze->mze_phys.mze_cd;
+	}
+
+out:
+	zap_name_free(zn);
+	rw_exit(&zc->zc_zap->zap_rwlock);
+	return (err);
+}
+
 int
 zap_get_stats(objset_t *os, uint64_t zapobj, zap_stats_t *zs)
 {
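A consumer-side sketch of the new cursor call, resuming iteration at a known attribute rather than walking the ZAP from the start (the objset, object, and attribute name are hypothetical; MT_EXACT requests an exact-match lookup):

	zap_cursor_t zc;
	zap_attribute_t za;
	int err;

	zap_cursor_init(&zc, os, zapobj);
	err = zap_cursor_move_to_key(&zc, "resume_here", MT_EXACT);
	while (err == 0 && zap_cursor_retrieve(&zc, &za) == 0) {
		/* first pass sees "resume_here", then the entries after it */
		zap_cursor_advance(&zc);
	}
	zap_cursor_fini(&zc);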