]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
lib/cache: abort transactions on errors
authorVladimír Čunát <vladimir.cunat@nic.cz>
Mon, 17 Aug 2020 08:38:20 +0000 (10:38 +0200)
committerPetr Špaček <petr.spacek@nic.cz>
Mon, 7 Sep 2020 15:45:57 +0000 (17:45 +0200)
This apparently gets rid of MDB_BAD_TXN failures that we were getting
when cache overflows.  Unfortunately LMDB docs don't mention that
after operation failures one should abort the corresponding transaction.

lib/cache/cdb_lmdb.c
lib/cache/entry_list.c

index 06920d7c194daace834dc2751e1db205250b18a0..796c3c4204efee5047c1aff300d020d916b8b131 100644 (file)
@@ -62,12 +62,6 @@ struct libknot_lmdb_env {
 /** @brief Convert LMDB error code. */
 static int lmdb_error(int error)
 {
-       /* _BAD_TXN may happen with overfull DB,
-        * even during mdb_get with a single fork :-/ */
-       if (error == MDB_BAD_TXN) {
-               kr_log_info("[cache] MDB_BAD_TXN, probably overfull\n");
-               error = ENOSPC;
-       }
        switch (error) {
        case MDB_SUCCESS:
                return kr_ok();
@@ -216,7 +210,7 @@ static int txn_curs_get(struct lmdb_env *env, MDB_cursor **curs, struct kr_cdb_s
        } else {
                ret = mdb_cursor_open(txn, env->dbi, &env->txn.ro_curs);
        }
-       if (ret) return ret;
+       if (ret) return lmdb_error(ret);
        env->txn.ro_curs_active = true;
 success:
        assert(env->txn.ro_curs_active && env->txn.ro && env->txn.ro_active
@@ -238,14 +232,28 @@ static void txn_free_ro(struct lmdb_env *env)
        }
 }
 
+/** Abort all transactions.
+ *
+ * This is useful after an error happens, as those (always?) require abortion.
+ * It's possible that _reset() would suffice and marking cursor inactive,
+ * but these errors should be rare so let's close them completely. */
+static void txn_abort(struct lmdb_env *env)
+{
+       txn_free_ro(env);
+       if (env->txn.rw) {
+               mdb_txn_abort(env->txn.rw);
+               env->txn.rw = NULL; /* the transaction got freed even in case of errors */
+       }
+}
+
 /*! \brief Close the database. */
 static void cdb_close_env(struct lmdb_env *env, struct kr_cdb_stats *stats)
 {
        assert(env && env->env);
 
        /* Get rid of any transactions. */
-       cdb_commit(env, stats);
        txn_free_ro(env);
+       cdb_commit(env, stats);
 
        mdb_env_sync(env->env, 1);
        stats->close++;
@@ -395,7 +403,12 @@ static int cdb_count(knot_db_t *db, struct kr_cdb_stats *stats)
        stats->count++;
        ret = mdb_stat(txn, env->dbi, &stat);
 
-       return (ret == MDB_SUCCESS) ? stat.ms_entries : lmdb_error(ret);
+       if (ret == MDB_SUCCESS) {
+               return stat.ms_entries;
+       } else {
+               txn_abort(env);
+               return lmdb_error(ret);
+       }
 }
 
 static int reopen_env(struct lmdb_env *env, struct kr_cdb_stats *stats)
@@ -454,8 +467,8 @@ static int cdb_clear(knot_db_t *db, struct kr_cdb_stats *stats)
         * though it's best for them to reopen soon. */
 
        /* We are about to switch to a different file, so end all txns, to be sure. */
-       (void) cdb_commit(db, stats);
        txn_free_ro(db);
+       (void) cdb_commit(db, stats);
 
        const char *path = NULL;
        int ret = mdb_env_get_path(env->env, &path);
@@ -518,8 +531,11 @@ static int cdb_readv(knot_db_t *db, struct kr_cdb_stats *stats,
                stats->read++;
                ret = mdb_get(txn, env->dbi, &_key, &_val);
                if (ret != MDB_SUCCESS) {
-                       if (ret == MDB_NOTFOUND)
+                       if (ret == MDB_NOTFOUND) {
                                stats->read_miss++;
+                       } else {
+                               txn_abort(env);
+                       }
                        ret = lmdb_error(ret);
                        if (ret == kr_error(ENOSPC)) {
                                /* we're likely to be forced to cache clear anyway */
@@ -552,10 +568,10 @@ static int cdb_write(struct lmdb_env *env, MDB_txn **txn, const knot_db_val_t *k
                if (ret) {
                        stats->write++;
                        ret = mdb_put(*txn, env->dbi, &_key, &_val, flags);
-
                }
        }
        if (ret != MDB_SUCCESS) {
+               txn_abort(env);
                return lmdb_error(ret);
        }
 
@@ -606,6 +622,9 @@ static int cdb_remove(knot_db_t *db, struct kr_cdb_stats *stats,
                else if (ret == KNOT_ENOENT) {
                        stats->remove_miss++;
                        ret = kr_ok();  /* skip over non-existing entries */
+               } else {
+                       txn_abort(env);
+                       break;
                }
        }
 
@@ -626,6 +645,7 @@ static int cdb_match(knot_db_t *db, struct kr_cdb_stats *stats,
        MDB_cursor *cur = NULL;
        ret = mdb_cursor_open(txn, env->dbi, &cur);
        if (ret != 0) {
+               txn_abort(env);
                return lmdb_error(ret);
        }
 
@@ -635,6 +655,9 @@ static int cdb_match(knot_db_t *db, struct kr_cdb_stats *stats,
        ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_SET_RANGE);
        if (ret != MDB_SUCCESS) {
                mdb_cursor_close(cur);
+               if (ret != MDB_NOTFOUND) {
+                       txn_abort(env);
+               }
                return lmdb_error(ret);
        }
 
@@ -655,11 +678,14 @@ static int cdb_match(knot_db_t *db, struct kr_cdb_stats *stats,
                stats->match++;
                ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_NEXT);
        }
-       if (results == 0)
-               stats->match_miss++;
-
 
        mdb_cursor_close(cur);
+       if (ret != MDB_SUCCESS && ret != MDB_NOTFOUND) {
+               txn_abort(env);
+               return lmdb_error(ret);
+       } else if (results == 0) {
+               stats->match_miss++;
+       }
        return results;
 }
 
@@ -676,10 +702,7 @@ static int cdb_read_leq(knot_db_t *env, struct kr_cdb_stats *stats,
        MDB_val val2_m = { 0, NULL };
        stats->read_leq++;
        ret = mdb_cursor_get(curs, &key2_m, &val2_m, MDB_SET_RANGE);
-       if (ret) {
-               stats->read_leq_miss++;
-               return lmdb_error(ret);
-       }
+       if (ret) goto failure;
        /* test for equality //:unlikely */
        if (key2_m.mv_size == key->len
            && memcmp(key2_m.mv_data, key->data, key->len) == 0) {
@@ -691,16 +714,20 @@ static int cdb_read_leq(knot_db_t *env, struct kr_cdb_stats *stats,
        /* we must be greater than key; do one step to smaller */
        stats->read_leq++;
        ret = mdb_cursor_get(curs, &key2_m, &val2_m, MDB_PREV);
-       if (ret) {
-               stats->read_leq_miss++;
-               return lmdb_error(ret);
-       }
+       if (ret) goto failure;
        ret = 1;
 success:
        /* finalize the output */
        *key = val_mdb2knot(key2_m);
        *val = val_mdb2knot(val2_m);
        return ret;
+failure:
+       if (ret == MDB_NOTFOUND) {
+               stats->read_leq_miss++;
+       } else {
+               txn_abort(env);
+       }
+       return lmdb_error(ret);
 }
 
 static double cdb_usage(knot_db_t *db)
index 61fdac710e5f443b6025868eb4e41596821d3fc4..de90795518849a7d6afb78bc1e514e0f0058a6db 100644 (file)
@@ -168,8 +168,7 @@ static int cache_write_or_clear(struct kr_cache *cache, const knot_db_val_t *key
 {
        int ret = cache_op(cache, write, key, val, 1);
        if (!ret) return kr_ok();
-       /* Clear cache if overfull.  It's nontrivial to do better with LMDB.
-        * LATER: some garbage-collection mechanism. */
+       /* Clear cache if overfull.  Using kres-cache-gc service should prevent this. */
        if (ret == kr_error(ENOSPC)) {
                ret = kr_cache_clear(cache);
                const char *msg = "[cache] clearing because overfull, ret = %d\n";