mm_ctx_t *pool;
};
+struct kr_txn
+{
+ MDB_dbi dbi;
+ MDB_txn *txn;
+ MDB_txn *parent;
+ mm_ctx_t *mm;
+};
+
/* MDB access */
static int dbase_open(struct kr_cache *cache, const char *handle)
return ret;
}
- DEBUG_MSG("opened '%s'\n", handle);
-
+ DEBUG_MSG("OPEN '%s'\n", handle);
return 0;
}
{
mdb_close(cache->env, cache->dbi);
mdb_env_close(cache->env);
+ DEBUG_MSG("CLOSE\n");
}
/* data access */
-static MDB_cursor *cursor_acquire(struct kr_cache *cache, unsigned flags)
+static MDB_cursor *cursor_acquire(struct kr_txn *txn)
{
- MDB_txn *txn = NULL;
MDB_cursor *cursor = NULL;
- int ret = mdb_txn_begin(cache->env, NULL, flags, &txn);
- if (ret != 0) {
- return NULL;
- }
- ret = mdb_cursor_open(txn, cache->dbi, &cursor);
+ int ret = mdb_cursor_open(txn->txn, txn->dbi, &cursor);
if (ret != 0) {
- mdb_txn_abort(txn);
return NULL;
}
- DEBUG_MSG("cursor acquire\n");
-
return cursor;
}
-static int cursor_release(MDB_cursor *cursor, bool commit)
+static void cursor_release(MDB_cursor *cursor)
{
- MDB_txn *txn = mdb_cursor_txn(cursor);
mdb_cursor_close(cursor);
-
- int ret = 0;
- if (commit) {
- DEBUG_MSG("cursor release / commit\n");
- ret = mdb_txn_commit(txn);
- } else {
- DEBUG_MSG("cursor release\n");
- mdb_txn_abort(txn);
- }
-
- return ret;
}
/* data serialization */
return key;
}
+static int del_entry(MDB_cursor *cur)
+{
+ /* Remember duplicate data count. */
+ size_t rr_count = 0;
+ mdb_cursor_count(cur, &rr_count);
+
+ /* Remove key if last entry. */
+ int ret = 0;
+ if (rr_count == 1) {
+ ret = mdb_cursor_del(cur, MDB_NODUPDATA);
+ } else {
+ ret = mdb_cursor_del(cur, 0);
+ }
+
+ return ret;
+}
+
static int pack_entry(MDB_cursor *cur, const knot_dname_t *name, uint16_t type,
const knot_rdata_t *rdata, uint32_t expire)
{
MDB_val key = pack_key(name);
MDB_val data = { datalen, buf };
- return mdb_cursor_put(cur, &key, &data, 0);
+ return mdb_cursor_put(cur, &key, &data, MDB_APPENDDUP);
}
static int pack_list(MDB_cursor *cur, const knot_rrset_t *rr)
#ifndef NDEBUG
char *owner = knot_dname_to_str(rr->owner);
- DEBUG_MSG("packed RR '%s' TYPE %u, %u RDATA RC=%s\n", owner, rr->type, rr->rrs.rr_count, mdb_strerror(ret));
+ DEBUG_MSG("STORE RR '%s' TYPE %u, %u RDATA => %s\n", owner, rr->type, rr->rrs.rr_count, mdb_strerror(ret));
free(owner);
#endif
/* Check if TTL expired (with negative grace period). */
if (knot_rdata_ttl(rd) <= now + KR_TTL_GRACE) {
- mdb_cursor_del(cur, 0);
- return 0;
+ return del_entry(cur);
}
return knot_rdataset_add(&rr->rrs, rd, mm);
static int unpack_list(MDB_cursor *cur, knot_rrset_t *rr, mm_ctx_t *mm)
{
- int ret = 0;
uint32_t now = time(NULL);
MDB_val key = pack_key(rr->owner);
- MDB_val data;
+ MDB_val data = { 0, NULL };
+
+ /* Fetch first entry. */
+ int ret = mdb_cursor_get(cur, &key, &data, MDB_SET_KEY);
- while ((ret = mdb_cursor_get(cur, &key, &data, MDB_NEXT_DUP)) == 0) {
+ /* Unpack, and find chained duplicates. */
+ while (ret == 0) {
ret = unpack_entry(cur, rr, &data, now, mm);
if (ret != 0) {
return ret;
}
+
+ ret = mdb_cursor_get(cur, &key, &data, MDB_NEXT_DUP);
}
#ifndef NDEBUG
char *owner = knot_dname_to_str(rr->owner);
- DEBUG_MSG("unpacked RR '%s' TYPE %u, %u RDATA\n", owner, rr->type, rr->rrs.rr_count);
+ DEBUG_MSG("LOAD RR '%s' TYPE %u => %u RECORDS\n", owner, rr->type, rr->rrs.rr_count);
free(owner);
#endif
mm_free(cache->pool, cache);
}
-int kr_cache_query(struct kr_cache *cache, knot_rrset_t *rr, mm_ctx_t *mm)
+struct kr_txn *kr_cache_txn_begin(struct kr_cache *cache, struct kr_txn *parent, mm_ctx_t *mm)
+{
+ struct kr_txn *txn = mm_alloc(mm, sizeof(struct kr_txn));
+ if (txn == NULL) {
+ return NULL;
+ }
+ memset(txn, 0, sizeof(struct kr_txn));
+
+ txn->dbi = cache->dbi;
+ txn->mm = mm;
+ if (parent) {
+ txn->parent = parent->txn;
+ }
+
+ int ret = mdb_txn_begin(cache->env, txn->parent, 0, &txn->txn);
+ if (ret != 0) {
+ mm_free(mm, txn);
+ return NULL;
+ }
+
+#ifndef NDEBUG
+ MDB_stat stat;
+ mdb_stat(txn->txn, txn->dbi, &stat);
+ DEBUG_MSG("TX_BEGIN, %zu entries\n", stat.ms_entries);
+#endif
+
+ return txn;
+}
+
+int kr_cache_txn_commit(struct kr_txn *txn)
{
- MDB_cursor *cursor = cursor_acquire(cache, MDB_RDONLY);
+
+#ifndef NDEBUG
+ MDB_stat stat;
+ mdb_stat(txn->txn, txn->dbi, &stat);
+ DEBUG_MSG("TX_COMMIT, %zu entries\n", stat.ms_entries);
+#endif
+
+ int ret = mdb_txn_commit(txn->txn);
+ mm_free(txn->mm, txn);
+ return ret;
+}
+
+void kr_cache_txn_abort(struct kr_txn *txn)
+{
+
+#ifndef NDEBUG
+ MDB_stat stat;
+ mdb_stat(txn->txn, txn->dbi, &stat);
+ DEBUG_MSG("TX_ABORT, %zu entries\n", stat.ms_entries);
+#endif
+
+ mdb_txn_abort(txn->txn);
+ mm_free(txn->mm, txn);
+}
+
+int kr_cache_query(struct kr_txn *txn, knot_rrset_t *rr)
+{
+ MDB_cursor *cursor = cursor_acquire(txn);
if (cursor == NULL) {
return -1;
}
- int ret = unpack_list(cursor, rr, mm);
+ int ret = unpack_list(cursor, rr, txn->mm);
if (knot_rrset_empty(rr)) {
ret = -1;
}
- cursor_release(cursor, false);
+ cursor_release(cursor);
return ret;
}
-int kr_cache_insert(struct kr_cache *cache, const knot_rrset_t *rr, unsigned flags)
+int kr_cache_insert(struct kr_txn *txn, const knot_rrset_t *rr, unsigned flags)
{
- MDB_cursor *cursor = cursor_acquire(cache, 0);
+ MDB_cursor *cursor = cursor_acquire(txn);
if (cursor == NULL) {
return -1;
}
/* TODO: cache eviction if full */
-
int ret = pack_list(cursor, rr);
- if (ret != 0) {
- cursor_release(cursor, false);
- return ret;
- }
- cursor_release(cursor, true);
+ cursor_release(cursor);
return ret;
}
-int kr_cache_remove(struct kr_cache *cache, const knot_rrset_t *rr)
+int kr_cache_remove(struct kr_txn *txn, const knot_rrset_t *rr)
{
- MDB_cursor *cursor = cursor_acquire(cache, 0);
+ MDB_cursor *cursor = cursor_acquire(txn);
if (cursor == NULL) {
return -1;
}
mdb_cursor_del(cursor, 0);
#ifndef NDEBUG
char *owner = knot_dname_to_str(rr->owner);
- DEBUG_MSG("removed RR '%s' TYPE %u (%p)\n", owner, rr->type, PACKED_RDATA(data.mv_data));
+ DEBUG_MSG("DEL RR '%s' TYPE %u (%p)\n", owner, rr->type, PACKED_RDATA(data.mv_data));
free(owner);
#endif
}
}
- cursor_release(cursor, true);
+ cursor_release(cursor);
return ret;
}