]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
Rework client_read_cb along the lines of upstream
authorOndřej Kuzník <ondra@mistotebe.net>
Wed, 3 May 2017 09:14:19 +0000 (10:14 +0100)
committerOndřej Kuzník <okuznik@symas.com>
Tue, 17 Nov 2020 17:55:46 +0000 (17:55 +0000)
servers/lloadd/bind.c
servers/lloadd/client.c
servers/lloadd/operation.c
servers/lloadd/proto-slap.h

index 077167a594232c1403a1ff8c2f6ceda98feefd4c..53af3895d8af6b2fde28a6a7135a726b52421414 100644 (file)
@@ -246,22 +246,14 @@ fail:
     return 1;
 }
 
-void *
-client_reset( void *ctx, void *arg )
+void
+client_reset( Connection *c )
 {
-    Operation *op = arg;
-    Connection *c = op->o_client;
     TAvlnode *root;
-    int freed, destroy = 1;
+    int freed;
 
-    CONNECTION_LOCK(c);
     root = c->c_ops;
     c->c_ops = NULL;
-    c->c_state = SLAP_C_CLOSING;
-    if ( op->o_tag == LDAP_REQ_BIND ) {
-        c->c_state = SLAP_C_BINDING;
-        destroy = 0;
-    }
     if ( !BER_BVISNULL( &c->c_auth ) ) {
         ch_free( c->c_auth.bv_val );
         BER_BVZERO( &c->c_auth );
@@ -272,35 +264,27 @@ client_reset( void *ctx, void *arg )
     }
     CONNECTION_UNLOCK_INCREF(c);
 
-    tavl_delete( &root, op, operation_client_cmp );
     freed = tavl_free( root, (AVL_FREE)operation_abandon );
 
     Debug( LDAP_DEBUG_TRACE, "client_reset: "
             "dropped %d operations\n",
             freed );
 
-    if ( destroy ) {
-        operation_destroy( op );
-        CLIENT_LOCK_DESTROY(c);
-    } else {
-        CONNECTION_LOCK_DECREF(c);
-        CLIENT_UNLOCK_OR_DESTROY(c);
-    }
-
-    return NULL;
+    CONNECTION_LOCK_DECREF(c);
 }
 
-void *
-client_bind( void *ctx, void *arg )
+int
+client_bind( Connection *client, Operation *op )
 {
-    Operation *op = arg;
-    Connection *upstream, *client = op->o_client;
-    int rc = 0;
+    Connection *upstream;
+    int rc = LDAP_SUCCESS;
 
-    CONNECTION_LOCK(client);
-    CONNECTION_UNLOCK_INCREF(client);
+    /* protect the Bind operation */
+    tavl_delete( &client->c_ops, op, operation_client_cmp );
+    client->c_state = SLAP_C_BINDING;
 
-    client_reset( ctx, arg );
+    client_reset( client );
+    CONNECTION_UNLOCK_INCREF(client);
 
     upstream = backend_select( op );
     if ( !upstream ) {
@@ -309,8 +293,7 @@ client_bind( void *ctx, void *arg )
         operation_send_reject(
                 op, LDAP_UNAVAILABLE, "no connections available", 1 );
         CONNECTION_LOCK_DECREF(client);
-        CLIENT_UNLOCK_OR_DESTROY(client);
-        return NULL;
+        return rc;
     }
 
     op->o_upstream = upstream;
@@ -324,15 +307,15 @@ client_bind( void *ctx, void *arg )
     CONNECTION_LOCK_DECREF(upstream);
     UPSTREAM_UNLOCK_OR_DESTROY(upstream);
 
+    CONNECTION_LOCK_DECREF(client);
     if ( rc ) {
-        CLIENT_LOCK_DESTROY(client);
-        return NULL;
+        CLIENT_DESTROY(client);
+        return -1;
     }
 
-    CONNECTION_LOCK_DECREF(client);
     rc = tavl_insert( &client->c_ops, op, operation_client_cmp, avl_dup_error );
     assert( rc == LDAP_SUCCESS );
     CLIENT_UNLOCK_OR_DESTROY(client);
 
-    return NULL;
+    return rc;
 }
index 4775dbb38cf1b6bb0379fdbaf5e686cb2fad50a4..f2dac9c7d5ba5c8010aedfd2f7f7296c0ea87dd1 100644 (file)
 #include "lutil.h"
 #include "slap.h"
 
+typedef int (*RequestHandler)( Connection *c, Operation *op );
+
 static void
 client_read_cb( evutil_socket_t s, short what, void *arg )
 {
     Connection *c = arg;
     BerElement *ber;
-    Operation *op = NULL;
     ber_tag_t tag;
     ber_len_t len;
-    int rc = 0;
 
     /* What if the shutdown is already in progress and we get to lock the
      * connection? */
@@ -47,8 +47,9 @@ client_read_cb( evutil_socket_t s, short what, void *arg )
         Debug( LDAP_DEBUG_ANY, "client_read_cb: "
                 "ber_alloc failed\n" );
         CLIENT_DESTROY(c);
-        goto fail;
+        return;
     }
+    c->c_currentber = ber;
 
     tag = ber_get_next( c->c_sb, &len, ber );
     if ( tag != LDAP_TAG_MESSAGE ) {
@@ -61,72 +62,134 @@ client_read_cb( evutil_socket_t s, short what, void *arg )
                     c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
 
             c->c_currentber = NULL;
+            ber_free( ber, 1 );
             CLIENT_DESTROY(c);
-            goto fail;
+            return;
         }
-        c->c_currentber = ber;
+        event_add( c->c_read_event, NULL );
         CONNECTION_UNLOCK(c);
         return;
     }
 
+    if ( !slap_conn_max_pdus_per_cycle ||
+            ldap_pvt_thread_pool_submit(
+                    &connection_pool, handle_requests, c ) ) {
+        /* If we're overloaded or configured as such, process one and resume in
+         * the next cycle.
+         *
+         * handle_one_request re-locks the mutex in the
+         * process, need to test it's still alive */
+        if ( handle_one_request( c ) == LDAP_SUCCESS ) {
+            CLIENT_UNLOCK_OR_DESTROY(c);
+        }
+        return;
+    }
+    event_del( c->c_read_event );
+
+    CONNECTION_UNLOCK(c);
+    return;
+}
+
+void *
+handle_requests( void *ctx, void *arg )
+{
+    Connection *c = arg;
+    int requests_handled = 0;
+
+    CONNECTION_LOCK(c);
+    for ( ; requests_handled < slap_conn_max_pdus_per_cycle;
+            requests_handled++ ) {
+        BerElement *ber;
+        ber_tag_t tag;
+        ber_len_t len;
+
+        /* handle_one_response may unlock the connection in the process, we
+         * need to expect that might be our responsibility to destroy it */
+        if ( handle_one_request( c ) ) {
+            /* Error, connection is unlocked and might already have been
+             * destroyed */
+            return NULL;
+        }
+        /* Otherwise, handle_one_request leaves the connection locked */
+
+        if ( (ber = ber_alloc()) == NULL ) {
+            Debug( LDAP_DEBUG_ANY, "client_read_cb: "
+                    "ber_alloc failed\n" );
+            CLIENT_DESTROY(c);
+            return NULL;
+        }
+        c->c_currentber = ber;
+
+        tag = ber_get_next( c->c_sb, &len, ber );
+        if ( tag != LDAP_TAG_MESSAGE ) {
+            int err = sock_errno();
+
+            if ( err != EWOULDBLOCK && err != EAGAIN ) {
+                char ebuf[128];
+                Debug( LDAP_DEBUG_ANY, "handle_requests: "
+                        "ber_get_next on fd %d failed errno=%d (%s)\n",
+                        c->c_fd, err,
+                        sock_errstr( err, ebuf, sizeof(ebuf) ) );
+
+                c->c_currentber = NULL;
+                ber_free( ber, 1 );
+                CLIENT_DESTROY(c);
+                return NULL;
+            }
+            break;
+        }
+    }
+
+    event_add( c->c_read_event, NULL );
+    CLIENT_UNLOCK_OR_DESTROY(c);
+    return NULL;
+}
+
+int
+handle_one_request( Connection *c )
+{
+    BerElement *ber;
+    Operation *op = NULL;
+    RequestHandler handler = NULL;
+
+    ber = c->c_currentber;
     c->c_currentber = NULL;
 
     op = operation_init( c, ber );
     if ( !op ) {
-        Debug( LDAP_DEBUG_ANY, "client_read_cb: "
+        Debug( LDAP_DEBUG_ANY, "handle_one_request: "
                 "operation_init failed\n" );
         CLIENT_DESTROY(c);
-        goto fail;
+        ber_free( ber, 1 );
+        return -1;
     }
 
     switch ( op->o_tag ) {
         case LDAP_REQ_UNBIND:
-            /* We do not expect anything more from the client. Also, we are the
-             * read event, so don't need to unlock */
-            event_del( c->c_read_event );
-
-            rc = ldap_pvt_thread_pool_submit(
-                    &connection_pool, client_reset, op );
-            if ( rc ) {
-                CONNECTION_UNLOCK(c);
-                client_reset( NULL, op );
-                return;
-            }
-            break;
+            c->c_state = SLAP_C_CLOSING;
+            CLIENT_DESTROY(c);
+            return -1;
         case LDAP_REQ_BIND:
-            rc = ldap_pvt_thread_pool_submit(
-                    &connection_pool, client_bind, op );
+            handler = client_bind;
+            break;
+        case LDAP_REQ_ABANDON:
+            /* FIXME: We need to be able to abandon a Bind request, handling
+             * ExOps (esp. Cancel) will be different */
+            handler = request_abandon;
             break;
         default:
             if ( c->c_state == SLAP_C_BINDING ) {
-                CONNECTION_UNLOCK(c);
+                CONNECTION_UNLOCK_INCREF(c);
                 operation_send_reject(
                         op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 );
-                return;
+                CONNECTION_LOCK_DECREF(c);
+                return LDAP_SUCCESS;
             }
-            rc = ldap_pvt_thread_pool_submit(
-                    &connection_pool, request_process, op );
+            handler = request_process;
             break;
     }
 
-    /* FIXME: unlocks in this function need more thought when we refcount
-     * operations */
-    CONNECTION_UNLOCK(c);
-
-    if ( !rc ) {
-        return;
-    }
-
-fail:
-    if ( op ) {
-        operation_send_reject(
-                op, LDAP_OTHER, "server error or overloaded", 1 );
-        operation_destroy( op );
-    } else if ( ber ) {
-        ber_free( ber, 1 );
-    }
-
-    return;
+    return handler( c, op );
 }
 
 void
index c0af76e76739dc876c51d04ebc53311f914032c0..b0996b2ff57ee2bd26d6e78f359dea9355db15f4 100644 (file)
@@ -204,64 +204,95 @@ fail:
 void
 operation_abandon( Operation *op )
 {
+    Connection *c = op->o_upstream;
+    BerElement *ber;
+    Backend *b;
     int rc;
 
-    if ( op->o_upstream ) {
-        Connection *c = op->o_upstream;
-        BerElement *ber;
-        Backend *b;
+    if ( !c ) {
+        c = op->o_client;
 
         CONNECTION_LOCK(c);
-        rc = ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) == NULL );
-        if ( !rc ) {
-            c->c_n_ops_executing--;
-        }
-        b = (Backend *)c->c_private;
-        CONNECTION_UNLOCK_INCREF(c);
+        CLIENT_UNLOCK_OR_DESTROY(c);
+        operation_destroy( op );
+        return;
+    }
 
-        if ( rc ) {
-            /* The operation has already been abandoned or finished */
-            goto done;
-        }
+    CONNECTION_LOCK(c);
+    if ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) == NULL ) {
+        /* The operation has already been abandoned or finished */
+        goto done;
+    }
+    c->c_n_ops_executing--;
+    b = (Backend *)c->c_private;
+    CONNECTION_UNLOCK_INCREF(c);
 
-        ldap_pvt_thread_mutex_lock( &b->b_mutex );
-        b->b_n_ops_executing--;
-        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+    ldap_pvt_thread_mutex_lock( &b->b_mutex );
+    b->b_n_ops_executing--;
+    ldap_pvt_thread_mutex_unlock( &b->b_mutex );
 
-        ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
+    ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
 
-        ber = c->c_pendingber;
-        if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
-            Debug( LDAP_DEBUG_ANY, "operation_abandon: "
-                    "ber_alloc failed\n" );
-            CONNECTION_LOCK_DECREF(c);
-            ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
-            UPSTREAM_UNLOCK_OR_DESTROY(c);
-            goto done;
-        }
-        c->c_pendingber = ber;
+    ber = c->c_pendingber;
+    if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
+        Debug( LDAP_DEBUG_ANY, "operation_abandon: "
+                "ber_alloc failed\n" );
+        ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+        CONNECTION_LOCK_DECREF(c);
+        goto done;
+    }
+    c->c_pendingber = ber;
 
-        rc = ber_printf( ber, "t{titi}", LDAP_TAG_MESSAGE,
-                LDAP_TAG_MSGID, c->c_next_msgid++,
-                LDAP_REQ_ABANDON, op->o_upstream_msgid );
+    rc = ber_printf( ber, "t{titi}", LDAP_TAG_MESSAGE,
+            LDAP_TAG_MSGID, c->c_next_msgid++,
+            LDAP_REQ_ABANDON, op->o_upstream_msgid );
 
-        if ( rc == -1 ) {
-            ber_free( ber, 1 );
-        }
+    if ( rc == -1 ) {
+        ber_free( ber, 1 );
+        c->c_pendingber = NULL;
+    }
 
-        CONNECTION_LOCK_DECREF(c);
-        ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
-        UPSTREAM_UNLOCK_OR_DESTROY(c);
+    ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
 
-        if ( rc != -1 ) {
-            upstream_write_cb( -1, 0, c );
-        }
+    if ( rc != -1 ) {
+        upstream_write_cb( -1, 0, c );
     }
 
+    CONNECTION_LOCK_DECREF(c);
 done:
+    UPSTREAM_UNLOCK_OR_DESTROY(c);
     operation_destroy( op );
 }
 
+int
+request_abandon( Connection *c, Operation *op )
+{
+    Operation *request, needle = { .o_client = c };
+    ber_tag_t tag;
+    int rc = -1;
+
+    tag = ber_get_int( op->o_ber, &needle.o_client_msgid );
+    if ( tag != LDAP_REQ_ABANDON ) {
+        /* How would that happen if we already got the tag for the op? */
+        assert(0);
+        goto done;
+    }
+
+    request = tavl_find( c->c_ops, &needle, operation_client_cmp );
+    if ( !request ) {
+        goto done;
+    }
+
+    CONNECTION_UNLOCK_INCREF(c);
+    operation_abandon( request );
+    CONNECTION_LOCK_DECREF(c);
+
+    rc = LDAP_SUCCESS;
+done:
+    operation_destroy( op );
+    return rc;
+}
+
 void
 operation_send_reject(
         Operation *op,
@@ -317,14 +348,15 @@ operation_lost_upstream( Operation *op )
     operation_destroy( op );
 }
 
-void *
-request_process( void *ctx, void *arg )
+int
+request_process( Connection *client, Operation *op )
 {
-    Operation *op = arg;
     BerElement *output;
-    Connection *client = op->o_client, *upstream;
+    Connection *upstream;
     ber_int_t msgid;
-    int rc;
+    int rc = LDAP_SUCCESS;
+
+    CONNECTION_UNLOCK_INCREF(client);
 
     upstream = backend_select( op );
     if ( !upstream ) {
@@ -346,10 +378,16 @@ request_process( void *ctx, void *arg )
     rc = tavl_insert(
             &upstream->c_ops, op, operation_upstream_cmp, avl_dup_error );
     CONNECTION_UNLOCK_INCREF(upstream);
+
+    Debug( LDAP_DEBUG_TRACE, "request_process: "
+            "client connid=%lu added %s msgid=%d to upstream connid=%lu as "
+            "msgid=%d\n",
+            op->o_client_connid, slap_msgtype2str( op->o_tag ),
+            op->o_client_msgid, op->o_upstream_connid, op->o_upstream_msgid );
     assert( rc == LDAP_SUCCESS );
 
     if ( lload_features & LLOAD_FEATURE_PROXYAUTHZ ) {
-        CONNECTION_LOCK(client);
+        CONNECTION_LOCK_DECREF(client);
         Debug( LDAP_DEBUG_TRACE, "request_process: "
                 "proxying identity %s to upstream\n",
                 client->c_auth.bv_val );
@@ -358,7 +396,7 @@ request_process( void *ctx, void *arg )
                 op->o_tag, &op->o_request,
                 LDAP_TAG_CONTROLS,
                 LDAP_CONTROL_PROXY_AUTHZ, 1, &client->c_auth );
-        CONNECTION_UNLOCK(client);
+        CONNECTION_UNLOCK_INCREF(client);
 
         if ( !BER_BVISNULL( &op->o_ctrls ) ) {
             BerElement *control_ber = ber_alloc();
@@ -387,7 +425,8 @@ request_process( void *ctx, void *arg )
     CONNECTION_LOCK_DECREF(upstream);
     UPSTREAM_UNLOCK_OR_DESTROY(upstream);
 
-    return NULL;
+    CONNECTION_LOCK_DECREF(client);
+    return rc;
 
 fail:
     if ( upstream ) {
@@ -396,5 +435,6 @@ fail:
         UPSTREAM_UNLOCK_OR_DESTROY(upstream);
     }
     operation_send_reject( op, LDAP_OTHER, "internal error", 0 );
-    return NULL;
+    CONNECTION_LOCK_DECREF(client);
+    return rc;
 }
index a945b797b8a7ae596c7e77d0593f096be45cc25d..f949ef546ea91ee56ba1ad0d8c90f208f56084ce 100644 (file)
@@ -63,12 +63,14 @@ LDAP_SLAPD_F (void) ch_free( void * );
 /*
  * bind.c
  */
-LDAP_SLAPD_F (void *) client_reset( void *ctx, void *arg );
-LDAP_SLAPD_F (void *) client_bind( void *ctx, void *arg );
+LDAP_SLAPD_F (void) client_reset( Connection *c );
+LDAP_SLAPD_F (int) client_bind( Connection *c, Operation *op );
 
 /*
  * client.c
  */
+LDAP_SLAPD_F (void *) handle_requests( void *ctx, void *arg );
+LDAP_SLAPD_F (int) handle_one_request( Connection *c );
 LDAP_SLAPD_F (Connection *) client_init( ber_socket_t s, Listener *url, const char *peername, struct event_base *base, int use_tls );
 LDAP_SLAPD_F (void) client_write_cb( evutil_socket_t s, short what, void *arg );
 LDAP_SLAPD_F (void) client_destroy( Connection *c );
@@ -154,7 +156,8 @@ LDAP_SLAPD_F (void) operation_abandon( Operation *op );
 LDAP_SLAPD_F (void) operation_send_reject( Operation *op, int result, const char *msg, int send_anyway );
 LDAP_SLAPD_F (void) operation_lost_upstream( Operation *op );
 LDAP_SLAPD_F (void) operation_destroy( Operation *op );
-LDAP_SLAPD_F (void *) request_process( void *ctx, void *arg );
+LDAP_SLAPD_F (int) request_abandon( Connection *c, Operation *op );
+LDAP_SLAPD_F (int) request_process( Connection *c, Operation *op );
 
 /*
  * sl_malloc.c