]> git.ipfire.org Git - thirdparty/bugzilla.git/commitdiff
Bug 1377620 - Elasticsearch bulk indexer ignores changes that happened before newest...
authorDylan William Hardison <dylan@hardison.net>
Thu, 6 Jul 2017 21:59:30 +0000 (14:59 -0700)
committerDylan William Hardison <dylan@hardison.net>
Thu, 6 Jul 2017 22:19:39 +0000 (18:19 -0400)
Bugzilla/Elastic/Indexer.pm

index fce8f1053d0f4af0f4164516e05c3c9b33c5f3b6..b691e06877567eb0cb9efbb82e3d8686be34f1cc 100644 (file)
@@ -10,16 +10,12 @@ use 5.10.1;
 use Moo;
 use List::MoreUtils qw(natatime);
 use Storable qw(dclone);
+use Scalar::Util qw(looks_like_number);
 use namespace::clean;
 
 with 'Bugzilla::Elastic::Role::HasClient';
 with 'Bugzilla::Elastic::Role::HasIndexName';
 
-has 'mtime' => (
-    is      => 'lazy',
-    clearer => 'clear_mtime',
-);
-
 has 'shadow_dbh' => ( is => 'lazy' );
 
 has 'debug_sql' => (
@@ -109,34 +105,41 @@ sub _bulk_helper {
     );
 }
 
-sub find_largest_mtime {
-    my ($self, $class) = @_;
+
+sub _find_largest {
+    my ($self, $class, $field) = @_;
 
     my $result = $self->client->search(
         index => $self->index_name,
         type  => $class->ES_TYPE,
         body  => {
-            aggs => { es_mtime => { extended_stats => { field => 'es_mtime' } } },
+            aggs => { $field => { extended_stats => { field => $field } } },
             size => 0
         }
     );
 
-    return $result->{aggregations}{es_mtime}{max};
+    my $max = $result->{aggregations}{$field}{max};
+    if (not defined $max) {
+        return 0;
+    }
+    elsif (looks_like_number($max)) {
+        return $max;
+    }
+    else {
+        die "largest value for '$field' is not a number: $max";
+    }
 }
 
-sub find_largest_id {
+sub _find_largest_mtime {
     my ($self, $class) = @_;
 
-    my $result = $self->client->search(
-        index => $self->index_name,
-        type  => $class->ES_TYPE,
-        body  => {
-            aggs => { $class->ID_FIELD => { extended_stats => { field => $class->ID_FIELD } } },
-            size => 0
-        }
-    );
+    return $self->_find_largest($class, 'es_mtime');
+}
 
-    return $result->{aggregations}{$class->ID_FIELD}{max};
+sub _find_largest_id {
+    my ($self, $class) = @_;
+
+    return $self->_find_largest($class, $class->ID_FIELD);
 }
 
 sub put_mapping {
@@ -170,46 +173,36 @@ sub _debug_sql {
 sub bulk_load {
     my ( $self, $class ) = @_;
 
-    $self->put_mapping($class);
-    my $bulk = $self->_bulk_helper($class);
-    my $ids  = $self->_select_all_ids($class);
-    $self->clear_mtime;
-    $self->_bulk_load_ids($bulk, $class, $ids) if @$ids;
-    undef $ids; # free up some memory
-
-    my $updated_ids = $self->_select_updated_ids($class);
-    if ($updated_ids) {
-        $self->_bulk_load_ids($bulk, $class, $updated_ids) if @$updated_ids;
-    }
+    my $bulk        = $self->_bulk_helper($class);
+    my $last_mtime  = $self->_find_largest_mtime($class);
+    my $last_id     = $self->_find_largest_id($class);
+    my $new_ids     = $self->_select_all_ids($class, $last_id);
+    my $updated_ids = $self->_select_updated_ids($class, $last_mtime);
+
+    $self->_bulk_load_ids($bulk, $class, $new_ids) if @$new_ids;
+    $self->_bulk_load_ids($bulk, $class, $updated_ids) if @$updated_ids;
 }
 
 sub _select_all_ids {
-    my ($self, $class) = @_;
+    my ($self, $class, $last_id) = @_;
 
-    my $dbh     = Bugzilla->dbh;
-    my $last_id = $self->find_largest_id($class);
+    my $dbh = Bugzilla->dbh;
     my ($sql, $params) = $self->_debug_sql($class->ES_SELECT_ALL_SQL($last_id));
     return $dbh->selectcol_arrayref($sql, undef, @$params);
 }
 
 sub _select_updated_ids {
-    my ($self, $class) = @_;
+    my ($self, $class, $last_mtime) = @_;
 
-    my $dbh   = Bugzilla->dbh;
-    my $mtime = $self->find_largest_mtime($class);
-    if ($mtime && $mtime != $self->mtime) {
-        my ($updated_sql, $updated_params) = $self->_debug_sql($class->ES_SELECT_UPDATED_SQL($mtime));
-        return $dbh->selectcol_arrayref($updated_sql, undef, @$updated_params);
-    } else {
-        return undef;
-    }
+    my $dbh = Bugzilla->dbh;
+    my ($updated_sql, $updated_params) = $self->_debug_sql($class->ES_SELECT_UPDATED_SQL($last_mtime));
+    return $dbh->selectcol_arrayref($updated_sql, undef, @$updated_params);
 }
 
 sub bulk_load_ids {
     my ($self, $class, $ids) = @_;
 
     $self->put_mapping($class);
-    $self->clear_mtime;
     $self->_bulk_load_ids($self->_bulk_helper($class), $class, $ids);
 }
 
@@ -217,7 +210,7 @@ sub _bulk_load_ids {
     my ($self, $bulk, $class, $all_ids) = @_;
 
     my $iter  = natatime $class->ES_OBJECTS_AT_ONCE, @$all_ids;
-    my $mtime = $self->mtime;
+    my $mtime = $self->_current_mtime;
     my $progress_bar;
     my $next_update;
 
@@ -266,7 +259,7 @@ sub _bulk_load_ids {
 
 sub _build_shadow_dbh { Bugzilla->switch_to_shadow_db }
 
-sub _build_mtime {
+sub _current_mtime {
     my ($self) = @_;
     my ($mtime) = $self->shadow_dbh->selectrow_array("SELECT UNIX_TIMESTAMP(NOW())");
     return $mtime;