]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
ITS#9598 Introduce backend-restricted selection
authorOndřej Kuzník <okuznik@symas.com>
Mon, 10 Aug 2020 12:43:17 +0000 (14:43 +0200)
committerOndřej Kuzník <okuznik@symas.com>
Fri, 13 Aug 2021 09:57:14 +0000 (10:57 +0100)
doc/man/man5/lloadd.conf.5
servers/lloadd/bind.c
servers/lloadd/client.c
servers/lloadd/config.c
servers/lloadd/lload.h
servers/lloadd/operation.c
servers/lloadd/proto-lload.h

index d6e5ade0300afc36c4678d1ddee6535992ca8a83..fc6beb5765d3704bdef7e8666bad6a52dca2fd06 100644 (file)
@@ -341,6 +341,16 @@ Specify the number of milliseconds to wait before forcibly closing
 a connection with an outstanding write. This allows faster recovery from
 various network hang conditions.  An iotimeout of 0 disables this feature.
 The default is 10000.
+.TP
+.B write_coherence <integer>
+Specify the number of seconds after a write operation is finished that
+.B lloadd
+will direct operations exclusively to the last selected backend. A write
+operation is anything not handled internally (certain exops, abandon),
+excepting search, compare and bind operations. Bind operations also reset this
+restriction. The default is 0, write operations do not restrict selection. When
+negative, the restriction is not time limited and will persist until the next
+bind.
 
 .SH TLS OPTIONS
 If
index 30db61db498d8e62febb4b08d6376a10d4d91ee0..e01534a3afd4db66c1cb8d8f033e1e5e83d0c283 100644 (file)
@@ -335,6 +335,12 @@ request_bind( LloadConnection *client, LloadOperation *op )
     rc = ldap_tavl_insert( &client->c_ops, op, operation_client_cmp, ldap_avl_dup_error );
     assert( rc == LDAP_SUCCESS );
     client->c_n_ops_executing++;
+
+    if ( client->c_backend ) {
+        assert( client->c_restricted_inflight == 0 );
+        client->c_backend = NULL;
+        client->c_restricted_at = 0;
+    }
     CONNECTION_UNLOCK(client);
 
     if ( pin ) {
index 9567d2033dde0d9c08e94392d5a7bb21c15305d9..5061490e2bb46491889be9f435617d8673e2aff9 100644 (file)
@@ -90,11 +90,28 @@ request_process( LloadConnection *client, LloadOperation *op )
 {
     BerElement *output;
     LloadConnection *upstream = NULL;
+    LloadBackend *b = NULL;
     ber_int_t msgid;
     int res = LDAP_UNAVAILABLE, rc = LDAP_SUCCESS;
     char *message = "no connections available";
 
-    upstream_select( op, &upstream, &res, &message );
+    if ( lload_write_coherence ) {
+        CONNECTION_LOCK(client);
+        if ( client->c_restricted_inflight || client->c_restricted_at < 0 ||
+                client->c_restricted_at + lload_write_coherence >= op->o_start ) {
+            b = client->c_backend;
+        } else {
+            client->c_backend = NULL;
+        }
+        CONNECTION_UNLOCK(client);
+    }
+
+    if ( b ) {
+        backend_select( b, op, &upstream, &res, &message );
+    } else {
+        upstream_select( op, &upstream, &res, &message );
+    }
+
     if ( !upstream ) {
         Debug( LDAP_DEBUG_STATS, "request_process: "
                 "connid=%lu, msgid=%d no available connection found\n",
@@ -166,6 +183,21 @@ request_process( LloadConnection *client, LloadOperation *op )
 
     lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_forwarded++;
 
+    if ( lload_write_coherence && !b &&
+            op->o_tag != LDAP_REQ_SEARCH &&
+            op->o_tag != LDAP_REQ_COMPARE ) {
+        /*
+         * TODO: There can't be more than one thread receiving a new request,
+         * so we could drop the lock. We'd still need some atomics for the
+         * counters.
+         */
+        CONNECTION_LOCK(client);
+        client->c_backend = upstream->c_backend;
+        client->c_restricted_inflight++;
+        op->o_restricted = 1;
+        CONNECTION_UNLOCK(client);
+    }
+
     if ( (lload_features & LLOAD_FEATURE_PROXYAUTHZ) &&
             client->c_type != LLOAD_C_PRIVILEGED ) {
         CONNECTION_LOCK(client);
@@ -199,6 +231,8 @@ fail:
     if ( upstream ) {
         CONNECTION_LOCK_DESTROY(upstream);
 
+        /* We have not committed any restrictions in the end */
+        op->o_restricted = LLOAD_OP_NOT_RESTRICTED;
         operation_send_reject( op, LDAP_OTHER, "internal error", 0 );
     }
 
index 03b81cc7714fabc0530c1caf8270fa5767a423f5..3bdf6dac7a3792cf34e562c5ecb86fdc76312537 100644 (file)
@@ -80,6 +80,7 @@ static struct timeval timeout_api_tv, timeout_net_tv,
         timeout_write_tv = { 10, 0 };
 
 lload_features_t lload_features;
+int lload_write_coherence = 0;
 
 ber_len_t sockbuf_max_incoming_client = LLOAD_SB_MAX_INCOMING_CLIENT;
 ber_len_t sockbuf_max_incoming_upstream = LLOAD_SB_MAX_INCOMING_UPSTREAM;
@@ -631,6 +632,17 @@ static ConfigTable config_back_cf_table[] = {
         NULL,
         { .v_uint = 0 }
     },
+    { "write_coherence", "seconds", 2, 2, 0,
+        ARG_INT,
+        &lload_write_coherence,
+        "( OLcfgBkAt:13.38 "
+            "NAME 'olcBkLloadWriteCoherence' "
+            "DESC 'Keep operations to the same backend after a write' "
+            "EQUALITY integerMatch "
+            "SYNTAX OMsInteger "
+            "SINGLE-VALUE )",
+        NULL, NULL
+    },
 
     /* cn=config only options */
 #ifdef BALANCER_MODULE
index 5ba8c6abd3fb3f7bc085085279a6036ec38dc1f8..d9b84ec7e6c44e23a2976cb122f8beee380e8da7 100644 (file)
@@ -295,6 +295,14 @@ enum sc_io_state {
                                      * sent */
 };
 
+/* Tracking whether an operation might cause a client to restrict which
+ * upstreams are eligible */
+enum op_restriction {
+    LLOAD_OP_NOT_RESTRICTED, /* operation didn't trigger any restriction */
+    LLOAD_OP_RESTRICTED_BACKEND, /* operation restricts a client to a certain backend */
+    LLOAD_OP_RESTRICTED_UPSTREAM, /* operation restricts a client to a certain upstream */
+};
+
 /*
  * represents a connection from an ldap client/to ldap server
  */
@@ -403,6 +411,8 @@ struct LloadConnection {
     lload_counters_t c_counters; /* per connection operation counters */
 
     LloadBackend *c_backend;
+    uintptr_t c_restricted_inflight;
+    time_t c_restricted_at;
 
     /*
      * Protected by the CIRCLEQ mutex:
@@ -441,6 +451,7 @@ struct LloadOperation {
     unsigned long o_client_connid;
     ber_int_t o_client_msgid;
     ber_int_t o_saved_msgid;
+    enum op_restriction o_restricted;
 
     LloadConnection *o_upstream;
     unsigned long o_upstream_connid;
index 6004083a7169554ba24460f9623cb508bc95f809..5b953841a8c5861451660dec1b97ea885a450d8b 100644 (file)
@@ -276,6 +276,20 @@ operation_unlink_client( LloadOperation *op, LloadConnection *client )
         assert( op == removed );
         client->c_n_ops_executing--;
 
+        if ( op->o_restricted ) {
+            if ( !--client->c_restricted_inflight && client->c_restricted_at >= 0 ) {
+                if ( lload_write_coherence < 0 ) {
+                    client->c_restricted_at = -1;
+                } else if ( op->o_last_response ) {
+                    client->c_restricted_at = op->o_last_response;
+                } else {
+                    /* We have to default to o_start just in case we abandoned an
+                     * operation that the backend actually processed */
+                    client->c_restricted_at = op->o_start;
+                }
+            }
+        }
+
         if ( op->o_tag == LDAP_REQ_BIND &&
                 client->c_state == LLOAD_C_BINDING ) {
             client->c_state = LLOAD_C_READY;
index b750f553a3888b757ec366034c5fcf3d148e4017..604c4a6b582beb9be5f0763f4b5f762d25704767 100644 (file)
@@ -202,6 +202,8 @@ LDAP_SLAPD_V (ber_len_t) sockbuf_max_incoming_client;
 LDAP_SLAPD_V (ber_len_t) sockbuf_max_incoming_upstream;
 LDAP_SLAPD_V (int) lload_conn_max_pdus_per_cycle;
 
+LDAP_SLAPD_V (int) lload_write_coherence;
+
 LDAP_SLAPD_V (lload_features_t) lload_features;
 
 LDAP_SLAPD_V (slap_mask_t) global_allows;