]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
xap_helper: reopen+retry in MSetIterator loops
authorEric Wong <e@80x24.org>
Thu, 24 Aug 2023 01:22:36 +0000 (01:22 +0000)
committerEric Wong <e@80x24.org>
Thu, 24 Aug 2023 07:47:53 +0000 (07:47 +0000)
It's possible to hit a DatabaseModifiedError while iterating
through an MSet.  We'll retry in these cases and cleanup some
code in both the Perl and C++ implementations.

lib/PublicInbox/XapHelper.pm
lib/PublicInbox/xap_helper.h

index c80be81058102e2387c413625d25c969c770dfbe..ef6a47a34a0c59252a9ea85a0ac4a1a0cb8b1325 100644 (file)
@@ -36,28 +36,74 @@ sub cmd_test_inspect {
                ($req->{srch}->has_threadid ? 1 : 0)
 }
 
+sub iter_retry_check ($) {
+       die unless ref($@) =~ /\bDatabaseModifiedError\b/;
+       $_[0]->{srch}->reopen;
+       undef; # retries
+}
+
+sub dump_ibx_iter ($$$) {
+       my ($req, $ibx_id, $it) = @_;
+       my $out = $req->{0};
+       eval {
+               my $doc = $it->get_document;
+               for my $p (@{$req->{A}}) {
+                       for (xap_terms($p, $doc)) {
+                               print $out "$_ $ibx_id\n" or die "print: $!";
+                               ++$req->{nr_out};
+                       }
+               }
+       };
+       $@ ? iter_retry_check($req) : 0;
+}
+
+sub emit_mset_stats ($$) {
+       my ($req, $mset) = @_;
+       my $err = $req->{1} or return;
+       say $err 'mset.size='.$mset->size.' nr_out='.$req->{nr_out}
+}
+
 sub cmd_dump_ibx {
        my ($req, $ibx_id, $qry_str) = @_;
        $qry_str // return warn('usage: dump_ibx [OPTIONS] IBX_ID QRY_STR');
-       my @pfx = @{$req->{A}} or return warn('dump_ibx requires -A PREFIX');
+       $req->{A} or return warn('dump_ibx requires -A PREFIX');
        my $max = $req->{srch}->{xdb}->get_doccount;
        my $opt = { relevance => -1, limit => $max, offset => $req->{o} // 0 };
        $opt->{eidx_key} = $req->{O} if defined $req->{O};
        my $mset = $req->{srch}->mset($qry_str, $opt);
-       my $out = $req->{0};
-       $out->autoflush(1);
-       my $nr = 0;
+       $req->{0}->autoflush(1);
        for my $it ($mset->items) {
+               for (my $t = 10; $t > 0; --$t) {
+                       $t = dump_ibx_iter($req, $ibx_id, $it) // $t;
+               }
+       }
+       if (my $err = $req->{1}) {
+               say $err 'mset.size='.$mset->size.' nr_out='.$req->{nr_out}
+       }
+}
+
+sub dump_roots_iter ($$$) {
+       my ($req, $root2id, $it) = @_;
+       eval {
                my $doc = $it->get_document;
-               for my $p (@pfx) {
+               my $G = join(' ', map { $root2id->{$_} } xap_terms('G', $doc));
+               for my $p (@{$req->{A}}) {
                        for (xap_terms($p, $doc)) {
-                               print $out "$_ $ibx_id\n" or die "print: $!";
-                               ++$nr;
+                               $req->{wbuf} .= "$_ $G\n";
+                               ++$req->{nr_out};
                        }
                }
-       }
-       if (my $err = $req->{1}) {
-               say $err 'mset.size='.$mset->size.' nr_out='.$nr
+       };
+       $@ ? iter_retry_check($req) : 0;
+}
+
+sub dump_roots_flush ($$) {
+       my ($req, $fh) = @_;
+       if ($req->{wbuf} ne '') {
+               flock($fh, LOCK_EX) or die "flock: $!";
+               print { $req->{0} } $req->{wbuf} or die "print: $!";
+               flock($fh, LOCK_UN) or die "flock: $!";
+               $req->{wbuf} = '';
        }
 }
 
@@ -65,44 +111,29 @@ sub cmd_dump_roots {
        my ($req, $root2id_file, $qry_str) = @_;
        $qry_str // return
                warn('usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR');
-       my @pfx = @{$req->{A}} or return warn('dump_roots requires -A PREFIX');
+       $req->{A} or return warn('dump_roots requires -A PREFIX');
        open my $fh, '<', $root2id_file or die "open($root2id_file): $!";
-       my %root2id; # record format: $OIDHEX "\0" uint32_t
+       my $root2id; # record format: $OIDHEX "\0" uint32_t
        my @x = split(/\0/, do { local $/; <$fh> } // die "readline: $!");
        while (@x) {
                my $oidhex = shift @x;
-               $root2id{$oidhex} = shift @x;
+               $root2id->{$oidhex} = shift @x;
        }
        my $opt = { relevance => -1, limit => $req->{'m'},
                        offset => $req->{o} // 0 };
        my $mset = $req->{srch}->mset($qry_str, $opt);
        $req->{0}->autoflush(1);
-       my $buf = '';
-       my $nr = 0;
+       $req->{wbuf} = '';
        for my $it ($mset->items) {
-               my $doc = $it->get_document;
-               my $G = join(' ', map { $root2id{$_} } xap_terms('G', $doc));
-               for my $p (@pfx) {
-                       for (xap_terms($p, $doc)) {
-                               $buf .= "$_ $G\n";
-                               ++$nr;
-                       }
+               for (my $t = 10; $t > 0; --$t) {
+                       $t = dump_roots_iter($req, $root2id, $it) // $t;
                }
-               if (!($nr & 0x3fff)) {
-                       flock($fh, LOCK_EX) or die "flock: $!";
-                       print { $req->{0} } $buf or die "print: $!";
-                       flock($fh, LOCK_UN) or die "flock: $!";
-                       $buf = '';
+               if (!($req->{nr_out} & 0x3fff)) {
+                       dump_roots_flush($req, $fh);
                }
        }
-       if ($buf ne '') {
-               flock($fh, LOCK_EX) or die "flock: $!";
-               print { $req->{0} } $buf or die "print: $!";
-               flock($fh, LOCK_UN) or die "flock: $!";
-       }
-       if (my $err = $req->{1}) {
-               say $err 'mset.size='.$mset->size.' nr_out='.$nr
-       }
+       dump_roots_flush($req, $fh);
+       emit_mset_stats($req, $mset);
 }
 
 sub dispatch {
@@ -153,6 +184,7 @@ sub recv_loop {
                        next;
                }
                my @argv = split(/\0/, $rbuf);
+               $req->{nr_out} = 0;
                eval { $req->dispatch(@argv) } if @argv;
        }
 }
index c9b4e0cc490baa8008af938be0a71cb986d0c823..e3ccfd4170a7ae9a669c3d687bf8ff40990d6edd 100644 (file)
@@ -63,6 +63,12 @@ static void code_nrp_init(void);
 static void qp_init_mail_search(Xapian::QueryParser *);
 static void qp_init_code_search(Xapian::QueryParser *);
 
+enum exc_iter {
+       ITER_OK = 0,
+       ITER_RETRY,
+       ITER_ABORT
+};
+
 struct srch {
        int paths_len; // int for comparisons
        unsigned qp_flags;
@@ -196,6 +202,13 @@ static Xapian::MSet commit_mset(struct req *req, const char *qry_str)
        return enquire_mset(req, &enq);
 }
 
+static void emit_mset_stats(struct req *req, const Xapian::MSet *mset)
+{
+       if (req->fp[1])
+               fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
+                       (unsigned long long)mset->size(), req->nr_out);
+}
+
 static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
 {
        return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
@@ -224,6 +237,20 @@ static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors
        return setvbuf(fp, NULL, _IOLBF, 0);
 }
 
+static enum exc_iter dump_ibx_iter(struct req *req, const char *ibx_id,
+                               Xapian::MSetIterator *i)
+{
+       try {
+               Xapian::Document doc = i->get_document();
+               for (int p = 0; p < req->pfxc; p++)
+                       dump_ibx_term(req, req->pfxv[p], &doc, ibx_id);
+       } catch (const Xapian::DatabaseModifiedError & e) {
+               req->srch->db->reopen();
+               return ITER_RETRY;
+       }
+       return ITER_OK;
+}
+
 static bool cmd_dump_ibx(struct req *req)
 {
        if ((optind + 1) >= req->argc) {
@@ -243,20 +270,18 @@ static bool cmd_dump_ibx(struct req *req)
        req->asc = true;
        req->sort_col = -1;
        Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]);
+
+       // @UNIQ_FOLD in CodeSearchIdx.pm can handle duplicate lines fine
+       // in case we need to retry on DB reopens
        for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
-               try {
-                       Xapian::Document doc = i.get_document();
-                       for (int p = 0; p < req->pfxc; p++)
-                               dump_ibx_term(req, req->pfxv[p], &doc, ibx_id);
-               } catch (const Xapian::Error & e) {
-                       fprintf(orig_err, "W: %s (#%ld)\n",
-                               e.get_description().c_str(), (long)(*i));
-                       continue;
-               }
+               for (int t = 10; t > 0; --t)
+                       switch (dump_ibx_iter(req, ibx_id, &i)) {
+                       case ITER_OK: t = 0; break; // leave inner loop
+                       case ITER_RETRY: break; // continue for-loop
+                       case ITER_ABORT: return false; // error
+                       }
        }
-       if (req->fp[1])
-               fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
-                       (unsigned long long)mset.size(), req->nr_out);
+       emit_mset_stats(req, &mset);
        return true;
 }
 
@@ -312,8 +337,7 @@ static void dump_roots_ensure(void *ptr)
        fbuf_ensure(&drt->wbuf);
 }
 
-static bool root2ids_str(struct fbuf *root_ids, struct dump_roots_tmp *drt,
-                       Xapian::Document *doc)
+static bool root2ids_str(struct fbuf *root_ids, Xapian::Document *doc)
 {
        if (!fbuf_init(root_ids)) return false;
 
@@ -408,9 +432,28 @@ done_free:
        return ok;
 }
 
+static enum exc_iter dump_roots_iter(struct req *req,
+                               struct dump_roots_tmp *drt,
+                               Xapian::MSetIterator *i)
+{
+       CLEANUP_FBUF struct fbuf root_ids = { 0 }; // " $ID0 $ID1 $IDx..\n"
+       try {
+               Xapian::Document doc = i->get_document();
+               if (!root2ids_str(&root_ids, &doc))
+                       return ITER_ABORT; // bad request, abort
+               for (int p = 0; p < req->pfxc; p++)
+                       dump_roots_term(req, req->pfxv[p], drt,
+                                       &root_ids, &doc);
+       } catch (const Xapian::DatabaseModifiedError & e) {
+               req->srch->db->reopen();
+               return ITER_RETRY;
+       }
+       return ITER_OK;
+}
+
 static bool cmd_dump_roots(struct req *req)
 {
-       CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt { .root2id_fd = -1 };
+       CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt { .root2id_fd = -1 };
        if ((optind + 1) >= req->argc) {
                warnx("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR");
                return false; // need file + qry_str
@@ -432,7 +475,7 @@ static bool cmd_dump_roots(struct req *req)
        // each entry is at least 43 bytes ({OIDHEX}\0{INT}\0),
        // so /32 overestimates the number of expected entries by
        // ~%25 (as recommended by Linux hcreate(3) manpage)
-       size_t est = (drt.sb.st_size / 32) + 1;
+       size_t est = (drt.sb.st_size / 32) + 1; //+1 for "\0" termination
        if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX) {
                warnx("%s size too big (%lld bytes > %zu)", root2id_file,
                        (long long)drt.sb.st_size, SIZE_MAX);
@@ -469,30 +512,24 @@ static bool cmd_dump_roots(struct req *req)
        req->asc = true;
        req->sort_col = -1;
        Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]);
+
+       // @UNIQ_FOLD in CodeSearchIdx.pm can handle duplicate lines fine
+       // in case we need to retry on DB reopens
        for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
-               CLEANUP_FBUF struct fbuf root_ids = { 0 };
                if (!drt.wbuf.fp && !fbuf_init(&drt.wbuf))
                        return false;
-               try {
-                       Xapian::Document doc = i.get_document();
-                       if (!root2ids_str(&root_ids, &drt, &doc))
-                               return false;
-                       for (int p = 0; p < req->pfxc; p++)
-                               dump_roots_term(req, req->pfxv[p], &drt,
-                                               &root_ids, &doc);
-               } catch (const Xapian::Error & e) {
-                       fprintf(orig_err, "W: %s (#%ld)\n",
-                               e.get_description().c_str(), (long)(*i));
-                       continue;
-               }
+               for (int t = 10; t > 0; --t)
+                       switch (dump_roots_iter(req, &drt, &i)) {
+                       case ITER_OK: t = 0; break; // leave inner loop
+                       case ITER_RETRY: break; // continue for-loop
+                       case ITER_ABORT: return false; // error
+                       }
                if (!(req->nr_out & 0x3fff) && !dump_roots_flush(req, &drt))
                        return false;
        }
        if (!dump_roots_flush(req, &drt))
                return false;
-       if (req->fp[1])
-               fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
-                       (unsigned long long)mset.size(), req->nr_out);
+       emit_mset_stats(req, &mset);
        return true;
 }