]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
Maintain the configured amount of connections per backend
authorOndřej Kuzník <ondra@mistotebe.net>
Tue, 11 Apr 2017 13:15:46 +0000 (14:15 +0100)
committerOndřej Kuzník <okuznik@symas.com>
Tue, 17 Nov 2020 17:55:46 +0000 (17:55 +0000)
servers/lloadd/backend.c
servers/lloadd/config.c
servers/lloadd/daemon.c
servers/lloadd/proto-slap.h
servers/lloadd/slap.h
servers/lloadd/upstream.c

index 534277b2c2efd45e82786172cbfbb3fc621bf3dc..3a6d96234b5959125e5ce68c8920cf5a04d87a29 100644 (file)
@@ -32,25 +32,26 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg )
 {
     Backend *b = arg;
     Connection *c;
-    ber_socket_t s;
+    ber_socket_t s = AC_SOCKET_INVALID;
     int rc;
 
+    ldap_pvt_thread_mutex_lock( &b->b_mutex );
+
     if ( result || !res ) {
         Debug( LDAP_DEBUG_ANY, "upstream_name_cb: "
                 "name resolution failed for backend '%s': %s\n",
                 b->b_bindconf.sb_uri.bv_val, evutil_gai_strerror( result ) );
-        return;
+        goto fail;
     }
 
-    s = socket( res->ai_family, SOCK_STREAM, 0 );
-    if ( s == AC_SOCKET_INVALID ) {
-        return;
+    /* TODO: if we get failures, try the other addrinfos */
+    if ( (s = socket( res->ai_family, SOCK_STREAM, 0 )) ==
+            AC_SOCKET_INVALID ) {
+        goto fail;
     }
 
-    rc = ber_pvt_socket_set_nonblock( s, 1 );
-    if ( rc ) {
-        evutil_closesocket( s );
-        return;
+    if ( ber_pvt_socket_set_nonblock( s, 1 ) ) {
+        goto fail;
     }
 
     if ( res->ai_family == PF_INET ) {
@@ -66,14 +67,21 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg )
         Debug( LDAP_DEBUG_ANY, "upstream_name_cb: "
                 "failed to connect to server '%s'\n",
                 b->b_bindconf.sb_uri.bv_val );
-        evutil_closesocket( s );
-        return;
+        goto fail;
     }
 
     c = upstream_init( s, b );
-    ldap_pvt_thread_mutex_lock( &b->b_mutex );
-    b->b_conns = c;
     ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+    backend_retry( b );
+    return;
+
+fail:
+    if ( s != AC_SOCKET_INVALID ) {
+        evutil_closesocket( s );
+    }
+    b->b_opening--;
+    ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+    backend_retry( b );
 }
 
 Connection *
@@ -81,48 +89,95 @@ backend_select( Operation *op )
 {
     Backend *b;
 
+    /* TODO: Two runs, one with trylock, then one actually locked if we don't
+     * find anything? */
     LDAP_STAILQ_FOREACH ( b, &backend, b_next ) {
+        struct ConnSt *head;
         Connection *c;
 
         ldap_pvt_thread_mutex_lock( &b->b_mutex );
-        c = b->b_conns;
-        ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
-        if ( c->c_state == SLAP_C_READY && !c->c_pendingber ) {
-            ldap_pvt_thread_mutex_unlock( &b->b_mutex );
-            return b->b_conns;
+        if ( op->o_tag == LDAP_REQ_BIND &&
+                !(lload_features & LLOAD_FEATURE_VC) ) {
+            head = &b->b_bindconns;
+        } else {
+            head = &b->b_conns;
+        }
+
+        /* TODO: Use CIRCLEQ so that we can do a natural round robin over the
+         * backend's connections? */
+        LDAP_LIST_FOREACH( c, head, c_next )
+        {
+            ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
+            if ( c->c_state == SLAP_C_READY && !c->c_pendingber ) {
+                Debug( LDAP_DEBUG_CONNS, "backend_select: "
+                        "selected connection %lu for client %lu msgid=%d\n",
+                        c->c_connid, op->o_client->c_connid,
+                        op->o_client_msgid );
+                ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+                return c;
+            }
+            ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
         }
-        ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
         ldap_pvt_thread_mutex_unlock( &b->b_mutex );
     }
 
     return NULL;
 }
 
+void
+backend_retry( Backend *b )
+{
+    int rc, requested;
+
+    ldap_pvt_thread_mutex_lock( &b->b_mutex );
+    /* TODO: timeout regime */
+
+    requested = b->b_numconns;
+    if ( !(lload_features & LLOAD_FEATURE_VC) ) {
+        requested += b->b_numbindconns;
+    }
+    if ( b->b_active + b->b_bindavail + b->b_opening < requested ) {
+        b->b_opening++;
+        rc = ldap_pvt_thread_pool_submit(
+                &connection_pool, backend_connect, b );
+        /* TODO check we're not shutting down */
+        if ( rc ) {
+            ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+            backend_connect( NULL, b );
+            return;
+        }
+    }
+    ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+}
+
 void *
 backend_connect( void *ctx, void *arg )
 {
     struct evutil_addrinfo hints = {};
     Backend *b = arg;
+    char *hostname;
 
+    ldap_pvt_thread_mutex_lock( &b->b_mutex );
 #ifdef LDAP_PF_LOCAL
     if ( b->b_proto == LDAP_PROTO_IPC ) {
         struct sockaddr_un addr;
+        Connection *c;
         ber_socket_t s = socket( PF_LOCAL, SOCK_STREAM, 0 );
         int rc;
 
         if ( s == AC_SOCKET_INVALID ) {
-            return (void *)-1;
+            goto fail;
         }
 
         rc = ber_pvt_socket_set_nonblock( s, 1 );
         if ( rc ) {
             evutil_closesocket( s );
-            return (void *)-1;
+            goto fail;
         }
 
         if ( strlen( b->b_host ) > ( sizeof(addr.sun_path) - 1 ) ) {
             evutil_closesocket( s );
-            return (void *)-1;
+            goto fail;
         }
         memset( &addr, '\0', sizeof(addr) );
         addr.sun_family = AF_LOCAL;
@@ -132,10 +187,12 @@ backend_connect( void *ctx, void *arg )
                 s, (struct sockaddr *)&addr, sizeof(struct sockaddr_un) );
         if ( rc && errno != EINPROGRESS && errno != EWOULDBLOCK ) {
             evutil_closesocket( s );
-            return (void *)-1;
+            goto fail;
         }
 
-        b->b_conns = upstream_init( s, b );
+        c = upstream_init( s, b );
+        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+        backend_retry( b );
         return NULL;
     }
 #endif /* LDAP_PF_LOCAL */
@@ -145,6 +202,15 @@ backend_connect( void *ctx, void *arg )
     hints.ai_socktype = SOCK_STREAM;
     hints.ai_protocol = IPPROTO_TCP;
 
-    evdns_getaddrinfo( dnsbase, b->b_host, NULL, &hints, upstream_name_cb, b );
+    hostname = b->b_host;
+    ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+
+    evdns_getaddrinfo( dnsbase, hostname, NULL, &hints, upstream_name_cb, b );
     return NULL;
+
+fail:
+    b->b_opening--;
+    ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+    backend_retry( b );
+    return (void *)-1;
 }
index 0f13807d3f107243fc29026b1d25a4f710e16ea0..84e1ba92d976509253e2398a54bd21f1cbd0bf91 100644 (file)
@@ -462,6 +462,9 @@ config_backend( ConfigArgs *c )
 
     b = ch_calloc( 1, sizeof(Backend) );
 
+    LDAP_LIST_INIT( &b->b_conns );
+    LDAP_LIST_INIT( &b->b_bindconns );
+
     b->b_numconns = 1;
     b->b_numbindconns = 1;
 
index 31382e594b56218e7f5bebfd9c60ae54fba00073..fbf8621500f9975c51a341ae6d008b20bd07b7cb 100644 (file)
@@ -1300,6 +1300,7 @@ slapd_daemon( struct event_base *daemon_base )
     }
 
     LDAP_STAILQ_FOREACH ( b, &backend, b_next ) {
+        b->b_opening++;
         ldap_pvt_thread_pool_submit( &connection_pool, backend_connect, b );
     }
 
index b55b155de3dba1ba748745fb01d2d82e2d59028e..6c14fd8ddfe5e870274afc5456c89e72bdd03e74 100644 (file)
@@ -41,6 +41,7 @@ struct config_reply_s; /* config.h */
  */
 
 LDAP_SLAPD_F (void *) backend_connect( void *ctx, void *arg );
+LDAP_SLAPD_F (void) backend_retry( Backend *b );
 LDAP_SLAPD_F (Connection *) backend_select( Operation *op );
 
 /*
index 39e97a046760078d6dc843551efd06ae7d5d518e..1ae43d3df241dafa1b7888f093e90a3095c8afc2 100644 (file)
@@ -248,7 +248,8 @@ struct Backend {
     char *b_host;
 
     int b_numconns, b_numbindconns;
-    Connection *b_conns, *b_bindconns;
+    int b_bindavail, b_active, b_opening;
+    LDAP_LIST_HEAD(ConnSt, Connection) b_conns, b_bindconns;
 
     LDAP_STAILQ_ENTRY(Backend) b_next;
 };
@@ -263,11 +264,16 @@ enum sc_state {
     SLAP_C_ACTIVE,      /* exclusive operation (tls setup, ...) in progress */
     SLAP_C_BINDING,     /* binding */
 };
+enum sc_type {
+    SLAP_C_OPEN = 0, /* regular connection */
+    SLAP_C_BIND, /* connection used to handle bind client requests if VC not enabled */
+};
 /*
  * represents a connection from an ldap client/to ldap server
  */
 struct Connection {
     enum sc_state c_state; /* connection state */
+    enum sc_type c_type;
     ber_socket_t c_fd;
 
     ldap_pvt_thread_mutex_t c_mutex; /* protect the connection */
@@ -301,7 +307,7 @@ struct Connection {
     TAvlnode *c_ops; /* Operations pending on the connection */
 
 #define CONN_IS_TLS 1
-#define CONN_IS_CLIENT 4
+#define CONN_IS_BIND 4
 #define CONN_IS_IPC 8
 
 #ifdef HAVE_TLS
@@ -312,6 +318,9 @@ struct Connection {
     long c_n_ops_executing; /* num of ops currently executing */
     long c_n_ops_completed; /* num of ops completed */
 
+    /* Upstream: Protected by its backend's mutex */
+    LDAP_LIST_ENTRY( Connection ) c_next;
+
     void *c_private;
 };
 
index f376d920122f54266b657e3872221a15b0bc5e51..b11afca2ca5d08848674a4fb49d0cff6a6239816 100644 (file)
@@ -735,16 +735,22 @@ upstream_bind( void *ctx, void *arg )
     return NULL;
 }
 
+/*
+ * We must already hold b->b_mutex when called.
+ */
 Connection *
 upstream_init( ber_socket_t s, Backend *b )
 {
     Connection *c;
     struct event_base *base = slap_get_base( s );
     struct event *event;
-    int flags = (b->b_tls == LLOAD_LDAPS) ? CONN_IS_TLS : 0;
+    int flags, is_bindconn = 0;
 
     assert( b != NULL );
 
+    b->b_opening--;
+
+    flags = (b->b_tls == LLOAD_LDAPS) ? CONN_IS_TLS : 0;
     c = connection_init( s, b->b_host, flags );
     c->c_private = b;
 
@@ -757,15 +763,38 @@ upstream_init( ber_socket_t s, Backend *b )
     /* We only register the write event when we have data pending */
     c->c_write_event = event;
 
-    if ( b->b_bindconf.sb_method == LDAP_AUTH_NONE ) {
+    /* Unless we are configured to use the VC exop, consider allocating the
+     * connection into the bind conn pool. Start off by allocating one for
+     * general use, then one for binds, then we start filling up the general
+     * connection pool, finally the bind pool */
+    if ( !(lload_features & LLOAD_FEATURE_VC) && b->b_active &&
+            b->b_numbindconns ) {
+        if ( !b->b_bindavail ) {
+            is_bindconn = 1;
+        } else if ( b->b_active >= b->b_numconns &&
+                b->b_bindavail < b->b_numbindconns ) {
+            is_bindconn = 1;
+        }
+    }
+
+    if ( is_bindconn || b->b_bindconf.sb_method == LDAP_AUTH_NONE ) {
         upstream_finish( c );
     } else {
         ldap_pvt_thread_pool_submit( &connection_pool, upstream_bind, c );
     }
 
-    ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+    if ( is_bindconn ) {
+        LDAP_LIST_INSERT_HEAD( &b->b_bindconns, c, c_next );
+        c->c_type = SLAP_C_BIND;
+        b->b_bindavail++;
+    } else {
+        LDAP_LIST_INSERT_HEAD( &b->b_conns, c, c_next );
+        b->b_active++;
+    }
 
+    ldap_pvt_thread_mutex_unlock( &c->c_mutex );
     return c;
+
 fail:
     if ( c->c_write_event ) {
         event_del( c->c_write_event );
@@ -784,18 +813,22 @@ upstream_destroy( Connection *c )
 {
     Backend *b = c->c_private;
 
+    Debug( LDAP_DEBUG_CONNS, "upstream_destroy: freeing connection %lu\n",
+            c->c_connid );
+
+    assert( c->c_state != SLAP_C_INVALID );
     c->c_state = SLAP_C_INVALID;
     ldap_pvt_thread_mutex_unlock( &c->c_mutex );
 
     ldap_pvt_thread_mutex_lock( &b->b_mutex );
-    if ( !(b->b_conns == c) ) {
-        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
-        return;
+    LDAP_LIST_REMOVE( c, c_next );
+    if ( c->c_type == SLAP_C_BIND ) {
+        b->b_bindavail--;
+    } else {
+        b->b_active--;
     }
-    b->b_conns = NULL;
     ldap_pvt_thread_mutex_unlock( &b->b_mutex );
-
-    ldap_pvt_thread_pool_submit( &connection_pool, backend_connect, b );
+    backend_retry( b );
 
     ldap_pvt_thread_mutex_lock( &c->c_mutex );