]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
ITS#9584 Do not rely on retry=.* to reschedule new syncrepl sessions
authorOndřej Kuzník <ondra@mistotebe.net>
Wed, 26 Jan 2022 17:39:41 +0000 (17:39 +0000)
committerQuanah Gibson-Mount <quanah@openldap.org>
Fri, 28 Jan 2022 17:49:30 +0000 (17:49 +0000)
servers/slapd/syncrepl.c

index 47a773f4d71039091c3fcb6e224b892bb6e81e05..0042abdf330b3eb79cc63594cd77593b3c3e13c8 100644 (file)
@@ -93,6 +93,13 @@ typedef struct cookie_state {
        struct syncinfo_s *cs_refreshing;
 } cookie_state;
 
+#define SYNC_TIMEOUT   0
+#define SYNC_SHUTDOWN  -100
+#define SYNC_ERROR             -101
+#define SYNC_REPOLL            -102
+#define SYNC_PAUSED            -103
+#define SYNC_BUSY              -104
+
 #define        SYNCDATA_DEFAULT        0       /* entries are plain LDAP entries */
 #define        SYNCDATA_ACCESSLOG      1       /* entries are accesslog format */
 #define        SYNCDATA_CHANGELOG      2       /* entries are changelog format */
@@ -145,6 +152,7 @@ typedef struct syncinfo_s {
        int                     si_refreshDelete;
        int                     si_refreshPresent;
        int                     si_refreshDone;
+       int                     si_paused;
        int                     si_syncdata;
        int                     si_logstate;
        int                     si_lazyCommit;
@@ -494,6 +502,64 @@ init_syncrepl(syncinfo_t *si)
        si->si_exattrs = exattrs;       
 }
 
+static int
+start_refresh(syncinfo_t *si)
+{
+       ldap_pvt_thread_mutex_lock( &si->si_cookieState->cs_refresh_mutex );
+       if ( si->si_cookieState->cs_refreshing ) {
+               struct re_s* rtask = si->si_re;
+
+               ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+               ldap_pvt_runqueue_stoptask( &slapd_rq, rtask );
+               ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+
+               si->si_paused = 1;
+               Debug( LDAP_DEBUG_SYNC, "start_refresh: %s "
+                               "a refresh on %s in progress, pausing\n",
+                               si->si_ridtxt, si->si_cookieState->cs_refreshing->si_ridtxt );
+               ldap_pvt_thread_mutex_unlock( &si->si_cookieState->cs_refresh_mutex );
+               return SYNC_BUSY;
+       }
+       si->si_cookieState->cs_refreshing = si;
+       ldap_pvt_thread_mutex_unlock( &si->si_cookieState->cs_refresh_mutex );
+
+       return LDAP_SUCCESS;
+}
+
+static int
+refresh_finished(syncinfo_t *si)
+{
+       syncinfo_t *sie;
+       int removed = 0;
+
+       ldap_pvt_thread_mutex_lock( &si->si_cookieState->cs_refresh_mutex );
+       if ( si->si_cookieState->cs_refreshing == si ) {
+               si->si_cookieState->cs_refreshing = NULL;
+               removed = 1;
+       }
+
+       if ( removed ) {
+               for ( sie = si->si_be->be_syncinfo; sie; sie = sie->si_next ) {
+                       if ( sie->si_paused ) {
+                               struct re_s* rtask = sie->si_re;
+
+                               Debug( LDAP_DEBUG_SYNC, "refresh_finished: %s "
+                                               "rescheduling refresh on %s\n",
+                                               si->si_ridtxt, sie->si_ridtxt );
+                               sie->si_paused = 0;
+                               ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+                               rtask->interval.tv_sec = 0;
+                               ldap_pvt_runqueue_resched( &slapd_rq, rtask, 0 );
+                               rtask->interval.tv_sec = si->si_interval;
+                               ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+                               break;
+                       }
+               }
+       }
+       ldap_pvt_thread_mutex_unlock( &si->si_cookieState->cs_refresh_mutex );
+       return removed;
+}
+
 static struct berval generic_filterstr = BER_BVC("(objectclass=*)");
 
 static int
@@ -517,6 +583,8 @@ ldap_sync_search(
        ber_init2( ber, NULL, LBER_USE_DER );
        ber_set_option( ber, LBER_OPT_BER_MEMCTX, &ctx );
 
+       si->si_msgid = 0;
+
        /* If we're using a log but we have no state, then fallback to
         * normal mode for a full refresh.
         */
@@ -525,6 +593,11 @@ ldap_sync_search(
                        LDAPMessage *res, *msg;
                        unsigned long first = 0, last = 0;
                        int gotfirst = 0, gotlast = 0;
+
+                       if ( (rc = start_refresh( si )) ) {
+                               return rc;
+                       }
+
                        /* See if we're new enough for the remote server */
                        lattrs[0] = "firstchangenumber";
                        lattrs[1] = "lastchangenumber";
@@ -603,6 +676,10 @@ ldap_sync_search(
                attrs = lattrs;
                attrsonly = 0;
        } else {
+               if ( (rc = start_refresh( si )) ) {
+                       return rc;
+               }
+
                rhint = 1;
                base = si->si_base.bv_val;
                filter = si->si_filterstr.bv_val;
@@ -925,13 +1002,6 @@ check_syncprov(
        return changed;
 }
 
-#define SYNC_TIMEOUT   0
-#define SYNC_SHUTDOWN  -100
-#define SYNC_ERROR             -101
-#define SYNC_REPOLL            -102
-#define SYNC_PAUSED            -103
-#define SYNC_BUSY              -104
-
 static int
 do_syncrep1(
        Operation *op,
@@ -945,20 +1015,6 @@ do_syncrep1(
        void    *ssl;
 #endif
 
-       ldap_pvt_thread_mutex_lock( &si->si_cookieState->cs_refresh_mutex );
-       if ( si->si_cookieState->cs_refreshing ) {
-               ldap_pvt_thread_mutex_unlock( &si->si_cookieState->cs_refresh_mutex );
-               return SYNC_BUSY;
-       }
-
-       /*
-        * FIXME: Right now, ldap_sync_search decides whether we're in logging or
-        * fallback mode for log-based replication, so we have to claim the
-        * refreshing role (ITS#9584) preemptively and release it later.
-        */
-       si->si_cookieState->cs_refreshing = si;
-       ldap_pvt_thread_mutex_unlock( &si->si_cookieState->cs_refresh_mutex );
-
        si->si_lastconnect = slap_get_time();
        rc = slap_client_connect( &si->si_ld, &si->si_bindconf );
        if ( rc != LDAP_SUCCESS ) {
@@ -1125,7 +1181,10 @@ do_syncrep1(
 
        rc = ldap_sync_search( si, op->o_tmpmemctx );
 
-       if( rc != LDAP_SUCCESS ) {
+       if ( rc == SYNC_BUSY ) {
+               return rc;
+       } else if ( rc != LDAP_SUCCESS ) {
+               refresh_finished( si );
                Debug( LDAP_DEBUG_ANY, "do_syncrep1: %s "
                        "ldap_search_ext: %s (%d)\n",
                        si->si_ridtxt, ldap_err2string( rc ), rc );
@@ -1138,12 +1197,6 @@ done:
                        si->si_ld = NULL;
                }
        }
-       if ( rc || ( si->si_syncdata && si->si_logstate == SYNCLOG_LOGGING ) ) {
-               ldap_pvt_thread_mutex_lock( &si->si_cookieState->cs_refresh_mutex );
-               assert( si->si_cookieState->cs_refreshing == si );
-               si->si_cookieState->cs_refreshing = NULL;
-               ldap_pvt_thread_mutex_unlock( &si->si_cookieState->cs_refresh_mutex );
-       }
 
        return rc;
 }
@@ -1795,9 +1848,7 @@ logerr:
                                        }
                                        ber_scanf( ber, /*"{"*/ "}" );
                                        if ( refreshing && si->si_refreshDone ) {
-                                               ldap_pvt_thread_mutex_lock( &si->si_cookieState->cs_refresh_mutex );
-                                               si->si_cookieState->cs_refreshing = NULL;
-                                               ldap_pvt_thread_mutex_unlock( &si->si_cookieState->cs_refresh_mutex );
+                                               refresh_finished( si );
                                                refreshing = 0;
                                        }
                                        break;
@@ -1946,10 +1997,7 @@ done:
                        si->si_ridtxt, err, ldap_err2string( err ) );
        }
        if ( refreshing && ( rc || si->si_refreshDone ) ) {
-               ldap_pvt_thread_mutex_lock( &si->si_cookieState->cs_refresh_mutex );
-               assert( si->si_cookieState->cs_refreshing == si );
-               si->si_cookieState->cs_refreshing = NULL;
-               ldap_pvt_thread_mutex_unlock( &si->si_cookieState->cs_refresh_mutex );
+               refresh_finished( si );
        }
 
        slap_sync_cookie_free( &syncCookie, 0 );
@@ -2105,6 +2153,13 @@ do_syncrepl(
                op->o_dn = op->o_bd->be_rootdn;
                op->o_ndn = op->o_bd->be_rootndn;
                rc = do_syncrep1( op, si );
+       } else if ( !si->si_msgid ) {
+               /* We got a SYNC_BUSY, now told to resume */
+               rc = ldap_sync_search( si, op->o_tmpmemctx );
+       }
+       if ( rc == SYNC_BUSY ) {
+               ldap_pvt_thread_mutex_unlock( &si->si_mutex );
+               return NULL;
        }
 
 reload:
@@ -6108,11 +6163,7 @@ syncinfo_free( syncinfo_t *sie, int free_all )
                }
                if ( sie->si_cookieState ) {
                        /* Could be called from do_syncrepl (server unpaused) */
-                       ldap_pvt_thread_mutex_lock( &sie->si_cookieState->cs_refresh_mutex );
-                       if ( sie->si_cookieState->cs_refreshing == sie ) {
-                               sie->si_cookieState->cs_refreshing = NULL;
-                       }
-                       ldap_pvt_thread_mutex_unlock( &sie->si_cookieState->cs_refresh_mutex );
+                       refresh_finished( sie );
 
                        sie->si_cookieState->cs_ref--;
                        if ( !sie->si_cookieState->cs_ref ) {