/* Session log data */
typedef struct slog_entry {
- struct slog_entry *se_next;
struct berval se_uuid;
struct berval se_csn;
int se_sid;
int sl_num;
int sl_size;
int sl_playing;
- slog_entry *sl_head;
- slog_entry *sl_tail;
- ldap_pvt_thread_mutex_t sl_mutex;
+ Avlnode *sl_entries;
+ ldap_pvt_thread_rdwr_t sl_mutex;
} sessionlog;
/* The main state for this overlay */
return ber_bvcmp( &m1->mt_dn, &m2->mt_dn );
}
+static int
+syncprov_sessionlog_cmp( const void *l, const void *r )
+{
+ const slog_entry *left = l, *right = r;
+ int ret = ber_bvcmp( &left->se_csn, &right->se_csn );
+ if ( !ret )
+ ret = ber_bvcmp( &left->se_uuid, &right->se_uuid );
+ /* Only time we have two modifications with same CSN is when we detect a
+ * rename during replication.
+ * We invert the test here because LDAP_REQ_MODDN is
+ * numerically greater than LDAP_REQ_MODIFY but we
+ * want it to occur first.
+ */
+ if ( !ret )
+ ret = right->se_tag - left->se_tag;
+
+ return ret;
+}
+
/* syncprov_findbase:
* finds the true DN of the base of a search (with alias dereferencing) and
* checks to make sure the base entry doesn't get replaced with a different
syncprov_info_t *si = on->on_bi.bi_private;
sessionlog *sl;
slog_entry *se;
+ int rc;
sl = si->si_logs;
{
* state with respect to such operations, so we ignore them and
* wipe out anything in the log if we see them.
*/
- ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
+ ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
/* can only do this if no one else is reading the log at the moment */
- if (!sl->sl_playing) {
- while ( se = sl->sl_head ) {
- sl->sl_head = se->se_next;
- ch_free( se );
- }
- sl->sl_tail = NULL;
- sl->sl_num = 0;
+ if ( !sl->sl_playing ) {
+ tavl_free( sl->sl_entries, (AVL_FREE)ch_free );
+ sl->sl_num = 0;
+ sl->sl_entries = NULL;
}
- ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+ ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
return;
}
/* Allocate a record. UUIDs are not NUL-terminated. */
- se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len +
+ se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len +
op->o_csn.bv_len + 1 );
- se->se_next = NULL;
se->se_tag = op->o_tag;
se->se_uuid.bv_val = (char *)(&se[1]);
se->se_csn.bv_len = op->o_csn.bv_len;
se->se_sid = slap_parse_csn_sid( &se->se_csn );
- ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
+ ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
if ( LogTest( LDAP_DEBUG_SYNC ) ) {
char uuidstr[40] = {};
if ( !BER_BVISEMPTY( &opc->suuid ) ) {
"adding csn=%s to sessionlog, uuid=%s\n",
op->o_log_prefix, se->se_csn.bv_val, uuidstr );
}
- if ( sl->sl_head ) {
- /* Keep the list in csn order. */
- if ( ber_bvcmp( &sl->sl_tail->se_csn, &se->se_csn ) <= 0 ) {
- sl->sl_tail->se_next = se;
- sl->sl_tail = se;
- } else {
- slog_entry **sep;
-
- for ( sep = &sl->sl_head; *sep; sep = &(*sep)->se_next ) {
- if ( ber_bvcmp( &se->se_csn, &(*sep)->se_csn ) < 0 ) {
- se->se_next = *sep;
- *sep = se;
- break;
- }
- }
- }
- } else {
- sl->sl_head = se;
- sl->sl_tail = se;
+ if ( !sl->sl_entries ) {
if ( !sl->sl_mincsn ) {
sl->sl_numcsns = 1;
sl->sl_mincsn = ch_malloc( 2*sizeof( struct berval ));
BER_BVZERO( &sl->sl_mincsn[1] );
}
}
+ rc = tavl_insert( &sl->sl_entries, se, syncprov_sessionlog_cmp, avl_dup_error );
+ assert( rc == LDAP_SUCCESS );
sl->sl_num++;
- if (!sl->sl_playing) {
- while ( sl->sl_num > sl->sl_size ) {
- int i;
- se = sl->sl_head;
- sl->sl_head = se->se_next;
- Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: "
- "expiring csn=%s from sessionlog (sessionlog size=%d)\n",
- op->o_log_prefix, se->se_csn.bv_val, sl->sl_num );
- for ( i=0; i<sl->sl_numcsns; i++ )
- if ( sl->sl_sids[i] >= se->se_sid )
- break;
- if ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) {
+ if ( !sl->sl_playing && sl->sl_num > sl->sl_size ) {
+ Avlnode *edge = tavl_end( sl->sl_entries, TAVL_DIR_LEFT );
+ while ( sl->sl_num > sl->sl_size ) {
+ int i;
+ Avlnode *next = tavl_next( edge, TAVL_DIR_RIGHT );
+ se = edge->avl_data;
Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: "
- "adding csn=%s to mincsn\n",
- op->o_log_prefix, se->se_csn.bv_val, 0 );
- slap_insert_csn_sids( (struct sync_cookie *)sl,
- i, se->se_sid, &se->se_csn );
- } else {
- Log4( LDAP_DEBUG_SYNC, ldap_syslog_level, "%s syncprov_add_slog: "
- "updating mincsn for sid=%d csn=%s to %s\n",
- op->o_log_prefix, se->se_sid, sl->sl_mincsn[i].bv_val, se->se_csn.bv_val );
- ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn );
+ "expiring csn=%s from sessionlog (sessionlog size=%d)\n",
+ op->o_log_prefix, se->se_csn.bv_val, sl->sl_num );
+ for ( i=0; i<sl->sl_numcsns; i++ )
+ if ( sl->sl_sids[i] >= se->se_sid )
+ break;
+ if ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) {
+ Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: "
+ "adding csn=%s to mincsn\n",
+ op->o_log_prefix, se->se_csn.bv_val, 0 );
+ slap_insert_csn_sids( (struct sync_cookie *)sl,
+ i, se->se_sid, &se->se_csn );
+ } else {
+ Log4( LDAP_DEBUG_SYNC, ldap_syslog_level, "%s syncprov_add_slog: "
+ "updating mincsn for sid=%d csn=%s to %s\n",
+ op->o_log_prefix, se->se_sid, sl->sl_mincsn[i].bv_val, se->se_csn.bv_val );
+ ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn );
+ }
+ tavl_delete( &sl->sl_entries, se, syncprov_sessionlog_cmp );
+ ch_free( se );
+ edge = next;
+ sl->sl_num--;
}
- ch_free( se );
- sl->sl_num--;
- }
}
- ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+ ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
}
}
/* enter with sl->sl_mutex locked, release before returning */
static void
syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
- sync_control *srs, BerVarray ctxcsn, int numcsns, int *sids )
+ sync_control *srs, BerVarray ctxcsn, int numcsns, int *sids,
+ struct berval *mincsn )
{
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
- slog_entry *se;
int i, j, ndel, num, nmods, mmods;
+ Avlnode *entry;
char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
BerVarray uuids;
struct berval delcsn[2];
if ( !sl->sl_num ) {
- ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+ ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
return;
}
i = 0;
nmods = 0;
sl->sl_playing++;
- ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+ ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) +
num * UUID_LEN, op->o_tmpmemctx );
delcsn[0].bv_val = cbuf;
BER_BVZERO(&delcsn[1]);
+ ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
/* Make a copy of the relevant UUIDs. Put the Deletes up front
* and everything else at the end. Do this first so we can
- * unlock the list mutex.
+ * let the write side manage the sessionlog again.
*/
- for ( se=sl->sl_head; se; se=se->se_next ) {
+ assert( sl->sl_entries );
+
+ /* Find first relevant log entry. If greater than mincsn, backtrack one entry */
+ {
+ slog_entry te = {0};
+ te.se_csn = *mincsn;
+ entry = tavl_find3( sl->sl_entries, &te, syncprov_sessionlog_cmp, &ndel );
+ }
+ if ( ndel > 0 && entry )
+ entry = tavl_next( entry, TAVL_DIR_LEFT );
+ /* if none, just start at beginning */
+ if ( !entry )
+ entry = tavl_end( sl->sl_entries, TAVL_DIR_LEFT );
+
+ do {
+ slog_entry *se = entry->avl_data;
int k;
+ /* Make sure writes can still make progress */
+ ldap_pvt_thread_rdwr_runlock( &sl->sl_mutex );
ndel = 1;
for ( k=0; k<srs->sr_state.numcsns; k++ ) {
if ( se->se_sid == srs->sr_state.sids[k] ) {
}
}
if ( ndel <= 0 ) {
+ ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
continue;
}
ndel = 0;
if ( ndel > 0 ) {
Debug( LDAP_DEBUG_SYNC, "%s syncprov_playlog: "
"cmp %d, too new\n", op->o_log_prefix, ndel, 0 );
+ ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
break;
}
if ( se->se_tag == LDAP_REQ_DELETE ) {
delcsn[0].bv_len = se->se_csn.bv_len;
delcsn[0].bv_val[delcsn[0].bv_len] = '\0';
} else {
- if ( se->se_tag == LDAP_REQ_ADD )
+ if ( se->se_tag == LDAP_REQ_ADD ) {
+ ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
continue;
+ }
nmods++;
j = num - nmods;
}
uuids[j].bv_len = UUID_LEN;
if ( LogTest( LDAP_DEBUG_SYNC ) ) {
- char uuidstr[40];
+ char uuidstr[40] = {};
lutil_uuidstr_from_normalized( uuids[j].bv_val, uuids[j].bv_len,
uuidstr, 40 );
Log4( LDAP_DEBUG_SYNC, ldap_syslog_level, "%s syncprov_playlog: "
op->o_log_prefix, se->se_tag == LDAP_REQ_DELETE ? "deleted" : "modified",
uuidstr, delcsn[0].bv_len ? delcsn[0].bv_val : "(null)" );
}
- }
- ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
+ ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex );
+ } while ( (entry = tavl_next( entry, TAVL_DIR_RIGHT )) != NULL );
+ ldap_pvt_thread_rdwr_runlock( &sl->sl_mutex );
+ ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
sl->sl_playing--;
- ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+ ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
ndel = i;
sl=si->si_logs;
if ( sl ) {
int do_play = 0;
- ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
+ ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex );
/* Are there any log entries, and is the consumer state
* present in the session log?
*/
if ( do_play ) {
do_present = 0;
/* mutex is unlocked in playlog */
- syncprov_playlog( op, rs, sl, srs, ctxcsn, numcsns, sids );
+ syncprov_playlog( op, rs, sl, srs, ctxcsn, numcsns, sids, &mincsn );
} else {
- ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+ ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex );
}
}
/* Is the CSN still present in the database? */
sl = si->si_logs;
if ( !sl ) {
sl = ch_calloc( 1, sizeof( sessionlog ));
- ldap_pvt_thread_mutex_init( &sl->sl_mutex );
+ ldap_pvt_thread_rdwr_init( &sl->sl_mutex );
si->si_logs = sl;
}
sl->sl_size = size;
if ( si ) {
if ( si->si_logs ) {
sessionlog *sl = si->si_logs;
- slog_entry *se = sl->sl_head;
- while ( se ) {
- slog_entry *se_next = se->se_next;
- ch_free( se );
- se = se_next;
- }
+ tavl_free( sl->sl_entries, (AVL_FREE)ch_free );
if ( sl->sl_mincsn )
ber_bvarray_free( sl->sl_mincsn );
if ( sl->sl_sids )
ch_free( sl->sl_sids );
- ldap_pvt_thread_mutex_destroy(&si->si_logs->sl_mutex);
+ ldap_pvt_thread_rdwr_destroy(&si->si_logs->sl_mutex);
ch_free( si->si_logs );
}
if ( si->si_ctxcsn )