From b4f43ed8e1d3393d6002a2e5a725b28a01f675c7 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ond=C5=99ej=20Kuzn=C3=ADk?= Date: Fri, 20 Apr 2018 13:42:36 +0100 Subject: [PATCH] Refactor backend reset Reuse the connection walking facility in timeout management. --- servers/lloadd/backend.c | 76 +++++---------------- servers/lloadd/client.c | 2 +- servers/lloadd/connection.c | 127 +++++++++++++++++++++++++++++++++-- servers/lloadd/daemon.c | 22 ++++-- servers/lloadd/lload.h | 2 +- servers/lloadd/operation.c | 84 +++++------------------ servers/lloadd/proto-lload.h | 12 +++- 7 files changed, 180 insertions(+), 145 deletions(-) diff --git a/servers/lloadd/backend.c b/servers/lloadd/backend.c index 346b32fa9f..38fe9910db 100644 --- a/servers/lloadd/backend.c +++ b/servers/lloadd/backend.c @@ -488,7 +488,7 @@ backend_connect_task( void *ctx, void *arg ) * Needs exclusive access to the backend. */ void -backend_reset( LloadBackend *b ) +backend_reset( LloadBackend *b, int gentle ) { if ( b->b_cookie ) { int rc; @@ -497,7 +497,8 @@ backend_reset( LloadBackend *b ) b->b_cookie = NULL; b->b_opening--; } - if ( event_pending( b->b_retry_event, EV_TIMEOUT, NULL ) ) { + if ( b->b_retry_event && + event_pending( b->b_retry_event, EV_TIMEOUT, NULL ) ) { assert( b->b_failed ); event_del( b->b_retry_event ); b->b_opening--; @@ -520,64 +521,19 @@ backend_reset( LloadBackend *b ) ch_free( pending ); b->b_opening--; } - while ( !LDAP_CIRCLEQ_EMPTY( &b->b_preparing ) ) { - LloadConnection *c = LDAP_CIRCLEQ_FIRST( &b->b_preparing ); - - CONNECTION_LOCK(c); - Debug( LDAP_DEBUG_CONNS, "backend_reset: " - "destroying connection being set up connid=%lu\n", - c->c_connid ); - - assert( c->c_live ); - CONNECTION_DESTROY(c); - assert( !c ); - } - while ( !LDAP_CIRCLEQ_EMPTY( &b->b_bindconns ) ) { - LloadConnection *c = LDAP_CIRCLEQ_FIRST( &b->b_bindconns ); - - CONNECTION_LOCK(c); - Debug( LDAP_DEBUG_CONNS, "backend_reset: " - "destroying bind connection connid=%lu, pending ops=%ld\n", - c->c_connid, c->c_n_ops_executing ); - - assert( c->c_live ); - CONNECTION_DESTROY(c); - assert( !c ); - } - while ( !LDAP_CIRCLEQ_EMPTY( &b->b_conns ) ) { - LloadConnection *c = LDAP_CIRCLEQ_FIRST( &b->b_conns ); - - CONNECTION_LOCK(c); - Debug( LDAP_DEBUG_CONNS, "backend_reset: " - "destroying regular connection connid=%lu, pending ops=%ld\n", - c->c_connid, c->c_n_ops_executing ); - - assert( c->c_live ); - CONNECTION_DESTROY(c); - assert( !c ); - } - if ( b->b_dns_req ) { - evdns_getaddrinfo_cancel( b->b_dns_req ); - b->b_dns_req = NULL; - b->b_opening--; - } - if ( b->b_cookie ) { - int rc; - rc = ldap_pvt_thread_pool_retract( b->b_cookie ); - assert( rc == 1 ); - b->b_cookie = NULL; - b->b_opening--; - } - if ( b->b_retry_event && - event_pending( b->b_retry_event, EV_TIMEOUT, NULL ) ) { - assert( b->b_failed ); - event_del( b->b_retry_event ); - b->b_opening--; - } + connections_walk( + &b->b_mutex, &b->b_preparing, lload_connection_close, &gentle ); + assert( LDAP_CIRCLEQ_EMPTY( &b->b_preparing ) ); assert( b->b_opening == 0 ); - assert( b->b_active == 0 ); - assert( b->b_bindavail == 0 ); b->b_failed = 0; + + connections_walk_last( &b->b_mutex, &b->b_bindconns, b->b_last_bindconn, + lload_connection_close, &gentle ); + assert( gentle || b->b_bindavail == 0 ); + + connections_walk_last( &b->b_mutex, &b->b_conns, b->b_last_conn, + lload_connection_close, &gentle ); + assert( gentle || b->b_active == 0 ); } void @@ -589,8 +545,9 @@ lload_backend_destroy( LloadBackend *b ) "destroying backend uri='%s', numconns=%d, numbindconns=%d\n", b->b_uri.bv_val, b->b_numconns, b->b_numbindconns ); + ldap_pvt_thread_mutex_lock( &b->b_mutex ); b->b_numconns = b->b_numbindconns = 0; - backend_reset( b ); + backend_reset( b, 0 ); LDAP_CIRCLEQ_REMOVE( &backend, b, b_next ); if ( b == next ) { @@ -613,6 +570,7 @@ lload_backend_destroy( LloadBackend *b ) assert( rc == LDAP_SUCCESS ); } #endif /* BALANCER_MODULE */ + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); ldap_pvt_thread_mutex_destroy( &b->b_mutex ); if ( b->b_retry_event ) { diff --git a/servers/lloadd/client.c b/servers/lloadd/client.c index 0cc1b504e3..1b4f62a385 100644 --- a/servers/lloadd/client.c +++ b/servers/lloadd/client.c @@ -575,7 +575,7 @@ clients_destroy( void ) } void -clients_walk( CONNECTION_CLIENT_WALK apply, void *argv ) +clients_walk( CONNCB apply, void *argv ) { LloadConnection *c; ldap_pvt_thread_mutex_lock( &clients_mutex ); diff --git a/servers/lloadd/connection.c b/servers/lloadd/connection.c index 38e198e20f..22d7969ec7 100644 --- a/servers/lloadd/connection.c +++ b/servers/lloadd/connection.c @@ -334,17 +334,130 @@ connection_destroy( LloadConnection *c ) } /* - * Expected to be run from lload_unpause_server, so there are no other threads - * running. + * Called holding mutex, will walk cq calling cb on all connections whose + * c_connid <= cq_last->c_connid that still exist at the time we get to them. */ void -lload_connection_close( LloadConnection *c ) +connections_walk_last( + ldap_pvt_thread_mutex_t *cq_mutex, + lload_c_head *cq, + LloadConnection *cq_last, + CONNCB cb, + void *arg ) { - TAvlnode *node; + LloadConnection *c, *old; + unsigned long last_connid; + + if ( LDAP_CIRCLEQ_EMPTY( cq ) ) { + return; + } + last_connid = cq_last->c_connid; + c = LDAP_CIRCLEQ_LOOP_NEXT( cq, cq_last, c_next ); + assert( c->c_connid <= last_connid ); - /* We lock so we can use CONNECTION_UNLOCK_OR_DESTROY to drop the - * connection if we can */ CONNECTION_LOCK(c); + ldap_pvt_thread_mutex_unlock( cq_mutex ); + + /* + * Ugh... concurrency is annoying: + * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid + * order + * - the connection with the highest c_connid is maintained at cq_last + * - we can only use cq when we hold cq_mutex + * - connections might be added to or removed from cq while we're busy + * processing connections + * - connection_destroy touches cq + * - we can't even hold locks of two different connections + * - we need a way to detect we've finished looping around cq for some + * definition of looping around + * + * So as a result, 90% of the code below is spent navigating that... + */ + while ( c->c_connid <= last_connid ) { + /* Do not permit the callback to actually free the connection even if + * it wants to, we need it to traverse cq */ + c->c_refcnt++; + if ( cb( c, arg ) ) { + c->c_refcnt--; + break; + } + c->c_refcnt--; + + if ( c->c_connid == last_connid ) { + break; + } + + CONNECTION_UNLOCK_INCREF(c); + + ldap_pvt_thread_mutex_lock( cq_mutex ); + old = c; +retry: + c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next ); + + if ( c->c_connid <= old->c_connid ) { + ldap_pvt_thread_mutex_unlock( cq_mutex ); + + CONNECTION_LOCK_DECREF(old); + CONNECTION_UNLOCK_OR_DESTROY(old); + + ldap_pvt_thread_mutex_lock( cq_mutex ); + return; + } + + CONNECTION_LOCK(c); + assert( c->c_state != LLOAD_C_DYING ); + if ( c->c_state == LLOAD_C_INVALID ) { + /* This dying connection will be unlinked once we release cq_mutex + * and it wouldn't be safe to iterate further, skip over it */ + CONNECTION_UNLOCK(c); + goto retry; + } + CONNECTION_UNLOCK_INCREF(c); + ldap_pvt_thread_mutex_unlock( cq_mutex ); + + CONNECTION_LOCK_DECREF(old); + CONNECTION_UNLOCK_OR_DESTROY(old); + + CONNECTION_LOCK_DECREF(c); + assert( c->c_state != LLOAD_C_DYING ); + assert( c->c_state != LLOAD_C_INVALID ); + } + CONNECTION_UNLOCK_OR_DESTROY(c); + ldap_pvt_thread_mutex_lock( cq_mutex ); +} + +void +connections_walk( + ldap_pvt_thread_mutex_t *cq_mutex, + lload_c_head *cq, + CONNCB cb, + void *arg ) +{ + LloadConnection *cq_last = LDAP_CIRCLEQ_LAST( cq ); + return connections_walk_last( cq_mutex, cq, cq_last, cb, arg ); +} + +/* + * Caller is expected to hold the lock. + */ +int +lload_connection_close( LloadConnection *c, void *arg ) +{ + TAvlnode *node; + int gentle = *(int *)arg; + + if ( !c->c_live ) { + return LDAP_SUCCESS; + } + + if ( !gentle ) { + /* Caller has a reference on this connection, + * it doesn't actually die here */ + CONNECTION_DESTROY(c); + assert( c ); + CONNECTION_LOCK(c); + return LDAP_SUCCESS; + } /* The first thing we do is make sure we don't get new Operations in */ c->c_state = LLOAD_C_CLOSING; @@ -362,7 +475,7 @@ lload_connection_close( LloadConnection *c ) } } } - CONNECTION_UNLOCK_OR_DESTROY(c); + return LDAP_SUCCESS; } LloadConnection * diff --git a/servers/lloadd/daemon.c b/servers/lloadd/daemon.c index bb449d1300..4d4a8c4423 100644 --- a/servers/lloadd/daemon.c +++ b/servers/lloadd/daemon.c @@ -1388,11 +1388,11 @@ lloadd_daemon( struct event_base *daemon_base ) /* wait for the listener threads to complete */ destroy_listeners(); - /* TODO: Mark upstream connections closing */ + /* Mark upstream connections closing and prevent from opening new ones */ LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) { ldap_pvt_thread_mutex_lock( &b->b_mutex ); b->b_numconns = b->b_numbindconns = 0; - backend_reset( b ); + backend_reset( b, 1 ); ldap_pvt_thread_mutex_unlock( &b->b_mutex ); } @@ -1496,7 +1496,9 @@ lload_handle_backend_invalidation( LloadChange *change ) if ( !current_backend ) { current_backend = b; } + ldap_pvt_thread_mutex_lock( &b->b_mutex ); backend_retry( b ); + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); return; } else if ( change->type == LLOAD_CHANGE_DEL ) { ldap_pvt_thread_pool_walk( @@ -1517,8 +1519,10 @@ lload_handle_backend_invalidation( LloadChange *change ) &connection_pool, handle_pdus, backend_conn_cb, b ); ldap_pvt_thread_pool_walk( &connection_pool, upstream_bind, backend_conn_cb, b ); - backend_reset( b ); + ldap_pvt_thread_mutex_lock( &b->b_mutex ); + backend_reset( b, 0 ); backend_retry( b ); + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); return; } @@ -1599,7 +1603,9 @@ lload_handle_backend_invalidation( LloadChange *change ) assert( need_close >= diff ); LDAP_CIRCLEQ_FOREACH ( c, &b->b_bindconns, c_next ) { - lload_connection_close( c ); + int gentle = 1; + + lload_connection_close( c, &gentle ); need_close--; diff--; if ( !diff ) { @@ -1615,7 +1621,9 @@ lload_handle_backend_invalidation( LloadChange *change ) assert( need_close >= diff ); LDAP_CIRCLEQ_FOREACH ( c, &b->b_conns, c_next ) { - lload_connection_close( c ); + int gentle = 1; + + lload_connection_close( c, &gentle ); need_close--; diff--; if ( !diff ) { @@ -1627,7 +1635,9 @@ lload_handle_backend_invalidation( LloadChange *change ) assert( need_close == 0 ); if ( need_open ) { + ldap_pvt_thread_mutex_lock( &b->b_mutex ); backend_retry( b ); + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); } } } @@ -1725,7 +1735,7 @@ lload_handle_global_invalidation( LloadChange *change ) LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) { ldap_pvt_thread_mutex_lock( &b->b_mutex ); - backend_reset( b ); + backend_reset( b, 0 ); backend_retry( b ); ldap_pvt_thread_mutex_unlock( &b->b_mutex ); } diff --git a/servers/lloadd/lload.h b/servers/lloadd/lload.h index 02e18b702e..7b045bf582 100644 --- a/servers/lloadd/lload.h +++ b/servers/lloadd/lload.h @@ -446,7 +446,7 @@ struct LloadListener { #endif }; -typedef int (*CONNECTION_CLIENT_WALK)( LloadConnection *c, void *argv ); +typedef int (*CONNCB)( LloadConnection *c, void *arg ); struct lload_monitor_conn_arg { Operation *op; diff --git a/servers/lloadd/operation.c b/servers/lloadd/operation.c index 1946dc6653..95ae15e3d9 100644 --- a/servers/lloadd/operation.c +++ b/servers/lloadd/operation.c @@ -814,12 +814,13 @@ operation_lost_upstream( LloadOperation *op ) CONNECTION_UNLOCK(c); } -void -connection_timeout( LloadConnection *upstream, time_t threshold ) +int +connection_timeout( LloadConnection *upstream, void *arg ) { LloadOperation *op; TAvlnode *ops = NULL, *node; LloadBackend *b = upstream->c_private; + time_t threshold = *(time_t *)arg; int rc, nops = 0; for ( node = tavl_end( upstream->c_ops, TAVL_DIR_LEFT ); node && @@ -862,7 +863,7 @@ connection_timeout( LloadConnection *upstream, time_t threshold ) } if ( nops == 0 ) { - return; + return LDAP_SUCCESS; } upstream->c_n_ops_executing -= nops; Debug( LDAP_DEBUG_STATS, "connection_timeout: " @@ -916,68 +917,7 @@ connection_timeout( LloadConnection *upstream, time_t threshold ) CONNECTION_LOCK_DECREF(upstream); /* just dispose of the AVL, most operations should already be gone */ tavl_free( ops, NULL ); -} - -static void -backend_timeout( - LloadBackend *b, - lload_c_head *cq, - LloadConnection **lastp, - time_t threshold ) -{ - LloadConnection *c, *old; - unsigned long last_connid; - - ldap_pvt_thread_mutex_lock( &b->b_mutex ); - if ( !*lastp ) { - ldap_pvt_thread_mutex_unlock( &b->b_mutex ); - return; - } - last_connid = (*lastp)->c_connid; - c = LDAP_CIRCLEQ_LOOP_NEXT( cq, *lastp, c_next ); - CONNECTION_LOCK(c); - ldap_pvt_thread_mutex_unlock( &b->b_mutex ); - - /* - * Ugh... concurrency is annoying: - * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid - * order - * - the connection with the highest c_connid is maintained at *lastp - * - we can only use cq when we hold b->b_mutex - * - connections might be added to or removed from cq while we're busy - * processing connections - * - connection_destroy touches cq - * - we can't even hold locks of two different connections - * - we need a way to detect we've finished looping around cq for some - * definition of looping around - * - * So as a result, 90% of the code below is spent navigating that... - */ - while ( c->c_connid <= last_connid ) { - Debug( LDAP_DEBUG_TRACE, "backend_timeout: " - "timing out operations for connid=%lu which has %ld " - "pending ops\n", - c->c_connid, c->c_n_ops_executing ); - connection_timeout( c, threshold ); - if ( c->c_connid == last_connid ) { - break; - } - - CONNECTION_UNLOCK_INCREF(c); - - ldap_pvt_thread_mutex_lock( &b->b_mutex ); - old = c; - c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next ); - CONNECTION_LOCK(c); - CONNECTION_UNLOCK_INCREF(c); - ldap_pvt_thread_mutex_unlock( &b->b_mutex ); - - CONNECTION_LOCK_DECREF(old); - CONNECTION_UNLOCK_OR_DESTROY(old); - - CONNECTION_LOCK_DECREF(c); - } - CONNECTION_UNLOCK_OR_DESTROY(c); + return LDAP_SUCCESS; } void @@ -993,17 +933,25 @@ operations_timeout( evutil_socket_t s, short what, void *arg ) threshold = slap_get_time() - lload_timeout_api->tv_sec; LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) { - if ( b->b_n_ops_executing == 0 ) continue; + ldap_pvt_thread_mutex_lock( &b->b_mutex ); + if ( b->b_n_ops_executing == 0 ) { + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); + continue; + } Debug( LDAP_DEBUG_TRACE, "operations_timeout: " "timing out binds for backend uri=%s\n", b->b_uri.bv_val ); - backend_timeout( b, &b->b_bindconns, &b->b_last_bindconn, threshold ); + connections_walk_last( &b->b_mutex, &b->b_bindconns, b->b_last_bindconn, + connection_timeout, &threshold ); Debug( LDAP_DEBUG_TRACE, "operations_timeout: " "timing out other operations for backend uri=%s\n", b->b_uri.bv_val ); - backend_timeout( b, &b->b_conns, &b->b_last_conn, threshold ); + connections_walk_last( &b->b_mutex, &b->b_conns, b->b_last_conn, + connection_timeout, &threshold ); + + ldap_pvt_thread_mutex_unlock( &b->b_mutex ); } done: Debug( LDAP_DEBUG_TRACE, "operations_timeout: " diff --git a/servers/lloadd/proto-lload.h b/servers/lloadd/proto-lload.h index 8d3db30aec..accf5e2863 100644 --- a/servers/lloadd/proto-lload.h +++ b/servers/lloadd/proto-lload.h @@ -41,7 +41,7 @@ LDAP_SLAPD_F (void) backend_connect( evutil_socket_t s, short what, void *arg ); LDAP_SLAPD_F (void *) backend_connect_task( void *ctx, void *arg ); LDAP_SLAPD_F (void) backend_retry( LloadBackend *b ); LDAP_SLAPD_F (LloadConnection *) backend_select( LloadOperation *op, int *res ); -LDAP_SLAPD_F (void) backend_reset( LloadBackend *b ); +LDAP_SLAPD_F (void) backend_reset( LloadBackend *b, int gentle ); LDAP_SLAPD_F (void) lload_backend_destroy( LloadBackend *b ); LDAP_SLAPD_F (void) lload_backends_destroy( void ); @@ -64,7 +64,7 @@ LDAP_SLAPD_F (LloadConnection *) client_init( ber_socket_t s, LloadListener *url LDAP_SLAPD_F (void) client_reset( LloadConnection *c ); LDAP_SLAPD_F (void) client_destroy( LloadConnection *c ); LDAP_SLAPD_F (void) clients_destroy( void ); -LDAP_SLAPD_F (void) clients_walk( CONNECTION_CLIENT_WALK apply, void *argv ); +LDAP_SLAPD_F (void) clients_walk( CONNCB apply, void *argv ); /* * config.c @@ -90,9 +90,15 @@ LDAP_SLAPD_V (ldap_pvt_thread_mutex_t) clients_mutex; LDAP_SLAPD_F (void *) handle_pdus( void *ctx, void *arg ); LDAP_SLAPD_F (void) connection_write_cb( evutil_socket_t s, short what, void *arg ); LDAP_SLAPD_F (void) connection_read_cb( evutil_socket_t s, short what, void *arg ); -LDAP_SLAPD_F (void) lload_connection_close( LloadConnection *c ); +LDAP_SLAPD_F (int) lload_connection_close( LloadConnection *c, void *arg ); LDAP_SLAPD_F (LloadConnection *) lload_connection_init( ber_socket_t s, const char *peername, int use_tls ); LDAP_SLAPD_F (void) connection_destroy( LloadConnection *c ); +LDAP_SLAPD_F (void) connections_walk_last( ldap_pvt_thread_mutex_t *cq_mutex, + lload_c_head *cq, + LloadConnection *cq_last, + CONNCB cb, + void *arg ); +LDAP_SLAPD_F (void) connections_walk( ldap_pvt_thread_mutex_t *cq_mutex, lload_c_head *cq, CONNCB cb, void *arg ); /* * daemon.c -- 2.47.3