From 5f81153b3c0fbb05cacddc544d9ff50b8e09939c Mon Sep 17 00:00:00 2001 From: =?utf8?q?Vladim=C3=ADr=20=C4=8Cun=C3=A1t?= Date: Mon, 17 Aug 2020 10:38:20 +0200 Subject: [PATCH] lib/cache: abort transactions on errors This apparently gets rid of MDB_BAD_TXN failures that we were getting when cache overflows. Unfortunately LMDB docs don't mention that after operation failures one should abort the corresponding transaction. --- lib/cache/cdb_lmdb.c | 73 +++++++++++++++++++++++++++++------------- lib/cache/entry_list.c | 3 +- 2 files changed, 51 insertions(+), 25 deletions(-) diff --git a/lib/cache/cdb_lmdb.c b/lib/cache/cdb_lmdb.c index 06920d7c1..796c3c420 100644 --- a/lib/cache/cdb_lmdb.c +++ b/lib/cache/cdb_lmdb.c @@ -62,12 +62,6 @@ struct libknot_lmdb_env { /** @brief Convert LMDB error code. */ static int lmdb_error(int error) { - /* _BAD_TXN may happen with overfull DB, - * even during mdb_get with a single fork :-/ */ - if (error == MDB_BAD_TXN) { - kr_log_info("[cache] MDB_BAD_TXN, probably overfull\n"); - error = ENOSPC; - } switch (error) { case MDB_SUCCESS: return kr_ok(); @@ -216,7 +210,7 @@ static int txn_curs_get(struct lmdb_env *env, MDB_cursor **curs, struct kr_cdb_s } else { ret = mdb_cursor_open(txn, env->dbi, &env->txn.ro_curs); } - if (ret) return ret; + if (ret) return lmdb_error(ret); env->txn.ro_curs_active = true; success: assert(env->txn.ro_curs_active && env->txn.ro && env->txn.ro_active @@ -238,14 +232,28 @@ static void txn_free_ro(struct lmdb_env *env) } } +/** Abort all transactions. + * + * This is useful after an error happens, as those (always?) require abortion. + * It's possible that _reset() would suffice and marking cursor inactive, + * but these errors should be rare so let's close them completely. */ +static void txn_abort(struct lmdb_env *env) +{ + txn_free_ro(env); + if (env->txn.rw) { + mdb_txn_abort(env->txn.rw); + env->txn.rw = NULL; /* the transaction got freed even in case of errors */ + } +} + /*! \brief Close the database. */ static void cdb_close_env(struct lmdb_env *env, struct kr_cdb_stats *stats) { assert(env && env->env); /* Get rid of any transactions. */ - cdb_commit(env, stats); txn_free_ro(env); + cdb_commit(env, stats); mdb_env_sync(env->env, 1); stats->close++; @@ -395,7 +403,12 @@ static int cdb_count(knot_db_t *db, struct kr_cdb_stats *stats) stats->count++; ret = mdb_stat(txn, env->dbi, &stat); - return (ret == MDB_SUCCESS) ? stat.ms_entries : lmdb_error(ret); + if (ret == MDB_SUCCESS) { + return stat.ms_entries; + } else { + txn_abort(env); + return lmdb_error(ret); + } } static int reopen_env(struct lmdb_env *env, struct kr_cdb_stats *stats) @@ -454,8 +467,8 @@ static int cdb_clear(knot_db_t *db, struct kr_cdb_stats *stats) * though it's best for them to reopen soon. */ /* We are about to switch to a different file, so end all txns, to be sure. */ - (void) cdb_commit(db, stats); txn_free_ro(db); + (void) cdb_commit(db, stats); const char *path = NULL; int ret = mdb_env_get_path(env->env, &path); @@ -518,8 +531,11 @@ static int cdb_readv(knot_db_t *db, struct kr_cdb_stats *stats, stats->read++; ret = mdb_get(txn, env->dbi, &_key, &_val); if (ret != MDB_SUCCESS) { - if (ret == MDB_NOTFOUND) + if (ret == MDB_NOTFOUND) { stats->read_miss++; + } else { + txn_abort(env); + } ret = lmdb_error(ret); if (ret == kr_error(ENOSPC)) { /* we're likely to be forced to cache clear anyway */ @@ -552,10 +568,10 @@ static int cdb_write(struct lmdb_env *env, MDB_txn **txn, const knot_db_val_t *k if (ret) { stats->write++; ret = mdb_put(*txn, env->dbi, &_key, &_val, flags); - } } if (ret != MDB_SUCCESS) { + txn_abort(env); return lmdb_error(ret); } @@ -606,6 +622,9 @@ static int cdb_remove(knot_db_t *db, struct kr_cdb_stats *stats, else if (ret == KNOT_ENOENT) { stats->remove_miss++; ret = kr_ok(); /* skip over non-existing entries */ + } else { + txn_abort(env); + break; } } @@ -626,6 +645,7 @@ static int cdb_match(knot_db_t *db, struct kr_cdb_stats *stats, MDB_cursor *cur = NULL; ret = mdb_cursor_open(txn, env->dbi, &cur); if (ret != 0) { + txn_abort(env); return lmdb_error(ret); } @@ -635,6 +655,9 @@ static int cdb_match(knot_db_t *db, struct kr_cdb_stats *stats, ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_SET_RANGE); if (ret != MDB_SUCCESS) { mdb_cursor_close(cur); + if (ret != MDB_NOTFOUND) { + txn_abort(env); + } return lmdb_error(ret); } @@ -655,11 +678,14 @@ static int cdb_match(knot_db_t *db, struct kr_cdb_stats *stats, stats->match++; ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_NEXT); } - if (results == 0) - stats->match_miss++; - mdb_cursor_close(cur); + if (ret != MDB_SUCCESS && ret != MDB_NOTFOUND) { + txn_abort(env); + return lmdb_error(ret); + } else if (results == 0) { + stats->match_miss++; + } return results; } @@ -676,10 +702,7 @@ static int cdb_read_leq(knot_db_t *env, struct kr_cdb_stats *stats, MDB_val val2_m = { 0, NULL }; stats->read_leq++; ret = mdb_cursor_get(curs, &key2_m, &val2_m, MDB_SET_RANGE); - if (ret) { - stats->read_leq_miss++; - return lmdb_error(ret); - } + if (ret) goto failure; /* test for equality //:unlikely */ if (key2_m.mv_size == key->len && memcmp(key2_m.mv_data, key->data, key->len) == 0) { @@ -691,16 +714,20 @@ static int cdb_read_leq(knot_db_t *env, struct kr_cdb_stats *stats, /* we must be greater than key; do one step to smaller */ stats->read_leq++; ret = mdb_cursor_get(curs, &key2_m, &val2_m, MDB_PREV); - if (ret) { - stats->read_leq_miss++; - return lmdb_error(ret); - } + if (ret) goto failure; ret = 1; success: /* finalize the output */ *key = val_mdb2knot(key2_m); *val = val_mdb2knot(val2_m); return ret; +failure: + if (ret == MDB_NOTFOUND) { + stats->read_leq_miss++; + } else { + txn_abort(env); + } + return lmdb_error(ret); } static double cdb_usage(knot_db_t *db) diff --git a/lib/cache/entry_list.c b/lib/cache/entry_list.c index 61fdac710..de9079551 100644 --- a/lib/cache/entry_list.c +++ b/lib/cache/entry_list.c @@ -168,8 +168,7 @@ static int cache_write_or_clear(struct kr_cache *cache, const knot_db_val_t *key { int ret = cache_op(cache, write, key, val, 1); if (!ret) return kr_ok(); - /* Clear cache if overfull. It's nontrivial to do better with LMDB. - * LATER: some garbage-collection mechanism. */ + /* Clear cache if overfull. Using kres-cache-gc service should prevent this. */ if (ret == kr_error(ENOSPC)) { ret = kr_cache_clear(cache); const char *msg = "[cache] clearing because overfull, ret = %d\n"; -- 2.47.2