From: Vladimír Čunát Date: Fri, 1 Sep 2017 13:33:30 +0000 (+0200) Subject: cache: rework reusing transactions (LMDB) X-Git-Tag: v1.4.0~9^2~7 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e2621d928e4e9a61fac488d0278bb593cfdfb656;p=thirdparty%2Fknot-resolver.git cache: rework reusing transactions (LMDB) Previously a read transaction could be held open by each fork indefinitely. That was done for better speed, but it had a downside of keeping old pages alive and potentially reading only old data, until some writes were attempted by that fork. Now kr_cache_ provides explicit API for suitable points where to break transactions, reusing the _sync command. On LMDB side the read-only transaction is only reset and later renewed, supposedly giving better performance than aborting (see LMDB docs on reset+renew). Performance: preliminary testing with two forks, resperf on comcast query-set shows no noticeable difference in peak QPS. --- diff --git a/daemon/bindings.c b/daemon/bindings.c index 6eb95e990..ebcd799fa 100644 --- a/daemon/bindings.c +++ b/daemon/bindings.c @@ -726,7 +726,9 @@ static int cache_prefixed(struct kr_cache *cache, const char *args, knot_db_val_ } /* Start prefix search */ - return kr_cache_match(cache, namespace, buf, results, maxresults); + int ret = kr_cache_match(cache, namespace, buf, results, maxresults); + kr_cache_sync(cache); + return ret; } /** @internal Delete iterated key. 
*/ @@ -752,6 +754,7 @@ static int cache_remove_prefix(struct kr_cache *cache, const char *args) result_set[i].data = dst; } cache->api->remove(cache->db, result_set, ret); + kr_cache_sync(cache); /* Free keys */ for (int i = 0; i < ret; ++i) { free(result_set[i].data); diff --git a/lib/cache.c b/lib/cache.c index 3ab1e66a1..0cda5e719 100644 --- a/lib/cache.c +++ b/lib/cache.c @@ -109,11 +109,15 @@ void kr_cache_close(struct kr_cache *cache) } } -void kr_cache_sync(struct kr_cache *cache) +int kr_cache_sync(struct kr_cache *cache) { - if (cache_isvalid(cache) && cache->api->sync) { - cache_op(cache, sync); + if (!cache_isvalid(cache)) { + return kr_error(EINVAL); } + if (cache->api->sync) { + return cache_op(cache, sync); + } + return kr_ok(); } /** @@ -237,7 +241,6 @@ int kr_cache_insert(struct kr_cache *cache, uint8_t tag, const knot_dname_t *nam return ret; } entry_write(entry.data, header, data); - ret = cache_op(cache, sync); /* Make sure the entry is comitted. */ } else { /* Other backends must prepare contiguous data first */ auto_free char *buffer = malloc(entry.len); diff --git a/lib/cache.h b/lib/cache.h index 75d97bd35..8552575af 100644 --- a/lib/cache.h +++ b/lib/cache.h @@ -98,12 +98,9 @@ int kr_cache_open(struct kr_cache *cache, const struct kr_cdb_api *api, struct k KR_EXPORT void kr_cache_close(struct kr_cache *cache); -/** - * Synchronise cache with the backing store. - * @param cache structure - */ +/** Run after a row of operations to release transaction/lock if needed. */ KR_EXPORT -void kr_cache_sync(struct kr_cache *cache); +int kr_cache_sync(struct kr_cache *cache); /** * Return true if cache is open and enabled. diff --git a/lib/cdb.h b/lib/cdb.h index d48ee8b23..daf173109 100644 --- a/lib/cdb.h +++ b/lib/cdb.h @@ -37,6 +37,8 @@ struct kr_cdb_api { void (*close)(knot_db_t *db); int (*count)(knot_db_t *db); int (*clear)(knot_db_t *db); + + /** Run after a row of operations to release transaction/lock if needed. 
*/ int (*sync)(knot_db_t *db); /* Data access */ diff --git a/lib/cdb_lmdb.c b/lib/cdb_lmdb.c index 1161872d3..550d6e48c 100644 --- a/lib/cdb_lmdb.c +++ b/lib/cdb_lmdb.c @@ -37,8 +37,17 @@ struct lmdb_env size_t mapsize; MDB_dbi dbi; MDB_env *env; - MDB_txn *rdtxn; - MDB_txn *wrtxn; + + /** Cached transactions + * + * - only one of (ro,rw) may be active at once + * - non-NULL .ro may be active or reset + * - non-NULL .rw is always active + */ + struct { + bool ro_active; + MDB_txn *ro, *rw; + } txn; }; /** @brief Convert LMDB error code. */ @@ -52,7 +61,8 @@ static int lmdb_error(int error) case ENOSPC: return kr_error(ENOSPC); default: - kr_log_info("[cache] LMDB error: %s\n", mdb_strerror(error)); + kr_log_error("[cache] LMDB error: %s\n", mdb_strerror(error)); + assert(false); return -abs(error); } } @@ -77,53 +87,64 @@ static int set_mapsize(MDB_env *env, size_t map_size) return 0; } -static int txn_begin(struct lmdb_env *env, MDB_txn **txn, bool rdonly) +/** Obtain a transaction. (they're cached in env->txn) */ +static int txn_get(struct lmdb_env *env, MDB_txn **txn, bool rdonly) { - /* Always barrier for write transaction. */ assert(env && txn); - if (env->wrtxn) { - mdb_txn_abort(env->wrtxn); - env->wrtxn = NULL; - } - /* Renew pending read-only transaction - * or abort it to clear reader slot before writing. */ - if (env->rdtxn) { - if (rdonly) { - *txn = env->rdtxn; - env->rdtxn = NULL; - return 0; - } else { - mdb_txn_abort(env->rdtxn); - env->rdtxn = NULL; + if (env->txn.rw) { + /* Reuse the *open* RW txn even if only reading is requested. + * We leave the management of this to the cdb_sync command. + * The user may e.g. want to do some reads between the writes. 
*/ + *txn = env->txn.rw; + return kr_ok(); + } + + if (!rdonly) { + /* avoid two active transactions */ + if (env->txn.ro && env->txn.ro_active) { + mdb_txn_reset(env->txn.ro); + env->txn.ro_active = false; + } + int ret = mdb_txn_begin(env->env, NULL, 0/*RW*/, &env->txn.rw); + if (ret == MDB_SUCCESS) { + *txn = env->txn.rw; + assert(*txn); } + return lmdb_error(ret); } - unsigned flags = rdonly ? MDB_RDONLY : 0; - return lmdb_error(mdb_txn_begin(env->env, NULL, flags, txn)); -} -static int txn_end(struct lmdb_env *env, MDB_txn *txn) -{ - assert(env && txn); - /* Cache read transactions */ - if (!env->rdtxn) { - env->rdtxn = txn; - } else { - mdb_txn_abort(txn); + /* Get an active RO txn and return it. */ + if (!env->txn.ro) { //:unlikely + int ret = mdb_txn_begin(env->env, NULL, MDB_RDONLY, &env->txn.ro); + if (ret != MDB_SUCCESS) { + return lmdb_error(ret); + } + } else if (!env->txn.ro_active) { + int ret = mdb_txn_renew(env->txn.ro); + if (ret != MDB_SUCCESS) { + return lmdb_error(ret); + } } - return 0; + env->txn.ro_active = true; + *txn = env->txn.ro; + assert(*txn); + return kr_ok(); } static int cdb_sync(knot_db_t *db) { struct lmdb_env *env = db; - int ret = 0; - if (env->wrtxn) { - ret = lmdb_error(mdb_txn_commit(env->wrtxn)); - env->wrtxn = NULL; /* In-flight transaction is committed. */ - } - if (env->rdtxn) { - mdb_txn_abort(env->rdtxn); - env->rdtxn = NULL; + int ret = kr_ok(); + if (env->txn.rw) { + ret = mdb_txn_commit(env->txn.rw); + if (ret != MDB_BAD_TXN) { + /* _BAD_TXN happens during overfull clear with multiple forks :-/ */ + ret = lmdb_error(ret); + } + env->txn.rw = NULL; /* the transaction got freed even in case of errors */ + } else if (env->txn.ro && env->txn.ro_active) { + mdb_txn_reset(env->txn.ro); + env->txn.ro_active = false; } return ret; } @@ -132,7 +153,14 @@ static int cdb_sync(knot_db_t *db) static void cdb_close_env(struct lmdb_env *env) { assert(env && env->env); + + /* Get rid of any transactions. 
*/ cdb_sync(env); + if (env->txn.ro) { + mdb_txn_abort(env->txn.ro); + env->txn.ro = NULL; + } + mdb_env_sync(env->env, 1); mdb_dbi_close(env->env, env->dbi); mdb_env_close(env->env); @@ -250,7 +278,7 @@ static int cdb_count(knot_db_t *db) { struct lmdb_env *env = db; MDB_txn *txn = NULL; - int ret = txn_begin(env, &txn, true); + int ret = txn_get(env, &txn, true); if (ret != 0) { return ret; } @@ -258,8 +286,6 @@ static int cdb_count(knot_db_t *db) MDB_stat stat; ret = mdb_stat(txn, env->dbi, &stat); - /* Always abort, serves as a checkpoint for in-flight transaction. */ - mdb_txn_abort(txn); return (ret == MDB_SUCCESS) ? stat.ms_entries : lmdb_error(ret); } @@ -351,8 +377,8 @@ static int cdb_readv(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int { struct lmdb_env *env = db; MDB_txn *txn = NULL; - int ret = txn_begin(env, &txn, true); - if (ret != 0) { + int ret = txn_get(env, &txn, true); + if (ret) { return ret; } @@ -361,41 +387,50 @@ static int cdb_readv(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int MDB_val _key = { .mv_size = key[i].len, .mv_data = key[i].data }; MDB_val _val = { .mv_size = val[i].len, .mv_data = val[i].data }; ret = mdb_get(txn, env->dbi, &_key, &_val); + if (ret != MDB_SUCCESS) { + return lmdb_error(ret); + } /* Update the result. */ val[i].data = _val.mv_data; val[i].len = _val.mv_size; } - - txn_end(env, txn); - return lmdb_error(ret); + return kr_ok(); } -static int cdb_write(struct lmdb_env *env, MDB_txn *txn, knot_db_val_t *key, knot_db_val_t *val, unsigned flags) +static int cdb_write(struct lmdb_env *env, MDB_txn **txn, knot_db_val_t *key, knot_db_val_t *val, unsigned flags) { /* Convert key structs and write */ MDB_val _key = { key->len, key->data }; MDB_val _val = { val->len, val->data }; - int ret = mdb_put(txn, env->dbi, &_key, &_val, flags); + int ret = mdb_put(*txn, env->dbi, &_key, &_val, flags); + + /* Try to recover from doing too much writing in a single transaction. 
*/ + if (ret == MDB_TXN_FULL) { + ret = cdb_sync(env); + if (ret) { + ret = txn_get(env, txn, false); + } + if (ret) { + ret = mdb_put(*txn, env->dbi, &_key, &_val, flags); + } + } if (ret != MDB_SUCCESS) { return lmdb_error(ret); } + /* Update the result. */ val->data = _val.mv_data; val->len = _val.mv_size; - return 0; + return kr_ok(); } static int cdb_writev(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int maxcount) { struct lmdb_env *env = db; MDB_txn *txn = NULL; - int ret = txn_begin(env, &txn, false); - if (ret != 0) { - return ret; - } + int ret = txn_get(env, &txn, false); - bool reserved = false; - for (int i = 0; i < maxcount; ++i) { + for (int i = 0; ret == kr_ok() && i < maxcount; ++i) { /* This is LMDB specific optimisation, * if caller specifies value with NULL data and non-zero length, * LMDB will preallocate the entry for caller and leave write @@ -404,22 +439,10 @@ static int cdb_writev(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int unsigned mdb_flags = 0; if (val[i].len > 0 && val[i].data == NULL) { mdb_flags |= MDB_RESERVE; - reserved = true; - } - ret = cdb_write(env, txn, &key[i], &val[i], mdb_flags); - if (ret != 0) { - mdb_txn_abort(txn); - return ret; } + ret = cdb_write(env, &txn, &key[i], &val[i], mdb_flags); } - /* Leave transaction open if reserved. 
*/ - if (reserved) { - assert(env->wrtxn == NULL); - env->wrtxn = txn; - } else { - ret = lmdb_error(mdb_txn_commit(txn)); - } return ret; } @@ -427,29 +450,22 @@ static int cdb_remove(knot_db_t *db, knot_db_val_t *key, int maxcount) { struct lmdb_env *env = db; MDB_txn *txn = NULL; - int ret = txn_begin(env, &txn, false); - if (ret != 0) { - return ret; - } + int ret = txn_get(env, &txn, false); - for (int i = 0; i < maxcount; ++i) { + for (int i = 0; ret == kr_ok() && i < maxcount; ++i) { MDB_val _key = { key[i].len, key[i].data }; MDB_val val = { 0, NULL }; - ret = mdb_del(txn, env->dbi, &_key, &val); - if (ret != 0) { - mdb_txn_abort(txn); - return lmdb_error(ret); - } + ret = lmdb_error(mdb_del(txn, env->dbi, &_key, &val)); } - return lmdb_error(mdb_txn_commit(txn)); + return ret; } static int cdb_match(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int maxcount) { struct lmdb_env *env = db; MDB_txn *txn = NULL; - int ret = txn_begin(env, &txn, true); + int ret = txn_get(env, &txn, true); if (ret != 0) { return ret; } @@ -463,20 +479,18 @@ static int cdb_match(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int MDB_cursor *cur = NULL; ret = mdb_cursor_open(txn, env->dbi, &cur); if (ret != 0) { - mdb_txn_abort(txn); return lmdb_error(ret); } MDB_val cur_key = { key->len, key->data }, cur_val = { 0, NULL }; ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_SET_RANGE); - if (ret != 0) { + if (ret != MDB_SUCCESS) { mdb_cursor_close(cur); - mdb_txn_abort(txn); return lmdb_error(ret); } int results = 0; - while (ret == 0) { + while (ret == MDB_SUCCESS) { /* Retrieve current key and compare with prefix */ if (cur_key.mv_size < key->len || memcmp(cur_key.mv_data, key->data, key->len) != 0) { break; @@ -493,7 +507,6 @@ static int cdb_match(knot_db_t *db, knot_db_val_t *key, knot_db_val_t *val, int } mdb_cursor_close(cur); - txn_end(env, txn); return results; } @@ -506,7 +519,7 @@ static int cdb_prune(knot_db_t *db, int limit) /* Prune old records */ 
struct lmdb_env *env = db; MDB_txn *txn = NULL; - int ret = txn_begin(env, &txn, false); + int ret = txn_get(env, &txn, false); if (ret != 0) { return ret; } @@ -514,7 +527,6 @@ static int cdb_prune(knot_db_t *db, int limit) MDB_cursor *cur = NULL; ret = mdb_cursor_open(txn, env->dbi, &cur); if (ret != 0) { - mdb_txn_abort(txn); return lmdb_error(ret); } @@ -522,7 +534,6 @@ static int cdb_prune(knot_db_t *db, int limit) ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_FIRST); if (ret != 0) { mdb_cursor_close(cur); - mdb_txn_abort(txn); return lmdb_error(ret); } @@ -548,7 +559,6 @@ static int cdb_prune(knot_db_t *db, int limit) ret = mdb_cursor_get(cur, &cur_key, &cur_val, MDB_NEXT); } mdb_cursor_close(cur); - ret = lmdb_error(mdb_txn_commit(txn)); return ret < 0 ? ret : results; } diff --git a/lib/layer/pktcache.c b/lib/layer/pktcache.c index c18a40c79..702d299ee 100644 --- a/lib/layer/pktcache.c +++ b/lib/layer/pktcache.c @@ -159,6 +159,7 @@ static int pktcache_peek(kr_layer_t *ctx, knot_pkt_t *pkt) /* Fetch either answer to original or minimized query */ uint8_t flags = 0; int ret = loot_pktcache(req->ctx, pkt, req, &flags); + kr_cache_sync(&req->ctx->cache); if (ret == 0) { qry->flags.CACHED = true; qry->flags.NO_MINIMIZE = true; @@ -296,11 +297,13 @@ static int pktcache_stash(kr_layer_t *ctx, knot_pkt_t *pkt) } /* Stash answer in the cache */ - int ret = kr_cache_insert(cache, KR_CACHE_PKT, qname, qtype, &header, data); - if (ret == 0) { + int ret1 = kr_cache_insert(cache, KR_CACHE_PKT, qname, qtype, &header, data); + int ret2 = kr_cache_sync(cache); + if (!ret1 && !ret2) { VERBOSE_MSG(qry, "=> answer cached for TTL=%u\n", ttl); + } else { + VERBOSE_MSG(qry, "=> stashing failed; codes: %d and %d\n", ret1, ret2); } - kr_cache_sync(cache); return ctx->state; } diff --git a/lib/layer/rrcache.c b/lib/layer/rrcache.c index 1800eb3b8..fc7adc481 100644 --- a/lib/layer/rrcache.c +++ b/lib/layer/rrcache.c @@ -252,6 +252,7 @@ static int rrcache_peek(kr_layer_t *ctx, 
knot_pkt_t *pkt) } } } + kr_cache_sync(&req->ctx->cache); if (ret == 0) { VERBOSE_MSG(qry, "=> satisfied from cache\n"); qry->flags.CACHED = true; @@ -451,14 +452,21 @@ static int rrcache_stash(kr_layer_t *ctx, knot_pkt_t *pkt) /* Open write transaction */ struct kr_cache *cache = &req->ctx->cache; ret = stash_commit(&stash, qry, cache, req); + if (ret == 0) { + ret = kr_cache_sync(cache); + } else { + kr_cache_sync(cache); + } /* Clear if full */ if (ret == kr_error(ENOSPC)) { + kr_log_info("[cache] clearing because overfull\n"); ret = kr_cache_clear(cache); if (ret != 0 && ret != kr_error(EEXIST)) { - kr_log_error("[ rc ] failed to clear cache: %s\n", kr_strerror(ret)); + kr_log_error("[cache] failed to clear cache: %s\n", kr_strerror(ret)); } + } else if (ret) { + VERBOSE_MSG(qry, "=> stashing failed: %d\n", ret); } - kr_cache_sync(cache); } return ctx->state; } diff --git a/lib/resolve.c b/lib/resolve.c index fabcdd007..8a85f8453 100644 --- a/lib/resolve.c +++ b/lib/resolve.c @@ -188,11 +188,12 @@ static void check_empty_nonterms(struct kr_query *qry, knot_pkt_t *pkt, struct k /* @todo We could stop resolution here for NXDOMAIN, but we can't because of broken CDNs */ qry->flags.NO_MINIMIZE = true; kr_make_query(qry, pkt); - return; + break; } assert(target[0]); target = knot_wire_next_label(target, NULL); } + kr_cache_sync(cache); } static int ns_fetch_cut(struct kr_query *qry, const knot_dname_t *requested_name, diff --git a/lib/zonecut.c b/lib/zonecut.c index f1ce2c755..8f51b0f0d 100644 --- a/lib/zonecut.c +++ b/lib/zonecut.c @@ -458,6 +458,7 @@ int kr_zonecut_find_cached(struct kr_context *ctx, struct kr_zonecut *cut, const } update_cut_name(cut, label); mm_free(cut->pool, qname); + kr_cache_sync(&ctx->cache); return kr_ok(); } /* Subtract label from QNAME. 
*/ @@ -467,6 +468,7 @@ int kr_zonecut_find_cached(struct kr_context *ctx, struct kr_zonecut *cut, const break; } } + kr_cache_sync(&ctx->cache); mm_free(cut->pool, qname); return kr_error(ENOENT); } diff --git a/tests/test_cache.c b/tests/test_cache.c index ea462f570..bd2060427 100644 --- a/tests/test_cache.c +++ b/tests/test_cache.c @@ -166,6 +166,7 @@ static void test_fake_invalid (void **state) ret = kr_cache_peek(cache, KR_CACHE_USER, dname, KNOT_RRTYPE_TSIG, &entry, 0); cache->api = api_saved; assert_int_not_equal(ret, 0); + kr_cache_sync(cache); } static void test_fake_insert(void **state) @@ -184,6 +185,7 @@ static void test_fake_insert(void **state) KNOT_RRTYPE_TSIG, &global_fake_ce, global_namedb_data); assert_int_equal(ret_cache_ins_ok, 0); assert_int_equal(ret_cache_ins_inval, KNOT_EINVAL); + kr_cache_sync(cache); } /* Test invalid parameters and some api failures. */ @@ -217,6 +219,7 @@ static void test_invalid(void **state) assert_int_not_equal(kr_cache_remove(cache, KR_CACHE_RR, NULL, 0), 0); assert_int_not_equal(kr_cache_remove(NULL, 0, NULL, 0), 0); assert_int_not_equal(kr_cache_clear(NULL), 0); + kr_cache_sync(cache); } /* Test cache write */ @@ -226,6 +229,7 @@ static void test_insert_rr(void **state) struct kr_cache *cache = (*state); int ret = kr_cache_insert_rr(cache, &global_rr, 0, 0, CACHE_TIME); assert_int_equal(ret, 0); + kr_cache_sync(cache); } static void test_materialize(void **state) @@ -276,6 +280,7 @@ static void test_query(void **state) assert_int_equal(query_ret, 0); assert_true(rr_equal); } + kr_cache_sync(cache); } /* Test cache read (simulate aged entry) */ @@ -290,6 +295,7 @@ static void test_query_aged(void **state) struct kr_cache *cache = (*state); int ret = kr_cache_peek_rr(cache, &cache_rr, &rank, &flags, ×tamp); assert_int_equal(ret, kr_error(ESTALE)); + kr_cache_sync(cache); } /* Test cache removal */ @@ -306,6 +312,7 @@ static void test_remove(void **state) assert_int_equal(ret, 0); ret = kr_cache_peek_rr(cache, 
&cache_rr, &rank, &flags, &timestamp); assert_int_equal(ret, KNOT_ENOENT); + kr_cache_sync(cache); } /* Test cache fill */ @@ -322,10 +329,15 @@ static void test_fill(void **state) if (ret != 0) { break; } + ret = kr_cache_sync(cache); + if (ret != 0) { + break; + } } /* Expect we run out of space */ assert_int_equal(ret, kr_error(ENOSPC)); + kr_cache_sync(cache); } /* Test cache clear */