size_t mapsize;
MDB_dbi dbi;
MDB_env *env;
- MDB_txn *rdtxn;
- MDB_txn *wrtxn;
+
+ /** Cached transactions
+ *
+ * - at most one of (ro, rw) may be active at a time
+ * - non-NULL .ro may be active or reset
+ * - non-NULL .rw is always active
+ */
+ struct {
+ bool ro_active;
+ MDB_txn *ro, *rw;
+ } txn;
};
/** @brief Convert LMDB error code. */
case ENOSPC:
return kr_error(ENOSPC);
default:
- kr_log_info("[cache] LMDB error: %s\n", mdb_strerror(error));
+ kr_log_error("[cache] LMDB error: %s\n", mdb_strerror(error));
+ assert(false);
return -abs(error);
}
}
return 0;
}
-static int txn_begin(struct lmdb_env *env, MDB_txn **txn, bool rdonly)
+/** Obtain a transaction (they are cached in env->txn). */
+static int txn_get(struct lmdb_env *env, MDB_txn **txn, bool rdonly)
{
- /* Always barrier for write transaction. */
assert(env && txn);
- if (env->wrtxn) {
- mdb_txn_abort(env->wrtxn);
- env->wrtxn = NULL;
- }
- /* Renew pending read-only transaction
- * or abort it to clear reader slot before writing. */
- if (env->rdtxn) {
- if (rdonly) {
- *txn = env->rdtxn;
- env->rdtxn = NULL;
- return 0;
- } else {
- mdb_txn_abort(env->rdtxn);
- env->rdtxn = NULL;
+ if (env->txn.rw) {
+ /* Reuse the *open* RW txn even if only reading is requested;
+ * its lifetime is managed by cdb_sync(). The user may e.g.
+ * want to do some reads between the writes. */
+ *txn = env->txn.rw;
+ return kr_ok();
+ }
+
+ if (!rdonly) {
+ /* avoid two active transactions */
+ if (env->txn.ro && env->txn.ro_active) {
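+ /* mdb_txn_reset() releases the read snapshot but keeps the
+ * handle, so it can be re-activated later via mdb_txn_renew(). */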
+ mdb_txn_reset(env->txn.ro);
+ env->txn.ro_active = false;
+ }
+ int ret = mdb_txn_begin(env->env, NULL, 0/*RW*/, &env->txn.rw);
+ if (ret == MDB_SUCCESS) {
+ *txn = env->txn.rw;
+ assert(*txn);
}
+ return lmdb_error(ret);
}
- unsigned flags = rdonly ? MDB_RDONLY : 0;
- return lmdb_error(mdb_txn_begin(env->env, NULL, flags, txn));
-}
-static int txn_end(struct lmdb_env *env, MDB_txn *txn)
-{
- assert(env && txn);
- /* Cache read transactions */
- if (!env->rdtxn) {
- env->rdtxn = txn;
- } else {
- mdb_txn_abort(txn);
+ /* Get an active RO txn and return it. */
+ if (!env->txn.ro) { /* unlikely */
+ int ret = mdb_txn_begin(env->env, NULL, MDB_RDONLY, &env->txn.ro);
+ if (ret != MDB_SUCCESS) {
+ return lmdb_error(ret);
+ }
+ } else if (!env->txn.ro_active) {
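+ /* Re-activate the previously reset handle; renewing is cheaper
+ * than beginning a brand new read-only transaction. */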
+ int ret = mdb_txn_renew(env->txn.ro);
+ if (ret != MDB_SUCCESS) {
+ return lmdb_error(ret);
+ }
}
- return 0;
+ env->txn.ro_active = true;
+ *txn = env->txn.ro;
+ assert(*txn);
+ return kr_ok();
}
static int cdb_sync(knot_db_t *db)
{
struct lmdb_env *env = db;
- int ret = 0;
- if (env->wrtxn) {
- ret = lmdb_error(mdb_txn_commit(env->wrtxn));
- env->wrtxn = NULL; /* In-flight transaction is committed. */
- }
- if (env->rdtxn) {
- mdb_txn_abort(env->rdtxn);
- env->rdtxn = NULL;
+ int ret = kr_ok();
+ if (env->txn.rw) {
+ ret = mdb_txn_commit(env->txn.rw);
+ if (ret != MDB_BAD_TXN) {
+ /* _BAD_TXN happens during overfull clear with multiple forks :-/ */
+ ret = lmdb_error(ret);
+ }
+ env->txn.rw = NULL; /* the transaction got freed even in case of errors */
+ } else if (env->txn.ro && env->txn.ro_active) {
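+ /* No pending writes: just drop the read snapshot so it stops
+ * pinning old pages, but keep the handle cached for renewal. */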
+ mdb_txn_reset(env->txn.ro);
+ env->txn.ro_active = false;
}
return ret;
}
static void cdb_close_env(struct lmdb_env *env)
{
assert(env && env->env);
+
+ /* Get rid of any transactions. */
cdb_sync(env);
+ if (env->txn.ro) {
+ mdb_txn_abort(env->txn.ro);
+ env->txn.ro = NULL;
+ }
+
mdb_env_sync(env->env, 1);
mdb_dbi_close(env->env, env->dbi);
mdb_env_close(env->env);
{
struct lmdb_env *env = db;
MDB_txn *txn = NULL;
- int ret = txn_begin(env, &txn, true);
+ int ret = txn_get(env, &txn, true);
if (ret != 0) {
return ret;
}
MDB_stat stat;
ret = mdb_stat(txn, env->dbi, &stat);
- /* Always abort, serves as a checkpoint for in-flight transaction. */
- mdb_txn_abort(txn);
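+ /* The cached transaction is left in place; its lifetime is
+ * managed by cdb_sync(). */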
return (ret == MDB_SUCCESS) ? stat.ms_entries : lmdb_error(ret);
}
{
struct lmdb_env *env = db;
MDB_txn *txn = NULL;
- int ret = txn_begin(env, &txn, true);
- if (ret != 0) {
+ int ret = txn_get(env, &txn, true);
+ if (ret) {
return ret;
}
MDB_val _key = { .mv_size = key[i].len, .mv_data = key[i].data };
MDB_val _val = { .mv_size = val[i].len, .mv_data = val[i].data };
ret = mdb_get(txn, env->dbi, &_key, &_val);
+ if (ret != MDB_SUCCESS) {
+ return lmdb_error(ret);
+ }
/* Update the result. */
val[i].data = _val.mv_data;
val[i].len = _val.mv_size;
}
-
- txn_end(env, txn);
- return lmdb_error(ret);
+ return kr_ok();
}
-static int cdb_write(struct lmdb_env *env, MDB_txn *txn, knot_db_val_t *key, knot_db_val_t *val, unsigned flags)
+static int cdb_write(struct lmdb_env *env, MDB_txn **txn, knot_db_val_t *key, knot_db_val_t *val, unsigned flags)
{
/* Convert key structs and write */
MDB_val _key = { key->len, key->data };
MDB_val _val = { val->len, val->data };
- int ret = mdb_put(txn, env->dbi, &_key, &_val, flags);
+ int ret = mdb_put(*txn, env->dbi, &_key, &_val, flags);
+
+ /* Try to recover from doing too much writing in a single transaction. */
+ if (ret == MDB_TXN_FULL) {
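+ /* MDB_TXN_FULL means the RW transaction hit its dirty-page limit.
+ * Commit it, open a fresh one (replacing *txn) and retry the put once. */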
+ ret = cdb_sync(env);
+ if (ret == 0) {
+ ret = txn_get(env, txn, false);
+ }
+ if (ret == 0) {
+ ret = mdb_put(*txn, env->dbi, &_key, &_val, flags);
+ }
+ }
if (ret != MDB_SUCCESS) {
return lmdb_error(ret);
}
+
/* Update the result. */
val->data = _val.mv_data;
val->len = _val.mv_size;
- return 0;
+ return kr_ok();
}
static int cdb_writev(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int maxcount)
{
struct lmdb_env *env = db;
MDB_txn *txn = NULL;
- int ret = txn_begin(env, &txn, false);
- if (ret != 0) {
- return ret;
- }
+ int ret = txn_get(env, &txn, false);
- bool reserved = false;
- for (int i = 0; i < maxcount; ++i) {
+ for (int i = 0; ret == kr_ok() && i < maxcount; ++i) {
/* This is LMDB specific optimisation,
* if caller specifies value with NULL data and non-zero length,
* LMDB will preallocate the entry for caller and leave write
unsigned mdb_flags = 0;
if (val[i].len > 0 && val[i].data == NULL) {
mdb_flags |= MDB_RESERVE;
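+ /* With MDB_RESERVE, mdb_put() only reserves space for the value
+ * and returns a pointer to it; the caller fills the data in later,
+ * before the transaction is committed. */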
- reserved = true;
- }
- ret = cdb_write(env, txn, &key[i], &val[i], mdb_flags);
- if (ret != 0) {
- mdb_txn_abort(txn);
- return ret;
}
+ ret = cdb_write(env, &txn, &key[i], &val[i], mdb_flags);
}
- /* Leave transaction open if reserved. */
- if (reserved) {
- assert(env->wrtxn == NULL);
- env->wrtxn = txn;
- } else {
- ret = lmdb_error(mdb_txn_commit(txn));
- }
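+ /* The RW transaction is deliberately left open here;
+ * it will be committed later by cdb_sync(). */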
return ret;
}
{
struct lmdb_env *env = db;
MDB_txn *txn = NULL;
- int ret = txn_begin(env, &txn, false);
- if (ret != 0) {
- return ret;
- }
+ int ret = txn_get(env, &txn, false);
- for (int i = 0; i < maxcount; ++i) {
+ for (int i = 0; ret == kr_ok() && i < maxcount; ++i) {
MDB_val _key = { key[i].len, key[i].data };
MDB_val val = { 0, NULL };
- ret = mdb_del(txn, env->dbi, &_key, &val);
- if (ret != 0) {
- mdb_txn_abort(txn);
- return lmdb_error(ret);
- }
+ ret = lmdb_error(mdb_del(txn, env->dbi, &_key, &val));
}
- return lmdb_error(mdb_txn_commit(txn));
+ return ret;
}
static int cdb_match(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int maxcount)
{
struct lmdb_env *env = db;
MDB_txn *txn = NULL;
- int ret = txn_begin(env, &txn, true);
+ int ret = txn_get(env, &txn, true);
if (ret != 0) {
return ret;
}
MDB_cursor *cur = NULL;
ret = mdb_cursor_open(txn, env->dbi, &cur);
if (ret != 0) {
- mdb_txn_abort(txn);
return lmdb_error(ret);
}
MDB_val cur_key = { key->len, key->data }, cur_val = { 0, NULL };
ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_SET_RANGE);
- if (ret != 0) {
+ if (ret != MDB_SUCCESS) {
mdb_cursor_close(cur);
- mdb_txn_abort(txn);
return lmdb_error(ret);
}
int results = 0;
- while (ret == 0) {
+ while (ret == MDB_SUCCESS) {
/* Retrieve current key and compare with prefix */
if (cur_key.mv_size < key->len || memcmp(cur_key.mv_data, key->data, key->len) != 0) {
break;
}
mdb_cursor_close(cur);
- txn_end(env, txn);
return results;
}
/* Prune old records */
struct lmdb_env *env = db;
MDB_txn *txn = NULL;
- int ret = txn_begin(env, &txn, false);
+ int ret = txn_get(env, &txn, false);
if (ret != 0) {
return ret;
}
MDB_cursor *cur = NULL;
ret = mdb_cursor_open(txn, env->dbi, &cur);
if (ret != 0) {
- mdb_txn_abort(txn);
return lmdb_error(ret);
}
ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_FIRST);
if (ret != 0) {
mdb_cursor_close(cur);
- mdb_txn_abort(txn);
return lmdb_error(ret);
}
ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_NEXT);
}
mdb_cursor_close(cur);
- ret = lmdb_error(mdb_txn_commit(txn));
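+ /* No commit here; the removals become durable at the next cdb_sync(). */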
return ret < 0 ? ret : results;
}
ret = kr_cache_peek(cache, KR_CACHE_USER, dname, KNOT_RRTYPE_TSIG, &entry, 0);
cache->api = api_saved;
assert_int_not_equal(ret, 0);
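+ /* Transactions are now cached inside the cache backend, so the
+ * tests sync explicitly to commit/close them between steps. */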
+ kr_cache_sync(cache);
}
static void test_fake_insert(void **state)
KNOT_RRTYPE_TSIG, &global_fake_ce, global_namedb_data);
assert_int_equal(ret_cache_ins_ok, 0);
assert_int_equal(ret_cache_ins_inval, KNOT_EINVAL);
+ kr_cache_sync(cache);
}
/* Test invalid parameters and some api failures. */
assert_int_not_equal(kr_cache_remove(cache, KR_CACHE_RR, NULL, 0), 0);
assert_int_not_equal(kr_cache_remove(NULL, 0, NULL, 0), 0);
assert_int_not_equal(kr_cache_clear(NULL), 0);
+ kr_cache_sync(cache);
}
/* Test cache write */
struct kr_cache *cache = (*state);
int ret = kr_cache_insert_rr(cache, &global_rr, 0, 0, CACHE_TIME);
assert_int_equal(ret, 0);
+ kr_cache_sync(cache);
}
static void test_materialize(void **state)
assert_int_equal(query_ret, 0);
assert_true(rr_equal);
}
+ kr_cache_sync(cache);
}
/* Test cache read (simulate aged entry) */
struct kr_cache *cache = (*state);
int ret = kr_cache_peek_rr(cache, &cache_rr, &rank, &flags, &timestamp);
assert_int_equal(ret, kr_error(ESTALE));
+ kr_cache_sync(cache);
}
/* Test cache removal */
assert_int_equal(ret, 0);
ret = kr_cache_peek_rr(cache, &cache_rr, &rank, &flags, &timestamp);
assert_int_equal(ret, KNOT_ENOENT);
+ kr_cache_sync(cache);
}
/* Test cache fill */
if (ret != 0) {
break;
}
+ ret = kr_cache_sync(cache);
+ if (ret != 0) {
+ break;
+ }
}
/* Expect we run out of space */
assert_int_equal(ret, kr_error(ENOSPC));
+ kr_cache_sync(cache);
}
/* Test cache clear */