]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
Epoch based memory reclamation
authorOndřej Kuzník <okuznik@symas.com>
Fri, 17 Aug 2018 11:28:13 +0000 (12:28 +0100)
committerOndřej Kuzník <okuznik@symas.com>
Tue, 17 Nov 2020 17:58:15 +0000 (17:58 +0000)
Similar to the algorithm presented in
https://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-579.pdf

Not completely lock-free at the moment. Also the problems with epoch
based memory reclamation are still present - a thread actively observing
an epoch getting stuck will prevent LloadConnections and LloadOperations
being freed, potentially running out of memory.

15 files changed:
servers/lloadd/Makefile.in
servers/lloadd/backend.c
servers/lloadd/bind.c
servers/lloadd/client.c
servers/lloadd/connection.c
servers/lloadd/daemon.c
servers/lloadd/epoch.c [new file with mode: 0644]
servers/lloadd/epoch.h [new file with mode: 0644]
servers/lloadd/extended.c
servers/lloadd/lload.h
servers/lloadd/main.c
servers/lloadd/module_init.c
servers/lloadd/operation.c
servers/lloadd/proto-lload.h
servers/lloadd/upstream.c

index c78bb3acd92a3a733964b3934068dcaf781c84ac..e99bad2770fa77f74c8265c5ca01f703043b8f3d 100644 (file)
@@ -20,7 +20,7 @@ NT_SRCS = nt_svc.c
 NT_OBJS = nt_svc.o ../../libraries/liblutil/slapdmsg.res
 
 SRCS   = backend.c bind.c config.c connection.c client.c \
-                 daemon.c extended.c init.c operation.c \
+                 daemon.c epoch.c extended.c init.c operation.c \
                  upstream.c libevent_support.c \
                  $(@PLAT@_SRCS)
 
index 711cfaf4ef65d551f9da88f628b1acc0dec9d844..a2cff0c5251f99f8b72de7c1d6027e9037d2e8d7 100644 (file)
@@ -33,6 +33,7 @@ upstream_connect_cb( evutil_socket_t s, short what, void *arg )
     LloadPendingConnection *conn = arg;
     LloadBackend *b = conn->backend;
     int error = 0, rc = -1;
+    epoch_t epoch;
 
     ldap_pvt_thread_mutex_lock( &b->b_mutex );
     Debug( LDAP_DEBUG_CONNS, "upstream_connect_cb: "
@@ -44,6 +45,8 @@ upstream_connect_cb( evutil_socket_t s, short what, void *arg )
         goto preempted;
     }
 
+    epoch = epoch_join();
+
     if ( what == EV_WRITE ) {
         socklen_t optlen = sizeof(error);
 
@@ -53,6 +56,7 @@ upstream_connect_cb( evutil_socket_t s, short what, void *arg )
         }
         if ( error == EINTR || error == EINPROGRESS || error == EWOULDBLOCK ) {
             ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+            epoch_leave( epoch );
             return;
         } else if ( error ) {
             goto done;
@@ -63,6 +67,8 @@ upstream_connect_cb( evutil_socket_t s, short what, void *arg )
     }
 
 done:
+    epoch_leave( epoch );
+
     LDAP_LIST_REMOVE( conn, next );
     if ( rc ) {
         evutil_closesocket( conn->fd );
@@ -93,6 +99,7 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg )
 {
     LloadBackend *b = arg;
     ber_socket_t s = AC_SOCKET_INVALID;
+    epoch_t epoch;
     int rc;
 
     if ( result == EVUTIL_EAI_CANCEL ) {
@@ -111,6 +118,7 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg )
     }
     b->b_dns_req = NULL;
 
+    epoch = epoch_join();
     if ( result || !res ) {
         Debug( LDAP_DEBUG_ANY, "upstream_name_cb: "
                 "name resolution failed for backend '%s': %s\n",
@@ -176,6 +184,7 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg )
 
     ldap_pvt_thread_mutex_unlock( &b->b_mutex );
     evutil_freeaddrinfo( res );
+    epoch_leave( epoch );
     return;
 
 fail:
@@ -189,6 +198,7 @@ fail:
     if ( res ) {
         evutil_freeaddrinfo( res );
     }
+    epoch_leave( epoch );
 }
 
 LloadConnection *
@@ -268,7 +278,6 @@ backend_select( LloadOperation *op, int *res )
                 }
                 c->c_n_ops_executing++;
                 c->c_counters.lc_ops_received++;
-                CONNECTION_UNLOCK_INCREF(c);
 
                 ldap_pvt_thread_mutex_unlock( &b->b_mutex );
                 *res = LDAP_SUCCESS;
@@ -356,6 +365,7 @@ backend_connect( evutil_socket_t s, short what, void *arg )
     LloadBackend *b = arg;
     struct evdns_getaddrinfo_request *request, *placeholder;
     char *hostname;
+    epoch_t epoch;
 
     ldap_pvt_thread_mutex_lock( &b->b_mutex );
     assert( b->b_dns_req == NULL );
@@ -372,6 +382,8 @@ backend_connect( evutil_socket_t s, short what, void *arg )
         return;
     }
 
+    epoch = epoch_join();
+
     Debug( LDAP_DEBUG_CONNS, "backend_connect: "
             "%sattempting connection to %s\n",
             (what & EV_TIMEOUT) ? "retry timeout finished, " : "",
@@ -438,6 +450,7 @@ backend_connect( evutil_socket_t s, short what, void *arg )
         }
 
         ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+        epoch_leave( epoch );
         return;
     }
 #endif /* LDAP_PF_LOCAL */
@@ -473,6 +486,7 @@ backend_connect( evutil_socket_t s, short what, void *arg )
         b->b_dns_req = request;
     }
     ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+    epoch_leave( epoch );
     return;
 
 fail:
@@ -480,6 +494,7 @@ fail:
     b->b_failed++;
     backend_retry( b );
     ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+    epoch_leave( epoch );
 }
 
 void *
index 3f0993a63e5bd3522f25a01e5b584b8d796993c8..4f429602f94a1073e5d713325b626d7efab9165b 100644 (file)
@@ -34,7 +34,8 @@ bind_mech_external(
 {
     BerValue binddn;
     void *ssl;
-    char *ptr;
+    char *ptr, *message = "";
+    int result = LDAP_SUCCESS;
 
     client->c_state = LLOAD_C_READY;
     client->c_type = LLOAD_C_OPEN;
@@ -49,14 +50,16 @@ bind_mech_external(
      * allow that.
      */
     if ( !BER_BVISEMPTY( credentials ) ) {
-        return operation_send_reject_locked( op, LDAP_UNWILLING_TO_PERFORM,
-                "proxy authorization is not supported", 1 );
+        result = LDAP_UNWILLING_TO_PERFORM;
+        message = "proxy authorization is not supported";
+        goto done;
     }
 
     ssl = ldap_pvt_tls_sb_ctx( client->c_sb );
     if ( !ssl || ldap_pvt_tls_get_peer_dn( ssl, &binddn, NULL, 0 ) ) {
-        return operation_send_reject_locked( op, LDAP_INVALID_CREDENTIALS,
-                "no externally negotiated identity", 1 );
+        result = LDAP_INVALID_CREDENTIALS;
+        message = "no externally negotiated identity";
+        goto done;
     }
     client->c_auth.bv_len = binddn.bv_len + STRLENOF("dn:");
     client->c_auth.bv_val = ch_malloc( client->c_auth.bv_len + 1 );
@@ -71,22 +74,20 @@ bind_mech_external(
         client->c_type = LLOAD_C_PRIVILEGED;
     }
 
-    return operation_send_reject_locked( op, LDAP_SUCCESS, "", 1 );
+done:
+    CONNECTION_UNLOCK(client);
+    operation_send_reject( op, result, message, 1 );
+    return LDAP_SUCCESS;
 }
 
-/*
- * On entering the function, we've put a reference on both connections and hold
- * upstream's c_io_mutex.
- */
 static int
 client_bind(
         LloadOperation *op,
+        LloadConnection *upstream,
         struct berval *binddn,
         ber_tag_t tag,
         struct berval *auth )
 {
-    LloadConnection *upstream = op->o_upstream;
-
     ber_printf( upstream->c_pendingber, "t{titOtO}", LDAP_TAG_MESSAGE,
             LDAP_TAG_MSGID, op->o_upstream_msgid,
             LDAP_REQ_BIND, &op->o_request,
@@ -96,19 +97,14 @@ client_bind(
 }
 
 #ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS
-/*
- * On entering the function, we've put a reference on both connections and hold
- * upstream's c_io_mutex.
- */
 static int
 client_bind_as_vc(
         LloadOperation *op,
+        LloadConnection *upstream,
         struct berval *binddn,
         ber_tag_t tag,
         struct berval *auth )
 {
-    LloadConnection *upstream = op->o_upstream;
-
     CONNECTION_LOCK(upstream);
     ber_printf( upstream->c_pendingber, "t{tit{tst{{tOOtOtO}}}}", LDAP_TAG_MESSAGE,
             LDAP_TAG_MSGID, op->o_upstream_msgid,
@@ -192,9 +188,12 @@ request_bind( LloadConnection *client, LloadOperation *op )
     struct berval binddn, auth, mech = BER_BVNULL;
     ber_int_t version;
     ber_tag_t tag;
-    unsigned long pin = client->c_pin_id;
+    unsigned long pin;
     int res, rc = LDAP_SUCCESS;
 
+    CONNECTION_LOCK(client);
+    pin = client->c_pin_id;
+
     if ( pin ) {
         LloadOperation *pinned_op, needle = {
             .o_client_connid = client->c_connid,
@@ -222,25 +221,28 @@ request_bind( LloadConnection *client, LloadOperation *op )
             pinned_op->o_request = op->o_request;
             pinned_op->o_ctrls = op->o_ctrls;
 
-            /*
-             * pinned_op is accessible from the upstream, protect it since we
-             * lose the client lock in operation_destroy_from_client temporarily
-             */
-            pinned_op->o_client_refcnt++;
+            /* Noone has seen this operation yet, plant the pin back in its stead */
+            client->c_n_ops_executing--;
             op->o_res = LLOAD_OP_COMPLETED;
+            tavl_delete( &client->c_ops, op, operation_client_cmp );
+            op->o_client = NULL;
+            assert( op->o_upstream == NULL );
+
+            rc = tavl_insert( &client->c_ops, pinned_op, operation_client_cmp,
+                    avl_dup_error );
+            assert( rc == LDAP_SUCCESS );
+
+            /* Noone has seen this operation yet */
+            op->o_refcnt--;
+            operation_destroy( op );
 
             /* We didn't start a new operation, just continuing an existing one */
             lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_received--;
 
-            operation_destroy_from_client( op );
-            pinned_op->o_client_refcnt--;
-
             op = pinned_op;
         }
     }
 
-    /* protect the Bind operation */
-    op->o_client_refcnt++;
     tavl_delete( &client->c_ops, op, operation_client_cmp );
 
     client_reset( client );
@@ -259,10 +261,11 @@ request_bind( LloadConnection *client, LloadOperation *op )
                 "failed to parse version field\n" );
         goto fail;
     } else if ( version != LDAP_VERSION3 ) {
-        operation_send_reject_locked(
+        CONNECTION_UNLOCK(client);
+        operation_send_reject(
                 op, LDAP_PROTOCOL_ERROR, "LDAP version unsupported", 1 );
-        ber_free( copy, 0 );
-        return LDAP_SUCCESS;
+        CONNECTION_LOCK(client);
+        goto fail;
     }
 
     tag = ber_get_stringbv( copy, &binddn, LBER_BV_NOTERM );
@@ -307,10 +310,7 @@ request_bind( LloadConnection *client, LloadOperation *op )
 
             /* terminate the upstream side if client switched mechanisms */
             if ( pin ) {
-                op->o_client_refcnt++;
-                CONNECTION_UNLOCK_INCREF(client);
                 operation_abandon( op );
-                CONNECTION_LOCK_DECREF(client);
             }
 
             ber_free( copy, 0 );
@@ -326,26 +326,28 @@ request_bind( LloadConnection *client, LloadOperation *op )
 
     rc = tavl_insert( &client->c_ops, op, operation_client_cmp, avl_dup_error );
     assert( rc == LDAP_SUCCESS );
-    CONNECTION_UNLOCK_INCREF(client);
+    CONNECTION_UNLOCK(client);
 
     if ( pin ) {
         ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
         upstream = op->o_upstream;
+        ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
+
         if ( upstream ) {
+            ldap_pvt_thread_mutex_lock( &upstream->c_io_mutex );
             CONNECTION_LOCK(upstream);
             if ( !upstream->c_live ) {
                 CONNECTION_UNLOCK(upstream);
+                ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
                 upstream = NULL;
             }
         }
-        ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
     }
 
     /* If we were pinned but lost the link, don't look for a new upstream, we
      * have to reject the op and clear pin */
     if ( upstream ) {
-        CONNECTION_UNLOCK_INCREF(upstream);
-        ldap_pvt_thread_mutex_lock( &upstream->c_io_mutex );
+        /* No need to do anything */
     } else if ( !pin ) {
         upstream = backend_select( op, &res );
     } else {
@@ -377,18 +379,27 @@ request_bind( LloadConnection *client, LloadOperation *op )
 
     ber = upstream->c_pendingber;
     if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
-        Debug( LDAP_DEBUG_ANY, "request_bind: "
-                "ber_alloc failed\n" );
-        CONNECTION_LOCK_DECREF(upstream);
         ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
-        upstream->c_state = LLOAD_C_READY;
-        if ( !BER_BVISNULL( &upstream->c_sasl_bind_mech ) ) {
-            ber_memfree( upstream->c_sasl_bind_mech.bv_val );
-            BER_BVZERO( &upstream->c_sasl_bind_mech );
+        if ( !pin ) {
+            LloadBackend *b = upstream->c_private;
+
+            upstream->c_n_ops_executing--;
+            CONNECTION_UNLOCK(upstream);
+
+            ldap_pvt_thread_mutex_lock( &b->b_mutex );
+            b->b_n_ops_executing--;
+            operation_update_backend_counters( op, b );
+            ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+        } else {
+            CONNECTION_UNLOCK(upstream);
         }
-        CONNECTION_UNLOCK_OR_DESTROY(upstream);
 
-        CONNECTION_LOCK_DECREF(client);
+        Debug( LDAP_DEBUG_ANY, "request_bind: "
+                "ber_alloc failed\n" );
+
+        operation_unlink( op );
+
+        CONNECTION_LOCK(client);
         goto fail;
     }
     upstream->c_pendingber = ber;
@@ -397,7 +408,6 @@ request_bind( LloadConnection *client, LloadOperation *op )
         lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_forwarded++;
     }
 
-    CONNECTION_LOCK(upstream);
     if ( pin ) {
         tavl_delete( &upstream->c_ops, op, operation_upstream_cmp );
         if ( tag == LDAP_AUTH_SIMPLE ) {
@@ -440,52 +450,28 @@ request_bind( LloadConnection *client, LloadOperation *op )
 
 #ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS
     if ( lload_features & LLOAD_FEATURE_VC ) {
-        rc = client_bind_as_vc( op, &binddn, tag, &auth );
+        rc = client_bind_as_vc( op, upstream, &binddn, tag, &auth );
     } else
 #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */
     {
-        rc = client_bind( op, &binddn, tag, &auth );
+        rc = client_bind( op, upstream, &binddn, tag, &auth );
     }
+    ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
 
 done:
-    if ( rc == LDAP_SUCCESS ) {
-        CONNECTION_LOCK(client);
-        if ( upstream ) {
-            ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
-        }
 
+    CONNECTION_LOCK(client);
+    if ( rc == LDAP_SUCCESS ) {
         client->c_pin_id = pin;
-        if ( !--op->o_client_refcnt || !upstream ) {
-            operation_destroy_from_client( op );
-            if ( client->c_state == LLOAD_C_BINDING ) {
-                client->c_state = LLOAD_C_READY;
-                client->c_type = LLOAD_C_OPEN;
-                client->c_pin_id = 0;
-                if ( !BER_BVISNULL( &client->c_auth ) ) {
-                    ch_free( client->c_auth.bv_val );
-                    BER_BVZERO( &client->c_auth );
-                }
-                if ( !BER_BVISNULL( &client->c_sasl_bind_mech ) ) {
-                    ber_memfree( client->c_sasl_bind_mech.bv_val );
-                    BER_BVZERO( &client->c_sasl_bind_mech );
-                }
-            }
-        }
         CONNECTION_UNLOCK(client);
 
         if ( upstream ) {
             connection_write_cb( -1, 0, upstream );
-            CONNECTION_LOCK_DECREF(upstream);
-            CONNECTION_UNLOCK_OR_DESTROY(upstream);
         }
-        CONNECTION_LOCK_DECREF(client);
     } else {
 fail:
         rc = -1;
 
-        CONNECTION_LOCK_DECREF(client);
-        op->o_client_refcnt--;
-        operation_destroy_from_client( op );
         client->c_pin_id = 0;
         CONNECTION_DESTROY(client);
     }
@@ -508,42 +494,26 @@ finish_sasl_bind(
         LloadOperation *op,
         BerElement *ber )
 {
-    LloadConnection *client = op->o_client;
     BerElement *output;
     LloadOperation *removed;
     ber_int_t msgid;
     int rc;
 
-    if ( !(lload_features & LLOAD_FEATURE_PROXYAUTHZ) ) {
-        Debug( LDAP_DEBUG_TRACE, "finish_sasl_bind: "
-                "connid=%lu not configured to do proxyauthz, making no "
-                "attempt to resolve final authzid name\n",
-                op->o_client_connid );
-        CONNECTION_UNLOCK(upstream);
-        return forward_final_response( client, op, ber );
-    }
-
     removed = tavl_delete( &upstream->c_ops, op, operation_upstream_cmp );
     if ( !removed ) {
         assert( upstream->c_state != LLOAD_C_BINDING );
         /* FIXME: has client replaced this bind since? */
         assert(0);
-
-        operation_destroy_from_upstream( op );
     }
     assert( removed == op && upstream->c_state == LLOAD_C_BINDING );
 
     CONNECTION_UNLOCK(upstream);
 
-    Debug( LDAP_DEBUG_TRACE, "finish_sasl_bind: "
-            "SASL exchange in lieu of client connid=%lu to upstream "
-            "connid=%lu finished, resolving final authzid name\n",
-            op->o_client_connid, op->o_upstream_connid );
-
     ldap_pvt_thread_mutex_lock( &upstream->c_io_mutex );
     output = upstream->c_pendingber;
     if ( output == NULL && (output = ber_alloc()) == NULL ) {
         ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
+        CONNECTION_LOCK_DESTROY(upstream);
         return -1;
     }
     upstream->c_pendingber = output;
@@ -564,12 +534,18 @@ finish_sasl_bind(
     ber_free( op->o_ber, 1 );
     op->o_ber = ber;
 
+    /* Could we have been unlinked in the meantime? */
     rc = tavl_insert(
             &upstream->c_ops, op, operation_upstream_cmp, avl_dup_error );
     assert( rc == LDAP_SUCCESS );
 
     CONNECTION_UNLOCK(upstream);
 
+    Debug( LDAP_DEBUG_TRACE, "finish_sasl_bind: "
+            "SASL exchange in lieu of client connid=%lu to upstream "
+            "connid=%lu finished, resolving final authzid name msgid=%d\n",
+            op->o_client_connid, op->o_upstream_connid, op->o_upstream_msgid );
+
     connection_write_cb( -1, 0, upstream );
     return LDAP_SUCCESS;
 }
@@ -580,7 +556,7 @@ handle_bind_response(
         LloadOperation *op,
         BerElement *ber )
 {
-    LloadConnection *upstream = op->o_upstream;
+    LloadConnection *upstream;
     BerValue response;
     BerElement *copy;
     LloadOperation *removed;
@@ -611,6 +587,13 @@ handle_bind_response(
             "connid=%lu, result=%d\n",
             op->o_client_msgid, op->o_client_connid, result );
 
+    ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
+    upstream = op->o_upstream;
+    ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
+    if ( !upstream ) {
+        return LDAP_SUCCESS;
+    }
+
     CONNECTION_LOCK(upstream);
     if ( !tavl_find( upstream->c_ops, op, operation_upstream_cmp ) ) {
         /*
@@ -621,7 +604,6 @@ handle_bind_response(
          *   no response is expected
          * - ???
          */
-        operation_destroy_from_upstream( op );
         CONNECTION_UNLOCK(upstream);
         return LDAP_SUCCESS;
     }
@@ -650,7 +632,6 @@ handle_bind_response(
     } else if ( result == LDAP_SASL_BIND_IN_PROGRESS ) {
         tavl_delete( &upstream->c_ops, op, operation_upstream_cmp );
         op->o_upstream_msgid = 0;
-        op->o_upstream_refcnt++;
         rc = tavl_insert(
                 &upstream->c_ops, op, operation_upstream_cmp, avl_dup_error );
         assert( rc == LDAP_SUCCESS );
@@ -665,13 +646,18 @@ handle_bind_response(
         assert( op->o_client_msgid && op->o_upstream_msgid );
         op->o_pin_id = 0;
 
-        if ( sasl_finished && result == LDAP_SUCCESS ) {
+        if ( (lload_features & LLOAD_FEATURE_PROXYAUTHZ) && sasl_finished &&
+                result == LDAP_SUCCESS ) {
             return finish_sasl_bind( upstream, op, ber );
         }
-        upstream->c_state = LLOAD_C_READY;
+        op->o_res = LLOAD_OP_COMPLETED;
     }
     CONNECTION_UNLOCK(upstream);
 
+    if ( !op->o_pin_id ) {
+        operation_unlink_upstream( op, upstream );
+    }
+
     CONNECTION_LOCK(client);
     removed = tavl_delete( &client->c_ops, op, operation_client_cmp );
     assert( !removed || op == removed );
@@ -687,7 +673,6 @@ handle_bind_response(
                 break;
             case LDAP_SUCCESS:
             default: {
-                op->o_client = NULL;
                 client->c_state = LLOAD_C_READY;
                 client->c_type = LLOAD_C_OPEN;
                 client->c_pin_id = 0;
@@ -708,7 +693,7 @@ handle_bind_response(
             }
         }
     } else {
-        assert( client->c_state == LLOAD_C_INVALID ||
+        assert( client->c_state == LLOAD_C_DYING ||
                 client->c_state == LLOAD_C_CLOSING );
     }
     CONNECTION_UNLOCK(client);
@@ -729,7 +714,7 @@ handle_whoami_response(
         LloadOperation *op,
         BerElement *ber )
 {
-    LloadConnection *upstream = op->o_upstream;
+    LloadConnection *upstream;
     BerValue matched, diagmsg;
     BerElement *saved_response = op->o_ber;
     LloadOperation *removed;
@@ -739,7 +724,7 @@ handle_whoami_response(
 
     Debug( LDAP_DEBUG_TRACE, "handle_whoami_response: "
             "connid=%ld received whoami response in lieu of connid=%ld\n",
-            upstream->c_connid, client->c_connid );
+            op->o_upstream_connid, client->c_connid );
 
     tag = ber_scanf( ber, "{emm" /* "}" */,
             &result, &matched, &diagmsg );
@@ -748,38 +733,40 @@ handle_whoami_response(
         return -1;
     }
 
-    CONNECTION_LOCK_DECREF(upstream);
+    ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
+    upstream = op->o_upstream;
+    ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
+    if ( !upstream ) {
+        return LDAP_SUCCESS;
+    }
+
+    op->o_res = LLOAD_OP_COMPLETED;
+    /* Clear upstream status */
+    operation_unlink_upstream( op, upstream );
+
     if ( result == LDAP_PROTOCOL_ERROR ) {
         LloadBackend *b;
 
+        CONNECTION_LOCK(upstream);
         b = (LloadBackend *)upstream->c_private;
         Debug( LDAP_DEBUG_ANY, "handle_whoami_response: "
                 "Who Am I? extended operation not supported on backend %s, "
                 "proxyauthz with clients that do SASL binds will not work "
                 "msg=%s!\n",
                 b->b_uri.bv_val, diagmsg.bv_val );
-        CONNECTION_UNLOCK_INCREF(upstream);
+        CONNECTION_UNLOCK(upstream);
         operation_send_reject( op, LDAP_OTHER, "upstream protocol error", 0 );
         return -1;
     }
 
-    if ( upstream->c_state != LLOAD_C_CLOSING ) {
-        assert( upstream->c_state == LLOAD_C_BINDING );
-        upstream->c_state = LLOAD_C_READY;
-    }
-    if ( !BER_BVISNULL( &upstream->c_sasl_bind_mech ) ) {
-        ber_memfree( upstream->c_sasl_bind_mech.bv_val );
-        BER_BVZERO( &upstream->c_sasl_bind_mech );
-    }
-
-    CONNECTION_UNLOCK_INCREF(upstream);
-
     tag = ber_peek_tag( ber, &len );
 
-    CONNECTION_LOCK_DECREF(client);
+    CONNECTION_LOCK(client);
+
+    assert( client->c_state == LLOAD_C_BINDING ||
+            client->c_state == LLOAD_C_CLOSING );
 
-    assert( client->c_state == LLOAD_C_BINDING &&
-            BER_BVISNULL( &client->c_auth ) );
+    assert( BER_BVISNULL( &client->c_auth ) );
     if ( !BER_BVISNULL( &client->c_auth ) ) {
         ber_memfree( client->c_auth.bv_val );
         BER_BVZERO( &client->c_auth );
@@ -788,8 +775,6 @@ handle_whoami_response(
     if ( tag == LDAP_TAG_EXOP_RES_VALUE ) {
         tag = ber_scanf( ber, "o", &client->c_auth );
         if ( tag == LBER_ERROR ) {
-            operation_send_reject_locked(
-                    op, LDAP_OTHER, "upstream protocol error", 0 );
             CONNECTION_DESTROY(client);
             return -1;
         }
@@ -797,13 +782,13 @@ handle_whoami_response(
 
     removed = tavl_delete( &client->c_ops, op, operation_client_cmp );
     assert( !removed || op == removed );
+    op->o_pin_id = 0;
 
     Debug( LDAP_DEBUG_TRACE, "handle_whoami_response: "
             "connid=%ld new authid=%s\n",
             client->c_connid, client->c_auth.bv_val );
 
     if ( client->c_state == LLOAD_C_BINDING ) {
-        op->o_client = NULL;
         client->c_state = LLOAD_C_READY;
         client->c_type = LLOAD_C_OPEN;
         client->c_pin_id = 0;
@@ -817,10 +802,11 @@ handle_whoami_response(
         }
     }
 
-    CONNECTION_UNLOCK_INCREF(client);
+    CONNECTION_UNLOCK(client);
 
-    /* defer the disposal of ber to operation_destroy_* */
+    /* defer the disposal of ber to operation_destroy */
     op->o_ber = ber;
+
     return forward_final_response( client, op, saved_response );
 }
 
@@ -847,15 +833,21 @@ handle_vc_bind_response(
 
     tag = ber_peek_tag( ber, &len );
     if ( result == LDAP_PROTOCOL_ERROR ) {
-        LloadConnection *upstream = op->o_upstream;
-        LloadBackend *b;
+        LloadConnection *upstream;
 
-        CONNECTION_LOCK(upstream);
-        b = (LloadBackend *)upstream->c_private;
-        Debug( LDAP_DEBUG_ANY, "handle_vc_bind_response: "
-                "VC extended operation not supported on backend %s\n",
-                b->b_uri.bv_val );
-        CONNECTION_UNLOCK(upstream);
+        ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
+        upstream = op->o_upstream;
+        ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
+        if ( upstream ) {
+            LloadBackend *b;
+
+            CONNECTION_LOCK(upstream);
+            b = (LloadBackend *)upstream->c_private;
+            Debug( LDAP_DEBUG_ANY, "handle_vc_bind_response: "
+                    "VC extended operation not supported on backend %s\n",
+                    b->b_uri.bv_val );
+            CONNECTION_UNLOCK(upstream);
+        }
     }
 
     Debug( LDAP_DEBUG_STATS, "handle_vc_bind_response: "
@@ -872,7 +864,7 @@ handle_vc_bind_response(
         tag = ber_scanf( ber, "o", &client->c_vc_cookie );
         if ( tag == LBER_ERROR ) {
             rc = -1;
-            CONNECTION_UNLOCK_INCREF(client);
+            CONNECTION_UNLOCK(client);
             goto done;
         }
         tag = ber_peek_tag( ber, &len );
@@ -882,7 +874,7 @@ handle_vc_bind_response(
         tag = ber_scanf( ber, "m", &creds );
         if ( tag == LBER_ERROR ) {
             rc = -1;
-            CONNECTION_UNLOCK_INCREF(client);
+            CONNECTION_UNLOCK(client);
             goto done;
         }
         tag = ber_peek_tag( ber, &len );
@@ -892,7 +884,7 @@ handle_vc_bind_response(
         tag = ber_scanf( ber, "m", &controls );
         if ( tag == LBER_ERROR ) {
             rc = -1;
-            CONNECTION_UNLOCK_INCREF(client);
+            CONNECTION_UNLOCK(client);
             goto done;
         }
     }
@@ -928,7 +920,7 @@ handle_vc_bind_response(
         assert( client->c_state == LLOAD_C_INVALID ||
                 client->c_state == LLOAD_C_CLOSING );
     }
-    CONNECTION_UNLOCK_INCREF(client);
+    CONNECTION_UNLOCK(client);
 
     ldap_pvt_thread_mutex_lock( &client->c_io_mutex );
     output = client->c_pendingber;
@@ -952,9 +944,7 @@ handle_vc_bind_response(
     }
 
 done:
-    CONNECTION_LOCK_DECREF(client);
-    operation_destroy_from_client( op );
-    CONNECTION_UNLOCK_OR_DESTROY(client);
+    operation_unlink( op );
     ber_free( ber, 1 );
     return rc;
 }
index bc1248488fc9c79d7290cd61a20a6ed86552d444..f03810a3b0017cf6ddaeced91ef09a2cc39cb9b6 100644 (file)
@@ -28,6 +28,8 @@ lload_c_head clients = LDAP_CIRCLEQ_HEAD_INITIALIZER( clients );
 
 ldap_pvt_thread_mutex_t clients_mutex;
 
+static void client_unlink( LloadConnection *upstream );
+
 int
 request_abandon( LloadConnection *c, LloadOperation *op )
 {
@@ -41,17 +43,19 @@ request_abandon( LloadConnection *c, LloadOperation *op )
                 "connid=%lu msgid=%d invalid integer sent in abandon request\n",
                 c->c_connid, op->o_client_msgid );
 
-        operation_destroy_from_client( op );
-        CONNECTION_DESTROY(c);
+        operation_unlink( op );
+        CONNECTION_LOCK_DESTROY(c);
         return -1;
     }
 
+    CONNECTION_LOCK(c);
     request = tavl_find( c->c_ops, &needle, operation_client_cmp );
     if ( !request ) {
         Debug( LDAP_DEBUG_STATS, "request_abandon: "
                 "connid=%lu msgid=%d requests abandon of an operation "
                 "msgid=%d not being processed anymore\n",
                 c->c_connid, op->o_client_msgid, needle.o_client_msgid );
+        CONNECTION_UNLOCK(c);
         goto done;
     } else if ( request->o_tag == LDAP_REQ_BIND ) {
         /* RFC 4511 states we must not allow Abandon on Binds */
@@ -59,6 +63,7 @@ request_abandon( LloadConnection *c, LloadOperation *op )
                 "connid=%lu msgid=%d requests abandon of a bind operation "
                 "msgid=%d\n",
                 c->c_connid, op->o_client_msgid, needle.o_client_msgid );
+        CONNECTION_UNLOCK(c);
         goto done;
     }
     Debug( LDAP_DEBUG_STATS, "request_abandon: "
@@ -67,20 +72,14 @@ request_abandon( LloadConnection *c, LloadOperation *op )
             lload_msgtype2str( request->o_tag ), needle.o_client_msgid );
 
     if ( c->c_state == LLOAD_C_BINDING ) {
-        /* We have found the request and we are binding, it must be a bind
-         * request */
-        assert( request->o_tag == LDAP_REQ_BIND );
-        c->c_state = LLOAD_C_READY;
+        assert(0);
     }
 
-    /* operation_abandon requires a reference since it is passed with c unlocked */
-    request->o_client_refcnt++;
-    CONNECTION_UNLOCK_INCREF(c);
+    CONNECTION_UNLOCK(c);
     operation_abandon( request );
-    CONNECTION_LOCK_DECREF(c);
 
 done:
-    operation_destroy_from_client( op );
+    operation_unlink( op );
     return rc;
 }
 
@@ -92,9 +91,6 @@ request_process( LloadConnection *client, LloadOperation *op )
     ber_int_t msgid;
     int res, rc = LDAP_SUCCESS;
 
-    op->o_client_refcnt++;
-    CONNECTION_UNLOCK_INCREF(client);
-
     upstream = backend_select( op, &res );
     if ( !upstream ) {
         Debug( LDAP_DEBUG_STATS, "request_process: "
@@ -110,16 +106,29 @@ request_process( LloadConnection *client, LloadOperation *op )
 
     output = upstream->c_pendingber;
     if ( output == NULL && (output = ber_alloc()) == NULL ) {
+        LloadBackend *b = upstream->c_private;
+
+        upstream->c_n_ops_executing--;
+        CONNECTION_UNLOCK(upstream);
+        ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
+
+        ldap_pvt_thread_mutex_lock( &b->b_mutex );
+        b->b_n_ops_executing--;
+        operation_update_backend_counters( op, b );
+        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+
+        Debug( LDAP_DEBUG_ANY, "request_process: "
+                "ber_alloc failed\n" );
+
         rc = -1;
         goto fail;
     }
     upstream->c_pendingber = output;
 
-    CONNECTION_LOCK_DECREF(upstream);
     op->o_upstream_msgid = msgid = upstream->c_next_msgid++;
     rc = tavl_insert(
             &upstream->c_ops, op, operation_upstream_cmp, avl_dup_error );
-    CONNECTION_UNLOCK_INCREF(upstream);
+    CONNECTION_UNLOCK(upstream);
 
     Debug( LDAP_DEBUG_TRACE, "request_process: "
             "client connid=%lu added %s msgid=%d to upstream connid=%lu as "
@@ -132,7 +141,7 @@ request_process( LloadConnection *client, LloadOperation *op )
 
     if ( (lload_features & LLOAD_FEATURE_PROXYAUTHZ) &&
             client->c_type != LLOAD_C_PRIVILEGED ) {
-        CONNECTION_LOCK_DECREF(client);
+        CONNECTION_LOCK(client);
         Debug( LDAP_DEBUG_TRACE, "request_process: "
                 "proxying identity %s to upstream\n",
                 client->c_auth.bv_val );
@@ -141,7 +150,7 @@ request_process( LloadConnection *client, LloadOperation *op )
                 op->o_tag, &op->o_request,
                 LDAP_TAG_CONTROLS,
                 LDAP_CONTROL_PROXY_AUTHZ, 1, &client->c_auth );
-        CONNECTION_UNLOCK_INCREF(client);
+        CONNECTION_UNLOCK(client);
 
         if ( !BER_BVISNULL( &op->o_ctrls ) ) {
             ber_write( output, op->o_ctrls.bv_val, op->o_ctrls.bv_len, 0 );
@@ -157,37 +166,18 @@ request_process( LloadConnection *client, LloadOperation *op )
     ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
 
     connection_write_cb( -1, 0, upstream );
-
-    CONNECTION_LOCK_DECREF(upstream);
-    CONNECTION_UNLOCK_OR_DESTROY(upstream);
-
-    CONNECTION_LOCK_DECREF(client);
-    if ( !--op->o_client_refcnt ) {
-        operation_destroy_from_client( op );
-    }
     return rc;
 
 fail:
     if ( upstream ) {
-        LloadBackend *b;
-
-        ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
-        CONNECTION_LOCK_DECREF(upstream);
-        upstream->c_n_ops_executing--;
-        b = (LloadBackend *)upstream->c_private;
-        CONNECTION_UNLOCK_OR_DESTROY(upstream);
-
-        ldap_pvt_thread_mutex_lock( &b->b_mutex );
-        b->b_n_ops_executing--;
-        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+        CONNECTION_LOCK_DESTROY(upstream);
 
         operation_send_reject( op, LDAP_OTHER, "internal error", 0 );
     }
-    CONNECTION_LOCK_DECREF(client);
-    op->o_client_refcnt--;
-    operation_destroy_from_client( op );
+
+    operation_unlink( op );
     if ( rc ) {
-        CONNECTION_DESTROY(client);
+        CONNECTION_LOCK_DESTROY(client);
     }
     return rc;
 }
@@ -202,6 +192,7 @@ handle_one_request( LloadConnection *c )
     ber = c->c_currentber;
     c->c_currentber = NULL;
 
+    CONNECTION_LOCK(c);
     op = operation_init( c, ber );
     if ( !op ) {
         Debug( LDAP_DEBUG_ANY, "handle_one_request: "
@@ -211,16 +202,18 @@ handle_one_request( LloadConnection *c )
         ber_free( ber, 1 );
         return -1;
     }
+    CONNECTION_UNLOCK(c);
 
     switch ( op->o_tag ) {
         case LDAP_REQ_UNBIND:
             /* There is never a response for this operation */
             op->o_res = LLOAD_OP_COMPLETED;
-            operation_destroy_from_client( op );
+            operation_unlink( op );
+
             Debug( LDAP_DEBUG_STATS, "handle_one_request: "
                     "received unbind, closing client connid=%lu\n",
                     c->c_connid );
-            CONNECTION_DESTROY(c);
+            CONNECTION_LOCK_DESTROY(c);
             return -1;
         case LDAP_REQ_BIND:
             handler = request_bind;
@@ -234,16 +227,18 @@ handle_one_request( LloadConnection *c )
             break;
         default:
             if ( c->c_state == LLOAD_C_BINDING ) {
-                return operation_send_reject_locked(
+                operation_send_reject(
                         op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 );
+                return LDAP_SUCCESS;
             }
             handler = request_process;
             break;
     }
 
     if ( c->c_state == LLOAD_C_CLOSING ) {
-        return operation_send_reject_locked(
+        operation_send_reject(
                 op, LDAP_UNAVAILABLE, "connection is shutting down", 0 );
+        return LDAP_SUCCESS;
     }
 
     return handler( c, op );
@@ -256,9 +251,9 @@ void
 client_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
 {
     LloadConnection *c = arg;
+    epoch_t epoch;
     int rc = 0;
 
-    CONNECTION_LOCK_DECREF(c);
     if ( what & EV_TIMEOUT ) {
         Debug( LDAP_DEBUG_CONNS, "client_tls_handshake_cb: "
                 "connid=%lu, timeout reached, destroying\n",
@@ -274,27 +269,26 @@ client_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
     ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
     if ( c->c_pendingber ) {
         ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
-        CONNECTION_UNLOCK_INCREF(c);
         connection_write_cb( s, what, arg );
-        ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
-        CONNECTION_LOCK_DECREF(c);
 
+        CONNECTION_LOCK(c);
         if ( !c->c_live ) {
-            ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+            CONNECTION_UNLOCK(c);
             goto fail;
         }
+        CONNECTION_UNLOCK(c);
 
         /* Do we still have data pending? If so, connection_write_cb would
          * already have arranged the write callback to trigger again */
+        ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
         if ( c->c_pendingber ) {
             ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
-            CONNECTION_UNLOCK_INCREF(c);
             return;
         }
     }
-    ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
 
     rc = ldap_pvt_tls_accept( c->c_sb, LLOAD_TLS_CTX );
+    ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
     if ( rc < 0 ) {
         goto fail;
     }
@@ -308,13 +302,16 @@ client_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
          * This is deadlock-safe, since both share the same base - the one
          * that's just running us.
          */
+        CONNECTION_LOCK(c);
         event_del( c->c_read_event );
         event_del( c->c_write_event );
 
         c->c_read_timeout = NULL;
         event_assign( c->c_read_event, base, c->c_fd, EV_READ|EV_PERSIST,
                 connection_read_cb, c );
-        event_add( c->c_read_event, c->c_read_timeout );
+        if ( c->c_live ) {
+            event_add( c->c_read_event, c->c_read_timeout );
+        }
 
         event_assign( c->c_write_event, base, c->c_fd, EV_WRITE,
                 connection_write_cb, c );
@@ -323,24 +320,29 @@ client_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
                 c->c_connid );
 
         c->c_is_tls = LLOAD_TLS_ESTABLISHED;
-
-        /* The temporary reference established for us is no longer needed */
-        CONNECTION_UNLOCK_OR_DESTROY(c);
+        CONNECTION_UNLOCK(c);
         return;
     } else if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL ) ) {
-        event_add( c->c_write_event, lload_write_timeout );
+        CONNECTION_LOCK(c);
+        if ( c->c_live ) {
+            event_add( c->c_write_event, lload_write_timeout );
+        }
+        CONNECTION_UNLOCK(c);
         Debug( LDAP_DEBUG_CONNS, "client_tls_handshake_cb: "
                 "connid=%lu need write rc=%d\n",
                 c->c_connid, rc );
     }
-    CONNECTION_UNLOCK_INCREF(c);
     return;
 
 fail:
     Debug( LDAP_DEBUG_CONNS, "client_tls_handshake_cb: "
             "connid=%lu failed rc=%d\n",
             c->c_connid, rc );
-    CONNECTION_DESTROY(c);
+
+    assert( c->c_ops == NULL );
+    epoch = epoch_join();
+    CONNECTION_LOCK_DESTROY(c);
+    epoch_leave( epoch );
 }
 
 LloadConnection *
@@ -379,11 +381,11 @@ client_init(
             Debug( LDAP_DEBUG_CONNS, "client_init: "
                     "connid=%lu failed initial TLS accept rc=%d\n",
                     c->c_connid, rc );
+            CONNECTION_LOCK(c);
             goto fail;
         }
 
         if ( rc ) {
-            c->c_refcnt++;
             c->c_read_timeout = lload_timeout_net;
             read_cb = write_cb = client_tls_handshake_cb;
         }
@@ -393,30 +395,32 @@ client_init(
     if ( !event ) {
         Debug( LDAP_DEBUG_ANY, "client_init: "
                 "Read event could not be allocated\n" );
+        CONNECTION_LOCK(c);
         goto fail;
     }
     c->c_read_event = event;
-    event_add( c->c_read_event, c->c_read_timeout );
 
     event = event_new( base, s, EV_WRITE, write_cb, c );
     if ( !event ) {
         Debug( LDAP_DEBUG_ANY, "client_init: "
                 "Write event could not be allocated\n" );
+        CONNECTION_LOCK(c);
         goto fail;
     }
-    /* We only register the write event when we have data pending */
     c->c_write_event = event;
 
     c->c_private = listener;
     c->c_destroy = client_destroy;
+    c->c_unlink = client_unlink;
     c->c_pdu_cb = handle_one_request;
 
-    /* There should be no lock inversion yet since no other thread could
-     * approach it from clients side */
+    CONNECTION_LOCK(c);
+    /* We only register the write event when we have data pending */
+    event_add( c->c_read_event, c->c_read_timeout );
+
     ldap_pvt_thread_mutex_lock( &clients_mutex );
     LDAP_CIRCLEQ_INSERT_TAIL( &clients, c, c_next );
     ldap_pvt_thread_mutex_unlock( &clients_mutex );
-
     CONNECTION_UNLOCK(c);
 
     return c;
@@ -431,8 +435,9 @@ fail:
     }
 
     c->c_state = LLOAD_C_INVALID;
-    CONNECTION_DESTROY(c);
-    assert( c == NULL );
+    c->c_live--;
+    c->c_refcnt--;
+    connection_destroy( c );
     return NULL;
 }
 
@@ -444,19 +449,6 @@ client_reset( LloadConnection *c )
     root = c->c_ops;
     c->c_ops = NULL;
 
-    /* unless op->o_client_refcnt > op->o_client_live, there is noone using the
-     * operation from the client side and noone new will now that we've removed
-     * it from client's c_ops */
-    if ( root ) {
-        TAvlnode *node = tavl_end( root, TAVL_DIR_LEFT );
-        do {
-            LloadOperation *op = node->avl_data;
-
-            /* make sure it's useable after we've unlocked the connection */
-            op->o_client_refcnt++;
-        } while ( (node = tavl_next( node, TAVL_DIR_RIGHT )) );
-    }
-
     if ( !BER_BVISNULL( &c->c_auth ) ) {
         ch_free( c->c_auth.bv_val );
         BER_BVZERO( &c->c_auth );
@@ -465,7 +457,7 @@ client_reset( LloadConnection *c )
         ch_free( c->c_sasl_bind_mech.bv_val );
         BER_BVZERO( &c->c_sasl_bind_mech );
     }
-    CONNECTION_UNLOCK_INCREF(c);
+    CONNECTION_UNLOCK(c);
 
     if ( root ) {
         int freed;
@@ -475,38 +467,29 @@ client_reset( LloadConnection *c )
                 freed );
     }
 
-    CONNECTION_LOCK_DECREF(c);
+    CONNECTION_LOCK(c);
 }
 
 void
-client_destroy( LloadConnection *c )
+client_unlink( LloadConnection *c )
 {
     enum sc_state state;
     struct event *read_event, *write_event;
 
-    Debug( LDAP_DEBUG_CONNS, "client_destroy: "
-            "destroying client connid=%lu\n",
+    Debug( LDAP_DEBUG_CONNS, "client_unlink: "
+            "removing client connid=%lu\n",
             c->c_connid );
 
     assert( c->c_state != LLOAD_C_INVALID );
+    assert( c->c_state != LLOAD_C_DYING );
+
     state = c->c_state;
-    c->c_state = LLOAD_C_INVALID;
+    c->c_state = LLOAD_C_DYING;
 
     read_event = c->c_read_event;
     write_event = c->c_write_event;
+    CONNECTION_UNLOCK(c);
 
-    /*
-     * FIXME: operation_destroy_from_upstream might copy op->o_client and bump
-     * c_refcnt, it is then responsible to call destroy_client again, does that
-     * mean that we can be triggered for recursion over all connections?
-     */
-    CONNECTION_UNLOCK_INCREF(c);
-
-    /*
-     * Avoid a deadlock:
-     * event_del will block if the event is currently executing its callback,
-     * that callback might be waiting to lock c->c_mutex
-     */
     if ( read_event ) {
         event_del( read_event );
     }
@@ -521,7 +504,20 @@ client_destroy( LloadConnection *c )
         ldap_pvt_thread_mutex_unlock( &clients_mutex );
     }
 
-    CONNECTION_LOCK_DECREF(c);
+    CONNECTION_LOCK(c);
+    client_reset( c );
+}
+
+void
+client_destroy( LloadConnection *c )
+{
+    Debug( LDAP_DEBUG_CONNS, "client_destroy: "
+            "destroying client connid=%lu\n",
+            c->c_connid );
+
+    CONNECTION_LOCK(c);
+    assert( c->c_state == LLOAD_C_DYING );
+    c->c_state = LLOAD_C_INVALID;
 
     if ( c->c_read_event ) {
         event_free( c->c_read_event );
@@ -533,23 +529,7 @@ client_destroy( LloadConnection *c )
         c->c_write_event = NULL;
     }
 
-    client_reset( c );
-
-    /*
-     * If we attempted to destroy any operations, we might have lent a new
-     * refcnt token for a thread that raced us to that, let them call us again
-     * later
-     */
-    assert( c->c_refcnt >= 0 );
-    if ( c->c_refcnt ) {
-        c->c_state = LLOAD_C_DYING;
-        Debug( LDAP_DEBUG_CONNS, "client_destroy: "
-                "connid=%lu aborting with refcnt=%d\n",
-                c->c_connid, c->c_refcnt );
-        CONNECTION_UNLOCK(c);
-        return;
-    }
-
+    assert( c->c_refcnt == 0 );
     connection_destroy( c );
 }
 
index e8e805c46769486df58da0ef136d4f3db275dec3..e1b4c39019c8db64c44d62054df781dd1806c9b5 100644 (file)
 #include "lutil.h"
 #include "lutil_ldap.h"
 
-static ldap_pvt_thread_mutex_t conn_nextid_mutex;
 static unsigned long conn_nextid = 0;
 
 static void
 lload_connection_assign_nextid( LloadConnection *conn )
 {
-    ldap_pvt_thread_mutex_lock( &conn_nextid_mutex );
-    conn->c_connid = conn_nextid++;
-    ldap_pvt_thread_mutex_unlock( &conn_nextid_mutex );
+    conn->c_connid = __atomic_fetch_add( &conn_nextid, 1, __ATOMIC_RELAXED );
 }
 
 /*
@@ -73,37 +70,40 @@ handle_pdus( void *ctx, void *arg )
 {
     LloadConnection *c = arg;
     int pdus_handled = 0;
+    epoch_t epoch;
 
-    CONNECTION_LOCK_DECREF(c);
+    /* A reference was passed on to us */
+    assert( IS_ALIVE( c, c_refcnt ) );
+
+    epoch = epoch_join();
     for ( ;; ) {
         BerElement *ber;
         ber_tag_t tag;
         ber_len_t len;
 
-        /* handle_one_response may unlock the connection in the process, we
-         * need to expect that might be our responsibility to destroy it */
         if ( c->c_pdu_cb( c ) ) {
-            /* Error, connection is unlocked and might already have been
-             * destroyed */
-            return NULL;
+            /* Error/reset, get rid ouf our reference and bail */
+            goto done;
         }
-        /* Otherwise, handle_one_request leaves the connection locked */
 
         if ( ++pdus_handled >= lload_conn_max_pdus_per_cycle ) {
             /* Do not read now, re-enable read event instead */
             break;
         }
 
-        if ( (ber = ber_alloc()) == NULL ) {
+        ber = c->c_currentber;
+        if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
             Debug( LDAP_DEBUG_ANY, "handle_pdus: "
                     "connid=%lu, ber_alloc failed\n",
                     c->c_connid );
-            CONNECTION_DESTROY(c);
-            return NULL;
+            CONNECTION_LOCK_DESTROY(c);
+            goto done;
         }
         c->c_currentber = ber;
 
+        ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
         tag = ber_get_next( c->c_sb, &len, ber );
+        ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
         if ( tag != LDAP_TAG_MESSAGE ) {
             int err = sock_errno();
 
@@ -123,8 +123,8 @@ handle_pdus( void *ctx, void *arg )
 
                 c->c_currentber = NULL;
                 ber_free( ber, 1 );
-                CONNECTION_DESTROY(c);
-                return NULL;
+                CONNECTION_LOCK_DESTROY(c);
+                goto done;
             }
             break;
         }
@@ -134,7 +134,9 @@ handle_pdus( void *ctx, void *arg )
     Debug( LDAP_DEBUG_CONNS, "handle_pdus: "
             "re-enabled read event on connid=%lu\n",
             c->c_connid );
-    CONNECTION_UNLOCK_OR_DESTROY(c);
+done:
+    RELEASE_REF( c, c_refcnt, c->c_destroy );
+    epoch_leave( epoch );
     return NULL;
 }
 
@@ -152,25 +154,35 @@ connection_read_cb( evutil_socket_t s, short what, void *arg )
     BerElement *ber;
     ber_tag_t tag;
     ber_len_t len;
+    epoch_t epoch;
 
     CONNECTION_LOCK(c);
     if ( !c->c_live ) {
         event_del( c->c_read_event );
+        CONNECTION_UNLOCK(c);
         Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
                 "suspended read event on a dead connid=%lu\n",
                 c->c_connid );
-        CONNECTION_UNLOCK(c);
         return;
     }
+    CONNECTION_UNLOCK(c);
 
     if ( what & EV_TIMEOUT ) {
         Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
                 "connid=%lu, timeout reached, destroying\n",
                 c->c_connid );
-        CONNECTION_DESTROY(c);
+        /* Make sure the connection stays around for us to unlock it */
+        epoch = epoch_join();
+        CONNECTION_LOCK_DESTROY(c);
+        epoch_leave( epoch );
         return;
     }
 
+    if ( !acquire_ref( &c->c_refcnt ) ) {
+        return;
+    }
+    epoch = epoch_join();
+
     Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
             "connection connid=%lu ready to read\n",
             c->c_connid );
@@ -180,12 +192,14 @@ connection_read_cb( evutil_socket_t s, short what, void *arg )
         Debug( LDAP_DEBUG_ANY, "connection_read_cb: "
                 "connid=%lu, ber_alloc failed\n",
                 c->c_connid );
-        CONNECTION_DESTROY(c);
-        return;
+        goto out;
     }
     c->c_currentber = ber;
 
+    ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
     tag = ber_get_next( c->c_sb, &len, ber );
+    ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+
     if ( tag != LDAP_TAG_MESSAGE ) {
         int err = sock_errno();
 
@@ -210,65 +224,78 @@ connection_read_cb( evutil_socket_t s, short what, void *arg )
             Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
                     "suspended read event on dying connid=%lu\n",
                     c->c_connid );
-            CONNECTION_DESTROY(c);
-            return;
+            CONNECTION_LOCK_DESTROY(c);
+            goto out;
         }
         event_add( c->c_read_event, c->c_read_timeout );
         Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
                 "re-enabled read event on connid=%lu\n",
                 c->c_connid );
-        CONNECTION_UNLOCK(c);
-        return;
+        goto out;
     }
 
+    event_del( c->c_read_event );
     if ( !lload_conn_max_pdus_per_cycle ||
             ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) {
         /* If we're overloaded or configured as such, process one and resume in
-         * the next cycle.
-         *
-         * handle_one_request re-locks the mutex in the
-         * process, need to test it's still alive */
-        if ( c->c_pdu_cb( c ) == LDAP_SUCCESS ) {
-            CONNECTION_UNLOCK_OR_DESTROY(c);
-        }
-        return;
+         * the next cycle. */
+        event_add( c->c_read_event, c->c_read_timeout );
+        c->c_pdu_cb( c );
+        goto out;
     }
 
-    event_del( c->c_read_event );
     Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
             "suspended read event on connid=%lu\n",
             c->c_connid );
 
-    /* We have scheduled a call to handle_requests which takes care of
-     * handling further requests, just make sure the connection sticks around
-     * for that */
-    CONNECTION_UNLOCK_INCREF(c);
+    /*
+     * We have scheduled a call to handle_pdus to take care of handling this
+     * and further requests, its reference is now owned by that task.
+     */
+    epoch_leave( epoch );
     return;
+
+out:
+    RELEASE_REF( c, c_refcnt, c->c_destroy );
+    epoch_leave( epoch );
 }
 
 void
 connection_write_cb( evutil_socket_t s, short what, void *arg )
 {
     LloadConnection *c = arg;
+    epoch_t epoch;
 
     CONNECTION_LOCK(c);
+    Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
+            "considering writing to%s connid=%lu what=%hd\n",
+            c->c_live ? " live" : " dead", c->c_connid, what );
     if ( !c->c_live ) {
         CONNECTION_UNLOCK(c);
         return;
     }
+    CONNECTION_UNLOCK(c);
 
     if ( what & EV_TIMEOUT ) {
         Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
                 "connid=%lu, timeout reached, destroying\n",
                 c->c_connid );
-        CONNECTION_DESTROY(c);
+        /* Make sure the connection stays around for us to unlock it */
+        epoch = epoch_join();
+        CONNECTION_LOCK_DESTROY(c);
+        epoch_leave( epoch );
         return;
     }
-    CONNECTION_UNLOCK_INCREF(c);
 
     /* Before we acquire any locks */
     event_del( c->c_write_event );
 
+    if ( !acquire_ref( &c->c_refcnt ) ) {
+        return;
+    }
+
+    epoch = epoch_join();
+
     ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
     Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
             "have something to write to connection connid=%lu\n",
@@ -285,7 +312,7 @@ connection_write_cb( evutil_socket_t s, short what, void *arg )
                     "ber_flush on fd=%d failed errno=%d (%s)\n",
                     c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
             CONNECTION_LOCK_DESTROY(c);
-            return;
+            goto done;
         }
         event_add( c->c_write_event, lload_write_timeout );
     } else {
@@ -293,8 +320,9 @@ connection_write_cb( evutil_socket_t s, short what, void *arg )
     }
     ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
 
-    CONNECTION_LOCK_DECREF(c);
-    CONNECTION_UNLOCK_OR_DESTROY(c);
+done:
+    RELEASE_REF( c, c_refcnt, c->c_destroy );
+    epoch_leave( epoch );
 }
 
 void
@@ -356,85 +384,54 @@ connections_walk_last(
         CONNCB cb,
         void *arg )
 {
-    LloadConnection *c, *old;
-    unsigned long last_connid;
+    LloadConnection *c = cq_last;
+    uintptr_t last_connid;
 
     if ( LDAP_CIRCLEQ_EMPTY( cq ) ) {
         return;
     }
-    last_connid = cq_last->c_connid;
-    c = LDAP_CIRCLEQ_LOOP_NEXT( cq, cq_last, c_next );
-    assert( c->c_connid <= last_connid );
+    last_connid = c->c_connid;
+    c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
 
-    CONNECTION_LOCK(c);
-    ldap_pvt_thread_mutex_unlock( cq_mutex );
+    while ( !acquire_ref( &c->c_refcnt ) ) {
+        c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
+        if ( c->c_connid >= last_connid ) {
+            return;
+        }
+    }
 
     /*
-     * Ugh... concurrency is annoying:
+     * Notes:
      * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid
      *   order
-     * - the connection with the highest c_connid is maintained at cq_last
+     * - the connection with the highest c_connid is passed in cq_last
      * - we can only use cq when we hold cq_mutex
      * - connections might be added to or removed from cq while we're busy
      *   processing connections
-     * - connection_destroy touches cq
-     * - we can't even hold locks of two different connections
      * - we need a way to detect we've finished looping around cq for some
      *   definition of looping around
-     *
-     * So as a result, 90% of the code below is spent navigating that...
      */
-    while ( c->c_connid <= last_connid ) {
-        /* Do not permit the callback to actually free the connection even if
-         * it wants to, we need it to traverse cq */
-        c->c_refcnt++;
-        if ( cb( c, arg ) ) {
-            c->c_refcnt--;
-            break;
-        }
-        c->c_refcnt--;
+    do {
+        int rc;
 
-        if ( c->c_connid == last_connid ) {
-            break;
-        }
+        ldap_pvt_thread_mutex_unlock( cq_mutex );
 
-        CONNECTION_UNLOCK_INCREF(c);
+        rc = cb( c, arg );
+        RELEASE_REF( c, c_refcnt, c->c_destroy );
 
         ldap_pvt_thread_mutex_lock( cq_mutex );
-        old = c;
-retry:
-        c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
-
-        if ( c->c_connid <= old->c_connid ) {
-            ldap_pvt_thread_mutex_unlock( cq_mutex );
-
-            CONNECTION_LOCK_DECREF(old);
-            CONNECTION_UNLOCK_OR_DESTROY(old);
-
-            ldap_pvt_thread_mutex_lock( cq_mutex );
-            return;
+        if ( rc || LDAP_CIRCLEQ_EMPTY( cq ) ) {
+            break;
         }
 
-        CONNECTION_LOCK(c);
-        assert( c->c_state != LLOAD_C_DYING );
-        if ( c->c_state == LLOAD_C_INVALID ) {
-            /* This dying connection will be unlinked once we release cq_mutex
-             * and it wouldn't be safe to iterate further, skip over it */
-            CONNECTION_UNLOCK(c);
-            goto retry;
-        }
-        CONNECTION_UNLOCK_INCREF(c);
-        ldap_pvt_thread_mutex_unlock( cq_mutex );
-
-        CONNECTION_LOCK_DECREF(old);
-        CONNECTION_UNLOCK_OR_DESTROY(old);
-
-        CONNECTION_LOCK_DECREF(c);
-        assert( c->c_state != LLOAD_C_DYING );
-        assert( c->c_state != LLOAD_C_INVALID );
-    }
-    CONNECTION_UNLOCK_OR_DESTROY(c);
-    ldap_pvt_thread_mutex_lock( cq_mutex );
+        do {
+            LloadConnection *old = c;
+            c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
+            if ( c->c_connid <= old->c_connid || c->c_connid > last_connid ) {
+                return;
+            }
+        } while ( !acquire_ref( &c->c_refcnt ) );
+    } while ( c->c_connid <= last_connid );
 }
 
 void
@@ -448,44 +445,44 @@ connections_walk(
     return connections_walk_last( cq_mutex, cq, cq_last, cb, arg );
 }
 
-/*
- * Caller is expected to hold the lock.
- */
 int
 lload_connection_close( LloadConnection *c, void *arg )
 {
-    TAvlnode *node;
     int gentle = *(int *)arg;
+    LloadOperation *op;
 
-    if ( !c->c_live ) {
-        return LDAP_SUCCESS;
-    }
+    Debug( LDAP_DEBUG_CONNS, "lload_connection_close: "
+            "marking connection connid=%lu closing\n",
+            c->c_connid );
+
+    /* We were approached from the connection list */
+    assert( IS_ALIVE( c, c_refcnt ) );
 
-    if ( !gentle ) {
-        /* Caller has a reference on this connection,
-         * it doesn't actually die here */
+    CONNECTION_LOCK(c);
+    if ( !gentle || !c->c_ops ) {
         CONNECTION_DESTROY(c);
-        assert( c );
-        CONNECTION_LOCK(c);
         return LDAP_SUCCESS;
     }
 
     /* The first thing we do is make sure we don't get new Operations in */
     c->c_state = LLOAD_C_CLOSING;
 
-    for ( node = tavl_end( c->c_ops, TAVL_DIR_LEFT ); node;
-            node = tavl_next( node, TAVL_DIR_RIGHT ) ) {
-        LloadOperation *op = node->avl_data;
+    do {
+        TAvlnode *node = tavl_end( c->c_ops, TAVL_DIR_LEFT );
+        op = node->avl_data;
 
-        if ( op->o_client_msgid == 0 ) {
-            if ( op->o_client == c ) {
-                operation_destroy_from_client( op );
-            } else {
-                assert( op->o_upstream == c );
-                operation_destroy_from_upstream( op );
-            }
+        /* Close operations that would need client action to resolve,
+         * only SASL binds in progress do that right now */
+        if ( op->o_client_msgid || op->o_upstream_msgid ) {
+            break;
         }
-    }
+
+        CONNECTION_UNLOCK(c);
+        operation_unlink( op );
+        CONNECTION_LOCK(c);
+    } while ( c->c_ops );
+
+    CONNECTION_UNLOCK(c);
     return LDAP_SUCCESS;
 }
 
@@ -550,7 +547,6 @@ lload_connection_init( ber_socket_t s, const char *peername, int flags )
             "connection connid=%lu allocated for socket fd=%d peername=%s\n",
             c->c_connid, s, peername );
 
-    CONNECTION_LOCK(c);
     c->c_state = LLOAD_C_ACTIVE;
 
     return c;
index 80c61972ffa8dad369023226f45352f1b92178e8..21847f21e8fea6ea46fa09948a591ad2940882ed 100644 (file)
@@ -759,6 +759,7 @@ lloadd_listeners_init( const char *urls )
 int
 lloadd_daemon_destroy( void )
 {
+    epoch_shutdown();
     if ( lloadd_inited ) {
         int i;
 
@@ -1674,8 +1675,7 @@ lload_handle_global_invalidation( LloadChange *change )
                 LloadConnection *next =
                         LDAP_CIRCLEQ_LOOP_NEXT( &clients, c, c_next );
                 if ( c->c_is_tls ) {
-                    CONNECTION_LOCK(c);
-                    CONNECTION_DESTROY(c);
+                    CONNECTION_LOCK_DESTROY(c);
                     assert( c == NULL );
                 }
                 c = next;
diff --git a/servers/lloadd/epoch.c b/servers/lloadd/epoch.c
new file mode 100644 (file)
index 0000000..790a296
--- /dev/null
@@ -0,0 +1,228 @@
+/* epoch.c - epoch based memory reclamation */
+/* $OpenLDAP$ */
+/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
+ *
+ * Copyright 2018-2020 The OpenLDAP Foundation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted only as authorized by the OpenLDAP
+ * Public License.
+ *
+ * A copy of this license is available in the file LICENSE in the
+ * top-level directory of the distribution or, alternatively, at
+ * <http://www.OpenLDAP.org/license.html>.
+ */
+
+/** @file epoch.c
+ *
+ * Implementation of epoch based memory reclamation, in principle
+ * similar to the algorithm presented in
+ * https://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-579.pdf
+ *
+ * Not completely lock-free at the moment.
+ *
+ * Also the problems with epoch based memory reclamation are still
+ * present - a thread actively observing an epoch getting stuck will
+ * prevent managed objects (in our case connections and operations)
+ * from being freed, potentially running out of memory.
+ */
+
+#include "portable.h"
+
+#include "lload.h"
+#include <epoch.h>
+
+/* Has to be >= 3 */
+#define EPOCH_MASK ( 1 << 2 )
+#define EPOCH_PREV(epoch) ( ( (epoch) + EPOCH_MASK - 1 ) % EPOCH_MASK )
+#define EPOCH_NEXT(epoch) ( ( (epoch) + 1 ) % EPOCH_MASK )
+
+struct pending_ref {
+    void *object;
+    dispose_cb *dispose;
+    struct pending_ref *next;
+};
+
+ldap_pvt_thread_rdwr_t epoch_mutex;
+
+static epoch_t current_epoch;
+static uintptr_t epoch_threads[EPOCH_MASK];
+static struct pending_ref *references[EPOCH_MASK];
+
+void
+epoch_init( void )
+{
+    epoch_t epoch;
+
+    current_epoch = 0;
+    for ( epoch = 0; epoch < EPOCH_MASK; epoch++ ) {
+        assert( !epoch_threads[epoch] );
+        assert( !references[epoch] );
+    }
+
+    ldap_pvt_thread_rdwr_init( &epoch_mutex );
+}
+
+void
+epoch_shutdown( void )
+{
+    epoch_t epoch;
+    struct pending_ref *old, *next;
+
+    for ( epoch = 0; epoch < EPOCH_MASK; epoch++ ) {
+        assert( !epoch_threads[epoch] );
+    }
+
+    /* Free pending references */
+    epoch = EPOCH_PREV(current_epoch);
+    next = references[epoch];
+    references[epoch] = NULL;
+    for ( old = next; old; old = next ) {
+        next = old->next;
+
+        old->dispose( old->object );
+        ch_free( old );
+    }
+
+    epoch = current_epoch;
+    next = references[epoch];
+    references[epoch] = NULL;
+    for ( old = next; old; old = next ) {
+        next = old->next;
+
+        old->dispose( old->object );
+        ch_free( old );
+    }
+
+    /* No references should exist anywhere now */
+    for ( epoch = 0; epoch < EPOCH_MASK; epoch++ ) {
+        assert( !references[epoch] );
+    }
+
+    ldap_pvt_thread_rdwr_destroy( &epoch_mutex );
+}
+
+epoch_t
+epoch_join( void )
+{
+    epoch_t epoch;
+    struct pending_ref *old, *ref = NULL;
+
+    /* TODO: make this completely lock-free */
+    ldap_pvt_thread_rdwr_rlock( &epoch_mutex );
+    epoch = current_epoch;
+    __atomic_add_fetch( &epoch_threads[epoch], 1, __ATOMIC_ACQ_REL );
+    ldap_pvt_thread_rdwr_runlock( &epoch_mutex );
+
+    if ( __atomic_load_n(
+                 &epoch_threads[EPOCH_PREV(epoch)], __ATOMIC_ACQUIRE ) ) {
+        return epoch;
+    }
+
+    __atomic_exchange(
+            &references[EPOCH_PREV(epoch)], &ref, &ref, __ATOMIC_ACQ_REL );
+
+    Debug( LDAP_DEBUG_TRACE, "epoch_join: "
+            "advancing epoch to %zu with %s objects to free\n",
+            EPOCH_NEXT(epoch), ref ? "some" : "no" );
+
+    ldap_pvt_thread_rdwr_wlock( &epoch_mutex );
+    current_epoch = EPOCH_NEXT(epoch);
+    ldap_pvt_thread_rdwr_wunlock( &epoch_mutex );
+
+    for ( old = ref; old; old = ref ) {
+        ref = old->next;
+
+        old->dispose( old->object );
+        ch_free( old );
+    }
+
+    return epoch;
+}
+
+void
+epoch_leave( epoch_t epoch )
+{
+    __atomic_sub_fetch( &epoch_threads[epoch], 1, __ATOMIC_ACQ_REL );
+}
+
+/*
+ * Add the object to the "current global epoch", not the epoch our thread
+ * entered.
+ */
+void
+epoch_append( void *ptr, dispose_cb *cb )
+{
+    struct pending_ref *new;
+    epoch_t epoch = __atomic_load_n( &current_epoch, __ATOMIC_ACQUIRE );
+
+    /*
+     * BTW, the following is not appropriate here:
+     * assert( __atomic_load_n( &epoch_threads[epoch], __ATOMIC_RELAXED ) );
+     *
+     * We might be a thread lagging behind in the "previous epoch" with no
+     * other threads executing at all.
+     */
+
+    new = ch_malloc( sizeof(struct pending_ref) );
+    new->object = ptr;
+    new->dispose = cb;
+    new->next = __atomic_load_n( &references[epoch], __ATOMIC_ACQUIRE );
+
+    while ( !__atomic_compare_exchange( &references[epoch], &new->next, &new, 0,
+            __ATOMIC_RELEASE, __ATOMIC_RELAXED ) )
+        /* iterate until we succeed */;
+}
+
+int
+acquire_ref( uintptr_t *refp )
+{
+    uintptr_t refcnt, new_refcnt;
+
+    refcnt = __atomic_load_n( refp, __ATOMIC_ACQUIRE );
+
+    /*
+     * If we just incremented the refcnt and checked for zero after, another
+     * thread might falsely believe the object was going to stick around.
+     *
+     * Checking whether the object is still dead at disposal time might not be
+     * able to distinguish it from being freed in a later epoch.
+     */
+    do {
+        if ( !refcnt ) {
+            return refcnt;
+        }
+
+        new_refcnt = refcnt + 1;
+    } while ( !__atomic_compare_exchange( refp, &refcnt, &new_refcnt, 0,
+            __ATOMIC_RELEASE, __ATOMIC_RELAXED ) );
+    assert( new_refcnt == refcnt + 1 );
+
+    return refcnt;
+}
+
+int
+try_release_ref( uintptr_t *refp, void *object, dispose_cb *cb )
+{
+    uintptr_t refcnt, new_refcnt;
+
+    refcnt = __atomic_load_n( refp, __ATOMIC_ACQUIRE );
+
+    /* We promise the caller that we won't decrease refcnt below 0 */
+    do {
+        if ( !refcnt ) {
+            return refcnt;
+        }
+
+        new_refcnt = refcnt - 1;
+    } while ( !__atomic_compare_exchange( refp, &refcnt, &new_refcnt, 0,
+            __ATOMIC_RELEASE, __ATOMIC_RELAXED ) );
+    assert( new_refcnt == refcnt - 1 );
+
+    if ( !new_refcnt ) {
+        epoch_append( object, cb );
+    }
+
+    return refcnt;
+}
diff --git a/servers/lloadd/epoch.h b/servers/lloadd/epoch.h
new file mode 100644 (file)
index 0000000..b5ae045
--- /dev/null
@@ -0,0 +1,143 @@
+/* epoch.h - epoch based memory reclamation */
+/* $OpenLDAP$ */
+/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
+ *
+ * Copyright 2018-2020 The OpenLDAP Foundation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted only as authorized by the OpenLDAP
+ * Public License.
+ *
+ * A copy of this license is available in the file LICENSE in the
+ * top-level directory of the distribution or, alternatively, at
+ * <http://www.OpenLDAP.org/license.html>.
+ */
+
+#ifndef __LLOAD_EPOCH_H
+#define __LLOAD_EPOCH_H
+
+/** @file epoch.h
+ *
+ * Implementation of epoch based memory reclamation, in principle
+ * similar to the algorithm presented in
+ * https://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-579.pdf
+ */
+
+typedef uintptr_t epoch_t;
+
+/** @brief A callback function used to free object and associated data */
+typedef void (dispose_cb)( void *object );
+
+/** @brief Initiate global state */
+void epoch_init( void );
+
+/** @brief Finalise global state and free any objects still pending */
+void epoch_shutdown( void );
+
+/** @brief Register thread as active
+ *
+ * In order to safely access managed objects, a thread should call
+ * this function or make sure no other thread is running (e.g. config
+ * pause, late shutdown). After calling this, it is guaranteed that no
+ * reachable objects will be freed before all threads have called
+ * `epoch_leave( current_epoch + 1 )` so it is essential that there
+ * is an upper limit to the amount of time between #epoch_join and
+ * corresponding #epoch_leave or the number of unfreed objects might
+ * grow without bounds.
+ *
+ * To simplify locking, memory is only freed when the current epoch
+ * is advanced rather than on leaving it.
+ *
+ * Can be safely called multiple times by the same thread as long as
+ * a matching #epoch_leave() call is made eventually.
+ *
+ * @return The observed epoch, to be passed to #epoch_leave()
+ */
+epoch_t epoch_join( void );
+
+/** @brief Register thread as inactive
+ *
+ * A thread should call this after they are finished with work
+ * performed since matching call to #epoch_join(). It is not safe
+ * to keep a local reference to managed objects after this call
+ * unless other precautions have been made to prevent it being
+ * released.
+ *
+ * @param[in] epoch Epoch identifier returned by a previous call to
+ * #epoch_join().
+ */
+void epoch_leave( epoch_t epoch );
+
+/** @brief Return an unreachable object to be freed
+ *
+ * The object should already be unreachable at the point of call and
+ * cb will be invoked when no other thread that could have seen it
+ * is active any more. This happens when we have advanced by two
+ * epochs.
+ *
+ * @param[in] ptr Object to be released/freed
+ * @param[in] cb Callback to invoke when safe to do so
+ */
+void epoch_append( void *ptr, dispose_cb *cb );
+
+/**
+ * \defgroup Reference counting helpers
+ */
+/**@{*/
+
+/** @brief Acquire a reference if possible
+ *
+ * Atomically, check reference count is non-zero and increment if so.
+ * Returns old reference count.
+ *
+ * @param[in] refp Pointer to a reference counter
+ * @return 0 if reference was already zero, non-zero if reference
+ * count was successfully incremented
+ */
+int acquire_ref( uintptr_t *refp );
+
+/** @brief Check reference count and try to decrement
+ *
+ * Atomically, decrement reference count if non-zero and register
+ * object if decremented to zero. Returning previous reference count.
+ *
+ * @param[in] refp Pointer to a reference counter
+ * @param[in] object The managed object
+ * @param[in] cb Callback to invoke when safe to do so
+ * @return 0 if reference was already zero, non-zero if reference
+ * count was non-zero at the time of call
+ */
+int try_release_ref( uintptr_t *refp, void *object, dispose_cb *cb );
+
+/** @brief Read reference count
+ *
+ * @param[in] object Pointer to the managed object
+ * @param[in] ref_field Member where reference count is stored in
+ * the object
+ * @return Current value of reference counter
+ */
+#define IS_ALIVE( object, ref_field ) \
+    __atomic_load_n( &(object)->ref_field, __ATOMIC_ACQUIRE )
+
+/** @brief Release reference
+ *
+ * A cheaper alternative to #try_release_ref(), safe only when we know
+ * reference count was already non-zero.
+ *
+ * @param[in] object The managed object
+ * @param[in] ref_field Member where reference count is stored in
+ * the object
+ * @param[in] cb Callback to invoke when safe to do so
+ */
+#define RELEASE_REF( object, ref_field, cb ) \
+    do { \
+        if ( !__atomic_sub_fetch( \
+                     &(object)->ref_field, 1, __ATOMIC_ACQ_REL ) ) { \
+            epoch_append( object, (dispose_cb *)cb ); \
+        } \
+    } while (0)
+
+/**@}*/
+
+#endif /* __LLOAD_EPOCH_H */
index 74ffcdf38c2d73efbe3ab8ed16128d26bcec0fbd..1f22a71968bc4c37e49bf3689ac58c2f237e1847 100644 (file)
@@ -36,6 +36,7 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
     char *msg = NULL;
     int rc = LDAP_SUCCESS;
 
+    CONNECTION_LOCK(c);
     tavl_delete( &c->c_ops, op, operation_client_cmp );
 
     if ( c->c_is_tls == LLOAD_TLS_ESTABLISHED ) {
@@ -51,6 +52,7 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
         rc = LDAP_UNAVAILABLE;
         msg = "Could not initialize TLS";
     }
+    CONNECTION_UNLOCK(c);
 
     Debug( LDAP_DEBUG_STATS, "handle_starttls: "
             "handling StartTLS exop connid=%lu rc=%d msg=%s\n",
@@ -58,11 +60,10 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
 
     if ( rc ) {
         /* We've already removed the operation from the queue */
-        return operation_send_reject_locked( op, rc, msg, 1 );
+        operation_send_reject( op, rc, msg, 1 );
+        return LDAP_SUCCESS;
     }
 
-    CONNECTION_UNLOCK_INCREF(c);
-
     event_del( c->c_read_event );
     event_del( c->c_write_event );
     /*
@@ -77,9 +78,8 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
     output = c->c_pendingber;
     if ( output == NULL && (output = ber_alloc()) == NULL ) {
         ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
-        CONNECTION_LOCK_DECREF(c);
-        operation_destroy_from_client( op );
-        CONNECTION_DESTROY(c);
+        operation_unlink( op );
+        CONNECTION_LOCK_DESTROY(c);
         return -1;
     }
     c->c_pendingber = output;
@@ -88,7 +88,7 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
             LDAP_RES_EXTENDED, LDAP_SUCCESS, "", "" );
     ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
 
-    CONNECTION_LOCK_DECREF(c);
+    CONNECTION_LOCK(c);
     c->c_read_timeout = lload_timeout_net;
     event_assign( c->c_read_event, base, c->c_fd, EV_READ|EV_PERSIST,
             client_tls_handshake_cb, c );
@@ -100,8 +100,9 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
     event_add( c->c_write_event, lload_write_timeout );
 
     op->o_res = LLOAD_OP_COMPLETED;
-    operation_destroy_from_client( op );
-    CONNECTION_UNLOCK_INCREF(c);
+    CONNECTION_UNLOCK(c);
+
+    operation_unlink( op );
 
     return -1;
 }
@@ -115,10 +116,8 @@ request_extended( LloadConnection *c, LloadOperation *op )
     ber_tag_t tag;
 
     if ( (copy = ber_alloc()) == NULL ) {
-        if ( operation_send_reject_locked(
-                     op, LDAP_OTHER, "internal error", 0 ) == LDAP_SUCCESS ) {
-            CONNECTION_DESTROY(c);
-        }
+        operation_send_reject( op, LDAP_OTHER, "internal error", 0 );
+        CONNECTION_LOCK_DESTROY(c);
         return -1;
     }
 
@@ -128,8 +127,9 @@ request_extended( LloadConnection *c, LloadOperation *op )
     if ( tag != LDAP_TAG_EXOP_REQ_OID ) {
         Debug( LDAP_DEBUG_STATS, "request_extended: "
                 "no OID present in extended request\n" );
-        return operation_send_reject_locked(
-                op, LDAP_PROTOCOL_ERROR, "decoding error", 0 );
+        operation_send_reject( op, LDAP_PROTOCOL_ERROR, "decoding error", 0 );
+        CONNECTION_LOCK_DESTROY(c);
+        return -1;
     }
 
     needle.oid = bv;
@@ -145,8 +145,8 @@ request_extended( LloadConnection *c, LloadOperation *op )
     ber_free( copy, 0 );
 
     if ( c->c_state == LLOAD_C_BINDING ) {
-        return operation_send_reject_locked(
-                op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 );
+        operation_send_reject( op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 );
+        return LDAP_SUCCESS;
     }
     return request_process( c, op );
 }
index d787a462d79c2c086013ab888f2cca124fb5ac9c..5a96fb1782c86d003da295d28880aa36849e9f5f 100644 (file)
@@ -84,6 +84,8 @@ LDAP_BEGIN_DECL
 
 #define BER_BV_OPTIONAL( bv ) ( BER_BVISNULL( bv ) ? NULL : ( bv ) )
 
+#include <epoch.h>
+
 typedef struct LloadBackend LloadBackend;
 typedef struct LloadPendingConnection LloadPendingConnection;
 typedef struct LloadConnection LloadConnection;
@@ -280,58 +282,39 @@ struct LloadConnection {
  * - also a liveness/validity token is added to c_refcnt during
  *   lload_connection_init, its existence is tracked in c_live and is usually the
  *   only one that prevents it from being destroyed
- * - anyone who needs to be able to lock the connection after unlocking it has
- *   to use CONNECTION_UNLOCK_INCREF, they are then responsible that
- *   CONNECTION_LOCK_DECREF+CONNECTION_UNLOCK_OR_DESTROY is used when they are
- *   done with it
+ * - anyone who needs to be able to relock the connection after unlocking it has
+ *   to use acquire_ref(), they need to make sure a matching
+ *   RELEASE_REF( c, c_refcnt, c->c_destroy ); is run eventually
  * - when a connection is considered dead, use CONNECTION_DESTROY on a locked
- *   connection, it might get disposed of or if anyone still holds a token, it
- *   just gets unlocked and it's the last token holder's responsibility to run
- *   CONNECTION_UNLOCK_OR_DESTROY
- * - CONNECTION_LOCK_DESTROY is a shorthand for locking, decreasing refcount
- *   and CONNECTION_DESTROY
+ *   connection, it will be made unreachable from normal places and either
+ *   scheduled for reclamation when safe to do so or if anyone still holds a
+ *   reference, it just gets unlocked and reclaimed after the last ref is
+ *   released
+ * - CONNECTION_LOCK_DESTROY is a shorthand for locking and CONNECTION_DESTROY
  */
     ldap_pvt_thread_mutex_t c_mutex; /* protect the connection */
-    int c_refcnt, c_live;
+    uintptr_t c_refcnt, c_live;
+    CONNECTION_DESTROY_CB c_unlink;
     CONNECTION_DESTROY_CB c_destroy;
     CONNECTION_PDU_CB c_pdu_cb;
 #define CONNECTION_LOCK(c) ldap_pvt_thread_mutex_lock( &(c)->c_mutex )
 #define CONNECTION_UNLOCK(c) ldap_pvt_thread_mutex_unlock( &(c)->c_mutex )
-#define CONNECTION_LOCK_DECREF(c) \
-    do { \
-        CONNECTION_LOCK(c); \
-        (c)->c_refcnt--; \
-    } while (0)
-#define CONNECTION_UNLOCK_INCREF(c) \
-    do { \
-        (c)->c_refcnt++; \
-        CONNECTION_UNLOCK(c); \
-    } while (0)
-#define CONNECTION_UNLOCK_OR_DESTROY(c) \
+#define CONNECTION_UNLINK_(c) \
     do { \
-        assert( (c)->c_refcnt >= 0 ); \
-        if ( (c)->c_state == LLOAD_C_CLOSING && !( c )->c_ops ) { \
-            (c)->c_refcnt -= (c)->c_live; \
+        if ( (c)->c_live ) { \
             (c)->c_live = 0; \
-        } \
-        if ( !( c )->c_refcnt ) { \
-            Debug( LDAP_DEBUG_TRACE, "%s: destroying connection connid=%lu\n", \
-                    __func__, (c)->c_connid ); \
-            (c)->c_destroy( (c) ); \
-            (c) = NULL; \
-        } else { \
-            CONNECTION_UNLOCK(c); \
+            RELEASE_REF( (c), c_refcnt, c->c_destroy ); \
+            (c)->c_unlink( (c) ); \
         } \
     } while (0)
 #define CONNECTION_DESTROY(c) \
     do { \
-        (c)->c_refcnt -= (c)->c_live; \
-        (c)->c_live = 0; \
-        CONNECTION_UNLOCK_OR_DESTROY(c); \
+        CONNECTION_UNLINK_(c); \
+        CONNECTION_UNLOCK(c); \
     } while (0)
 #define CONNECTION_LOCK_DESTROY(c) \
     do { \
-        CONNECTION_LOCK_DECREF(c); \
+        CONNECTION_LOCK(c); \
         CONNECTION_DESTROY(c); \
     } while (0);
 
@@ -393,12 +376,13 @@ struct LloadConnection {
 
 enum op_state {
     LLOAD_OP_NOT_FREEING = 0,
-    LLOAD_OP_FREEING_UPSTREAM = 1 << 0,
-    LLOAD_OP_FREEING_CLIENT = 1 << 1,
-    LLOAD_OP_DETACHING_UPSTREAM = 1 << 2,
-    LLOAD_OP_DETACHING_CLIENT = 1 << 3,
+    LLOAD_OP_DETACHING_CLIENT = 1 << 1,
+    LLOAD_OP_DETACHING_UPSTREAM = 1 << 0,
 };
 
+#define LLOAD_OP_DETACHING_MASK \
+    ( LLOAD_OP_DETACHING_UPSTREAM | LLOAD_OP_DETACHING_CLIENT )
+
 /* operation result for monitoring purposes */
 enum op_result {
     LLOAD_OP_REJECTED,  /* operation was not forwarded */
@@ -406,32 +390,28 @@ enum op_result {
     LLOAD_OP_FAILED, /* operation was forwarded, but no response was received */
 };
 
-#define LLOAD_OP_FREEING_MASK \
-    ( LLOAD_OP_FREEING_UPSTREAM | LLOAD_OP_FREEING_CLIENT )
-#define LLOAD_OP_DETACHING_MASK \
-    ( LLOAD_OP_DETACHING_UPSTREAM | LLOAD_OP_DETACHING_CLIENT )
-
+/*
+ * Operation reference tracking:
+ * - o_refcnt is set to 1, never incremented
+ * - operation_unlink sets it to 0 and on transition from 1 clears both
+ *   connection links (o_client, o_upstream)
+ */
 struct LloadOperation {
+    uintptr_t o_refcnt;
+
     LloadConnection *o_client;
     unsigned long o_client_connid;
-    int o_client_live, o_client_refcnt;
     ber_int_t o_client_msgid;
     ber_int_t o_saved_msgid;
 
     LloadConnection *o_upstream;
     unsigned long o_upstream_connid;
-    int o_upstream_live, o_upstream_refcnt;
     ber_int_t o_upstream_msgid;
     time_t o_last_response;
 
-    /* Protects o_client, o_upstream pointers before we lock their c_mutex if
-     * we don't know they are still alive */
+    /* Protects o_client, o_upstream links */
     ldap_pvt_thread_mutex_t o_link_mutex;
-    /* Protects o_freeing, can be locked while holding c_mutex */
-    ldap_pvt_thread_mutex_t o_mutex;
-    /* Consistent w.r.t. o_mutex, only written to while holding
-     * op->o_{client,upstream}->c_mutex */
-    enum op_state o_freeing;
+
     ber_tag_t o_tag;
     time_t o_start;
     unsigned long o_pin_id;
index 84fbbd8bdc1403b581252d93d84dadb59da75337..21125635d04e7ff4947436ab034396210384e384 100644 (file)
@@ -424,6 +424,8 @@ main( int argc, char **argv )
     }
 #endif
 
+    epoch_init();
+
     while ( (i = getopt( argc, argv,
                       "c:d:f:F:h:n:o:s:tV"
 #ifdef LDAP_PF_INET6
index 61eed4fdbe35c9a86a144d9c05a6ce51e685f0bb..9ba216d59caad6a4e93cd8fc01830d9367c4bed5 100644 (file)
@@ -85,6 +85,10 @@ lload_back_open( BackendInfo *bi )
         return 0;
     }
 
+    /* This will fail if we ever try to instantiate more than one lloadd within
+     * the process */
+    epoch_init();
+
     if ( lload_tls_init() != 0 ) {
         return -1;
     }
index f8f559cb40b72f012aba185a8ac4e1b959ece5c5..dd845a337907968974af26603b94e779db31c875 100644 (file)
@@ -116,363 +116,6 @@ operation_upstream_cmp( const void *left, const void *right )
     }
 }
 
-/*
- * Free the operation, subject to there being noone else holding a reference
- * to it.
- *
- * Both operation_destroy_from_* functions are the same, two implementations
- * exist to cater for the fact that either side (client or upstream) might
- * decide to destroy it and each holds a different mutex.
- *
- * Due to the fact that we rely on mutexes on both connections which have a
- * different timespan from the operation, we have to take the following race
- * into account:
- *
- * Trigger
- * - both operation_destroy_from_client and operation_destroy_from_upstream
- *   are called at the same time (each holding its mutex), several times
- *   before one of them finishes
- * - either or both connections might have started the process of being
- *   destroyed
- *
- * We need to detect that the race has happened and only allow one of them to
- * free the operation (we use o_freeing != 0 to announce+detect that).
- *
- * In case the caller was in the process of destroying the connection and the
- * race had been won by the mirror caller, it will increment c_refcnt on its
- * connection and make sure to postpone the final step in
- * client/upstream_destroy(). Testing o_freeing for the mirror side's token
- * allows the winner to detect that it has been a party to the race and a token
- * in c_refcnt has been deposited on its behalf.
- *
- * Beware! This widget really touches all the mutexes we have and showcases the
- * issues with maintaining so many mutex ordering restrictions.
- */
-void
-operation_destroy_from_client( LloadOperation *op )
-{
-    LloadConnection *upstream = NULL, *client = op->o_client;
-    LloadBackend *b = NULL;
-    int race_state, detach_client = !client->c_live;
-
-    Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
-            "op=%p attempting to release operation%s\n",
-            op, detach_client ? " and detach client" : "" );
-
-    /* 1. liveness/refcnt adjustment and test */
-    op->o_client_refcnt -= op->o_client_live;
-    op->o_client_live = 0;
-
-    assert( op->o_client_refcnt <= client->c_refcnt );
-    if ( op->o_client_refcnt ) {
-        Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
-                "op=%p not dead yet\n",
-                op );
-        return;
-    }
-
-    /* 2. Remove from the operation map and TODO adjust the pending op count */
-    tavl_delete( &client->c_ops, op, operation_client_cmp );
-
-    /* 3. Detect whether we entered a race to free op and indicate that to any
-     * others */
-    ldap_pvt_thread_mutex_lock( &op->o_mutex );
-    race_state = op->o_freeing;
-    op->o_freeing |= LLOAD_OP_FREEING_CLIENT;
-    if ( detach_client ) {
-        op->o_freeing |= LLOAD_OP_DETACHING_CLIENT;
-    }
-    ldap_pvt_thread_mutex_unlock( &op->o_mutex );
-
-    CONNECTION_UNLOCK_INCREF(client);
-
-    if ( detach_client ) {
-        ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
-        op->o_client = NULL;
-        ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
-    }
-
-    /* 4. If we lost the race, deal with it straight away */
-    if ( race_state ) {
-        /*
-         * We have raced to destroy op and the first one to lose on this side,
-         * leave a refcnt token on client so we don't destroy it before the
-         * other side has finished (it knows we did that when it examines
-         * o_freeing again).
-         */
-        if ( detach_client ) {
-            Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
-                    "op=%p lost race but client connid=%lu is going down\n",
-                    op, client->c_connid );
-            CONNECTION_LOCK_DECREF(client);
-        } else if ( (race_state & LLOAD_OP_FREEING_MASK) ==
-                LLOAD_OP_FREEING_UPSTREAM ) {
-            Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
-                    "op=%p lost race, increased client refcnt connid=%lu "
-                    "to refcnt=%d\n",
-                    op, client->c_connid, client->c_refcnt );
-            CONNECTION_LOCK(client);
-        } else {
-            Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
-                    "op=%p lost race with another "
-                    "operation_destroy_from_client, "
-                    "client connid=%lu\n",
-                    op, client->c_connid );
-            CONNECTION_LOCK_DECREF(client);
-        }
-        return;
-    }
-
-    /* it seems we will be destroying the operation,
-     * so update the global rejected cunter if needed */
-    operation_update_global_rejected( op );
-    /* 5. If we raced the upstream side and won, reclaim the token */
-    ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
-    if ( !(race_state & LLOAD_OP_DETACHING_UPSTREAM) ) {
-        upstream = op->o_upstream;
-        if ( upstream ) {
-            CONNECTION_LOCK(upstream);
-        }
-    }
-    ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
-
-    ldap_pvt_thread_mutex_lock( &op->o_mutex );
-    /* We don't actually resolve the race in full until we grab the other's
-     * c_mutex+op->o_mutex here */
-    if ( upstream && ( op->o_freeing & LLOAD_OP_FREEING_UPSTREAM ) ) {
-        if ( op->o_freeing & LLOAD_OP_DETACHING_UPSTREAM ) {
-            CONNECTION_UNLOCK(upstream);
-            upstream = NULL;
-        } else {
-            /*
-             * We have raced to destroy op and won. To avoid freeing the connection
-             * under us, a refcnt token has been left over for us on the upstream,
-             * decref and see whether we are in charge of freeing it
-             */
-            upstream->c_refcnt--;
-            Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
-                    "op=%p other side lost race with us, upstream connid=%lu\n",
-                    op, upstream->c_connid );
-        }
-    }
-    ldap_pvt_thread_mutex_unlock( &op->o_mutex );
-
-    /* 6. liveness/refcnt adjustment and test */
-    op->o_upstream_refcnt -= op->o_upstream_live;
-    op->o_upstream_live = 0;
-    if ( op->o_upstream_refcnt ) {
-        Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
-                "op=%p other side still alive, refcnt=%d\n",
-                op, op->o_upstream_refcnt );
-
-        /* There must have been no race if op is still alive */
-        ldap_pvt_thread_mutex_lock( &op->o_mutex );
-        op->o_freeing &= ~LLOAD_OP_FREEING_CLIENT;
-        if ( detach_client ) {
-            op->o_freeing &= ~LLOAD_OP_DETACHING_CLIENT;
-        }
-        assert( op->o_freeing == 0 );
-        ldap_pvt_thread_mutex_unlock( &op->o_mutex );
-
-        assert( upstream != NULL );
-        CONNECTION_UNLOCK_OR_DESTROY(upstream);
-        CONNECTION_LOCK_DECREF(client);
-        return;
-    }
-
-    /* 7. Remove from the operation map and adjust the pending op count */
-    if ( upstream ) {
-        if ( tavl_delete( &upstream->c_ops, op, operation_upstream_cmp ) ) {
-            upstream->c_n_ops_executing--;
-            operation_update_conn_counters( op );
-            b = (LloadBackend *)upstream->c_private;
-        }
-        CONNECTION_UNLOCK_OR_DESTROY(upstream);
-
-        if ( b ) {
-            ldap_pvt_thread_mutex_lock( &b->b_mutex );
-            b->b_n_ops_executing--;
-            operation_update_backend_counters( op, b );
-            ldap_pvt_thread_mutex_unlock( &b->b_mutex );
-        }
-    }
-
-    /* 8. Release the operation */
-    Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_client: "
-            "op=%p destroyed operation from client connid=%lu, "
-            "client msgid=%d\n",
-            op, op->o_client_connid, op->o_client_msgid );
-    ber_free( op->o_ber, 1 );
-    ldap_pvt_thread_mutex_destroy( &op->o_mutex );
-    ldap_pvt_thread_mutex_destroy( &op->o_link_mutex );
-    ch_free( op );
-
-    CONNECTION_LOCK_DECREF(client);
-}
-
-/*
- * See operation_destroy_from_client.
- */
-void
-operation_destroy_from_upstream( LloadOperation *op )
-{
-    LloadConnection *client = NULL, *upstream = op->o_upstream;
-    LloadBackend *b = NULL;
-    int race_state, detach_upstream = !upstream->c_live;
-
-    Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
-            "op=%p attempting to release operation%s\n",
-            op, detach_upstream ? " and detach upstream" : "" );
-
-    /* 1. liveness/refcnt adjustment and test */
-    op->o_upstream_refcnt -= op->o_upstream_live;
-    op->o_upstream_live = 0;
-
-    assert( op->o_upstream_refcnt <= upstream->c_refcnt );
-    if ( op->o_upstream_refcnt ) {
-        Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
-                "op=%p not dead yet\n",
-                op );
-        return;
-    }
-
-    /* it seems we will be destroying the operation,
-     * so update the global rejected cunter if needed */
-    operation_update_global_rejected( op );
-    /* 2. Remove from the operation map and adjust the pending op count */
-    if ( tavl_delete( &upstream->c_ops, op, operation_upstream_cmp ) ) {
-        upstream->c_n_ops_executing--;
-        operation_update_conn_counters( op );
-        b = (LloadBackend *)upstream->c_private;
-    }
-
-    ldap_pvt_thread_mutex_lock( &op->o_mutex );
-    race_state = op->o_freeing;
-    op->o_freeing |= LLOAD_OP_FREEING_UPSTREAM;
-    if ( detach_upstream ) {
-        op->o_freeing |= LLOAD_OP_DETACHING_UPSTREAM;
-    }
-    ldap_pvt_thread_mutex_unlock( &op->o_mutex );
-
-    CONNECTION_UNLOCK_INCREF(upstream);
-
-    /* 3. Detect whether we entered a race to free op */
-    ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
-    if ( detach_upstream ) {
-        op->o_upstream = NULL;
-    }
-    ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
-
-    if ( b ) {
-        ldap_pvt_thread_mutex_lock( &b->b_mutex );
-        b->b_n_ops_executing--;
-        operation_update_backend_counters( op, b );
-        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
-    }
-
-    /* 4. If we lost the race, deal with it straight away */
-    if ( race_state ) {
-        /*
-         * We have raced to destroy op and the first one to lose on this side,
-         * leave a refcnt token on upstream so we don't destroy it before the
-         * other side has finished (it knows we did that when it examines
-         * o_freeing again).
-         */
-        if ( detach_upstream ) {
-            Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
-                    "op=%p lost race but upstream connid=%lu is going down\n",
-                    op, upstream->c_connid );
-            CONNECTION_LOCK_DECREF(upstream);
-        } else if ( (race_state & LLOAD_OP_FREEING_MASK) ==
-                LLOAD_OP_FREEING_CLIENT ) {
-            Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
-                    "op=%p lost race, increased upstream refcnt connid=%lu "
-                    "to refcnt=%d\n",
-                    op, upstream->c_connid, upstream->c_refcnt );
-            CONNECTION_LOCK(upstream);
-        } else {
-            Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
-                    "op=%p lost race with another "
-                    "operation_destroy_from_upstream, "
-                    "upstream connid=%lu\n",
-                    op, upstream->c_connid );
-            CONNECTION_LOCK_DECREF(upstream);
-        }
-        return;
-    }
-
-    /* 5. If we raced the client side and won, reclaim the token */
-    ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
-    if ( !(race_state & LLOAD_OP_DETACHING_CLIENT) ) {
-        client = op->o_client;
-        if ( client ) {
-            CONNECTION_LOCK(client);
-        }
-    }
-    ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
-
-    /* We don't actually resolve the race in full until we grab the other's
-     * c_mutex+op->o_mutex here */
-    ldap_pvt_thread_mutex_lock( &op->o_mutex );
-    if ( client && ( op->o_freeing & LLOAD_OP_FREEING_CLIENT ) ) {
-        if ( op->o_freeing & LLOAD_OP_DETACHING_CLIENT ) {
-            CONNECTION_UNLOCK(client);
-            client = NULL;
-        } else {
-            /*
-             * We have raced to destroy op and won. To avoid freeing the connection
-             * under us, a refcnt token has been left over for us on the client,
-             * decref and see whether we are in charge of freeing it
-             */
-            client->c_refcnt--;
-            Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
-                    "op=%p other side lost race with us, client connid=%lu\n",
-                    op, client->c_connid );
-        }
-    }
-    ldap_pvt_thread_mutex_unlock( &op->o_mutex );
-
-    /* 6. liveness/refcnt adjustment and test */
-    op->o_client_refcnt -= op->o_client_live;
-    op->o_client_live = 0;
-    if ( op->o_client_refcnt ) {
-        Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
-                "op=%p other side still alive, refcnt=%d\n",
-                op, op->o_client_refcnt );
-        /* There must have been no race if op is still alive */
-        ldap_pvt_thread_mutex_lock( &op->o_mutex );
-        op->o_freeing &= ~LLOAD_OP_FREEING_UPSTREAM;
-        if ( detach_upstream ) {
-            op->o_freeing &= ~LLOAD_OP_DETACHING_UPSTREAM;
-        }
-        assert( op->o_freeing == 0 );
-        ldap_pvt_thread_mutex_unlock( &op->o_mutex );
-
-        assert( client != NULL );
-        CONNECTION_UNLOCK_OR_DESTROY(client);
-        CONNECTION_LOCK_DECREF(upstream);
-        return;
-    }
-
-    /* 7. Remove from the operation map and TODO adjust the pending op count */
-    if ( client ) {
-        tavl_delete( &client->c_ops, op, operation_client_cmp );
-        CONNECTION_UNLOCK_OR_DESTROY(client);
-    }
-
-    /* 8. Release the operation */
-    Debug( LDAP_DEBUG_TRACE, "operation_destroy_from_upstream: "
-            "op=%p destroyed operation from client connid=%lu, "
-            "client msgid=%d\n",
-            op, op->o_client_connid, op->o_client_msgid );
-    ber_free( op->o_ber, 1 );
-    ldap_pvt_thread_mutex_destroy( &op->o_mutex );
-    ldap_pvt_thread_mutex_destroy( &op->o_link_mutex );
-    ch_free( op );
-
-    CONNECTION_LOCK_DECREF(upstream);
-}
-
 /*
  * Entered holding c_mutex for now.
  */
@@ -490,11 +133,9 @@ operation_init( LloadConnection *c, BerElement *ber )
     op->o_ber = ber;
     op->o_start = slap_get_time();
 
-    ldap_pvt_thread_mutex_init( &op->o_mutex );
     ldap_pvt_thread_mutex_init( &op->o_link_mutex );
 
-    op->o_client_live = op->o_client_refcnt = 1;
-    op->o_upstream_live = op->o_upstream_refcnt = 1;
+    op->o_refcnt = 1;
 
     tag = ber_get_int( ber, &op->o_client_msgid );
     if ( tag != LDAP_TAG_MSGID ) {
@@ -549,13 +190,163 @@ fail:
     return NULL;
 }
 
+void
+operation_destroy( LloadOperation *op )
+{
+    Debug( LDAP_DEBUG_TRACE, "operation_destroy: "
+            "op=%p destroyed operation from client connid=%lu, "
+            "client msgid=%d\n",
+            op, op->o_client_connid, op->o_client_msgid );
+
+    assert( op->o_refcnt == 0 );
+    assert( op->o_client == NULL );
+    assert( op->o_upstream == NULL );
+
+    ber_free( op->o_ber, 1 );
+    ldap_pvt_thread_mutex_destroy( &op->o_link_mutex );
+    ch_free( op );
+}
+
 int
-operation_send_abandon( LloadOperation *op )
+operation_unlink( LloadOperation *op )
+{
+    LloadConnection *client, *upstream;
+    uintptr_t prev_refcnt;
+    int result = 0;
+
+    if ( !( prev_refcnt = try_release_ref(
+                    &op->o_refcnt, op, (dispose_cb *)operation_destroy ) ) ) {
+        return result;
+    }
+
+    assert( prev_refcnt == 1 );
+
+    Debug( LDAP_DEBUG_TRACE, "operation_unlink: "
+            "unlinking operation between client connid=%lu and upstream "
+            "connid=%lu "
+            "client msgid=%d\n",
+            op->o_client_connid, op->o_upstream_connid, op->o_client_msgid );
+
+    ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
+    client = op->o_client;
+    upstream = op->o_upstream;
+
+    op->o_client = NULL;
+    op->o_upstream = NULL;
+    ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
+
+    assert( client || upstream );
+
+    if ( client ) {
+        result |= operation_unlink_client( op, client );
+        operation_update_global_rejected( op );
+    }
+
+    if ( upstream ) {
+        result |= operation_unlink_upstream( op, upstream );
+    }
+
+    return result;
+}
+
+int
+operation_unlink_client( LloadOperation *op, LloadConnection *client )
+{
+    LloadOperation *removed;
+    int result = 0;
+
+    Debug( LDAP_DEBUG_TRACE, "operation_unlink_client: "
+            "unlinking operation op=%p msgid=%d client connid=%lu\n",
+            op, op->o_client_msgid, op->o_client_connid );
+
+    CONNECTION_LOCK(client);
+    if ( (removed = tavl_delete(
+                   &client->c_ops, op, operation_client_cmp )) ) {
+        result = LLOAD_OP_DETACHING_CLIENT;
+
+        assert( op == removed );
+        client->c_n_ops_executing--;
+
+        if ( client->c_state == LLOAD_C_BINDING ) {
+            client->c_state = LLOAD_C_READY;
+            if ( !BER_BVISNULL( &client->c_auth ) ) {
+                ber_memfree( client->c_auth.bv_val );
+                BER_BVZERO( &client->c_auth );
+            }
+            if ( !BER_BVISNULL( &client->c_sasl_bind_mech ) ) {
+                ber_memfree( client->c_sasl_bind_mech.bv_val );
+                BER_BVZERO( &client->c_sasl_bind_mech );
+            }
+            if ( op->o_pin_id ) {
+                client->c_pin_id = 0;
+            }
+        }
+    }
+    if ( client->c_state == LLOAD_C_CLOSING && !client->c_ops ) {
+        CONNECTION_DESTROY(client);
+    } else {
+        CONNECTION_UNLOCK(client);
+    }
+
+    return result;
+}
+
+int
+operation_unlink_upstream( LloadOperation *op, LloadConnection *upstream )
+{
+    LloadOperation *removed;
+    LloadBackend *b = NULL;
+    int result = 0;
+
+    Debug( LDAP_DEBUG_TRACE, "operation_unlink_upstream: "
+            "unlinking operation op=%p msgid=%d upstream connid=%lu\n",
+            op, op->o_upstream_msgid, op->o_upstream_connid );
+
+    CONNECTION_LOCK(upstream);
+    if ( (removed = tavl_delete(
+                   &upstream->c_ops, op, operation_upstream_cmp )) ) {
+        result |= LLOAD_OP_DETACHING_UPSTREAM;
+
+        assert( op == removed );
+        upstream->c_n_ops_executing--;
+
+        if ( upstream->c_state == LLOAD_C_BINDING ) {
+            assert( op->o_tag == LDAP_REQ_BIND && upstream->c_ops == NULL );
+            upstream->c_state = LLOAD_C_READY;
+            if ( !BER_BVISNULL( &upstream->c_sasl_bind_mech ) ) {
+                ber_memfree( upstream->c_sasl_bind_mech.bv_val );
+                BER_BVZERO( &upstream->c_sasl_bind_mech );
+            }
+        }
+        operation_update_conn_counters( op, upstream );
+        b = (LloadBackend *)upstream->c_private;
+    }
+    if ( upstream->c_state == LLOAD_C_CLOSING && !upstream->c_ops ) {
+        CONNECTION_DESTROY(upstream);
+    } else {
+        CONNECTION_UNLOCK(upstream);
+    }
+
+    if ( b ) {
+        ldap_pvt_thread_mutex_lock( &b->b_mutex );
+        b->b_n_ops_executing--;
+        operation_update_backend_counters( op, b );
+        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+    }
+
+    return result;
+}
+
+int
+operation_send_abandon( LloadOperation *op, LloadConnection *upstream )
 {
-    LloadConnection *upstream = op->o_upstream;
     BerElement *ber;
     int rc = -1;
 
+    if ( !IS_ALIVE( upstream, c_refcnt ) ) {
+        return rc;
+    }
+
     ldap_pvt_thread_mutex_lock( &upstream->c_io_mutex );
     ber = upstream->c_pendingber;
     if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
@@ -605,114 +396,64 @@ void
 operation_abandon( LloadOperation *op )
 {
     LloadConnection *c;
-    LloadBackend *b;
-    int rc = LDAP_SUCCESS;
 
     ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
     c = op->o_upstream;
-    if ( !c || !c->c_live ) {
-        ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
+    ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
+    if ( !c || !IS_ALIVE( c, c_refcnt ) ) {
         goto done;
     }
 
-    CONNECTION_LOCK(c);
-    ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
-
     /* for now consider all abandoned operations completed,
      * perhaps add a separate counter later */
     op->o_res = LLOAD_OP_COMPLETED;
-
-    if ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) == NULL ) {
+    if ( !operation_unlink_upstream( op, c ) ) {
         /* The operation has already been abandoned or finished */
         Debug( LDAP_DEBUG_TRACE, "operation_abandon: "
                 "%s from connid=%lu msgid=%d not present in connid=%lu any "
                 "more\n",
                 lload_msgtype2str( op->o_tag ), op->o_client_connid,
                 op->o_client_msgid, op->o_upstream_connid );
-        goto unlock;
-    }
-    if ( c->c_state == LLOAD_C_BINDING ) {
-        c->c_state = LLOAD_C_READY;
-        if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) {
-            ber_memfree( c->c_sasl_bind_mech.bv_val );
-            BER_BVZERO( &c->c_sasl_bind_mech );
-        }
+        goto done;
     }
-    c->c_n_ops_executing--;
-    b = (LloadBackend *)c->c_private;
 
-    op->o_upstream_refcnt++;
-    CONNECTION_UNLOCK_INCREF(c);
-
-    ldap_pvt_thread_mutex_lock( &b->b_mutex );
-    b->b_n_ops_executing--;
-    operation_update_backend_counters( op, b );
-    ldap_pvt_thread_mutex_unlock( &b->b_mutex );
-
-    if ( operation_send_abandon( op ) == LDAP_SUCCESS ) {
+    if ( operation_send_abandon( op, c ) == LDAP_SUCCESS ) {
         connection_write_cb( -1, 0, c );
     }
 
-    CONNECTION_LOCK_DECREF(c);
-    op->o_upstream_refcnt--;
-
-unlock:
-    if ( !c->c_live || !op->o_upstream_refcnt ) {
-        operation_destroy_from_upstream( op );
-    }
-    if ( rc ) {
-        CONNECTION_DESTROY(c);
-    } else {
-        CONNECTION_UNLOCK_OR_DESTROY(c);
-    }
-
 done:
-    c = op->o_client;
-    assert( c );
-
-    /* Caller should hold a reference on client */
-    CONNECTION_LOCK(c);
-    if ( c->c_state == LLOAD_C_BINDING ) {
-        c->c_state = LLOAD_C_READY;
-        if ( !BER_BVISNULL( &c->c_auth ) ) {
-            ber_memfree( c->c_auth.bv_val );
-            BER_BVZERO( &c->c_auth );
-        }
-        if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) {
-            ber_memfree( c->c_sasl_bind_mech.bv_val );
-            BER_BVZERO( &c->c_sasl_bind_mech );
-        }
-        if ( op->o_pin_id ) {
-            c->c_pin_id = 0;
-        }
-    }
-    assert( op->o_client_refcnt > op->o_client_live );
-    op->o_client_refcnt--;
-    operation_destroy_from_client( op );
-    CONNECTION_UNLOCK(c);
+    operation_unlink( op );
 }
 
-/*
- * Called with op->o_client non-NULL and already locked.
- */
-int
-operation_send_reject_locked(
+void
+operation_send_reject(
         LloadOperation *op,
         int result,
         const char *msg,
         int send_anyway )
 {
-    LloadConnection *c = op->o_client;
+    LloadConnection *c;
     BerElement *ber;
     int found;
 
-    Debug( LDAP_DEBUG_TRACE, "operation_send_reject_locked: "
+    Debug( LDAP_DEBUG_TRACE, "operation_send_reject: "
             "rejecting %s from client connid=%lu with message: \"%s\"\n",
-            lload_msgtype2str( op->o_tag ), c->c_connid, msg );
+            lload_msgtype2str( op->o_tag ), op->o_client_connid, msg );
+
+    ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
+    c = op->o_client;
+    ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
+    if ( !c || !IS_ALIVE( c, c_refcnt ) ) {
+        Debug( LDAP_DEBUG_TRACE, "operation_send_reject: "
+                "not sending msgid=%d, client connid=%lu is dead\n",
+                op->o_client_msgid, op->o_client_connid );
+
+        goto done;
+    }
 
-    found = ( tavl_delete( &c->c_ops, op, operation_client_cmp ) == op );
+    found = operation_unlink_client( op, c );
     if ( !found && !send_anyway ) {
-        Debug( LDAP_DEBUG_TRACE, "operation_send_reject_locked: "
+        Debug( LDAP_DEBUG_TRACE, "operation_send_reject: "
                 "msgid=%d not scheduled for client connid=%lu anymore, "
                 "not sending\n",
                 op->o_client_msgid, c->c_connid );
@@ -721,25 +462,21 @@ operation_send_reject_locked(
 
     if ( op->o_client_msgid == 0 ) {
         assert( op->o_saved_msgid == 0 && op->o_pin_id );
-        Debug( LDAP_DEBUG_TRACE, "operation_send_reject_locked: "
+        Debug( LDAP_DEBUG_TRACE, "operation_send_reject: "
                 "operation pin=%lu is just a pin, not sending\n",
                 op->o_pin_id );
         goto done;
     }
 
-    CONNECTION_UNLOCK_INCREF(c);
     ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
-
     ber = c->c_pendingber;
     if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
         ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
-        Debug( LDAP_DEBUG_ANY, "operation_send_reject_locked: "
+        Debug( LDAP_DEBUG_ANY, "operation_send_reject: "
                 "ber_alloc failed, closing connid=%lu\n",
                 c->c_connid );
-        CONNECTION_LOCK_DECREF(c);
-        operation_destroy_from_client( op );
-        CONNECTION_DESTROY(c);
-        return -1;
+        CONNECTION_LOCK_DESTROY(c);
+        goto done;
     }
     c->c_pendingber = ber;
 
@@ -751,46 +488,8 @@ operation_send_reject_locked(
 
     connection_write_cb( -1, 0, c );
 
-    CONNECTION_LOCK_DECREF(c);
 done:
-    operation_destroy_from_client( op );
-    return LDAP_SUCCESS;
-}
-
-void
-operation_send_reject(
-        LloadOperation *op,
-        int result,
-        const char *msg,
-        int send_anyway )
-{
-    LloadConnection *c;
-
-    ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
-    c = op->o_client;
-    if ( !c ) {
-        c = op->o_upstream;
-        /* One of the connections has initiated this and keeps a reference, if
-         * client is dead, it must have been the upstream */
-        assert( c );
-        CONNECTION_LOCK(c);
-        ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
-        Debug( LDAP_DEBUG_TRACE, "operation_send_reject: "
-                "not sending msgid=%d, client connid=%lu is dead\n",
-                op->o_client_msgid, op->o_client_connid );
-        operation_destroy_from_upstream( op );
-        CONNECTION_UNLOCK_OR_DESTROY(c);
-        return;
-    }
-    CONNECTION_LOCK(c);
-    ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
-
-    /* Non-zero return means connection has been unlocked and might be
-     * destroyed */
-    if ( operation_send_reject_locked( op, result, msg, send_anyway ) ==
-            LDAP_SUCCESS ) {
-        CONNECTION_UNLOCK_OR_DESTROY(c);
-    }
+    operation_unlink( op );
 }
 
 /*
@@ -803,32 +502,27 @@ operation_send_reject(
 void
 operation_lost_upstream( LloadOperation *op )
 {
-    LloadConnection *c = op->o_upstream;
-
     operation_send_reject( op, LDAP_OTHER,
             "connection to the remote server has been severed", 0 );
-
-    CONNECTION_LOCK(c);
-    op->o_upstream_refcnt--;
-    operation_destroy_from_upstream( op );
-    CONNECTION_UNLOCK(c);
 }
 
 int
 connection_timeout( LloadConnection *upstream, void *arg )
 {
     LloadOperation *op;
-    TAvlnode *ops = NULL, *node;
+    TAvlnode *ops = NULL, *node, *next;
     LloadBackend *b = upstream->c_private;
     time_t threshold = *(time_t *)arg;
     int rc, nops = 0;
 
+    CONNECTION_LOCK(upstream);
     for ( node = tavl_end( upstream->c_ops, TAVL_DIR_LEFT ); node &&
             ((LloadOperation *)node->avl_data)->o_start <
                     threshold; /* shortcut */
-            node = tavl_next( node, TAVL_DIR_RIGHT ) ) {
+            node = next ) {
         LloadOperation *found_op;
 
+        next = tavl_next( node, TAVL_DIR_RIGHT );
         op = node->avl_data;
 
         /* Have we received another response since? */
@@ -836,7 +530,6 @@ connection_timeout( LloadConnection *upstream, void *arg )
             continue;
         }
 
-        op->o_upstream_refcnt++;
         op->o_res = LLOAD_OP_FAILED;
         found_op = tavl_delete( &upstream->c_ops, op, operation_upstream_cmp );
         assert( op == found_op );
@@ -863,13 +556,15 @@ connection_timeout( LloadConnection *upstream, void *arg )
     }
 
     if ( nops == 0 ) {
+        CONNECTION_UNLOCK(upstream);
         return LDAP_SUCCESS;
     }
     upstream->c_n_ops_executing -= nops;
+    upstream->c_counters.lc_ops_failed += nops;
     Debug( LDAP_DEBUG_STATS, "connection_timeout: "
             "timing out %d operations for connid=%lu\n",
             nops, upstream->c_connid );
-    CONNECTION_UNLOCK_INCREF(upstream);
+    CONNECTION_UNLOCK(upstream);
 
     ldap_pvt_thread_mutex_lock( &b->b_mutex );
     b->b_n_ops_executing -= nops;
@@ -877,36 +572,17 @@ connection_timeout( LloadConnection *upstream, void *arg )
 
     for ( node = tavl_end( ops, TAVL_DIR_LEFT ); node;
             node = tavl_next( node, TAVL_DIR_RIGHT ) ) {
-        LloadConnection *client;
-
         op = node->avl_data;
 
-        ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
-        client = op->o_client;
-        if ( !client ) {
-            ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
-            continue;
-        }
-        CONNECTION_LOCK(client);
-        ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
-
-        /* operation_send_reject_locked unlocks and destroys client on
-         * failure */
-        if ( operation_send_reject_locked( op,
-                     op->o_tag == LDAP_REQ_SEARCH ? LDAP_TIMELIMIT_EXCEEDED :
-                                                    LDAP_ADMINLIMIT_EXCEEDED,
-                     "upstream did not respond in time", 0 ) == LDAP_SUCCESS ) {
-            CONNECTION_UNLOCK_OR_DESTROY(client);
-        }
+        operation_send_reject( op,
+                op->o_tag == LDAP_REQ_SEARCH ? LDAP_TIMELIMIT_EXCEEDED :
+                                               LDAP_ADMINLIMIT_EXCEEDED,
+                "upstream did not respond in time", 0 );
 
         if ( rc == LDAP_SUCCESS ) {
-            rc = operation_send_abandon( op );
+            rc = operation_send_abandon( op, upstream );
         }
-
-        CONNECTION_LOCK(upstream);
-        op->o_upstream_refcnt--;
-        operation_destroy_from_upstream( op );
-        CONNECTION_UNLOCK(upstream);
+        operation_unlink( op );
     }
 
     /* TODO: if operation_send_abandon failed, we need to kill the upstream */
@@ -914,7 +590,13 @@ connection_timeout( LloadConnection *upstream, void *arg )
         connection_write_cb( -1, 0, upstream );
     }
 
-    CONNECTION_LOCK_DECREF(upstream);
+    CONNECTION_LOCK(upstream);
+    if ( upstream->c_state == LLOAD_C_CLOSING && !upstream->c_ops ) {
+        CONNECTION_DESTROY(upstream);
+    } else {
+        CONNECTION_UNLOCK(upstream);
+    }
+
     /* just dispose of the AVL, most operations should already be gone */
     tavl_free( ops, NULL );
     return LDAP_SUCCESS;
@@ -933,12 +615,16 @@ operations_timeout( evutil_socket_t s, short what, void *arg )
     threshold = slap_get_time() - lload_timeout_api->tv_sec;
 
     LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
+        epoch_t epoch;
+
         ldap_pvt_thread_mutex_lock( &b->b_mutex );
         if ( b->b_n_ops_executing == 0 ) {
             ldap_pvt_thread_mutex_unlock( &b->b_mutex );
             continue;
         }
 
+        epoch = epoch_join();
+
         Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
                 "timing out binds for backend uri=%s\n",
                 b->b_uri.bv_val );
@@ -951,6 +637,7 @@ operations_timeout( evutil_socket_t s, short what, void *arg )
         connections_walk_last( &b->b_mutex, &b->b_conns, b->b_last_conn,
                 connection_timeout, &threshold );
 
+        epoch_leave( epoch );
         ldap_pvt_thread_mutex_unlock( &b->b_mutex );
     }
 done:
@@ -976,13 +663,12 @@ operation_update_global_rejected( LloadOperation *op )
 }
 
 void
-operation_update_conn_counters( LloadOperation *op )
+operation_update_conn_counters( LloadOperation *op, LloadConnection *upstream )
 {
-    assert( op->o_upstream != NULL );
     if ( op->o_res == LLOAD_OP_COMPLETED ) {
-        op->o_upstream->c_counters.lc_ops_completed++;
+        upstream->c_counters.lc_ops_completed++;
     } else {
-        op->o_upstream->c_counters.lc_ops_failed++;
+        upstream->c_counters.lc_ops_failed++;
     }
 }
 
index f95419e991265fad24eabe461f87bd373e20ae34..21d070313a77c3c3f857bd4d48dfad0bad2fceb8 100644 (file)
@@ -172,15 +172,17 @@ LDAP_SLAPD_F (const char *) lload_msgtype2str( ber_tag_t tag );
 LDAP_SLAPD_F (int) operation_upstream_cmp( const void *l, const void *r );
 LDAP_SLAPD_F (int) operation_client_cmp( const void *l, const void *r );
 LDAP_SLAPD_F (LloadOperation *) operation_init( LloadConnection *c, BerElement *ber );
-LDAP_SLAPD_F (int) operation_send_abandon( LloadOperation *op );
+LDAP_SLAPD_F (int) operation_send_abandon( LloadOperation *op, LloadConnection *c );
 LDAP_SLAPD_F (void) operation_abandon( LloadOperation *op );
 LDAP_SLAPD_F (void) operation_send_reject( LloadOperation *op, int result, const char *msg, int send_anyway );
 LDAP_SLAPD_F (int) operation_send_reject_locked( LloadOperation *op, int result, const char *msg, int send_anyway );
 LDAP_SLAPD_F (void) operation_lost_upstream( LloadOperation *op );
-LDAP_SLAPD_F (void) operation_destroy_from_client( LloadOperation *op );
-LDAP_SLAPD_F (void) operation_destroy_from_upstream( LloadOperation *op );
+LDAP_SLAPD_F (void) operation_destroy( LloadOperation *op );
+LDAP_SLAPD_F (int) operation_unlink( LloadOperation *op );
+LDAP_SLAPD_F (int) operation_unlink_client( LloadOperation *op, LloadConnection *client );
+LDAP_SLAPD_F (int) operation_unlink_upstream( LloadOperation *op, LloadConnection *upstream );
 LDAP_SLAPD_F (void) operations_timeout( evutil_socket_t s, short what, void *arg );
-LDAP_SLAPD_F (void) operation_update_conn_counters( LloadOperation *op );
+LDAP_SLAPD_F (void) operation_update_conn_counters( LloadOperation *op, LloadConnection *upstream );
 LDAP_SLAPD_F (void) operation_update_backend_counters( LloadOperation *op, LloadBackend *b );
 LDAP_SLAPD_F (void) operation_update_global_rejected( LloadOperation *op );
 /*
index f19fb1f1f994ee5a75a61a549a8f1582b5fc072e..4037bb071696a93529e14e0dec3e1a2dadc825d9 100644 (file)
@@ -38,6 +38,8 @@ static const sasl_callback_t client_callbacks[] = {
 };
 #endif /* HAVE_CYRUS_SASL */
 
+static void upstream_unlink( LloadConnection *upstream );
+
 int
 forward_response( LloadConnection *client, LloadOperation *op, BerElement *ber )
 {
@@ -101,13 +103,13 @@ forward_final_response(
             "connid=%lu msgid=%d finishing up with a request for "
             "client connid=%lu\n",
             op->o_upstream_connid, op->o_upstream_msgid, op->o_client_connid );
+
     rc = forward_response( client, op, ber );
-    CONNECTION_LOCK(op->o_upstream);
+
     op->o_res = LLOAD_OP_COMPLETED;
-    if ( !op->o_pin_id || !op->o_upstream_refcnt-- ) {
-        operation_destroy_from_upstream( op );
+    if ( !op->o_pin_id ) {
+        operation_unlink( op );
     }
-    CONNECTION_UNLOCK(op->o_upstream);
 
     return rc;
 }
@@ -177,11 +179,13 @@ handle_one_response( LloadConnection *c )
         goto fail;
     }
 
+    CONNECTION_LOCK(c);
     if ( needle.o_upstream_msgid == 0 ) {
         return handle_unsolicited( c, ber );
     } else if ( !( op = tavl_find(
                            c->c_ops, &needle, operation_upstream_cmp ) ) ) {
         /* Already abandoned, do nothing */
+        CONNECTION_UNLOCK(c);
         ber_free( ber, 1 );
         return rc;
         /*
@@ -190,6 +194,7 @@ handle_one_response( LloadConnection *c )
         event_del( c->c_read_event );
     */
     } else {
+        CONNECTION_UNLOCK(c);
         /*
         op->o_response_pending = ber;
         */
@@ -239,40 +244,14 @@ handle_one_response( LloadConnection *c )
     if ( handler ) {
         LloadConnection *client;
 
-        op->o_upstream_refcnt++;
-        CONNECTION_UNLOCK_INCREF(c);
-
         ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
         client = op->o_client;
-        if ( client ) {
-            CONNECTION_LOCK(client);
-            if ( client->c_live ) {
-                op->o_client_refcnt++;
-                CONNECTION_UNLOCK_INCREF(client);
-            } else {
-                CONNECTION_UNLOCK(client);
-                client = NULL;
-            }
-        }
         ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
-
-        if ( client ) {
+        if ( client && IS_ALIVE( client, c_refcnt ) ) {
             rc = handler( client, op, ber );
-            CONNECTION_LOCK_DECREF(client);
-            op->o_client_refcnt--;
-            if ( !op->o_client_refcnt ) {
-                operation_destroy_from_client( op );
-            }
-            CONNECTION_UNLOCK_OR_DESTROY(client);
         } else {
             ber_free( ber, 1 );
         }
-
-        CONNECTION_LOCK_DECREF(c);
-        op->o_upstream_refcnt--;
-        if ( !client || !op->o_upstream_refcnt ) {
-            operation_destroy_from_upstream( op );
-        }
     } else {
         assert(0);
         ber_free( ber, 1 );
@@ -284,9 +263,8 @@ fail:
                 "error on processing a response (%s) on upstream connection "
                 "connid=%ld, tag=%lx\n",
                 lload_msgtype2str( tag ), c->c_connid, tag );
-        CONNECTION_DESTROY(c);
+        CONNECTION_LOCK_DESTROY(c);
     }
-    /* We leave the connection locked */
     return rc;
 }
 
@@ -459,19 +437,16 @@ upstream_bind_cb( LloadConnection *c )
                 ber_len_t len;
                 int rc;
 
-                CONNECTION_UNLOCK_INCREF(c);
                 if ( ber_peek_tag( ber, &len ) == LDAP_TAG_SASL_RES_CREDS &&
                         ber_scanf( ber, "m", &scred ) == LBER_ERROR ) {
                     Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: "
                             "sasl bind response malformed\n" );
-                    CONNECTION_LOCK_DECREF(c);
                     goto fail;
                 }
 
                 rc = sasl_bind_step( c, &scred, &ccred );
                 if ( rc != SASL_OK &&
                         ( rc != SASL_CONTINUE || result == LDAP_SUCCESS ) ) {
-                    CONNECTION_LOCK_DECREF(c);
                     goto fail;
                 }
 
@@ -482,7 +457,6 @@ upstream_bind_cb( LloadConnection *c )
                     outber = c->c_pendingber;
                     if ( outber == NULL && (outber = ber_alloc()) == NULL ) {
                         ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
-                        CONNECTION_LOCK_DECREF(c);
                         goto fail;
                     }
                     c->c_pendingber = outber;
@@ -496,27 +470,25 @@ upstream_bind_cb( LloadConnection *c )
 
                     connection_write_cb( -1, 0, c );
 
-                    CONNECTION_LOCK_DECREF(c);
                     if ( rc == SASL_OK ) {
                         BER_BVZERO( &c->c_sasl_bind_mech );
                     }
                     break;
                 }
-                CONNECTION_LOCK_DECREF(c);
             }
             if ( result == LDAP_SASL_BIND_IN_PROGRESS ) {
                 goto fail;
             }
 #endif /* HAVE_CYRUS_SASL */
+            CONNECTION_LOCK(c);
             c->c_pdu_cb = handle_one_response;
             c->c_state = LLOAD_C_READY;
             c->c_type = LLOAD_C_OPEN;
             c->c_read_timeout = NULL;
-            event_add( c->c_read_event, c->c_read_timeout );
             Debug( LDAP_DEBUG_CONNS, "upstream_bind_cb: "
                     "connid=%lu finished binding, now active\n",
                     c->c_connid );
-            CONNECTION_UNLOCK_INCREF(c);
+            CONNECTION_UNLOCK(c);
             ldap_pvt_thread_mutex_lock( &b->b_mutex );
             LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
             b->b_active++;
@@ -531,7 +503,6 @@ upstream_bind_cb( LloadConnection *c )
             b->b_last_conn = c;
             backend_retry( b );
             ldap_pvt_thread_mutex_unlock( &b->b_mutex );
-            CONNECTION_LOCK_DECREF(c);
             break;
         default:
             Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: "
@@ -540,12 +511,13 @@ upstream_bind_cb( LloadConnection *c )
             goto fail;
     }
 
+    event_add( c->c_read_event, c->c_read_timeout );
     ber_free( ber, 1 );
-    return LDAP_SUCCESS;
+    return -1;
 
 fail:
+    CONNECTION_LOCK_DESTROY(c);
     ber_free( ber, 1 );
-    CONNECTION_DESTROY(c);
     return -1;
 }
 
@@ -556,9 +528,17 @@ upstream_bind( void *ctx, void *arg )
     BerElement *ber;
     ber_int_t msgid;
 
-    CONNECTION_LOCK_DECREF(c);
+    /* A reference was passed on to us */
+    assert( IS_ALIVE( c, c_refcnt ) );
+
+    if ( !IS_ALIVE( c, c_live ) ) {
+        RELEASE_REF( c, c_refcnt, c->c_destroy );
+        return NULL;
+    }
+
+    CONNECTION_LOCK(c);
     c->c_pdu_cb = upstream_bind_cb;
-    CONNECTION_UNLOCK_INCREF(c);
+    CONNECTION_UNLOCK(c);
 
     ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
     ber = c->c_pendingber;
@@ -599,16 +579,18 @@ upstream_bind( void *ctx, void *arg )
 
     connection_write_cb( -1, 0, c );
 
-    CONNECTION_LOCK_DECREF(c);
+    CONNECTION_LOCK(c);
     c->c_read_timeout = lload_timeout_net;
     event_add( c->c_read_event, c->c_read_timeout );
-    CONNECTION_UNLOCK_OR_DESTROY(c);
+    CONNECTION_UNLOCK(c);
 
+    RELEASE_REF( c, c_refcnt, c->c_destroy );
     return NULL;
 
 fail:
     ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
     CONNECTION_LOCK_DESTROY(c);
+    RELEASE_REF( c, c_refcnt, c->c_destroy );
     return NULL;
 }
 
@@ -621,6 +603,7 @@ upstream_finish( LloadConnection *c )
     LloadBackend *b = c->c_private;
     int is_bindconn = 0;
 
+    assert( c->c_live );
     c->c_pdu_cb = handle_one_response;
 
     /* Unless we are configured to use the VC exop, consider allocating the
@@ -675,7 +658,8 @@ upstream_finish( LloadConnection *c )
                     c->c_connid );
             return -1;
         }
-        c->c_refcnt++;
+        /* keep a reference for upstream_bind */
+        acquire_ref( &c->c_refcnt );
 
         Debug( LDAP_DEBUG_CONNS, "upstream_finish: "
                 "scheduled a bind callback for connid=%lu\n",
@@ -697,6 +681,7 @@ upstream_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
 {
     LloadConnection *c = arg;
     LloadBackend *b;
+    epoch_t epoch;
     int rc = LDAP_SUCCESS;
 
     CONNECTION_LOCK(c);
@@ -737,9 +722,9 @@ upstream_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
                 c->c_connid );
         c->c_is_tls = LLOAD_TLS_ESTABLISHED;
 
-        CONNECTION_UNLOCK_INCREF(c);
+        CONNECTION_UNLOCK(c);
         ldap_pvt_thread_mutex_lock( &b->b_mutex );
-        CONNECTION_LOCK_DECREF(c);
+        CONNECTION_LOCK(c);
 
         rc = upstream_finish( c );
         ldap_pvt_thread_mutex_unlock( &b->b_mutex );
@@ -753,14 +738,18 @@ upstream_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
                 "connid=%lu need write rc=%d\n",
                 c->c_connid, rc );
     }
-    CONNECTION_UNLOCK_OR_DESTROY(c);
+    CONNECTION_UNLOCK(c);
     return;
 
 fail:
     Debug( LDAP_DEBUG_CONNS, "upstream_tls_handshake_cb: "
             "connid=%lu failed rc=%d\n",
             c->c_connid, rc );
+
+    assert( c->c_ops == NULL );
+    epoch = epoch_join();
     CONNECTION_DESTROY(c);
+    epoch_leave( epoch );
 }
 
 static int
@@ -774,6 +763,7 @@ upstream_starttls( LloadConnection *c )
     ber_tag_t tag;
 
     c->c_currentber = NULL;
+    CONNECTION_LOCK(c);
 
     if ( ber_scanf( ber, "it", &msgid, &tag ) == LBER_ERROR ) {
         Debug( LDAP_DEBUG_ANY, "upstream_starttls: "
@@ -824,9 +814,9 @@ upstream_starttls( LloadConnection *c )
         }
         c->c_is_tls = LLOAD_CLEARTEXT;
 
-        CONNECTION_UNLOCK_INCREF(c);
+        CONNECTION_UNLOCK(c);
         ldap_pvt_thread_mutex_lock( &b->b_mutex );
-        CONNECTION_LOCK_DECREF(c);
+        CONNECTION_LOCK(c);
 
         rc = upstream_finish( c );
         ldap_pvt_thread_mutex_unlock( &b->b_mutex );
@@ -836,7 +826,7 @@ upstream_starttls( LloadConnection *c )
         }
 
         ber_free( ber, 1 );
-        CONNECTION_UNLOCK_OR_DESTROY(c);
+        CONNECTION_UNLOCK(c);
 
         return rc;
     }
@@ -884,6 +874,7 @@ upstream_init( ber_socket_t s, LloadBackend *b )
         return NULL;
     }
 
+    CONNECTION_LOCK(c);
     c->c_private = b;
     c->c_is_tls = b->b_tls;
     c->c_pdu_cb = handle_one_response;
@@ -939,14 +930,15 @@ upstream_init( ber_socket_t s, LloadBackend *b )
         ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
 
         c->c_pdu_cb = upstream_starttls;
-        CONNECTION_UNLOCK_INCREF(c);
+        CONNECTION_UNLOCK(c);
         connection_write_cb( s, 0, c );
-        CONNECTION_LOCK_DECREF(c);
+        CONNECTION_LOCK(c);
     }
     event_add( c->c_read_event, c->c_read_timeout );
 
     c->c_destroy = upstream_destroy;
-    CONNECTION_UNLOCK_OR_DESTROY(c);
+    c->c_unlink = upstream_unlink;
+    CONNECTION_UNLOCK(c);
 
     return c;
 
@@ -961,46 +953,39 @@ fail:
     }
 
     c->c_state = LLOAD_C_INVALID;
-    CONNECTION_DESTROY(c);
-    assert( c == NULL );
+    c->c_live--;
+    c->c_refcnt--;
+    connection_destroy( c );
 
     return NULL;
 }
 
-void
-upstream_destroy( LloadConnection *c )
+static void
+upstream_unlink( LloadConnection *c )
 {
     LloadBackend *b = c->c_private;
     struct event *read_event, *write_event;
-    TAvlnode *root, *node;
+    TAvlnode *root;
     long freed, executing;
-    enum sc_state state;
 
-    Debug( LDAP_DEBUG_CONNS, "upstream_destroy: "
-            "freeing connection connid=%lu\n",
+    Debug( LDAP_DEBUG_CONNS, "upstream_unlink: "
+            "removing upstream connid=%lu\n",
             c->c_connid );
 
     assert( c->c_state != LLOAD_C_INVALID );
-    state = c->c_state;
-    c->c_state = LLOAD_C_INVALID;
+    assert( c->c_state != LLOAD_C_DYING );
 
-    root = c->c_ops;
-    c->c_ops = NULL;
-    executing = c->c_n_ops_executing;
-    c->c_n_ops_executing = 0;
+    c->c_state = LLOAD_C_DYING;
 
     read_event = c->c_read_event;
     write_event = c->c_write_event;
 
-    for ( node = tavl_end( root, TAVL_DIR_LEFT ); node;
-            node = tavl_next( node, TAVL_DIR_RIGHT ) ) {
-        LloadOperation *op = node->avl_data;
-
-        op->o_res = LLOAD_OP_FAILED;
-        op->o_upstream_refcnt++;
-    }
+    root = c->c_ops;
+    c->c_ops = NULL;
+    executing = c->c_n_ops_executing;
+    c->c_n_ops_executing = 0;
 
-    CONNECTION_UNLOCK_INCREF(c);
+    CONNECTION_UNLOCK(c);
 
     freed = tavl_free( root, (AVL_FREE)operation_lost_upstream );
     assert( freed == executing );
@@ -1018,44 +1003,53 @@ upstream_destroy( LloadConnection *c )
         event_del( write_event );
     }
 
-    /* Remove from the backend on first pass */
-    if ( state != LLOAD_C_DYING ) {
-        ldap_pvt_thread_mutex_lock( &b->b_mutex );
-        if ( c->c_type == LLOAD_C_PREPARING ) {
-            LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
-            b->b_opening--;
-            b->b_failed++;
-        } else if ( c->c_type == LLOAD_C_BIND ) {
-            if ( c == b->b_last_bindconn ) {
-                LloadConnection *prev =
-                        LDAP_CIRCLEQ_LOOP_PREV( &b->b_bindconns, c, c_next );
-                if ( prev == c ) {
-                    b->b_last_bindconn = NULL;
-                } else {
-                    b->b_last_bindconn = prev;
-                }
+    ldap_pvt_thread_mutex_lock( &b->b_mutex );
+    if ( c->c_type == LLOAD_C_PREPARING ) {
+        LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
+        b->b_opening--;
+        b->b_failed++;
+    } else if ( c->c_type == LLOAD_C_BIND ) {
+        if ( c == b->b_last_bindconn ) {
+            LloadConnection *prev =
+                    LDAP_CIRCLEQ_LOOP_PREV( &b->b_bindconns, c, c_next );
+            if ( prev == c ) {
+                b->b_last_bindconn = NULL;
+            } else {
+                b->b_last_bindconn = prev;
             }
-            LDAP_CIRCLEQ_REMOVE( &b->b_bindconns, c, c_next );
-            b->b_bindavail--;
-        } else {
-            if ( c == b->b_last_conn ) {
-                LloadConnection *prev =
-                        LDAP_CIRCLEQ_LOOP_PREV( &b->b_conns, c, c_next );
-                if ( prev == c ) {
-                    b->b_last_conn = NULL;
-                } else {
-                    b->b_last_conn = prev;
-                }
+        }
+        LDAP_CIRCLEQ_REMOVE( &b->b_bindconns, c, c_next );
+        b->b_bindavail--;
+    } else {
+        if ( c == b->b_last_conn ) {
+            LloadConnection *prev =
+                    LDAP_CIRCLEQ_LOOP_PREV( &b->b_conns, c, c_next );
+            if ( prev == c ) {
+                b->b_last_conn = NULL;
+            } else {
+                b->b_last_conn = prev;
             }
-            LDAP_CIRCLEQ_REMOVE( &b->b_conns, c, c_next );
-            b->b_active--;
         }
-        b->b_n_ops_executing -= executing;
-        backend_retry( b );
-        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+        LDAP_CIRCLEQ_REMOVE( &b->b_conns, c, c_next );
+        b->b_active--;
     }
+    b->b_n_ops_executing -= executing;
+    backend_retry( b );
+    ldap_pvt_thread_mutex_unlock( &b->b_mutex );
 
-    CONNECTION_LOCK_DECREF(c);
+    CONNECTION_LOCK(c);
+}
+
+void
+upstream_destroy( LloadConnection *c )
+{
+    Debug( LDAP_DEBUG_CONNS, "upstream_destroy: "
+            "freeing connection connid=%lu\n",
+            c->c_connid );
+
+    CONNECTION_LOCK(c);
+    assert( c->c_state == LLOAD_C_DYING );
+    c->c_state = LLOAD_C_INVALID;
 
     if ( c->c_read_event ) {
         event_free( c->c_read_event );
@@ -1067,21 +1061,8 @@ upstream_destroy( LloadConnection *c )
         c->c_write_event = NULL;
     }
 
-    /*
-     * If we attempted to destroy any operations, we might have lent a new
-     * refcnt token for a thread that raced us to that, let them call us again
-     * later
-     */
-    assert( c->c_refcnt >= 0 );
-    if ( c->c_refcnt ) {
-        c->c_state = LLOAD_C_DYING;
-        Debug( LDAP_DEBUG_CONNS, "upstream_destroy: "
-                "connid=%lu aborting with refcnt=%d\n",
-                c->c_connid, c->c_refcnt );
-        CONNECTION_UNLOCK(c);
-        return;
+    if ( c->c_type != LLOAD_C_BIND ) {
+        BER_BVZERO( &c->c_sasl_bind_mech );
     }
-
-    BER_BVZERO( &c->c_sasl_bind_mech );
     connection_destroy( c );
 }