return rs->sr_err;
}
-/* enter with sl->sl_mutex locked, release before returning */
-static void
-syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
- sync_control *srs, BerVarray ctxcsn, int numcsns, int *sids )
+/*
+ * Check whether the last nmods UUIDs in the uuids list exist in the database
+ * and (still) match the op filter, zero out the bv_len of any that still exist
+ * and return the number of UUIDs we have confirmed are gone now.
+ */
+static int
+check_uuidlist_presence(
+ Operation *op,
+ struct berval *uuids,
+ int len,
+ int nmods )
+{
+ slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
+ Operation fop = *op;
+ SlapReply frs = { REP_RESULT };
+ Filter mf, af;
+ AttributeAssertion eq = ATTRIBUTEASSERTION_INIT;
+ slap_callback cb = {0};
+ int i, mods = nmods;
+
+ fop.o_sync_mode = 0;
+ fop.o_callback = &cb;
+ fop.ors_limit = NULL;
+ fop.ors_tlimit = SLAP_NO_LIMIT;
+ fop.ors_attrs = slap_anlist_all_attributes;
+ fop.ors_attrsonly = 0;
+ fop.o_managedsait = SLAP_CONTROL_CRITICAL;
+
+ af.f_choice = LDAP_FILTER_AND;
+ af.f_next = NULL;
+ af.f_and = &mf;
+ mf.f_choice = LDAP_FILTER_EQUALITY;
+ mf.f_ava = &eq;
+ mf.f_av_desc = slap_schema.si_ad_entryUUID;
+ mf.f_next = fop.ors_filter;
+
+ fop.ors_filter = ⁡
+
+ cb.sc_response = playlog_cb;
+
+ fop.o_bd->bd_info = (BackendInfo *)on->on_info;
+ for ( i=0; i<nmods; i++ ) {
+ mf.f_av_value = uuids[ len - 1 - i ];
+ cb.sc_private = NULL;
+ fop.ors_slimit = 1;
+
+ if ( BER_BVISEMPTY( &mf.f_av_value ) ) {
+ mods--;
+ continue;
+ }
+
+ rs_reinit( &frs, REP_RESULT );
+ fop.o_bd->be_search( &fop, &frs );
+ if ( cb.sc_private ) {
+ uuids[ len - 1 - i ].bv_len = 0;
+ mods--;
+ }
+ }
+ fop.o_bd->bd_info = (BackendInfo *)on;
+
+ return mods;
+}
+
+static int
+syncprov_play_sessionlog( Operation *op, SlapReply *rs, sync_control *srs,
+ BerVarray ctxcsn, int numcsns, int *sids,
+ struct berval *mincsn, int minsid )
{
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
+ syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
+ sessionlog *sl = si->si_logs;
+ int i, j, ndel, num, nmods, mmods, do_play = 0, rc = -1;
+ BerVarray uuids, csns;
+ struct berval uuid[2] = {}, csn[2] = {};
slog_entry *se;
- int i, j, ndel, num, nmods, mmods;
- char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
- BerVarray uuids;
- struct berval delcsn[2];
+ TAvlnode *entry;
+ ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
+ /* Are there any log entries, and is the consumer state
+ * present in the session log?
+ */
if ( !sl->sl_num ) {
ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
- return;
+ return rc;
+ }
+ assert( sl->sl_num > 0 );
+
+ for ( i=0; i<sl->sl_numcsns; i++ ) {
+ /* SID not present == new enough */
+ if ( minsid < sl->sl_sids[i] ) {
+ do_play = 1;
+ break;
+ }
+ /* SID present */
+ if ( minsid == sl->sl_sids[i] ) {
+ /* new enough? */
+ if ( ber_bvcmp( mincsn, &sl->sl_mincsn[i] ) >= 0 )
+ do_play = 1;
+ break;
+ }
+ }
+ /* SID not present == new enough */
+ if ( i == sl->sl_numcsns )
+ do_play = 1;
+
+ if ( !do_play ) {
+ ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+ return rc;
}
num = sl->sl_num;
sl->sl_playing++;
ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
- uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) +
- num * UUID_LEN, op->o_tmpmemctx );
- uuids[0].bv_val = (char *)(uuids + num + 1);
-
- delcsn[0].bv_len = 0;
- delcsn[0].bv_val = cbuf;
- BER_BVZERO(&delcsn[1]);
+ uuids = op->o_tmpalloc( (num) * sizeof( struct berval ) +
+ num * UUID_LEN, op->o_tmpmemctx );
+ uuids[0].bv_val = (char *)(uuids + num);
+ csns = op->o_tmpalloc( (num) * sizeof( struct berval ) +
+ num * LDAP_PVT_CSNSTR_BUFSIZE, op->o_tmpmemctx );
+ csns[0].bv_val = (char *)(csns + num);
/* Make a copy of the relevant UUIDs. Put the Deletes up front
* and everything else at the end. Do this first so we can
char uuidstr[40];
lutil_uuidstr_from_normalized( se->se_uuid.bv_val, se->se_uuid.bv_len,
uuidstr, 40 );
- Debug( LDAP_DEBUG_SYNC, "%s syncprov_playlog: "
+ Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_sessionlog: "
"log entry tag=%lu uuid=%s cookie=%s\n",
op->o_log_prefix, se->se_tag, uuidstr, se->se_csn.bv_val );
}
+
ndel = 1;
for ( k=0; k<srs->sr_state.numcsns; k++ ) {
if ( se->se_sid == srs->sr_state.sids[k] ) {
}
}
if ( ndel <= 0 ) {
- Debug( LDAP_DEBUG_SYNC, "%s syncprov_playlog: "
+ Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_sessionlog: "
"cmp %d, csn %s too old, skipping\n",
op->o_log_prefix, ndel, se->se_csn.bv_val );
continue;
}
}
if ( ndel > 0 ) {
- Debug( LDAP_DEBUG_SYNC, "%s syncprov_playlog: "
+ Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_sessionlog: "
"cmp %d, csn %s too new, we're finished\n",
op->o_log_prefix, ndel, se->se_csn.bv_val );
break;
if ( se->se_tag == LDAP_REQ_DELETE ) {
j = i;
i++;
- AC_MEMCPY( cbuf, se->se_csn.bv_val, se->se_csn.bv_len );
- delcsn[0].bv_len = se->se_csn.bv_len;
- delcsn[0].bv_val[delcsn[0].bv_len] = '\0';
} else {
if ( se->se_tag == LDAP_REQ_ADD )
continue;
AC_MEMCPY(uuids[j].bv_val, se->se_uuid.bv_val, UUID_LEN);
uuids[j].bv_len = UUID_LEN;
+ csns[j].bv_val = csns[0].bv_val + (j * LDAP_PVT_CSNSTR_BUFSIZE);
+ AC_MEMCPY(csns[j].bv_val, se->se_csn.bv_val, se->se_csn.bv_len);
+ csns[j].bv_len = se->se_csn.bv_len;
+
if ( LogTest( LDAP_DEBUG_SYNC ) ) {
char uuidstr[40];
lutil_uuidstr_from_normalized( uuids[j].bv_val, uuids[j].bv_len,
uuidstr, 40 );
- Debug( LDAP_DEBUG_SYNC, "%s syncprov_playlog: "
+ Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_sessionlog: "
"picking a %s entry uuid=%s cookie=%s\n",
op->o_log_prefix, se->se_tag == LDAP_REQ_DELETE ? "deleted" : "modified",
- uuidstr, delcsn[0].bv_len ? delcsn[0].bv_val : "(null)" );
+ uuidstr, csns[j].bv_len ? csns[j].bv_val : "(null)" );
}
}
ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
}
}
+ /* Check mods now */
if ( mmods ) {
- Operation fop;
- int rc;
- Filter mf, af;
- AttributeAssertion eq = ATTRIBUTEASSERTION_INIT;
- slap_callback cb = {0};
-
- fop = *op;
-
- fop.o_sync_mode = 0;
- fop.o_callback = &cb;
- fop.ors_limit = NULL;
- fop.ors_tlimit = SLAP_NO_LIMIT;
- fop.ors_attrs = slap_anlist_all_attributes;
- fop.ors_attrsonly = 0;
- fop.o_managedsait = SLAP_CONTROL_CRITICAL;
-
- af.f_choice = LDAP_FILTER_AND;
- af.f_next = NULL;
- af.f_and = &mf;
- mf.f_choice = LDAP_FILTER_EQUALITY;
- mf.f_ava = &eq;
- mf.f_av_desc = slap_schema.si_ad_entryUUID;
- mf.f_next = fop.ors_filter;
-
- fop.ors_filter = ⁡
-
- cb.sc_response = playlog_cb;
- fop.o_bd->bd_info = (BackendInfo *)on->on_info;
+ check_uuidlist_presence( op, uuids, num, nmods );
+ }
- for ( i=ndel; i<num; i++ ) {
- if ( uuids[i].bv_len != 0 ) {
- SlapReply frs = { REP_RESULT };
+ /* ITS#8768 Send entries sorted by CSN order */
+ i = j = 0;
+ while ( i < ndel || j < nmods ) {
+ struct berval cookie;
+ int index;
- mf.f_av_value = uuids[i];
- cb.sc_private = NULL;
- fop.ors_slimit = 1;
- rc = fop.o_bd->be_search( &fop, &frs );
+ /* Skip over duplicate mods */
+ if ( j < nmods && BER_BVISEMPTY( &uuids[ num - 1 - j ] ) ) {
+ j++;
+ continue;
+ }
+ index = num - 1 - j;
- /* If entry was not found, add to delete list */
- if ( !cb.sc_private ) {
- uuids[ndel++] = uuids[i];
- }
- }
+ if ( i >= ndel ) {
+ j++;
+ } else if ( j >= nmods ) {
+ index = i++;
+ /* Take the oldest by CSN order */
+ } else if ( ber_bvcmp( &csns[index], &csns[i] ) < 0 ) {
+ j++;
+ } else {
+ index = i++;
}
- fop.o_bd->bd_info = (BackendInfo *)on;
- }
- if ( ndel ) {
- struct berval cookie;
- if ( delcsn[0].bv_len ) {
- slap_compose_sync_cookie( op, &cookie, delcsn, srs->sr_state.rid,
- slap_serverID ? slap_serverID : -1, delcsn );
+ uuid[0] = uuids[index];
+ csn[0] = csns[index];
- Debug( LDAP_DEBUG_SYNC, "%s syncprov_playlog: cookie=%s\n",
- op->o_log_prefix, cookie.bv_val );
+ slap_compose_sync_cookie( op, &cookie, srs->sr_state.ctxcsn,
+ srs->sr_state.rid, slap_serverID ? slap_serverID : -1, csn );
+ if ( LogTest( LDAP_DEBUG_SYNC ) ) {
+ char uuidstr[40];
+ lutil_uuidstr_from_normalized( uuid[0].bv_val, uuid[0].bv_len,
+ uuidstr, 40 );
+ Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_sessionlog: "
+ "sending a new disappearing entry uuid=%s cookie=%s\n",
+ op->o_log_prefix, uuidstr, cookie.bv_val );
}
- uuids[ndel].bv_val = NULL;
- syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET,
- delcsn[0].bv_len ? &cookie : NULL, 0, uuids, 1 );
- if ( delcsn[0].bv_len ) {
- op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
- }
+ /* TODO: we might batch those that share the same CSN (think present
+ * phase), but would have to limit how many we send out at once */
+ syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, &cookie, 0, uuid, 1 );
}
op->o_tmpfree( uuids, op->o_tmpmemctx );
+ op->o_tmpfree( csns, op->o_tmpmemctx );
+
+ return LDAP_SUCCESS;
}
static int
goto shortcut;
}
- /* Do we have a sessionlog for this search? */
- sl=si->si_logs;
- if ( sl ) {
- int do_play = 0;
- ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
- /* Are there any log entries, and is the consumer state
- * present in the session log?
- */
- if ( sl->sl_num > 0 ) {
- int i;
- for ( i=0; i<sl->sl_numcsns; i++ ) {
- /* SID not present == new enough */
- if ( minsid < sl->sl_sids[i] ) {
- do_play = 1;
- break;
- }
- /* SID present */
- if ( minsid == sl->sl_sids[i] ) {
- /* new enough? */
- if ( ber_bvcmp( &mincsn, &sl->sl_mincsn[i] ) >= 0 )
- do_play = 1;
- break;
- }
- }
- /* SID not present == new enough */
- if ( i == sl->sl_numcsns )
- do_play = 1;
- }
- if ( do_play ) {
- do_present = 0;
- /* mutex is unlocked in playlog */
- syncprov_playlog( op, rs, sl, srs, ctxcsn, numcsns, sids );
- } else {
- ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+ if ( si->si_logs ) {
+ do_present = 0;
+ if ( syncprov_play_sessionlog( op, rs, srs, ctxcsn,
+ numcsns, sids, &mincsn, minsid ) ) {
+ do_present = SS_PRESENT;
}
}
/*