Reuse the connection walking facility in timeout management.
* Needs exclusive access to the backend.
*/
void
-backend_reset( LloadBackend *b )
+backend_reset( LloadBackend *b, int gentle )
{
if ( b->b_cookie ) {
int rc;
b->b_cookie = NULL;
b->b_opening--;
}
- if ( event_pending( b->b_retry_event, EV_TIMEOUT, NULL ) ) {
+ if ( b->b_retry_event &&
+ event_pending( b->b_retry_event, EV_TIMEOUT, NULL ) ) {
assert( b->b_failed );
event_del( b->b_retry_event );
b->b_opening--;
ch_free( pending );
b->b_opening--;
}
- while ( !LDAP_CIRCLEQ_EMPTY( &b->b_preparing ) ) {
- LloadConnection *c = LDAP_CIRCLEQ_FIRST( &b->b_preparing );
-
- CONNECTION_LOCK(c);
- Debug( LDAP_DEBUG_CONNS, "backend_reset: "
- "destroying connection being set up connid=%lu\n",
- c->c_connid );
-
- assert( c->c_live );
- CONNECTION_DESTROY(c);
- assert( !c );
- }
- while ( !LDAP_CIRCLEQ_EMPTY( &b->b_bindconns ) ) {
- LloadConnection *c = LDAP_CIRCLEQ_FIRST( &b->b_bindconns );
-
- CONNECTION_LOCK(c);
- Debug( LDAP_DEBUG_CONNS, "backend_reset: "
- "destroying bind connection connid=%lu, pending ops=%ld\n",
- c->c_connid, c->c_n_ops_executing );
-
- assert( c->c_live );
- CONNECTION_DESTROY(c);
- assert( !c );
- }
- while ( !LDAP_CIRCLEQ_EMPTY( &b->b_conns ) ) {
- LloadConnection *c = LDAP_CIRCLEQ_FIRST( &b->b_conns );
-
- CONNECTION_LOCK(c);
- Debug( LDAP_DEBUG_CONNS, "backend_reset: "
- "destroying regular connection connid=%lu, pending ops=%ld\n",
- c->c_connid, c->c_n_ops_executing );
-
- assert( c->c_live );
- CONNECTION_DESTROY(c);
- assert( !c );
- }
- if ( b->b_dns_req ) {
- evdns_getaddrinfo_cancel( b->b_dns_req );
- b->b_dns_req = NULL;
- b->b_opening--;
- }
- if ( b->b_cookie ) {
- int rc;
- rc = ldap_pvt_thread_pool_retract( b->b_cookie );
- assert( rc == 1 );
- b->b_cookie = NULL;
- b->b_opening--;
- }
- if ( b->b_retry_event &&
- event_pending( b->b_retry_event, EV_TIMEOUT, NULL ) ) {
- assert( b->b_failed );
- event_del( b->b_retry_event );
- b->b_opening--;
- }
+ connections_walk(
+ &b->b_mutex, &b->b_preparing, lload_connection_close, &gentle );
+ assert( LDAP_CIRCLEQ_EMPTY( &b->b_preparing ) );
assert( b->b_opening == 0 );
- assert( b->b_active == 0 );
- assert( b->b_bindavail == 0 );
b->b_failed = 0;
+
+ connections_walk_last( &b->b_mutex, &b->b_bindconns, b->b_last_bindconn,
+ lload_connection_close, &gentle );
+ assert( gentle || b->b_bindavail == 0 );
+
+ connections_walk_last( &b->b_mutex, &b->b_conns, b->b_last_conn,
+ lload_connection_close, &gentle );
+ assert( gentle || b->b_active == 0 );
}
void
"destroying backend uri='%s', numconns=%d, numbindconns=%d\n",
b->b_uri.bv_val, b->b_numconns, b->b_numbindconns );
+ ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_numconns = b->b_numbindconns = 0;
- backend_reset( b );
+ backend_reset( b, 0 );
LDAP_CIRCLEQ_REMOVE( &backend, b, b_next );
if ( b == next ) {
assert( rc == LDAP_SUCCESS );
}
#endif /* BALANCER_MODULE */
+ ldap_pvt_thread_mutex_unlock( &b->b_mutex );
ldap_pvt_thread_mutex_destroy( &b->b_mutex );
if ( b->b_retry_event ) {
}
void
-clients_walk( CONNECTION_CLIENT_WALK apply, void *argv )
+clients_walk( CONNCB apply, void *argv )
{
LloadConnection *c;
ldap_pvt_thread_mutex_lock( &clients_mutex );
}
/*
- * Expected to be run from lload_unpause_server, so there are no other threads
- * running.
+ * Called holding mutex, will walk cq calling cb on all connections whose
+ * c_connid <= cq_last->c_connid that still exist at the time we get to them.
*/
void
-lload_connection_close( LloadConnection *c )
+connections_walk_last(
+ ldap_pvt_thread_mutex_t *cq_mutex,
+ lload_c_head *cq,
+ LloadConnection *cq_last,
+ CONNCB cb,
+ void *arg )
{
- TAvlnode *node;
+ LloadConnection *c, *old;
+ unsigned long last_connid;
+
+ if ( LDAP_CIRCLEQ_EMPTY( cq ) ) {
+ return;
+ }
+ last_connid = cq_last->c_connid;
+ c = LDAP_CIRCLEQ_LOOP_NEXT( cq, cq_last, c_next );
+ assert( c->c_connid <= last_connid );
- /* We lock so we can use CONNECTION_UNLOCK_OR_DESTROY to drop the
- * connection if we can */
CONNECTION_LOCK(c);
+ ldap_pvt_thread_mutex_unlock( cq_mutex );
+
+ /*
+ * Ugh... concurrency is annoying:
+ * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid
+ * order
+ * - the connection with the highest c_connid is maintained at cq_last
+ * - we can only use cq when we hold cq_mutex
+ * - connections might be added to or removed from cq while we're busy
+ * processing connections
+ * - connection_destroy touches cq
+ * - we can't even hold locks of two different connections
+ * - we need a way to detect we've finished looping around cq for some
+ * definition of looping around
+ *
+ * So as a result, 90% of the code below is spent navigating that...
+ */
+ while ( c->c_connid <= last_connid ) {
+ /* Do not permit the callback to actually free the connection even if
+ * it wants to, we need it to traverse cq */
+ c->c_refcnt++;
+ if ( cb( c, arg ) ) {
+ c->c_refcnt--;
+ break;
+ }
+ c->c_refcnt--;
+
+ if ( c->c_connid == last_connid ) {
+ break;
+ }
+
+ CONNECTION_UNLOCK_INCREF(c);
+
+ ldap_pvt_thread_mutex_lock( cq_mutex );
+ old = c;
+retry:
+ c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
+
+ if ( c->c_connid <= old->c_connid ) {
+ ldap_pvt_thread_mutex_unlock( cq_mutex );
+
+ CONNECTION_LOCK_DECREF(old);
+ CONNECTION_UNLOCK_OR_DESTROY(old);
+
+ ldap_pvt_thread_mutex_lock( cq_mutex );
+ return;
+ }
+
+ CONNECTION_LOCK(c);
+ assert( c->c_state != LLOAD_C_DYING );
+ if ( c->c_state == LLOAD_C_INVALID ) {
+ /* This dying connection will be unlinked once we release cq_mutex
+ * and it wouldn't be safe to iterate further, skip over it */
+ CONNECTION_UNLOCK(c);
+ goto retry;
+ }
+ CONNECTION_UNLOCK_INCREF(c);
+ ldap_pvt_thread_mutex_unlock( cq_mutex );
+
+ CONNECTION_LOCK_DECREF(old);
+ CONNECTION_UNLOCK_OR_DESTROY(old);
+
+ CONNECTION_LOCK_DECREF(c);
+ assert( c->c_state != LLOAD_C_DYING );
+ assert( c->c_state != LLOAD_C_INVALID );
+ }
+ CONNECTION_UNLOCK_OR_DESTROY(c);
+ ldap_pvt_thread_mutex_lock( cq_mutex );
+}
+
+void
+connections_walk(
+ ldap_pvt_thread_mutex_t *cq_mutex,
+ lload_c_head *cq,
+ CONNCB cb,
+ void *arg )
+{
+ LloadConnection *cq_last = LDAP_CIRCLEQ_LAST( cq );
+ return connections_walk_last( cq_mutex, cq, cq_last, cb, arg );
+}
+
+/*
+ * Caller is expected to hold the lock.
+ */
+int
+lload_connection_close( LloadConnection *c, void *arg )
+{
+ TAvlnode *node;
+ int gentle = *(int *)arg;
+
+ if ( !c->c_live ) {
+ return LDAP_SUCCESS;
+ }
+
+ if ( !gentle ) {
+ /* Caller has a reference on this connection,
+ * it doesn't actually die here */
+ CONNECTION_DESTROY(c);
+ assert( c );
+ CONNECTION_LOCK(c);
+ return LDAP_SUCCESS;
+ }
/* The first thing we do is make sure we don't get new Operations in */
c->c_state = LLOAD_C_CLOSING;
}
}
}
- CONNECTION_UNLOCK_OR_DESTROY(c);
+ return LDAP_SUCCESS;
}
LloadConnection *
/* wait for the listener threads to complete */
destroy_listeners();
- /* TODO: Mark upstream connections closing */
+ /* Mark upstream connections closing and prevent from opening new ones */
LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_numconns = b->b_numbindconns = 0;
- backend_reset( b );
+ backend_reset( b, 1 );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
if ( !current_backend ) {
current_backend = b;
}
+ ldap_pvt_thread_mutex_lock( &b->b_mutex );
backend_retry( b );
+ ldap_pvt_thread_mutex_unlock( &b->b_mutex );
return;
} else if ( change->type == LLOAD_CHANGE_DEL ) {
ldap_pvt_thread_pool_walk(
&connection_pool, handle_pdus, backend_conn_cb, b );
ldap_pvt_thread_pool_walk(
&connection_pool, upstream_bind, backend_conn_cb, b );
- backend_reset( b );
+ ldap_pvt_thread_mutex_lock( &b->b_mutex );
+ backend_reset( b, 0 );
backend_retry( b );
+ ldap_pvt_thread_mutex_unlock( &b->b_mutex );
return;
}
assert( need_close >= diff );
LDAP_CIRCLEQ_FOREACH ( c, &b->b_bindconns, c_next ) {
- lload_connection_close( c );
+ int gentle = 1;
+
+ lload_connection_close( c, &gentle );
need_close--;
diff--;
if ( !diff ) {
assert( need_close >= diff );
LDAP_CIRCLEQ_FOREACH ( c, &b->b_conns, c_next ) {
- lload_connection_close( c );
+ int gentle = 1;
+
+ lload_connection_close( c, &gentle );
need_close--;
diff--;
if ( !diff ) {
assert( need_close == 0 );
if ( need_open ) {
+ ldap_pvt_thread_mutex_lock( &b->b_mutex );
backend_retry( b );
+ ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
}
}
LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
ldap_pvt_thread_mutex_lock( &b->b_mutex );
- backend_reset( b );
+ backend_reset( b, 0 );
backend_retry( b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
#endif
};
-typedef int (*CONNECTION_CLIENT_WALK)( LloadConnection *c, void *argv );
+typedef int (*CONNCB)( LloadConnection *c, void *arg );
struct lload_monitor_conn_arg {
Operation *op;
CONNECTION_UNLOCK(c);
}
-void
-connection_timeout( LloadConnection *upstream, time_t threshold )
+int
+connection_timeout( LloadConnection *upstream, void *arg )
{
LloadOperation *op;
TAvlnode *ops = NULL, *node;
LloadBackend *b = upstream->c_private;
+ time_t threshold = *(time_t *)arg;
int rc, nops = 0;
for ( node = tavl_end( upstream->c_ops, TAVL_DIR_LEFT ); node &&
}
if ( nops == 0 ) {
- return;
+ return LDAP_SUCCESS;
}
upstream->c_n_ops_executing -= nops;
Debug( LDAP_DEBUG_STATS, "connection_timeout: "
CONNECTION_LOCK_DECREF(upstream);
/* just dispose of the AVL, most operations should already be gone */
tavl_free( ops, NULL );
-}
-
-static void
-backend_timeout(
- LloadBackend *b,
- lload_c_head *cq,
- LloadConnection **lastp,
- time_t threshold )
-{
- LloadConnection *c, *old;
- unsigned long last_connid;
-
- ldap_pvt_thread_mutex_lock( &b->b_mutex );
- if ( !*lastp ) {
- ldap_pvt_thread_mutex_unlock( &b->b_mutex );
- return;
- }
- last_connid = (*lastp)->c_connid;
- c = LDAP_CIRCLEQ_LOOP_NEXT( cq, *lastp, c_next );
- CONNECTION_LOCK(c);
- ldap_pvt_thread_mutex_unlock( &b->b_mutex );
-
- /*
- * Ugh... concurrency is annoying:
- * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid
- * order
- * - the connection with the highest c_connid is maintained at *lastp
- * - we can only use cq when we hold b->b_mutex
- * - connections might be added to or removed from cq while we're busy
- * processing connections
- * - connection_destroy touches cq
- * - we can't even hold locks of two different connections
- * - we need a way to detect we've finished looping around cq for some
- * definition of looping around
- *
- * So as a result, 90% of the code below is spent navigating that...
- */
- while ( c->c_connid <= last_connid ) {
- Debug( LDAP_DEBUG_TRACE, "backend_timeout: "
- "timing out operations for connid=%lu which has %ld "
- "pending ops\n",
- c->c_connid, c->c_n_ops_executing );
- connection_timeout( c, threshold );
- if ( c->c_connid == last_connid ) {
- break;
- }
-
- CONNECTION_UNLOCK_INCREF(c);
-
- ldap_pvt_thread_mutex_lock( &b->b_mutex );
- old = c;
- c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
- CONNECTION_LOCK(c);
- CONNECTION_UNLOCK_INCREF(c);
- ldap_pvt_thread_mutex_unlock( &b->b_mutex );
-
- CONNECTION_LOCK_DECREF(old);
- CONNECTION_UNLOCK_OR_DESTROY(old);
-
- CONNECTION_LOCK_DECREF(c);
- }
- CONNECTION_UNLOCK_OR_DESTROY(c);
+ return LDAP_SUCCESS;
}
void
threshold = slap_get_time() - lload_timeout_api->tv_sec;
LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
- if ( b->b_n_ops_executing == 0 ) continue;
+ ldap_pvt_thread_mutex_lock( &b->b_mutex );
+ if ( b->b_n_ops_executing == 0 ) {
+ ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+ continue;
+ }
Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
"timing out binds for backend uri=%s\n",
b->b_uri.bv_val );
- backend_timeout( b, &b->b_bindconns, &b->b_last_bindconn, threshold );
+ connections_walk_last( &b->b_mutex, &b->b_bindconns, b->b_last_bindconn,
+ connection_timeout, &threshold );
Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
"timing out other operations for backend uri=%s\n",
b->b_uri.bv_val );
- backend_timeout( b, &b->b_conns, &b->b_last_conn, threshold );
+ connections_walk_last( &b->b_mutex, &b->b_conns, b->b_last_conn,
+ connection_timeout, &threshold );
+
+ ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
done:
Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
LDAP_SLAPD_F (void *) backend_connect_task( void *ctx, void *arg );
LDAP_SLAPD_F (void) backend_retry( LloadBackend *b );
LDAP_SLAPD_F (LloadConnection *) backend_select( LloadOperation *op, int *res );
-LDAP_SLAPD_F (void) backend_reset( LloadBackend *b );
+LDAP_SLAPD_F (void) backend_reset( LloadBackend *b, int gentle );
LDAP_SLAPD_F (void) lload_backend_destroy( LloadBackend *b );
LDAP_SLAPD_F (void) lload_backends_destroy( void );
LDAP_SLAPD_F (void) client_reset( LloadConnection *c );
LDAP_SLAPD_F (void) client_destroy( LloadConnection *c );
LDAP_SLAPD_F (void) clients_destroy( void );
-LDAP_SLAPD_F (void) clients_walk( CONNECTION_CLIENT_WALK apply, void *argv );
+LDAP_SLAPD_F (void) clients_walk( CONNCB apply, void *argv );
/*
* config.c
LDAP_SLAPD_F (void *) handle_pdus( void *ctx, void *arg );
LDAP_SLAPD_F (void) connection_write_cb( evutil_socket_t s, short what, void *arg );
LDAP_SLAPD_F (void) connection_read_cb( evutil_socket_t s, short what, void *arg );
-LDAP_SLAPD_F (void) lload_connection_close( LloadConnection *c );
+LDAP_SLAPD_F (int) lload_connection_close( LloadConnection *c, void *arg );
LDAP_SLAPD_F (LloadConnection *) lload_connection_init( ber_socket_t s, const char *peername, int use_tls );
LDAP_SLAPD_F (void) connection_destroy( LloadConnection *c );
+LDAP_SLAPD_F (void) connections_walk_last( ldap_pvt_thread_mutex_t *cq_mutex,
+ lload_c_head *cq,
+ LloadConnection *cq_last,
+ CONNCB cb,
+ void *arg );
+LDAP_SLAPD_F (void) connections_walk( ldap_pvt_thread_mutex_t *cq_mutex, lload_c_head *cq, CONNCB cb, void *arg );
/*
* daemon.c