]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
*search: introduce open.lock for reader safety
authorEric Wong <e@80x24.org>
Wed, 8 Oct 2025 21:24:21 +0000 (21:24 +0000)
committerEric Wong <e@80x24.org>
Fri, 10 Oct 2025 01:12:55 +0000 (01:12 +0000)
public-inbox-compact and -xcpdb (reshard) both use a series of
rename(2) operations to replace Xapian shard directories quickly
to minimize downtime.  While a single rename(2) is atomic,
chaining two or more atomic operations is not.

Readers now acquire a shared lock via LOCK_SH of flock(2)
if it exists, but tolerates ENOENT for backwards compatibility
with indices that haven't been written by the current version.

The open.lock only protects against parallel open(2) calls used
by readers while short rename(2) operations are taking place on
the writers.  In other words, it allows parallel readers but
only a single writer process to do renames.

This open.lock will become more important with --split-shards
in the next commit.

20 files changed:
Documentation/public-inbox-extindex-format.pod
Documentation/public-inbox-v1-format.pod
Documentation/public-inbox-v2-format.pod
lib/PublicInbox/CodeSearch.pm
lib/PublicInbox/ExtSearch.pm
lib/PublicInbox/Inbox.pm
lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/Lock.pm
lib/PublicInbox/Search.pm
lib/PublicInbox/SearchIdx.pm
lib/PublicInbox/SearchIdxShard.pm
lib/PublicInbox/V2Writable.pm
lib/PublicInbox/XapHelper.pm
lib/PublicInbox/Xapcmd.pm
lib/PublicInbox/xap_helper.h
lib/PublicInbox/xh_cidx.h
lib/PublicInbox/xh_thread_fp.h
t/cindex.t
t/convert-compact.t
t/xap_helper.t

index d47e21026a43e39998f543f25a154be76a5d0efb..5ee7a8e0c2bbfa8abe122fe455edcc063116a2b9 100644 (file)
@@ -39,12 +39,13 @@ unique Message-ID requirement of NNTP.
 
   foo/                              # "foo" is the name of the index
   - ei.lock                         # lock file to protect global state
+  - open.lock                       # shared lock for readers (2.0+)
   - ALL.git                         # empty, alternates for inboxes
   - ei$SCHEMA_VERSION/$SHARD        # per-shard Xapian DB
   - ei$SCHEMA_VERSION/over.sqlite3  # overview DB for WWW, IMAP
   - ei$SCHEMA_VERSION/misc          # misc Xapian DB
 
-File and directory names are intentionally different from
+Most file and directory names are intentionally different from
 analogous v2 names to ensure extindex and v2 inboxes can
 easily be distinguished from each other.
 
@@ -91,8 +92,10 @@ reindexing and rely exclusively on GC.
 
 =head1 LOCKING
 
-L<flock(2)> locking exclusively locks the empty ei.lock file
-for all non-atomic operations.
+L<flock(2)> locking exclusively locks the empty C<ei.lock> file
+for all non-atomic operations.  In 2.0+, the L<flock(2)>
+C<open.lock> is a shared lock used by readers to protect them
+from L<public-inbox-compact(1)> and L<public-inbox-xcpdb(1)>.
 
 =head1 THANKS
 
@@ -101,7 +104,7 @@ and testing.
 
 =head1 COPYRIGHT
 
-Copyright 2020-2021 all contributors L<mailto:meta@public-inbox.org>
+Copyright all contributors L<mailto:meta@public-inbox.org>
 
 License: AGPL-3.0+ L<http://www.gnu.org/licenses/agpl-3.0.txt>
 
index db223fd9388b090557c17d9db1d20cf2b7203673..b1832890ba56096a982028f051491bdd553c09ee 100644 (file)
@@ -46,8 +46,8 @@ is also stripped as that header makes no sense in a public archive.
 
 =head1 LOCKING
 
-L<flock(2)> locking exclusively locks the empty $GIT_DIR/ssoma.lock file
-for all non-atomic operations.
+L<flock(2)> locking exclusively locks the empty C<$GIT_DIR/ssoma.lock>
+file for all non-atomic operations.
 
 =head1 EXAMPLE INPUT FLOW (SERVER-SIDE MDA)
 
@@ -155,6 +155,12 @@ A git index file used for MDA updates.  The normal git index (in
 $GIT_DIR/index) is not used at all as there is typically no working
 tree.
 
+=item $GIT_DIR/public-inbox/open.lock
+
+An empty file for L<flock(2)> locking the C<xapian*> directory
+to protect the final phase of L<public-inbox-compact(1)>.
+Introduced in public-inbox 2.0+.
+
 =back
 
 Each client $GIT_DIR may have multiple mbox/maildir/command targets.
index de3b0bfd390fe71602366e43a5cc5fba15eafaee..dd6fca3dc593bd8cfd295224e19a42db79e94cae 100644 (file)
@@ -31,6 +31,7 @@ databases for parallelism by "shards".
   - xap$SCHEMA_VERSION/$SHARD       # per-shard Xapian DB
   - xap$SCHEMA_VERSION/over.sqlite3 # OVER-view DB for NNTP, threading
   - msgmap.sqlite3                  # same as the v1 msgmap
+  - open.lock                       # protect Xapian shard changes (2.0+)
 
 For blob lookups, the reader only needs to open the "all.git"
 repository with $GIT_DIR/objects/info/alternates which references
@@ -162,6 +163,12 @@ described below.
 The SQLite msgmap DB is unchanged from v1, but it is now at the
 top-level of the directory.
 
+=head2 over.lock
+
+An empty file for L<flock(2)> locking the C<xap*> directory to
+protect the final phase of L<public-inbox-compact(1)>.
+Introduced in public-inbox 2.0+.
+
 =head1 OBJECT IDENTIFIERS
 
 There are three distinct type of identifiers.  content_hash is the
@@ -216,7 +223,9 @@ without making DB changes.
 =head1 LOCKING
 
 L<flock(2)> locking exclusively locks the empty inbox.lock file
-for all non-atomic operations.
+for all non-atomic operations.  In 2.0+, the L<flock(2)>
+C<open.lock> is a shared lock used by readers to protect them
+from L<public-inbox-compact(1)> and L<public-inbox-xcpdb(1)>.
 
 =head1 HEADERS
 
@@ -235,7 +244,7 @@ and testing of the v2 format.
 
 =head1 COPYRIGHT
 
-Copyright 2018-2021 all contributors L<mailto:meta@public-inbox.org>
+Copyright all contributors L<mailto:meta@public-inbox.org>
 
 License: AGPL-3.0+ L<http://www.gnu.org/licenses/agpl-3.0.txt>
 
index a80fd4b76aa316aee1e4bfe60ad57995ae51518f..ad0de24015d62beffe42eb7ddf5cc5b1f3551cdf 100644 (file)
@@ -381,4 +381,6 @@ sub repos_sorted {
        @recs = sort { $b->[0] <=> $a->[0] } @recs; # sort by commit time
 }
 
+sub open_lock { "$_[0]->{topdir}/open.lock" }
+
 1;
index d43c23e64adfd60d968ba3512d359d22d929c896..b008332284ef563783cb4d3723d8140e9f8b969f 100644 (file)
@@ -10,6 +10,7 @@ use v5.10.1;
 use PublicInbox::Over;
 use PublicInbox::Inbox;
 use PublicInbox::MiscSearch;
+use PublicInbox::Lock;
 use DBI qw(:sql_types); # SQL_BLOB
 
 # for ->reopen, ->mset, ->mset_to_artnums
@@ -27,7 +28,10 @@ sub new {
 
 sub misc {
        my ($self) = @_;
-       $self->{misc} //= PublicInbox::MiscSearch->new("$self->{xpfx}/misc");
+       $self->{misc} //= do {
+               my $lk = PublicInbox::Lock::may_sh "$self->{topdir}/open.lock";
+               PublicInbox::MiscSearch->new("$self->{xpfx}/misc");
+       };
 }
 
 # same as per-inbox ->over, for now...
@@ -121,6 +125,8 @@ sub search {
 
 sub thing_type { 'external index' }
 
+sub open_lock { "$_[0]->{topdir}/open.lock" }
+
 no warnings 'once';
 *base_url = \&PublicInbox::Inbox::base_url;
 *smsg_eml = \&PublicInbox::Inbox::smsg_eml;
index c519913736aa702af8b76e6b87dfa901b03f1389..7097653283e7d57e326e6fa6aeb9b9584702283c 100644 (file)
@@ -399,4 +399,10 @@ sub lock_file {
        $self->{inboxdir}.($self->version >= 2 ? '/inbox.lock' : '/ssoma.lock')
 }
 
+sub open_lock {
+       my ($self) = @_;
+       $self->{inboxdir}.(version($self) >= 2 ? '/open.lock'
+                                               : '/public-inbox/open.lock')
+}
+
 1;
index 5b47515ab4d69eca3b66c018dd7acf9a6e24290e..c480ddf4f4dc1b03af5f2ca510502d98da0b7991 100644 (file)
@@ -63,11 +63,14 @@ sub attach_external {
        }
        push @{$self->{shards_flat}}, @shards;
        push @{$self->{shard_dirs}}, @dirs;
+       push @{$self->{open_locks}}, $srch->open_lock;
        push(@{$self->{shard2ibx}}, $ibxish) for (@shards);
 }
 
 sub shard_dirs { @{$_[0]->{shard_dirs}} }
 
+sub xh_lock_args { map { ('-l', $_) } @{$_[0]->{open_locks}} }
+
 # returns a list of local inboxes (or count in scalar context)
 sub locals { @{$_[0]->{locals} // []} }
 
index 54164b919906a33fe884adc077521e1ad14f0866..713973c78d9656961401d4bd677ef3acad2af417 100644 (file)
@@ -5,10 +5,10 @@
 # only uses {lock_path} and {lockfh} fields
 package PublicInbox::Lock;
 use v5.12;
-use Fcntl qw(LOCK_UN LOCK_EX O_RDWR O_CREAT);
-use Carp qw(confess);
+use Fcntl qw(LOCK_UN LOCK_EX O_RDWR O_CREAT LOCK_SH);
+use Carp qw(carp confess);
 use PublicInbox::OnDestroy;
-use Errno qw(EINTR);
+use Errno qw(EINTR ENOENT);
 use autodie qw(close sysopen syswrite);
 
 sub xflock ($$) {
@@ -63,4 +63,21 @@ sub lock_for_scope_fast {
        on_destroy \&lock_release_fast, $self;
 }
 
+sub lock_for_scope_shared ($) {
+       my ($self) = @_;
+       my $fn = $self->{lock_path};
+       if (open(my $fh, '<', $fn)) {
+               xflock($fh, LOCK_SH) or confess "LOCK_SH $fn: $!";
+               $self->{lockfh} = $fh;
+               on_destroy \&lock_release, $self;
+       } else {
+               carp "W: open($fn): $!" if $! != ENOENT;
+               undef;
+       }
+}
+
+sub may_sh ($) {
+       lock_for_scope_shared(new(__PACKAGE__, $_[0]));
+}
+
 1;
index 0caa915e261f2f4493387d3bd4fe8e536d9b23d5..c37dcf3b99b1635f98f6b5bf2280919ec4c252e3 100644 (file)
@@ -7,6 +7,7 @@ package PublicInbox::Search;
 use strict;
 use v5.10.1;
 use parent qw(Exporter);
+use PublicInbox::Lock;
 our @EXPORT_OK = qw(retry_reopen int_val get_pct xap_terms);
 use List::Util qw(max);
 use POSIX qw(strftime);
@@ -89,6 +90,7 @@ our @XH_SPEC = (
        'd=s@', # shard dirs
        'g=s', # git dir (with -c)
        'k=i', # sort column (like sort(1))
+       'l=s', # open.lock path
        'm=i', # maximum number of results
        'o=i', # offset
        'r', # 1=relevance then column
@@ -251,12 +253,15 @@ sub shard_dirs ($) {
        }
 }
 
+sub open_lock ($) { "$_[0]->{xpfx}/../open.lock" }
+
 # returns all shards as separate Xapian::Database objects w/o combining
 sub xdb_shards_flat ($) {
        my ($self) = @_;
        load_xapian();
        $self->{qp_flags} //= $QP_FLAGS;
        my $slow_phrase;
+       my $lk = PublicInbox::Lock::may_sh open_lock($self);
        my @xdb = map {
                $slow_phrase ||= -f "$_/iamchert";
                $X{Database}->new($_); # raises if missing
@@ -322,6 +327,9 @@ sub new {
 sub reopen {
        my ($self) = @_;
        if (my $xdb = $self->{xdb}) {
+               my $lk = defined($self->{xpfx}) ?
+                       PublicInbox::Lock::may_sh(open_lock($self)) :
+                       undef;
                $xdb->reopen;
        }
        $self; # make chaining easier
@@ -745,11 +753,14 @@ sub xh_args { # prep getopt args to feed to xap_helper.h socket
                        $dedupe{$x->{prefix}.$sym.$x->{xprefix}} = undef;
                }
                # TODO: arbitrary header indexing goes here
-               [ sort keys %dedupe ];
+               my @apfx = map { ('-Q', $_) } sort keys %dedupe;
+               \@apfx;
        };
-       ((map { ('-d', $_) } $self->shard_dirs), map { ('-Q', $_) } @$apfx);
+       ((map { ('-d', $_) } $self->shard_dirs), $self->xh_lock_args, @$apfx);
 }
 
+sub xh_lock_args { ('-l', open_lock($_[0])) }
+
 sub docids_by_postlist ($$) {
        my ($self, $q) = @_;
        my $cur = $self->xdb->postlist_begin($q);
index 48173a2325ee59ba209bc9b7feb8c0e9724d3dbd..11191765c1aeda94338bdf7b044003798b00264c 100644 (file)
@@ -11,6 +11,7 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::Search PublicInbox::Lock PublicInbox::Umask
        Exporter);
+use autodie qw(open);
 use PublicInbox::Eml;
 use PublicInbox::DS qw(now);
 use PublicInbox::Search qw(xap_terms);
@@ -82,7 +83,7 @@ sub new {
        if ($version == 1) {
                $self->{lock_path} = "$inboxdir/ssoma.lock";
                $self->{oidx} = PublicInbox::OverIdx->new(
-                               $self->xdir.'/over.sqlite3', $creat_opt);
+                               "$self->{xpfx}/over.sqlite3", $creat_opt);
        } elsif ($version == 2) {
                defined $shard or die "shard is required for v2\n";
                # shard is a number
@@ -152,6 +153,7 @@ sub idx_acquire {
                        File::Path::mkpath($dir);
                        $self->{-opt}->{cow} or
                                PublicInbox::Syscall::nodatacow_dir($dir);
+                       open my $fh, '>>', $owner->open_lock;
                        # owner == self for CodeSearchIdx
                        $self->{-set_has_threadid_once} = 1 if $owner != $self;
                        $flag |= $DB_DANGEROUS if $self->{-opt}->{dangerous};
index 3b8402b7ebcda13d4ddc59cd01d16d136b736654..ea8c4ff125dabcffb3bead4bf1f35cc76a6adaab 100644 (file)
@@ -17,20 +17,22 @@ sub new {
        $self->idx_acquire;
        $self->set_metadata_once;
        $self->idx_release;
-       if ($v2w->{parallel}) {
-               local $self->{-v2w_afc} = $v2w;
-               $self->ipc_worker_spawn("shard[$shard]");
-               # Increasing the pipe size for requests speeds V2 batch imports
-               # across 8 cores by nearly 20%.  Since many of our responses
-               # are small, make the response pipe as small as possible
-               if ($F_SETPIPE_SZ) {
-                       fcntl($self->{-ipc_req}, $F_SETPIPE_SZ, 1048576);
-                       fcntl($self->{-ipc_res}, $F_SETPIPE_SZ, 4096);
-               }
-       }
        $self;
 }
 
+sub start_v2w_worker {
+       my ($self, $v2w) = @_;
+       local $self->{-v2w_afc} = $v2w;
+       $self->ipc_worker_spawn("shard[$self->{shard}]");
+       # Increasing the pipe size for requests speeds V2 batch imports
+       # across 8 cores by nearly 20%.  Since many of our responses
+       # are small, make the response pipe as small as possible
+       if ($F_SETPIPE_SZ) {
+               fcntl($self->{-ipc_req}, $F_SETPIPE_SZ, 1048576);
+               fcntl($self->{-ipc_res}, $F_SETPIPE_SZ, 4096);
+       }
+}
+
 sub _worker_done { # OnDestroy cb
        my ($self) = @_;
        die "BUG: $$ $0 xdb active" if $self->need_xapian && $self->{xdb};
index 8fa7daa3448ed31e28dab21077e2b1104022a407..0cf30c928884f47d3a17d2d5a4c4694f1986c6b9 100644 (file)
@@ -230,10 +230,16 @@ sub _idx_init { # with_umask callback
        eval {
                my $max = $self->{shards} - 1;
                my $idx = $self->{idx_shards} = [];
-               for (0..$max) {
+               my $lk = PublicInbox::Lock->new("$self->{xpfx}/../open.lock");
+               my $unlk = $lk->lock_for_scope;
+               for (0..$max) { # no forking, yet
                        push @$idx, PublicInbox::SearchIdxShard->new($self, $_)
                }
+               undef $unlk; # releases lock
                $self->{-need_xapian} = $idx->[0]->need_xapian;
+               if ($self->{parallel}) { # fork here:
+                       $_->start_v2w_worker($self) for @$idx;
+               }
        };
        if ($@) {
                delete $self->{idx_shards};
index 48203de39fecfd2ab669ce356ffe0f73e83be118..0288aeecd29f2ab934a09f072d4f1ac9ffc3564f 100644 (file)
@@ -16,6 +16,7 @@ use PublicInbox::DS qw(awaitpid);
 use autodie qw(open getsockopt);
 use POSIX qw(:signal_h);
 use Fcntl qw(LOCK_UN LOCK_EX);
+use PublicInbox::Lock;
 use Carp qw(croak);
 my $X = \%PublicInbox::Search::X;
 our (%SRCH, %WORKERS, $nworker, $workerset, $in, $SHARD_NFD, $MY_FD_MAX);
@@ -32,7 +33,9 @@ sub cmd_test_sleep { select(undef, undef, undef, 0.01) while 1 }
 
 sub iter_retry_check ($) {
        if (ref($@) =~ /\bDatabaseModifiedError\b/) {
-               $_[0]->{srch}->reopen;
+               my ($req) = @_;
+               my $lk = PublicInbox::Lock::may_sh $req->{l};
+               $req->{srch}->reopen;
                undef; # retries
        } elsif (ref($@) =~ /\bDocNotFoundError\b/) {
                warn "doc not found: $@";
@@ -118,9 +121,9 @@ sub dump_roots_iter ($$$) {
 sub dump_roots_flush ($$) {
        my ($req, $fh) = @_;
        if ($req->{wbuf} ne '') {
-               until (flock($fh, LOCK_EX)) { die "LOCK_EX: $!" if !$!{EINTR} }
+               PublicInbox::Lock::xflock($fh, LOCK_EX) or die "LOCK_EX: $!";
                print { $req->{0} } $req->{wbuf} or die "print: $!";
-               until (flock($fh, LOCK_UN)) { die "LOCK_UN: $!" if !$!{EINTR} }
+               PublicInbox::Lock::xflock($fh, LOCK_UN) or die "LOCK_UN: $!";
                $req->{wbuf} = '';
        }
 }
@@ -226,6 +229,7 @@ sub dispatch (@) {
                for my $retried (0, 1) {
                        my $slow_phrase = -f "$first/iamchert";
                        eval {
+                               my $lk = PublicInbox::Lock::may_sh $req->{l};
                                $new->{xdb} = $X->{Database}->new($first);
                                for (@$dirs) {
                                        $slow_phrase ||= -f "$_/iamchert";
@@ -250,7 +254,10 @@ sub dispatch (@) {
                srch_init_extra $new, $req;
                $SRCH{$key} = $new;
        };
-       $req->{srch}->{xdb}->reopen unless $new;
+       unless ($new) {
+               my $lk = PublicInbox::Lock::may_sh $req->{l};
+               $req->{srch}->{xdb}->reopen;
+       }
        my $timeo = $req->{K};
        alarm($timeo) if $timeo;
        $fn->($req, @argv);
index 3fdadecceae757ec46c4249fd67e6b408573607c..87db423860f0059916b1d0d5670187b401c15017 100644 (file)
@@ -5,6 +5,7 @@ use v5.12;
 use autodie qw(chmod closedir open opendir rename syswrite);
 use PublicInbox::Spawn qw(which popen_rd);
 use PublicInbox::Syscall;
+use PublicInbox::Lock;
 use PublicInbox::Admin qw(setup_signals);
 use PublicInbox::Over;
 use PublicInbox::Search qw(xap_terms);
@@ -18,6 +19,7 @@ use PublicInbox::DS;
 # commands with a version number suffix (e.g. "xapian-compact-1.5")
 our $XAPIAN_COMPACT = $ENV{XAPIAN_COMPACT} || 'xapian-compact';
 our @COMPACT_OPT = qw(jobs|j=i quiet|q blocksize|b=s no-full|n fuller|F);
+my %SKIP = map { $_ => 1 } qw(. ..);
 
 sub commit_changes ($$$$) {
        my ($ibx, $im, $tmp, $opt) = @_;
@@ -35,9 +37,11 @@ sub commit_changes ($$$$) {
                my ($y) = ($b =~ m!/([0-9]+)/*\z!);
                ($y // -1) <=> ($x // -1) # we may have non-shards
        } keys %$tmp;
-       my ($xpfx, $mode);
+       my ($xpfx, $mode, $unlk);
        if (@order) {
                ($xpfx) = ($order[0] =~ m!(.*/)[^/]+/*\z!);
+               my $lk = PublicInbox::Lock->new($ibx->open_lock);
+               $unlk = $lk->lock_for_scope;
                $mode = (stat($xpfx))[2];
        }
        for my $old (@order) {
@@ -69,6 +73,7 @@ sub commit_changes ($$$$) {
                rename $new, $old;
                push @old_shard, "$old/old" if $have_old;
        }
+       undef $unlk; # unlock
 
        # trigger ->check_inodes in read-only daemons
        syswrite($im->{lockfh}, '.') if $over_chg && $im;
@@ -101,12 +106,12 @@ sub commit_changes ($$$$) {
        }
 }
 
-sub cb_spawn {
-       my ($cb, $args, $opt) = @_; # $cb = cpdb() or compact()
+sub cb_spawn ($$$$) {
+       my ($cb, $ibxish, $args, $opt) = @_; # $cb = cpdb() or compact()
        my $pid = PublicInbox::DS::fork_persist;
        return $pid if $pid > 0;
        $SIG{__DIE__} = sub { warn @_; _exit(1) }; # don't jump up stack
-       $cb->($args, $opt);
+       $cb->($ibxish, $args, $opt);
        _exit(0);
 }
 
@@ -147,13 +152,13 @@ sub kill_pids {
        kill($sig, keys %$pids); # pids may be empty
 }
 
-sub process_queue {
-       my ($queue, $task, $opt) = @_;
+sub process_queue ($$$$) {
+       my ($ibxish, $queue, $task, $opt) = @_;
        my $max = $opt->{jobs} // scalar(@$queue);
        my $cb = \&$task;
        if ($max <= 1) {
                while (defined(my $args = shift @$queue)) {
-                       $cb->($args, $opt);
+                       $cb->($ibxish, $args, $opt);
                }
                return;
        }
@@ -165,7 +170,7 @@ sub process_queue {
        while (@$queue) {
                while (scalar(keys(%pids)) < $max && scalar(@$queue)) {
                        my $args = shift @$queue;
-                       $pids{cb_spawn($cb, $args, $opt)} = $args;
+                       $pids{cb_spawn($cb, $ibxish, $args, $opt)} = $args;
                }
 
                my $flags = 0;
@@ -225,7 +230,7 @@ sub prepare_run {
                while (defined(my $dn = readdir($dh))) {
                        if ($dn =~ /\A[0-9]+\z/) {
                                push(@old_shards, $dn + 0);
-                       } elsif ($dn eq '.' || $dn eq '..') {
+                       } elsif ($SKIP{$dn}) {
                        } elsif ($dn =~ /\Aover\.sqlite3/) {
                        } elsif ($dn eq 'misc' && $misc_ok) {
                        } else {
@@ -254,7 +259,7 @@ sub prepare_run {
                        my $wip_dn = $wip->dirname;
                        same_fs_or_die($old, $wip_dn);
                        my $cur = "$old/$dn";
-                       push @queue, [ $src // $cur , $wip ];
+                       push @queue, [ $src // $cur, $wip ];
                        $opt->{cow} or
                                PublicInbox::Syscall::nodatacow_dir($wip_dn);
                        $tmp->{$cur} = $wip;
@@ -308,7 +313,7 @@ sub run {
        if ($task eq 'cpdb' && $opt->{reshard} && $ibx->can('cidx_run')) {
                cidx_reshard($ibx, $queue, $opt);
        } else {
-               process_queue($queue, $task, $opt);
+               process_queue $ibx, $queue, $task, $opt;
        }
        ($im // $ibx)->lock_acquire if !$opt->{-coarse_lock};
        commit_changes($ibx, $im, $tmp, $opt);
@@ -343,8 +348,8 @@ sub kill_compact { # setup_signals callback
 }
 
 # xapian-compact wrapper
-sub compact ($$) { # cb_spawn callback
-       my ($args, $opt) = @_;
+sub compact ($$$) { # cb_spawn callback
+       my ($ibxish, $args, $opt) = @_;
        my ($src, $newdir) = @$args;
        my $dst = ref($newdir) ? $newdir->dirname : $newdir;
        my $pfx = $opt->{-progress_pfx} ||= progress_pfx($src);
@@ -497,17 +502,18 @@ sub cidx_reshard { # not docid based
                push @q, [ "$tmp", $wip ];
        }
        delete $opt->{-progress_pfx};
-       process_queue(\@q, 'compact', $opt);
+       process_queue $cidx, \@q, 'compact', $opt;
 }
 
 # Like copydatabase(1), this is horribly slow; and it doesn't seem due
 # to the overhead of Perl.
-sub cpdb ($$) { # cb_spawn callback
-       my ($args, $opt) = @_;
+sub cpdb ($$$) { # cb_spawn callback
+       my ($ibxish, $args, $opt) = @_;
        my ($old, $wip) = @$args;
        my ($src, $cur_shard);
        my $reshard;
        my ($X, $flag) = xapian_write_prep($opt);
+       my $lk = PublicInbox::Lock::may_sh $ibxish->open_lock;
        if (ref($old) eq 'ARRAY') {
                my $new = $wip->dirname;
                ($cur_shard) = ($new =~ m!(?:xap|ei)[0-9]+/([0-9]+)\b!);
@@ -525,7 +531,7 @@ sub cpdb ($$) { # cb_spawn callback
                                $src = $X->{Database}->new($_);
                        }
                }
-       } else {
+       } else { # 1:1 copy
                $src = $X->{Database}->new($old);
        }
 
@@ -595,7 +601,7 @@ sub cpdb ($$) { # cb_spawn callback
 
        # this is probably the best place to do xapian-compact
        # since $dst isn't readable by HTTP or NNTP clients, yet:
-       compact([ $tmp, $new ], $opt);
+       compact $ibxish, [ $tmp, $new ], $opt;
 }
 
 1;
index fd52a70d6932bfeea11db30daa5124384dde5e1a..658c1cc7f5ead6e845ec52f18ed404df3443cec9 100644 (file)
@@ -104,9 +104,10 @@ static void *xreallocarray(void *ptr, size_t nmemb, size_t size)
 {
 #ifdef HAVE_REALLOCARRAY
        void *ret = reallocarray(ptr, nmemb, size);
-#else // can't rely on __builtin_mul_overflow in gcc 4.x :<
+#else // everyone has g++ >=5 these days, right?
        void *ret = NULL;
-       if (nmemb && size > SIZE_MAX / nmemb)
+
+       if (__builtin_mul_overflow(nmemb, size, 0))
                errno = ENOMEM;
        else
                ret = realloc(ptr, nmemb * size);
@@ -139,7 +140,7 @@ static int srch_eq(const struct srch *a, const struct srch *b)
 KHASHL_CSET_INIT(KH_LOCAL, srch_set, srch_set, struct srch *,
                srch_hash, srch_eq)
 static srch_set *srch_cache;
-static struct srch *cur_srch; // for ThreadFieldProcessor
+static struct req *cur_req; // for ThreadFieldProcessor
 static long my_fd_max, shard_nfd;
 // sock_fd is modified in signal handler, yes, it's SOCK_SEQPACKET
 static volatile int sock_fd = STDIN_FILENO;
@@ -170,11 +171,12 @@ enum exc_iter {
        ITER_ABORT
 };
 
-#define MY_ARG_MAX 256
+#define MY_ARG_MAX 256 // FIXME too small?
 typedef bool (*cmd)(struct req *);
 
 // only one request per-process since we have RLIMIT_CPU timeout
 struct req { // argv and pfxv point into global rbuf
+       char *lockv[MY_ARG_MAX]; // open.lock files
        char *argv[MY_ARG_MAX];
        char *pfxv[MY_ARG_MAX]; // -A <prefix>
        char *qpfxv[MY_ARG_MAX]; // -Q <user_prefix>[:=]<INTERNAL_PREFIX>
@@ -191,7 +193,7 @@ struct req { // argv and pfxv point into global rbuf
        unsigned long timeout_sec;
        size_t nr_out;
        long sort_col; // value column, negative means BoolWeight
-       int argc, pfxc, qpfxc, dirc;
+       int argc, pfxc, qpfxc, dirc, lockc;
        FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional)
        bool has_input; // fp[0] is bidirectional
        bool collapse_threads;
@@ -211,6 +213,11 @@ struct fbuf {
        size_t len;
 };
 
+struct open_locks {
+       struct req *req;
+       int lock_fd[]; // counted by req->lockc
+};
+
 #define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
 static size_t split2argv(char **dst, char *buf, size_t len, size_t limit)
 {
@@ -251,6 +258,68 @@ static Xapian::Enquire prep_enquire(const struct req *req)
        return enq;
 }
 
+static void xclose(int fd)
+{
+       if (close(fd) < 0 && errno != EINTR)
+               EABORT("BUG: close");
+}
+
+// NOT_UNUSED keeps clang happy (tested 14.0.5 on FreeBSD)
+#define NOT_UNUSED(v) (void)v
+#define AUTO_UNLOCK __attribute__((__cleanup__(unlock_ensure)))
+static void unlock_ensure(void *ptr)
+{
+       struct open_locks **lk_ = (struct open_locks **)ptr;
+       struct open_locks *lk = *lk_;
+
+       if (!lk)
+               return;
+       for (int i = 0; i < lk->req->lockc; i++)
+               if (lk->lock_fd[i] >= 0)
+                       xclose(lk->lock_fd[i]); // implicit LOCK_UN
+}
+
+static struct open_locks *lock_shared_maybe(struct req *req)
+{
+       struct open_locks *lk = NULL;
+       size_t size;
+
+       assert(req->dirc);
+       if (!req->lockc) {
+               warn("W: %s has no -l (open.lock)", req->dirv[0]);
+               return NULL;
+       }
+       assert(req->lockc > 0);
+       assert(req->lockc < MY_ARG_MAX);
+       if (__builtin_mul_overflow(sizeof(int), (size_t)req->lockc, &size)) {
+               warnx("W: too many locks (%d)", req->lockc);
+               return NULL;
+       }
+       if (__builtin_add_overflow(sizeof(*lk), size, &size)) {
+               warnx("W: too many locks (%d)", req->lockc);
+               return NULL;
+       }
+       lk = (struct open_locks *)malloc(size);
+       if (!lk) EABORT("malloc(%zu)", size);
+       lk->req = req;
+       for (int i = 0; i < req->lockc; i++) {
+               lk->lock_fd[i] = open(req->lockv[i], O_RDONLY);
+
+               if (lk->lock_fd[i] < 0) {
+                       if (errno != ENOENT)
+                               warn("W: open(%s)", req->lockv[i]);
+                       continue;
+               }
+               while (flock(lk->lock_fd[i], LOCK_SH) < 0) {
+                       if (errno == EINTR)
+                               continue;
+                       warn("W: flock(%s, LOCK_SH)", req->lockv[i]);
+                       break;
+               }
+       }
+       return lk;
+}
+
 static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
 {
        if (!req->max) {
@@ -264,7 +333,10 @@ static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
                        Xapian::MSet mset = enq->get_mset(req->off, req->max);
                        return mset;
                } catch (const Xapian::DatabaseModifiedError & e) {
-                       req->srch->db->reopen();
+                       AUTO_UNLOCK struct open_locks *lk =
+                                                       lock_shared_maybe(req);
+                       req->srch->db->reopen(); // may throw
+                       NOT_UNUSED(lk);
                }
        }
        return enq->get_mset(req->off, req->max);
@@ -338,7 +410,10 @@ static void apply_roots_filter(struct req *req, Xapian::Query *qry)
                        *qry = Xapian::Query(Xapian::Query::OP_FILTER, *qry, f);
                        return;
                } catch (const Xapian::DatabaseModifiedError & e) {
+                       AUTO_UNLOCK struct open_locks *lk =
+                                                       lock_shared_maybe(req);
                        xdb->reopen();
+                       NOT_UNUSED(lk);
                }
        }
 }
@@ -418,12 +493,6 @@ static bool write_all(int fd, const struct fbuf *wbuf, size_t len)
                e ? err(e, "ferror|fclose "#f) : perror("ferror|fclose "#f); \
 } while (0)
 
-static void xclose(int fd)
-{
-       if (close(fd) < 0 && errno != EINTR)
-               EABORT("BUG: close");
-}
-
 static size_t off2size(off_t n)
 {
        if (n < 0 || (uintmax_t)n > SIZE_MAX)
@@ -637,6 +706,8 @@ static void srch_init(struct req *req)
                srch->qp_flags |= FLAG_PHRASE;
                i = 0;
                try {
+                       AUTO_UNLOCK struct open_locks *lk =
+                                                       lock_shared_maybe(req);
                        srch->db = new Xapian::Database(req->dirv[i]);
                        if (!lei && is_chert(req->dirv[0]))
                                srch->qp_flags &= ~FLAG_PHRASE;
@@ -647,6 +718,7 @@ static void srch_init(struct req *req)
                                        srch->qp_flags &= ~FLAG_PHRASE;
                                srch->db->add_database(Xapian::Database(dir));
                        }
+                       NOT_UNUSED(lk);
                        break;
                } catch (const Xapian::Error & e) {
                        warnx("E: Xapian::Error: %s (%s)",
@@ -738,6 +810,10 @@ static void dispatch(struct req *req)
                        case LONG_MAX: case LONG_MIN: ABORT("-k %s", optarg);
                        }
                        break;
+               case 'l':
+                       req->lockv[req->lockc++] = optarg;
+                       if (MY_ARG_MAX == req->lockc) ABORT("too many -l");
+                       break;
                case 'm': OPT_U(m, req->max, strtoull, ULLONG_MAX); break;
                case 'o': OPT_U(o, req->off, strtoull, ULLONG_MAX); break;
                case 'r': req->relevance = true; break;
@@ -776,12 +852,14 @@ static void dispatch(struct req *req)
        } else {
                assert(req->srch != kbuf.srch);
                srch_free(kbuf.srch);
+               AUTO_UNLOCK struct open_locks *lk = lock_shared_maybe(req);
                req->srch->db->reopen();
+               NOT_UNUSED(lk);
        }
        if (req->timeout_sec)
                alarm(req->timeout_sec > UINT_MAX ?
                        UINT_MAX : (unsigned)req->timeout_sec);
-       cur_srch = req->srch; // set global for *FieldProcessor
+       cur_req = req; // set global for *FieldProcessor
        try {
                if (!req->fn(req))
                        warnx("`%s' failed", req->argv[0]);
@@ -843,7 +921,7 @@ static void req_cleanup(void *ptr)
 {
        struct req *req = (struct req *)ptr;
        free(req->lenv);
-       cur_srch = NULL;
+       cur_req = NULL;
 }
 
 static void reopen_logs(void)
index 095999d09e08791ebfd7b1d8b5d00fb91ce02ad4..fa9d1ee4273d1322bee578f81d0dda66336e3e60 100644 (file)
@@ -86,7 +86,9 @@ static enum exc_iter dump_ibx_iter(struct req *req, const char *ibx_id,
                for (int p = 0; p < req->pfxc; p++)
                        dump_ibx_term(req, p, &doc, ibx_id);
        } catch (const Xapian::DatabaseModifiedError & e) {
+               AUTO_UNLOCK struct open_locks *lk = lock_shared_maybe(req);
                req->srch->db->reopen();
+               NOT_UNUSED(lk);
                return ITER_RETRY;
        } catch (const Xapian::DocNotFoundError & e) { // oh well...
                warnx("doc not found: %s", e.get_description().c_str());
@@ -230,7 +232,9 @@ static enum exc_iter dump_roots_iter(struct req *req,
                for (int p = 0; p < req->pfxc; p++)
                        dump_roots_term(req, p, drt, &root_offs, &doc);
        } catch (const Xapian::DatabaseModifiedError & e) {
+               AUTO_UNLOCK struct open_locks *lk = lock_shared_maybe(req);
                req->srch->db->reopen();
+               NOT_UNUSED(lk);
                return ITER_RETRY;
        } catch (const Xapian::DocNotFoundError & e) { // oh well...
                warnx("doc not found: %s", e.get_description().c_str());
index ccded18c3e56181d67daba05344231a7d230d71f..9a9a6adf0cb8ec23c1316c0aef912aceaa40d416 100644 (file)
@@ -19,7 +19,9 @@ static enum exc_iter xpand_col_iter(Xapian::Query *xqry,
                *xqry |= Xapian::Query(Xapian::Query::OP_VALUE_RANGE,
                                        column, val, val);
        } catch (const Xapian::DatabaseModifiedError &e) {
-               cur_srch->db->reopen();
+               AUTO_UNLOCK struct open_locks *lk = lock_shared_maybe(cur_req);
+               cur_req->srch->db->reopen();
+               NOT_UNUSED(lk);
                return ITER_RETRY;
        } catch (const Xapian::DocNotFoundError &e) { // oh well...
                warnx("doc not found: %s", e.get_description().c_str());
@@ -30,13 +32,13 @@ static enum exc_iter xpand_col_iter(Xapian::Query *xqry,
 static Xapian::Query qry_xpand_col(Xapian::Query qry, unsigned column)
 {
        Xapian::Query xqry = Xapian::Query::MatchNothing;
-       Xapian::Enquire enq(*cur_srch->db);
+       Xapian::Enquire enq(*cur_req->srch->db);
 
        enq.set_weighting_scheme(Xapian::BoolWeight());
        enq.set_query(qry);
        enq.set_collapse_key(column);
 
-       Xapian::MSet mset = enq.get_mset(0, cur_srch->db->get_doccount());
+       Xapian::MSet mset = enq.get_mset(0, cur_req->srch->db->get_doccount());
 
        for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++)  {
                for (int t = 10; t > 0; --t)
@@ -63,7 +65,8 @@ Xapian::Query ThreadFieldProcessor::operator()(const std::string &str)
                throw Xapian::QueryParserError("missing } in '" + str + "'");
        } else { // thread:"{hello world}"
                std::string qstr = str.substr(1, str.size() - 2);
-               qry = cur_srch->qp->parse_query(qstr, cur_srch->qp_flags);
+               qry = cur_req->srch->qp->parse_query(qstr,
+                                               cur_req->srch->qp_flags);
        }
        return qry_xpand_col(qry, THREADID);
 }
index 2743aaa5c829afdf66bd002738e44439148a6a45..de61ae0cd3d9a85845167c6b586a1d84df21f416 100644 (file)
@@ -80,8 +80,8 @@ ok(run_script([qw(-cindex -L medium --dangerous -q -d),
 
 SKIP: {
        have_xapian_compact 2;
-       ok(run_script([qw(-compact -q), "$tmp/ext"]), 'compact on full');
-       ok(run_script([qw(-compact -q), "$tmp/med"]), 'compact on medium');
+       ok(run_script([qw(-compact -q), "$tmp/ext"]), 'compact on full');
+       ok(run_script([qw(-compact -q), "$tmp/med"]), 'compact on medium');
 }
 
 my $no_metadata_set = sub {
index 533dd67d529df545564a3f7da0d94d81df019c50..dcf3eed3b79b88d3a6a17f03a59f5856c2747cf1 100644 (file)
@@ -58,7 +58,7 @@ my ($out, $err) = ('', '');
 my $rdr = { 1 => \$out, 2 => \$err };
 
 my $cmd = [ '-compact', $ibx->{inboxdir} ];
-ok(run_script($cmd, undef, $rdr), 'v1 compact works');
+ok(run_script($cmd, undef, $rdr), 'v1 compact works') or diag $err;
 
 @xdir = glob("$ibx->{inboxdir}/public-inbox/xap*");
 is(scalar(@xdir), 1, 'got one xapian directory after compact');
index 86b44dd24af8fdf2e69ae0f3215e29237eedc7fd..beda0a6e51e8422236011e684f9bc5b9fbe645fd 100644 (file)
@@ -76,11 +76,14 @@ EOM
 };
 
 my @ibx_idx = glob("$v2->{inboxdir}/xap*/?");
-my @ibx_shard_args = map { ('-d', $_) } @ibx_idx;
+my @v2ol = ('-l', "$v2->{inboxdir}/open.lock");
+my @ibx_shard_args = (@v2ol, map { ('-d', $_) } @ibx_idx);
 my (@int) = glob("$crepo/public-inbox-cindex/cidx*/?");
 my (@ext) = glob("$crepo/cidx-ext/cidx*/?");
 is(scalar(@ext), 2, 'have 2 external shards') or diag explain(\@ext);
 is(scalar(@int), 1, 'have 1 internal shard') or diag explain(\@int);
+my @ciol = ('-l', "$crepo/public-inbox-cindex/open.lock");
+my @cidx_int_shard_args = (@ciol, map { ('-d', $_) } @int);
 
 my $doreq = sub {
        my ($s, @arg) = @_;
@@ -104,12 +107,12 @@ my $test = sub {
        my $ar = PublicInbox::AutoReap->new($pid);
        diag "$cmd[-1] running pid=$pid";
        close $y;
-       my $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
+       my $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0], @v2ol);
        my %info = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> });
        is($info{has_threadid}, '1', 'has_threadid true for inbox');
        like($info{pid}, qr/\A\d+\z/, 'got PID from inbox inspect');
 
-       $r = $doreq->($s, qw(test_inspect -d), $int[0]);
+       $r = $doreq->($s, qw(test_inspect -d), $int[0], @ciol);
        my %cinfo = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> });
        is($cinfo{has_threadid}, '0', 'has_threadid false for cindex');
        is($cinfo{pid}, $info{pid}, 'PID unchanged for cindex');
@@ -135,7 +138,7 @@ my $test = sub {
        kill('TERM', $cinfo{pid});
        my $tries = 0;
        do {
-               $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
+               $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0], @v2ol);
                %info = map { split(/=/, $_, 2) }
                        split(/ /, do { local $/; <$r> });
        } while ($info{pid} == $cinfo{pid} && ++$tries < 10);
@@ -143,7 +146,7 @@ my $test = sub {
 
        my %pids;
        $tries = 0;
-       my @ins = ($s, qw(test_inspect -d), $ibx_idx[0]);
+       my @ins = ($s, qw(test_inspect -d), $ibx_idx[0], @v2ol);
        kill('TTIN', $pid);
        until (scalar(keys %pids) >= 2 || ++$tries > 100) {
                tick;
@@ -224,8 +227,7 @@ for my $n (@NO_CXX) {
 
        pipe my $r, my $w;
        $xhc->mkreq([ $w, $err_w ], qw(dump_ibx -A XDFID -A Q),
-                               (map { ('-d', $_) } @ibx_idx),
-                               9, "mid:$mid");
+                               @ibx_shard_args, 9, "mid:$mid");
        close $err_w;
        close $w;
        my $res = do { local $/; <$r> };
@@ -236,7 +238,7 @@ for my $n (@NO_CXX) {
        pipe($err_r, $err_w);
        pipe $r, $w;
        $xhc->mkreq([ $w, $err_w ], qw(dump_roots -c -A XDFID),
-                       (map { ('-d', $_) } @int),
+                       @cidx_int_shard_args,
                        $root2id_file, 'dt:19700101'.'000000..');
        close $err_w;
        close $w;
@@ -305,7 +307,7 @@ for my $n (@NO_CXX) {
                pipe($err_r, $err_w);
                pipe($r, $w);
                $xhc->mkreq([ $w, $err_w ], qw(dump_roots -c -A),
-                               "XDFPOST$i", (map { ('-d', $_) } @int),
+                               "XDFPOST$i", @cidx_int_shard_args,
                                $root2id_file, 'dt:19700101'.'000000..');
                close $err_w;
                close $w;
@@ -344,7 +346,9 @@ for my $n (@NO_CXX) {
                require PublicInbox::XhcMset;
                my $over = $thr->over;
                my @thr_idx = glob("$thr->{inboxdir}/xap*/?");
-               my @thr_shard_args = map { ('-d', $_) } @thr_idx;
+               my @thr_shard_args = ('-l', "$thr->{inboxdir}/open.lock",
+                                       map { ('-d', $_) } @thr_idx);
+
                my (@art, $mset, $err);
                my $capture = sub { ($mset, $err) = @_ };
                my $retrieve = sub {
@@ -402,7 +406,7 @@ for my $n (@NO_CXX) {
                        my $t0 = now;
                        pipe $r, $w;
                        $xhc->mkreq([ $w ], qw(test_sleep -K 1 -d),
-                                       $ibx_idx[0]);
+                                       $ibx_idx[0], @v2ol);
                        close $w;
                        is readline($r), undef, 'got EOF';
                        my $diff = now - $t0;