]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
Refactor backend reset
authorOndřej Kuzník <okuznik@symas.com>
Fri, 20 Apr 2018 12:42:36 +0000 (13:42 +0100)
committerOndřej Kuzník <okuznik@symas.com>
Tue, 17 Nov 2020 17:58:15 +0000 (17:58 +0000)
Reuse the connection walking facility in timeout management.

servers/lloadd/backend.c
servers/lloadd/client.c
servers/lloadd/connection.c
servers/lloadd/daemon.c
servers/lloadd/lload.h
servers/lloadd/operation.c
servers/lloadd/proto-lload.h

index 346b32fa9f91bad445a6ab88d404218921b1de28..38fe9910db4f782cd7e2540c7d2490f83b5b03cc 100644 (file)
@@ -488,7 +488,7 @@ backend_connect_task( void *ctx, void *arg )
  * Needs exclusive access to the backend.
  */
 void
-backend_reset( LloadBackend *b )
+backend_reset( LloadBackend *b, int gentle )
 {
     if ( b->b_cookie ) {
         int rc;
@@ -497,7 +497,8 @@ backend_reset( LloadBackend *b )
         b->b_cookie = NULL;
         b->b_opening--;
     }
-    if ( event_pending( b->b_retry_event, EV_TIMEOUT, NULL ) ) {
+    if ( b->b_retry_event &&
+            event_pending( b->b_retry_event, EV_TIMEOUT, NULL ) ) {
         assert( b->b_failed );
         event_del( b->b_retry_event );
         b->b_opening--;
@@ -520,64 +521,19 @@ backend_reset( LloadBackend *b )
         ch_free( pending );
         b->b_opening--;
     }
-    while ( !LDAP_CIRCLEQ_EMPTY( &b->b_preparing ) ) {
-        LloadConnection *c = LDAP_CIRCLEQ_FIRST( &b->b_preparing );
-
-        CONNECTION_LOCK(c);
-        Debug( LDAP_DEBUG_CONNS, "backend_reset: "
-                "destroying connection being set up connid=%lu\n",
-                c->c_connid );
-
-        assert( c->c_live );
-        CONNECTION_DESTROY(c);
-        assert( !c );
-    }
-    while ( !LDAP_CIRCLEQ_EMPTY( &b->b_bindconns ) ) {
-        LloadConnection *c = LDAP_CIRCLEQ_FIRST( &b->b_bindconns );
-
-        CONNECTION_LOCK(c);
-        Debug( LDAP_DEBUG_CONNS, "backend_reset: "
-                "destroying bind connection connid=%lu, pending ops=%ld\n",
-                c->c_connid, c->c_n_ops_executing );
-
-        assert( c->c_live );
-        CONNECTION_DESTROY(c);
-        assert( !c );
-    }
-    while ( !LDAP_CIRCLEQ_EMPTY( &b->b_conns ) ) {
-        LloadConnection *c = LDAP_CIRCLEQ_FIRST( &b->b_conns );
-
-        CONNECTION_LOCK(c);
-        Debug( LDAP_DEBUG_CONNS, "backend_reset: "
-                "destroying regular connection connid=%lu, pending ops=%ld\n",
-                c->c_connid, c->c_n_ops_executing );
-
-        assert( c->c_live );
-        CONNECTION_DESTROY(c);
-        assert( !c );
-    }
-    if ( b->b_dns_req ) {
-        evdns_getaddrinfo_cancel( b->b_dns_req );
-        b->b_dns_req = NULL;
-        b->b_opening--;
-    }
-    if ( b->b_cookie ) {
-        int rc;
-        rc = ldap_pvt_thread_pool_retract( b->b_cookie );
-        assert( rc == 1 );
-        b->b_cookie = NULL;
-        b->b_opening--;
-    }
-    if ( b->b_retry_event &&
-            event_pending( b->b_retry_event, EV_TIMEOUT, NULL ) ) {
-        assert( b->b_failed );
-        event_del( b->b_retry_event );
-        b->b_opening--;
-    }
+    connections_walk(
+            &b->b_mutex, &b->b_preparing, lload_connection_close, &gentle );
+    assert( LDAP_CIRCLEQ_EMPTY( &b->b_preparing ) );
     assert( b->b_opening == 0 );
-    assert( b->b_active == 0 );
-    assert( b->b_bindavail == 0 );
     b->b_failed = 0;
+
+    connections_walk_last( &b->b_mutex, &b->b_bindconns, b->b_last_bindconn,
+            lload_connection_close, &gentle );
+    assert( gentle || b->b_bindavail == 0 );
+
+    connections_walk_last( &b->b_mutex, &b->b_conns, b->b_last_conn,
+            lload_connection_close, &gentle );
+    assert( gentle || b->b_active == 0 );
 }
 
 void
@@ -589,8 +545,9 @@ lload_backend_destroy( LloadBackend *b )
             "destroying backend uri='%s', numconns=%d, numbindconns=%d\n",
             b->b_uri.bv_val, b->b_numconns, b->b_numbindconns );
 
+    ldap_pvt_thread_mutex_lock( &b->b_mutex );
     b->b_numconns = b->b_numbindconns = 0;
-    backend_reset( b );
+    backend_reset( b, 0 );
 
     LDAP_CIRCLEQ_REMOVE( &backend, b, b_next );
     if ( b == next ) {
@@ -613,6 +570,7 @@ lload_backend_destroy( LloadBackend *b )
         assert( rc == LDAP_SUCCESS );
     }
 #endif /* BALANCER_MODULE */
+    ldap_pvt_thread_mutex_unlock( &b->b_mutex );
     ldap_pvt_thread_mutex_destroy( &b->b_mutex );
 
     if ( b->b_retry_event ) {
index 0cc1b504e34b041b9acfdc6e3cb9937b7beb36aa..1b4f62a385b9369df0dbcafda74a468e324afa94 100644 (file)
@@ -575,7 +575,7 @@ clients_destroy( void )
 }
 
 void
-clients_walk( CONNECTION_CLIENT_WALK apply, void *argv )
+clients_walk( CONNCB apply, void *argv )
 {
     LloadConnection *c;
     ldap_pvt_thread_mutex_lock( &clients_mutex );
index 38e198e20f7509af6b9d9515ae3bc84749a79ba3..22d7969ec7a776d76730114841e240fc20e417c2 100644 (file)
@@ -334,17 +334,130 @@ connection_destroy( LloadConnection *c )
 }
 
 /*
- * Expected to be run from lload_unpause_server, so there are no other threads
- * running.
+ * Called holding mutex, will walk cq calling cb on all connections whose
+ * c_connid <= cq_last->c_connid that still exist at the time we get to them.
  */
 void
-lload_connection_close( LloadConnection *c )
+connections_walk_last(
+        ldap_pvt_thread_mutex_t *cq_mutex,
+        lload_c_head *cq,
+        LloadConnection *cq_last,
+        CONNCB cb,
+        void *arg )
 {
-    TAvlnode *node;
+    LloadConnection *c, *old;
+    unsigned long last_connid;
+
+    if ( LDAP_CIRCLEQ_EMPTY( cq ) ) {
+        return;
+    }
+    last_connid = cq_last->c_connid;
+    c = LDAP_CIRCLEQ_LOOP_NEXT( cq, cq_last, c_next );
+    assert( c->c_connid <= last_connid );
 
-    /* We lock so we can use CONNECTION_UNLOCK_OR_DESTROY to drop the
-     * connection if we can */
     CONNECTION_LOCK(c);
+    ldap_pvt_thread_mutex_unlock( cq_mutex );
+
+    /*
+     * Ugh... concurrency is annoying:
+     * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid
+     *   order
+     * - the connection with the highest c_connid is maintained at cq_last
+     * - we can only use cq when we hold cq_mutex
+     * - connections might be added to or removed from cq while we're busy
+     *   processing connections
+     * - connection_destroy touches cq
+     * - we can't even hold locks of two different connections
+     * - we need a way to detect we've finished looping around cq for some
+     *   definition of looping around
+     *
+     * So as a result, 90% of the code below is spent navigating that...
+     */
+    while ( c->c_connid <= last_connid ) {
+        /* Do not permit the callback to actually free the connection even if
+         * it wants to, we need it to traverse cq */
+        c->c_refcnt++;
+        if ( cb( c, arg ) ) {
+            c->c_refcnt--;
+            break;
+        }
+        c->c_refcnt--;
+
+        if ( c->c_connid == last_connid ) {
+            break;
+        }
+
+        CONNECTION_UNLOCK_INCREF(c);
+
+        ldap_pvt_thread_mutex_lock( cq_mutex );
+        old = c;
+retry:
+        c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
+
+        if ( c->c_connid <= old->c_connid ) {
+            ldap_pvt_thread_mutex_unlock( cq_mutex );
+
+            CONNECTION_LOCK_DECREF(old);
+            CONNECTION_UNLOCK_OR_DESTROY(old);
+
+            ldap_pvt_thread_mutex_lock( cq_mutex );
+            return;
+        }
+
+        CONNECTION_LOCK(c);
+        assert( c->c_state != LLOAD_C_DYING );
+        if ( c->c_state == LLOAD_C_INVALID ) {
+            /* This dying connection will be unlinked once we release cq_mutex
+             * and it wouldn't be safe to iterate further, skip over it */
+            CONNECTION_UNLOCK(c);
+            goto retry;
+        }
+        CONNECTION_UNLOCK_INCREF(c);
+        ldap_pvt_thread_mutex_unlock( cq_mutex );
+
+        CONNECTION_LOCK_DECREF(old);
+        CONNECTION_UNLOCK_OR_DESTROY(old);
+
+        CONNECTION_LOCK_DECREF(c);
+        assert( c->c_state != LLOAD_C_DYING );
+        assert( c->c_state != LLOAD_C_INVALID );
+    }
+    CONNECTION_UNLOCK_OR_DESTROY(c);
+    ldap_pvt_thread_mutex_lock( cq_mutex );
+}
+
+void
+connections_walk(
+        ldap_pvt_thread_mutex_t *cq_mutex,
+        lload_c_head *cq,
+        CONNCB cb,
+        void *arg )
+{
+    LloadConnection *cq_last = LDAP_CIRCLEQ_LAST( cq );
+    return connections_walk_last( cq_mutex, cq, cq_last, cb, arg );
+}
+
+/*
+ * Caller is expected to hold the lock.
+ */
+int
+lload_connection_close( LloadConnection *c, void *arg )
+{
+    TAvlnode *node;
+    int gentle = *(int *)arg;
+
+    if ( !c->c_live ) {
+        return LDAP_SUCCESS;
+    }
+
+    if ( !gentle ) {
+        /* Caller has a reference on this connection,
+         * it doesn't actually die here */
+        CONNECTION_DESTROY(c);
+        assert( c );
+        CONNECTION_LOCK(c);
+        return LDAP_SUCCESS;
+    }
 
     /* The first thing we do is make sure we don't get new Operations in */
     c->c_state = LLOAD_C_CLOSING;
@@ -362,7 +475,7 @@ lload_connection_close( LloadConnection *c )
             }
         }
     }
-    CONNECTION_UNLOCK_OR_DESTROY(c);
+    return LDAP_SUCCESS;
 }
 
 LloadConnection *
index bb449d13004f31eeb146da9173ebf1e16a9e7616..4d4a8c4423b15f4b43b47ae67df4354077705673 100644 (file)
@@ -1388,11 +1388,11 @@ lloadd_daemon( struct event_base *daemon_base )
     /* wait for the listener threads to complete */
     destroy_listeners();
 
-    /* TODO: Mark upstream connections closing */
+    /* Mark upstream connections closing and prevent from opening new ones */
     LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
         ldap_pvt_thread_mutex_lock( &b->b_mutex );
         b->b_numconns = b->b_numbindconns = 0;
-        backend_reset( b );
+        backend_reset( b, 1 );
         ldap_pvt_thread_mutex_unlock( &b->b_mutex );
     }
 
@@ -1496,7 +1496,9 @@ lload_handle_backend_invalidation( LloadChange *change )
         if ( !current_backend ) {
             current_backend = b;
         }
+        ldap_pvt_thread_mutex_lock( &b->b_mutex );
         backend_retry( b );
+        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
         return;
     } else if ( change->type == LLOAD_CHANGE_DEL ) {
         ldap_pvt_thread_pool_walk(
@@ -1517,8 +1519,10 @@ lload_handle_backend_invalidation( LloadChange *change )
                 &connection_pool, handle_pdus, backend_conn_cb, b );
         ldap_pvt_thread_pool_walk(
                 &connection_pool, upstream_bind, backend_conn_cb, b );
-        backend_reset( b );
+        ldap_pvt_thread_mutex_lock( &b->b_mutex );
+        backend_reset( b, 0 );
         backend_retry( b );
+        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
         return;
     }
 
@@ -1599,7 +1603,9 @@ lload_handle_backend_invalidation( LloadChange *change )
             assert( need_close >= diff );
 
             LDAP_CIRCLEQ_FOREACH ( c, &b->b_bindconns, c_next ) {
-                lload_connection_close( c );
+                int gentle = 1;
+
+                lload_connection_close( c, &gentle );
                 need_close--;
                 diff--;
                 if ( !diff ) {
@@ -1615,7 +1621,9 @@ lload_handle_backend_invalidation( LloadChange *change )
             assert( need_close >= diff );
 
             LDAP_CIRCLEQ_FOREACH ( c, &b->b_conns, c_next ) {
-                lload_connection_close( c );
+                int gentle = 1;
+
+                lload_connection_close( c, &gentle );
                 need_close--;
                 diff--;
                 if ( !diff ) {
@@ -1627,7 +1635,9 @@ lload_handle_backend_invalidation( LloadChange *change )
         assert( need_close == 0 );
 
         if ( need_open ) {
+            ldap_pvt_thread_mutex_lock( &b->b_mutex );
             backend_retry( b );
+            ldap_pvt_thread_mutex_unlock( &b->b_mutex );
         }
     }
 }
@@ -1725,7 +1735,7 @@ lload_handle_global_invalidation( LloadChange *change )
 
         LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
             ldap_pvt_thread_mutex_lock( &b->b_mutex );
-            backend_reset( b );
+            backend_reset( b, 0 );
             backend_retry( b );
             ldap_pvt_thread_mutex_unlock( &b->b_mutex );
         }
index 02e18b702e2c8a26c8d02505ece263455b78fecf..7b045bf582e1c2d64f4ab4f4dc69bc7b43b48725 100644 (file)
@@ -446,7 +446,7 @@ struct LloadListener {
 #endif
 };
 
-typedef int (*CONNECTION_CLIENT_WALK)( LloadConnection *c, void *argv );
+typedef int (*CONNCB)( LloadConnection *c, void *arg );
 
 struct lload_monitor_conn_arg {
     Operation *op;
index 1946dc66531b381c956471689381c55b8c757b9c..95ae15e3d90a7aab933f0d06d1c361e2c4ddc1a4 100644 (file)
@@ -814,12 +814,13 @@ operation_lost_upstream( LloadOperation *op )
     CONNECTION_UNLOCK(c);
 }
 
-void
-connection_timeout( LloadConnection *upstream, time_t threshold )
+int
+connection_timeout( LloadConnection *upstream, void *arg )
 {
     LloadOperation *op;
     TAvlnode *ops = NULL, *node;
     LloadBackend *b = upstream->c_private;
+    time_t threshold = *(time_t *)arg;
     int rc, nops = 0;
 
     for ( node = tavl_end( upstream->c_ops, TAVL_DIR_LEFT ); node &&
@@ -862,7 +863,7 @@ connection_timeout( LloadConnection *upstream, time_t threshold )
     }
 
     if ( nops == 0 ) {
-        return;
+        return LDAP_SUCCESS;
     }
     upstream->c_n_ops_executing -= nops;
     Debug( LDAP_DEBUG_STATS, "connection_timeout: "
@@ -916,68 +917,7 @@ connection_timeout( LloadConnection *upstream, time_t threshold )
     CONNECTION_LOCK_DECREF(upstream);
     /* just dispose of the AVL, most operations should already be gone */
     tavl_free( ops, NULL );
-}
-
-static void
-backend_timeout(
-        LloadBackend *b,
-        lload_c_head *cq,
-        LloadConnection **lastp,
-        time_t threshold )
-{
-    LloadConnection *c, *old;
-    unsigned long last_connid;
-
-    ldap_pvt_thread_mutex_lock( &b->b_mutex );
-    if ( !*lastp ) {
-        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
-        return;
-    }
-    last_connid = (*lastp)->c_connid;
-    c = LDAP_CIRCLEQ_LOOP_NEXT( cq, *lastp, c_next );
-    CONNECTION_LOCK(c);
-    ldap_pvt_thread_mutex_unlock( &b->b_mutex );
-
-    /*
-     * Ugh... concurrency is annoying:
-     * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid
-     *   order
-     * - the connection with the highest c_connid is maintained at *lastp
-     * - we can only use cq when we hold b->b_mutex
-     * - connections might be added to or removed from cq while we're busy
-     *   processing connections
-     * - connection_destroy touches cq
-     * - we can't even hold locks of two different connections
-     * - we need a way to detect we've finished looping around cq for some
-     *   definition of looping around
-     *
-     * So as a result, 90% of the code below is spent navigating that...
-     */
-    while ( c->c_connid <= last_connid ) {
-        Debug( LDAP_DEBUG_TRACE, "backend_timeout: "
-                "timing out operations for connid=%lu which has %ld "
-                "pending ops\n",
-                c->c_connid, c->c_n_ops_executing );
-        connection_timeout( c, threshold );
-        if ( c->c_connid == last_connid ) {
-            break;
-        }
-
-        CONNECTION_UNLOCK_INCREF(c);
-
-        ldap_pvt_thread_mutex_lock( &b->b_mutex );
-        old = c;
-        c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
-        CONNECTION_LOCK(c);
-        CONNECTION_UNLOCK_INCREF(c);
-        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
-
-        CONNECTION_LOCK_DECREF(old);
-        CONNECTION_UNLOCK_OR_DESTROY(old);
-
-        CONNECTION_LOCK_DECREF(c);
-    }
-    CONNECTION_UNLOCK_OR_DESTROY(c);
+    return LDAP_SUCCESS;
 }
 
 void
@@ -993,17 +933,25 @@ operations_timeout( evutil_socket_t s, short what, void *arg )
     threshold = slap_get_time() - lload_timeout_api->tv_sec;
 
     LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
-        if ( b->b_n_ops_executing == 0 ) continue;
+        ldap_pvt_thread_mutex_lock( &b->b_mutex );
+        if ( b->b_n_ops_executing == 0 ) {
+            ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+            continue;
+        }
 
         Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
                 "timing out binds for backend uri=%s\n",
                 b->b_uri.bv_val );
-        backend_timeout( b, &b->b_bindconns, &b->b_last_bindconn, threshold );
+        connections_walk_last( &b->b_mutex, &b->b_bindconns, b->b_last_bindconn,
+                connection_timeout, &threshold );
 
         Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
                 "timing out other operations for backend uri=%s\n",
                 b->b_uri.bv_val );
-        backend_timeout( b, &b->b_conns, &b->b_last_conn, threshold );
+        connections_walk_last( &b->b_mutex, &b->b_conns, b->b_last_conn,
+                connection_timeout, &threshold );
+
+        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
     }
 done:
     Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
index 8d3db30aecabcaf2738a2a55ddb28816edbed7a7..accf5e2863c20aa9cf07ba87469818992228a3be 100644 (file)
@@ -41,7 +41,7 @@ LDAP_SLAPD_F (void) backend_connect( evutil_socket_t s, short what, void *arg );
 LDAP_SLAPD_F (void *) backend_connect_task( void *ctx, void *arg );
 LDAP_SLAPD_F (void) backend_retry( LloadBackend *b );
 LDAP_SLAPD_F (LloadConnection *) backend_select( LloadOperation *op, int *res );
-LDAP_SLAPD_F (void) backend_reset( LloadBackend *b );
+LDAP_SLAPD_F (void) backend_reset( LloadBackend *b, int gentle );
 LDAP_SLAPD_F (void) lload_backend_destroy( LloadBackend *b );
 LDAP_SLAPD_F (void) lload_backends_destroy( void );
 
@@ -64,7 +64,7 @@ LDAP_SLAPD_F (LloadConnection *) client_init( ber_socket_t s, LloadListener *url
 LDAP_SLAPD_F (void) client_reset( LloadConnection *c );
 LDAP_SLAPD_F (void) client_destroy( LloadConnection *c );
 LDAP_SLAPD_F (void) clients_destroy( void );
-LDAP_SLAPD_F (void) clients_walk( CONNECTION_CLIENT_WALK apply, void *argv );
+LDAP_SLAPD_F (void) clients_walk( CONNCB apply, void *argv );
 
 /*
  * config.c
@@ -90,9 +90,15 @@ LDAP_SLAPD_V (ldap_pvt_thread_mutex_t) clients_mutex;
 LDAP_SLAPD_F (void *) handle_pdus( void *ctx, void *arg );
 LDAP_SLAPD_F (void) connection_write_cb( evutil_socket_t s, short what, void *arg );
 LDAP_SLAPD_F (void) connection_read_cb( evutil_socket_t s, short what, void *arg );
-LDAP_SLAPD_F (void) lload_connection_close( LloadConnection *c );
+LDAP_SLAPD_F (int) lload_connection_close( LloadConnection *c, void *arg );
 LDAP_SLAPD_F (LloadConnection *) lload_connection_init( ber_socket_t s, const char *peername, int use_tls );
 LDAP_SLAPD_F (void) connection_destroy( LloadConnection *c );
+LDAP_SLAPD_F (void) connections_walk_last( ldap_pvt_thread_mutex_t *cq_mutex,
+        lload_c_head *cq,
+        LloadConnection *cq_last,
+        CONNCB cb,
+        void *arg );
+LDAP_SLAPD_F (void) connections_walk( ldap_pvt_thread_mutex_t *cq_mutex, lload_c_head *cq, CONNCB cb, void *arg );
 
 /*
  * daemon.c