/** @brief Convert LMDB error code. */
static int lmdb_error(int error)
{
- /* _BAD_TXN may happen with overfull DB,
- * even during mdb_get with a single fork :-/ */
- if (error == MDB_BAD_TXN) {
- kr_log_info("[cache] MDB_BAD_TXN, probably overfull\n");
- error = ENOSPC;
- }
switch (error) {
case MDB_SUCCESS:
return kr_ok();
} else {
ret = mdb_cursor_open(txn, env->dbi, &env->txn.ro_curs);
}
- if (ret) return ret;
+ if (ret) return lmdb_error(ret);
env->txn.ro_curs_active = true;
success:
assert(env->txn.ro_curs_active && env->txn.ro && env->txn.ro_active
}
}
+/** Abort all transactions.
+ *
+ * This is useful after an error happens, as those (always?) require abortion.
+ * It's possible that _reset() would suffice and marking cursor inactive,
+ * but these errors should be rare so let's close them completely. */
+static void txn_abort(struct lmdb_env *env)
+{
+ txn_free_ro(env);
+ if (env->txn.rw) {
+ mdb_txn_abort(env->txn.rw);
+ env->txn.rw = NULL; /* the transaction got freed even in case of errors */
+ }
+}
+
/*! \brief Close the database. */
static void cdb_close_env(struct lmdb_env *env, struct kr_cdb_stats *stats)
{
assert(env && env->env);
/* Get rid of any transactions. */
- cdb_commit(env, stats);
txn_free_ro(env);
+ cdb_commit(env, stats);
mdb_env_sync(env->env, 1);
stats->close++;
stats->count++;
ret = mdb_stat(txn, env->dbi, &stat);
- return (ret == MDB_SUCCESS) ? stat.ms_entries : lmdb_error(ret);
+ if (ret == MDB_SUCCESS) {
+ return stat.ms_entries;
+ } else {
+ txn_abort(env);
+ return lmdb_error(ret);
+ }
}
static int reopen_env(struct lmdb_env *env, struct kr_cdb_stats *stats)
* though it's best for them to reopen soon. */
/* We are about to switch to a different file, so end all txns, to be sure. */
- (void) cdb_commit(db, stats);
txn_free_ro(db);
+ (void) cdb_commit(db, stats);
const char *path = NULL;
int ret = mdb_env_get_path(env->env, &path);
stats->read++;
ret = mdb_get(txn, env->dbi, &_key, &_val);
if (ret != MDB_SUCCESS) {
- if (ret == MDB_NOTFOUND)
+ if (ret == MDB_NOTFOUND) {
stats->read_miss++;
+ } else {
+ txn_abort(env);
+ }
ret = lmdb_error(ret);
if (ret == kr_error(ENOSPC)) {
/* we're likely to be forced to cache clear anyway */
if (ret) {
stats->write++;
ret = mdb_put(*txn, env->dbi, &_key, &_val, flags);
-
}
}
if (ret != MDB_SUCCESS) {
+ txn_abort(env);
return lmdb_error(ret);
}
else if (ret == KNOT_ENOENT) {
stats->remove_miss++;
ret = kr_ok(); /* skip over non-existing entries */
+ } else {
+ txn_abort(env);
+ break;
}
}
MDB_cursor *cur = NULL;
ret = mdb_cursor_open(txn, env->dbi, &cur);
if (ret != 0) {
+ txn_abort(env);
return lmdb_error(ret);
}
ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_SET_RANGE);
if (ret != MDB_SUCCESS) {
mdb_cursor_close(cur);
+ if (ret != MDB_NOTFOUND) {
+ txn_abort(env);
+ }
return lmdb_error(ret);
}
stats->match++;
ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_NEXT);
}
- if (results == 0)
- stats->match_miss++;
-
mdb_cursor_close(cur);
+ if (ret != MDB_SUCCESS && ret != MDB_NOTFOUND) {
+ txn_abort(env);
+ return lmdb_error(ret);
+ } else if (results == 0) {
+ stats->match_miss++;
+ }
return results;
}
MDB_val val2_m = { 0, NULL };
stats->read_leq++;
ret = mdb_cursor_get(curs, &key2_m, &val2_m, MDB_SET_RANGE);
- if (ret) {
- stats->read_leq_miss++;
- return lmdb_error(ret);
- }
+ if (ret) goto failure;
/* test for equality //:unlikely */
if (key2_m.mv_size == key->len
&& memcmp(key2_m.mv_data, key->data, key->len) == 0) {
/* we must be greater than key; do one step to smaller */
stats->read_leq++;
ret = mdb_cursor_get(curs, &key2_m, &val2_m, MDB_PREV);
- if (ret) {
- stats->read_leq_miss++;
- return lmdb_error(ret);
- }
+ if (ret) goto failure;
ret = 1;
success:
/* finalize the output */
*key = val_mdb2knot(key2_m);
*val = val_mdb2knot(val2_m);
return ret;
+failure:
+ if (ret == MDB_NOTFOUND) {
+ stats->read_leq_miss++;
+ } else {
+ txn_abort(env);
+ }
+ return lmdb_error(ret);
}
static double cdb_usage(knot_db_t *db)