]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
ITS#9598 Per OID restrictions
authorOndřej Kuzník <okuznik@symas.com>
Mon, 10 Aug 2020 15:14:07 +0000 (17:14 +0200)
committerOndřej Kuzník <okuznik@symas.com>
Fri, 13 Aug 2021 09:57:14 +0000 (10:57 +0100)
doc/man/man5/lloadd.conf.5
servers/lloadd/bind.c
servers/lloadd/client.c
servers/lloadd/config.c
servers/lloadd/daemon.c
servers/lloadd/extended.c
servers/lloadd/lload.h
servers/lloadd/operation.c
servers/lloadd/proto-lload.h
servers/lloadd/upstream.c

index fc6beb5765d3704bdef7e8666bad6a52dca2fd06..d6c6a836e656b93033f622d0d16faf93e210fe36 100644 (file)
@@ -347,10 +347,65 @@ Specify the number of seconds after a write operation is finished that
 .B lloadd
 will direct operations exclusively to the last selected backend. A write
 operation is anything not handled internally (certain exops, abandon),
-excepting search, compare and bind operations. Bind operations also reset this
+except search, compare and bind operations. Bind operations also reset this
 restriction. The default is 0, write operations do not restrict selection. When
 negative, the restriction is not time limited and will persist until the next
 bind.
+.TP
+.B restrict_exop <OID> <action>
+Tell
+.B lloadd
+that extended operation with a given OID should be handled in a specific way.
+OID
+.B 1.1
+is special, setting a default (only for operations not handled internally).
+The meaning of the
+.B <action>
+argument is the same as in
+.B restrict_control
+below.
+.TP
+.B restrict_control <OID> <action>
+Tell
+.B lloadd
+that a control with a given OID attached to any operation should be handled in
+a specific way according to the
+.B <action>
+argument. At the moment, only operations passed intact are inspected in
+this way, in particular, controls on bind and extended operations are
+.B not
+checked.
+
+In order of descending priority (the control with highest priority action
+wins), this is the action made:
+.RS
+.RS
+.PD 0
+.TP
+.B reject
+operations that carry this control will be rejected.
+.TP
+.B connection
+once an upstream is selected, every future operation from this client will be
+directed to the same connection. Useful when state is shared between client and
+upstream that the load balancer doesn't track.
+.TP
+.B backend
+like
+.B write
+except this does not time out.
+.TP
+.B write
+this is treated like a write operation (see
+.BR write_coherence )
+above.
+.TP
+.B ignore
+does not influence restrictions, useful when changing the global exop default.
+This is the default handling for exops/controls not handled by the load balancer
+internally.
+.PD
+.RE
 
 .SH TLS OPTIONS
 If
@@ -804,6 +859,21 @@ Here is a short example of a configuration file:
 argsfile  LOCALSTATEDIR/run/lloadd.args
 pidfile   LOCALSTATEDIR/run/lloadd.pid
 
+# cancel not supported yet
+restrict_exop 1.3.6.1.1.8 reject
+
+# turn not supported
+restrict_exop 1.3.6.1.1.19 reject
+
+# TXN Exop if desired, otherwise reject
+restrict_exop 1.3.6.1.1.21.1 connection
+
+# Paged results control
+restrict_control 1.2.840.113556.1.4.319 connection
+
+# VLV control
+restrict_control 2.16.840.1.113730.3.4.9 connection
+
 bindconf
     bindmethod=simple
     binddn=cn=test
index e01534a3afd4db66c1cb8d8f033e1e5e83d0c283..c3351943fef4669e76875d9ccceb3c5508038532 100644 (file)
@@ -197,6 +197,7 @@ request_bind( LloadConnection *client, LloadOperation *op )
     unsigned long pin;
     int res = LDAP_UNAVAILABLE, rc = LDAP_SUCCESS;
     char *message = "no connections available";
+    enum op_restriction client_restricted;
 
     CONNECTION_LOCK(client);
     pin = client->c_pin_id;
@@ -336,26 +337,22 @@ request_bind( LloadConnection *client, LloadOperation *op )
     assert( rc == LDAP_SUCCESS );
     client->c_n_ops_executing++;
 
-    if ( client->c_backend ) {
-        assert( client->c_restricted_inflight == 0 );
-        client->c_backend = NULL;
-        client->c_restricted_at = 0;
-    }
+    client_restricted = client->c_restricted;
     CONNECTION_UNLOCK(client);
 
     if ( pin ) {
         checked_lock( &op->o_link_mutex );
         upstream = op->o_upstream;
         checked_unlock( &op->o_link_mutex );
+    }
 
-        if ( upstream ) {
-            checked_lock( &upstream->c_io_mutex );
-            CONNECTION_LOCK(upstream);
-            if ( !IS_ALIVE( upstream, c_live ) ) {
-                CONNECTION_UNLOCK(upstream);
-                checked_unlock( &upstream->c_io_mutex );
-                upstream = NULL;
-            }
+    if ( upstream ) {
+        checked_lock( &upstream->c_io_mutex );
+        CONNECTION_LOCK(upstream);
+        if ( !IS_ALIVE( upstream, c_live ) ) {
+            CONNECTION_UNLOCK(upstream);
+            checked_unlock( &upstream->c_io_mutex );
+            upstream = NULL;
         }
     }
 
@@ -363,7 +360,7 @@ request_bind( LloadConnection *client, LloadOperation *op )
      * have to reject the op and clear pin */
     if ( upstream ) {
         /* No need to do anything */
-    } else if ( !pin ) {
+    } else if ( !pin && client_restricted != LLOAD_OP_RESTRICTED_ISOLATE ) {
         upstream_select( op, &upstream, &res, &message );
     } else {
         Debug( LDAP_DEBUG_STATS, "request_bind: "
index 5061490e2bb46491889be9f435617d8673e2aff9..a1902870b923afb97aa68de3db918d22b9f1612d 100644 (file)
@@ -94,19 +94,97 @@ request_process( LloadConnection *client, LloadOperation *op )
     ber_int_t msgid;
     int res = LDAP_UNAVAILABLE, rc = LDAP_SUCCESS;
     char *message = "no connections available";
+    enum op_restriction client_restricted;
 
-    if ( lload_write_coherence ) {
-        CONNECTION_LOCK(client);
-        if ( client->c_restricted_inflight || client->c_restricted_at < 0 ||
-                client->c_restricted_at + lload_write_coherence >= op->o_start ) {
-            b = client->c_backend;
-        } else {
+    if ( lload_control_actions && !BER_BVISNULL( &op->o_ctrls ) ) {
+        BerElementBuffer copy_berbuf;
+        BerElement *copy = (BerElement *)&copy_berbuf;
+        struct berval control;
+
+        ber_init2( copy, &op->o_ctrls, 0 );
+
+        while ( ber_skip_element( copy, &control ) == LBER_SEQUENCE ) {
+            struct restriction_entry *entry, needle = {};
+            BerElementBuffer control_berbuf;
+            BerElement *control_ber = (BerElement *)&control_berbuf;
+
+            ber_init2( control_ber, &control, 0 );
+
+            if ( ber_skip_element( control_ber, &needle.oid ) == LBER_ERROR ) {
+                res = LDAP_PROTOCOL_ERROR;
+                message = "invalid control";
+
+                operation_send_reject( op, res, message, 1 );
+                goto fail;
+            }
+
+            entry = ldap_tavl_find(
+                    lload_control_actions, &needle, lload_restriction_cmp );
+            if ( entry && op->o_restricted < entry->action ) {
+                op->o_restricted = entry->action;
+            }
+        }
+    }
+    if ( op->o_restricted < LLOAD_OP_RESTRICTED_WRITE &&
+            lload_write_coherence &&
+            op->o_tag != LDAP_REQ_SEARCH &&
+            op->o_tag != LDAP_REQ_COMPARE ) {
+        op->o_restricted = LLOAD_OP_RESTRICTED_WRITE;
+    }
+
+    if ( op->o_restricted == LLOAD_OP_RESTRICTED_REJECT ) {
+        res = LDAP_UNWILLING_TO_PERFORM;
+        message = "extended operation or control disallowed";
+
+        operation_send_reject( op, res, message, 1 );
+        goto fail;
+    }
+
+    CONNECTION_LOCK(client);
+    client_restricted = client->c_restricted;
+    if ( client_restricted ) {
+        if ( client_restricted == LLOAD_OP_RESTRICTED_WRITE &&
+                client->c_restricted_inflight == 0 &&
+                client->c_restricted_at >= 0 &&
+                client->c_restricted_at + lload_write_coherence <
+                    op->o_start ) {
+            Debug( LDAP_DEBUG_TRACE, "request_process: "
+                    "connid=%lu write coherence to backend '%s' expired\n",
+                    client->c_connid, client->c_backend->b_name.bv_val );
             client->c_backend = NULL;
+            client_restricted = client->c_restricted = LLOAD_OP_NOT_RESTRICTED;
         }
-        CONNECTION_UNLOCK(client);
+        switch ( client_restricted ) {
+            case LLOAD_OP_NOT_RESTRICTED:
+                break;
+            case LLOAD_OP_RESTRICTED_WRITE:
+            case LLOAD_OP_RESTRICTED_BACKEND:
+                b = client->c_backend;
+                assert( b );
+                break;
+            case LLOAD_OP_RESTRICTED_UPSTREAM:
+            case LLOAD_OP_RESTRICTED_ISOLATE:
+                upstream = client->c_linked_upstream;
+                assert( upstream );
+                break;
+            default:
+                assert(0);
+                break;
+        }
+    }
+    if ( op->o_restricted < client_restricted ) {
+        op->o_restricted = client_restricted;
     }
+    CONNECTION_UNLOCK(client);
 
-    if ( b ) {
+    if ( upstream ) {
+        b = upstream->c_backend;
+        checked_lock( &b->b_mutex );
+        if ( !try_upstream( b, NULL, op, upstream, &res, &message ) ) {
+            upstream = NULL;
+        }
+        checked_unlock( &b->b_mutex );
+    } else if ( b ) {
         backend_select( b, op, &upstream, &res, &message );
     } else {
         upstream_select( op, &upstream, &res, &message );
@@ -169,9 +247,18 @@ request_process( LloadConnection *client, LloadOperation *op )
     }
     upstream->c_pendingber = output;
 
+    if ( client_restricted < LLOAD_OP_RESTRICTED_UPSTREAM &&
+            op->o_restricted >= LLOAD_OP_RESTRICTED_UPSTREAM ) {
+        rc = ldap_tavl_insert(
+                &upstream->c_linked, client, lload_upstream_entry_cmp,
+                ldap_avl_dup_error );
+        assert( rc == LDAP_SUCCESS );
+    }
+
     op->o_upstream_msgid = msgid = upstream->c_next_msgid++;
     rc = ldap_tavl_insert(
             &upstream->c_ops, op, operation_upstream_cmp, ldap_avl_dup_error );
+
     CONNECTION_UNLOCK(upstream);
 
     Debug( LDAP_DEBUG_TRACE, "request_process: "
@@ -183,18 +270,27 @@ request_process( LloadConnection *client, LloadOperation *op )
 
     lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_forwarded++;
 
-    if ( lload_write_coherence && !b &&
-            op->o_tag != LDAP_REQ_SEARCH &&
-            op->o_tag != LDAP_REQ_COMPARE ) {
-        /*
-         * TODO: There can't be more than one thread receiving a new request,
-         * so we could drop the lock. We'd still need some atomics for the
-         * counters.
-         */
+    if ( op->o_restricted > client_restricted ||
+            client_restricted == LLOAD_OP_RESTRICTED_WRITE ) {
         CONNECTION_LOCK(client);
-        client->c_backend = upstream->c_backend;
-        client->c_restricted_inflight++;
-        op->o_restricted = 1;
+        if ( op->o_restricted > client_restricted ) {
+            client->c_restricted = op->o_restricted;
+        }
+        if ( op->o_restricted == LLOAD_OP_RESTRICTED_WRITE ) {
+            client->c_restricted_inflight++;
+        }
+        if ( op->o_restricted >= LLOAD_OP_RESTRICTED_UPSTREAM ) {
+            if ( client_restricted < LLOAD_OP_RESTRICTED_UPSTREAM ) {
+                client->c_linked_upstream = upstream;
+            }
+            assert( client->c_linked_upstream == upstream );
+            client->c_backend = NULL;
+        } else if ( op->o_restricted >= LLOAD_OP_RESTRICTED_WRITE ) {
+            if ( client_restricted < LLOAD_OP_RESTRICTED_WRITE ) {
+                client->c_backend = upstream->c_backend;
+            }
+            assert( client->c_backend == upstream->c_backend );
+        }
         CONNECTION_UNLOCK(client);
     }
 
@@ -540,6 +636,8 @@ client_reset( LloadConnection *c )
 {
     TAvlnode *root;
     long freed = 0, executing;
+    LloadConnection *linked_upstream = NULL;
+    enum op_restriction restricted = c->c_restricted;
 
     CONNECTION_ASSERT_LOCKED(c);
     root = c->c_ops;
@@ -555,6 +653,20 @@ client_reset( LloadConnection *c )
         ch_free( c->c_sasl_bind_mech.bv_val );
         BER_BVZERO( &c->c_sasl_bind_mech );
     }
+
+    if ( restricted && restricted < LLOAD_OP_RESTRICTED_ISOLATE ) {
+        if ( c->c_backend ) {
+            assert( c->c_restricted <= LLOAD_OP_RESTRICTED_BACKEND );
+            assert( c->c_restricted_inflight == 0 );
+            c->c_backend = NULL;
+            c->c_restricted_at = 0;
+        } else {
+            assert( c->c_restricted == LLOAD_OP_RESTRICTED_UPSTREAM );
+            assert( c->c_linked_upstream != NULL );
+            linked_upstream = c->c_linked_upstream;
+            c->c_linked_upstream = NULL;
+        }
+    }
     CONNECTION_UNLOCK(c);
 
     if ( root ) {
@@ -565,6 +677,12 @@ client_reset( LloadConnection *c )
     }
     assert( freed == executing );
 
+    if ( linked_upstream && restricted == LLOAD_OP_RESTRICTED_UPSTREAM ) {
+        LloadConnection *removed = ldap_tavl_delete(
+                &linked_upstream->c_linked, c, lload_upstream_entry_cmp );
+        assert( removed == c );
+    }
+
     CONNECTION_LOCK(c);
     CONNECTION_ASSERT_LOCKED(c);
 }
@@ -586,6 +704,11 @@ client_unlink( LloadConnection *c )
     state = c->c_state;
     c->c_state = LLOAD_C_DYING;
 
+    if ( c->c_restricted == LLOAD_OP_RESTRICTED_ISOLATE ) {
+        /* Allow upstream connection to be severed in client_reset() */
+        c->c_restricted = LLOAD_OP_RESTRICTED_UPSTREAM;
+    }
+
     read_event = c->c_read_event;
     write_event = c->c_write_event;
     CONNECTION_UNLOCK(c);
index 3bdf6dac7a3792cf34e562c5ecb86fdc76312537..16f08009d4dfecb878662f6bd4d72f916996d2d4 100644 (file)
@@ -115,6 +115,7 @@ static ConfigDriver config_fname;
 static ConfigDriver config_generic;
 static ConfigDriver config_backend;
 static ConfigDriver config_bindconf;
+static ConfigDriver config_restrict_oid;
 #ifdef LDAP_TCP_BUFFER
 static ConfigDriver config_tcp_buffer;
 #endif /* LDAP_TCP_BUFFER */
@@ -179,6 +180,8 @@ enum {
     CFG_MAX_PENDING_CONNS,
     CFG_STARTTLS,
     CFG_CLIENT_PENDING,
+    CFG_RESTRICT_EXOP,
+    CFG_RESTRICT_CONTROL,
 
     CFG_LAST
 };
@@ -635,12 +638,33 @@ static ConfigTable config_back_cf_table[] = {
     { "write_coherence", "seconds", 2, 2, 0,
         ARG_INT,
         &lload_write_coherence,
-        "( OLcfgBkAt:13.38 "
+        "( OLcfgBkAt:13.36 "
             "NAME 'olcBkLloadWriteCoherence' "
             "DESC 'Keep operations to the same backend after a write' "
             "EQUALITY integerMatch "
             "SYNTAX OMsInteger "
             "SINGLE-VALUE )",
+        NULL,
+        { .v_int = 0 }
+    },
+    { "restrict_exop", "OID> <action", 3, 3, 0,
+        ARG_MAGIC|CFG_RESTRICT_EXOP,
+        &config_restrict_oid,
+        "( OLcfgBkAt:13.37 "
+            "NAME 'olcBkLloadRestrictExop' "
+            "DESC 'Restrict upstream selection after forwarding an extended operation' "
+            "EQUALITY caseIgnoreMatch "
+            "SYNTAX OMsDirectoryString )",
+        NULL, NULL
+    },
+    { "restrict_control", "OID> <action", 3, 3, 0,
+        ARG_MAGIC|CFG_RESTRICT_CONTROL,
+        &config_restrict_oid,
+        "( OLcfgBkAt:13.38 "
+            "NAME 'olcBkLloadRestrictControl' "
+            "DESC 'Restrict upstream selection after forwarding a control' "
+            "EQUALITY caseIgnoreMatch "
+            "SYNTAX OMsDirectoryString )",
         NULL, NULL
     },
 
@@ -763,6 +787,9 @@ static ConfigOCs lloadocs[] = {
             "$ olcBkLloadTLSCRLFile "
             "$ olcBkLloadTLSShareSlapdCTX "
             "$ olcBkLloadClientMaxPending "
+            "$ olcBkLloadWriteCoherence "
+            "$ olcBkLloadRestrictExop "
+            "$ olcBkLloadRestrictControl "
         ") )",
         Cft_Backend, config_back_cf_table,
         NULL,
@@ -1282,6 +1309,160 @@ config_bindconf( ConfigArgs *c )
     return 0;
 }
 
+#ifndef BALANCER_MODULE
+char *
+oidm_find( char *oid )
+{
+    if ( OID_LEADCHAR( *oid ) ) {
+        return oid;
+    }
+    Debug( LDAP_DEBUG_ANY, "oidm_find: "
+            "full OID parsing only available when compiled as a module\n" );
+    return NULL;
+}
+#endif /* !BALANCER_MODULE */
+
+static struct {
+    const char *name;
+    enum op_restriction action;
+} restrictopts[] = {
+    { "ignore", LLOAD_OP_NOT_RESTRICTED },
+    { "write", LLOAD_OP_RESTRICTED_WRITE },
+    { "backend", LLOAD_OP_RESTRICTED_BACKEND },
+    { "connection", LLOAD_OP_RESTRICTED_UPSTREAM },
+    { "isolate", LLOAD_OP_RESTRICTED_ISOLATE },
+    { "reject", LLOAD_OP_RESTRICTED_REJECT },
+    { NULL }
+};
+
+static void
+restriction_free( struct restriction_entry *restriction )
+{
+    ch_free( restriction->oid.bv_val );
+    ch_free( restriction );
+}
+
+static int
+config_restrict_oid( ConfigArgs *c )
+{
+    TAvlnode *node = NULL, **root = ( c->type == CFG_RESTRICT_EXOP ) ?
+            &lload_exop_actions :
+            &lload_control_actions;
+    struct restriction_entry *entry = NULL;
+    char *parsed_oid;
+    int i, rc = -1;
+
+    if ( c->op == SLAP_CONFIG_EMIT ) {
+        struct berval bv = { .bv_val = c->cr_msg };
+
+        if ( c->type == CFG_RESTRICT_EXOP && lload_default_exop_action ) {
+            bv.bv_len = snprintf( bv.bv_val, sizeof(c->cr_msg), "1.1 %s",
+                    restrictopts[lload_default_exop_action].name );
+            value_add_one( &c->rvalue_vals, &bv );
+        }
+        for ( node = ldap_tavl_end( *root, TAVL_DIR_LEFT );
+                node;
+                node = ldap_tavl_next( node, TAVL_DIR_RIGHT ) ) {
+            entry = node->avl_data;
+
+            bv.bv_len = snprintf( bv.bv_val, sizeof(c->cr_msg), "%s %s",
+                    entry->oid.bv_val, restrictopts[entry->action].name );
+            value_add_one( &c->rvalue_vals, &bv );
+        }
+
+        return LDAP_SUCCESS;
+
+    } else if ( c->op == LDAP_MOD_DELETE ) {
+        if ( !c->line ) {
+            ldap_tavl_free( *root, (AVL_FREE)restriction_free );
+            *root = NULL;
+            if ( c->type == CFG_RESTRICT_EXOP ) {
+                lload_default_exop_action = LLOAD_OP_NOT_RESTRICTED;
+            }
+            rc = LDAP_SUCCESS;
+        } else {
+            struct restriction_entry needle;
+
+            parsed_oid = strchr( c->line, ' ' );
+            if ( !parsed_oid ) {
+                return rc;
+            }
+
+            memcpy( c->cr_msg, c->line, parsed_oid - c->line );
+            c->cr_msg[parsed_oid - c->line] = '\0';
+
+            needle.oid.bv_val = oidm_find( c->cr_msg );
+            needle.oid.bv_len = strlen( needle.oid.bv_val );
+
+            if ( !needle.oid.bv_val ) {
+                return rc;
+            } else if ( c->type == CFG_RESTRICT_EXOP &&
+                    !strcmp( needle.oid.bv_val, "1.1" ) ) {
+                lload_default_exop_action = LLOAD_OP_NOT_RESTRICTED;
+            } else {
+                /* back-config should have checked we have this value */
+                entry = ldap_tavl_delete( root, &needle,
+                        lload_restriction_cmp );
+                assert( entry != NULL );
+            }
+            rc = LDAP_SUCCESS;
+        }
+        return rc;
+    }
+
+    parsed_oid = oidm_find( c->argv[1] );
+    if ( !parsed_oid ) {
+        snprintf( c->cr_msg, sizeof(c->cr_msg), "Could not parse oid %s",
+                c->argv[1] );
+        goto done;
+    }
+
+    for ( i = 0; restrictopts[i].name; i++ ) {
+        if ( !strcasecmp( c->argv[2], restrictopts[i].name ) ) {
+            break;
+        }
+    }
+
+    if ( !restrictopts[i].name ) {
+        snprintf( c->cr_msg, sizeof(c->cr_msg), "Could not parse action %s",
+                c->argv[2] );
+        goto done;
+    }
+
+    if ( !strcmp( parsed_oid, "1.1" ) ) {
+        if ( lload_default_exop_action ) {
+            snprintf( c->cr_msg, sizeof(c->cr_msg), "Default already set" );
+            goto done;
+        } else {
+            lload_default_exop_action = i;
+        }
+    }
+
+    entry = ch_malloc( sizeof(struct restriction_entry) );
+    /* Copy only if a reference to argv[1] was returned */
+    ber_str2bv( parsed_oid, 0, parsed_oid == c->argv[1], &entry->oid );
+    entry->action = i;
+
+    if ( ldap_tavl_insert( root, entry, lload_restriction_cmp,
+                ldap_avl_dup_error ) ) {
+        snprintf( c->cr_msg, sizeof(c->cr_msg),
+                "%s with OID %s already restricted",
+                c->type == CFG_RESTRICT_EXOP ? "Extended operation" : "Control",
+                c->argv[1] );
+        goto done;
+    }
+
+    rc = LDAP_SUCCESS;
+done:
+    if ( rc ) {
+        Debug( LDAP_DEBUG_ANY, "%s: %s\n", c->log, c->cr_msg );
+        if ( parsed_oid ) ch_free( parsed_oid );
+        if ( entry ) ch_free( entry );
+    }
+
+    return rc;
+}
+
 static int
 config_fname( ConfigArgs *c )
 {
index d402dfe6b1a38986c6dc26aa201eb9462b83a278..2c9a2f6e98b2e8a633e3eb62bc5832c27e10d35f 100644 (file)
@@ -1429,6 +1429,41 @@ client_tls_cb( ldap_pvt_thread_start_t *start, void *startarg, void *arg )
 }
 #endif /* HAVE_TLS */
 
+static int
+detach_linked_backend_cb( LloadConnection *client, LloadBackend *b )
+{
+    int rc = LDAP_SUCCESS;
+
+    if ( client->c_backend != b ) {
+        return rc;
+    }
+
+    Debug( LDAP_DEBUG_CONNS, "detach_linked_backend_cb: "
+            "detaching backend '%s' from connid=%lu%s\n",
+            b->b_name.bv_val, client->c_connid,
+            client->c_restricted == LLOAD_OP_RESTRICTED_BACKEND ?
+                " and closing the connection" :
+                "" );
+
+    /* We were approached from the connection list */
+    assert( IS_ALIVE( client, c_refcnt ) );
+
+    assert( client->c_restricted == LLOAD_OP_RESTRICTED_WRITE ||
+            client->c_restricted == LLOAD_OP_RESTRICTED_BACKEND );
+    if ( client->c_restricted == LLOAD_OP_RESTRICTED_BACKEND ) {
+        int gentle = 1;
+        CONNECTION_LOCK(client);
+        rc = lload_connection_close( client, &gentle );
+        CONNECTION_UNLOCK(client);
+    }
+
+    client->c_restricted = LLOAD_OP_NOT_RESTRICTED;
+    client->c_restricted_at = 0;
+    client->c_restricted_inflight = 0;
+
+    return rc;
+}
+
 void
 lload_handle_backend_invalidation( LloadChange *change )
 {
@@ -1458,6 +1493,13 @@ lload_handle_backend_invalidation( LloadChange *change )
                 &connection_pool, handle_pdus, backend_conn_cb, b );
         ldap_pvt_thread_pool_walk(
                 &connection_pool, upstream_bind, backend_conn_cb, b );
+
+        checked_lock( &clients_mutex );
+        connections_walk(
+                &clients_mutex, &clients,
+                (CONNCB)detach_linked_backend_cb, b );
+        checked_unlock( &clients_mutex );
+
         lload_backend_destroy( b );
         return;
     }
index 7475d87e021159a41526e0bd240e6385e37dfd29..975c175e41b8803ab669f46b5fdd1ce707511f46 100644 (file)
@@ -125,6 +125,7 @@ int
 request_extended( LloadConnection *c, LloadOperation *op )
 {
     ExopHandler *handler, needle = {};
+    struct restriction_entry *restriction, rneedle = {};
     BerElement *copy;
     struct berval bv;
     ber_tag_t tag;
@@ -158,6 +159,15 @@ request_extended( LloadConnection *c, LloadOperation *op )
     }
     ber_free( copy, 0 );
 
+    rneedle.oid = bv;
+    restriction = ldap_tavl_find( lload_exop_actions, &rneedle,
+            lload_restriction_cmp );
+    if ( restriction ) {
+        op->o_restricted = restriction->action;
+    } else {
+        op->o_restricted = lload_default_exop_action;
+    }
+
     return request_process( c, op );
 }
 
index d9b84ec7e6c44e23a2976cb122f8beee380e8da7..6db6179cbd42b0a1a90286d8acd759813358650d 100644 (file)
@@ -298,9 +298,18 @@ enum sc_io_state {
 /* Tracking whether an operation might cause a client to restrict which
  * upstreams are eligible */
 enum op_restriction {
-    LLOAD_OP_NOT_RESTRICTED, /* operation didn't trigger any restriction */
-    LLOAD_OP_RESTRICTED_BACKEND, /* operation restricts a client to a certain backend */
-    LLOAD_OP_RESTRICTED_UPSTREAM, /* operation restricts a client to a certain upstream */
+    LLOAD_OP_NOT_RESTRICTED, /* no restrictions in place */
+    LLOAD_OP_RESTRICTED_WRITE, /* client is restricted to a certain backend with
+                                * a timeout attached */
+    LLOAD_OP_RESTRICTED_BACKEND, /* client is restricted to a certain backend,
+                                  * without a timeout */
+    LLOAD_OP_RESTRICTED_UPSTREAM, /* client is restricted to a certain upstream */
+    LLOAD_OP_RESTRICTED_ISOLATE, /* TODO: client is restricted to a certain
+                                  * upstream and removes the upstream from the
+                                  * pool */
+    LLOAD_OP_RESTRICTED_REJECT, /* operation should not be forwarded to any
+                                 * backend, either it is processed internally
+                                 * or rejected */
 };
 
 /*
@@ -410,9 +419,13 @@ struct LloadConnection {
     long c_n_ops_completed;      /* num of ops completed */
     lload_counters_t c_counters; /* per connection operation counters */
 
-    LloadBackend *c_backend;
+    enum op_restriction c_restricted;
     uintptr_t c_restricted_inflight;
     time_t c_restricted_at;
+    LloadBackend *c_backend;
+    LloadConnection *c_linked_upstream;
+
+    TAvlnode *c_linked;
 
     /*
      * Protected by the CIRCLEQ mutex:
@@ -470,6 +483,11 @@ struct LloadOperation {
     BerValue o_request, o_ctrls;
 };
 
+struct restriction_entry {
+    struct berval oid;
+    enum op_restriction action;
+};
+
 /*
  * listener; need to access it from monitor backend
  */
index 5b953841a8c5861451660dec1b97ea885a450d8b..95657b920481a806108e0f27db5091f8271eea29 100644 (file)
 ldap_pvt_thread_mutex_t lload_pin_mutex;
 unsigned long lload_next_pin = 1;
 
+TAvlnode *lload_control_actions = NULL;
+TAvlnode *lload_exop_actions = NULL;
+enum op_restriction lload_default_exop_action = LLOAD_OP_NOT_RESTRICTED;
+
 ber_tag_t
 slap_req2res( ber_tag_t tag )
 {
@@ -84,6 +88,13 @@ lload_msgtype2str( ber_tag_t tag )
     return "unknown message";
 }
 
+int
+lload_restriction_cmp( const void *left, const void *right )
+{
+    const struct restriction_entry *l = left, *r = right;
+    return ber_bvcmp( &l->oid, &r->oid );
+}
+
 int
 operation_client_cmp( const void *left, const void *right )
 {
@@ -276,7 +287,7 @@ operation_unlink_client( LloadOperation *op, LloadConnection *client )
         assert( op == removed );
         client->c_n_ops_executing--;
 
-        if ( op->o_restricted ) {
+        if ( op->o_restricted == LLOAD_OP_RESTRICTED_WRITE ) {
             if ( !--client->c_restricted_inflight && client->c_restricted_at >= 0 ) {
                 if ( lload_write_coherence < 0 ) {
                     client->c_restricted_at = -1;
index 604c4a6b582beb9be5f0763f4b5f762d25704767..5f7f93d5686931da4f79465736edd735a5d19cb3 100644 (file)
@@ -172,6 +172,10 @@ LDAP_SLAPD_F (int) lload_monitor_backend_init( BackendInfo *bi, LloadBackend *b
  */
 LDAP_SLAPD_V (ldap_pvt_thread_mutex_t) lload_pin_mutex;
 LDAP_SLAPD_V (unsigned long) lload_next_pin;
+LDAP_SLAPD_V (TAvlnode *) lload_control_actions;
+LDAP_SLAPD_V (TAvlnode *) lload_exop_actions;
+LDAP_SLAPD_V (enum op_restriction) lload_default_exop_action;
+LDAP_SLAPD_F (int) lload_restriction_cmp( const void *left, const void *right );
 LDAP_SLAPD_F (const char *) lload_msgtype2str( ber_tag_t tag );
 LDAP_SLAPD_F (int) operation_upstream_cmp( const void *l, const void *r );
 LDAP_SLAPD_F (int) operation_client_cmp( const void *l, const void *r );
@@ -192,6 +196,7 @@ LDAP_SLAPD_F (void) operation_update_global_rejected( LloadOperation *op );
 /*
  * upstream.c
  */
+LDAP_SLAPD_F (int) lload_upstream_entry_cmp( const void *l, const void *r );
 LDAP_SLAPD_F (int) forward_final_response( LloadConnection *client, LloadOperation *op, BerElement *ber );
 LDAP_SLAPD_F (int) forward_response( LloadConnection *client, LloadOperation *op, BerElement *ber );
 LDAP_SLAPD_F (void *) upstream_bind( void *ctx, void *arg );
index 375626f5f7da6bd1d53dc44172f81b51fc8ba72e..fac4ac32ceb154eabe9de1eada6aa3da07143402 100644 (file)
@@ -40,6 +40,27 @@ static const sasl_callback_t client_callbacks[] = {
 
 static void upstream_unlink( LloadConnection *upstream );
 
+int
+lload_upstream_entry_cmp( const void *l, const void *r )
+{
+    return SLAP_PTRCMP( l, r );
+}
+
+static void
+linked_upstream_lost( LloadConnection *client )
+{
+    int gentle = 1;
+
+    CONNECTION_LOCK(client);
+    assert( client->c_restricted >= LLOAD_OP_RESTRICTED_UPSTREAM );
+    assert( client->c_linked_upstream );
+
+    client->c_restricted = LLOAD_OP_NOT_RESTRICTED;
+    client->c_linked_upstream = NULL;
+    CONNECTION_UNLOCK(client);
+    lload_connection_close( client, &gentle );
+}
+
 int
 forward_response( LloadConnection *client, LloadOperation *op, BerElement *ber )
 {
@@ -996,7 +1017,7 @@ upstream_unlink( LloadConnection *c )
 {
     LloadBackend *b = c->c_backend;
     struct event *read_event, *write_event;
-    TAvlnode *root;
+    TAvlnode *root, *linked_root;
     long freed, executing;
 
     Debug( LDAP_DEBUG_CONNS, "upstream_unlink: "
@@ -1017,11 +1038,16 @@ upstream_unlink( LloadConnection *c )
     executing = c->c_n_ops_executing;
     c->c_n_ops_executing = 0;
 
+    linked_root = c->c_linked;
+    c->c_linked = NULL;
+
     CONNECTION_UNLOCK(c);
 
     freed = ldap_tavl_free( root, (AVL_FREE)operation_lost_upstream );
     assert( freed == executing );
 
+    ldap_tavl_free( linked_root, (AVL_FREE)linked_upstream_lost );
+
     /*
      * Avoid a deadlock:
      * event_del will block if the event is currently executing its callback,