From: Ondřej Kuzník Date: Thu, 26 Oct 2017 11:00:20 +0000 (+0100) Subject: ITS#8486 Switch sessionlog to use TAVL X-Git-Tag: OPENLDAP_REL_ENG_2_4_54~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=234230f286f8d67c4d4bf590accbd483172dbdf8;p=thirdparty%2Fopenldap.git ITS#8486 Switch sessionlog to use TAVL --- diff --git a/servers/slapd/overlays/syncprov.c b/servers/slapd/overlays/syncprov.c index c228c5da60..b00cd73e8a 100644 --- a/servers/slapd/overlays/syncprov.c +++ b/servers/slapd/overlays/syncprov.c @@ -117,7 +117,6 @@ typedef struct syncmatches { /* Session log data */ typedef struct slog_entry { - struct slog_entry *se_next; struct berval se_uuid; struct berval se_csn; int se_sid; @@ -131,9 +130,8 @@ typedef struct sessionlog { int sl_num; int sl_size; int sl_playing; - slog_entry *sl_head; - slog_entry *sl_tail; - ldap_pvt_thread_mutex_t sl_mutex; + Avlnode *sl_entries; + ldap_pvt_thread_rdwr_t sl_mutex; } sessionlog; /* The main state for this overlay */ @@ -402,6 +400,25 @@ sp_avl_cmp( const void *c1, const void *c2 ) return ber_bvcmp( &m1->mt_dn, &m2->mt_dn ); } +static int +syncprov_sessionlog_cmp( const void *l, const void *r ) +{ + const slog_entry *left = l, *right = r; + int ret = ber_bvcmp( &left->se_csn, &right->se_csn ); + if ( !ret ) + ret = ber_bvcmp( &left->se_uuid, &right->se_uuid ); + /* Only time we have two modifications with same CSN is when we detect a + * rename during replication. + * We invert the test here because LDAP_REQ_MODDN is + * numerically greater than LDAP_REQ_MODIFY but we + * want it to occur first. + */ + if ( !ret ) + ret = right->se_tag - left->se_tag; + + return ret; +} + /* syncprov_findbase: * finds the true DN of the base of a search (with alias dereferencing) and * checks to make sure the base entry doesn't get replaced with a different @@ -1577,6 +1594,7 @@ syncprov_add_slog( Operation *op ) syncprov_info_t *si = on->on_bi.bi_private; sessionlog *sl; slog_entry *se; + int rc; sl = si->si_logs; { @@ -1586,24 +1604,20 @@ syncprov_add_slog( Operation *op ) * state with respect to such operations, so we ignore them and * wipe out anything in the log if we see them. */ - ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); + ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex ); /* can only do this if no one else is reading the log at the moment */ - if (!sl->sl_playing) { - while ( se = sl->sl_head ) { - sl->sl_head = se->se_next; - ch_free( se ); - } - sl->sl_tail = NULL; - sl->sl_num = 0; + if ( !sl->sl_playing ) { + tavl_free( sl->sl_entries, (AVL_FREE)ch_free ); + sl->sl_num = 0; + sl->sl_entries = NULL; } - ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); + ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); return; } /* Allocate a record. UUIDs are not NUL-terminated. */ - se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + + se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + op->o_csn.bv_len + 1 ); - se->se_next = NULL; se->se_tag = op->o_tag; se->se_uuid.bv_val = (char *)(&se[1]); @@ -1616,7 +1630,7 @@ syncprov_add_slog( Operation *op ) se->se_csn.bv_len = op->o_csn.bv_len; se->se_sid = slap_parse_csn_sid( &se->se_csn ); - ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); + ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex ); if ( LogTest( LDAP_DEBUG_SYNC ) ) { char uuidstr[40] = {}; if ( !BER_BVISEMPTY( &opc->suuid ) ) { @@ -1628,25 +1642,7 @@ syncprov_add_slog( Operation *op ) "adding csn=%s to sessionlog, uuid=%s\n", op->o_log_prefix, se->se_csn.bv_val, uuidstr ); } - if ( sl->sl_head ) { - /* Keep the list in csn order. */ - if ( ber_bvcmp( &sl->sl_tail->se_csn, &se->se_csn ) <= 0 ) { - sl->sl_tail->se_next = se; - sl->sl_tail = se; - } else { - slog_entry **sep; - - for ( sep = &sl->sl_head; *sep; sep = &(*sep)->se_next ) { - if ( ber_bvcmp( &se->se_csn, &(*sep)->se_csn ) < 0 ) { - se->se_next = *sep; - *sep = se; - break; - } - } - } - } else { - sl->sl_head = se; - sl->sl_tail = se; + if ( !sl->sl_entries ) { if ( !sl->sl_mincsn ) { sl->sl_numcsns = 1; sl->sl_mincsn = ch_malloc( 2*sizeof( struct berval )); @@ -1656,35 +1652,40 @@ syncprov_add_slog( Operation *op ) BER_BVZERO( &sl->sl_mincsn[1] ); } } + rc = tavl_insert( &sl->sl_entries, se, syncprov_sessionlog_cmp, avl_dup_error ); + assert( rc == LDAP_SUCCESS ); sl->sl_num++; - if (!sl->sl_playing) { - while ( sl->sl_num > sl->sl_size ) { - int i; - se = sl->sl_head; - sl->sl_head = se->se_next; - Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " - "expiring csn=%s from sessionlog (sessionlog size=%d)\n", - op->o_log_prefix, se->se_csn.bv_val, sl->sl_num ); - for ( i=0; isl_numcsns; i++ ) - if ( sl->sl_sids[i] >= se->se_sid ) - break; - if ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) { + if ( !sl->sl_playing && sl->sl_num > sl->sl_size ) { + Avlnode *edge = tavl_end( sl->sl_entries, TAVL_DIR_LEFT ); + while ( sl->sl_num > sl->sl_size ) { + int i; + Avlnode *next = tavl_next( edge, TAVL_DIR_RIGHT ); + se = edge->avl_data; Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " - "adding csn=%s to mincsn\n", - op->o_log_prefix, se->se_csn.bv_val, 0 ); - slap_insert_csn_sids( (struct sync_cookie *)sl, - i, se->se_sid, &se->se_csn ); - } else { - Log4( LDAP_DEBUG_SYNC, ldap_syslog_level, "%s syncprov_add_slog: " - "updating mincsn for sid=%d csn=%s to %s\n", - op->o_log_prefix, se->se_sid, sl->sl_mincsn[i].bv_val, se->se_csn.bv_val ); - ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn ); + "expiring csn=%s from sessionlog (sessionlog size=%d)\n", + op->o_log_prefix, se->se_csn.bv_val, sl->sl_num ); + for ( i=0; isl_numcsns; i++ ) + if ( sl->sl_sids[i] >= se->se_sid ) + break; + if ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) { + Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " + "adding csn=%s to mincsn\n", + op->o_log_prefix, se->se_csn.bv_val, 0 ); + slap_insert_csn_sids( (struct sync_cookie *)sl, + i, se->se_sid, &se->se_csn ); + } else { + Log4( LDAP_DEBUG_SYNC, ldap_syslog_level, "%s syncprov_add_slog: " + "updating mincsn for sid=%d csn=%s to %s\n", + op->o_log_prefix, se->se_sid, sl->sl_mincsn[i].bv_val, se->se_csn.bv_val ); + ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn ); + } + tavl_delete( &sl->sl_entries, se, syncprov_sessionlog_cmp ); + ch_free( se ); + edge = next; + sl->sl_num--; } - ch_free( se ); - sl->sl_num--; - } } - ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); + ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); } } @@ -1701,17 +1702,18 @@ playlog_cb( Operation *op, SlapReply *rs ) /* enter with sl->sl_mutex locked, release before returning */ static void syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, - sync_control *srs, BerVarray ctxcsn, int numcsns, int *sids ) + sync_control *srs, BerVarray ctxcsn, int numcsns, int *sids, + struct berval *mincsn ) { slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; - slog_entry *se; int i, j, ndel, num, nmods, mmods; + Avlnode *entry; char cbuf[LDAP_PVT_CSNSTR_BUFSIZE]; BerVarray uuids; struct berval delcsn[2]; if ( !sl->sl_num ) { - ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); + ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); return; } @@ -1719,7 +1721,7 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, i = 0; nmods = 0; sl->sl_playing++; - ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); + ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) + num * UUID_LEN, op->o_tmpmemctx ); @@ -1729,13 +1731,31 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, delcsn[0].bv_val = cbuf; BER_BVZERO(&delcsn[1]); + ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex ); /* Make a copy of the relevant UUIDs. Put the Deletes up front * and everything else at the end. Do this first so we can - * unlock the list mutex. + * let the write side manage the sessionlog again. */ - for ( se=sl->sl_head; se; se=se->se_next ) { + assert( sl->sl_entries ); + + /* Find first relevant log entry. If greater than mincsn, backtrack one entry */ + { + slog_entry te = {0}; + te.se_csn = *mincsn; + entry = tavl_find3( sl->sl_entries, &te, syncprov_sessionlog_cmp, &ndel ); + } + if ( ndel > 0 && entry ) + entry = tavl_next( entry, TAVL_DIR_LEFT ); + /* if none, just start at beginning */ + if ( !entry ) + entry = tavl_end( sl->sl_entries, TAVL_DIR_LEFT ); + + do { + slog_entry *se = entry->avl_data; int k; + /* Make sure writes can still make progress */ + ldap_pvt_thread_rdwr_runlock( &sl->sl_mutex ); ndel = 1; for ( k=0; ksr_state.numcsns; k++ ) { if ( se->se_sid == srs->sr_state.sids[k] ) { @@ -1744,6 +1764,7 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, } } if ( ndel <= 0 ) { + ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex ); continue; } ndel = 0; @@ -1756,6 +1777,7 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, if ( ndel > 0 ) { Debug( LDAP_DEBUG_SYNC, "%s syncprov_playlog: " "cmp %d, too new\n", op->o_log_prefix, ndel, 0 ); + ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex ); break; } if ( se->se_tag == LDAP_REQ_DELETE ) { @@ -1765,8 +1787,10 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, delcsn[0].bv_len = se->se_csn.bv_len; delcsn[0].bv_val[delcsn[0].bv_len] = '\0'; } else { - if ( se->se_tag == LDAP_REQ_ADD ) + if ( se->se_tag == LDAP_REQ_ADD ) { + ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex ); continue; + } nmods++; j = num - nmods; } @@ -1775,7 +1799,7 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, uuids[j].bv_len = UUID_LEN; if ( LogTest( LDAP_DEBUG_SYNC ) ) { - char uuidstr[40]; + char uuidstr[40] = {}; lutil_uuidstr_from_normalized( uuids[j].bv_val, uuids[j].bv_len, uuidstr, 40 ); Log4( LDAP_DEBUG_SYNC, ldap_syslog_level, "%s syncprov_playlog: " @@ -1783,10 +1807,12 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, op->o_log_prefix, se->se_tag == LDAP_REQ_DELETE ? "deleted" : "modified", uuidstr, delcsn[0].bv_len ? delcsn[0].bv_val : "(null)" ); } - } - ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); + ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex ); + } while ( (entry = tavl_next( entry, TAVL_DIR_RIGHT )) != NULL ); + ldap_pvt_thread_rdwr_runlock( &sl->sl_mutex ); + ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex ); sl->sl_playing--; - ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); + ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); ndel = i; @@ -2826,7 +2852,7 @@ no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { sl=si->si_logs; if ( sl ) { int do_play = 0; - ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); + ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex ); /* Are there any log entries, and is the consumer state * present in the session log? */ @@ -2853,9 +2879,9 @@ no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { if ( do_play ) { do_present = 0; /* mutex is unlocked in playlog */ - syncprov_playlog( op, rs, sl, srs, ctxcsn, numcsns, sids ); + syncprov_playlog( op, rs, sl, srs, ctxcsn, numcsns, sids, &mincsn ); } else { - ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); + ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); } } /* Is the CSN still present in the database? */ @@ -3185,7 +3211,7 @@ sp_cf_gen(ConfigArgs *c) sl = si->si_logs; if ( !sl ) { sl = ch_calloc( 1, sizeof( sessionlog )); - ldap_pvt_thread_mutex_init( &sl->sl_mutex ); + ldap_pvt_thread_rdwr_init( &sl->sl_mutex ); si->si_logs = sl; } sl->sl_size = size; @@ -3490,19 +3516,14 @@ syncprov_db_destroy( if ( si ) { if ( si->si_logs ) { sessionlog *sl = si->si_logs; - slog_entry *se = sl->sl_head; - while ( se ) { - slog_entry *se_next = se->se_next; - ch_free( se ); - se = se_next; - } + tavl_free( sl->sl_entries, (AVL_FREE)ch_free ); if ( sl->sl_mincsn ) ber_bvarray_free( sl->sl_mincsn ); if ( sl->sl_sids ) ch_free( sl->sl_sids ); - ldap_pvt_thread_mutex_destroy(&si->si_logs->sl_mutex); + ldap_pvt_thread_rdwr_destroy(&si->si_logs->sl_mutex); ch_free( si->si_logs ); } if ( si->si_ctxcsn )