From: Howard Chu Date: Tue, 25 Jul 2017 14:34:03 +0000 (+0100) Subject: Scaled back on overflow page work X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=757378fc1d0b598f69d672f8338dcfa218216899;p=thirdparty%2Fopenldap.git Scaled back on overflow page work Still keeping page header at top of overflow page for now --- diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c index 1e53764aa8..ad5dce6b76 100644 --- a/libraries/liblmdb/mdb.c +++ b/libraries/liblmdb/mdb.c @@ -942,9 +942,9 @@ enum { * sorted #mp_ptrs[] entries referring to them. Exception: #P_LEAF2 pages * omit mp_ptrs and pack sorted #MDB_DUPFIXED values after the page header. * - * #P_OVERFLOW records occupy one or more contiguous pages that contain - * pure data with no page header. They hold the real data of #F_BIGDATA nodes, - * and the node stores what would have gone in a page header. + * #P_OVERFLOW records occupy one or more contiguous pages where only the + * first has a page header. They hold the real data of #F_BIGDATA nodes, + * and the node stores the pgno and number of pages used by the record. * * #P_SUBP sub-pages are small leaf "pages" with duplicate data. * A node with flag #F_DUPDATA but not #F_SUBDATA contains a sub-page. @@ -1004,7 +1004,7 @@ typedef struct MDB_page { } MDB_page; /** Size of the page header, excluding dynamic data at the end */ -#define PAGEHDRSZ sizeof(MDB_page_header) +#define PAGEHDRSZ ((unsigned)sizeof(MDB_page_header)) /** Address of first usable data byte in a page, after the header */ #define METADATA(p) ((void *)((char *)(p) + PAGEHDRSZ)) @@ -1300,8 +1300,10 @@ struct MDB_txn { /** For read txns: This thread/txn's reader table slot, or NULL. */ MDB_reader *reader; } mt_u; +#if OVERFLOW_NOTYET /** The sorted list of dirty overflow pages. */ MDB_ID2L mt_dirty_ovs; +#endif /** Array of records for each DB known in the environment. */ MDB_dbx *mt_dbxs; /** Array of MDB_db records for each known DB */ @@ -1579,7 +1581,7 @@ typedef struct MDB_ntxn { #define TXN_DBI_CHANGED(txn, dbi) \ ((txn)->mt_dbiseqs[dbi] != (txn)->mt_env->me_dbiseqs[dbi]) -static int mdb_page_alloc(MDB_cursor *mc, int num, int ov, MDB_page **mp); +static int mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp); static int mdb_page_new(MDB_cursor *mc, uint32_t flags, int num, MDB_page **mp); static int mdb_page_touch(MDB_cursor *mc); @@ -2435,7 +2437,7 @@ mdb_find_oldest(MDB_txn *txn) /** Add a page to the txn's dirty list */ static void -mdb_page_dirty(MDB_txn *txn, MDB_page *mp, int ov) +mdb_page_dirty(MDB_txn *txn, MDB_page *mp) { MDB_ID2 mid; int rc, (*insert)(MDB_ID2L, MDB_ID2 *); @@ -2447,13 +2449,9 @@ mdb_page_dirty(MDB_txn *txn, MDB_page *mp, int ov) } mid.mid = mp->mp_pgno; mid.mptr = mp; - if (ov) { - rc = mdb_mid2l_insert(txn->mt_dirty_ovs, &mid); - } else { - rc = insert(txn->mt_u.dirty_list, &mid); - txn->mt_dirty_room--; - } + rc = insert(txn->mt_u.dirty_list, &mid); mdb_tassert(txn, rc == 0); + txn->mt_dirty_room--; } /** Allocate page numbers and memory for writing. Maintain me_pglast, @@ -2474,7 +2472,7 @@ mdb_page_dirty(MDB_txn *txn, MDB_page *mp, int ov) * @return 0 on success, non-zero on failure. */ static int -mdb_page_alloc(MDB_cursor *mc, int num, int ov, MDB_page **mp) +mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp) { #ifdef MDB_PARANOID /* Seems like we can ignore this now */ /* Get at most more freeDB records once me_pghead @@ -2497,6 +2495,8 @@ mdb_page_alloc(MDB_cursor *mc, int num, int ov, MDB_page **mp) MDB_cursor_op op; MDB_cursor m2; int found_old = 0; + +#if OVERFLOW_NOTYET MDB_dovpage *dph = NULL; if (ov) { @@ -2508,6 +2508,7 @@ mdb_page_alloc(MDB_cursor *mc, int num, int ov, MDB_page **mp) return ENOMEM; dph = malloc(sizeof(MDB_dovpage)); } +#endif /* If there are any loose pages, just use them */ if (num == 1 && txn->mt_loose_pgs) { @@ -2515,11 +2516,13 @@ mdb_page_alloc(MDB_cursor *mc, int num, int ov, MDB_page **mp) txn->mt_loose_pgs = NEXT_LOOSE_PAGE(np); txn->mt_loose_count--; DPRINTF(("db %d use loose page %"Yu, DDBI(mc), np->mp_pgno)); +#if OVERFLOW_NOTYET if (ov) { dph->mp_hdr = np->mp_hdr; dph->mp_ptr = np; np = (MDB_page *)dph; } +#endif *mp = np; return MDB_SUCCESS; } @@ -2673,19 +2676,25 @@ search_done: } np->mp_pgno = pgno; np->mp_txnid = txn->mt_txnid; +#if OVERFLOW_NOTYET if (ov) { dph->mp_hdr = np->mp_hdr; dph->mp_ptr = np; np = (MDB_page *)dph; } mdb_page_dirty(txn, np, ov); +#else + mdb_page_dirty(txn, np); +#endif *mp = np; return MDB_SUCCESS; fail: +#if OVERFLOW_NOTYET if (dph) free(dph); +#endif txn->mt_flags |= MDB_TXN_ERROR; return rc; } @@ -2723,7 +2732,7 @@ mdb_page_copy(MDB_page *dst, MDB_page *src, unsigned int psize) * mp wasn't spilled. */ static int -mdb_page_unspill(MDB_txn *txn, MDB_page *mp, int num, int ov, MDB_page **ret) +mdb_page_unspill(MDB_txn *txn, MDB_page *mp, MDB_page **ret) { MDB_env *env = txn->mt_env; const MDB_txn *tx2; @@ -2736,15 +2745,20 @@ mdb_page_unspill(MDB_txn *txn, MDB_page *mp, int num, int ov, MDB_page **ret) x = mdb_midl_search(tx2->mt_spill_pgs, pn); if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pn) { MDB_page *np; + int num; if (txn->mt_dirty_room == 0) return MDB_TXN_FULL; + if (IS_OVERFLOW(mp)) + num = mp->mp_pages; + else + num = 1; if (env->me_flags & MDB_WRITEMAP) { np = mp; } else { np = mdb_page_malloc(txn, num, 1); if (!np) return ENOMEM; - if (ov) + if (num > 1) memcpy(np, mp, num * env->me_psize); else mdb_page_copy(np, mp, env->me_psize); @@ -2762,9 +2776,8 @@ mdb_page_unspill(MDB_txn *txn, MDB_page *mp, int num, int ov, MDB_page **ret) * page remains spilled until child commits */ - mdb_page_dirty(txn, np, ov); - if (!ov) - np->mp_flags |= P_DIRTY; + mdb_page_dirty(txn, np); + np->mp_flags |= P_DIRTY; *ret = np; break; } @@ -2789,14 +2802,14 @@ mdb_page_touch(MDB_cursor *mc) if (!F_ISSET(mp->mp_flags, P_DIRTY)) { if (txn->mt_flags & MDB_TXN_SPILLS) { np = NULL; - rc = mdb_page_unspill(txn, mp, 1, 0, &np); + rc = mdb_page_unspill(txn, mp, &np); if (rc) goto fail; if (np) goto done; } if ((rc = mdb_midl_need(&txn->mt_free_pgs, 1)) || - (rc = mdb_page_alloc(mc, 1, 0, &np))) + (rc = mdb_page_alloc(mc, 1, &np))) goto fail; pgno = np->mp_pgno; DPRINTF(("touched db %d page %"Yu" -> %"Yu, DDBI(mc), @@ -3148,7 +3161,9 @@ mdb_txn_renew0(MDB_txn *txn) txn->mt_free_pgs = env->me_free_pgs; txn->mt_free_pgs[0] = 0; txn->mt_spill_pgs = NULL; +#if OVERFLOW_NOTYET txn->mt_dirty_ovs = NULL; +#endif env->me_txn = txn; memcpy(txn->mt_dbiseqs, env->me_dbiseqs, env->me_maxdbs * sizeof(unsigned int)); } @@ -3273,7 +3288,9 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret) txn->mt_dirty_room = parent->mt_dirty_room; txn->mt_u.dirty_list[0].mid = 0; txn->mt_spill_pgs = NULL; +#if OVERFLOW_NOTYET txn->mt_dirty_ovs = NULL; +#endif txn->mt_next_pgno = parent->mt_next_pgno; parent->mt_flags |= MDB_TXN_HAS_CHILD; parent->mt_child = txn; @@ -3434,7 +3451,9 @@ mdb_txn_end(MDB_txn *txn, unsigned mode) env->me_pgstate = ((MDB_ntxn *)txn)->mnt_pgstate; mdb_midl_free(txn->mt_free_pgs); mdb_midl_free(txn->mt_spill_pgs); +#if OVERFLOW_NOTYET mdb_mid2l_free(txn->mt_dirty_ovs); +#endif free(txn->mt_u.dirty_list); } @@ -7981,7 +8000,7 @@ prep_subDB: dummy.md_entries = NUMKEYS(fp); xdata.mv_size = sizeof(MDB_db); xdata.mv_data = &dummy; - if ((rc = mdb_page_alloc(mc, 1, 0, &mp))) + if ((rc = mdb_page_alloc(mc, 1, &mp))) return rc; offset = env->me_psize - olddata.mv_size; flags |= F_DUPDATA|F_SUBDATA; @@ -8031,13 +8050,13 @@ current: if (!(omp->mp_flags & P_DIRTY) && (level || (env->me_flags & MDB_WRITEMAP))) { - rc = mdb_page_unspill(mc->mc_txn, omp, ovpages, 1, &omp); + rc = mdb_page_unspill(mc->mc_txn, omp, &omp); if (rc) return rc; level = 0; /* dirty in this txn or clean */ } /* Is it dirty? */ - if (ovp.op_txnid == mc->mc_txn->mt_txnid) { + if (omp->mp_flags & P_DIRTY) { /* yes, overwrite it. Note in this case we don't * bother to try shrinking the page if the new data * is smaller than the overflow threshold. @@ -8063,10 +8082,12 @@ current: * Copy end of page, adjusting alignment so * compiler may copy words instead of bytes. */ - off = data->mv_size & -sizeof(size_t); + off = (PAGEHDRSZ + data->mv_size) & -sizeof(size_t); memcpy((size_t *)((char *)np + off), (size_t *)((char *)omp + off), sz - off); + sz = PAGEHDRSZ; } + memcpy(np, omp, sz); /* Copy beginning of page */ omp = np; } SETDSZ(leaf, data->mv_size); @@ -8351,7 +8372,7 @@ mdb_page_new(MDB_cursor *mc, uint32_t flags, int num, MDB_page **mp) MDB_page *np; int rc; - if ((rc = mdb_page_alloc(mc, num, flags & P_OVERFLOW, &np))) + if ((rc = mdb_page_alloc(mc, num, &np))) return rc; DPRINTF(("allocated new mpage %"Yu", page size %u", np->mp_pgno, mc->mc_txn->mt_env->me_psize)); @@ -8540,7 +8561,9 @@ update: else memcpy(ndata, data->mv_data, data->mv_size); } else { - ndata = ((MDB_dovpage *)ofp)->mp_ptr; + MDB_ovpage ovp = {ofp->mp_pgno, mc->mc_txn->mt_txnid, ofp->mp_pages}; + memcpy(ndata, &ovp, sizeof(MDB_ovpage)); + ndata = METADATA(ofp); if (F_ISSET(flags, MDB_RESERVE)) data->mv_data = ndata; else