]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
Scaled back on overflow page work
authorHoward Chu <hyc@openldap.org>
Tue, 25 Jul 2017 14:34:03 +0000 (15:34 +0100)
committerHoward Chu <hyc@openldap.org>
Sat, 10 Oct 2020 11:56:11 +0000 (12:56 +0100)
Still keeping page header at top of overflow page for now

libraries/liblmdb/mdb.c

index 1e53764aa8ae167a8e21b3cff853605df6635181..ad5dce6b76ed6199bc811f44c185edefcc24a373 100644 (file)
@@ -942,9 +942,9 @@ enum {
  * sorted #mp_ptrs[] entries referring to them. Exception: #P_LEAF2 pages
  * omit mp_ptrs and pack sorted #MDB_DUPFIXED values after the page header.
  *
- * #P_OVERFLOW records occupy one or more contiguous pages that contain
- * pure data with no page header. They hold the real data of #F_BIGDATA nodes,
- * and the node stores what would have gone in a page header.
+ * #P_OVERFLOW records occupy one or more contiguous pages where only the
+ * first has a page header. They hold the real data of #F_BIGDATA nodes,
+ * and the node stores the pgno and number of pages used by the record.
  *
  * #P_SUBP sub-pages are small leaf "pages" with duplicate data.
  * A node with flag #F_DUPDATA but not #F_SUBDATA contains a sub-page.
@@ -1004,7 +1004,7 @@ typedef struct MDB_page {
 } MDB_page;
 
        /** Size of the page header, excluding dynamic data at the end */
-#define PAGEHDRSZ       sizeof(MDB_page_header)
+#define PAGEHDRSZ       ((unsigned)sizeof(MDB_page_header))
 
        /** Address of first usable data byte in a page, after the header */
 #define METADATA(p)     ((void *)((char *)(p) + PAGEHDRSZ))
@@ -1300,8 +1300,10 @@ struct MDB_txn {
                /** For read txns: This thread/txn's reader table slot, or NULL. */
                MDB_reader      *reader;
        } mt_u;
+#if OVERFLOW_NOTYET
        /** The sorted list of dirty overflow pages. */
        MDB_ID2L        mt_dirty_ovs;
+#endif
        /** Array of records for each DB known in the environment. */
        MDB_dbx         *mt_dbxs;
        /** Array of MDB_db records for each known DB */
@@ -1579,7 +1581,7 @@ typedef struct MDB_ntxn {
 #define TXN_DBI_CHANGED(txn, dbi) \
        ((txn)->mt_dbiseqs[dbi] != (txn)->mt_env->me_dbiseqs[dbi])
 
-static int  mdb_page_alloc(MDB_cursor *mc, int num, int ov, MDB_page **mp);
+static int  mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp);
 static int  mdb_page_new(MDB_cursor *mc, uint32_t flags, int num, MDB_page **mp);
 static int  mdb_page_touch(MDB_cursor *mc);
 
@@ -2435,7 +2437,7 @@ mdb_find_oldest(MDB_txn *txn)
 
 /** Add a page to the txn's dirty list */
 static void
-mdb_page_dirty(MDB_txn *txn, MDB_page *mp, int ov)
+mdb_page_dirty(MDB_txn *txn, MDB_page *mp)
 {
        MDB_ID2 mid;
        int rc, (*insert)(MDB_ID2L, MDB_ID2 *);
@@ -2447,13 +2449,9 @@ mdb_page_dirty(MDB_txn *txn, MDB_page *mp, int ov)
        }
        mid.mid = mp->mp_pgno;
        mid.mptr = mp;
-       if (ov) {
-               rc = mdb_mid2l_insert(txn->mt_dirty_ovs, &mid);
-       } else {
-               rc = insert(txn->mt_u.dirty_list, &mid);
-               txn->mt_dirty_room--;
-       }
+       rc = insert(txn->mt_u.dirty_list, &mid);
        mdb_tassert(txn, rc == 0);
+       txn->mt_dirty_room--;
 }
 
 /** Allocate page numbers and memory for writing.  Maintain me_pglast,
@@ -2474,7 +2472,7 @@ mdb_page_dirty(MDB_txn *txn, MDB_page *mp, int ov)
  * @return 0 on success, non-zero on failure.
  */
 static int
-mdb_page_alloc(MDB_cursor *mc, int num, int ov, MDB_page **mp)
+mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp)
 {
 #ifdef MDB_PARANOID    /* Seems like we can ignore this now */
        /* Get at most <Max_retries> more freeDB records once me_pghead
@@ -2497,6 +2495,8 @@ mdb_page_alloc(MDB_cursor *mc, int num, int ov, MDB_page **mp)
        MDB_cursor_op op;
        MDB_cursor m2;
        int found_old = 0;
+
+#if OVERFLOW_NOTYET
        MDB_dovpage *dph = NULL;
 
        if (ov) {
@@ -2508,6 +2508,7 @@ mdb_page_alloc(MDB_cursor *mc, int num, int ov, MDB_page **mp)
                        return ENOMEM;
                dph = malloc(sizeof(MDB_dovpage));
        }
+#endif
 
        /* If there are any loose pages, just use them */
        if (num == 1 && txn->mt_loose_pgs) {
@@ -2515,11 +2516,13 @@ mdb_page_alloc(MDB_cursor *mc, int num, int ov, MDB_page **mp)
                txn->mt_loose_pgs = NEXT_LOOSE_PAGE(np);
                txn->mt_loose_count--;
                DPRINTF(("db %d use loose page %"Yu, DDBI(mc), np->mp_pgno));
+#if OVERFLOW_NOTYET
                if (ov) {
                        dph->mp_hdr = np->mp_hdr;
                        dph->mp_ptr = np;
                        np = (MDB_page *)dph;
                }
+#endif
                *mp = np;
                return MDB_SUCCESS;
        }
@@ -2673,19 +2676,25 @@ search_done:
        }
        np->mp_pgno = pgno;
        np->mp_txnid = txn->mt_txnid;
+#if OVERFLOW_NOTYET
        if (ov) {
                dph->mp_hdr = np->mp_hdr;
                dph->mp_ptr = np;
                np = (MDB_page *)dph;
        }
        mdb_page_dirty(txn, np, ov);
+#else
+       mdb_page_dirty(txn, np);
+#endif
        *mp = np;
 
        return MDB_SUCCESS;
 
 fail:
+#if OVERFLOW_NOTYET
        if (dph)
                free(dph);
+#endif
        txn->mt_flags |= MDB_TXN_ERROR;
        return rc;
 }
@@ -2723,7 +2732,7 @@ mdb_page_copy(MDB_page *dst, MDB_page *src, unsigned int psize)
  * mp wasn't spilled.
  */
 static int
-mdb_page_unspill(MDB_txn *txn, MDB_page *mp, int num, int ov, MDB_page **ret)
+mdb_page_unspill(MDB_txn *txn, MDB_page *mp, MDB_page **ret)
 {
        MDB_env *env = txn->mt_env;
        const MDB_txn *tx2;
@@ -2736,15 +2745,20 @@ mdb_page_unspill(MDB_txn *txn, MDB_page *mp, int num, int ov, MDB_page **ret)
                x = mdb_midl_search(tx2->mt_spill_pgs, pn);
                if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pn) {
                        MDB_page *np;
+                       int num;
                        if (txn->mt_dirty_room == 0)
                                return MDB_TXN_FULL;
+                       if (IS_OVERFLOW(mp))
+                               num = mp->mp_pages;
+                       else
+                               num = 1;
                        if (env->me_flags & MDB_WRITEMAP) {
                                np = mp;
                        } else {
                                np = mdb_page_malloc(txn, num, 1);
                                if (!np)
                                        return ENOMEM;
-                               if (ov)
+                               if (num > 1)
                                        memcpy(np, mp, num * env->me_psize);
                                else
                                        mdb_page_copy(np, mp, env->me_psize);
@@ -2762,9 +2776,8 @@ mdb_page_unspill(MDB_txn *txn, MDB_page *mp, int num, int ov, MDB_page **ret)
                                 * page remains spilled until child commits
                                 */
 
-                       mdb_page_dirty(txn, np, ov);
-                       if (!ov)
-                               np->mp_flags |= P_DIRTY;
+                       mdb_page_dirty(txn, np);
+                       np->mp_flags |= P_DIRTY;
                        *ret = np;
                        break;
                }
@@ -2789,14 +2802,14 @@ mdb_page_touch(MDB_cursor *mc)
        if (!F_ISSET(mp->mp_flags, P_DIRTY)) {
                if (txn->mt_flags & MDB_TXN_SPILLS) {
                        np = NULL;
-                       rc = mdb_page_unspill(txn, mp, 1, 0, &np);
+                       rc = mdb_page_unspill(txn, mp, &np);
                        if (rc)
                                goto fail;
                        if (np)
                                goto done;
                }
                if ((rc = mdb_midl_need(&txn->mt_free_pgs, 1)) ||
-                       (rc = mdb_page_alloc(mc, 1, 0, &np)))
+                       (rc = mdb_page_alloc(mc, 1, &np)))
                        goto fail;
                pgno = np->mp_pgno;
                DPRINTF(("touched db %d page %"Yu" -> %"Yu, DDBI(mc),
@@ -3148,7 +3161,9 @@ mdb_txn_renew0(MDB_txn *txn)
                txn->mt_free_pgs = env->me_free_pgs;
                txn->mt_free_pgs[0] = 0;
                txn->mt_spill_pgs = NULL;
+#if OVERFLOW_NOTYET
                txn->mt_dirty_ovs = NULL;
+#endif
                env->me_txn = txn;
                memcpy(txn->mt_dbiseqs, env->me_dbiseqs, env->me_maxdbs * sizeof(unsigned int));
        }
@@ -3273,7 +3288,9 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
                txn->mt_dirty_room = parent->mt_dirty_room;
                txn->mt_u.dirty_list[0].mid = 0;
                txn->mt_spill_pgs = NULL;
+#if OVERFLOW_NOTYET
                txn->mt_dirty_ovs = NULL;
+#endif
                txn->mt_next_pgno = parent->mt_next_pgno;
                parent->mt_flags |= MDB_TXN_HAS_CHILD;
                parent->mt_child = txn;
@@ -3434,7 +3451,9 @@ mdb_txn_end(MDB_txn *txn, unsigned mode)
                        env->me_pgstate = ((MDB_ntxn *)txn)->mnt_pgstate;
                        mdb_midl_free(txn->mt_free_pgs);
                        mdb_midl_free(txn->mt_spill_pgs);
+#if OVERFLOW_NOTYET
                        mdb_mid2l_free(txn->mt_dirty_ovs);
+#endif
                        free(txn->mt_u.dirty_list);
                }
 
@@ -7981,7 +8000,7 @@ prep_subDB:
                                        dummy.md_entries = NUMKEYS(fp);
                                        xdata.mv_size = sizeof(MDB_db);
                                        xdata.mv_data = &dummy;
-                                       if ((rc = mdb_page_alloc(mc, 1, 0, &mp)))
+                                       if ((rc = mdb_page_alloc(mc, 1, &mp)))
                                                return rc;
                                        offset = env->me_psize - olddata.mv_size;
                                        flags |= F_DUPDATA|F_SUBDATA;
@@ -8031,13 +8050,13 @@ current:
                          if (!(omp->mp_flags & P_DIRTY) &&
                                  (level || (env->me_flags & MDB_WRITEMAP)))
                          {
-                               rc = mdb_page_unspill(mc->mc_txn, omp, ovpages, 1, &omp);
+                               rc = mdb_page_unspill(mc->mc_txn, omp, &omp);
                                if (rc)
                                        return rc;
                                level = 0;              /* dirty in this txn or clean */
                          }
                          /* Is it dirty? */
-                         if (ovp.op_txnid == mc->mc_txn->mt_txnid) {
+                         if (omp->mp_flags & P_DIRTY) {
                                /* yes, overwrite it. Note in this case we don't
                                 * bother to try shrinking the page if the new data
                                 * is smaller than the overflow threshold.
@@ -8063,10 +8082,12 @@ current:
                                                 * Copy end of page, adjusting alignment so
                                                 * compiler may copy words instead of bytes.
                                                 */
-                                               off = data->mv_size & -sizeof(size_t);
+                                               off = (PAGEHDRSZ + data->mv_size) & -sizeof(size_t);
                                                memcpy((size_t *)((char *)np + off),
                                                        (size_t *)((char *)omp + off), sz - off);
+                                               sz = PAGEHDRSZ;
                                        }
+                                       memcpy(np, omp, sz); /* Copy beginning of page */
                                        omp = np;
                                }
                                SETDSZ(leaf, data->mv_size);
@@ -8351,7 +8372,7 @@ mdb_page_new(MDB_cursor *mc, uint32_t flags, int num, MDB_page **mp)
        MDB_page        *np;
        int rc;
 
-       if ((rc = mdb_page_alloc(mc, num, flags & P_OVERFLOW, &np)))
+       if ((rc = mdb_page_alloc(mc, num, &np)))
                return rc;
        DPRINTF(("allocated new mpage %"Yu", page size %u",
            np->mp_pgno, mc->mc_txn->mt_env->me_psize));
@@ -8540,7 +8561,9 @@ update:
                        else
                                memcpy(ndata, data->mv_data, data->mv_size);
                } else {
-                       ndata = ((MDB_dovpage *)ofp)->mp_ptr;
+                       MDB_ovpage ovp = {ofp->mp_pgno, mc->mc_txn->mt_txnid, ofp->mp_pages};
+                       memcpy(ndata, &ovp, sizeof(MDB_ovpage));
+                       ndata = METADATA(ofp);
                        if (F_ISSET(flags, MDB_RESERVE))
                                data->mv_data = ndata;
                        else