]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
auth lmdb: implement schema v5, plus a bunch of cleanup
authorPeter van Dijk <peter.van.dijk@powerdns.com>
Fri, 10 Feb 2023 11:33:40 +0000 (12:33 +0100)
committerPeter van Dijk <peter.van.dijk@powerdns.com>
Thu, 16 Mar 2023 20:14:46 +0000 (21:14 +0100)
29 files changed:
.gitignore
ext/lmdb-safe/lmdb-safe.cc
ext/lmdb-safe/lmdb-safe.hh
ext/lmdb-safe/lmdb-typed.cc
ext/lmdb-safe/lmdb-typed.hh
modules/lmdbbackend/lmdbbackend.cc
modules/lmdbbackend/lmdbbackend.hh
modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-0 [deleted file]
modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-1 [deleted file]
modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-0 [deleted file]
modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-1 [deleted file]
modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb [moved from modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb with 95% similarity]
modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-0 [new file with mode: 0644]
modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-0-lock [moved from modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-0-lock with 97% similarity]
modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-1 [new file with mode: 0644]
modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-1-lock [moved from modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-1-lock with 97% similarity]
modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-lock [moved from modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-lock with 97% similarity]
modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb [moved from modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb with 95% similarity]
modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-0 [new file with mode: 0644]
modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-0-lock [moved from modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-0-lock with 97% similarity]
modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-1 [new file with mode: 0644]
modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-1-lock [moved from modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-1-lock with 97% similarity]
modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-lock [moved from modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-lock with 97% similarity]
pdns/Makefile.am
pdns/pdnsutil.cc
regression-tests.nobackend/lmdb-metadata-leak/command [new file with mode: 0755]
regression-tests.nobackend/lmdb-metadata-leak/description [new file with mode: 0644]
regression-tests.nobackend/lmdb-metadata-leak/expected_result [new file with mode: 0644]
tasks.py

index 8885c62b1f87b6032ea0d484a84c844554915ba9..07dd058c9e930d5bbea4cf4e2cff7043f43f0a37 100644 (file)
@@ -56,7 +56,7 @@ __pycache__
 .circleci/config.yml-local
 .env
 compile_commands.*
-/.cache
+.cache
 /.clang-tidy
 .gdb_history
 .venv
index fc9e7c7197a56c71d5f6cb358bb290d75b708ac0..ce43d5ae07ce2aa2713d0443c18ae7cd97308f53 100644 (file)
@@ -6,6 +6,11 @@
 #include <string.h>
 #include <map>
 
+#ifndef DNSDIST
+#include <lmdb.h>
+#include "../../pdns/gettime.hh"
+#endif
+
 using std::string;
 using std::runtime_error;
 using std::tuple;
@@ -16,6 +21,56 @@ static string MDBError(int rc)
   return mdb_strerror(rc);
 }
 
+#ifndef DNSDIST
+
+namespace LMDBLS {
+  // this also returns a pointer to the string's data. Do not hold on to it too long!
+  LSheader* LSassertFixedHeaderSize(std::string_view val) {
+    // cerr<<"val.size()="<<val.size()<<endl;
+    if (val.size() < LS_MIN_HEADER_SIZE) {
+      throw std::runtime_error("LSheader too short");
+    }
+
+    return (LSheader*)val.data();
+  }
+
+  size_t LScheckHeaderAndGetSize(std::string_view val, size_t datasize) {
+    LSheader* lsh = LSassertFixedHeaderSize(val);
+
+    if (lsh->d_version != 0) {
+      throw std::runtime_error("LSheader has wrong version (not zero)");
+    }
+
+    size_t headersize = LS_MIN_HEADER_SIZE;
+
+    headersize += ntohs(lsh->d_numextra) * LS_BLOCK_SIZE;
+
+    if (val.size() < headersize) {
+      throw std::runtime_error("LSheader too short for promised extra data");
+    }
+
+    if (datasize && val.size() < (headersize+datasize)) {
+      throw std::runtime_error("Trailing data after LSheader has wrong size");
+    }
+
+    return headersize;
+  }
+
+  size_t LScheckHeaderAndGetSize(const MDBOutVal *val, size_t datasize) {
+    return LScheckHeaderAndGetSize(val->getNoStripHeader<string_view>(), datasize);
+  }
+
+  bool LSisDeleted(std::string_view val) {
+    LSheader* lsh = LSassertFixedHeaderSize(val);
+
+    return (lsh->d_flags & LS_FLAG_DELETED) != 0;
+  }
+
+  bool s_flag_deleted{false};
+}
+
+#endif /* #ifndef DNSDIST */
+
 MDBDbi::MDBDbi(MDB_env* env, MDB_txn* txn, const string_view dbname, int flags)
 {
   // A transaction that uses this function must finish (either commit or abort) before any other transaction in the process may use this function.
@@ -184,6 +239,13 @@ MDB_txn *MDBRWTransactionImpl::openRWTransaction(MDBEnv *env, MDB_txn *parent, i
 MDBRWTransactionImpl::MDBRWTransactionImpl(MDBEnv* parent, int flags):
   MDBRWTransactionImpl(parent, openRWTransaction(parent, nullptr, flags))
 {
+#ifndef DNSDIST
+  struct timespec tp;
+
+  gettime(&tp, true);
+
+  d_txtime = tp.tv_sec * 1E9 + tp.tv_nsec;
+#endif
 }
 
 MDBRWTransactionImpl::~MDBRWTransactionImpl()
@@ -301,9 +363,10 @@ MDBRWCursor MDBRWTransactionImpl::getRWCursor(const MDBDbi& dbi)
   MDB_cursor *cursor;
   int rc= mdb_cursor_open(d_txn, dbi, &cursor);
   if(rc) {
-    throw std::runtime_error("Error creating RO cursor: "+std::string(mdb_strerror(rc)));
+    throw std::runtime_error("Error creating RW cursor: "+std::string(mdb_strerror(rc)));
   }
-  return MDBRWCursor(d_rw_cursors, cursor);
+
+  return MDBRWCursor(d_rw_cursors, cursor, d_txn, d_txtime);
 }
 
 MDBRWCursor MDBRWTransactionImpl::getCursor(const MDBDbi &dbi)
index c900fccd4cb675cce2c3a49b728e05c5ad12379c..a789feb43aead4e47c9d291fb86db57f7fa628e4 100644 (file)
 #include <vector>
 #include <algorithm>
 
+#include "config.h"
+
+#ifndef DNSDIST
+#include <boost/range/detail/common.hpp>
+#include <stdint.h>
+#include <netinet/in.h>
+#include <bit>
+#include <stdexcept>
+#include "../../pdns/misc.hh"
+#endif
+
 using std::string_view;
 
 /* open issues:
@@ -90,6 +101,62 @@ private:
 
 std::shared_ptr<MDBEnv> getMDBEnv(const char* fname, int flags, int mode, uint64_t mapsizeMB=(sizeof(void *)==4) ? 100 : 16000);
 
+#ifndef DNSDIST
+
+// FIXME do something more portable than __builtin_bswap64
+#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
+#define _LMDB_SAFE_BSWAP64MAYBE(x) __builtin_bswap64(x)
+#else
+#define _LMDB_SAFE_BSWAP64MAYBE(x) (x)
+#endif
+
+struct MDBOutVal; // forward declaration because of how the functions below tie in with MDBOutVal
+
+namespace LMDBLS {
+  class __attribute__((__packed__)) LSheader {
+  public:
+    uint64_t d_timestamp;
+    uint64_t d_txnid;
+    uint8_t d_version;
+    uint8_t d_flags;
+    uint32_t d_reserved;
+    uint16_t d_numextra;
+
+    LSheader(uint64_t timestamp, uint64_t txnid, uint8_t flags=0, uint8_t version=0, uint8_t numextra=0):
+      d_timestamp(_LMDB_SAFE_BSWAP64MAYBE(timestamp)),
+      d_txnid(_LMDB_SAFE_BSWAP64MAYBE(txnid)),
+      d_version(version),
+      d_flags(flags),
+      d_reserved(0),
+      d_numextra(htons(numextra))
+    {
+
+    }
+
+    std::string toString() {
+      return std::string((char*)this, sizeof(*this)) + std::string(ntohs(d_numextra)*8, '\0');
+    }
+
+
+  };
+
+  static_assert(sizeof(LSheader)==24, "LSheader size is wrong");
+
+  const size_t LS_MIN_HEADER_SIZE=sizeof(LSheader);  // FIXME: rename this so all code that relies on it breaks - it needs to use LScheckHeaderAndGetSize below
+  const size_t LS_BLOCK_SIZE=8;
+  const uint8_t LS_FLAG_DELETED = 0x01;
+
+  LSheader* LSassertFixedHeaderSize(std::string_view val);
+  size_t LScheckHeaderAndGetSize(std::string_view val, size_t datasize=0);
+  size_t LScheckHeaderAndGetSize(const MDBOutVal *val, size_t datasize=0);
+  bool LSisDeleted(std::string_view val);
+
+  extern bool s_flag_deleted;
+}
+
+#undef _LMDB_SAFE_BSWAP64MAYBE
+
+#endif /* ifndef DNSDIST */
 
 
 struct MDBOutVal
@@ -99,54 +166,97 @@ struct MDBOutVal
     return d_mdbval;
   }
 
+#ifndef DNSDIST
   template <class T,
-          typename std::enable_if<std::is_arithmetic<T>::value,
+          typename std::enable_if<std::is_integral<T>::value,
                                   T>::type* = nullptr> const
   T get()
+  {
+    T ret;
+
+    size_t offset = LMDBLS::LScheckHeaderAndGetSize(this, sizeof(T));
+
+    memcpy(&ret, (char *)d_mdbval.mv_data+offset, sizeof(T));
+    ret = ntohl(ret);
+    return ret;
+  }
+
+  template <class T,
+          typename std::enable_if<std::is_arithmetic<T>::value,
+                                  T>::type* = nullptr> const
+  T getNoStripHeader()
   {
     T ret;
     if(d_mdbval.mv_size != sizeof(T))
       throw std::runtime_error("MDB data has wrong length for type");
 
     memcpy(&ret, d_mdbval.mv_data, sizeof(T));
+    ret = ntohl(ret);
     return ret;
   }
 
+#endif /* ifndef DNSDIST */
+
   template <class T,
             typename std::enable_if<std::is_class<T>::value,T>::type* = nullptr>
   T get() const;
 
+
+#ifndef DNSDIST
+  template <class T,
+            typename std::enable_if<std::is_class<T>::value,T>::type* = nullptr>
+  T getNoStripHeader() const;
+#endif
+
+#ifndef DNSDIST
   template<class T>
   T get_struct() const
   {
     T ret;
-    if(d_mdbval.mv_size != sizeof(T))
-      throw std::runtime_error("MDB data has wrong length for type");
 
-    memcpy(&ret, d_mdbval.mv_data, sizeof(T));
+    size_t offset = LMDBLS::LScheckHeaderAndGetSize(this, sizeof(T));
+
+    memcpy(&ret, (char *)d_mdbval.mv_data+offset, sizeof(T));
     return ret;
   }
 
   template<class T>
   const T* get_struct_ptr() const
   {
-    if(d_mdbval.mv_size != sizeof(T))
-      throw std::runtime_error("MDB data has wrong length for type");
+    size_t offset = LMDBLS::LScheckHeaderAndGetSize(this, sizeof(T));
 
-    return reinterpret_cast<const T*>(d_mdbval.mv_data);
+    return reinterpret_cast<const T*>((char *)d_mdbval.mv_data+offset);
   }
-
+#endif
 
   MDB_val d_mdbval;
 };
 
 template<> inline std::string MDBOutVal::get<std::string>() const
 {
+#ifndef DNSDIST
+  size_t offset = LMDBLS::LScheckHeaderAndGetSize(this);
+
+  return std::string((char*)d_mdbval.mv_data+offset, d_mdbval.mv_size-offset);
+}
+
+template<> inline std::string MDBOutVal::getNoStripHeader<std::string>() const
+{
+#endif
   return std::string((char*)d_mdbval.mv_data, d_mdbval.mv_size);
 }
 
 template<> inline string_view MDBOutVal::get<string_view>() const
 {
+#ifndef DNSDIST
+  size_t offset = LMDBLS::LScheckHeaderAndGetSize(this);
+
+  return string_view((char*)d_mdbval.mv_data+offset, d_mdbval.mv_size-offset);
+}
+
+template<> inline string_view MDBOutVal::getNoStripHeader<string_view>() const
+{
+#endif
   return string_view((char*)d_mdbval.mv_data, d_mdbval.mv_size);
 }
 
@@ -157,15 +267,19 @@ public:
   {
   }
 
+#ifndef DNSDIST
   template <class T,
-            typename std::enable_if<std::is_arithmetic<T>::value,
+            typename std::enable_if<std::is_integral<T>::value,
                                     T>::type* = nullptr>
   MDBInVal(T i)
   {
-    memcpy(&d_memory[0], &i, sizeof(i));
+    auto j = htonl(i);    // all actual usage in our codebase is 32 bits. If that ever changes, this will break the build and avoid runtime surprises
+    memcpy(&d_memory[0], &j, sizeof(j));
+
     d_mdbval.mv_size = sizeof(T);
     d_mdbval.mv_data = d_memory;;
   }
+#endif
 
   MDBInVal(const char* s)
   {
@@ -202,13 +316,11 @@ public:
   MDB_val d_mdbval;
 private:
   MDBInVal(){}
+#ifndef DNSDIST
   char d_memory[sizeof(double)];
-
+#endif
 };
 
-
-
-
 class MDBROCursor;
 
 class MDBROTransactionImpl
@@ -249,8 +361,19 @@ public:
 
     int rc = mdb_get(d_txn, dbi, const_cast<MDB_val*>(&key.d_mdbval),
                      const_cast<MDB_val*>(&val.d_mdbval));
-    if(rc && rc != MDB_NOTFOUND)
+
+    if(rc && rc != MDB_NOTFOUND) {
       throw std::runtime_error("getting data: " + std::string(mdb_strerror(rc)));
+    }
+
+#ifndef DNSDIST
+    if(rc != MDB_NOTFOUND) {  // key was found, value was retrieved
+      std::string sval = val.getNoStripHeader<std::string>();
+      if (LMDBLS::LSisDeleted(sval)) {  // but it was deleted
+        rc = MDB_NOTFOUND;
+      }
+    }
+#endif
 
     return rc;
   }
@@ -301,18 +424,23 @@ class MDBGenCursor
 private:
   std::vector<T*> *d_registry;
   MDB_cursor* d_cursor{nullptr};
-
 public:
+  MDB_txn* d_txn{nullptr}; // ew, public
+  uint64_t d_txtime{0};
+
   MDBGenCursor():
     d_registry(nullptr),
-    d_cursor(nullptr)
+    d_cursor(nullptr),
+    d_txn(nullptr)
   {
 
   }
 
-  MDBGenCursor(std::vector<T*> &registry, MDB_cursor *cursor):
+  MDBGenCursor(std::vector<T*> &registry, MDB_cursor *cursor, MDB_txn *txn=nullptr, uint64_t txtime=0):
     d_registry(&registry),
-    d_cursor(cursor)
+    d_cursor(cursor),
+    d_txn(txn),
+    d_txtime(txtime)
   {
     registry.emplace_back(static_cast<T*>(this));
   }
@@ -363,13 +491,101 @@ public:
     close();
   }
 
+  /*
+   to support (skip) entries marked deleted=1 in the LS header, we need to do some magic here
+   this table notes, for each cursor op:
+   * the maximum number of entries we may need to look at (1 or inf)
+   * the subsequent op that needs to be done to skip over a deleted entry (or MDB_NOTFOUND to give up and say no)
+   (table partially copied from http://www.lmdb.tech/doc/group__mdb.html#ga1206b2af8b95e7f6b0ef6b28708c9127 which I hope is a stable URL)
+   (ops only relevant for DUPSORT/DUPFIXED have been omitted)
+   (table is grouped by "skip op")
+
+  | base op            | maxentries | skip op      | doc description of base op
+  | MDB_FIRST          | inf        | MDB_NEXT     | Position at first key/data item
+  | MDB_NEXT           | inf        | MDB_NEXT     | Position at next data item
+  | MDB_SET_RANGE      | inf        | MDB_NEXT     | Position at first key greater than or equal to specified key.
+  | MDB_LAST           | inf        | MDB_PREV     | Position at last key/data item
+  | MDB_PREV           | inf        | MDB_PREV     | Position at previous data item
+  | MDB_GET_CURRENT    | 1          | MDB_NOTFOUND | Return key/data at current cursor position
+  | MDB_SET            | 1          | MDB_NOTFOUND | Position at specified key
+  | MDB_SET_KEY        | 1          | MDB_NOTFOUND | Position at specified key, return key + data
+  */
+
+private:
+  int skipDeleted(MDBOutVal& key, MDBOutVal& data, MDB_cursor_op op, int rc)
+  {
+#ifndef DNSDIST
+    // when we get here
+    // * mdb_cursor_get has been called once
+    // * it did not return an error, but it might have returned MDB_NOTFOUND
+    // * if it returned MDB_NOTFOUND, there is nothing for us to do and we pass that on
+
+    if (rc == MDB_NOTFOUND) {
+      return rc;
+    }
+
+    // when we get here
+    // * mdb_cursor_get has been called at least once
+    // * it found an entry, as far as LMDB is concerned, so key+data contain something
+    // * but that might be a LS deleted=1 entry
+    // * we know the cursor op that got us here
+
+    while (true) {
+      std::string sval = data.getNoStripHeader<std::string>();
+
+      if (!LMDBLS::LSisDeleted(sval)) {
+        // done!
+
+        return rc;
+      }
+
+      // the found entry is set deleted, so we need to do something
+
+      // if this was a 1-entry op, this is the end
+      if (op == MDB_GET_CURRENT || op == MDB_SET || op == MDB_SET_KEY) {
+        return MDB_NOTFOUND;
+      }
+
+      // otherwise, we need to try to carry on
+      // all ops that do not map to NOTFOUND map to NEXT or PREV, including NEXT and PREV themselves
+      // so we just override the op to NEXT or PREV
+      if (op == MDB_FIRST || op == MDB_NEXT || op == MDB_SET_RANGE) {
+        op = MDB_NEXT;
+      }
+      else if (op == MDB_LAST || op == MDB_PREV) {
+        op = MDB_PREV;
+      }
+      else {
+        throw std::runtime_error("got unsupported mdb cursor op");
+      }
+
+      rc = mdb_cursor_get(d_cursor, &key.d_mdbval, &data.d_mdbval, op);
+      if(rc && rc != MDB_NOTFOUND) {
+         throw std::runtime_error("Unable to get from cursor: " + std::string(mdb_strerror(rc)));
+      }
+
+      if (rc == MDB_NOTFOUND) {
+        // we ended up finding nothing, so tell the caller
+        return rc;
+      }
+
+      // when we get here
+      // * the situation is just like the last time I wrote "when we get here"
+      // * except mdb_cursor_get has been called at least twice
+      // * so let's go back
+    }
+#else /* ifndef DNSDIST */
+    return rc;
+#endif
+  }
+
 public:
   int get(MDBOutVal& key, MDBOutVal& data, MDB_cursor_op op)
   {
     int rc = mdb_cursor_get(d_cursor, &key.d_mdbval, &data.d_mdbval, op);
     if(rc && rc != MDB_NOTFOUND)
        throw std::runtime_error("Unable to get from cursor: " + std::string(mdb_strerror(rc)));
-    return rc;
+    return skipDeleted(key, data, op, rc);
   }
 
   int find(const MDBInVal& in, MDBOutVal& key, MDBOutVal& data)
@@ -378,7 +594,7 @@ public:
     int rc=mdb_cursor_get(d_cursor, const_cast<MDB_val*>(&key.d_mdbval), &data.d_mdbval, MDB_SET);
     if(rc && rc != MDB_NOTFOUND)
        throw std::runtime_error("Unable to find from cursor: " + std::string(mdb_strerror(rc)));
-    return rc;
+    return skipDeleted(key, data, MDB_SET, rc);
   }
 
   int lower_bound(const MDBInVal& in, MDBOutVal& key, MDBOutVal& data)
@@ -388,7 +604,7 @@ public:
     int rc = mdb_cursor_get(d_cursor, const_cast<MDB_val*>(&key.d_mdbval), &data.d_mdbval, MDB_SET_RANGE);
     if(rc && rc != MDB_NOTFOUND)
        throw std::runtime_error("Unable to lower_bound from cursor: " + std::string(mdb_strerror(rc)));
-    return rc;
+    return skipDeleted(key, data, MDB_SET_RANGE, rc);
   }
 
 
@@ -397,7 +613,7 @@ public:
     int rc = mdb_cursor_get(d_cursor, const_cast<MDB_val*>(&key.d_mdbval), &data.d_mdbval, op);
     if(rc && rc != MDB_NOTFOUND)
        throw std::runtime_error("Unable to prevnext from cursor: " + std::string(mdb_strerror(rc)));
-    return rc;
+    return skipDeleted(key, data, op, rc);
   }
 
   int next(MDBOutVal& key, MDBOutVal& data)
@@ -415,7 +631,7 @@ public:
     int rc = mdb_cursor_get(d_cursor, const_cast<MDB_val*>(&key.d_mdbval), &data.d_mdbval, op);
     if(rc && rc != MDB_NOTFOUND)
        throw std::runtime_error("Unable to next from cursor: " + std::string(mdb_strerror(rc)));
-    return rc;
+    return skipDeleted(key, data, op, rc);
   }
 
   int current(MDBOutVal& key, MDBOutVal& data)
@@ -485,6 +701,8 @@ private:
 private:
   std::vector<MDBRWCursor*> d_rw_cursors;
 
+  uint64_t d_txtime{0};
+
   void closeRWCursors();
   inline void closeRORWCursors() {
     closeROCursors();
@@ -506,26 +724,41 @@ public:
 
   void clear(MDB_dbi dbi);
 
+#ifndef DNSDIST
   void put(MDB_dbi dbi, const MDBInVal& key, const MDBInVal& val, int flags=0)
   {
     if(!d_txn)
       throw std::runtime_error("Attempt to use a closed RW transaction for put");
     int rc;
+
+    size_t txid = mdb_txn_id(d_txn);
+
+    if (d_txtime == 0) { throw std::runtime_error("got zero txtime"); }
+
+    std::string ins =
+      LMDBLS::LSheader(d_txtime, txid).toString()+
+      std::string((const char*)val.d_mdbval.mv_data, val.d_mdbval.mv_size);
+
+    MDBInVal pval = ins;
+
     if((rc=mdb_put(d_txn, dbi,
                    const_cast<MDB_val*>(&key.d_mdbval),
-                   const_cast<MDB_val*>(&val.d_mdbval), flags)))
+                   const_cast<MDB_val*>(&pval.d_mdbval), flags))) {
       throw std::runtime_error("putting data: " + std::string(mdb_strerror(rc)));
+    }
   }
-
-
-  int del(MDBDbi& dbi, const MDBInVal& key, const MDBInVal& val)
+#else
+  void put(MDB_dbi dbi, const MDBInVal& key, const MDBInVal& val, int flags=0)
   {
+    if(!d_txn)
+      throw std::runtime_error("Attempt to use a closed RW transaction for put");
     int rc;
-    rc=mdb_del(d_txn, dbi, (MDB_val*)&key.d_mdbval, (MDB_val*)&val.d_mdbval);
-    if(rc && rc != MDB_NOTFOUND)
-      throw std::runtime_error("deleting data: " + std::string(mdb_strerror(rc)));
-    return rc;
+    if((rc=mdb_put(d_txn, dbi,
+                   const_cast<MDB_val*>(&key.d_mdbval),
+                   const_cast<MDB_val*>(&val.d_mdbval), flags)))
+      throw std::runtime_error("putting data: " + std::string(mdb_strerror(rc)));
   }
+#endif
 
   int del(MDBDbi& dbi, const MDBInVal& key)
   {
@@ -533,6 +766,26 @@ public:
     rc=mdb_del(d_txn, dbi, (MDB_val*)&key.d_mdbval, 0);
     if(rc && rc != MDB_NOTFOUND)
       throw std::runtime_error("deleting data: " + std::string(mdb_strerror(rc)));
+#ifndef DNSDIST
+    if(rc != MDB_NOTFOUND && LMDBLS::s_flag_deleted) {
+      // if it did exist, we need to mark it as deleted now
+
+      size_t txid = mdb_txn_id(d_txn);
+      if (d_txtime == 0) { throw std::runtime_error("got zero txtime"); }
+
+      std::string ins =
+        // std::string((const char*)&txid, sizeof(txid)) +
+        LMDBLS::LSheader(d_txtime, txid, LMDBLS::LS_FLAG_DELETED).toString();
+
+      MDBInVal pval = ins;
+
+      if((rc=mdb_put(d_txn, dbi,
+                     const_cast<MDB_val*>(&key.d_mdbval),
+                     const_cast<MDB_val*>(&pval.d_mdbval), 0))) {
+              throw std::runtime_error("marking data deleted: " + std::string(mdb_strerror(rc)));
+      }
+    }
+#endif
     return rc;
   }
 
@@ -544,17 +797,19 @@ public:
 
     int rc = mdb_get(d_txn, dbi, const_cast<MDB_val*>(&key.d_mdbval),
                      const_cast<MDB_val*>(&val.d_mdbval));
-    if(rc && rc != MDB_NOTFOUND)
-      throw std::runtime_error("getting data: " + std::string(mdb_strerror(rc)));
-    return rc;
-  }
+    if(rc && rc != MDB_NOTFOUND) {
+          throw std::runtime_error("getting data: " + std::string(mdb_strerror(rc)));
+    }
+
+#ifndef DNSDIST
+    if(rc != MDB_NOTFOUND) {  // key was found, value was retrieved
+      std::string sval = val.getNoStripHeader<std::string>();
+      if (LMDBLS::LSisDeleted(sval)) {  // but it was deleted
+        rc = MDB_NOTFOUND;
+      }
+    }
+#endif
 
-  int get(MDBDbi& dbi, const MDBInVal& key, string_view& val)
-  {
-    MDBOutVal out;
-    int rc = get(dbi, key, out);
-    if(!rc)
-      val = out.get<string_view>();
     return rc;
   }
 
@@ -568,6 +823,7 @@ public:
 
   MDBRWTransaction getRWTransaction();
   MDBROTransaction getROTransaction();
+
 };
 
 /* "A cursor in a write-transaction can be closed before its transaction ends, and will otherwise be closed when its transaction ends"
@@ -584,27 +840,71 @@ public:
   MDBRWCursor &operator=(MDBRWCursor &&src) = default;
   ~MDBRWCursor() = default;
 
+#ifndef DNSDIST
   void put(const MDBOutVal& key, const MDBInVal& data)
   {
+    size_t txid = mdb_txn_id(this->d_txn);
+
+    if (d_txtime == 0) { throw std::runtime_error("got zero txtime"); }
+
+    std::string ins =
+      LMDBLS::LSheader(d_txtime, txid).toString()+
+      std::string((const char*)data.d_mdbval.mv_data, data.d_mdbval.mv_size);
+
+    MDBInVal pval = ins;
+
     int rc = mdb_cursor_put(*this,
                             const_cast<MDB_val*>(&key.d_mdbval),
-                            const_cast<MDB_val*>(&data.d_mdbval), MDB_CURRENT);
+                            const_cast<MDB_val*>(&pval.d_mdbval), MDB_CURRENT);
     if(rc)
       throw std::runtime_error("mdb_cursor_put: " + std::string(mdb_strerror(rc)));
   }
-
-
-  int put(const MDBOutVal& key, const MDBOutVal& data, int flags=0)
+#else
+  void put(const MDBOutVal& key, const MDBInVal& data)
   {
-    // XXX check errors
-    return mdb_cursor_put(*this,
-                          const_cast<MDB_val*>(&key.d_mdbval),
-                          const_cast<MDB_val*>(&data.d_mdbval), flags);
+    int rc = mdb_cursor_put(*this,
+                            const_cast<MDB_val*>(&key.d_mdbval),
+                            const_cast<MDB_val*>(&data.d_mdbval), MDB_CURRENT);
+    if(rc)
+      throw std::runtime_error("mdb_cursor_put: " + std::string(mdb_strerror(rc)));
   }
+#endif
 
+#ifndef DNSDIST
   int del(int flags=0)
   {
-    return mdb_cursor_del(*this, flags);
-  }
+    MDBOutVal key, val;
 
+    if (LMDBLS::s_flag_deleted) {
+      int rc_get = mdb_cursor_get (*this, &key.d_mdbval, &val.d_mdbval, MDB_GET_CURRENT);
+
+      if(rc_get) {
+              throw std::runtime_error("getting key to mark data as deleted: " + std::string(mdb_strerror(rc_get)));
+      }
+
+      size_t txid = mdb_txn_id(d_txn);
+      if (d_txtime == 0) { throw std::runtime_error("got zero txtime"); }
+
+      std::string ins =
+        LMDBLS::LSheader(d_txtime, txid, LMDBLS::LS_FLAG_DELETED).toString();
+
+      std::string skey((const char*)key.d_mdbval.mv_data, key.d_mdbval.mv_size);
+
+      MDBInVal pkey = MDBInVal(skey);
+      MDBInVal pval = ins;
+
+      int rc_put = mdb_cursor_put(*this,
+                     const_cast<MDB_val*>(&pkey.d_mdbval),
+                     const_cast<MDB_val*>(&pval.d_mdbval), 0 /* MDB_CURRENT */);
+      if(rc_put) {
+              throw std::runtime_error("marking data deleted: " + std::string(mdb_strerror(rc_put)));
+      }
+      return rc_put;
+    }
+    else {
+      // do a normal delete
+      return mdb_cursor_del(*this, flags);
+    }
+  }
+#endif
 };
index 7b0abd339aec8c7321e73902ff873bc6fa52ca5c..720285352db5632c1400168c5943ad1851dfe6ac 100644 (file)
@@ -7,7 +7,7 @@ unsigned int MDBGetMaxID(MDBRWTransaction& txn, MDBDbi& dbi)
   MDBOutVal maxidval, maxcontent;
   unsigned int maxid{0};
   if(!cursor.get(maxidval, maxcontent, MDB_LAST)) {
-    maxid = maxidval.get<unsigned int>();
+    maxid = maxidval.getNoStripHeader<unsigned int>();
   }
   return maxid;
 }
@@ -28,5 +28,3 @@ unsigned int MDBGetRandomID(MDBRWTransaction& txn, MDBDbi& dbi)
   }
   throw std::runtime_error("MDBGetRandomID() could not assign an unused random ID");
 }
-
-
index 0810d9a97e82be2de0583da143dd2b3460f90ceb..469180e23dd5fb489899858ecb9aecae0a0e54cd 100644 (file)
@@ -1,4 +1,5 @@
 #pragma once
+#include <stdexcept>
 #include <string_view>
 #include <iostream>
 #include "lmdb-safe.hh"
@@ -41,6 +42,7 @@ unsigned int MDBGetMaxID(MDBRWTransaction& txn, MDBDbi& dbi);
 */
 unsigned int MDBGetRandomID(MDBRWTransaction& txn, MDBDbi& dbi);
 
+typedef std::vector<uint32_t> LMDBIDvec;
 
 /** This is our serialization interface.
     You can define your own serToString for your type if you know better
@@ -92,6 +94,49 @@ inline std::string keyConv(const T& t)
 }
 
 
+namespace {
+  MDBOutVal getKeyFromCombinedKey(MDBInVal combined) {
+    if (combined.d_mdbval.mv_size < sizeof(uint32_t)) {
+      throw std::runtime_error("combined key too short to get ID from");
+    }
+
+    MDBOutVal ret;
+    ret.d_mdbval.mv_data = (char*) combined.d_mdbval.mv_data;
+    ret.d_mdbval.mv_size = combined.d_mdbval.mv_size - sizeof(uint32_t);
+
+    return ret;
+  }
+
+  MDBOutVal getIDFromCombinedKey(MDBInVal combined) {
+    if (combined.d_mdbval.mv_size < sizeof(uint32_t)) {
+      throw std::runtime_error("combined key too short to get ID from");
+    }
+
+    MDBOutVal ret;
+    ret.d_mdbval.mv_data = (char*) combined.d_mdbval.mv_data + combined.d_mdbval.mv_size - sizeof(uint32_t);
+    ret.d_mdbval.mv_size = sizeof(uint32_t);
+
+    return ret;
+  }
+
+  std::string makeCombinedKey(MDBInVal key, MDBInVal val)
+  {
+    std::string lenprefix(sizeof(uint16_t), '\0');
+    std::string skey((char*) key.d_mdbval.mv_data, key.d_mdbval.mv_size);
+    std::string sval((char*) val.d_mdbval.mv_data, val.d_mdbval.mv_size);
+
+    uint16_t len = htons(skey.size());
+    memcpy((void*) lenprefix.data(), &len, sizeof(len));
+    std::string scombined = lenprefix + skey + sval;
+
+    // MDBInVal combined(scombined);
+
+    // std::cerr<<"scombined="<<makeHexDump(scombined)<<std::endl;
+    return scombined;
+  }
+}
+
+
 /** This is a struct that implements index operations, but
     only the operations that are broadcast to all indexes.
     Specifically, to deal with databases with less than the maximum
@@ -106,14 +151,24 @@ template<class Class,typename Type, typename Parent>
 struct LMDBIndexOps
 {
   explicit LMDBIndexOps(Parent* parent) : d_parent(parent){}
+
   void put(MDBRWTransaction& txn, const Class& t, uint32_t id, int flags=0)
   {
-    txn->put(d_idx, keyConv(d_parent->getMember(t)), id, flags);
+    std::string sempty("");
+    MDBInVal empty(sempty);
+
+    auto scombined = makeCombinedKey(keyConv(d_parent->getMember(t)), id);
+    MDBInVal combined(scombined);
+
+    txn->put(d_idx, combined, empty, flags);
   }
 
   void del(MDBRWTransaction& txn, const Class& t, uint32_t id)
   {
-    if(int rc = txn->del(d_idx, keyConv(d_parent->getMember(t)), id)) {
+    auto scombined = makeCombinedKey(keyConv(d_parent->getMember(t)), id);
+    MDBInVal combined(scombined);
+
+    if(int rc = txn->del(d_idx, combined)) {
       throw std::runtime_error("Error deleting from index: " + std::string(mdb_strerror(rc)));
     }
   }
@@ -124,6 +179,7 @@ struct LMDBIndexOps
   }
   MDBDbi d_idx;
   Parent* d_parent;
+
 };
 
 /** This is an index on a field in a struct, it derives from the LMDBIndexOps */
@@ -160,13 +216,13 @@ struct index_on_function : LMDBIndexOps<Class, Type, index_on_function<Class, Ty
 struct nullindex_t
 {
   template<typename Class>
-  void put(MDBRWTransaction& txn, const Class& t, uint32_t id, int flags=0)
+  void put(MDBRWTransaction& /* txn */, const Class& /* t */, uint32_t /* id */, int /* flags */ =0)
   {}
   template<typename Class>
-  void del(MDBRWTransaction& txn, const Class& t, uint32_t id)
+  void del(MDBRWTransaction& /* txn */, const Class& /* t */, uint32_t /* id */)
   {}
 
-  void openDB(std::shared_ptr<MDBEnv>& env, string_view str, int flags)
+  void openDB(std::shared_ptr<MDBEnv>& /* env */, string_view /* str */, int /* flags */)
   {
 
   }
@@ -181,12 +237,12 @@ public:
   TypedDBI(std::shared_ptr<MDBEnv> env, string_view name)
     : d_env(env), d_name(name)
   {
-    d_main = d_env->openDB(name, MDB_CREATE | MDB_INTEGERKEY);
+    d_main = d_env->openDB(name, MDB_CREATE);
 
     // now you might be tempted to go all MPL on this so we can get rid of the
     // ugly macro. I'm not very receptive to that idea since it will make things
     // EVEN uglier.
-#define openMacro(N) std::get<N>(d_tuple).openDB(d_env, std::string(name)+"_"#N, MDB_CREATE | MDB_DUPFIXED | MDB_DUPSORT);
+#define openMacro(N) std::get<N>(d_tuple).openDB(d_env, std::string(name)+"_"#N, MDB_CREATE);
     openMacro(0);
     openMacro(1);
     openMacro(2);
@@ -207,22 +263,22 @@ public:
     ReadonlyOperations(Parent& parent) : d_parent(parent)
     {}
 
-    //! Number of entries in main database
-    uint32_t size()
-    {
-      MDB_stat stat;
-      mdb_stat(**d_parent.d_txn, d_parent.d_parent->d_main, &stat);
-      return stat.ms_entries;
-    }
-
-    //! Number of entries in the various indexes - should be the same
-    template<int N>
-    uint32_t size()
-    {
-      MDB_stat stat;
-      mdb_stat(**d_parent.d_txn, std::get<N>(d_parent.d_parent->d_tuple).d_idx, &stat);
-      return stat.ms_entries;
-    }
+    // //! Number of entries in main database
+    // uint32_t size()
+    // {
+    //   MDB_stat stat;
+    //   mdb_stat(**d_parent.d_txn, d_parent.d_parent->d_main, &stat);
+    //   return stat.ms_entries;
+    // }
+
+    // //! Number of entries in the various indexes - should be the same
+    // template<int N>
+    // uint32_t size()
+    // {
+    //   MDB_stat stat;
+    //   mdb_stat(**d_parent.d_txn, std::get<N>(d_parent.d_parent->d_tuple).d_idx, &stat);
+    //   return stat.ms_entries;
+    // }
 
     //! Get item with id, from main table directly
     bool get(uint32_t id, T& t)
@@ -239,29 +295,44 @@ public:
     template<int N>
     uint32_t get(const typename std::tuple_element<N, tuple_t>::type::type& key, T& out)
     {
-      MDBOutVal id;
-      if(!(*d_parent.d_txn)->get(std::get<N>(d_parent.d_parent->d_tuple).d_idx, keyConv(key), id)) {
-        if(get(id.get<uint32_t>(), out))
-          return id.get<uint32_t>();
+      // MDBOutVal out;
+      // uint32_t id;
+
+      // auto range = (*d_parent.d_txn)->prefix_range<N>(domain);
+
+      // auto range = prefix_range<N>(key);
+      LMDBIDvec ids;
+
+      get_multi<N>(key, ids);
+
+      if (ids.size() == 0) {
+        return 0;
       }
-      return 0;
-    }
 
-    //! Cardinality of index N
-    template<int N>
-    uint32_t cardinality()
-    {
-      auto cursor = (*d_parent.d_txn)->getCursor(std::get<N>(d_parent.d_parent->d_tuple).d_idx);
-      bool first = true;
-      MDBOutVal key, data;
-      uint32_t count = 0;
-      while(!cursor.get(key, data, first ? MDB_FIRST : MDB_NEXT_NODUP)) {
-        ++count;
-        first=false;
+      if (ids.size() == 1) {
+        if (get(ids[0], out)) {
+          return ids[0];
+        }
       }
-      return count;
+
+      throw std::runtime_error("in index get, found more than one item");
     }
 
+    // //! Cardinality of index N
+    // template<int N>
+    // uint32_t cardinality()
+    // {
+    //   auto cursor = (*d_parent.d_txn)->getCursor(std::get<N>(d_parent.d_parent->d_tuple).d_idx);
+    //   bool first = true;
+    //   MDBOutVal key, data;
+    //   uint32_t count = 0;
+    //   while(!cursor.get(key, data, first ? MDB_FIRST : MDB_NEXT_NODUP)) {
+    //     ++count;
+    //     first=false;
+    //   }
+    //   return count;
+    // }
+
     //! End iterator type
     struct eiter_t
     {};
@@ -280,8 +351,9 @@ public:
         d_one_key(one_key),   // should we stop at end of key? (equal range)
         d_end(end)
       {
-        if(d_end)
+        if(d_end) {
           return;
+        }
         d_prefix.clear();
 
         if(d_cursor.get(d_key, d_id,  MDB_GET_CURRENT)) {
@@ -289,6 +361,16 @@ public:
           return;
         }
 
+        if (d_id.d_mdbval.mv_size < LMDBLS::LS_MIN_HEADER_SIZE) {
+          throw std::runtime_error("got short value");
+        }
+
+        // MDBOutVal id = d_id;
+
+        // id.d_mdbval.mv_size -= LS_HEADER_SIZE;
+        // id.d_mdbval.mv_data = (char*)d_id.d_mdbval.mv_data + LS_HEADER_SIZE;
+
+
         if(d_on_index) {
           if((*d_parent->d_txn)->get(d_parent->d_parent->d_main, d_id, d_data))
             throw std::runtime_error("Missing id in constructor");
@@ -314,6 +396,8 @@ public:
           return;
         }
 
+        d_id = getIDFromCombinedKey(d_key);
+
         if(d_on_index) {
           if((*d_parent->d_txn)->get(d_parent->d_parent->d_main, d_id, d_data))
             throw std::runtime_error("Missing id in constructor");
@@ -324,18 +408,18 @@ public:
       }
 
 
-      std::function<bool(const MDBOutVal&)> filter;
+      // std::function<bool(const MDBOutVal&)> filter;
       void del()
       {
         d_cursor.del();
       }
 
-      bool operator!=(const eiter_t& rhs) const
+      bool operator!=(const eiter_t& /* rhs */) const
       {
         return !d_end;
       }
 
-      bool operator==(const eiter_t& rhs) const
+      bool operator==(const eiter_t& /* rhs */) const
       {
         return d_end;
       }
@@ -351,33 +435,45 @@ public:
       }
 
       // implements generic ++ or --
-      iter_t& genoperator(MDB_cursor_op dupop, MDB_cursor_op op)
+      iter_t& genoperator(MDB_cursor_op op)
       {
         MDBOutVal data;
         int rc;
-      next:;
-        rc = d_cursor.get(d_key, d_id, d_one_key ? dupop : op);
-        if(rc == MDB_NOTFOUND) {
+      // next:;
+        if (!d_one_key) {
+          rc = d_cursor.get(d_key, d_id, op);
+        }
+        if(d_one_key || rc == MDB_NOTFOUND) {
           d_end = true;
         }
         else if(rc) {
           throw std::runtime_error("in genoperator, " + std::string(mdb_strerror(rc)));
         }
-        else if(!d_prefix.empty() && d_key.get<std::string>().rfind(d_prefix, 0)!=0) {
+        else if(!d_prefix.empty() &&
+          // d_key.getNoStripHeader<std::string>().rfind(d_prefix, 0)!=0 &&
+          getKeyFromCombinedKey(d_key).template getNoStripHeader<std::string>() != d_prefix) {
           d_end = true;
         }
         else {
+          // if (d_id.d_mdbval.mv_size < LS_HEADER_SIZE) throw std::runtime_error("got short value");
+
+          // MDBOutVal id = d_id;
+
+          // id.d_mdbval.mv_size -= LS_HEADER_SIZE;
+          // id.d_mdbval.mv_data = (char*)d_id.d_mdbval.mv_data+LS_HEADER_SIZE;
+
           if(d_on_index) {
+            d_id = getIDFromCombinedKey(d_key);
             if((*d_parent->d_txn)->get(d_parent->d_parent->d_main, d_id, data))
               throw std::runtime_error("Missing id field");
-            if(filter && !filter(data))
-              goto next;
+            // if(filter && !filter(data))
+            //   goto next;
 
             serFromString(data.get<std::string>(), d_t);
           }
           else {
-            if(filter && !filter(data))
-              goto next;
+            // if(filter && !filter(data))
+            //   goto next;
 
             serFromString(d_id.get<std::string>(), d_t);
           }
@@ -387,20 +483,23 @@ public:
 
       iter_t& operator++()
       {
-        return genoperator(MDB_NEXT_DUP, MDB_NEXT);
-      }
-      iter_t& operator--()
-      {
-        return genoperator(MDB_PREV_DUP, MDB_PREV);
+        return genoperator(MDB_NEXT);
       }
+      // iter_t& operator--()
+      // {
+      //   return genoperator(MDB_PREV);
+      // }
 
       // get ID this iterator points to
       uint32_t getID()
       {
-        if(d_on_index)
-          return d_id.get<uint32_t>();
-        else
-          return d_key.get<uint32_t>();
+        if(d_on_index) {
+          // return d_id.get<uint32_t>();
+          return d_id.getNoStripHeader<uint32_t>();
+        }
+        else {
+          return d_key.getNoStripHeader<uint32_t>();
+        }
       }
 
       const MDBOutVal& getKey()
@@ -474,7 +573,7 @@ public:
     {
       typename Parent::cursor_t cursor = (*d_parent.d_txn)->getCursor(std::get<N>(d_parent.d_parent->d_tuple).d_idx);
 
-      std::string keystr = keyConv(key);
+      std::string keystr = makeCombinedKey(keyConv(key), MDBInVal(""));
       MDBInVal in(keystr);
       MDBOutVal out, id;
       out.d_mdbval = in.d_mdbval;
@@ -506,7 +605,7 @@ public:
     {
       typename Parent::cursor_t cursor = (*d_parent.d_txn)->getCursor(std::get<N>(d_parent.d_parent->d_tuple).d_idx);
 
-      std::string keyString=keyConv(key);
+      std::string keyString=makeCombinedKey(keyConv(key), MDBInVal(""));
       MDBInVal in(keyString);
       MDBOutVal out, id;
       out.d_mdbval = in.d_mdbval;
@@ -525,19 +624,59 @@ public:
     {
       typename Parent::cursor_t cursor = (*d_parent.d_txn)->getCursor(std::get<N>(d_parent.d_parent->d_tuple).d_idx);
 
-      std::string keyString=keyConv(key);
+      std::string keyString=makeCombined(keyConv(key), MDBInVal(""));
       MDBInVal in(keyString);
       MDBOutVal out, id;
       out.d_mdbval = in.d_mdbval;
 
-      if(cursor.get(out, id,  MDB_SET_RANGE)) {
-                                              // on_index, one_key, end
+      if(cursor.get(out, id,  MDB_SET_RANGE) ||
+         getKeyFromCombinedKey(out).template getNoStripHeader<std::string>() != keyString) {
+                                                    // on_index, one_key, end
         return {iter_t{&d_parent, std::move(cursor), true, true, true}, eiter_t()};
       }
 
       return {iter_t(&d_parent, std::move(cursor), keyString), eiter_t()};
     };
 
+    template<int N>
+    void get_multi(const typename std::tuple_element<N, tuple_t>::type::type& key, LMDBIDvec& ids)
+    {
+      // std::cerr<<"in get_multi"<<std::endl;
+      typename Parent::cursor_t cursor = (*d_parent.d_txn)->getCursor(std::get<N>(d_parent.d_parent->d_tuple).d_idx);
+
+      std::string keyString=makeCombinedKey(keyConv(key), MDBInVal(""));
+      MDBInVal in(keyString);
+      MDBOutVal out, id;
+      out.d_mdbval = in.d_mdbval;
+
+      int rc = cursor.get(out, id,  MDB_SET_RANGE);
+
+      int scanned = 0;
+      while (rc == 0) {
+        scanned++;
+        auto sout = out.getNoStripHeader<std::string>(); // FIXME: this (and many others) could probably be string_view
+        auto thiskey = getKeyFromCombinedKey(out);
+        auto sthiskey = thiskey.getNoStripHeader<std::string>();
+
+        if (sout.find(keyString) != 0) {
+          // we are no longer in range, so we are done
+          break;
+        }
+
+        if (sthiskey == keyString) {
+          auto _id = getIDFromCombinedKey(out);
+          ids.push_back(_id.getNoStripHeader<uint32_t>());
+        }
+
+        rc = cursor.get(out, id, MDB_NEXT);
+      }
+
+      // std::cerr<<"get_multi scanned="<<scanned<<std::endl;
+      if (rc != 0 && rc != MDB_NOTFOUND) {
+        throw std::runtime_error("error during get_multi");
+      }
+    };
+
 
     Parent& d_parent;
   };
@@ -603,7 +742,8 @@ public:
         }
         else {
           id = MDBGetMaxID(*d_txn, d_parent->d_main) + 1;
-          flags = MDB_APPEND;
+          // FIXME: after dropping MDB_INTEGERKEY, we had to drop MDB_APPEND here. Check if this is an LMDB quirk.
+          // flags = MDB_APPEND;
         }
       }
       (*d_txn)->put(d_parent->d_main, id, serToString(t), flags);
index 086200f50b0536d0a0a42b5e905e6d8ae74d255d..7afa62b1ca68618489a0ab0046862aff82b57986 100644 (file)
@@ -49,7 +49,7 @@
 
 #include "lmdbbackend.hh"
 
-#define SCHEMAVERSION 4
+#define SCHEMAVERSION 5
 
 // List the class version here. Default is 0
 BOOST_CLASS_VERSION(LMDBBackend::KeyDataDB, 1)
@@ -59,6 +59,560 @@ static bool s_first = true;
 static int s_shards = 0;
 static std::mutex s_lmdbStartupLock;
 
+std::pair<uint32_t, uint32_t> LMDBBackend::getSchemaVersionAndShards(std::string& filename)
+{
+  // cerr << "getting schema version for path " << filename << endl;
+
+  uint32_t schemaversion;
+
+  int rc;
+  MDB_env* env = nullptr;
+  MDB_dbi dbi;
+  MDB_val key, data;
+  MDB_txn* txn = nullptr;
+
+  if ((rc = mdb_env_create(&env)) != 0) {
+    throw std::runtime_error("mdb_env_create failed");
+  }
+
+  if ((rc = mdb_env_set_maxdbs(env, 20)) != 0) {
+    mdb_env_close(env);
+    throw std::runtime_error("mdb_env_set_maxdbs failed");
+  }
+
+  if ((rc = mdb_env_open(env, filename.c_str(), MDB_NOSUBDIR, 0600)) != 0) {
+    mdb_env_close(env);
+    throw std::runtime_error("mdb_env_open failed");
+  }
+
+  if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0) {
+    mdb_env_close(env);
+    throw std::runtime_error("mdb_txn_begin failed");
+  }
+
+  if ((rc = mdb_dbi_open(txn, "pdns", 0, &dbi)) != 0) {
+    if (rc == MDB_NOTFOUND) {
+      // this means nothing has been inited yet
+      // we pretend this means 5
+      mdb_txn_abort(txn);
+      mdb_env_close(env);
+      return std::make_pair(5, 0);
+    }
+    mdb_txn_abort(txn);
+    mdb_env_close(env);
+    throw std::runtime_error("mdb_dbi_open failed");
+  }
+
+  key.mv_data = (char*)"schemaversion";
+  key.mv_size = strlen((char*)key.mv_data);
+
+  if ((rc = mdb_get(txn, dbi, &key, &data)) != 0) {
+    if (rc == MDB_NOTFOUND) {
+      // this means nothing has been inited yet
+      // we pretend this means 5
+      mdb_txn_abort(txn);
+      mdb_env_close(env);
+      return std::make_pair(5, 0);
+    }
+
+    throw std::runtime_error("mdb_get pdns.schemaversion failed");
+  }
+
+  if (data.mv_size == 4) {
+    // schemaversion is < 5 and is stored in 32 bits, in host order
+
+    memcpy(&schemaversion, data.mv_data, data.mv_size);
+  }
+  else if (data.mv_size >= LMDBLS::LS_MIN_HEADER_SIZE + sizeof(schemaversion)) {
+    // schemaversion presumably is 5, stored in 32 bits, network order, after the LS header
+
+    // FIXME: get actual header size (including extension blocks) instead of just reading from the back
+    // FIXME: add a test for reading schemaversion and shards (and actual data, later) when there are variably sized headers
+    memcpy(&schemaversion, (char*)data.mv_data + data.mv_size - sizeof(schemaversion), sizeof(schemaversion));
+    schemaversion = ntohl(schemaversion);
+  }
+  else {
+    throw std::runtime_error("pdns.schemaversion had unexpected size");
+  }
+
+  uint32_t shards;
+
+  key.mv_data = (char*)"shards";
+  key.mv_size = strlen((char*)key.mv_data);
+
+  if ((rc = mdb_get(txn, dbi, &key, &data)) != 0) {
+    if (rc == MDB_NOTFOUND) {
+      cerr << "schemaversion was set, but shards was not. Dazed and confused, trying to exit." << endl;
+      mdb_txn_abort(txn);
+      mdb_env_close(env);
+      exit(1);
+    }
+
+    throw std::runtime_error("mdb_get pdns.shards failed");
+  }
+
+  if (data.mv_size == 4) {
+    // 'shards' is stored in 32 bits, in host order
+
+    memcpy(&shards, data.mv_data, data.mv_size);
+  }
+  else if (data.mv_size >= LMDBLS::LS_MIN_HEADER_SIZE + sizeof(shards)) {
+    // FIXME: get actual header size (including extension blocks) instead of just reading from the back
+    memcpy(&shards, (char*)data.mv_data + data.mv_size - sizeof(shards), sizeof(shards));
+    shards = ntohl(shards);
+  }
+  else {
+    throw std::runtime_error("pdns.shards had unexpected size");
+  }
+
+  mdb_txn_abort(txn);
+  mdb_env_close(env);
+
+  return std::make_pair(schemaversion, shards);
+}
+
+namespace
+{
+// copy sdbi to tdbi, prepending an empty LS header (24 bytes of '\0') to all values
+void copyDBIAndAddLSHeader(MDB_txn* txn, MDB_dbi sdbi, MDB_dbi tdbi)
+{
+  // FIXME: clear out target dbi first
+
+  std::string header(LMDBLS::LS_MIN_HEADER_SIZE, '\0');
+  int rc;
+
+  MDB_cursor* cur;
+  MDB_val key, data;
+
+  if ((rc = mdb_cursor_open(txn, sdbi, &cur)) != 0) {
+    throw std::runtime_error("mdb_cursur_open failed");
+  }
+
+  rc = mdb_cursor_get(cur, &key, &data, MDB_FIRST);
+
+  while (rc == 0) {
+    std::string skey((char*)key.mv_data, key.mv_size);
+    std::string sdata((char*)data.mv_data, data.mv_size);
+
+    std::string stdata = header + sdata;
+
+    // cerr<<"got key="<<makeHexDump(skey)<<", data="<<makeHexDump(sdata)<<", sdata="<<makeHexDump(stdata)<<endl;
+
+    MDB_val tkey;
+    MDB_val tdata;
+
+    tkey.mv_data = (char*)skey.c_str();
+    tkey.mv_size = skey.size();
+    tdata.mv_data = (char*)stdata.c_str();
+    tdata.mv_size = stdata.size();
+
+    if ((rc = mdb_put(txn, tdbi, &tkey, &tdata, 0)) != 0) {
+      throw std::runtime_error("mdb_put failed");
+    }
+
+    rc = mdb_cursor_get(cur, &key, &data, MDB_NEXT);
+  }
+  if (rc != MDB_NOTFOUND) {
+    cerr << "rc=" << rc << endl;
+    throw std::runtime_error("error while iterating dbi");
+  }
+}
+
+// migrated a typed DBI:
+// 1. change keys (uint32_t) from host to network order
+// 2. prepend empty LS header to values
+void copyTypedDBI(MDB_txn* txn, MDB_dbi sdbi, MDB_dbi tdbi)
+{
+  // FIXME: clear out target dbi first
+
+  std::string header(LMDBLS::LS_MIN_HEADER_SIZE, '\0');
+  int rc;
+
+  MDB_cursor* cur;
+  MDB_val key, data;
+
+  if ((rc = mdb_cursor_open(txn, sdbi, &cur)) != 0) {
+    throw std::runtime_error("mdb_cursur_open failed");
+  }
+
+  rc = mdb_cursor_get(cur, &key, &data, MDB_FIRST);
+
+  while (rc == 0) {
+    // std::string skey((char*) key.mv_data, key.mv_size);
+    std::string sdata((char*)data.mv_data, data.mv_size);
+
+    std::string stdata = header + sdata;
+
+    uint32_t id;
+
+    if (key.mv_size != sizeof(uint32_t)) {
+      throw std::runtime_error("got non-uint32_t key in TypedDBI");
+    }
+
+    memcpy((void*)&id, key.mv_data, sizeof(uint32_t));
+
+    id = htonl(id);
+
+    // cerr<<"got key="<<makeHexDump(skey)<<", data="<<makeHexDump(sdata)<<", sdata="<<makeHexDump(stdata)<<endl;
+
+    MDB_val tkey;
+    MDB_val tdata;
+
+    tkey.mv_data = (char*)&id;
+    tkey.mv_size = sizeof(uint32_t);
+    tdata.mv_data = (char*)stdata.c_str();
+    tdata.mv_size = stdata.size();
+
+    if ((rc = mdb_put(txn, tdbi, &tkey, &tdata, 0)) != 0) {
+      throw std::runtime_error("mdb_put failed");
+    }
+
+    rc = mdb_cursor_get(cur, &key, &data, MDB_NEXT);
+  }
+  if (rc != MDB_NOTFOUND) {
+    cerr << "rc=" << rc << endl;
+    throw std::runtime_error("error while iterating dbi");
+  }
+}
+
+// migrated an index DBI:
+// newkey = oldkey.len(), oldkey, htonl(oldvalue)
+// newvalue = empty lsheader
+void copyIndexDBI(MDB_txn* txn, MDB_dbi sdbi, MDB_dbi tdbi)
+{
+  // FIXME: clear out target dbi first
+
+  std::string header(LMDBLS::LS_MIN_HEADER_SIZE, '\0');
+  int rc;
+
+  MDB_cursor* cur;
+  MDB_val key, data;
+
+  if ((rc = mdb_cursor_open(txn, sdbi, &cur)) != 0) {
+    throw std::runtime_error("mdb_cursur_open failed");
+  }
+
+  rc = mdb_cursor_get(cur, &key, &data, MDB_FIRST);
+
+  while (rc == 0) {
+    std::string lenprefix(sizeof(uint16_t), '\0');
+    std::string skey((char*)key.mv_data, key.mv_size);
+
+    uint32_t id;
+
+    if (data.mv_size != sizeof(uint32_t)) {
+      throw std::runtime_error("got non-uint32_t ID value in IndexDBI");
+    }
+
+    memcpy((void*)&id, data.mv_data, sizeof(uint32_t));
+    id = htonl(id);
+
+    uint16_t len = htons(skey.size());
+    memcpy((void*)lenprefix.data(), &len, sizeof(len));
+    std::string stkey = lenprefix + skey + std::string((char*)&id, sizeof(uint32_t));
+
+    MDB_val tkey;
+    MDB_val tdata;
+
+    tkey.mv_data = (char*)stkey.c_str();
+    tkey.mv_size = stkey.size();
+    tdata.mv_data = (char*)header.c_str();
+    tdata.mv_size = header.size();
+
+    if ((rc = mdb_put(txn, tdbi, &tkey, &tdata, 0)) != 0) {
+      throw std::runtime_error("mdb_put failed");
+    }
+
+    rc = mdb_cursor_get(cur, &key, &data, MDB_NEXT);
+  }
+  if (rc != MDB_NOTFOUND) {
+    throw std::runtime_error("error while iterating dbi");
+  }
+}
+
+}
+
+bool LMDBBackend::upgradeToSchemav5(std::string& filename)
+{
+  int rc;
+  MDB_env* env = nullptr;
+  MDB_dbi dbi;
+  MDB_txn* txn = nullptr;
+
+  auto currentSchemaVersionAndShards = getSchemaVersionAndShards(filename);
+  uint32_t currentSchemaVersion = currentSchemaVersionAndShards.first;
+  uint32_t shards = currentSchemaVersionAndShards.second;
+
+  if (currentSchemaVersion != 3 && currentSchemaVersion != 4) {
+    throw std::runtime_error("upgrade to v5 requested but current schema is not v3 or v4, stopping");
+  }
+
+  if ((rc = mdb_env_create(&env)) != 0) {
+    throw std::runtime_error("mdb_env_create failed");
+  }
+
+  if ((rc = mdb_env_set_maxdbs(env, 20)) != 0) {
+    mdb_env_close(env);
+    throw std::runtime_error("mdb_env_set_maxdbs failed");
+  }
+
+  if ((rc = mdb_env_open(env, filename.c_str(), MDB_NOSUBDIR, 0600)) != 0) {
+    mdb_env_close(env);
+    throw std::runtime_error("mdb_env_open failed");
+  }
+
+  if ((rc = mdb_txn_begin(env, NULL, 0, &txn)) != 0) {
+    mdb_env_close(env);
+    throw std::runtime_error("mdb_txn_begin failed");
+  }
+
+  std::cerr << "migrating shards" << std::endl;
+  for (uint32_t i = 0; i < shards; i++) {
+    string shardfile = filename + "-" + std::to_string(i);
+    if (access(shardfile.c_str(), F_OK) < 0) {
+      if (errno == ENOENT) {
+        // apparently this shard doesn't exist yet, moving on
+        std::cerr << "shard " << shardfile << " not found, continuing" << std::endl;
+        continue;
+      }
+    }
+
+    std::cerr << "migrating shard " << shardfile << std::endl;
+    MDB_env* shenv = nullptr;
+    MDB_dbi shdbi;
+    MDB_dbi shdbi2;
+    MDB_txn* shtxn = nullptr;
+
+    if ((rc = mdb_env_create(&shenv)) != 0) {
+      throw std::runtime_error("mdb_env_create failed");
+    }
+
+    if ((rc = mdb_env_set_maxdbs(shenv, 8)) != 0) {
+      mdb_env_close(env);
+      throw std::runtime_error("mdb_env_set_maxdbs failed");
+    }
+
+    if ((rc = mdb_env_open(shenv, shardfile.c_str(), MDB_NOSUBDIR, 0600)) != 0) {
+      mdb_env_close(env);
+      throw std::runtime_error("mdb_env_open failed");
+    }
+
+    if ((rc = mdb_txn_begin(shenv, NULL, 0, &shtxn)) != 0) {
+      mdb_env_close(env);
+      throw std::runtime_error("mdb_txn_begin failed");
+    }
+
+    if ((rc = mdb_dbi_open(shtxn, "records", 0, &shdbi)) != 0) {
+      if (rc == MDB_NOTFOUND) {
+        mdb_txn_abort(shtxn);
+        mdb_env_close(shenv);
+        continue;
+      }
+      mdb_txn_abort(shtxn);
+      mdb_env_close(shenv);
+      throw std::runtime_error("mdb_dbi_open shard records failed");
+    }
+
+    // FIXME: pdns_v5 is wrong, it should not be suffixed, but it's easy to test LS-prefix-insertion on this small table
+    // FIXME: remember to fix endianness
+    if ((rc = mdb_dbi_open(shtxn, "records_v5", MDB_CREATE, &shdbi2)) != 0) {
+      mdb_dbi_close(shenv, shdbi);
+      mdb_txn_abort(shtxn);
+      mdb_env_close(shenv);
+      throw std::runtime_error("mdb_dbi_open shard records_v5 failed");
+    }
+
+    try {
+      // FIXME: this function appears to be perfect for records, and specifically wrong for anything that is not records
+      copyDBIAndAddLSHeader(shtxn, shdbi, shdbi2);
+    }
+    catch (std::exception& e) {
+      mdb_dbi_close(shenv, shdbi2);
+      mdb_dbi_close(shenv, shdbi);
+      mdb_txn_abort(shtxn);
+      mdb_env_close(shenv);
+      throw std::runtime_error("copyDBIAndAddLSHeader failed");
+    }
+
+    cerr << "shard mbd_drop=" << mdb_drop(shtxn, shdbi, 1) << endl;
+    mdb_txn_commit(shtxn);
+    mdb_dbi_close(shenv, shdbi2);
+    mdb_env_close(shenv);
+  }
+
+  MDB_dbi fromtypeddbi[4];
+  MDB_dbi totypeddbi[4];
+
+  int index = 0;
+
+  for (const std::string& dbname : {"domains", "keydata", "tsig", "metadata"}) {
+    std::cerr << "migrating " << dbname << std::endl;
+    std::string tdbname = dbname + "_v5";
+
+    if ((rc = mdb_dbi_open(txn, dbname.c_str(), 0, &fromtypeddbi[index])) != 0) {
+      mdb_txn_abort(txn);
+      mdb_env_close(env);
+      throw std::runtime_error("mdb_dbi_open typeddbi failed");
+    }
+
+    if ((rc = mdb_dbi_open(txn, tdbname.c_str(), MDB_CREATE, &totypeddbi[index])) != 0) {
+      mdb_dbi_close(env, fromtypeddbi[index]);
+      mdb_txn_abort(txn);
+      mdb_env_close(env);
+      throw std::runtime_error("mdb_dbi_open typeddbi target failed");
+    }
+
+    try {
+      copyTypedDBI(txn, fromtypeddbi[index], totypeddbi[index]);
+    }
+    catch (std::exception& e) {
+      mdb_dbi_close(env, totypeddbi[index]);
+      mdb_dbi_close(env, fromtypeddbi[index]);
+      mdb_txn_abort(txn);
+      mdb_env_close(env);
+      throw std::runtime_error("copyTypedDBI failed");
+    }
+
+    // mdb_dbi_close(env, dbi2);
+    // mdb_dbi_close(env, dbi);
+    std::cerr << "migrated " << dbname << std::endl;
+
+    index++;
+  }
+
+  MDB_dbi fromindexdbi[4];
+  MDB_dbi toindexdbi[4];
+
+  index = 0;
+
+  for (const std::string& dbname : {"domains", "keydata", "tsig", "metadata"}) {
+    std::string fdbname = dbname + "_0";
+    std::cerr << "migrating " << dbname << std::endl;
+    std::string tdbname = dbname + "_v5_0";
+
+    if ((rc = mdb_dbi_open(txn, fdbname.c_str(), 0, &fromindexdbi[index])) != 0) {
+      mdb_txn_abort(txn);
+      mdb_env_close(env);
+      throw std::runtime_error("mdb_dbi_open indexdbi failed");
+    }
+
+    if ((rc = mdb_dbi_open(txn, tdbname.c_str(), MDB_CREATE, &toindexdbi[index])) != 0) {
+      mdb_dbi_close(env, fromindexdbi[index]);
+      mdb_txn_abort(txn);
+      mdb_env_close(env);
+      throw std::runtime_error("mdb_dbi_open indexdbi target failed");
+    }
+
+    try {
+      copyIndexDBI(txn, fromindexdbi[index], toindexdbi[index]);
+    }
+    catch (std::exception& e) {
+      mdb_dbi_close(env, toindexdbi[index]);
+      mdb_dbi_close(env, fromindexdbi[index]);
+      mdb_txn_abort(txn);
+      mdb_env_close(env);
+      throw std::runtime_error("copyIndexDBI failed");
+    }
+
+    // mdb_dbi_close(env, dbi2);
+    // mdb_dbi_close(env, dbi);
+    std::cerr << "migrated " << dbname << std::endl;
+
+    index++;
+  }
+
+  // finally, migrate the pdns db
+  if ((rc = mdb_dbi_open(txn, "pdns", 0, &dbi)) != 0) {
+    mdb_txn_abort(txn);
+    mdb_env_close(env);
+    throw std::runtime_error("mdb_dbi_open pdns failed");
+  }
+
+  MDB_val key, data;
+
+  std::string header(LMDBLS::LS_MIN_HEADER_SIZE, '\0');
+
+  for (const std::string& keyname : {"schemaversion", "shards"}) {
+    cerr << "migrating pdns." << keyname << endl;
+
+    key.mv_data = (char*)keyname.c_str();
+    key.mv_size = keyname.size();
+
+    if ((rc = mdb_get(txn, dbi, &key, &data))) {
+      throw std::runtime_error("mdb_get pdns.shards failed");
+    }
+
+    uint32_t value;
+
+    if (data.mv_size != sizeof(uint32_t)) {
+      throw std::runtime_error("got non-uint32_t key");
+    }
+
+    memcpy((void*)&value, data.mv_data, sizeof(uint32_t));
+
+    value = htonl(value);
+    if (keyname == "schemaversion") {
+      value = htonl(5);
+    }
+
+    std::string sdata((char*)data.mv_data, data.mv_size);
+
+    std::string stdata = header + std::string((char*)&value, sizeof(uint32_t));
+    ;
+
+    MDB_val tdata;
+
+    tdata.mv_data = (char*)stdata.c_str();
+    tdata.mv_size = stdata.size();
+
+    if ((rc = mdb_put(txn, dbi, &key, &tdata, 0)) != 0) {
+      throw std::runtime_error("mdb_put failed");
+    }
+  }
+
+  for (const std::string& keyname : {"uuid"}) {
+    cerr << "migrating pdns." << keyname << endl;
+
+    key.mv_data = (char*)keyname.c_str();
+    key.mv_size = keyname.size();
+
+    if ((rc = mdb_get(txn, dbi, &key, &data))) {
+      throw std::runtime_error("mdb_get pdns.shards failed");
+    }
+
+    std::string sdata((char*)data.mv_data, data.mv_size);
+
+    std::string stdata = header + sdata;
+
+    MDB_val tdata;
+
+    tdata.mv_data = (char*)stdata.c_str();
+    tdata.mv_size = stdata.size();
+
+    if ((rc = mdb_put(txn, dbi, &key, &tdata, 0)) != 0) {
+      throw std::runtime_error("mdb_put failed");
+    }
+  }
+
+  for (int i = 0; i < 4; i++) {
+    mdb_drop(txn, fromtypeddbi[i], 1);
+    mdb_drop(txn, fromindexdbi[i], 1);
+  }
+
+  cerr << "txn commit=" << mdb_txn_commit(txn) << endl;
+
+  for (int i = 0; i < 4; i++) {
+    mdb_dbi_close(env, totypeddbi[i]);
+    mdb_dbi_close(env, toindexdbi[i]);
+  }
+  mdb_env_close(env);
+
+  // throw std::runtime_error("migration done");
+  cerr << "migration done" << endl;
+  // exit(1);
+  return true;
+}
+
 LMDBBackend::LMDBBackend(const std::string& suffix)
 {
   // overlapping domain ids in combination with relative names are a recipe for disaster
@@ -90,31 +644,48 @@ LMDBBackend::LMDBBackend(const std::string& suffix)
   catch (const std::exception& e) {
     throw std::runtime_error(std::string("Unable to parse the 'map-size' LMDB value: ") + e.what());
   }
-  d_tdomains = std::make_shared<tdomains_t>(getMDBEnv(getArg("filename").c_str(), MDB_NOSUBDIR | d_asyncFlag, 0600, mapSize), "domains");
-  d_tmeta = std::make_shared<tmeta_t>(d_tdomains->getEnv(), "metadata");
-  d_tkdb = std::make_shared<tkdb_t>(d_tdomains->getEnv(), "keydata");
-  d_ttsig = std::make_shared<ttsig_t>(d_tdomains->getEnv(), "tsig");
 
-  auto pdnsdbi = d_tdomains->getEnv()->openDB("pdns", MDB_CREATE);
+  LMDBLS::s_flag_deleted = mustDo("flag-deleted");
+
+  bool opened = false;
 
   if (s_first) {
     std::lock_guard<std::mutex> l(s_lmdbStartupLock);
     if (s_first) {
-      auto txn = d_tdomains->getEnv()->getRWTransaction();
+      auto filename = getArg("filename");
 
-      uint32_t schemaversion = 1;
-      MDBOutVal _schemaversion;
-      if (!txn->get(pdnsdbi, "schemaversion", _schemaversion)) {
-        schemaversion = _schemaversion.get<uint32_t>();
+      auto currentSchemaVersionAndShards = getSchemaVersionAndShards(filename);
+      uint32_t currentSchemaVersion = currentSchemaVersionAndShards.first;
+      // std::cerr<<"current schema version: "<<currentSchemaVersion<<", shards="<<currentSchemaVersionAndShards.second<<std::endl;
+
+      // FIXME: I accidentally took out the code that checks pdns.conf lmdb-schema-version
+
+      if (currentSchemaVersion < 3) {
+        throw std::runtime_error("this version of the lmdbbackend can only upgrade from schema v3/v4 to v5. Upgrading from older schemas is not yet supported.");
       }
 
-      if (schemaversion != SCHEMAVERSION) {
-        if (getArgAsNum("schema-version") != SCHEMAVERSION) {
-          throw std::runtime_error("Expected LMDB schema version " + std::to_string(SCHEMAVERSION) + " but got " + std::to_string(schemaversion));
+      if (currentSchemaVersion == 3 || currentSchemaVersion == 4) {
+        if (!upgradeToSchemav5(filename)) {
+          throw std::runtime_error("Failed to perform LMDB schema version upgrade from v4 to v5");
         }
-        txn->put(pdnsdbi, "schemaversion", SCHEMAVERSION);
+        currentSchemaVersion = 5;
+      }
+
+      if (currentSchemaVersion != 5) {
+        throw std::runtime_error("Somehow, we are not at schema version 5. Giving up");
       }
 
+      d_tdomains = std::make_shared<tdomains_t>(getMDBEnv(getArg("filename").c_str(), MDB_NOSUBDIR | d_asyncFlag, 0600, mapSize), "domains_v5");
+      d_tmeta = std::make_shared<tmeta_t>(d_tdomains->getEnv(), "metadata_v5");
+      d_tkdb = std::make_shared<tkdb_t>(d_tdomains->getEnv(), "keydata_v5");
+      d_ttsig = std::make_shared<ttsig_t>(d_tdomains->getEnv(), "tsig_v5");
+
+      auto pdnsdbi = d_tdomains->getEnv()->openDB("pdns", MDB_CREATE);
+
+      opened = true;
+
+      auto txn = d_tdomains->getEnv()->getRWTransaction();
+
       MDBOutVal shards;
       if (!txn->get(pdnsdbi, "shards", shards)) {
         s_shards = shards.get<uint32_t>();
@@ -134,17 +705,23 @@ LMDBBackend::LMDBBackend(const std::string& suffix)
         txn->put(pdnsdbi, "uuid", uuids);
       }
 
+      MDBOutVal _schemaversion;
+      if (txn->get(pdnsdbi, "schemaversion", _schemaversion)) {
+        // our DB is entirely new, so we need to write the schemaversion
+        txn->put(pdnsdbi, "schemaversion", currentSchemaVersion);
+      }
       txn->commit();
 
-      if (schemaversion < 3) {
-        if (!upgradeToSchemav3()) {
-          throw std::runtime_error("Failed to perform LMDB schema version upgrade to " + std::to_string(SCHEMAVERSION) + " from " + std::to_string(schemaversion));
-        }
-      }
       s_first = false;
     }
   }
 
+  if (!opened) {
+    d_tdomains = std::make_shared<tdomains_t>(getMDBEnv(getArg("filename").c_str(), MDB_NOSUBDIR | d_asyncFlag, 0600, mapSize), "domains_v5");
+    d_tmeta = std::make_shared<tmeta_t>(d_tdomains->getEnv(), "metadata_v5");
+    d_tkdb = std::make_shared<tkdb_t>(d_tdomains->getEnv(), "keydata_v5");
+    d_ttsig = std::make_shared<ttsig_t>(d_tdomains->getEnv(), "tsig_v5");
+  }
   d_trecords.resize(s_shards);
   d_dolog = ::arg().mustDo("query-logging");
 }
@@ -155,7 +732,7 @@ namespace serialization
 {
 
   template <class Archive>
-  void save(Archive& ar, const DNSName& g, const unsigned int version)
+  void save(Archive& ar, const DNSName& g, const unsigned int /* version */)
   {
     if (g.empty()) {
       ar& std::string();
@@ -166,7 +743,7 @@ namespace serialization
   }
 
   template <class Archive>
-  void load(Archive& ar, DNSName& g, const unsigned int version)
+  void load(Archive& ar, DNSName& g, const unsigned int /* version */)
   {
     string tmp;
     ar& tmp;
@@ -179,13 +756,13 @@ namespace serialization
   }
 
   template <class Archive>
-  void save(Archive& ar, const QType& g, const unsigned int version)
+  void save(Archive& ar, const QType& g, const unsigned int /* version */)
   {
     ar& g.getCode();
   }
 
   template <class Archive>
-  void load(Archive& ar, QType& g, const unsigned int version)
+  void load(Archive& ar, QType& g, const unsigned int /* version */)
   {
     uint16_t tmp;
     ar& tmp;
@@ -193,7 +770,7 @@ namespace serialization
   }
 
   template <class Archive>
-  void save(Archive& ar, const DomainInfo& g, const unsigned int version)
+  void save(Archive& ar, const DomainInfo& g, const unsigned int /* version */)
   {
     ar& g.zone;
     ar& g.last_check;
@@ -227,13 +804,13 @@ namespace serialization
   }
 
   template <class Archive>
-  void serialize(Archive& ar, LMDBBackend::DomainMeta& g, const unsigned int version)
+  void serialize(Archive& ar, LMDBBackend::DomainMeta& g, const unsigned int /* version */)
   {
     ar& g.domain& g.key& g.value;
   }
 
   template <class Archive>
-  void save(Archive& ar, const LMDBBackend::KeyDataDB& g, const unsigned int version)
+  void save(Archive& ar, const LMDBBackend::KeyDataDB& g, const unsigned int /* version */)
   {
     ar& g.domain& g.content& g.flags& g.active& g.published;
   }
@@ -251,7 +828,7 @@ namespace serialization
   }
 
   template <class Archive>
-  void serialize(Archive& ar, TSIGKey& g, const unsigned int version)
+  void serialize(Archive& ar, TSIGKey& g, const unsigned int /* version */)
   {
     ar& g.name;
     ar& g.algorithm; // this is the ordername
@@ -363,8 +940,8 @@ void LMDBBackend::deleteDomainRecords(RecordsRWTransaction& txn, uint32_t domain
   MDBOutVal key, val;
   //  cout<<"Match: "<<makeHexDump(match);
   if (!cursor.lower_bound(match, key, val)) {
-    while (key.get<StringView>().rfind(match, 0) == 0) {
-      if (qtype == QType::ANY || co.getQType(key.get<StringView>()) == qtype)
+    while (key.getNoStripHeader<StringView>().rfind(match, 0) == 0) {
+      if (qtype == QType::ANY || co.getQType(key.getNoStripHeader<StringView>()) == qtype)
         cursor.del();
       if (cursor.next(key, val))
         break;
@@ -575,7 +1152,7 @@ std::shared_ptr<LMDBBackend::RecordsRWTransaction> LMDBBackend::getRecordsRWTran
   if (!shard.env) {
     shard.env = getMDBEnv((getArg("filename") + "-" + std::to_string(id % s_shards)).c_str(),
                           MDB_NOSUBDIR | d_asyncFlag, 0600);
-    shard.dbi = shard.env->openDB("records", MDB_CREATE);
+    shard.dbi = shard.env->openDB("records_v5", MDB_CREATE);
   }
   auto ret = std::make_shared<RecordsRWTransaction>(shard.env->getRWTransaction());
   ret->db = std::make_shared<RecordsDB>(shard);
@@ -592,7 +1169,7 @@ std::shared_ptr<LMDBBackend::RecordsROTransaction> LMDBBackend::getRecordsROTran
     }
     shard.env = getMDBEnv((getArg("filename") + "-" + std::to_string(id % s_shards)).c_str(),
                           MDB_NOSUBDIR | d_asyncFlag, 0600);
-    shard.dbi = shard.env->openDB("records", MDB_CREATE);
+    shard.dbi = shard.env->openDB("records_v5", MDB_CREATE);
   }
 
   if (rwtxn) {
@@ -607,6 +1184,8 @@ std::shared_ptr<LMDBBackend::RecordsROTransaction> LMDBBackend::getRecordsROTran
   }
 }
 
+#if 0
+// FIXME reinstate soon
 bool LMDBBackend::upgradeToSchemav3()
 {
   g_log << Logger::Warning << "Upgrading LMDB schema" << endl;
@@ -644,7 +1223,7 @@ bool LMDBBackend::upgradeToSchemav3()
     string_view currentKey;
     string value;
     for (;;) {
-      auto newKey = key.get<string_view>();
+      auto newKey = key.getNoStripHeader<string_view>();
       if (currentKey.compare(newKey) != 0) {
         if (value.size() > 0) {
           newTxn->put(newShard.dbi, currentKey, value);
@@ -668,6 +1247,7 @@ bool LMDBBackend::upgradeToSchemav3()
 
   return true;
 }
+#endif
 
 bool LMDBBackend::deleteDomain(const DNSName& domain)
 {
@@ -693,10 +1273,12 @@ bool LMDBBackend::deleteDomain(const DNSName& domain)
 
   { // Remove metadata
     auto txn = d_tmeta->getRWTransaction();
-    auto range = txn.equal_range<0>(domain);
+    LMDBIDvec ids;
+
+    txn.get_multi<0>(domain, ids);
 
-    for (auto& iter = range.first; iter != range.second; ++iter) {
-      iter.del();
+    for (auto _id : ids) {
+      txn.del(_id);
     }
 
     txn.commit();
@@ -704,10 +1286,11 @@ bool LMDBBackend::deleteDomain(const DNSName& domain)
 
   { // Remove cryptokeys
     auto txn = d_tkdb->getRWTransaction();
-    auto range = txn.equal_range<0>(domain);
+    LMDBIDvec ids;
+    txn.get_multi<0>(domain, ids);
 
-    for (auto& iter = range.first; iter != range.second; ++iter) {
-      iter.del();
+    for (auto _id : ids) {
+      txn.del(_id);
     }
 
     txn.commit();
@@ -725,17 +1308,18 @@ bool LMDBBackend::deleteDomain(const DNSName& domain)
   return true;
 }
 
-bool LMDBBackend::list(const DNSName& target, int id, bool include_disabled)
+bool LMDBBackend::list(const DNSName& target, int /* id */, bool include_disabled)
 {
   d_includedisabled = include_disabled;
 
   DomainInfo di;
   {
     auto dtxn = d_tdomains->getROTransaction();
-    if ((di.id = dtxn.get<0>(target, di)))
-      ; //      cout<<"Found domain "<<target<<" on domain_id "<<di.id <<", list requested "<<id<<endl;
+    if ((di.id = dtxn.get<0>(target, di))) {
+      // cerr << "Found domain " << target << " on domain_id " << di.id << ", list requested " << id << endl;
+    }
     else {
-      // cout<<"Did not find "<<target<<endl;
+      // cerr << "Did not find " << target << endl;
       return false;
     }
   }
@@ -747,8 +1331,10 @@ bool LMDBBackend::list(const DNSName& target, int id, bool include_disabled)
   d_matchkey = co(di.id);
 
   MDBOutVal key, val;
-  if (d_getcursor->lower_bound(d_matchkey, key, val) || key.get<StringView>().rfind(d_matchkey, 0) != 0) {
-    // cout<<"Found nothing for list"<<endl;
+  auto a = d_getcursor->lower_bound(d_matchkey, key, val);
+  auto b0 = key.getNoStripHeader<StringView>();
+  auto b = b0.rfind(d_matchkey, 0);
+  if (a || b != 0) {
     d_getcursor.reset();
   }
 
@@ -761,7 +1347,7 @@ bool LMDBBackend::list(const DNSName& target, int id, bool include_disabled)
   return true;
 }
 
-void LMDBBackend::lookup(const QType& type, const DNSName& qdomain, int zoneId, DNSPacket* p)
+void LMDBBackend::lookup(const QType& type, const DNSName& qdomain, int zoneId, DNSPacket* /* p */)
 {
   if (d_dolog) {
     g_log << Logger::Warning << "Got lookup for " << qdomain << "|" << type.toString() << " in zone " << zoneId << endl;
@@ -810,7 +1396,7 @@ void LMDBBackend::lookup(const QType& type, const DNSName& qdomain, int zoneId,
     d_matchkey = co(zoneId, relqname, type.getCode());
   }
 
-  if (d_getcursor->lower_bound(d_matchkey, key, val) || key.get<StringView>().rfind(d_matchkey, 0) != 0) {
+  if (d_getcursor->lower_bound(d_matchkey, key, val) || key.getNoStripHeader<StringView>().rfind(d_matchkey, 0) != 0) {
     d_getcursor.reset();
     if (d_dolog) {
       g_log << Logger::Warning << "Query " << ((long)(void*)this) << ": " << d_dtime.udiffNoReset() << " usec to execute (found nothing)" << endl;
@@ -832,6 +1418,7 @@ void LMDBBackend::lookup(const QType& type, const DNSName& qdomain, int zoneId,
 bool LMDBBackend::get(DNSZoneRecord& zr)
 {
   for (;;) {
+    // std::cerr<<"d_getcursor="<<d_getcursor<<std::endl;
     if (!d_getcursor) {
       d_rotxn.reset();
       return false;
@@ -842,12 +1429,13 @@ bool LMDBBackend::get(DNSZoneRecord& zr)
     if (d_currentrrset.empty()) {
       d_getcursor->current(d_currentKey, d_currentVal);
 
-      key = d_currentKey.get<string_view>();
+      key = d_currentKey.getNoStripHeader<string_view>();
       zr.dr.d_type = compoundOrdername::getQType(key).getCode();
 
       if (zr.dr.d_type == QType::NSEC3) {
         // Hit a magic NSEC3 skipping
-        if (d_getcursor->next(d_currentKey, d_currentVal) || d_currentKey.get<StringView>().rfind(d_matchkey, 0) != 0) {
+        if (d_getcursor->next(d_currentKey, d_currentVal) || d_currentKey.getNoStripHeader<StringView>().rfind(d_matchkey, 0) != 0) {
+          // cerr<<"resetting d_getcursor 1"<<endl;
           d_getcursor.reset();
         }
         continue;
@@ -857,7 +1445,7 @@ bool LMDBBackend::get(DNSZoneRecord& zr)
       d_currentrrsetpos = 0;
     }
     else {
-      key = d_currentKey.get<string_view>();
+      key = d_currentKey.getNoStripHeader<string_view>();
     }
     try {
       const auto& lrr = d_currentrrset.at(d_currentrrsetpos++);
@@ -874,7 +1462,8 @@ bool LMDBBackend::get(DNSZoneRecord& zr)
 
       if (d_currentrrsetpos >= d_currentrrset.size()) {
         d_currentrrset.clear(); // will invalidate lrr
-        if (d_getcursor->next(d_currentKey, d_currentVal) || d_currentKey.get<StringView>().rfind(d_matchkey, 0) != 0) {
+        if (d_getcursor->next(d_currentKey, d_currentVal) || d_currentKey.getNoStripHeader<StringView>().rfind(d_matchkey, 0) != 0) {
+          // cerr<<"resetting d_getcursor 2"<<endl;
           d_getcursor.reset();
         }
       }
@@ -935,9 +1524,23 @@ bool LMDBBackend::getDomainInfo(const DNSName& domain, DomainInfo& di, bool gets
 {
   {
     auto txn = d_tdomains->getROTransaction();
+    // auto range = txn.prefix_range<0>(domain);
+
+    // bool found = false;
 
-    if (!(di.id = txn.get<0>(domain, di)))
+    // for (auto& iter = range.first ; iter != range.second; ++iter) {
+    //   found = true;
+    //   di.id = iter.getID();
+    //   di.backend = this;
+    // }
+
+    // if (!found) {
+    //   return false;
+    // }
+    if (!(di.id = txn.get<0>(domain, di))) {
       return false;
+    }
+
     di.backend = this;
   }
 
@@ -1022,11 +1625,12 @@ bool LMDBBackend::createDomain(const DNSName& domain, const DomainInfo::DomainKi
   return true;
 }
 
-void LMDBBackend::getAllDomains(vector<DomainInfo>* domains, bool doSerial, bool include_disabled)
+void LMDBBackend::getAllDomains(vector<DomainInfo>* domains, bool /* doSerial */, bool include_disabled)
 {
   domains->clear();
   auto txn = d_tdomains->getROTransaction();
   for (auto iter = txn.begin(); iter != txn.end(); ++iter) {
+    // cerr<<"iter"<<endl;
     DomainInfo di = *iter;
     di.id = iter.getID();
 
@@ -1173,10 +1777,15 @@ bool LMDBBackend::getAllDomainMetadata(const DNSName& name, std::map<std::string
 {
   meta.clear();
   auto txn = d_tmeta->getROTransaction();
-  auto range = txn.equal_range<0>(name);
+  LMDBIDvec ids;
+  txn.get_multi<0>(name, ids);
 
-  for (auto& iter = range.first; iter != range.second; ++iter) {
-    meta[iter->key].push_back(iter->value);
+  DomainMeta dm;
+  // cerr<<"getAllDomainMetadata start"<<endl;
+  for (auto id : ids) {
+    if (txn.get(id, dm)) {
+      meta[dm.key].push_back(dm.value);
+    }
   }
   return true;
 }
@@ -1185,11 +1794,17 @@ bool LMDBBackend::setDomainMetadata(const DNSName& name, const std::string& kind
 {
   auto txn = d_tmeta->getRWTransaction();
 
-  auto range = txn.equal_range<0>(name);
+  LMDBIDvec ids;
+  txn.get_multi<0>(name, ids);
 
-  for (auto& iter = range.first; iter != range.second; ++iter) {
-    if (iter->key == kind)
-      iter.del();
+  DomainMeta dmeta;
+  for (auto id : ids) {
+    if (txn.get(id, dmeta)) {
+      if (dmeta.key == kind) {
+        // cerr<<"delete"<<endl;
+        txn.del(id);
+      }
+    }
   }
 
   for (const auto& m : meta) {
@@ -1203,10 +1818,16 @@ bool LMDBBackend::setDomainMetadata(const DNSName& name, const std::string& kind
 bool LMDBBackend::getDomainKeys(const DNSName& name, std::vector<KeyData>& keys)
 {
   auto txn = d_tkdb->getROTransaction();
-  auto range = txn.equal_range<0>(name);
-  for (auto& iter = range.first; iter != range.second; ++iter) {
-    KeyData kd{iter->content, iter.getID(), iter->flags, iter->active, iter->published};
-    keys.push_back(kd);
+  LMDBIDvec ids;
+  txn.get_multi<0>(name, ids);
+
+  KeyDataDB key;
+
+  for (auto id : ids) {
+    if (txn.get(id, key)) {
+      KeyData kd{key.content, id, key.flags, key.active, key.published};
+      keys.push_back(kd);
+    }
   }
 
   return true;
@@ -1333,13 +1954,13 @@ bool LMDBBackend::getBeforeAndAfterNamesAbsolute(uint32_t id, const DNSName& qna
     cursor.last(key, val);
 
     for (;;) {
-      if (co.getDomainID(key.get<StringView>()) != id) {
+      if (co.getDomainID(key.getNoStripHeader<StringView>()) != id) {
         //cout<<"Last record also not part of this zone!"<<endl;
         // this implies something is wrong in the database, nothing we can do
         return false;
       }
 
-      if (co.getQType(key.get<StringView>()) == QType::NSEC3) {
+      if (co.getQType(key.getNoStripHeader<StringView>()) == QType::NSEC3) {
         serFromString(val.get<StringView>(), lrr);
         if (!lrr.ttl) // the kind of NSEC3 we need
           break;
@@ -1349,7 +1970,7 @@ bool LMDBBackend::getBeforeAndAfterNamesAbsolute(uint32_t id, const DNSName& qna
         return false;
       }
     }
-    before = co.getQName(key.get<StringView>());
+    before = co.getQName(key.getNoStripHeader<StringView>());
     unhashed = DNSName(lrr.content.c_str(), lrr.content.size(), 0, false) + di.zone;
 
     // now to find after .. at the beginning of the zone
@@ -1358,28 +1979,28 @@ bool LMDBBackend::getBeforeAndAfterNamesAbsolute(uint32_t id, const DNSName& qna
       return false;
     }
     for (;;) {
-      if (co.getQType(key.get<StringView>()) == QType::NSEC3) {
+      if (co.getQType(key.getNoStripHeader<StringView>()) == QType::NSEC3) {
         serFromString(val.get<StringView>(), lrr);
         if (!lrr.ttl)
           break;
       }
 
-      if (cursor.next(key, val) || co.getDomainID(key.get<StringView>()) != id) {
+      if (cursor.next(key, val) || co.getDomainID(key.getNoStripHeader<StringView>()) != id) {
         // cout<<"hit end of zone or database when we shouldn't"<<endl;
         return false;
       }
     }
-    after = co.getQName(key.get<StringView>());
+    after = co.getQName(key.getNoStripHeader<StringView>());
     // cout<<"returning: before="<<before<<", after="<<after<<", unhashed: "<<unhashed<<endl;
     return true;
   }
 
   // cout<<"Ended up at "<<co.getQName(key.get<StringView>()) <<endl;
 
-  before = co.getQName(key.get<StringView>());
+  before = co.getQName(key.getNoStripHeader<StringView>());
   if (before == qname) {
     // cout << "Ended up on exact right node" << endl;
-    before = co.getQName(key.get<StringView>());
+    before = co.getQName(key.getNoStripHeader<StringView>());
     // unhashed should be correct now, maybe check?
     if (cursor.next(key, val)) {
       // xxx should find first hash now
@@ -1389,18 +2010,18 @@ bool LMDBBackend::getBeforeAndAfterNamesAbsolute(uint32_t id, const DNSName& qna
         return false;
       }
       for (;;) {
-        if (co.getQType(key.get<StringView>()) == QType::NSEC3) {
+        if (co.getQType(key.getNoStripHeader<StringView>()) == QType::NSEC3) {
           serFromString(val.get<StringView>(), lrr);
           if (!lrr.ttl)
             break;
         }
 
-        if (cursor.next(key, val) || co.getDomainID(key.get<StringView>()) != id) {
+        if (cursor.next(key, val) || co.getDomainID(key.getNoStripHeader<StringView>()) != id) {
           // cout<<"hit end of zone or database when we shouldn't" << __LINE__<<endl;
           return false;
         }
       }
-      after = co.getQName(key.get<StringView>());
+      after = co.getQName(key.getNoStripHeader<StringView>());
       // cout<<"returning: before="<<before<<", after="<<after<<", unhashed: "<<unhashed<<endl;
       return true;
     }
@@ -1409,7 +2030,7 @@ bool LMDBBackend::getBeforeAndAfterNamesAbsolute(uint32_t id, const DNSName& qna
     // cout <<"Going backwards to find 'before'"<<endl;
     int count = 0;
     for (;;) {
-      if (co.getQName(key.get<StringView>()).canonCompare(qname) && co.getQType(key.get<StringView>()) == QType::NSEC3) {
+      if (co.getQName(key.getNoStripHeader<StringView>()).canonCompare(qname) && co.getQType(key.getNoStripHeader<StringView>()) == QType::NSEC3) {
         // cout<<"Potentially stopping traverse at "<< co.getQName(key.get<StringView>()) <<", " << (co.getQName(key.get<StringView>()).canonCompare(qname))<<endl;
         // cout<<"qname = "<<qname<<endl;
         // cout<<"here  = "<<co.getQName(key.get<StringView>())<<endl;
@@ -1418,7 +2039,7 @@ bool LMDBBackend::getBeforeAndAfterNamesAbsolute(uint32_t id, const DNSName& qna
           break;
       }
 
-      if (cursor.prev(key, val) || co.getDomainID(key.get<StringView>()) != id) {
+      if (cursor.prev(key, val) || co.getDomainID(key.getNoStripHeader<StringView>()) != id) {
         // cout <<"XXX Hit *beginning* of zone or database"<<endl;
         // this can happen, must deal with it
         // should now find the last hash of the zone
@@ -1431,13 +2052,13 @@ bool LMDBBackend::getBeforeAndAfterNamesAbsolute(uint32_t id, const DNSName& qna
           cursor.prev(key, val);
 
         for (;;) {
-          if (co.getDomainID(key.get<StringView>()) != id) {
+          if (co.getDomainID(key.getNoStripHeader<StringView>()) != id) {
             //cout<<"Last record also not part of this zone!"<<endl;
             // this implies something is wrong in the database, nothing we can do
             return false;
           }
 
-          if (co.getQType(key.get<StringView>()) == QType::NSEC3) {
+          if (co.getQType(key.getNoStripHeader<StringView>()) == QType::NSEC3) {
             serFromString(val.get<StringView>(), lrr);
             if (!lrr.ttl) // the kind of NSEC3 we need
               break;
@@ -1447,7 +2068,7 @@ bool LMDBBackend::getBeforeAndAfterNamesAbsolute(uint32_t id, const DNSName& qna
             return false;
           }
         }
-        before = co.getQName(key.get<StringView>());
+        before = co.getQName(key.getNoStripHeader<StringView>());
         unhashed = DNSName(lrr.content.c_str(), lrr.content.size(), 0, false) + di.zone;
         // cout <<"Should still find 'after'!"<<endl;
         // for 'after', we need to find the first hash of this zone
@@ -1458,7 +2079,7 @@ bool LMDBBackend::getBeforeAndAfterNamesAbsolute(uint32_t id, const DNSName& qna
           return false;
         }
         for (;;) {
-          if (co.getQType(key.get<StringView>()) == QType::NSEC3) {
+          if (co.getQType(key.getNoStripHeader<StringView>()) == QType::NSEC3) {
             serFromString(val.get<StringView>(), lrr);
             if (!lrr.ttl)
               break;
@@ -1470,14 +2091,14 @@ bool LMDBBackend::getBeforeAndAfterNamesAbsolute(uint32_t id, const DNSName& qna
             return false;
           }
         }
-        after = co.getQName(key.get<StringView>());
+        after = co.getQName(key.getNoStripHeader<StringView>());
 
         // cout<<"returning: before="<<before<<", after="<<after<<", unhashed: "<<unhashed<<endl;
         return true;
       }
       ++count;
     }
-    before = co.getQName(key.get<StringView>());
+    before = co.getQName(key.getNoStripHeader<StringView>());
     unhashed = DNSName(lrr.content.c_str(), lrr.content.size(), 0, false) + di.zone;
     // cout<<"Went backwards, found "<<before<<endl;
     // return us to starting point
@@ -1486,7 +2107,7 @@ bool LMDBBackend::getBeforeAndAfterNamesAbsolute(uint32_t id, const DNSName& qna
   }
   //  cout<<"Now going forward"<<endl;
   for (int count = 0;; ++count) {
-    if ((count && cursor.next(key, val)) || co.getDomainID(key.get<StringView>()) != id) {
+    if ((count && cursor.next(key, val)) || co.getDomainID(key.getNoStripHeader<StringView>()) != id) {
       // cout <<"Hit end of database or zone, finding first hash then in zone "<<id<<endl;
       if (cursor.lower_bound(co(id), key, val)) {
         // cout<<"hit end of zone find when we shouldn't"<<endl;
@@ -1494,7 +2115,7 @@ bool LMDBBackend::getBeforeAndAfterNamesAbsolute(uint32_t id, const DNSName& qna
         return false;
       }
       for (;;) {
-        if (co.getQType(key.get<StringView>()) == QType::NSEC3) {
+        if (co.getQType(key.getNoStripHeader<StringView>()) == QType::NSEC3) {
           serFromString(val.get<StringView>(), lrr);
           if (!lrr.ttl)
             break;
@@ -1507,21 +2128,21 @@ bool LMDBBackend::getBeforeAndAfterNamesAbsolute(uint32_t id, const DNSName& qna
         }
         // cout << "Next.. "<<endl;
       }
-      after = co.getQName(key.get<StringView>());
+      after = co.getQName(key.getNoStripHeader<StringView>());
 
       // cout<<"returning: before="<<before<<", after="<<after<<", unhashed: "<<unhashed<<endl;
       return true;
     }
 
     // cout<<"After "<<co.getQName(key.get<StringView>()) <<endl;
-    if (co.getQType(key.get<StringView>()) == QType::NSEC3) {
+    if (co.getQType(key.getNoStripHeader<StringView>()) == QType::NSEC3) {
       serFromString(val.get<StringView>(), lrr);
       if (!lrr.ttl) {
         break;
       }
     }
   }
-  after = co.getQName(key.get<StringView>());
+  after = co.getQName(key.getNoStripHeader<StringView>());
   // cout<<"returning: before="<<before<<", after="<<after<<", unhashed: "<<unhashed<<endl;
   return true;
 }
@@ -1541,8 +2162,8 @@ bool LMDBBackend::getBeforeAndAfterNames(uint32_t id, const DNSName& zonenameU,
   if (cursor.lower_bound(matchkey, key, val)) {
     // cout << "Hit end of database, bummer"<<endl;
     cursor.last(key, val);
-    if (co.getDomainID(key.get<string_view>()) == id) {
-      before = co.getQName(key.get<string_view>()) + zonename;
+    if (co.getDomainID(key.getNoStripHeader<string_view>()) == id) {
+      before = co.getQName(key.getNoStripHeader<string_view>()) + zonename;
       after = zonename;
     }
     // else
@@ -1551,7 +2172,7 @@ bool LMDBBackend::getBeforeAndAfterNames(uint32_t id, const DNSName& zonenameU,
   }
   // cout<<"Cursor is at "<<co.getQName(key.get<string_view>()) <<", in zone id "<<co.getDomainID(key.get<string_view>())<< endl;
 
-  if (co.getQType(key.get<string_view>()).getCode() && co.getDomainID(key.get<string_view>()) == id && co.getQName(key.get<string_view>()) == qname2) { // don't match ENTs
+  if (co.getQType(key.getNoStripHeader<string_view>()).getCode() && co.getDomainID(key.getNoStripHeader<string_view>()) == id && co.getQName(key.getNoStripHeader<string_view>()) == qname2) { // don't match ENTs
     // cout << "Had an exact match!"<<endl;
     before = qname2 + zonename;
     int rc;
@@ -1560,23 +2181,23 @@ bool LMDBBackend::getBeforeAndAfterNames(uint32_t id, const DNSName& zonenameU,
       if (rc)
         break;
 
-      if (co.getDomainID(key.get<string_view>()) == id && key.get<StringView>().rfind(matchkey, 0) == 0)
+      if (co.getDomainID(key.getNoStripHeader<string_view>()) == id && key.getNoStripHeader<StringView>().rfind(matchkey, 0) == 0)
         continue;
       LMDBResourceRecord lrr;
       serFromString(val.get<StringView>(), lrr);
-      if (co.getQType(key.get<string_view>()).getCode() && (lrr.auth || co.getQType(key.get<string_view>()).getCode() == QType::NS))
+      if (co.getQType(key.getNoStripHeader<string_view>()).getCode() && (lrr.auth || co.getQType(key.getNoStripHeader<string_view>()).getCode() == QType::NS))
         break;
     }
-    if (rc || co.getDomainID(key.get<string_view>()) != id) {
+    if (rc || co.getDomainID(key.getNoStripHeader<string_view>()) != id) {
       // cout << "We hit the end of the zone or database. 'after' is apex" << endl;
       after = zonename;
       return false;
     }
-    after = co.getQName(key.get<string_view>()) + zonename;
+    after = co.getQName(key.getNoStripHeader<string_view>()) + zonename;
     return true;
   }
 
-  if (co.getDomainID(key.get<string_view>()) != id) {
+  if (co.getDomainID(key.getNoStripHeader<string_view>()) != id) {
     // cout << "Ended up in next zone, 'after' is zonename" <<endl;
     after = zonename;
     // cout << "Now hunting for previous" << endl;
@@ -1588,39 +2209,39 @@ bool LMDBBackend::getBeforeAndAfterNames(uint32_t id, const DNSName& zonenameU,
         return false;
       }
 
-      if (co.getDomainID(key.get<string_view>()) != id) {
-        // cout<<"Reversed into zone, but found wrong zone id " << co.getDomainID(key.get<string_view>()) << " != "<<id<<endl;
+      if (co.getDomainID(key.getNoStripHeader<string_view>()) != id) {
+        // cout<<"Reversed into zone, but found wrong zone id " << co.getDomainID(key.getNoStripHeader<string_view>()) << " != "<<id<<endl;
         // "this can't happen"
         return false;
       }
       LMDBResourceRecord lrr;
       serFromString(val.get<StringView>(), lrr);
-      if (co.getQType(key.get<string_view>()).getCode() && (lrr.auth || co.getQType(key.get<string_view>()).getCode() == QType::NS))
+      if (co.getQType(key.getNoStripHeader<string_view>()).getCode() && (lrr.auth || co.getQType(key.getNoStripHeader<string_view>()).getCode() == QType::NS))
         break;
     }
 
-    before = co.getQName(key.get<string_view>()) + zonename;
+    before = co.getQName(key.getNoStripHeader<string_view>()) + zonename;
     // cout<<"Found: "<< before<<endl;
     return true;
   }
 
-  // cout <<"We ended up after "<<qname<<", on "<<co.getQName(key.get<string_view>())<<endl;
+  // cout <<"We ended up after "<<qname<<", on "<<co.getQName(key.getNoStripHeader<string_view>())<<endl;
 
   int skips = 0;
   for (;;) {
     LMDBResourceRecord lrr;
     serFromString(val.get<StringView>(), lrr);
-    if (co.getQType(key.get<string_view>()).getCode() && (lrr.auth || co.getQType(key.get<string_view>()).getCode() == QType::NS)) {
-      after = co.getQName(key.get<string_view>()) + zonename;
-      // cout <<"Found auth ("<<lrr.auth<<") or an NS record "<<after<<", type: "<<co.getQType(key.get<string_view>()).toString()<<", ttl = "<<lrr.ttl<<endl;
+    if (co.getQType(key.getNoStripHeader<string_view>()).getCode() && (lrr.auth || co.getQType(key.getNoStripHeader<string_view>()).getCode() == QType::NS)) {
+      after = co.getQName(key.getNoStripHeader<string_view>()) + zonename;
+      // cout <<"Found auth ("<<lrr.auth<<") or an NS record "<<after<<", type: "<<co.getQType(key.getNoStripHeader<string_view>()).toString()<<", ttl = "<<lrr.ttl<<endl;
       // cout << makeHexDump(val.get<string>()) << endl;
       break;
     }
-    // cout <<"  oops, " << co.getQName(key.get<string_view>()) << " was not auth "<<lrr.auth<< " type=" << lrr.qtype.toString()<<" or NS, so need to skip ahead a bit more" << endl;
+    // cout <<"  oops, " << co.getQName(key.getNoStripHeader<string_view>()) << " was not auth "<<lrr.auth<< " type=" << lrr.qtype.toString()<<" or NS, so need to skip ahead a bit more" << endl;
     int rc = cursor.next(key, val);
     if (!rc)
       ++skips;
-    if (rc || co.getDomainID(key.get<string_view>()) != id) {
+    if (rc || co.getDomainID(key.getNoStripHeader<string_view>()) != id) {
       // cout << "  oops, hit end of database or zone. This means after is apex" <<endl;
       after = zonename;
       break;
@@ -1632,16 +2253,16 @@ bool LMDBBackend::getBeforeAndAfterNames(uint32_t id, const DNSName& zonenameU,
 
   for (;;) {
     int rc = cursor.prev(key, val);
-    if (rc || co.getDomainID(key.get<string_view>()) != id) {
+    if (rc || co.getDomainID(key.getNoStripHeader<string_view>()) != id) {
       // XX I don't think this case can happen
       // cout << "We hit the beginning of the zone or database.. now what" << endl;
       return false;
     }
-    before = co.getQName(key.get<string_view>()) + zonename;
+    before = co.getQName(key.getNoStripHeader<string_view>()) + zonename;
     LMDBResourceRecord lrr;
     serFromString(val.get<string_view>(), lrr);
     // cout<<"And before to "<<before<<", auth = "<<rr.auth<<endl;
-    if (co.getQType(key.get<string_view>()).getCode() && (lrr.auth || co.getQType(key.get<string_view>()) == QType::NS))
+    if (co.getQType(key.getNoStripHeader<string_view>()).getCode() && (lrr.auth || co.getQType(key.getNoStripHeader<string_view>()) == QType::NS))
       break;
     // cout << "Oops, that was wrong, go back one more"<<endl;
   }
@@ -1685,15 +2306,15 @@ bool LMDBBackend::updateDNSSECOrderNameAndAuth(uint32_t domain_id, const DNSName
   bool hasOrderName = !ordername.empty();
   bool needNSEC3 = hasOrderName;
 
-  for (; key.get<StringView>().rfind(matchkey, 0) == 0;) {
+  for (; key.getNoStripHeader<StringView>().rfind(matchkey, 0) == 0;) {
     vector<LMDBResourceRecord> lrrs;
 
-    if (co.getQType(key.get<StringView>()) != QType::NSEC3) {
+    if (co.getQType(key.getNoStripHeader<StringView>()) != QType::NSEC3) {
       serFromString(val.get<StringView>(), lrrs);
       bool changed = false;
       vector<LMDBResourceRecord> newRRs;
       for (auto lrr : lrrs) {
-        lrr.qtype = co.getQType(key.get<StringView>());
+        lrr.qtype = co.getQType(key.getNoStripHeader<StringView>());
         if (!needNSEC3 && qtype != QType::ANY) {
           needNSEC3 = (lrr.ordername && QType(qtype) != lrr.qtype);
         }
@@ -1717,7 +2338,10 @@ bool LMDBBackend::updateDNSSECOrderNameAndAuth(uint32_t domain_id, const DNSName
   bool del = false;
   LMDBResourceRecord lrr;
   matchkey = co(domain_id, rel, QType::NSEC3);
-  if (!txn->txn->get(txn->db->dbi, matchkey, val)) {
+  // cerr<<"here qname="<<qname<<" ordername="<<ordername<<" qtype="<<qtype<<" matchkey="<<makeHexDump(matchkey)<<endl;
+  int txngetrc;
+  if (!(txngetrc = txn->txn->get(txn->db->dbi, matchkey, val))) {
+    cerr << "rc =" << txngetrc << " size=" << val.get<string_view>().size() << endl;
     serFromString(val.get<string_view>(), lrr);
 
     if (needNSEC3) {
@@ -1812,12 +2436,16 @@ bool LMDBBackend::updateEmptyNonTerminals(uint32_t domain_id, set<DNSName>& inse
 bool LMDBBackend::getTSIGKey(const DNSName& name, DNSName& algorithm, string& content)
 {
   auto txn = d_ttsig->getROTransaction();
-  auto range = txn.equal_range<0>(name);
-
-  for (auto& iter = range.first; iter != range.second; ++iter) {
-    if (algorithm.empty() || algorithm == DNSName(iter->algorithm)) {
-      algorithm = DNSName(iter->algorithm);
-      content = iter->key;
+  LMDBIDvec ids;
+  txn.get_multi<0>(name, ids);
+
+  TSIGKey key;
+  for (auto id : ids) {
+    if (txn.get(id, key)) {
+      if (algorithm.empty() || algorithm == DNSName(key.algorithm)) {
+        algorithm = DNSName(key.algorithm);
+        content = key.key;
+      }
     }
   }
 
@@ -1829,9 +2457,15 @@ bool LMDBBackend::setTSIGKey(const DNSName& name, const DNSName& algorithm, cons
 {
   auto txn = d_ttsig->getRWTransaction();
 
-  for (auto range = txn.equal_range<0>(name); range.first != range.second; ++range.first) {
-    if (range.first->algorithm == algorithm) {
-      txn.del(range.first.getID());
+  LMDBIDvec ids;
+  txn.get_multi<0>(name, ids);
+
+  TSIGKey key;
+  for (auto id : ids) {
+    if (txn.get(id, key)) {
+      if (key.algorithm == algorithm) {
+        txn.del(id);
+      }
     }
   }
 
@@ -1848,10 +2482,16 @@ bool LMDBBackend::setTSIGKey(const DNSName& name, const DNSName& algorithm, cons
 bool LMDBBackend::deleteTSIGKey(const DNSName& name)
 {
   auto txn = d_ttsig->getRWTransaction();
-  TSIGKey tk;
 
-  for (auto range = txn.equal_range<0>(name); range.first != range.second; ++range.first) {
-    txn.del(range.first.getID());
+  LMDBIDvec ids;
+  txn.get_multi<0>(name, ids);
+
+  TSIGKey key;
+
+  for (auto id : ids) {
+    if (txn.get(id, key)) {
+      txn.del(id);
+    }
   }
   txn.commit();
   return true;
@@ -1867,6 +2507,21 @@ bool LMDBBackend::getTSIGKeys(std::vector<struct TSIGKey>& keys)
   return true;
 }
 
+string LMDBBackend::directBackendCmd(const string& query)
+{
+  if (query == "info") {
+    ostringstream ret;
+
+    ret << "shards: " << s_shards << endl;
+    ret << "schemaversion: " << SCHEMAVERSION << endl;
+
+    return ret.str();
+  }
+  else {
+    return "unknown lmdbbackend command\n";
+  }
+}
+
 class LMDBFactory : public BackendFactory
 {
 public:
@@ -1881,6 +2536,7 @@ public:
     declare(suffix, "schema-version", "Maximum allowed schema version to run on this DB. If a lower version is found, auto update is performed", std::to_string(SCHEMAVERSION));
     declare(suffix, "random-ids", "Numeric IDs inside the database are generated randomly instead of sequentially", "no");
     declare(suffix, "map-size", "LMDB map size in megabytes", (sizeof(void*) == 4) ? "100" : "16000");
+    declare(suffix, "flag-deleted", "Flag entries on deletion instead of deleting them", "no");
   }
   DNSBackend* make(const string& suffix = "") override
   {
index c1d29a864f88ea72fa1830248413245708b542e2..062e0d4976568906495864741a130857733d65c9 100644 (file)
@@ -147,6 +147,13 @@ public:
     return true;
   }
 
+  // other
+  string directBackendCmd(const string& query) override;
+
+  // functions to use without constructing a backend object
+  static std::pair<uint32_t, uint32_t> getSchemaVersionAndShards(std::string& filename);
+  static bool upgradeToSchemav5(std::string& filename);
+
 private:
   struct compoundOrdername
   {
diff --git a/modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-0 b/modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-0
deleted file mode 100644 (file)
index 4bc4471..0000000
Binary files a/modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-0 and /dev/null differ
diff --git a/modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-1 b/modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-1
deleted file mode 100644 (file)
index 65c700b..0000000
Binary files a/modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-1 and /dev/null differ
diff --git a/modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-0 b/modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-0
deleted file mode 100644 (file)
index 4bc4471..0000000
Binary files a/modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-0 and /dev/null differ
diff --git a/modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-1 b/modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-1
deleted file mode 100644 (file)
index 65c700b..0000000
Binary files a/modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-1 and /dev/null differ
similarity index 95%
rename from modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb
rename to modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb
index d9b1f630e9e5f9281d55663e66a0f976646439e2..856f85242659bbca5e9f0f140010128ffbd61b3b 100644 (file)
Binary files a/modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb and b/modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb differ
diff --git a/modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-0 b/modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-0
new file mode 100644 (file)
index 0000000..9bed565
Binary files /dev/null and b/modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-0 differ
similarity index 97%
rename from modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-0-lock
rename to modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-0-lock
index 26ca8a5f365fd503e3e11bc90fa6454543a92044..59479a6fb7dadb307e78cf5a0f69b35e42c3740b 100644 (file)
Binary files a/modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-0-lock and b/modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-0-lock differ
diff --git a/modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-1 b/modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-1
new file mode 100644 (file)
index 0000000..fa74288
Binary files /dev/null and b/modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-1 differ
similarity index 97%
rename from modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-1-lock
rename to modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-1-lock
index 3858eb5a6dd3a436086ee27349d3ce3e944d79df..8d710b8cb3c63f33724a8fa7c96138df15c71b1d 100644 (file)
Binary files a/modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-1-lock and b/modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-1-lock differ
similarity index 97%
rename from modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-lock
rename to modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-lock
index a833f3b30a2fc4e92725e270956d7af2594432e6..f82fccb7791bf92010e6391df89fdd0dcc511de6 100644 (file)
Binary files a/modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-lock and b/modules/lmdbbackend/test-assets/lmdb-v3-x86_64/pdns.lmdb-lock differ
similarity index 95%
rename from modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb
rename to modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb
index e3080c130fc14c94a04cc91845a80620ea9b3f4d..0d56ee3f5dd028dbeb12e206b88cd1d1d10f035f 100644 (file)
Binary files a/modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb and b/modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb differ
diff --git a/modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-0 b/modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-0
new file mode 100644 (file)
index 0000000..9bed565
Binary files /dev/null and b/modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-0 differ
similarity index 97%
rename from modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-0-lock
rename to modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-0-lock
index eaa314cbde70d4bb48f4112c14aa5506e66c398d..cb698cc0d9519e7f7abbf70f3b941777437e2e42 100644 (file)
Binary files a/modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-0-lock and b/modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-0-lock differ
diff --git a/modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-1 b/modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-1
new file mode 100644 (file)
index 0000000..fa74288
Binary files /dev/null and b/modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-1 differ
similarity index 97%
rename from modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-1-lock
rename to modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-1-lock
index ba97d65258bc0fee208828391896fb23a5b99da6..4cf2b295911bc2b7c44df330f0a231adee07a2b3 100644 (file)
Binary files a/modules/lmdbbackend/test-assets/lmdb-v2-x86_64/pdns.lmdb-1-lock and b/modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-1-lock differ
similarity index 97%
rename from modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-lock
rename to modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-lock
index b3399636ce6af381310c0bec4c214b5393d9f3b7..62c3165d538baad93fb8410092d2db067600e415 100644 (file)
Binary files a/modules/lmdbbackend/test-assets/lmdb-v1-x86_64/pdns.lmdb-lock and b/modules/lmdbbackend/test-assets/lmdb-v4-x86_64/pdns.lmdb-lock differ
index f05f6f4be90b02e1a5b63331cd3e33cda34d7831..88229c32a7416c682027be9dbfd8558352037c2d 100644 (file)
@@ -232,6 +232,7 @@ pdns_server_SOURCES = \
        ednscookies.cc ednscookies.hh \
        ednsoptions.cc ednsoptions.hh \
        ednssubnet.cc ednssubnet.hh \
+       gettime.cc gettime.hh \
        gss_context.cc gss_context.hh \
        histogram.hh \
        iputils.cc iputils.hh \
@@ -368,6 +369,7 @@ pdnsutil_SOURCES = \
        ednscookies.cc ednscookies.hh \
        ednsoptions.cc ednsoptions.hh \
        ednssubnet.cc \
+       gettime.cc gettime.hh \
        gss_context.cc gss_context.hh \
        ipcipher.cc ipcipher.hh \
        iputils.cc iputils.hh \
index 03da0ac208437e000ba62c58c7678bedc9a31122..e9337ddde0e511ff90e9abbaffbbd521e743e7b1 100644 (file)
@@ -1,4 +1,5 @@
 
+#include "modules/lmdbbackend/lmdbbackend.hh"
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 #endif
@@ -1517,6 +1518,7 @@ static int createZone(const DNSName &zone, const DNSName& nsname) {
     return EXIT_FAILURE;
   }
 
+  cerr<<"di.id="<<di.id<<endl;
   rr.domain_id = di.id;
   di.backend->startTransaction(zone, di.id);
   di.backend->feedRecord(rr, DNSName());
@@ -2649,6 +2651,20 @@ try
 
   loadMainConfig(g_vm["config-dir"].as<string>());
 
+  if (cmds.at(0) == "lmdb-get-schema-version") {
+    // FIXME: add command to help
+    if(cmds.size() != 2) {
+      cerr << "Syntax: pdnsutil lmdb-get-schema-version /path/to/pdns.lmdb"<<endl;
+      return 0;
+    }
+
+    auto res = LMDBBackend::getSchemaVersionAndShards(cmds.at(1));
+    cerr << "schemaversion: "<<res.first<<endl;
+    cerr << "shards: "<<res.second<<endl;
+    cout << res.first <<endl;
+
+    return 0;
+  }
   if (cmds.at(0) == "test-algorithm") {
     if(cmds.size() != 2) {
       cerr << "Syntax: pdnsutil test-algorithm algonum"<<endl;
diff --git a/regression-tests.nobackend/lmdb-metadata-leak/command b/regression-tests.nobackend/lmdb-metadata-leak/command
new file mode 100755 (executable)
index 0000000..c4d9d02
--- /dev/null
@@ -0,0 +1,41 @@
+#!/usr/bin/env bash
+set -e
+if [ "${PDNS_DEBUG}" = "YES" ]; then
+  set -x
+fi
+
+rootPath=$(readlink -f $(dirname $0))
+
+for random in no yes
+do
+  workdir=$(mktemp -d)
+
+  cat << EOF > "${workdir}/pdns-lmdb.conf"
+  module-dir=../regression-tests/modules
+  launch=lmdb
+  lmdb-filename=${workdir}/pdns.lmdb
+  lmdb-shards=2
+  lmdb-random-ids=$random
+EOF
+
+  echo === random=$random
+
+  echo == creating zone
+  $PDNSUTIL --config-dir="${workdir}" --config-name=lmdb create-zone example.com
+  
+  $PDNSUTIL --config-dir="${workdir}" --config-name=lmdb set-meta example.com FOO BAR
+  $PDNSUTIL --config-dir="${workdir}" --config-name=lmdb show-zone example.com | grep FOO
+  mdb_dump -p -a -n ${workdir}/pdns.lmdb | egrep -o 'FOO[0-9]*' | LC_ALL=C sort
+
+  $PDNSUTIL --config-dir="${workdir}" --config-name=lmdb set-meta example.com FOO2 BAR2
+  $PDNSUTIL --config-dir="${workdir}" --config-name=lmdb show-zone example.com | grep FOO
+  mdb_dump -p -a -n ${workdir}/pdns.lmdb | egrep -o 'FOO[0-9]*' | LC_ALL=C sort
+
+  $PDNSUTIL --config-dir="${workdir}" --config-name=lmdb set-meta example.com FOO2 BAR2
+  $PDNSUTIL --config-dir="${workdir}" --config-name=lmdb show-zone example.com | grep FOO
+  mdb_dump -p -a -n ${workdir}/pdns.lmdb | egrep -o 'FOO[0-9]*' | LC_ALL=C sort
+
+  echo == deleting zone
+  $PDNSUTIL --config-dir="${workdir}" --config-name=lmdb delete-zone example.com
+  mdb_dump -p -a -n ${workdir}/pdns.lmdb | egrep -o 'FOO[0-9]*' | LC_ALL=C sort
+done
\ No newline at end of file
diff --git a/regression-tests.nobackend/lmdb-metadata-leak/description b/regression-tests.nobackend/lmdb-metadata-leak/description
new file mode 100644 (file)
index 0000000..99a9783
--- /dev/null
@@ -0,0 +1,2 @@
+This test verifies that, with the LMDB-backend, pdnsutil set-meta and delete-zone do not leave old unreachable metadata items lying around
+
diff --git a/regression-tests.nobackend/lmdb-metadata-leak/expected_result b/regression-tests.nobackend/lmdb-metadata-leak/expected_result
new file mode 100644 (file)
index 0000000..d1be72c
--- /dev/null
@@ -0,0 +1,32 @@
+=== random=no
+== creating zone
+Set 'example.com' meta FOO = BAR
+       FOO     BAR
+FOO
+Set 'example.com' meta FOO2 = BAR2
+       FOO     BAR
+       FOO2    BAR2
+FOO
+FOO2
+Set 'example.com' meta FOO2 = BAR2
+       FOO     BAR
+       FOO2    BAR2
+FOO
+FOO2
+== deleting zone
+=== random=yes
+== creating zone
+Set 'example.com' meta FOO = BAR
+       FOO     BAR
+FOO
+Set 'example.com' meta FOO2 = BAR2
+       FOO     BAR
+       FOO2    BAR2
+FOO
+FOO2
+Set 'example.com' meta FOO2 = BAR2
+       FOO     BAR
+       FOO2    BAR2
+FOO
+FOO2
+== deleting zone
index 36f39a1e099891b2a197082cae93e1fa84454e58..e9b5809c30117b9f0572e3ddf0d2bdfbdf566a6f 100644 (file)
--- a/tasks.py
+++ b/tasks.py
@@ -108,6 +108,7 @@ auth_test_deps = [   # FIXME: we should be generating some of these from shlibde
     'libsystemd0',
     'libyaml-cpp0.6',
     'libzmq3-dev',
+    'lmdb-utils',
     'prometheus',
     'ruby-bundler',
     'ruby-dev',