]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
ITS#9378 Add replay tool
authorHoward Chu <hyc@openldap.org>
Fri, 22 Dec 2023 19:55:22 +0000 (19:55 +0000)
committerHoward Chu <hyc@openldap.org>
Tue, 9 Jan 2024 17:42:58 +0000 (17:42 +0000)
Reads a replay log and executes all the write ops

libraries/liblmdb/Makefile
libraries/liblmdb/mplay.c [new file with mode: 0644]

index fa9789223ab66e9b7cd77bb4918a30b71e34637d..7411b309f819d37b90cf98ca657eea56e6006128 100644 (file)
@@ -95,6 +95,8 @@ mtest_enc:    mtest_enc.o chacha8.o liblmdb.a
 mtest_enc2:      mtest_enc2.o module.o liblmdb.a crypto.lm
        $(CC) $(LDFLAGS) -pthread -o $@ mtest_enc2.o module.o liblmdb.a $(LDL)
 
+mplay: mplay.o liblmdb.a
+
 crypto.lm:     crypto.c
        $(CC) -shared $(CFLAGS) -o $@ $^ -lcrypto
 
diff --git a/libraries/liblmdb/mplay.c b/libraries/liblmdb/mplay.c
new file mode 100644 (file)
index 0000000..5ca66d9
--- /dev/null
@@ -0,0 +1,580 @@
+/* mplay.c - memory-mapped database log replay */
+/*
+ * Copyright 2011-2023 Howard Chu, Symas Corp.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted only as authorized by the OpenLDAP
+ * Public License.
+ *
+ * A copy of this license is available in the file LICENSE in the
+ * top-level directory of the distribution or, alternatively, at
+ * <http://www.OpenLDAP.org/license.html>.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <time.h>
+#include <string.h>
+#include <ctype.h>
+#include <assert.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include "lmdb.h"
+
+#define E(expr) CHECK((rc = (expr)) == MDB_SUCCESS, #expr)
+#define RES(err, expr) ((rc = expr) == (err) || (CHECK(!rc, #expr), 0))
+#define CHECK(test, msg) ((test) ? (void)0 : ((void)fprintf(stderr, \
+       "%s:%d: %s: %s\n", __FILE__, __LINE__, msg, mdb_strerror(rc)), abort()))
+
+#define SCMP(s)        s, (sizeof(s)-1)
+char inbuf[8192];
+char *dbuf, *kbuf;
+size_t dbufsize;
+int maxkey;
+
+#define SOFF(s)        (sizeof(s)+1)
+
+#define MAXENVS        16
+#define MAXTXNS        16
+#define MAXCRSS        16
+
+#define MAXPIDS        16
+
+typedef struct crspair {
+       void *tcrs;     /* scanned text pointer */
+       MDB_cursor *rcrs;
+} crspair;
+
+typedef struct txnpair {
+       void *ttxn;     /* scanned text pointer */
+       MDB_txn *rtxn;
+       crspair cursors[MAXCRSS];
+       int ncursors;
+} txnpair;
+
+typedef struct envpair {
+       void *tenv;
+       MDB_env *renv;
+       txnpair txns[MAXTXNS];
+       int ntxns;
+} envpair;
+
+envpair envs[MAXENVS];
+int nenvs;
+
+envpair *lastenv;
+txnpair *lasttxn;
+crspair *lastcrs;
+
+typedef struct pidpair {
+       int tpid;
+       pid_t rpid;
+       int fdout, fdin;
+} pidpair;
+
+pidpair *lastpid;
+
+pidpair pids[MAXPIDS];
+int npids;
+
+unsigned long lcount;
+
+static int unhex(unsigned char *c2)
+{
+       int x, c;
+       x = *c2++ & 0x4f;
+       if (x & 0x40)
+               x -= 55;
+       c = x << 4;
+       x = *c2 & 0x4f;
+       if (x & 0x40)
+               x -= 55;
+       c |= x;
+       return c;
+}
+
+int inhex(char *in, char *out)
+{
+       char *c2 = out;
+       while (isxdigit(*in)) {
+               *c2++ = unhex((unsigned char *)in);
+               in += 2;
+       }
+       return c2 - out;
+}
+
+static void addenv(void *tenv, MDB_env *renv)
+{
+       assert(nenvs < MAXENVS);
+       envs[nenvs].tenv = tenv;
+       envs[nenvs].renv = renv;
+       envs[nenvs].ntxns = 0;
+       lastenv = envs+nenvs;
+       nenvs++;
+}
+
+static envpair *findenv(void *tenv)
+{
+       int i;
+       if (!lastenv || lastenv->tenv != tenv) {
+               for (i=0; i<nenvs; i++)
+                       if (envs[i].tenv == tenv)
+                               break;
+               assert(i < nenvs);
+               lastenv = &envs[i];
+       }
+       return lastenv;
+}
+
+static void delenv(envpair *ep)
+{
+       int i = ep - envs;
+       for (; i<nenvs-1; i++)
+               envs[i] = envs[i+1];
+       nenvs--;
+       lastenv = NULL;
+}
+
+static void addtxn(void *tenv, void *ttxn, MDB_txn *rtxn)
+{
+       envpair *ep;
+       txnpair *tp;
+
+       ep = findenv(tenv);
+       assert(ep->ntxns < MAXTXNS);
+       tp = ep->txns+ep->ntxns;
+       tp->ttxn = ttxn;
+       tp->rtxn = rtxn;
+       tp->ncursors = 0;
+       ep->ntxns++;
+       lasttxn = tp;
+}
+
+static txnpair *findtxn(void *ttxn)
+{
+       int i, j;
+       if (lasttxn && lasttxn->ttxn == ttxn)
+               return lasttxn;
+       if (lastenv) {
+               for (i=0; i<lastenv->ntxns; i++) {
+                       if (lastenv->txns[i].ttxn == ttxn) {
+                               lasttxn = lastenv->txns+i;
+                               return lasttxn;
+                       }
+               }
+       }
+       for (i=0; i<nenvs; i++) {
+               if (envs+i == lastenv) continue;
+               for (j=0; j<envs[i].ntxns; j++) {
+                       if (envs[i].txns[j].ttxn == ttxn) {
+                               lastenv = envs+i;
+                               lasttxn = envs[i].txns+j;
+                               return lasttxn;
+                       }
+               }
+       }
+       assert(0);      /* should have found it */
+}
+
+static void deltxn(txnpair *tp)
+{
+       int i = tp - lastenv->txns;
+       for (; i<lastenv->ntxns-1; i++)
+               lastenv->txns[i] = lastenv->txns[i+1];
+       lastenv->ntxns--;
+       lasttxn = NULL;
+}
+
+static void addcrs(txnpair *tp, void *tcrs, MDB_cursor *rcrs)
+{
+       int j = tp->ncursors;
+       assert(tp->ncursors < MAXCRSS);
+
+       tp->cursors[j].tcrs = tcrs;
+       tp->cursors[j].rcrs = rcrs;
+       tp->ncursors++;
+       lastcrs = tp->cursors+j;
+}
+
+static crspair *findcrs(void *tcrs)
+{
+       int i, j, k;
+       envpair *ep;
+       txnpair *tp;
+       crspair *cp;
+       if (lastcrs && lastcrs->tcrs == tcrs)
+               return lastcrs;
+       if (lasttxn) {
+               for (k=0, cp=lasttxn->cursors; k<lasttxn->ncursors; k++, cp++) {
+                       if (cp->tcrs == tcrs) {
+                               lastcrs = cp;
+                               return lastcrs;
+                       }
+               }
+       }
+       if (lastenv) {
+               for (j=0, tp=lastenv->txns; j<lastenv->ntxns; j++, tp++){
+                       if (tp == lasttxn)
+                               continue;
+                       for (k=0, cp = tp->cursors; k<tp->ncursors; k++, cp++) {
+                               if (cp->tcrs == tcrs) {
+                                       lastcrs = cp;
+                                       lasttxn = tp;
+                                       return lastcrs;
+                               }
+                       }
+               }
+       }
+       for (i=0, ep=envs; i<nenvs; i++, ep++) {
+               for (j=0, tp=ep->txns; j<ep->ntxns; j++, tp++) {
+                       if (tp == lasttxn)
+                               continue;
+                       for (k=0, cp = tp->cursors; k<tp->ncursors; k++, cp++) {
+                               if (cp->tcrs == tcrs) {
+                                       lastcrs = cp;
+                                       lasttxn = tp;
+                                       lastenv = ep;
+                                       return lastcrs;
+                               }
+                       }
+               }
+       }
+       assert(0);      /* should have found it already */
+}
+
+static void delcrs(void *tcrs)
+{
+       int i;
+       crspair *cp = findcrs(tcrs);
+       mdb_cursor_close(cp->rcrs);
+       for (i = cp - lasttxn->cursors; i<lasttxn->ncursors-1; i++)
+               lasttxn->cursors[i] = lasttxn->cursors[i+1];
+       lasttxn->ncursors--;
+       lastcrs = NULL;
+}
+
+void child()
+{
+       int rc;
+       MDB_val key, data;
+       char *ptr;
+
+       while (fgets(inbuf, sizeof(inbuf), stdin)) {
+               ptr = inbuf;
+               if (!strncmp(ptr, SCMP("exit")))
+                       break;
+
+               if (!strncmp(ptr, SCMP("mdb_env_create"))) {
+                       void *tenv;
+                       MDB_env *renv;
+                       sscanf(ptr+SOFF("mdb_env_create"), "%p", &tenv);
+                       E(mdb_env_create(&renv));
+                       addenv(tenv, renv);
+               } else if (!strncmp(ptr, SCMP("mdb_env_set_maxdbs"))) {
+                       void *tenv;
+                       envpair *ep;
+                       unsigned int maxdbs;
+                       sscanf(ptr+SOFF("mdb_env_set_maxdbs"), "%p, %u", &tenv, &maxdbs);
+                       ep = findenv(tenv);
+                       E(mdb_env_set_maxdbs(ep->renv, maxdbs));
+               } else if (!strncmp(ptr, SCMP("mdb_env_set_mapsize"))) {
+                       void *tenv;
+                       envpair *ep;
+                       mdb_size_t mapsize;
+                       sscanf(ptr+SOFF("mdb_env_set_mapsize"), "%p, %"MDB_SCNy(u), &tenv, &mapsize);
+                       ep = findenv(tenv);
+                       E(mdb_env_set_mapsize(ep->renv, mapsize));
+               } else if (!strncmp(ptr, SCMP("mdb_env_open"))) {
+                       void *tenv;
+                       envpair *ep;
+                       char *path;
+                       int len;
+                       unsigned int flags, mode;
+                       sscanf(ptr+SOFF("mdb_env_open"), "%p, %n", &tenv, &len);
+                       path = ptr+SOFF("mdb_env_open")+len;
+                       ptr = strchr(path, ',');
+                       *ptr++ = '\0';
+                       sscanf(ptr, "%u, %o", &flags, &mode);
+                       ep = findenv(tenv);
+                       E(mdb_env_open(ep->renv, path, flags, mode));
+                       if (!maxkey) {
+                               maxkey = mdb_env_get_maxkeysize(ep->renv);
+                               kbuf = malloc(maxkey+2);
+                               dbuf = malloc(maxkey+2);
+                               dbufsize = maxkey;
+                       }
+               } else if (!strncmp(ptr, SCMP("mdb_env_close"))) {
+                       void *tenv;
+                       envpair *ep;
+                       sscanf(ptr+SOFF("mdb_env_close"), "%p", &tenv);
+                       ep = findenv(tenv);
+                       mdb_env_close(ep->renv);
+                       delenv(ep);
+                       if (!nenvs)     /* if no other envs left, this process is done */
+                               break;
+               } else if (!strncmp(ptr, SCMP("mdb_txn_begin"))) {
+                       unsigned int flags;
+                       void *tenv, *ttxn;
+                       envpair *ep;
+                       MDB_txn *rtxn;
+                       sscanf(ptr+SOFF("mdb_txn_begin"), "%p, %*p, %u = %p", &tenv, &flags, &ttxn);
+                       ep = findenv(tenv);
+                       E(mdb_txn_begin(ep->renv, NULL, flags, &rtxn));
+                       addtxn(tenv, ttxn, rtxn);
+               } else if (!strncmp(ptr, SCMP("mdb_txn_commit"))) {
+                       void *ttxn;
+                       txnpair *tp;
+                       sscanf(ptr+SOFF("mdb_txn_commit"), "%p", &ttxn);
+                       tp = findtxn(ttxn);
+                       E(mdb_txn_commit(tp->rtxn));
+                       deltxn(tp);
+               } else if (!strncmp(ptr, SCMP("mdb_txn_abort"))) {
+                       void *ttxn;
+                       txnpair *tp;
+                       sscanf(ptr+SOFF("mdb_txn_abort"), "%p", &ttxn);
+                       tp = findtxn(ttxn);
+                       mdb_txn_abort(tp->rtxn);
+                       deltxn(tp);
+               } else if (!strncmp(ptr, SCMP("mdb_dbi_open"))) {
+                       void *ttxn;
+                       txnpair *tp;
+                       char *dbname;
+                       unsigned int flags;
+                       unsigned int tdbi;
+                       MDB_dbi dbi;
+                       sscanf(ptr+SOFF("mdb_dbi_open"), "%p, ", &ttxn);
+                       dbname = strchr(ptr+SOFF("mdb_dbi_open"), ',');
+                       dbname += 2;
+                       ptr = strchr(dbname, ',');
+                       *ptr++ = '\0';
+                       if (!strcmp(dbname, "(null)"))
+                               dbname = NULL;
+                       sscanf(ptr, "%u = %u", &flags, &tdbi);
+                       tp = findtxn(ttxn);
+                       E(mdb_dbi_open(tp->rtxn, dbname, flags, &dbi));
+               } else if (!strncmp(ptr, SCMP("mdb_dbi_close"))) {
+                       void *tenv;
+                       envpair *ep;
+                       unsigned int tdbi;
+                       sscanf(ptr+SOFF("mdb_dbi_close"), "%p, %u", &tenv, &tdbi);
+                       ep = findenv(tenv);
+                       mdb_dbi_close(ep->renv, tdbi);
+               } else if (!strncmp(ptr, SCMP("mdb_cursor_open"))) {
+                       void *ttxn, *tcrs;
+                       txnpair *tp;
+                       MDB_cursor *rcrs;
+                       unsigned int tdbi;
+                       sscanf(ptr+SOFF("mdb_cursor_open"), "%p, %u = %p", &ttxn, &tdbi, &tcrs);
+                       tp = findtxn(ttxn);
+                       E(mdb_cursor_open(tp->rtxn, tdbi, &rcrs));
+                       addcrs(tp, tcrs, rcrs);
+               } else if (!strncmp(ptr, SCMP("mdb_cursor_close"))) {
+                       void *tcrs;
+                       sscanf(ptr+SOFF("mdb_cursor_close"), "%p", &tcrs);
+                       delcrs(tcrs);
+               } else if (!strncmp(ptr, SCMP("mdb_cursor_put"))) {
+                       void *tcrs;
+                       crspair *cp;
+                       unsigned int flags;
+                       int len;
+                       sscanf(ptr+SOFF("mdb_cursor_put"), "%p, ", &tcrs);
+                       cp = findcrs(tcrs);
+                       ptr = strchr(ptr+SOFF("mdb_cursor_put"), ',');
+                       sscanf(ptr+1, "%"MDB_SCNy(u)",", &key.mv_size);
+                       if (key.mv_size) {
+                               ptr = strchr(ptr, '[');
+                               inhex(ptr+1, kbuf);
+                               key.mv_data = kbuf;
+                               ptr += key.mv_size * 2 + 2;
+                       }
+                       ptr = strchr(ptr+1, ',');
+                       sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len);
+                       if (data.mv_size > dbufsize) {
+                               dbuf = realloc(dbuf, data.mv_size+2);
+                               assert(dbuf != NULL);
+                               dbufsize = data.mv_size;
+                       }
+                       ptr += len+1;
+                       if (*ptr == '[') {
+                               inhex(ptr+1, dbuf);
+                               data.mv_data = dbuf;
+                               ptr += data.mv_size * 2 + 2;
+                       } else {
+                               sprintf(dbuf, "%09ld", (long)mdb_txn_id(lasttxn->rtxn));
+                       }
+                       sscanf(ptr+1, "%u", &flags);
+                       E(mdb_cursor_put(cp->rcrs, &key, &data, flags));
+               } else if (!strncmp(ptr, SCMP("mdb_cursor_del"))) {
+                       void *tcrs;
+                       crspair *cp;
+                       unsigned int flags;
+                       sscanf(ptr+SOFF("mdb_cursor_del"), "%p, %u", &tcrs, &flags);
+                       cp = findcrs(tcrs);
+                       E(mdb_cursor_del(cp->rcrs, flags));
+               } else if (!strncmp(ptr, SCMP("mdb_put"))) {
+                       void *ttxn;
+                       txnpair *tp;
+                       unsigned int tdbi, flags;
+                       int len;
+                       sscanf(ptr+SOFF("mdb_put"),"%p, %u, %"MDB_SCNy(u), &ttxn, &tdbi, &key.mv_size);
+                       tp = findtxn(ttxn);
+                       ptr = strchr(ptr+SOFF("mdb_put"), '[');
+                       inhex(ptr+1, kbuf);
+                       key.mv_data = kbuf;
+                       ptr += key.mv_size * 2 + 2;
+                       sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len);
+                       if (data.mv_size > dbufsize) {
+                               dbuf = realloc(dbuf, data.mv_size+2);
+                               assert(dbuf != NULL);
+                               dbufsize = data.mv_size;
+                       }
+                       ptr += len+1;
+                       if (*ptr == '[') {
+                               inhex(ptr+1, dbuf);
+                               ptr += data.mv_size * 2 + 2;
+                       } else {
+                               sprintf(dbuf, "%09ld", (long)mdb_txn_id(tp->rtxn));
+                       }
+                       data.mv_data = dbuf;
+                       sscanf(ptr+1, "%u", &flags);
+                       RES(MDB_KEYEXIST,mdb_put(tp->rtxn, tdbi, &key, &data, flags));
+               } else if (!strncmp(ptr, SCMP("mdb_del"))) {
+                       void *ttxn;
+                       txnpair *tp;
+                       unsigned int tdbi;
+                       int len;
+                       sscanf(ptr+SOFF("mdb_del"),"%p, %u, %"MDB_SCNy(u), &ttxn, &tdbi, &key.mv_size);
+                       tp = findtxn(ttxn);
+                       ptr = strchr(ptr+SOFF("mdb_del"), '[');
+                       inhex(ptr+1, kbuf);
+                       key.mv_data = kbuf;
+                       ptr += key.mv_size * 2 + 2;
+                       sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len);
+                       if (data.mv_size > dbufsize) {
+                               dbuf = realloc(dbuf, data.mv_size+2);
+                               assert(dbuf != NULL);
+                               dbufsize = data.mv_size;
+                       }
+                       ptr += len+1;
+                       if (*ptr == '[') {
+                               inhex(ptr+1, dbuf);
+                       } else {
+                               sprintf(dbuf, "%09ld", (long)mdb_txn_id(tp->rtxn));
+                       }
+                       data.mv_data = dbuf;
+                       RES(MDB_NOTFOUND,mdb_del(tp->rtxn, tdbi, &key, &data));
+               }
+               write(1, "\n", 1);
+       }
+       exit(0);
+}
+
+static pidpair *addpid(int tpid)
+{
+       int fdout[2], fdin[2];
+       pid_t pid;
+       assert(npids < MAXPIDS);
+       pids[npids].tpid = tpid;
+       pipe(fdout);
+       pipe(fdin);
+       if ((pid = fork()) == 0) {
+               /* child */
+               fclose(stdin);
+               fclose(stdout);
+               dup2(fdout[0], 0);
+               dup2(fdin[1], 1);
+               stdin = fdopen(0, "r");
+               stdout = fdopen(1, "w");
+               child();
+               return 0;       /* NOTREACHED */
+       } else {
+               pids[npids].rpid = pid;
+               pids[npids].fdout = fdout[1];
+               pids[npids].fdin = fdin[0];
+               lastpid = pids+npids;
+               npids++;
+               return lastpid;
+       }
+}
+
+static pidpair *findpid(int tpid)
+{
+       int i;
+       if (!lastpid || lastpid->tpid != tpid) {
+               for (i=0; i<npids; i++)
+                       if (pids[i].tpid == tpid)
+                               break;
+               if (i == npids)
+                       return NULL;
+               lastpid = &pids[i];
+       }
+       return lastpid;
+}
+
+volatile pid_t killpid;
+
+static void delpid(int tpid)
+{
+       pidpair *pp = findpid(tpid);
+       if (pp) {
+               pid_t kpid = pp->rpid;
+               killpid = kpid;
+               write(pp->fdout, "exit\n", sizeof("exit"));
+               while (killpid == kpid)
+                       usleep(10000);
+       }
+}
+
+static void reaper(int sig)
+{
+       int status, i;
+       pid_t pid = waitpid(-1, &status, 0);
+       if (pid > 0) {
+               fprintf(stderr, "# %s %d\n", WIFEXITED(status) ? "exited" : "killed", pid);
+               for (i=0; i<npids; i++)
+                       if (pids[i].rpid == pid)
+                               break;
+               assert(i < npids);
+               close(pids[i].fdout);
+               close(pids[i].fdin);
+               for (;i<npids-1; i++)
+                       pids[i] = pids[i+1];
+               npids--;
+               killpid = 0;
+       }
+}
+
+int main(int argc,char * argv[])
+{
+       signal(SIGCHLD, reaper);
+
+       while (fgets(inbuf, sizeof(inbuf), stdin)) {
+               pidpair *pp;
+               int tpid, len;
+               char c, *ptr;
+               lcount++;
+
+               if (inbuf[0] == '#' && !strncmp(inbuf+1, SCMP(" killed"))) {
+                       sscanf(inbuf+SOFF("killed"),"%d", &tpid);
+                       delpid(tpid);
+                       continue;
+               }
+
+               if (inbuf[0] != '>')
+                       continue;
+               ptr = inbuf+1;
+               sscanf(ptr, "%d:%n", &tpid, &len);
+               pp = findpid(tpid);
+               if (!pp)
+                       pp = addpid(tpid);      /* new process */
+
+               ptr = inbuf+len+1;
+               len = strlen(ptr);
+               write(pp->fdout, ptr, len);     /* send command and wait for ack */
+               read(pp->fdin, &c, 1);
+       }
+       while (npids)
+               delpid(pids[0].tpid);
+}