]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
Unify connection locking and I/O
authorOndřej Kuzník <ondra@mistotebe.net>
Fri, 22 Sep 2017 09:24:52 +0000 (10:24 +0100)
committerOndřej Kuzník <okuznik@symas.com>
Tue, 17 Nov 2020 17:55:46 +0000 (17:55 +0000)
servers/lloadd/backend.c
servers/lloadd/bind.c
servers/lloadd/client.c
servers/lloadd/connection.c
servers/lloadd/operation.c
servers/lloadd/proto-slap.h
servers/lloadd/slap.h
servers/lloadd/upstream.c

index bdfe621bee1e13dced37d1e800a50046e031d443..712e7007bb3dec7f1af0f2c12380ca84fad55b51 100644 (file)
@@ -457,7 +457,7 @@ backends_destroy( void )
                     c->c_connid, c->c_n_ops_executing );
 
             assert( c->c_live );
-            UPSTREAM_DESTROY(c);
+            CONNECTION_DESTROY(c);
         }
         while ( !LDAP_CIRCLEQ_EMPTY( &b->b_conns ) ) {
             Connection *c = LDAP_CIRCLEQ_FIRST( &b->b_conns );
@@ -469,7 +469,7 @@ backends_destroy( void )
                     c->c_connid, c->c_n_ops_executing );
 
             assert( c->c_live );
-            UPSTREAM_DESTROY(c);
+            CONNECTION_DESTROY(c);
         }
 
         LDAP_CIRCLEQ_REMOVE( &backend, b, b_next );
index 125f937517df8678254bb79ec610deb7c2c7fc77..bf6fd10497b5a2dd9888f705af5923fa837de11b 100644 (file)
@@ -110,7 +110,7 @@ request_bind( Operation *op )
     ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
 
     ber_free( copy, 0 );
-    upstream_write_cb( -1, 0, upstream );
+    connection_write_cb( -1, 0, upstream );
     return 0;
 
 fail:
@@ -242,7 +242,7 @@ request_bind_as_vc( Operation *op )
     ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
 
     ber_free( copy, 0 );
-    upstream_write_cb( -1, 0, upstream );
+    connection_write_cb( -1, 0, upstream );
 
     return 0;
 
@@ -346,13 +346,13 @@ client_bind( Connection *client, Operation *op )
     }
 
     CONNECTION_LOCK_DECREF(upstream);
-    UPSTREAM_UNLOCK_OR_DESTROY(upstream);
+    CONNECTION_UNLOCK_OR_DESTROY(upstream);
 
     CONNECTION_LOCK_DECREF(client);
     if ( rc ) {
         op->o_client_refcnt--;
         operation_destroy_from_client( op );
-        CLIENT_DESTROY(client);
+        CONNECTION_DESTROY(client);
         return -1;
     }
 
index 89af15a13835ba2638073278bdaa5ff5e84e814f..915e90e61dde0e7ac58207344a1e5bc2d1b1dc09 100644 (file)
@@ -28,172 +28,6 @@ slap_c_head clients = LDAP_CIRCLEQ_HEAD_INITIALIZER( clients );
 
 ldap_pvt_thread_mutex_t clients_mutex;
 
-typedef int (*RequestHandler)( Connection *c, Operation *op );
-
-static void
-client_read_cb( evutil_socket_t s, short what, void *arg )
-{
-    Connection *c = arg;
-    BerElement *ber;
-    ber_tag_t tag;
-    ber_len_t len;
-
-    CONNECTION_LOCK(c);
-    if ( !c->c_live ) {
-        event_del( c->c_read_event );
-        Debug( LDAP_DEBUG_CONNS, "client_read_cb: "
-                "suspended read event on a dead connid=%lu\n",
-                c->c_connid );
-        CONNECTION_UNLOCK(c);
-        return;
-    }
-
-    Debug( LDAP_DEBUG_CONNS, "client_read_cb: "
-            "connection connid=%lu ready to read\n",
-            c->c_connid );
-
-    ber = c->c_currentber;
-    if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
-        Debug( LDAP_DEBUG_ANY, "client_read_cb: "
-                "connid=%lu, ber_alloc failed\n",
-                c->c_connid );
-        CLIENT_DESTROY(c);
-        return;
-    }
-    c->c_currentber = ber;
-
-    tag = ber_get_next( c->c_sb, &len, ber );
-    if ( tag != LDAP_TAG_MESSAGE ) {
-        int err = sock_errno();
-
-        if ( err != EWOULDBLOCK && err != EAGAIN ) {
-            if ( err || tag == LBER_ERROR ) {
-                char ebuf[128];
-                Debug( LDAP_DEBUG_STATS, "client_read_cb: "
-                        "ber_get_next on fd=%d failed errno=%d (%s)\n",
-                        c->c_fd, err,
-                        sock_errstr( err, ebuf, sizeof(ebuf) ) );
-            } else {
-                Debug( LDAP_DEBUG_STATS, "client_read_cb: "
-                        "ber_get_next on fd=%d connid=%lu received "
-                        "a strange PDU tag=%lx\n",
-                        c->c_fd, c->c_connid, tag );
-            }
-
-            c->c_currentber = NULL;
-            ber_free( ber, 1 );
-
-            event_del( c->c_read_event );
-            Debug( LDAP_DEBUG_CONNS, "client_read_cb: "
-                    "suspended read event on dying connid=%lu\n",
-                    c->c_connid );
-            CLIENT_DESTROY(c);
-            return;
-        }
-        event_add( c->c_read_event, NULL );
-        Debug( LDAP_DEBUG_CONNS, "client_read_cb: "
-                "re-enabled read event on connid=%lu\n",
-                c->c_connid );
-        CONNECTION_UNLOCK(c);
-        return;
-    }
-
-    if ( !slap_conn_max_pdus_per_cycle ||
-            ldap_pvt_thread_pool_submit(
-                    &connection_pool, handle_requests, c ) ) {
-        /* If we're overloaded or configured as such, process one and resume in
-         * the next cycle.
-         *
-         * handle_one_request re-locks the mutex in the
-         * process, need to test it's still alive */
-        if ( handle_one_request( c ) == LDAP_SUCCESS ) {
-            CLIENT_UNLOCK_OR_DESTROY(c);
-        }
-        return;
-    }
-
-    event_del( c->c_read_event );
-    Debug( LDAP_DEBUG_CONNS, "client_read_cb: "
-            "suspended read event on connid=%lu\n",
-            c->c_connid );
-
-    /* We have scheduled a call to handle_requests which takes care of
-     * handling further requests, just make sure the connection sticks around
-     * for that */
-    CONNECTION_UNLOCK_INCREF(c);
-    return;
-}
-
-void *
-handle_requests( void *ctx, void *arg )
-{
-    Connection *c = arg;
-    int requests_handled = 0;
-
-    CONNECTION_LOCK_DECREF(c);
-    for ( ;; ) {
-        BerElement *ber;
-        ber_tag_t tag;
-        ber_len_t len;
-
-        /* handle_one_response may unlock the connection in the process, we
-         * need to expect that might be our responsibility to destroy it */
-        if ( handle_one_request( c ) ) {
-            /* Error, connection is unlocked and might already have been
-             * destroyed */
-            return NULL;
-        }
-        /* Otherwise, handle_one_request leaves the connection locked */
-
-        if ( ++requests_handled >= slap_conn_max_pdus_per_cycle ) {
-            /* Do not read now, re-enable read event instead */
-            break;
-        }
-
-        if ( (ber = ber_alloc()) == NULL ) {
-            Debug( LDAP_DEBUG_ANY, "client_read_cb: "
-                    "connid=%lu, ber_alloc failed\n",
-                    c->c_connid );
-            CLIENT_DESTROY(c);
-            return NULL;
-        }
-        c->c_currentber = ber;
-
-        tag = ber_get_next( c->c_sb, &len, ber );
-        if ( tag != LDAP_TAG_MESSAGE ) {
-            int err = sock_errno();
-
-            if ( err != EWOULDBLOCK && err != EAGAIN ) {
-                if ( err || tag == LBER_ERROR ) {
-                    char ebuf[128];
-                    Debug( LDAP_DEBUG_ANY, "handle_requests: "
-                            "ber_get_next on fd=%d failed errno=%d (%s)\n",
-                            c->c_fd, err,
-                            sock_errstr( err, ebuf, sizeof(ebuf) ) );
-                } else {
-                    Debug( LDAP_DEBUG_STATS, "handle_requests: "
-                            "ber_get_next on fd=%d connid=%lu received "
-                            "a strange PDU tag=%lx\n",
-                            c->c_fd, c->c_connid, tag );
-                }
-
-                c->c_currentber = NULL;
-                ber_free( ber, 1 );
-                CLIENT_DESTROY(c);
-                return NULL;
-            }
-            break;
-        }
-    }
-
-    event_add( c->c_read_event, NULL );
-    Debug( LDAP_DEBUG_CONNS, "handle_requests: "
-            "re-enabled read event on connid=%lu\n",
-            c->c_connid );
-    CLIENT_UNLOCK_OR_DESTROY(c);
-    return NULL;
-}
-
 int
 handle_one_request( Connection *c )
 {
@@ -209,7 +43,7 @@ handle_one_request( Connection *c )
         Debug( LDAP_DEBUG_ANY, "handle_one_request: "
                 "connid=%lu, operation_init failed\n",
                 c->c_connid );
-        CLIENT_DESTROY(c);
+        CONNECTION_DESTROY(c);
         ber_free( ber, 1 );
         return -1;
     }
@@ -221,7 +55,7 @@ handle_one_request( Connection *c )
             Debug( LDAP_DEBUG_STATS, "handle_one_request: "
                     "received unbind, closing client connid=%lu\n",
                     c->c_connid );
-            CLIENT_DESTROY(c);
+            CONNECTION_DESTROY(c);
             return -1;
         case LDAP_REQ_BIND:
             handler = client_bind;
@@ -249,49 +83,6 @@ handle_one_request( Connection *c )
     return handler( c, op );
 }
 
-void
-client_write_cb( evutil_socket_t s, short what, void *arg )
-{
-    Connection *c = arg;
-
-    CONNECTION_LOCK(c);
-    if ( !c->c_live ) {
-        CONNECTION_UNLOCK(c);
-        return;
-    }
-    CONNECTION_UNLOCK_INCREF(c);
-
-    /* Before we acquire any locks */
-    event_del( c->c_write_event );
-
-    ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
-    Debug( LDAP_DEBUG_CONNS, "client_write_cb: "
-            "have something to write to client connid=%lu\n",
-            c->c_connid );
-
-    /* We might have been beaten to flushing the data by another thread */
-    if ( c->c_pendingber && ber_flush( c->c_sb, c->c_pendingber, 1 ) ) {
-        int err = sock_errno();
-
-        if ( err != EWOULDBLOCK && err != EAGAIN ) {
-            char ebuf[128];
-            ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
-            Debug( LDAP_DEBUG_ANY, "client_write_cb: "
-                    "ber_flush on fd=%d failed errno=%d (%s)\n",
-                    c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
-            CLIENT_LOCK_DESTROY(c);
-            return;
-        }
-        event_add( c->c_write_event, NULL );
-    } else {
-        c->c_pendingber = NULL;
-    }
-    ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
-
-    CONNECTION_LOCK_DECREF(c);
-    CLIENT_UNLOCK_OR_DESTROY(c);
-}
-
 Connection *
 client_init(
         ber_socket_t s,
@@ -302,10 +93,14 @@ client_init(
 {
     Connection *c;
     struct event *event;
+    event_callback_fn read_cb = connection_read_cb,
+                      write_cb = connection_write_cb;
 
     assert( listener != NULL );
 
-    c = connection_init( s, peername, flags );
+    if ( (c = connection_init( s, peername, flags )) == NULL ) {
+        return NULL;
+    }
 
     {
         ber_len_t max = sockbuf_max_incoming_client;
@@ -314,16 +109,16 @@ client_init(
 
     c->c_state = SLAP_C_READY;
 
-    event = event_new( base, s, EV_READ|EV_PERSIST, client_read_cb, c );
+    event = event_new( base, s, EV_READ|EV_PERSIST, read_cb, c );
     if ( !event ) {
         Debug( LDAP_DEBUG_ANY, "client_init: "
                 "Read event could not be allocated\n" );
         goto fail;
     }
-    event_add( event, NULL );
     c->c_read_event = event;
+    event_add( c->c_read_event, NULL );
 
-    event = event_new( base, s, EV_WRITE, client_write_cb, c );
+    event = event_new( base, s, EV_WRITE, write_cb, c );
     if ( !event ) {
         Debug( LDAP_DEBUG_ANY, "client_init: "
                 "Write event could not be allocated\n" );
@@ -333,6 +128,8 @@ client_init(
     c->c_write_event = event;
 
     c->c_private = listener;
+    c->c_destroy = client_destroy;
+    c->c_pdu_cb = handle_one_request;
 
     /* There should be no lock inversion yet since no other thread could
      * approach it from clients side */
@@ -352,8 +149,10 @@ fail:
         event_free( c->c_read_event );
         c->c_read_event = NULL;
     }
+
     c->c_state = SLAP_C_INVALID;
-    connection_destroy( c );
+    CONNECTION_DESTROY(c);
+    assert( c == NULL );
     return NULL;
 }
 
@@ -447,7 +246,7 @@ clients_destroy( void )
         /* Upstream connections have already been destroyed, there should be no
          * ops left */
         assert( !c->c_ops );
-        CLIENT_DESTROY(c);
+        CONNECTION_DESTROY(c);
         ldap_pvt_thread_mutex_lock( &clients_mutex );
     }
     ldap_pvt_thread_mutex_unlock( &clients_mutex );
index c96c4792a1ad6d4971f5967dd456f85f0eb4f806..9b13f3273acf4138eac833edc806a3f11c180bbc 100644 (file)
@@ -50,6 +50,235 @@ connection_assign_nextid( Connection *conn )
     ldap_pvt_thread_mutex_unlock( &conn_nextid_mutex );
 }
 
+/*
+ * We start off with the connection muted and c_currentber holding the pdu we
+ * received.
+ *
+ * We run c->c_pdu_cb for each pdu, stopping once we hit an error, have to wait
+ * on reading or after we process slap_conn_max_pdus_per_cycle pdus so as to
+ * maintain fairness and not hog the worker thread forever.
+ *
+ * If we've run out of pdus immediately available from the stream or hit the
+ * budget, we unmute the connection.
+ *
+ * c->c_pdu_cb might return an 'error' and not free the connection. That can
+ * happen when changing the state or when client is blocked on writing and
+ * already has a pdu pending on the same operation, it's their job to make sure
+ * we're woken up again.
+ */
+static void *
+handle_pdus( void *ctx, void *arg )
+{
+    Connection *c = arg;
+    int pdus_handled = 0;
+
+    CONNECTION_LOCK_DECREF(c);
+    for ( ;; ) {
+        BerElement *ber;
+        ber_tag_t tag;
+        ber_len_t len;
+
+        /* handle_one_response may unlock the connection in the process, we
+         * need to expect that might be our responsibility to destroy it */
+        if ( c->c_pdu_cb( c ) ) {
+            /* Error, connection is unlocked and might already have been
+             * destroyed */
+            return NULL;
+        }
+        /* Otherwise, handle_one_request leaves the connection locked */
+
+        if ( ++pdus_handled >= slap_conn_max_pdus_per_cycle ) {
+            /* Do not read now, re-enable read event instead */
+            break;
+        }
+
+        if ( (ber = ber_alloc()) == NULL ) {
+            Debug( LDAP_DEBUG_ANY, "handle_pdus: "
+                    "connid=%lu, ber_alloc failed\n",
+                    c->c_connid );
+            CONNECTION_DESTROY(c);
+            return NULL;
+        }
+        c->c_currentber = ber;
+
+        tag = ber_get_next( c->c_sb, &len, ber );
+        if ( tag != LDAP_TAG_MESSAGE ) {
+            int err = sock_errno();
+
+            if ( err != EWOULDBLOCK && err != EAGAIN ) {
+                if ( err || tag == LBER_ERROR ) {
+                    char ebuf[128];
+                    Debug( LDAP_DEBUG_ANY, "handle_pdus: "
+                            "ber_get_next on fd=%d failed errno=%d (%s)\n",
+                            c->c_fd, err,
+                            sock_errstr( err, ebuf, sizeof(ebuf) ) );
+                } else {
+                    Debug( LDAP_DEBUG_STATS, "handle_pdus: "
+                            "ber_get_next on fd=%d connid=%lu received "
+                            "a strange PDU tag=%lx\n",
+                            c->c_fd, c->c_connid, tag );
+                }
+
+                c->c_currentber = NULL;
+                ber_free( ber, 1 );
+                CONNECTION_DESTROY(c);
+                return NULL;
+            }
+            break;
+        }
+    }
+
+    event_add( c->c_read_event, NULL );
+    Debug( LDAP_DEBUG_CONNS, "handle_pdus: "
+            "re-enabled read event on connid=%lu\n",
+            c->c_connid );
+    CONNECTION_UNLOCK_OR_DESTROY(c);
+    return NULL;
+}
+
+/*
+ * Initial read on the connection, if we get an LDAP PDU, submit the
+ * processing of this and successive ones to the work queue.
+ *
+ * If we can't submit it to the queue (overload), process this one and return
+ * to the event loop immediately after.
+ */
+void
+connection_read_cb( evutil_socket_t s, short what, void *arg )
+{
+    Connection *c = arg;
+    BerElement *ber;
+    ber_tag_t tag;
+    ber_len_t len;
+
+    CONNECTION_LOCK(c);
+    if ( !c->c_live ) {
+        event_del( c->c_read_event );
+        Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
+                "suspended read event on a dead connid=%lu\n",
+                c->c_connid );
+        CONNECTION_UNLOCK(c);
+        return;
+    }
+
+    Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
+            "connection connid=%lu ready to read\n",
+            c->c_connid );
+
+    ber = c->c_currentber;
+    if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
+        Debug( LDAP_DEBUG_ANY, "connection_read_cb: "
+                "connid=%lu, ber_alloc failed\n",
+                c->c_connid );
+        CONNECTION_DESTROY(c);
+        return;
+    }
+    c->c_currentber = ber;
+
+    tag = ber_get_next( c->c_sb, &len, ber );
+    if ( tag != LDAP_TAG_MESSAGE ) {
+        int err = sock_errno();
+
+        if ( err != EWOULDBLOCK && err != EAGAIN ) {
+            if ( err || tag == LBER_ERROR ) {
+                char ebuf[128];
+                Debug( LDAP_DEBUG_STATS, "connection_read_cb: "
+                        "ber_get_next on fd=%d failed errno=%d (%s)\n",
+                        c->c_fd, err,
+                        sock_errstr( err, ebuf, sizeof(ebuf) ) );
+            } else {
+                Debug( LDAP_DEBUG_STATS, "connection_read_cb: "
+                        "ber_get_next on fd=%d connid=%lu received "
+                        "a strange PDU tag=%lx\n",
+                        c->c_fd, c->c_connid, tag );
+            }
+
+            c->c_currentber = NULL;
+            ber_free( ber, 1 );
+
+            event_del( c->c_read_event );
+            Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
+                    "suspended read event on dying connid=%lu\n",
+                    c->c_connid );
+            CONNECTION_DESTROY(c);
+            return;
+        }
+        event_add( c->c_read_event, NULL );
+        Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
+                "re-enabled read event on connid=%lu\n",
+                c->c_connid );
+        CONNECTION_UNLOCK(c);
+        return;
+    }
+
+    if ( !slap_conn_max_pdus_per_cycle ||
+            ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) {
+        /* If we're overloaded or configured as such, process one and resume in
+         * the next cycle.
+         *
+         * handle_one_request re-locks the mutex in the
+         * process, need to test it's still alive */
+        if ( c->c_pdu_cb( c ) == LDAP_SUCCESS ) {
+            CONNECTION_UNLOCK_OR_DESTROY(c);
+        }
+        return;
+    }
+
+    event_del( c->c_read_event );
+    Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
+            "suspended read event on connid=%lu\n",
+            c->c_connid );
+
+    /* We have scheduled a call to handle_requests which takes care of
+     * handling further requests, just make sure the connection sticks around
+     * for that */
+    CONNECTION_UNLOCK_INCREF(c);
+    return;
+}
+
+void
+connection_write_cb( evutil_socket_t s, short what, void *arg )
+{
+    Connection *c = arg;
+
+    CONNECTION_LOCK(c);
+    if ( !c->c_live ) {
+        CONNECTION_UNLOCK(c);
+        return;
+    }
+    CONNECTION_UNLOCK_INCREF(c);
+
+    /* Before we acquire any locks */
+    event_del( c->c_write_event );
+
+    ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
+    Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
+            "have something to write to connection connid=%lu\n",
+            c->c_connid );
+
+    /* We might have been beaten to flushing the data by another thread */
+    if ( c->c_pendingber && ber_flush( c->c_sb, c->c_pendingber, 1 ) ) {
+        int err = sock_errno();
+
+        if ( err != EWOULDBLOCK && err != EAGAIN ) {
+            char ebuf[128];
+            ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+            Debug( LDAP_DEBUG_ANY, "connection_write_cb: "
+                    "ber_flush on fd=%d failed errno=%d (%s)\n",
+                    c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
+            CONNECTION_LOCK_DESTROY(c);
+            return;
+        }
+        event_add( c->c_write_event, NULL );
+    } else {
+        c->c_pendingber = NULL;
+    }
+    ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+
+    CONNECTION_LOCK_DECREF(c);
+    CONNECTION_UNLOCK_OR_DESTROY(c);
+}
+
 void
 connection_destroy( Connection *c )
 {
@@ -151,6 +380,7 @@ connection_init( ber_socket_t s, const char *peername, int flags )
 
     c->c_next_msgid = 1;
     c->c_refcnt = c->c_live = 1;
+    c->c_destroy = connection_destroy;
 
     LDAP_CIRCLEQ_ENTRY_INIT( c, c_next );
 
index bce5102a8ecefa09e7d5548903d49b04dfc639e0..568f8ea1bde79eb710c9ddd00f8e1a04d73ac02a 100644 (file)
@@ -259,7 +259,7 @@ operation_destroy_from_client( Operation *op )
         ldap_pvt_thread_mutex_unlock( &op->o_mutex );
 
         assert( upstream != NULL );
-        UPSTREAM_UNLOCK_OR_DESTROY(upstream);
+        CONNECTION_UNLOCK_OR_DESTROY(upstream);
         CONNECTION_LOCK_DECREF(client);
         return;
     }
@@ -270,7 +270,7 @@ operation_destroy_from_client( Operation *op )
             upstream->c_n_ops_executing--;
             b = (Backend *)upstream->c_private;
         }
-        UPSTREAM_UNLOCK_OR_DESTROY(upstream);
+        CONNECTION_UNLOCK_OR_DESTROY(upstream);
 
         if ( b ) {
             ldap_pvt_thread_mutex_lock( &b->b_mutex );
@@ -426,7 +426,7 @@ operation_destroy_from_upstream( Operation *op )
         ldap_pvt_thread_mutex_unlock( &op->o_mutex );
 
         assert( client != NULL );
-        CLIENT_UNLOCK_OR_DESTROY(client);
+        CONNECTION_UNLOCK_OR_DESTROY(client);
         CONNECTION_LOCK_DECREF(upstream);
         return;
     }
@@ -434,7 +434,7 @@ operation_destroy_from_upstream( Operation *op )
     /* 7. Remove from the operation map and TODO adjust the pending op count */
     if ( client ) {
         tavl_delete( &client->c_ops, op, operation_client_cmp );
-        CLIENT_UNLOCK_OR_DESTROY(client);
+        CONNECTION_UNLOCK_OR_DESTROY(client);
     }
 
     /* 8. Release the operation */
@@ -579,7 +579,7 @@ operation_abandon( Operation *op )
     ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
 
     if ( rc != -1 ) {
-        upstream_write_cb( -1, 0, c );
+        connection_write_cb( -1, 0, c );
     }
 
     CONNECTION_LOCK_DECREF(c);
@@ -593,7 +593,7 @@ unlock:
     if ( !c->c_live ) {
         operation_destroy_from_upstream( op );
     }
-    UPSTREAM_UNLOCK_OR_DESTROY(c);
+    CONNECTION_UNLOCK_OR_DESTROY(c);
 
 done:
     c = op->o_client;
@@ -624,7 +624,7 @@ request_abandon( Connection *c, Operation *op )
         operation_send_reject(
                 op, LDAP_PROTOCOL_ERROR, "invalid PDU received", 0 );
         CONNECTION_LOCK_DECREF(c);
-        CLIENT_DESTROY(c);
+        CONNECTION_DESTROY(c);
         return -1;
     }
 
@@ -685,7 +685,7 @@ operation_send_reject(
                 "not sending msgid=%d, client connid=%lu is dead\n",
                 op->o_client_msgid, op->o_client_connid );
         operation_destroy_from_upstream( op );
-        UPSTREAM_UNLOCK_OR_DESTROY(c);
+        CONNECTION_UNLOCK_OR_DESTROY(c);
         return;
     }
     CONNECTION_LOCK(c);
@@ -711,7 +711,7 @@ operation_send_reject(
                 c->c_connid );
         CONNECTION_LOCK_DECREF(c);
         operation_destroy_from_client( op );
-        CLIENT_DESTROY(c);
+        CONNECTION_DESTROY(c);
         return;
     }
     c->c_pendingber = ber;
@@ -722,12 +722,12 @@ operation_send_reject(
 
     ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
 
-    client_write_cb( -1, 0, c );
+    connection_write_cb( -1, 0, c );
 
     CONNECTION_LOCK_DECREF(c);
 done:
     operation_destroy_from_client( op );
-    CLIENT_UNLOCK_OR_DESTROY(c);
+    CONNECTION_UNLOCK_OR_DESTROY(c);
 }
 
 /*
@@ -825,10 +825,10 @@ request_process( Connection *client, Operation *op )
     }
     ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
 
-    upstream_write_cb( -1, 0, upstream );
+    connection_write_cb( -1, 0, upstream );
 
     CONNECTION_LOCK_DECREF(upstream);
-    UPSTREAM_UNLOCK_OR_DESTROY(upstream);
+    CONNECTION_UNLOCK_OR_DESTROY(upstream);
 
     CONNECTION_LOCK_DECREF(client);
     if ( !--op->o_client_refcnt ) {
@@ -844,7 +844,7 @@ fail:
         CONNECTION_LOCK_DECREF(upstream);
         upstream->c_n_ops_executing--;
         b = (Backend *)upstream->c_private;
-        UPSTREAM_UNLOCK_OR_DESTROY(upstream);
+        CONNECTION_UNLOCK_OR_DESTROY(upstream);
 
         ldap_pvt_thread_mutex_lock( &b->b_mutex );
         b->b_n_ops_executing--;
@@ -856,7 +856,7 @@ fail:
     op->o_client_refcnt--;
     operation_destroy_from_client( op );
     if ( rc ) {
-        CLIENT_DESTROY(client);
+        CONNECTION_DESTROY(client);
     }
     return rc;
 }
index cbdf9567f0911353dc30faff2407dfab63353ada..a8a43928f511496fd3e5864a501785b852d93877 100644 (file)
@@ -70,10 +70,8 @@ LDAP_SLAPD_F (int) client_bind( Connection *c, Operation *op );
 /*
  * client.c
  */
-LDAP_SLAPD_F (void *) handle_requests( void *ctx, void *arg );
 LDAP_SLAPD_F (int) handle_one_request( Connection *c );
 LDAP_SLAPD_F (Connection *) client_init( ber_socket_t s, Listener *url, const char *peername, struct event_base *base, int use_tls );
-LDAP_SLAPD_F (void) client_write_cb( evutil_socket_t s, short what, void *arg );
 LDAP_SLAPD_F (void) client_destroy( Connection *c );
 LDAP_SLAPD_F (void) clients_destroy( void );
 
@@ -91,6 +89,8 @@ LDAP_SLAPD_F (void) bindconf_free( slap_bindconf *bc );
  * connection.c
  */
 LDAP_SLAPD_V (ldap_pvt_thread_mutex_t) clients_mutex;
+LDAP_SLAPD_F (void) connection_write_cb( evutil_socket_t s, short what, void *arg );
+LDAP_SLAPD_F (void) connection_read_cb( evutil_socket_t s, short what, void *arg );
 LDAP_SLAPD_F (Connection *) connection_init( ber_socket_t s, const char *peername, int use_tls );
 LDAP_SLAPD_F (void) connection_destroy( Connection *c );
 
@@ -187,8 +187,6 @@ LDAP_SLAPD_F (void *) slap_sl_context( void *ptr );
 /*
  * upstream.c
  */
-LDAP_SLAPD_F (void) upstream_write_cb( evutil_socket_t s, short what, void *arg );
-LDAP_SLAPD_F (void) upstream_read_cb( evutil_socket_t s, short what, void *arg );
 LDAP_SLAPD_F (Connection *) upstream_init( ber_socket_t s, Backend *b );
 LDAP_SLAPD_F (void) upstream_destroy( Connection *c );
 
index 611266de62f02f490971038f56cc829ef8a03406..cb672f6fbfe4914371b8b051426986e99a88dac6 100644 (file)
@@ -281,6 +281,10 @@ struct Backend {
 };
 
 typedef int (*OperationHandler)( Operation *op, BerElement *ber );
+typedef int (*RequestHandler)( Connection *c, Operation *op );
+
+typedef int (*CONNECTION_PDU_CB)( Connection *c );
+typedef void (*CONNECTION_DESTROY_CB)( Connection *c );
 
 /* connection state (protected by c_mutex) */
 enum sc_state {
@@ -313,15 +317,17 @@ struct Connection {
  *   to use CONNECTION_UNLOCK_INCREF, they are then responsible that
  *   CONNECTION_LOCK_DECREF+CONNECTION_UNLOCK_OR_DESTROY is used when they are
  *   done with it
- * - when a connection is considered dead, use (UPSTREAM|CLIENT)_DESTROY on a
- *   locked connection, it might get disposed of or if anyone still holds a
- *   token, it just gets unlocked and it's the last token holder's
- *   responsibility to run *_UNLOCK_OR_DESTROY
- * - (UPSTREAM|CLIENT)_LOCK_DESTROY is a shorthand for locking, decreasing
- *   refcount and (UPSTREAM|CLIENT)_DESTROY
+ * - when a connection is considered dead, use CONNECTION_DESTROY on a locked
+ *   connection, it might get disposed of or if anyone still holds a token, it
+ *   just gets unlocked and it's the last token holder's responsibility to run
+ *   CONNECTION_UNLOCK_OR_DESTROY
+ * - CONNECTION_LOCK_DESTROY is a shorthand for locking, decreasing refcount
+ *   and CONNECTION_DESTROY
  */
     ldap_pvt_thread_mutex_t c_mutex; /* protect the connection */
     int c_refcnt, c_live;
+    CONNECTION_DESTROY_CB c_destroy;
+    CONNECTION_PDU_CB c_pdu_cb;
 #define CONNECTION_LOCK(c) ldap_pvt_thread_mutex_lock( &(c)->c_mutex )
 #define CONNECTION_UNLOCK(c) ldap_pvt_thread_mutex_unlock( &(c)->c_mutex )
 #define CONNECTION_LOCK_DECREF(c) \
@@ -334,40 +340,28 @@ struct Connection {
         (c)->c_refcnt++; \
         CONNECTION_UNLOCK(c); \
     } while (0)
-#define CONNECTION_UNLOCK_OR_DESTROY(type, c) \
+#define CONNECTION_UNLOCK_OR_DESTROY(c) \
     do { \
         assert( (c)->c_refcnt >= 0 ); \
         if ( !( c )->c_refcnt ) { \
-            Debug( LDAP_DEBUG_TRACE, "%s: destroying " #type " connection connid=%lu\n", \
+            Debug( LDAP_DEBUG_TRACE, "%s: destroying connection connid=%lu\n", \
                     __func__, (c)->c_connid ); \
-            type##_destroy( (c) ); \
+            (c)->c_destroy( (c) ); \
             (c) = NULL; \
         } else { \
             CONNECTION_UNLOCK(c); \
         } \
     } while (0)
-#define CONNECTION_DESTROY(type, c) \
+#define CONNECTION_DESTROY(c) \
     do { \
         (c)->c_refcnt -= (c)->c_live; \
         (c)->c_live = 0; \
-        CONNECTION_UNLOCK_OR_DESTROY(type, c); \
+        CONNECTION_UNLOCK_OR_DESTROY(c); \
     } while (0)
-
-#define UPSTREAM_UNLOCK_OR_DESTROY(c) \
-    CONNECTION_UNLOCK_OR_DESTROY(upstream, c);
-#define UPSTREAM_DESTROY(c) CONNECTION_DESTROY(upstream, c)
-#define UPSTREAM_LOCK_DESTROY(c) \
-    do { \
-        CONNECTION_LOCK_DECREF(c); \
-        UPSTREAM_DESTROY(c); \
-    } while (0);
-
-#define CLIENT_UNLOCK_OR_DESTROY(c) CONNECTION_UNLOCK_OR_DESTROY(client, c);
-#define CLIENT_DESTROY(c) CONNECTION_DESTROY(client, c)
-#define CLIENT_LOCK_DESTROY(c) \
+#define CONNECTION_LOCK_DESTROY(c) \
     do { \
         CONNECTION_LOCK_DECREF(c); \
-        CLIENT_DESTROY(c); \
+        CONNECTION_DESTROY(c); \
     } while (0);
 
     Sockbuf *c_sb; /* ber connection stuff */
index 8346db79367dc26e92ef8824e78be603b0d0e435..5d8aec71255b9fdf9d8c0050aacbb26dcfc01b8d 100644 (file)
@@ -62,7 +62,7 @@ forward_response( Operation *op, BerElement *ber )
     ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
 
     ber_free( ber, 1 );
-    client_write_cb( -1, 0, c );
+    connection_write_cb( -1, 0, c );
     return 0;
 }
 
@@ -280,14 +280,14 @@ handle_vc_bind_response( Operation *op, BerElement *ber )
 
     ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
     if ( rc >= 0 ) {
-        client_write_cb( -1, 0, c );
+        connection_write_cb( -1, 0, c );
         rc = 0;
     }
 
 done:
     CONNECTION_LOCK_DECREF(c);
     operation_destroy_from_client( op );
-    CLIENT_UNLOCK_OR_DESTROY(c);
+    CONNECTION_UNLOCK_OR_DESTROY(c);
     ber_free( ber, 1 );
     return rc;
 }
@@ -302,7 +302,7 @@ handle_unsolicited( Connection *c, BerElement *ber )
             "teardown for upstream connection connid=%lu\n",
             c->c_connid );
 
-    UPSTREAM_DESTROY(c);
+    CONNECTION_DESTROY(c);
     ber_free( ber, 1 );
 
     return -1;
@@ -325,7 +325,7 @@ handle_unsolicited( Connection *c, BerElement *ber )
  *   operation to be removed)
  *
  * If the worker pool is overloaded, we might be called directly from
- * upstream_read_cb, at that point, the connection hasn't been muted.
+ * the read callback, at that point, the connection hasn't been muted.
  *
  * TODO: when the client already has data pending on write, we should mute the
  * upstream.
@@ -435,7 +435,7 @@ handle_one_response( Connection *c )
             if ( !op->o_client_refcnt ) {
                 operation_destroy_from_client( op );
             }
-            CLIENT_UNLOCK_OR_DESTROY(client);
+            CONNECTION_UNLOCK_OR_DESTROY(client);
         } else {
             ber_free( ber, 1 );
         }
@@ -458,188 +458,12 @@ fail:
                 "error on processing a response (%s) on upstream connection "
                 "connid=%ld, tag=%lx\n",
                 slap_msgtype2str( tag ), c->c_connid, tag );
-        UPSTREAM_DESTROY(c);
+        CONNECTION_DESTROY(c);
     }
     /* We leave the connection locked */
     return rc;
 }
 
-/*
- * We start off with the upstream muted and c_currentber holding the response
- * we received.
- *
- * We run handle_one_response on each response, stopping once we hit an error,
- * have to wait on reading or process slap_conn_max_pdus_per_cycle responses so
- * as to maintain fairness and not hog the worker thread forever.
- *
- * If we've run out of responses from the upstream or hit the budget, we unmute
- * the connection and run handle_one_response, it might return an 'error' when
- * the client is blocked on writing, it's that client's job to wake us again.
- */
-static void *
-handle_responses( void *ctx, void *arg )
-{
-    Connection *c = arg;
-    int responses_handled = 0;
-
-    CONNECTION_LOCK_DECREF(c);
-    for ( ;; ) {
-        BerElement *ber;
-        ber_tag_t tag;
-        ber_len_t len;
-
-        /* handle_one_response may unlock the connection in the process, we
-         * need to expect that might be our responsibility to destroy it */
-        if ( handle_one_response( c ) ) {
-            /* Error, connection is unlocked and might already have been
-             * destroyed */
-            return NULL;
-        }
-        /* Otherwise, handle_one_response leaves the connection locked */
-
-        if ( ++responses_handled >= slap_conn_max_pdus_per_cycle ) {
-            /* Do not read now, re-enable read event instead */
-            break;
-        }
-
-        if ( (ber = ber_alloc()) == NULL ) {
-            Debug( LDAP_DEBUG_ANY, "handle_responses: "
-                    "ber_alloc failed\n" );
-            UPSTREAM_DESTROY(c);
-            return NULL;
-        }
-        c->c_currentber = ber;
-
-        tag = ber_get_next( c->c_sb, &len, ber );
-        if ( tag != LDAP_TAG_MESSAGE ) {
-            int err = sock_errno();
-
-            if ( err != EWOULDBLOCK && err != EAGAIN ) {
-                if ( err || tag == LBER_ERROR ) {
-                    char ebuf[128];
-                    Debug( LDAP_DEBUG_ANY, "handle_responses: "
-                            "ber_get_next on fd=%d failed errno=%d (%s)\n",
-                            c->c_fd, err,
-                            sock_errstr( err, ebuf, sizeof(ebuf) ) );
-                } else {
-                    Debug( LDAP_DEBUG_STATS, "handle_responses: "
-                            "ber_get_next on fd=%d connid=%lu received "
-                            "a strange PDU tag=%lx\n",
-                            c->c_fd, c->c_connid, tag );
-                }
-
-                c->c_currentber = NULL;
-                ber_free( ber, 1 );
-                UPSTREAM_DESTROY(c);
-                return NULL;
-            }
-            break;
-        }
-    }
-
-    event_add( c->c_read_event, NULL );
-    Debug( LDAP_DEBUG_CONNS, "handle_responses: "
-            "re-enabled read event on connid=%lu\n",
-            c->c_connid );
-    UPSTREAM_UNLOCK_OR_DESTROY(c);
-    return NULL;
-}
-
-/*
- * Initial read on the upstream connection, if we get an LDAP PDU, submit the
- * processing of this and successive ones to the work queue.
- *
- * If we can't submit it to the queue (overload), process this one and return
- * to the event loop immediately after.
- */
-void
-upstream_read_cb( evutil_socket_t s, short what, void *arg )
-{
-    Connection *c = arg;
-    BerElement *ber;
-    ber_tag_t tag;
-    ber_len_t len;
-
-    CONNECTION_LOCK(c);
-    if ( !c->c_live ) {
-        event_del( c->c_read_event );
-        Debug( LDAP_DEBUG_CONNS, "upstream_read_cb: "
-                "suspended read event on a dead connid=%lu\n",
-                c->c_connid );
-        CONNECTION_UNLOCK(c);
-        return;
-    }
-    Debug( LDAP_DEBUG_CONNS, "upstream_read_cb: "
-            "connection connid=%lu ready to read\n",
-            c->c_connid );
-
-    ber = c->c_currentber;
-    if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
-        Debug( LDAP_DEBUG_ANY, "upstream_read_cb: "
-                "ber_alloc failed\n" );
-        UPSTREAM_DESTROY(c);
-        return;
-    }
-    c->c_currentber = ber;
-
-    tag = ber_get_next( c->c_sb, &len, ber );
-    if ( tag != LDAP_TAG_MESSAGE ) {
-        int err = sock_errno();
-
-        if ( err != EWOULDBLOCK && err != EAGAIN ) {
-            if ( err || tag == LBER_ERROR ) {
-                char ebuf[128];
-                Debug( LDAP_DEBUG_ANY, "upstream_read_cb: "
-                        "ber_get_next on fd=%d failed errno=%d (%s)\n",
-                        c->c_fd, err,
-                        sock_errstr( err, ebuf, sizeof(ebuf) ) );
-            } else {
-                Debug( LDAP_DEBUG_STATS, "upstream_read_cb: "
-                        "ber_get_next on fd=%d connid=%lu received "
-                        "a strange PDU tag=%lx\n",
-                        c->c_fd, c->c_connid, tag );
-            }
-
-            c->c_currentber = NULL;
-            ber_free( ber, 1 );
-
-            event_del( c->c_read_event );
-            Debug( LDAP_DEBUG_CONNS, "upstream_read_cb: "
-                    "suspended read event on dying connid=%lu\n",
-                    c->c_connid );
-            UPSTREAM_DESTROY(c);
-            return;
-        }
-        event_add( c->c_read_event, NULL );
-        Debug( LDAP_DEBUG_CONNS, "upstream_read_cb: "
-                "re-enabled read event on connid=%lu\n",
-                c->c_connid );
-        CONNECTION_UNLOCK(c);
-        return;
-    }
-
-    if ( !slap_conn_max_pdus_per_cycle ||
-            ldap_pvt_thread_pool_submit(
-                    &connection_pool, handle_responses, c ) ) {
-        /* If we're overloaded or configured as such, process one and resume in
-         * the next cycle.
-         *
-         * handle_one_response re-locks the mutex in the
-         * process, need to test it's still alive */
-        if ( handle_one_response( c ) == LDAP_SUCCESS ) {
-            UPSTREAM_UNLOCK_OR_DESTROY(c);
-        }
-        return;
-    }
-
-    /* We have scheduled a call to handle_responses which takes care of
-     * handling further requests, just make sure the connection sticks around
-     * for that */
-    event_del( c->c_read_event );
-    CONNECTION_UNLOCK_INCREF(c);
-    return;
-}
-
 int
 upstream_finish( Connection *c )
 {
@@ -653,7 +477,7 @@ upstream_finish( Connection *c )
 
     base = slap_get_base( s );
 
-    event = event_new( base, s, EV_READ|EV_PERSIST, upstream_read_cb, c );
+    event = event_new( base, s, EV_READ|EV_PERSIST, connection_read_cb, c );
     if ( !event ) {
         Debug( LDAP_DEBUG_ANY, "upstream_finish: "
                 "Read event could not be allocated\n" );
@@ -769,50 +593,7 @@ fail:
             "suspended read event on dying connid=%lu\n",
             c->c_connid );
     ber_free( ber, 1 );
-    UPSTREAM_DESTROY(c);
-}
-
-void
-upstream_write_cb( evutil_socket_t s, short what, void *arg )
-{
-    Connection *c = arg;
-
-    CONNECTION_LOCK(c);
-    if ( !c->c_live ) {
-        CONNECTION_UNLOCK(c);
-        return;
-    }
-    CONNECTION_UNLOCK_INCREF(c);
-
-    /* Before we acquire any locks */
-    event_del( c->c_write_event );
-
-    ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
-    Debug( LDAP_DEBUG_CONNS, "upstream_write_cb: "
-            "have something to write to upstream connid=%lu\n",
-            c->c_connid );
-
-    /* We might have been beaten to flushing the data by another thread */
-    if ( c->c_pendingber && ber_flush( c->c_sb, c->c_pendingber, 1 ) ) {
-        int err = sock_errno();
-
-        if ( err != EWOULDBLOCK && err != EAGAIN ) {
-            char ebuf[128];
-            Debug( LDAP_DEBUG_ANY, "upstream_write_cb: "
-                    "ber_flush on fd=%d failed errno=%d (%s)\n",
-                    c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
-            ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
-            UPSTREAM_LOCK_DESTROY(c);
-            return;
-        }
-        event_add( c->c_write_event, NULL );
-    } else {
-        c->c_pendingber = NULL;
-    }
-    ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
-
-    CONNECTION_LOCK_DECREF(c);
-    UPSTREAM_UNLOCK_OR_DESTROY(c);
+    CONNECTION_DESTROY(c);
 }
 
 void *
@@ -837,7 +618,7 @@ upstream_bind( void *ctx, void *arg )
     if ( !event ) {
         Debug( LDAP_DEBUG_ANY, "upstream_bind: "
                 "Read event could not be allocated\n" );
-        UPSTREAM_DESTROY(c);
+        CONNECTION_DESTROY(c);
         return NULL;
     }
     event_add( event, NULL );
@@ -874,10 +655,10 @@ upstream_bind( void *ctx, void *arg )
     c->c_pendingber = ber;
     ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
 
-    upstream_write_cb( -1, 0, c );
+    connection_write_cb( -1, 0, c );
 
     CONNECTION_LOCK_DECREF(c);
-    UPSTREAM_UNLOCK_OR_DESTROY(c);
+    CONNECTION_UNLOCK_OR_DESTROY(c);
 
     return NULL;
 }
@@ -896,21 +677,34 @@ upstream_init( ber_socket_t s, Backend *b )
     assert( b != NULL );
 
     flags = (b->b_tls == LLOAD_LDAPS) ? CONN_IS_TLS : 0;
-    c = connection_init( s, b->b_host, flags );
+    if ( (c = connection_init( s, b->b_host, flags )) == NULL ) {
+        return NULL;
+    }
+
     c->c_private = b;
+    c->c_is_tls = b->b_tls;
+    c->c_pdu_cb = handle_one_response;
 
     {
         ber_len_t max = sockbuf_max_incoming_upstream;
         ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_MAX_INCOMING, &max );
     }
 
-    event = event_new( base, s, EV_WRITE, upstream_write_cb, c );
+    event = event_new( base, s, EV_READ|EV_PERSIST, connection_read_cb, c );
+    if ( !event ) {
+        Debug( LDAP_DEBUG_ANY, "upstream_init: "
+                "Read event could not be allocated\n" );
+        goto fail;
+    }
+    c->c_read_event = event;
+
+    event = event_new( base, s, EV_WRITE, connection_write_cb, c );
     if ( !event ) {
         Debug( LDAP_DEBUG_ANY, "upstream_init: "
                 "Write event could not be allocated\n" );
         goto fail;
     }
-    /* We only register the write event when we have data pending */
+    /* We only add the write event when we have data pending */
     c->c_write_event = event;
 
     /* Unless we are configured to use the VC exop, consider allocating the
@@ -947,6 +741,7 @@ upstream_init( ber_socket_t s, Backend *b )
         b->b_active++;
     }
 
+    c->c_destroy = upstream_destroy;
     CONNECTION_UNLOCK(c);
     return c;
 
@@ -959,7 +754,11 @@ fail:
         event_del( c->c_read_event );
         event_free( c->c_read_event );
     }
-    UPSTREAM_DESTROY(c);
+
+    c->c_state = SLAP_C_INVALID;
+    CONNECTION_DESTROY(c);
+    assert( c == NULL );
+
     return NULL;
 }
 
@@ -970,11 +769,14 @@ upstream_destroy( Connection *c )
     struct event *read_event, *write_event;
     TAvlnode *root;
     long freed, executing;
+    enum sc_state state;
 
     Debug( LDAP_DEBUG_CONNS, "upstream_destroy: "
             "freeing connection connid=%lu\n",
             c->c_connid );
 
+    assert( c->c_state != SLAP_C_INVALID );
+    state = c->c_state;
     c->c_state = SLAP_C_INVALID;
 
     root = c->c_ops;
@@ -1003,17 +805,20 @@ upstream_destroy( Connection *c )
         event_del( write_event );
     }
 
-    ldap_pvt_thread_mutex_lock( &b->b_mutex );
-    if ( c->c_type == SLAP_C_BIND ) {
-        LDAP_CIRCLEQ_REMOVE( &b->b_bindconns, c, c_next );
-        b->b_bindavail--;
-    } else {
-        LDAP_CIRCLEQ_REMOVE( &b->b_conns, c, c_next );
-        b->b_active--;
+    /* Remove from the backend on first pass */
+    if ( state != SLAP_C_CLOSING ) {
+        ldap_pvt_thread_mutex_lock( &b->b_mutex );
+        if ( c->c_type == SLAP_C_BIND ) {
+            LDAP_CIRCLEQ_REMOVE( &b->b_bindconns, c, c_next );
+            b->b_bindavail--;
+        } else {
+            LDAP_CIRCLEQ_REMOVE( &b->b_conns, c, c_next );
+            b->b_active--;
+        }
+        b->b_n_ops_executing -= executing;
+        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+        backend_retry( b );
     }
-    b->b_n_ops_executing -= executing;
-    ldap_pvt_thread_mutex_unlock( &b->b_mutex );
-    backend_retry( b );
 
     CONNECTION_LOCK_DECREF(c);