Add 'feature-commit-cb' branch for DMU commit callbacks.

Brian Behlendorf 2009-03-19 20:30:14 -07:00
parent d164b20935
commit 29703a5b4e
9 changed files with 415 additions and 11 deletions

.topdeps Normal file

@ -0,0 +1 @@
master

.topmsg Normal file

@ -0,0 +1,8 @@
From: Brian Behlendorf <behlendorf1@llnl.gov>
Subject: [PATCH] feature commit cb
DMU commit callbacks. Provides an API to attach callbacks to transactions;
the DMU calls them once the transaction has been safely committed to disk.
See Lustre bug 14117 for details.
Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
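For context, a minimal sketch of how a DMU consumer might use the new API. This is illustrative only and not part of the commit; my_commit_done(), my_write(), and the object/offset arguments are hypothetical, while the dmu_tx_*() and dmu_write() calls are the existing DMU interfaces plus the dmu_tx_callback_register() added here.

static void
my_commit_done(void *arg, int error)
{
	/*
	 * error == 0: the txg holding this tx has reached stable storage.
	 * error == ECANCELED: dmu_tx_abort() was called on the tx.
	 */
	kmem_free(arg, sizeof (uint64_t));
}

static int
my_write(objset_t *os, uint64_t object, uint64_t offset, uint64_t *val)
{
	dmu_tx_t *tx = dmu_tx_create(os);
	uint64_t *arg;
	int error;

	dmu_tx_hold_write(tx, object, offset, sizeof (*val));
	error = dmu_tx_assign(tx, TXG_WAIT);
	if (error) {
		dmu_tx_abort(tx);
		return (error);
	}

	/* Pass the assigned txg to the callback as caller-private data. */
	arg = kmem_alloc(sizeof (uint64_t), KM_SLEEP);
	*arg = dmu_tx_get_txg(tx);
	dmu_tx_callback_register(tx, my_commit_done, arg);

	dmu_write(os, object, offset, sizeof (*val), val, tx);
	dmu_tx_commit(tx);	/* my_commit_done() runs once the txg syncs */
	return (0);
}

Note that the callback owns the private data: the caller allocates it and the callback frees it, which is the same pattern the ztest changes below follow.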


@ -164,6 +164,7 @@ typedef void ztest_func_t(ztest_args_t *);
ztest_func_t ztest_dmu_read_write;
ztest_func_t ztest_dmu_write_parallel;
ztest_func_t ztest_dmu_object_alloc_free;
ztest_func_t ztest_dmu_commit_callbacks;
ztest_func_t ztest_zap;
ztest_func_t ztest_zap_parallel;
ztest_func_t ztest_traverse;
@ -198,6 +199,7 @@ ztest_info_t ztest_info[] = {
{ ztest_dmu_read_write, 1, &zopt_always },
{ ztest_dmu_write_parallel, 30, &zopt_always },
{ ztest_dmu_object_alloc_free, 1, &zopt_always },
{ ztest_dmu_commit_callbacks, 10, &zopt_always },
{ ztest_zap, 30, &zopt_always },
{ ztest_zap_parallel, 100, &zopt_always },
{ ztest_dsl_prop_get_set, 1, &zopt_sometimes },
@ -217,6 +219,16 @@ ztest_info_t ztest_info[] = {
#define ZTEST_SYNC_LOCKS 16
/*
* The following struct is used to hold a list of uncalled commit callbacks.
*
* The callbacks are ordered by txg number.
*/
typedef struct ztest_cb_list {
mutex_t zcl_callbacks_lock;
list_t zcl_callbacks;
} ztest_cb_list_t;
/*
* Stuff we need to share writably between parent and child.
*/
@ -233,6 +245,7 @@ typedef struct ztest_shared {
ztest_info_t zs_info[ZTEST_FUNCS];
mutex_t zs_sync_lock[ZTEST_SYNC_LOCKS];
uint64_t zs_seq[ZTEST_SYNC_LOCKS];
ztest_cb_list_t zs_cb_list;
} ztest_shared_t;
static char ztest_dev_template[] = "%s/%s.%llua";
@ -2433,6 +2446,205 @@ ztest_zap_parallel(ztest_args_t *za)
dmu_tx_commit(tx);
}
/*
* Commit callback data.
*/
typedef struct ztest_cb_data {
list_node_t zcd_node;
ztest_cb_list_t *zcd_zcl;
uint64_t zcd_txg;
int zcd_expected_err;
boolean_t zcd_added;
boolean_t zcd_called;
spa_t *zcd_spa;
} ztest_cb_data_t;
static void
ztest_commit_callback(void *arg, int error)
{
ztest_cb_data_t *data = arg;
ztest_cb_list_t *zcl;
uint64_t synced_txg;
VERIFY(data != NULL);
VERIFY3S(data->zcd_expected_err, ==, error);
VERIFY(!data->zcd_called);
synced_txg = spa_last_synced_txg(data->zcd_spa);
if (data->zcd_txg > synced_txg)
fatal(0, "commit callback of txg %llu prematurely called, last"
" synced txg = %llu\n", data->zcd_txg, synced_txg);
zcl = data->zcd_zcl;
data->zcd_called = B_TRUE;
if (error == ECANCELED) {
ASSERT3U(data->zcd_txg, ==, 0);
ASSERT(!data->zcd_added);
/*
* The private callback data should be destroyed here, but
* since we are going to check the zcd_called field after
* dmu_tx_abort(), we will destroy it there.
*/
return;
}
if (!data->zcd_added)
goto out;
ASSERT3U(data->zcd_txg, !=, 0);
/* Remove our callback from the list */
(void) mutex_lock(&zcl->zcl_callbacks_lock);
list_remove(&zcl->zcl_callbacks, data);
(void) mutex_unlock(&zcl->zcl_callbacks_lock);
out:
umem_free(data, sizeof (ztest_cb_data_t));
}
static ztest_cb_data_t *
ztest_create_cb_data(objset_t *os, ztest_cb_list_t *zcl, uint64_t txg)
{
ztest_cb_data_t *cb_data;
cb_data = umem_zalloc(sizeof (ztest_cb_data_t), UMEM_NOFAIL);
cb_data->zcd_zcl = zcl;
cb_data->zcd_txg = txg;
cb_data->zcd_spa = os->os->os_spa;
return (cb_data);
}
/*
* If a number of txgs equal to this threshold have been created after a commit
* callback has been registered but not called, then we assume there is an
* implementation bug.
*/
#define ZTEST_COMMIT_CALLBACK_THRESH 3
/*
* Commit callback test.
*/
void
ztest_dmu_commit_callbacks(ztest_args_t *za)
{
objset_t *os = za->za_os;
dmu_tx_t *tx;
ztest_cb_list_t *zcl = &ztest_shared->zs_cb_list;
ztest_cb_data_t *cb_data[3], *tmp_cb;
uint64_t old_txg, txg;
int i, error = 0;
tx = dmu_tx_create(os);
cb_data[0] = ztest_create_cb_data(os, zcl, 0);
dmu_tx_callback_register(tx, ztest_commit_callback, cb_data[0]);
dmu_tx_hold_write(tx, ZTEST_DIROBJ, za->za_diroff, sizeof (uint64_t));
/* Every once in a while, abort the transaction on purpose */
if (ztest_random(100) == 0)
error = -1;
if (!error)
error = dmu_tx_assign(tx, TXG_NOWAIT);
txg = error ? 0 : dmu_tx_get_txg(tx);
cb_data[1] = ztest_create_cb_data(os, zcl, txg);
dmu_tx_callback_register(tx, ztest_commit_callback, cb_data[1]);
if (error) {
/*
* It's not a strict requirement to call the registered
* callbacks from inside dmu_tx_abort(), but that's what
* happens in the current implementation, so we will check for
* that.
*/
for (i = 0; i < 2; i++) {
cb_data[i]->zcd_expected_err = ECANCELED;
VERIFY(!cb_data[i]->zcd_called);
}
dmu_tx_abort(tx);
for (i = 0; i < 2; i++) {
VERIFY(cb_data[i]->zcd_called);
umem_free(cb_data[i], sizeof (ztest_cb_data_t));
}
return;
}
cb_data[0]->zcd_txg = txg;
cb_data[2] = ztest_create_cb_data(os, zcl, txg);
dmu_tx_callback_register(tx, ztest_commit_callback, cb_data[2]);
/*
* Read existing data to make sure there isn't a future leak.
*/
VERIFY(0 == dmu_read(os, ZTEST_DIROBJ, za->za_diroff, sizeof (uint64_t),
&old_txg));
if (old_txg > txg)
fatal(0, "future leak: got %llx, open txg is %llx", old_txg,
txg);
dmu_write(os, ZTEST_DIROBJ, za->za_diroff, sizeof (uint64_t), &txg, tx);
(void) mutex_lock(&zcl->zcl_callbacks_lock);
/*
* Since commit callbacks don't have any ordering requirement and since
* it is theoretically possible for a commit callback to be called
* after an arbitrary amount of time has elapsed since its txg has been
* synced, it is difficult to reliably determine whether a commit
* callback hasn't been called due to high load or due to a flawed
* implementation.
*
* In practice, we will assume that if after a certain number of txgs a
* commit callback hasn't been called, then most likely there's an
* implementation bug.
*/
tmp_cb = list_head(&zcl->zcl_callbacks);
if (tmp_cb != NULL)
VERIFY3U(tmp_cb->zcd_txg, >,
txg - ZTEST_COMMIT_CALLBACK_THRESH);
/*
* Let's find the place to insert our callbacks.
*
* Even though the list is ordered by txg, it is possible for the
* insertion point to not be the end because our txg may already be
* quiescing at this point and other callbacks in the open txg may
* have sneaked in.
*/
tmp_cb = list_tail(&zcl->zcl_callbacks);
while (tmp_cb != NULL && tmp_cb->zcd_txg > txg)
tmp_cb = list_prev(&zcl->zcl_callbacks, tmp_cb);
/* Add the 3 callbacks to the list */
for (i = 0; i < 3; i++) {
if (tmp_cb == NULL)
list_insert_head(&zcl->zcl_callbacks, cb_data[i]);
else
list_insert_after(&zcl->zcl_callbacks, tmp_cb,
cb_data[i]);
cb_data[i]->zcd_added = B_TRUE;
VERIFY(!cb_data[i]->zcd_called);
tmp_cb = cb_data[i];
}
(void) mutex_unlock(&zcl->zcl_callbacks_lock);
dmu_tx_commit(tx);
}
void
ztest_dsl_prop_get_set(ztest_args_t *za)
{
@ -3041,6 +3253,11 @@ ztest_run(char *pool)
(void) _mutex_init(&zs->zs_vdev_lock, USYNC_THREAD, NULL);
(void) rwlock_init(&zs->zs_name_lock, USYNC_THREAD, NULL);
(void) _mutex_init(&zs->zs_cb_list.zcl_callbacks_lock, USYNC_THREAD,
NULL);
list_create(&zs->zs_cb_list.zcl_callbacks, sizeof (ztest_cb_data_t),
offsetof(ztest_cb_data_t, zcd_node));
for (t = 0; t < ZTEST_SYNC_LOCKS; t++)
(void) _mutex_init(&zs->zs_sync_lock[t], USYNC_THREAD, NULL);
@ -3240,6 +3457,12 @@ ztest_run(char *pool)
spa_close(spa, FTAG);
kernel_fini();
list_destroy(&zs->zs_cb_list.zcl_callbacks);
(void) _mutex_destroy(&zs->zs_cb_list.zcl_callbacks_lock);
(void) rwlock_destroy(&zs->zs_name_lock);
(void) _mutex_destroy(&zs->zs_vdev_lock);
}
void


@ -48,6 +48,8 @@ dmu_tx_create_dd(dsl_dir_t *dd)
tx->tx_pool = dd->dd_pool;
list_create(&tx->tx_holds, sizeof (dmu_tx_hold_t),
offsetof(dmu_tx_hold_t, txh_node));
list_create(&tx->tx_callbacks, sizeof (dmu_tx_callback_t),
offsetof(dmu_tx_callback_t, dcb_node));
#ifdef ZFS_DEBUG
refcount_create(&tx->tx_space_written);
refcount_create(&tx->tx_space_freed);
@ -1020,8 +1022,13 @@ dmu_tx_commit(dmu_tx_t *tx)
if (tx->tx_tempreserve_cookie)
dsl_dir_tempreserve_clear(tx->tx_tempreserve_cookie, tx);
if (!list_is_empty(&tx->tx_callbacks))
txg_register_callbacks(&tx->tx_txgh, &tx->tx_callbacks);
if (tx->tx_anyobj == FALSE)
txg_rele_to_sync(&tx->tx_txgh);
list_destroy(&tx->tx_callbacks);
list_destroy(&tx->tx_holds);
#ifdef ZFS_DEBUG
dprintf("towrite=%llu written=%llu tofree=%llu freed=%llu\n",
@ -1050,6 +1057,14 @@ dmu_tx_abort(dmu_tx_t *tx)
if (dn != NULL)
dnode_rele(dn, tx);
}
/*
* Call any registered callbacks with an error code.
*/
if (!list_is_empty(&tx->tx_callbacks))
dmu_tx_callback(&tx->tx_callbacks, ECANCELED);
list_destroy(&tx->tx_callbacks);
list_destroy(&tx->tx_holds);
#ifdef ZFS_DEBUG
refcount_destroy_many(&tx->tx_space_written,
@ -1066,3 +1081,31 @@ dmu_tx_get_txg(dmu_tx_t *tx)
ASSERT(tx->tx_txg != 0);
return (tx->tx_txg);
}
void
dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *func, void *data)
{
dmu_tx_callback_t *dcb;
dcb = kmem_alloc(sizeof (dmu_tx_callback_t), KM_SLEEP);
dcb->dcb_func = func;
dcb->dcb_data = data;
list_insert_tail(&tx->tx_callbacks, dcb);
}
/*
* Call all the commit callbacks on a list, with a given error code.
*/
void
dmu_tx_callback(list_t *cb_list, int error)
{
dmu_tx_callback_t *dcb;
while ((dcb = list_head(cb_list)) != NULL) {
list_remove(cb_list, dcb);
dcb->dcb_func(dcb->dcb_data, error);
kmem_free(dcb, sizeof (dmu_tx_callback_t));
}
}


@ -429,6 +429,26 @@ int dmu_tx_assign(dmu_tx_t *tx, uint64_t txg_how);
void dmu_tx_wait(dmu_tx_t *tx);
void dmu_tx_commit(dmu_tx_t *tx);
/*
* To register a commit callback, dmu_tx_callback_register() must be called.
*
* dcb_data is a pointer to caller private data that is passed on as a
* callback parameter. The caller is responsible for properly allocating and
* freeing it.
*
* When registering a callback, the transaction must already be created, but
* it must not yet be committed or aborted. It may or may not be assigned to
* a txg.
*
* The callback will be called after the transaction has been safely written
* to stable storage and will also be called if the dmu_tx is aborted.
* If there is any error which prevents the transaction from being committed to
* disk, the callback will be called with a value of error != 0.
*/
typedef void dmu_tx_callback_func_t(void *dcb_data, int error);
void dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *dcb_func,
void *dcb_data);
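As a sketch of the abort semantics described above (illustrative only, not part of this header; my_cb(), my_async_free(), and their arguments are hypothetical names), a callback may be registered before the transaction is assigned, and fires with ECANCELED if the transaction is later aborted:

static void
my_cb(void *dcb_data, int error)
{
	/* error is 0 on commit, ECANCELED if the tx was aborted */
	kmem_free(dcb_data, sizeof (uint64_t));
}

static void
my_async_free(objset_t *os, uint64_t object, uint64_t offset, uint64_t size)
{
	dmu_tx_t *tx = dmu_tx_create(os);
	uint64_t *state = kmem_zalloc(sizeof (uint64_t), KM_SLEEP);

	dmu_tx_hold_free(tx, object, offset, size);
	dmu_tx_callback_register(tx, my_cb, state);	/* tx not yet assigned */

	if (dmu_tx_assign(tx, TXG_NOWAIT) != 0) {
		dmu_tx_abort(tx);	/* my_cb(state, ECANCELED) is called */
		return;
	}
	VERIFY(0 == dmu_free_range(os, object, offset, size, tx));
	dmu_tx_commit(tx);	/* my_cb(state, 0) runs after the txg syncs */
}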
/*
* Free up the data blocks for a defined range of a file. If size is
* zero, the range from offset to end-of-file is freed.


@ -26,8 +26,6 @@
#ifndef _SYS_DMU_TX_H
#define _SYS_DMU_TX_H
#pragma ident "%Z%%M% %I% %E% SMI"
#include <sys/inttypes.h>
#include <sys/dmu.h>
#include <sys/txg.h>
@ -59,6 +57,7 @@ struct dmu_tx {
txg_handle_t tx_txgh;
void *tx_tempreserve_cookie;
struct dmu_tx_hold *tx_needassign_txh;
list_t tx_callbacks; /* list of dmu_tx_callback_t on this dmu_tx */
uint8_t tx_anyobj;
int tx_err;
#ifdef ZFS_DEBUG
@ -98,6 +97,11 @@ typedef struct dmu_tx_hold {
#endif
} dmu_tx_hold_t;
typedef struct dmu_tx_callback {
list_node_t dcb_node; /* linked to tx_callbacks list */
dmu_tx_callback_func_t *dcb_func; /* caller function pointer */
void *dcb_data; /* caller private data */
} dmu_tx_callback_t;
/*
* These routines are defined in dmu.h, and are called by the user.
@ -109,6 +113,10 @@ void dmu_tx_abort(dmu_tx_t *tx);
uint64_t dmu_tx_get_txg(dmu_tx_t *tx);
void dmu_tx_wait(dmu_tx_t *tx);
void dmu_tx_callback_register(dmu_tx_t *tx, dmu_tx_callback_func_t *dcb_func,
void *dcb_data);
void dmu_tx_callback(list_t *cb_list, int error);
/*
* These routines are defined in dmu_spa.h, and are called by the SPA.
*/


@ -26,8 +26,6 @@
#ifndef _SYS_TXG_H
#define _SYS_TXG_H
#pragma ident "%Z%%M% %I% %E% SMI"
#include <sys/spa.h>
#include <sys/zfs_context.h>
@ -71,6 +69,7 @@ extern void txg_sync_stop(struct dsl_pool *dp);
extern uint64_t txg_hold_open(struct dsl_pool *dp, txg_handle_t *txghp);
extern void txg_rele_to_quiesce(txg_handle_t *txghp);
extern void txg_rele_to_sync(txg_handle_t *txghp);
extern void txg_register_callbacks(txg_handle_t *txghp, list_t *tx_callbacks);
extern void txg_suspend(struct dsl_pool *dp);
extern void txg_resume(struct dsl_pool *dp);


@ -33,10 +33,16 @@
extern "C" {
#endif
typedef struct tx_cb {
tx_cpu_t *tcb_tc;
uint64_t tcb_txg;
} tx_cb_t;
struct tx_cpu {
kmutex_t tc_lock;
kcondvar_t tc_cv[TXG_SIZE];
uint64_t tc_count[TXG_SIZE];
list_t tc_callbacks[TXG_SIZE]; /* commit cb list */
char tc_pad[16];
};
@ -64,6 +70,8 @@ typedef struct tx_state {
kthread_t *tx_sync_thread;
kthread_t *tx_quiesce_thread;
taskq_t *tx_commit_cb_taskq; /* commit callback taskq */
} tx_state_t;
#ifdef __cplusplus


@ -26,6 +26,7 @@
#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/callb.h>
@ -57,6 +58,9 @@ txg_init(dsl_pool_t *dp, uint64_t txg)
for (i = 0; i < TXG_SIZE; i++) {
cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
NULL);
list_create(&tx->tx_cpu[c].tc_callbacks[i],
sizeof (dmu_tx_callback_t),
offsetof(dmu_tx_callback_t, dcb_node));
}
}
@ -96,9 +100,14 @@ txg_fini(dsl_pool_t *dp)
int i;
mutex_destroy(&tx->tx_cpu[c].tc_lock);
for (i = 0; i < TXG_SIZE; i++) {
cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
}
}
if (tx->tx_commit_cb_taskq != NULL)
taskq_destroy(tx->tx_commit_cb_taskq);
kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
@ -229,25 +238,55 @@ txg_rele_to_quiesce(txg_handle_t *th)
}
void
txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
tx_cpu_t *tc = th->th_cpu;
int g = th->th_txg & TXG_MASK;
mutex_enter(&tc->tc_lock);
list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
mutex_exit(&tc->tc_lock);
}
static void
txg_exit(tx_cpu_t *tc, uint64_t txg)
{
int g = txg & TXG_MASK;
mutex_enter(&tc->tc_lock);
ASSERT(tc->tc_count[g] != 0);
if (--tc->tc_count[g] == 0)
cv_broadcast(&tc->tc_cv[g]);
mutex_exit(&tc->tc_lock);
}
void
txg_rele_to_sync(txg_handle_t *th)
{
txg_exit(th->th_cpu, th->th_txg);
th->th_cpu = NULL; /* defensive */
}
static void
txg_wait_exit(tx_state_t *tx, uint64_t txg)
{
int g = txg & TXG_MASK;
int c;
for (c = 0; c < max_ncpus; c++) {
tx_cpu_t *tc = &tx->tx_cpu[c];
mutex_enter(&tc->tc_lock);
while (tc->tc_count[g] != 0)
cv_wait(&tc->tc_cv[g], &tc->tc_lock);
mutex_exit(&tc->tc_lock);
}
}
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
tx_state_t *tx = &dp->dp_tx;
int g = txg & TXG_MASK;
int c;
/*
@ -269,12 +308,60 @@ txg_quiesce(dsl_pool_t *dp, uint64_t txg)
/*
* Quiesce the transaction group by waiting for everyone to txg_exit().
*/
txg_wait_exit(tx, txg);
}
static void
txg_callback(tx_cb_t *tcb)
{
tx_cpu_t *tc = tcb->tcb_tc;
int g = tcb->tcb_txg & TXG_MASK;
dmu_tx_callback(&tc->tc_callbacks[g], 0);
txg_exit(tc, tcb->tcb_txg);
kmem_free(tcb, sizeof (tx_cb_t));
}
/*
* Dispatch the commit callbacks registered on this txg to worker threads.
*/
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
int c;
tx_state_t *tx = &dp->dp_tx;
tx_cb_t *tcb;
for (c = 0; c < max_ncpus; c++) {
tx_cpu_t *tc = &tx->tx_cpu[c];
/* No need to lock tx_cpu_t at this point */
int g = txg & TXG_MASK;
if (list_is_empty(&tc->tc_callbacks[g]))
continue;
if (tx->tx_commit_cb_taskq == NULL) {
/*
* Commit callback taskq hasn't been created yet.
*/
tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
max_ncpus, minclsyspri, max_ncpus, max_ncpus * 4,
TASKQ_PREPOPULATE);
}
tcb = kmem_alloc(sizeof (tx_cb_t), KM_SLEEP);
tcb->tcb_txg = txg;
tcb->tcb_tc = tc;
/* There shouldn't be any holders on this txg at this point */
ASSERT3U(tc->tc_count[g], ==, 0);
tc->tc_count[g]++;
(void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
txg_callback, tcb, TQ_SLEEP);
}
}
@ -345,6 +432,13 @@ txg_sync_thread(dsl_pool_t *dp)
spa_sync(dp->dp_spa, txg);
delta = lbolt - start;
/*
* Dispatch commit callbacks to worker threads and wait for
* them to finish.
*/
txg_dispatch_callbacks(dp, txg);
txg_wait_exit(tx, txg);
mutex_enter(&tx->tx_sync_lock);
rw_enter(&tx->tx_suspend, RW_WRITER);
tx->tx_synced_txg = txg;