]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
ITS#8486 Switch sessionlog to use TAVL
authorOndřej Kuzník <ondra@openldap.org>
Thu, 26 Oct 2017 11:00:20 +0000 (12:00 +0100)
committerQuanah Gibson-Mount <quanah@openldap.org>
Thu, 1 Oct 2020 15:01:41 +0000 (15:01 +0000)
servers/slapd/overlays/syncprov.c

index c228c5da600da0aa7e5ccab95b7d500435cdebc7..b00cd73e8aa4b237079e7cd0b9df71eb4c7495b4 100644 (file)
@@ -117,7 +117,6 @@ typedef struct syncmatches {
 
 /* Session log data */
 typedef struct slog_entry {
-       struct slog_entry *se_next;
        struct berval se_uuid;
        struct berval se_csn;
        int     se_sid;
@@ -131,9 +130,8 @@ typedef struct sessionlog {
        int             sl_num;
        int             sl_size;
        int             sl_playing;
-       slog_entry *sl_head;
-       slog_entry *sl_tail;
-       ldap_pvt_thread_mutex_t sl_mutex;
+       Avlnode *sl_entries;
+       ldap_pvt_thread_rdwr_t sl_mutex;
 } sessionlog;
 
 /* The main state for this overlay */
@@ -402,6 +400,25 @@ sp_avl_cmp( const void *c1, const void *c2 )
        return ber_bvcmp( &m1->mt_dn, &m2->mt_dn );
 }
 
+static int
+syncprov_sessionlog_cmp( const void *l, const void *r )
+{
+       const slog_entry *left = l, *right = r;
+       int ret = ber_bvcmp( &left->se_csn, &right->se_csn );
+       if ( !ret )
+               ret = ber_bvcmp( &left->se_uuid, &right->se_uuid );
+       /* Only time we have two modifications with same CSN is when we detect a
+        * rename during replication.
+        * We invert the test here because LDAP_REQ_MODDN is
+        * numerically greater than LDAP_REQ_MODIFY but we
+        * want it to occur first.
+        */
+       if ( !ret )
+               ret = right->se_tag - left->se_tag;
+
+       return ret;
+}
+
 /* syncprov_findbase:
  *   finds the true DN of the base of a search (with alias dereferencing) and
  * checks to make sure the base entry doesn't get replaced with a different
@@ -1577,6 +1594,7 @@ syncprov_add_slog( Operation *op )
        syncprov_info_t         *si = on->on_bi.bi_private;
        sessionlog *sl;
        slog_entry *se;
+       int rc;
 
        sl = si->si_logs;
        {
@@ -1586,24 +1604,20 @@ syncprov_add_slog( Operation *op )
                         * state with respect to such operations, so we ignore them and
                         * wipe out anything in the log if we see them.
                         */
-                       ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
+                       ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
                        /* can only do this if no one else is reading the log at the moment */
-                       if (!sl->sl_playing) {
-                       while ( se = sl->sl_head ) {
-                               sl->sl_head = se->se_next;
-                               ch_free( se );
-                       }
-                       sl->sl_tail = NULL;
-                       sl->sl_num = 0;
+                       if ( !sl->sl_playing ) {
+                               tavl_free( sl->sl_entries, (AVL_FREE)ch_free );
+                               sl->sl_num = 0;
+                               sl->sl_entries = NULL;
                        }
-                       ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+                       ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
                        return;
                }
 
                /* Allocate a record. UUIDs are not NUL-terminated. */
-               se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + 
+               se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len +
                        op->o_csn.bv_len + 1 );
-               se->se_next = NULL;
                se->se_tag = op->o_tag;
 
                se->se_uuid.bv_val = (char *)(&se[1]);
@@ -1616,7 +1630,7 @@ syncprov_add_slog( Operation *op )
                se->se_csn.bv_len = op->o_csn.bv_len;
                se->se_sid = slap_parse_csn_sid( &se->se_csn );
 
-               ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
+               ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
                if ( LogTest( LDAP_DEBUG_SYNC ) ) {
                        char uuidstr[40] = {};
                        if ( !BER_BVISEMPTY( &opc->suuid ) ) {
@@ -1628,25 +1642,7 @@ syncprov_add_slog( Operation *op )
                                "adding csn=%s to sessionlog, uuid=%s\n",
                                op->o_log_prefix, se->se_csn.bv_val, uuidstr );
                }
-               if ( sl->sl_head ) {
-                       /* Keep the list in csn order. */
-                       if ( ber_bvcmp( &sl->sl_tail->se_csn, &se->se_csn ) <= 0 ) {
-                               sl->sl_tail->se_next = se;
-                               sl->sl_tail = se;
-                       } else {
-                               slog_entry **sep;
-
-                               for ( sep = &sl->sl_head; *sep; sep = &(*sep)->se_next ) {
-                                       if ( ber_bvcmp( &se->se_csn, &(*sep)->se_csn ) < 0 ) {
-                                               se->se_next = *sep;
-                                               *sep = se;
-                                               break;
-                                       }
-                               }
-                       }
-               } else {
-                       sl->sl_head = se;
-                       sl->sl_tail = se;
+               if ( !sl->sl_entries ) {
                        if ( !sl->sl_mincsn ) {
                                sl->sl_numcsns = 1;
                                sl->sl_mincsn = ch_malloc( 2*sizeof( struct berval ));
@@ -1656,35 +1652,40 @@ syncprov_add_slog( Operation *op )
                                BER_BVZERO( &sl->sl_mincsn[1] );
                        }
                }
+               rc = tavl_insert( &sl->sl_entries, se, syncprov_sessionlog_cmp, avl_dup_error );
+               assert( rc == LDAP_SUCCESS );
                sl->sl_num++;
-               if (!sl->sl_playing) {
-               while ( sl->sl_num > sl->sl_size ) {
-                       int i;
-                       se = sl->sl_head;
-                       sl->sl_head = se->se_next;
-                       Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: "
-                               "expiring csn=%s from sessionlog (sessionlog size=%d)\n",
-                               op->o_log_prefix, se->se_csn.bv_val, sl->sl_num );
-                       for ( i=0; i<sl->sl_numcsns; i++ )
-                               if ( sl->sl_sids[i] >= se->se_sid )
-                                       break;
-                       if  ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) {
+               if ( !sl->sl_playing && sl->sl_num > sl->sl_size ) {
+                       Avlnode *edge = tavl_end( sl->sl_entries, TAVL_DIR_LEFT );
+                       while ( sl->sl_num > sl->sl_size ) {
+                               int i;
+                               Avlnode *next = tavl_next( edge, TAVL_DIR_RIGHT );
+                               se = edge->avl_data;
                                Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: "
-                                       "adding csn=%s to mincsn\n",
-                                       op->o_log_prefix, se->se_csn.bv_val, 0 );
-                               slap_insert_csn_sids( (struct sync_cookie *)sl,
-                                       i, se->se_sid, &se->se_csn );
-                       } else {
-                               Log4( LDAP_DEBUG_SYNC, ldap_syslog_level, "%s syncprov_add_slog: "
-                                       "updating mincsn for sid=%d csn=%s to %s\n",
-                                       op->o_log_prefix, se->se_sid, sl->sl_mincsn[i].bv_val, se->se_csn.bv_val );
-                               ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn );
+                                       "expiring csn=%s from sessionlog (sessionlog size=%d)\n",
+                                       op->o_log_prefix, se->se_csn.bv_val, sl->sl_num );
+                               for ( i=0; i<sl->sl_numcsns; i++ )
+                                       if ( sl->sl_sids[i] >= se->se_sid )
+                                               break;
+                               if  ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) {
+                                       Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: "
+                                               "adding csn=%s to mincsn\n",
+                                               op->o_log_prefix, se->se_csn.bv_val, 0 );
+                                       slap_insert_csn_sids( (struct sync_cookie *)sl,
+                                               i, se->se_sid, &se->se_csn );
+                               } else {
+                                       Log4( LDAP_DEBUG_SYNC, ldap_syslog_level, "%s syncprov_add_slog: "
+                                               "updating mincsn for sid=%d csn=%s to %s\n",
+                                               op->o_log_prefix, se->se_sid, sl->sl_mincsn[i].bv_val, se->se_csn.bv_val );
+                                       ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn );
+                               }
+                               tavl_delete( &sl->sl_entries, se, syncprov_sessionlog_cmp );
+                               ch_free( se );
+                               edge = next;
+                               sl->sl_num--;
                        }
-                       ch_free( se );
-                       sl->sl_num--;
-               }
                }
-               ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+               ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
        }
 }
 
@@ -1701,17 +1702,18 @@ playlog_cb( Operation *op, SlapReply *rs )
 /* enter with sl->sl_mutex locked, release before returning */
 static void
 syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
-       sync_control *srs, BerVarray ctxcsn, int numcsns, int *sids )
+       sync_control *srs, BerVarray ctxcsn, int numcsns, int *sids,
+       struct berval *mincsn )
 {
        slap_overinst           *on = (slap_overinst *)op->o_bd->bd_info;
-       slog_entry *se;
        int i, j, ndel, num, nmods, mmods;
+       Avlnode *entry;
        char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
        BerVarray uuids;
        struct berval delcsn[2];
 
        if ( !sl->sl_num ) {
-               ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+               ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
                return;
        }
 
@@ -1719,7 +1721,7 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
        i = 0;
        nmods = 0;
        sl->sl_playing++;
-       ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+       ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
 
        uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) +
                num * UUID_LEN, op->o_tmpmemctx );
@@ -1729,13 +1731,31 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
        delcsn[0].bv_val = cbuf;
        BER_BVZERO(&delcsn[1]);
 
+       ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
        /* Make a copy of the relevant UUIDs. Put the Deletes up front
         * and everything else at the end. Do this first so we can
-        * unlock the list mutex.
+        * let the write side manage the sessionlog again.
         */
-       for ( se=sl->sl_head; se; se=se->se_next ) {
+       assert( sl->sl_entries );
+
+       /* Find first relevant log entry. If greater than mincsn, backtrack one entry */
+       {
+               slog_entry te = {0};
+               te.se_csn = *mincsn;
+               entry = tavl_find3( sl->sl_entries, &te, syncprov_sessionlog_cmp, &ndel );
+       }
+       if ( ndel > 0 && entry )
+               entry = tavl_next( entry, TAVL_DIR_LEFT );
+       /* if none, just start at beginning */
+       if ( !entry )
+               entry = tavl_end( sl->sl_entries, TAVL_DIR_LEFT );
+
+       do {
+               slog_entry *se = entry->avl_data;
                int k;
 
+               /* Make sure writes can still make progress */
+               ldap_pvt_thread_rdwr_runlock( &sl->sl_mutex );
                ndel = 1;
                for ( k=0; k<srs->sr_state.numcsns; k++ ) {
                        if ( se->se_sid == srs->sr_state.sids[k] ) {
@@ -1744,6 +1764,7 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
                        }
                }
                if ( ndel <= 0 ) {
+                       ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
                        continue;
                }
                ndel = 0;
@@ -1756,6 +1777,7 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
                if ( ndel > 0 ) {
                        Debug( LDAP_DEBUG_SYNC, "%s syncprov_playlog: "
                                "cmp %d, too new\n", op->o_log_prefix, ndel, 0 );
+                       ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
                        break;
                }
                if ( se->se_tag == LDAP_REQ_DELETE ) {
@@ -1765,8 +1787,10 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
                        delcsn[0].bv_len = se->se_csn.bv_len;
                        delcsn[0].bv_val[delcsn[0].bv_len] = '\0';
                } else {
-                       if ( se->se_tag == LDAP_REQ_ADD )
+                       if ( se->se_tag == LDAP_REQ_ADD ) {
+                               ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
                                continue;
+                       }
                        nmods++;
                        j = num - nmods;
                }
@@ -1775,7 +1799,7 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
                uuids[j].bv_len = UUID_LEN;
 
                if ( LogTest( LDAP_DEBUG_SYNC ) ) {
-                       char uuidstr[40];
+                       char uuidstr[40] = {};
                        lutil_uuidstr_from_normalized( uuids[j].bv_val, uuids[j].bv_len,
                                uuidstr, 40 );
                        Log4( LDAP_DEBUG_SYNC, ldap_syslog_level, "%s syncprov_playlog: "
@@ -1783,10 +1807,12 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
                                op->o_log_prefix, se->se_tag == LDAP_REQ_DELETE ? "deleted" : "modified",
                                uuidstr, delcsn[0].bv_len ? delcsn[0].bv_val : "(null)" );
                }
-       }
-       ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
+               ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
+       } while ( (entry = tavl_next( entry, TAVL_DIR_RIGHT )) != NULL );
+       ldap_pvt_thread_rdwr_runlock( &sl->sl_mutex );
+       ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
        sl->sl_playing--;
-       ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+       ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
 
        ndel = i;
 
@@ -2826,7 +2852,7 @@ no_change:        if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
                sl=si->si_logs;
                if ( sl ) {
                        int do_play = 0;
-                       ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
+                       ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
                        /* Are there any log entries, and is the consumer state
                         * present in the session log?
                         */
@@ -2853,9 +2879,9 @@ no_change:        if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
                        if ( do_play ) {
                                do_present = 0;
                                /* mutex is unlocked in playlog */
-                               syncprov_playlog( op, rs, sl, srs, ctxcsn, numcsns, sids );
+                               syncprov_playlog( op, rs, sl, srs, ctxcsn, numcsns, sids, &mincsn );
                        } else {
-                               ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+                               ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
                        }
                }
                /* Is the CSN still present in the database? */
@@ -3185,7 +3211,7 @@ sp_cf_gen(ConfigArgs *c)
                sl = si->si_logs;
                if ( !sl ) {
                        sl = ch_calloc( 1, sizeof( sessionlog ));
-                       ldap_pvt_thread_mutex_init( &sl->sl_mutex );
+                       ldap_pvt_thread_rdwr_init( &sl->sl_mutex );
                        si->si_logs = sl;
                }
                sl->sl_size = size;
@@ -3490,19 +3516,14 @@ syncprov_db_destroy(
        if ( si ) {
                if ( si->si_logs ) {
                        sessionlog *sl = si->si_logs;
-                       slog_entry *se = sl->sl_head;
 
-                       while ( se ) {
-                               slog_entry *se_next = se->se_next;
-                               ch_free( se );
-                               se = se_next;
-                       }
+                       tavl_free( sl->sl_entries, (AVL_FREE)ch_free );
                        if ( sl->sl_mincsn )
                                ber_bvarray_free( sl->sl_mincsn );
                        if ( sl->sl_sids )
                                ch_free( sl->sl_sids );
 
-                       ldap_pvt_thread_mutex_destroy(&si->si_logs->sl_mutex);
+                       ldap_pvt_thread_rdwr_destroy(&si->si_logs->sl_mutex);
                        ch_free( si->si_logs );
                }
                if ( si->si_ctxcsn )