struct evdns_base *dnsbase;
+struct event *lload_timeout_event;
+
#ifndef SLAPD_LISTEN_BACKLOG
#define SLAPD_LISTEN_BACKLOG 1024
#endif /* ! SLAPD_LISTEN_BACKLOG */
int i, rc;
Backend *b;
struct event_base *base;
+ struct event *event;
assert( daemon_base != NULL );
current_backend = LDAP_CIRCLEQ_FIRST( &backend );
LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
- struct event *retry_event =
- evtimer_new( daemon_base, backend_connect, b );
-
- if ( !retry_event ) {
+ event = evtimer_new( daemon_base, backend_connect, b );
+ if ( !event ) {
Debug( LDAP_DEBUG_ANY, "lloadd: "
"failed to allocate retry event\n" );
return -1;
}
- b->b_retry_event = retry_event;
+ b->b_retry_event = event;
backend_retry( b );
}
+ event = evtimer_new( daemon_base, operations_timeout, event_self_cbarg() );
+ if ( !event ) {
+ Debug( LDAP_DEBUG_ANY, "lloadd: "
+ "failed to allocate timeout event\n" );
+ return -1;
+ }
+ lload_timeout_event = event;
+
+ /* TODO: should we just add it with any timeout and re-add when the timeout
+ * changes? */
+ if ( lload_timeout_api ) {
+ event_add( event, lload_timeout_api );
+ }
+
lloadd_inited = 1;
rc = event_base_dispatch( daemon_base );
Debug( LDAP_DEBUG_ANY, "lloadd shutdown: "
operation_destroy_from_upstream( op );
CONNECTION_UNLOCK(c);
}
+
+void
+connection_timeout( Connection *upstream, time_t threshold )
+{
+ Operation *op;
+ TAvlnode *ops = NULL, *node;
+ Backend *b = upstream->c_private;
+ int rc, nops = 0;
+
+ for ( node = tavl_end( upstream->c_ops, TAVL_DIR_LEFT ); node &&
+ ((Operation *)node->avl_data)->o_start < threshold; /* shortcut */
+ node = tavl_next( node, TAVL_DIR_RIGHT ) ) {
+ Operation *found_op;
+
+ op = node->avl_data;
+
+ /* Have we received another response since? */
+ if ( op->o_last_response && op->o_last_response >= threshold ) {
+ continue;
+ }
+
+ op->o_upstream_refcnt++;
+ found_op = tavl_delete( &upstream->c_ops, op, operation_upstream_cmp );
+ assert( op == found_op );
+
+ rc = tavl_insert( &ops, op, operation_upstream_cmp, avl_dup_error );
+ assert( rc == LDAP_SUCCESS );
+
+ Debug( LDAP_DEBUG_STATS2, "connection_timeout: "
+ "timing out %s from connid=%lu msgid=%d sent to connid=%lu as "
+ "msgid=%d\n",
+ slap_msgtype2str( op->o_tag ), op->o_client_connid,
+ op->o_client_msgid, op->o_upstream_connid,
+ op->o_upstream_msgid );
+ nops++;
+ }
+
+ if ( nops == 0 ) {
+ return;
+ }
+ upstream->c_n_ops_executing -= nops;
+ Debug( LDAP_DEBUG_STATS, "connection_timeout: "
+ "timing out %d operations for connid=%lu\n",
+ nops, upstream->c_connid );
+ CONNECTION_UNLOCK_INCREF(upstream);
+
+ ldap_pvt_thread_mutex_lock( &b->b_mutex );
+ b->b_n_ops_executing -= nops;
+ ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+
+ for ( node = tavl_end( ops, TAVL_DIR_LEFT ); node;
+ node = tavl_next( node, TAVL_DIR_RIGHT ) ) {
+ Connection *client;
+
+ op = node->avl_data;
+
+ ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
+ client = op->o_client;
+ if ( !client ) {
+ ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
+ continue;
+ }
+ CONNECTION_LOCK(client);
+ ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
+
+ /* operation_send_reject_locked unlocks and destroys client on
+ * failure */
+ if ( operation_send_reject_locked( op,
+ op->o_tag == LDAP_REQ_SEARCH ? LDAP_TIMELIMIT_EXCEEDED :
+ LDAP_ADMINLIMIT_EXCEEDED,
+ "upstream did not respond in time", 0 ) == LDAP_SUCCESS ) {
+ CONNECTION_UNLOCK_OR_DESTROY(client);
+ }
+
+ if ( rc == LDAP_SUCCESS ) {
+ rc = operation_send_abandon( op );
+ }
+
+ CONNECTION_LOCK(upstream);
+ op->o_upstream_refcnt--;
+ operation_destroy_from_upstream( op );
+ CONNECTION_UNLOCK(upstream);
+ }
+
+ /* TODO: if operation_send_abandon failed, we need to kill the upstream */
+ if ( rc == LDAP_SUCCESS ) {
+ connection_write_cb( -1, 0, upstream );
+ }
+
+ CONNECTION_LOCK_DECREF(upstream);
+ /* just dispose of the AVL, most operations should already be gone */
+ tavl_free( ops, NULL );
+}
+
+static void
+backend_timeout(
+ Backend *b,
+ struct ConnSt *cq,
+ Connection **lastp,
+ time_t threshold )
+{
+ Connection *c, *old;
+ unsigned long last_connid;
+
+ ldap_pvt_thread_mutex_lock( &b->b_mutex );
+ if ( !*lastp ) {
+ ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+ return;
+ }
+ last_connid = (*lastp)->c_connid;
+ c = LDAP_CIRCLEQ_LOOP_NEXT( cq, *lastp, c_next );
+ CONNECTION_LOCK(c);
+ ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+
+ /*
+ * Ugh... concurrency is annoying:
+ * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid
+ * order
+ * - the connection with the highest c_connid is maintained at *lastp
+ * - we can only use cq when we hold b->b_mutex
+ * - connections might be added to or removed from cq while we're busy
+ * processing connections
+ * - connection_destroy touches cq
+ * - we can't even hold locks of two different connections
+ * - we need a way to detect we've finished looping around cq for some
+ * definition of looping around
+ *
+ * So as a result, 90% of the code below is spent navigating that...
+ */
+ while ( c->c_connid <= last_connid ) {
+ Debug( LDAP_DEBUG_TRACE, "backend_timeout: "
+ "timing out operations for connid=%lu which has %ld "
+ "pending ops\n",
+ c->c_connid, c->c_n_ops_executing );
+ connection_timeout( c, threshold );
+ if ( c->c_connid == last_connid ) {
+ break;
+ }
+
+ CONNECTION_UNLOCK_INCREF(c);
+
+ ldap_pvt_thread_mutex_lock( &b->b_mutex );
+ old = c;
+ c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
+ CONNECTION_LOCK(c);
+ CONNECTION_UNLOCK_INCREF(c);
+ ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+
+ CONNECTION_LOCK_DECREF(old);
+ CONNECTION_UNLOCK_OR_DESTROY(old);
+
+ CONNECTION_LOCK_DECREF(c);
+ }
+ CONNECTION_UNLOCK_OR_DESTROY(c);
+}
+
+void
+operations_timeout( evutil_socket_t s, short what, void *arg )
+{
+ struct event *self = arg;
+ Backend *b;
+ time_t threshold;
+
+ Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
+ "running timeout task\n" );
+ if ( !lload_timeout_api ) goto done;
+
+ threshold = slap_get_time() - lload_timeout_api->tv_sec;
+
+ LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
+ if ( b->b_n_ops_executing == 0 ) continue;
+
+ Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
+ "timing out binds for backend uri=%s\n",
+ b->b_uri.bv_val );
+ backend_timeout( b, &b->b_bindconns, &b->b_last_bindconn, threshold );
+
+ Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
+ "timing out other operations for backend uri=%s\n",
+ b->b_uri.bv_val );
+ backend_timeout( b, &b->b_conns, &b->b_last_conn, threshold );
+ }
+done:
+ Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
+ "timeout task finished\n" );
+ evtimer_add( self, lload_timeout_api );
+}