]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
ITS#9584 serialize refresh phase
authorHoward Chu <hyc@openldap.org>
Tue, 27 Jul 2021 15:10:29 +0000 (16:10 +0100)
committerQuanah Gibson-Mount <quanah@openldap.org>
Fri, 4 Feb 2022 04:43:17 +0000 (04:43 +0000)
Only allow one consumer at a time to perform a refresh on a database.

Contains:
e1c90d0977d389db05803c127d45b39c89a5ac2f
79d33fe40ea41f52a2c1b9e299a6c711f62d0f40
75636a407e38f1502c592566b5bf4c3ebf142a2b
3e3d9d7637e65a40ec0ec9aa9b9bcb051e3a42b5 minus testsuite tweak

servers/slapd/syncrepl.c

index 1ac265a1ce1c153a529e3fb70d75c839edff18bd..baa669601e79ed12a078b2146b1698c3b039b076 100644 (file)
@@ -58,6 +58,8 @@ static AttributeDescription *sy_ad_dseeLastChange;
 
 #define        UUIDLEN 16
 
+struct syncinfo_s;
+
 struct nonpresent_entry {
        struct berval *npe_name;
        struct berval *npe_nname;
@@ -85,8 +87,19 @@ typedef struct cookie_state {
        struct berval *cs_pvals;
        int *cs_psids;
        int     cs_pnum;
+
+       /* serialize multi-consumer refreshes */
+       ldap_pvt_thread_mutex_t cs_refresh_mutex;
+       struct syncinfo_s *cs_refreshing;
 } cookie_state;
 
+#define SYNC_TIMEOUT   0
+#define SYNC_SHUTDOWN  -100
+#define SYNC_ERROR             -101
+#define SYNC_REPOLL            -102
+#define SYNC_PAUSED            -103
+#define SYNC_BUSY              -104
+
 #define        SYNCDATA_DEFAULT        0       /* entries are plain LDAP entries */
 #define        SYNCDATA_ACCESSLOG      1       /* entries are accesslog format */
 #define        SYNCDATA_CHANGELOG      2       /* entries are changelog format */
@@ -139,6 +152,7 @@ typedef struct syncinfo_s {
        int                     si_refreshDelete;
        int                     si_refreshPresent;
        int                     si_refreshDone;
+       int                     si_paused;
        int                     si_syncdata;
        int                     si_logstate;
        int                     si_lazyCommit;
@@ -488,6 +502,64 @@ init_syncrepl(syncinfo_t *si)
        si->si_exattrs = exattrs;       
 }
 
+static int
+start_refresh(syncinfo_t *si)
+{
+       ldap_pvt_thread_mutex_lock( &si->si_cookieState->cs_refresh_mutex );
+       if ( si->si_cookieState->cs_refreshing ) {
+               struct re_s* rtask = si->si_re;
+
+               ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+               ldap_pvt_runqueue_stoptask( &slapd_rq, rtask );
+               ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+
+               si->si_paused = 1;
+               Debug( LDAP_DEBUG_SYNC, "start_refresh: %s "
+                               "a refresh on %s in progress, pausing\n",
+                               si->si_ridtxt, si->si_cookieState->cs_refreshing->si_ridtxt );
+               ldap_pvt_thread_mutex_unlock( &si->si_cookieState->cs_refresh_mutex );
+               return SYNC_BUSY;
+       }
+       si->si_cookieState->cs_refreshing = si;
+       ldap_pvt_thread_mutex_unlock( &si->si_cookieState->cs_refresh_mutex );
+
+       return LDAP_SUCCESS;
+}
+
+static int
+refresh_finished(syncinfo_t *si)
+{
+       syncinfo_t *sie;
+       int removed = 0;
+
+       ldap_pvt_thread_mutex_lock( &si->si_cookieState->cs_refresh_mutex );
+       if ( si->si_cookieState->cs_refreshing == si ) {
+               si->si_cookieState->cs_refreshing = NULL;
+               removed = 1;
+       }
+
+       if ( removed ) {
+               for ( sie = si->si_be->be_syncinfo; sie; sie = sie->si_next ) {
+                       if ( sie->si_paused ) {
+                               struct re_s* rtask = sie->si_re;
+
+                               Debug( LDAP_DEBUG_SYNC, "refresh_finished: %s "
+                                               "rescheduling refresh on %s\n",
+                                               si->si_ridtxt, sie->si_ridtxt );
+                               sie->si_paused = 0;
+                               ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+                               rtask->interval.tv_sec = 0;
+                               ldap_pvt_runqueue_resched( &slapd_rq, rtask, 0 );
+                               rtask->interval.tv_sec = si->si_interval;
+                               ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+                               break;
+                       }
+               }
+       }
+       ldap_pvt_thread_mutex_unlock( &si->si_cookieState->cs_refresh_mutex );
+       return removed;
+}
+
 static struct berval generic_filterstr = BER_BVC("(objectclass=*)");
 
 static int
@@ -511,6 +583,8 @@ ldap_sync_search(
        ber_init2( ber, NULL, LBER_USE_DER );
        ber_set_option( ber, LBER_OPT_BER_MEMCTX, &ctx );
 
+       si->si_msgid = 0;
+
        /* If we're using a log but we have no state, then fallback to
         * normal mode for a full refresh.
         */
@@ -519,6 +593,11 @@ ldap_sync_search(
                        LDAPMessage *res, *msg;
                        unsigned long first = 0, last = 0;
                        int gotfirst = 0, gotlast = 0;
+
+                       if ( (rc = start_refresh( si )) ) {
+                               return rc;
+                       }
+
                        /* See if we're new enough for the remote server */
                        lattrs[0] = "firstchangenumber";
                        lattrs[1] = "lastchangenumber";
@@ -597,6 +676,10 @@ ldap_sync_search(
                attrs = lattrs;
                attrsonly = 0;
        } else {
+               if ( (rc = start_refresh( si )) ) {
+                       return rc;
+               }
+
                rhint = 1;
                base = si->si_base.bv_val;
                filter = si->si_filterstr.bv_val;
@@ -680,6 +763,10 @@ ldap_sync_search(
                }
        }
 
+       si->si_refreshDone = 0;
+       si->si_refreshPresent = 0;
+       si->si_refreshDelete = 0;
+
        rc = ldap_search_ext( si->si_ld, base, scope, filter, attrs, attrsonly,
                ctrls, NULL, NULL, si->si_slimit, &si->si_msgid );
        ber_free_buf( ber );
@@ -929,7 +1016,6 @@ do_syncrep1(
 #endif
 
        si->si_lastconnect = slap_get_time();
-       si->si_refreshDone = 0;
        rc = slap_client_connect( &si->si_ld, &si->si_bindconf );
        if ( rc != LDAP_SUCCESS ) {
                goto done;
@@ -1095,7 +1181,10 @@ do_syncrep1(
 
        rc = ldap_sync_search( si, op->o_tmpmemctx );
 
-       if( rc != LDAP_SUCCESS ) {
+       if ( rc == SYNC_BUSY ) {
+               return rc;
+       } else if ( rc != LDAP_SUCCESS ) {
+               refresh_finished( si );
                Debug( LDAP_DEBUG_ANY, "do_syncrep1: %s "
                        "ldap_search_ext: %s (%d)\n",
                        si->si_ridtxt, ldap_err2string( rc ), rc );
@@ -1187,12 +1276,6 @@ check_csn_age(
        return rc;
 }
 
-#define SYNC_TIMEOUT   0
-#define SYNC_SHUTDOWN  -100
-#define SYNC_ERROR             -101
-#define SYNC_REPOLL            -102
-#define SYNC_PAUSED            -103
-
 static int
 get_pmutex(
        syncinfo_t *si
@@ -1236,6 +1319,8 @@ do_syncrep2(
        struct timeval tout = { 0, 0 };
 
        int             refreshDeletes = 0;
+       int             refreshing = !si->si_refreshDone &&
+                       !( si->si_syncdata && si->si_logstate == SYNCLOG_LOGGING );
        char empty[6] = "empty";
 
        if ( slapd_shutdown ) {
@@ -1307,8 +1392,6 @@ do_syncrep2(
                                                /* The notification control is only sent during persist phase */
                                                rctrlp = ldap_control_find( LDAP_CONTROL_PERSIST_ENTRY_CHANGE_NOTICE, rctrls, &next );
                                                if ( rctrlp ) {
-                                                       if ( !si->si_refreshDone )
-                                                               si->si_refreshDone = 1;
                                                        if ( si->si_refreshDone )
                                                                syncrepl_dsee_update( si, op );
                                                }
@@ -1716,6 +1799,14 @@ logerr:
                                                "LDAP_RES_INTERMEDIATE", 
                                                si_tag == LDAP_TAG_SYNC_REFRESH_PRESENT ?
                                                "REFRESH_PRESENT" : "REFRESH_DELETE" );
+                                       if ( si->si_refreshDone ) {
+                                               Debug( LDAP_DEBUG_ANY, "do_syncrep2: %s "
+                                                               "server sent multiple refreshDone "
+                                                               "messages? Ending session\n",
+                                                               si->si_ridtxt );
+                                               rc = LDAP_PROTOCOL_ERROR;
+                                               goto done;
+                                       }
                                        if ( si_tag == LDAP_TAG_SYNC_REFRESH_DELETE ) {
                                                si->si_refreshDelete = 1;
                                        } else {
@@ -1754,9 +1845,9 @@ logerr:
                                                si->si_refreshDone = 1;
                                        }
                                        ber_scanf( ber, /*"{"*/ "}" );
-                                       if ( si->si_refreshDone ) {
-                                               Debug( LDAP_DEBUG_SYNC, "do_syncrep1: %s finished refresh\n",
-                                                       si->si_ridtxt );
+                                       if ( refreshing && si->si_refreshDone ) {
+                                               refresh_finished( si );
+                                               refreshing = 0;
                                        }
                                        break;
                                case LDAP_TAG_SYNC_ID_SET:
@@ -1903,6 +1994,9 @@ done:
                        "do_syncrep2: %s (%d) %s\n",
                        si->si_ridtxt, err, ldap_err2string( err ) );
        }
+       if ( refreshing && ( rc || si->si_refreshDone ) ) {
+               refresh_finished( si );
+       }
 
        slap_sync_cookie_free( &syncCookie, 0 );
        slap_sync_cookie_free( &syncCookie_req, 0 );
@@ -2041,9 +2135,6 @@ do_syncrepl(
 
        /* Establish session, do search */
        if ( !si->si_ld ) {
-               si->si_refreshDelete = 0;
-               si->si_refreshPresent = 0;
-
                if ( si->si_presentlist ) {
                    presentlist_free( si->si_presentlist );
                    si->si_presentlist = NULL;
@@ -2054,6 +2145,13 @@ do_syncrepl(
                op->o_dn = op->o_bd->be_rootdn;
                op->o_ndn = op->o_bd->be_rootndn;
                rc = do_syncrep1( op, si );
+       } else if ( !si->si_msgid ) {
+               /* We got a SYNC_BUSY, now told to resume */
+               rc = ldap_sync_search( si, op->o_tmpmemctx );
+       }
+       if ( rc == SYNC_BUSY ) {
+               ldap_pvt_thread_mutex_unlock( &si->si_mutex );
+               return NULL;
        }
 
 reload:
@@ -6007,6 +6105,9 @@ syncinfo_free( syncinfo_t *sie, int free_all )
                        ch_free( npe );
                }
                if ( sie->si_cookieState ) {
+                       /* Could be called from do_syncrepl (server unpaused) */
+                       refresh_finished( sie );
+
                        sie->si_cookieState->cs_ref--;
                        if ( !sie->si_cookieState->cs_ref ) {
                                ch_free( sie->si_cookieState->cs_sids );
@@ -6016,6 +6117,8 @@ syncinfo_free( syncinfo_t *sie, int free_all )
                                ch_free( sie->si_cookieState->cs_psids );
                                ber_bvarray_free( sie->si_cookieState->cs_pvals );
                                ldap_pvt_thread_mutex_destroy( &sie->si_cookieState->cs_pmutex );
+                               ldap_pvt_thread_mutex_destroy( &sie->si_cookieState->cs_refresh_mutex );
+                               assert( sie->si_cookieState->cs_refreshing == NULL );
                                ch_free( sie->si_cookieState );
                        }
                }
@@ -7231,6 +7334,7 @@ add_syncrepl(
                        si->si_cookieState = ch_calloc( 1, sizeof( cookie_state ));
                        ldap_pvt_thread_mutex_init( &si->si_cookieState->cs_mutex );
                        ldap_pvt_thread_mutex_init( &si->si_cookieState->cs_pmutex );
+                       ldap_pvt_thread_mutex_init( &si->si_cookieState->cs_refresh_mutex );
                        ldap_pvt_thread_cond_init( &si->si_cookieState->cs_cond );
 
                        c->be->be_syncinfo = si;