Add feature-commit-cb branch

Brian Behlendorf 2008-11-20 12:09:33 -08:00
parent 34dc7c2f25
commit 976b01efda
9 changed files with 161 additions and 1 deletion

.topdeps Normal file
View File

@ -0,0 +1 @@
master

.topmsg Normal file
View File

@ -0,0 +1,12 @@
From: Brian Behlendorf <behlendorf1@llnl.gov>
Subject: [PATCH] feature commit cb
ZFS commit callbacks (v3). Some version of this support is expected
to appear in an official release from the core ZFS team.
NOTE: The ztest test case was dropped because it assumed userspace
pthreads support. We should certainly keep the test case, but the
userspace modification should be rewritten so that it also handles
a kernel space build.
Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>

View File

@ -66,6 +66,7 @@ struct objset_impl;
typedef struct objset objset_t;
typedef struct dmu_tx dmu_tx_t;
typedef struct dsl_dir dsl_dir_t;
typedef void dmu_callback_func_t(void *dcb_data, int error);
typedef enum dmu_object_type {
DMU_OT_NONE,
@ -416,6 +417,32 @@ int dmu_tx_assign(dmu_tx_t *tx, uint64_t txg_how);
void dmu_tx_wait(dmu_tx_t *tx);
void dmu_tx_commit(dmu_tx_t *tx);
/*
* To add a commit callback, first call dmu_tx_callback_data_create(). It
* returns a pointer to a memory area of "bytes" bytes that the caller and the
* callback can use to exchange data. "bytes" may be 0, or just the size of a
* pointer when the callback only needs a reference to a large or existing
* external data structure.
*
* The callback is then registered by calling dmu_tx_callback_commit_add()
* with the pointer returned by dmu_tx_callback_data_create() passed as the
* dcb_data argument. The transaction must already have been created, but it
* must not yet have been committed or aborted. It may or may not be assigned
* to a txg.
*
* The callback is invoked after the transaction has been safely written to
* stable storage, and is also invoked if the dmu_tx is aborted. If any error
* prevents the transaction from being committed to disk, the callback is
* invoked with a nonzero error value.
*
* When the callback data is no longer needed, it must be destroyed by the
* caller's code with dmu_tx_callback_data_destroy(). This is typically done
* at the end of the callback function.
*/
void *dmu_tx_callback_data_create(size_t bytes);
int dmu_tx_callback_commit_add(dmu_tx_t *tx, dmu_callback_func_t *dcb_func,
void *dcb_data);
int dmu_tx_callback_data_destroy(void *dcb_data);
/*
* Free up the data blocks for a defined range of a file. If size is
* zero, the range from offset to end-of-file is freed.
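
The comment above implies a three-step lifecycle: allocate the shared data area, register the callback on an open transaction, and let the callback destroy the data once it runs. Below is a minimal, hypothetical caller written against the declarations above; my_state_t, my_sync_done() and my_register_callback() are illustrative names that are not part of this patch, and only the dmu_tx_callback_*() calls are the new API.

#include <sys/zfs_context.h>
#include <sys/dmu.h>

/* Hypothetical payload exchanged between the caller and its callback. */
typedef struct my_state {
	uint64_t ms_obj;			/* object being updated */
} my_state_t;

static void
my_sync_done(void *dcb_data, int error)
{
	my_state_t *ms = dcb_data;

	if (error == 0)
		cmn_err(CE_NOTE, "object %llu reached stable storage",
		    (u_longlong_t)ms->ms_obj);
	else	/* e.g. ECANCELED if the dmu_tx was aborted */
		cmn_err(CE_NOTE, "object %llu not committed: error %d",
		    (u_longlong_t)ms->ms_obj, error);

	/* The callback owns the data area and must release it. */
	(void) dmu_tx_callback_data_destroy(dcb_data);
}

static int
my_register_callback(dmu_tx_t *tx, uint64_t object)
{
	my_state_t *ms;

	/* Space for a my_state_t is allocated behind the returned pointer. */
	ms = dmu_tx_callback_data_create(sizeof (my_state_t));
	ms->ms_obj = object;

	/* tx is created (and possibly assigned), but not yet committed. */
	return (dmu_tx_callback_commit_add(tx, my_sync_done, ms));
}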

View File

@ -230,6 +230,19 @@ extern "C" {
struct objset;
struct dmu_pool;
#define DMU_CALLBACK_MAGIC 0xca11bac0ca11bacfull
#define container_of(ptr, type, member) \
((type *)((char *)(ptr) - offsetof(type, member)))
typedef struct dmu_callback {
list_node_t dcb_node; /* linked to tx_callbacks list */
uint64_t dcb_magic; /* magic number to verify header */
dmu_callback_func_t *dcb_func; /* caller function pointer */
size_t dcb_bytes; /* caller private data size */
char dcb_data[0]; /* caller private data */
} dmu_callback_t;
#ifdef __cplusplus
}
#endif
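
The dcb_data[0] trailing member and the container_of() macro implement a common C idiom: callers are handed only the private data area, and the DMU later recovers the enclosing dmu_callback_t header by subtracting the member offset and re-checking dcb_magic. The standalone sketch below (plain C99, using a flexible array member instead of the [0] extension; not ZFS code) shows the same round trip.

#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

typedef struct header {
	unsigned long h_magic;		/* sanity check, like dcb_magic */
	size_t h_bytes;			/* size of the trailing area */
	char h_data[];			/* caller-visible private area */
} header_t;

static void *
data_create(size_t bytes)
{
	header_t *h = malloc(sizeof (header_t) + bytes);

	if (h == NULL)
		return (NULL);
	h->h_magic = 0xca11bac0UL;
	h->h_bytes = bytes;
	return (h->h_data);		/* hand out only the data area */
}

int
main(void)
{
	char *data = data_create(16);
	header_t *h;

	if (data == NULL)
		return (1);
	/* Recover the hidden header, as dmu_tx_callback_commit_add() does. */
	h = container_of(data, header_t, h_data);
	printf("magic=%#lx bytes=%zu\n", h->h_magic, h->h_bytes);
	free(h);
	return (0);
}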

View File

@ -59,6 +59,7 @@ struct dmu_tx {
txg_handle_t tx_txgh;
void *tx_tempreserve_cookie;
struct dmu_tx_hold *tx_needassign_txh;
list_t tx_callbacks; /* list of dmu_callback_t on this dmu_tx */
uint8_t tx_anyobj;
int tx_err;
#ifdef ZFS_DEBUG
@ -107,6 +108,11 @@ void dmu_tx_abort(dmu_tx_t *tx);
uint64_t dmu_tx_get_txg(dmu_tx_t *tx);
void dmu_tx_wait(dmu_tx_t *tx);
void *dmu_tx_callback_data_create(size_t bytes);
int dmu_tx_callback_commit_add(dmu_tx_t *tx, dmu_callback_func_t *dcb_func,
void *dcb_data);
int dmu_tx_callback_data_destroy(void *dcb_data);
/*
* These routines are defined in dmu_spa.h, and are called by the SPA.
*/

View File

@ -71,6 +71,7 @@ extern void txg_sync_stop(struct dsl_pool *dp);
extern uint64_t txg_hold_open(struct dsl_pool *dp, txg_handle_t *txghp);
extern void txg_rele_to_quiesce(txg_handle_t *txghp);
extern void txg_rele_to_sync(txg_handle_t *txghp);
extern void txg_rele_commit_cb(txg_handle_t *txghp, list_t *tx_callbacks);
extern void txg_suspend(struct dsl_pool *dp);
extern void txg_resume(struct dsl_pool *dp);

View File

@ -39,6 +39,7 @@ struct tx_cpu {
kmutex_t tc_lock;
kcondvar_t tc_cv[TXG_SIZE];
uint64_t tc_count[TXG_SIZE];
list_t tc_callbacks[TXG_SIZE]; /* post-commit callbacks */
char tc_pad[16];
};
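
tc_callbacks is indexed the same way as the other per-txg arrays in tx_cpu: an ever-increasing txg number is folded into one of TXG_SIZE ring slots with txg & TXG_MASK, where TXG_SIZE is a power of two (4 in txg.h) and TXG_MASK is TXG_SIZE - 1. A small standalone sketch of that indexing, not ZFS code:

#include <stdio.h>
#include <stdint.h>

#define TXG_SIZE	4			/* as in sys/txg.h */
#define TXG_MASK	(TXG_SIZE - 1)		/* valid because TXG_SIZE is 2^n */

int
main(void)
{
	uint64_t txg;

	/* Consecutive txg numbers rotate through the same few slots. */
	for (txg = 100; txg < 106; txg++)
		printf("txg %llu -> slot %llu\n",
		    (unsigned long long)txg,
		    (unsigned long long)(txg & TXG_MASK));
	/*
	 * This is safe because only a few txgs (open, quiescing, syncing)
	 * are ever in flight at once, so a slot is always drained before
	 * its txg number comes around again.
	 */
	return (0);
}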

View File

@ -50,6 +50,8 @@ dmu_tx_create_dd(dsl_dir_t *dd)
tx->tx_pool = dd->dd_pool;
list_create(&tx->tx_holds, sizeof (dmu_tx_hold_t),
offsetof(dmu_tx_hold_t, txh_node));
list_create(&tx->tx_callbacks, sizeof (dmu_callback_t),
offsetof(dmu_callback_t, dcb_node));
#ifdef ZFS_DEBUG
refcount_create(&tx->tx_space_written);
refcount_create(&tx->tx_space_freed);
@ -986,6 +988,9 @@ dmu_tx_commit(dmu_tx_t *tx)
if (tx->tx_tempreserve_cookie)
dsl_dir_tempreserve_clear(tx->tx_tempreserve_cookie, tx);
if (!list_is_empty(&tx->tx_callbacks))
txg_rele_commit_cb(&tx->tx_txgh, &tx->tx_callbacks);
if (tx->tx_anyobj == FALSE)
txg_rele_to_sync(&tx->tx_txgh);
list_destroy(&tx->tx_holds);
@ -998,6 +1003,8 @@ dmu_tx_commit(dmu_tx_t *tx)
refcount_destroy_many(&tx->tx_space_freed,
refcount_count(&tx->tx_space_freed));
#endif
ASSERT(list_is_empty(&tx->tx_callbacks));
list_destroy(&tx->tx_callbacks);
kmem_free(tx, sizeof (dmu_tx_t));
}
@ -1005,6 +1012,7 @@ void
dmu_tx_abort(dmu_tx_t *tx)
{
dmu_tx_hold_t *txh;
dmu_callback_t *dcb;
ASSERT(tx->tx_txg == 0);
@ -1016,6 +1024,16 @@ dmu_tx_abort(dmu_tx_t *tx)
if (dn != NULL)
dnode_rele(dn, tx);
}
while (dcb = list_head(&tx->tx_callbacks)) {
list_remove(&tx->tx_callbacks, dcb);
/*
* Call the callback with an error code. The callback will
* call dmu_tx_callback_data_destroy to free the memory.
*/
dcb->dcb_func(dcb->dcb_data, ECANCELED);
}
list_destroy(&tx->tx_holds);
#ifdef ZFS_DEBUG
refcount_destroy_many(&tx->tx_space_written,
@ -1023,6 +1041,7 @@ dmu_tx_abort(dmu_tx_t *tx)
refcount_destroy_many(&tx->tx_space_freed,
refcount_count(&tx->tx_space_freed));
#endif
list_destroy(&tx->tx_callbacks);
kmem_free(tx, sizeof (dmu_tx_t));
}
@ -1032,3 +1051,45 @@ dmu_tx_get_txg(dmu_tx_t *tx)
ASSERT(tx->tx_txg != 0);
return (tx->tx_txg);
}
void *
dmu_tx_callback_data_create(size_t bytes)
{
dmu_callback_t *dcb;
dcb = kmem_alloc(sizeof (dmu_callback_t) + bytes, KM_SLEEP);
dcb->dcb_magic = DMU_CALLBACK_MAGIC;
dcb->dcb_bytes = bytes;
return &dcb->dcb_data;
}
int
dmu_tx_callback_commit_add(dmu_tx_t *tx, dmu_callback_func_t *dcb_func,
void *dcb_data)
{
dmu_callback_t *dcb = container_of(dcb_data, dmu_callback_t, dcb_data);
if (dcb->dcb_magic != DMU_CALLBACK_MAGIC)
return (EINVAL);
dcb->dcb_func = dcb_func;
list_insert_tail(&tx->tx_callbacks, dcb);
return (0);
}
int
dmu_tx_callback_data_destroy(void *dcb_data)
{
dmu_callback_t *dcb = container_of(dcb_data, dmu_callback_t, dcb_data);
if (dcb->dcb_magic != DMU_CALLBACK_MAGIC)
return (EINVAL);
kmem_free(dcb, sizeof (dmu_callback_t) + dcb->dcb_bytes);
return (0);
}
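
Taken together, the dmu_tx_commit() and dmu_tx_abort() changes above give a registered callback two possible invocation paths. The hedged sketch below illustrates both; os, object and done_cb() are assumed to exist elsewhere and are not part of this patch, while dmu_tx_create(), dmu_tx_hold_bonus() and dmu_tx_assign() are existing DMU interfaces.

static void done_cb(void *dcb_data, int error);	/* destroys dcb_data */

static void
update_with_callback(objset_t *os, uint64_t object)
{
	dmu_tx_t *tx = dmu_tx_create(os);
	void *cbd = dmu_tx_callback_data_create(0);

	dmu_tx_hold_bonus(tx, object);
	(void) dmu_tx_callback_commit_add(tx, done_cb, cbd);

	if (dmu_tx_assign(tx, TXG_WAIT) != 0) {
		/*
		 * Abort path: dmu_tx_abort() walks tx_callbacks and
		 * invokes done_cb(cbd, ECANCELED) right here, in the
		 * caller's thread.
		 */
		dmu_tx_abort(tx);
		return;
	}

	/* ... dirty the object under this tx ... */

	/*
	 * Commit path: dmu_tx_commit() hands tx_callbacks over to the
	 * txg via txg_rele_commit_cb(); done_cb(cbd, 0) runs later from
	 * the sync thread, once the txg is on stable storage.
	 */
	dmu_tx_commit(tx);
}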

View File

@ -28,6 +28,7 @@
#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/callb.h>
@ -66,6 +67,9 @@ txg_init(dsl_pool_t *dp, uint64_t txg)
for (i = 0; i < TXG_SIZE; i++) {
cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
NULL);
list_create(&tx->tx_cpu[c].tc_callbacks[i],
sizeof (dmu_callback_t), offsetof(dmu_callback_t,
dcb_node));
}
}
@ -93,8 +97,11 @@ txg_fini(dsl_pool_t *dp)
int i;
mutex_destroy(&tx->tx_cpu[c].tc_lock);
for (i = 0; i < TXG_SIZE; i++)
for (i = 0; i < TXG_SIZE; i++) {
cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
ASSERT(list_is_empty(&tx->tx_cpu[c].tc_callbacks[i]));
list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
}
}
kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
@ -235,6 +242,21 @@ txg_rele_to_sync(txg_handle_t *th)
th->th_cpu = NULL; /* defensive */
}
void
txg_rele_commit_cb(txg_handle_t *th, list_t *tx_callbacks)
{
dmu_callback_t *dcb;
tx_cpu_t *tc = th->th_cpu;
int g = th->th_txg & TXG_MASK;
mutex_enter(&tc->tc_lock);
while (dcb = list_head(tx_callbacks)) {
list_remove(tx_callbacks, dcb);
list_insert_tail(&tc->tc_callbacks[g], dcb);
}
mutex_exit(&tc->tc_lock);
}
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
@ -335,6 +357,21 @@ txg_sync_thread(dsl_pool_t *dp)
spa_sync(dp->dp_spa, txg);
delta = lbolt - start;
/*
* Call all the callbacks for this txg. The callbacks must
* call dmu_tx_callback_data_destroy to free memory.
*/
for (int c = 0; c < max_ncpus; c++) {
int g = txg & TXG_MASK;
tx_cpu_t *tc = &tx->tx_cpu[c];
/* No need to lock tx_cpu_t */
while (dcb = list_head(&tc->tc_callbacks[g])) {
list_remove(&tc->tc_callbacks[g], dcb);
dcb->dcb_func(dcb->dcb_data, 0);
}
}
written = dp->dp_space_towrite[txg & TXG_MASK];
dp->dp_space_towrite[txg & TXG_MASK] = 0;
ASSERT(dp->dp_tempreserved[txg & TXG_MASK] == 0);
@ -390,6 +427,7 @@ txg_quiesce_thread(dsl_pool_t *dp)
{
tx_state_t *tx = &dp->dp_tx;
callb_cpr_t cpr;
dmu_callback_t *dcb;
txg_thread_enter(tx, &cpr);