From: Ondřej Kuzník Date: Mon, 10 Aug 2020 15:14:07 +0000 (+0200) Subject: ITS#9598 Per OID restrictions X-Git-Tag: OPENLDAP_REL_ENG_2_6_0~98 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ddc94307277cb7b1cf2794b4284699e0b08c34ce;p=thirdparty%2Fopenldap.git ITS#9598 Per OID restrictions --- diff --git a/doc/man/man5/lloadd.conf.5 b/doc/man/man5/lloadd.conf.5 index fc6beb5765..d6c6a836e6 100644 --- a/doc/man/man5/lloadd.conf.5 +++ b/doc/man/man5/lloadd.conf.5 @@ -347,10 +347,65 @@ Specify the number of seconds after a write operation is finished that .B lloadd will direct operations exclusively to the last selected backend. A write operation is anything not handled internally (certain exops, abandon), -excepting search, compare and bind operations. Bind operations also reset this +except search, compare and bind operations. Bind operations also reset this restriction. The default is 0, write operations do not restrict selection. When negative, the restriction is not time limited and will persist until the next bind. +.TP +.B restrict_exop +Tell +.B lloadd +that extended operation with a given OID should be handled in a specific way. +OID +.B 1.1 +is special, setting a default (only for operations not handled internally). +The meaning of the +.B +argument is the same as in +.B restrict_control +below. +.TP +.B restrict_control +Tell +.B lloadd +that a control with a given OID attached to any operation should be handled in +a specific way according to the +.B +argument. At the moment, only operations passed intact are inspected in +this way, in particular, controls on bind and extended operations are +.B not +checked. + +In order of descending priority (the control with highest priority action +wins), this is the action made: +.RS +.RS +.PD 0 +.TP +.B reject +operations that carry this control will be rejected. +.TP +.B connection +once an upstream is selected, every future operation from this client will be +directed to the same connection. Useful when state is shared between client and +upstream that the load balancer doesn't track. +.TP +.B backend +like +.B write +except this does not time out. +.TP +.B write +this is treated like a write operation (see +.BR write_coherence ) +above. +.TP +.B ignore +does not influence restrictions, useful when changing the global exop default. +This is the default handling for exops/controls not handled by the load balancer +internally. +.PD +.RE .SH TLS OPTIONS If @@ -804,6 +859,21 @@ Here is a short example of a configuration file: argsfile LOCALSTATEDIR/run/lloadd.args pidfile LOCALSTATEDIR/run/lloadd.pid +# cancel not supported yet +restrict_exop 1.3.6.1.1.8 reject + +# turn not supported +restrict_exop 1.3.6.1.1.19 reject + +# TXN Exop if desired, otherwise reject +restrict_exop 1.3.6.1.1.21.1 connection + +# Paged results control +restrict_control 1.2.840.113556.1.4.319 connection + +# VLV control +restrict_control 2.16.840.1.113730.3.4.9 connection + bindconf bindmethod=simple binddn=cn=test diff --git a/servers/lloadd/bind.c b/servers/lloadd/bind.c index e01534a3af..c3351943fe 100644 --- a/servers/lloadd/bind.c +++ b/servers/lloadd/bind.c @@ -197,6 +197,7 @@ request_bind( LloadConnection *client, LloadOperation *op ) unsigned long pin; int res = LDAP_UNAVAILABLE, rc = LDAP_SUCCESS; char *message = "no connections available"; + enum op_restriction client_restricted; CONNECTION_LOCK(client); pin = client->c_pin_id; @@ -336,26 +337,22 @@ request_bind( LloadConnection *client, LloadOperation *op ) assert( rc == LDAP_SUCCESS ); client->c_n_ops_executing++; - if ( client->c_backend ) { - assert( client->c_restricted_inflight == 0 ); - client->c_backend = NULL; - client->c_restricted_at = 0; - } + client_restricted = client->c_restricted; CONNECTION_UNLOCK(client); if ( pin ) { checked_lock( &op->o_link_mutex ); upstream = op->o_upstream; checked_unlock( &op->o_link_mutex ); + } - if ( upstream ) { - checked_lock( &upstream->c_io_mutex ); - CONNECTION_LOCK(upstream); - if ( !IS_ALIVE( upstream, c_live ) ) { - CONNECTION_UNLOCK(upstream); - checked_unlock( &upstream->c_io_mutex ); - upstream = NULL; - } + if ( upstream ) { + checked_lock( &upstream->c_io_mutex ); + CONNECTION_LOCK(upstream); + if ( !IS_ALIVE( upstream, c_live ) ) { + CONNECTION_UNLOCK(upstream); + checked_unlock( &upstream->c_io_mutex ); + upstream = NULL; } } @@ -363,7 +360,7 @@ request_bind( LloadConnection *client, LloadOperation *op ) * have to reject the op and clear pin */ if ( upstream ) { /* No need to do anything */ - } else if ( !pin ) { + } else if ( !pin && client_restricted != LLOAD_OP_RESTRICTED_ISOLATE ) { upstream_select( op, &upstream, &res, &message ); } else { Debug( LDAP_DEBUG_STATS, "request_bind: " diff --git a/servers/lloadd/client.c b/servers/lloadd/client.c index 5061490e2b..a1902870b9 100644 --- a/servers/lloadd/client.c +++ b/servers/lloadd/client.c @@ -94,19 +94,97 @@ request_process( LloadConnection *client, LloadOperation *op ) ber_int_t msgid; int res = LDAP_UNAVAILABLE, rc = LDAP_SUCCESS; char *message = "no connections available"; + enum op_restriction client_restricted; - if ( lload_write_coherence ) { - CONNECTION_LOCK(client); - if ( client->c_restricted_inflight || client->c_restricted_at < 0 || - client->c_restricted_at + lload_write_coherence >= op->o_start ) { - b = client->c_backend; - } else { + if ( lload_control_actions && !BER_BVISNULL( &op->o_ctrls ) ) { + BerElementBuffer copy_berbuf; + BerElement *copy = (BerElement *)©_berbuf; + struct berval control; + + ber_init2( copy, &op->o_ctrls, 0 ); + + while ( ber_skip_element( copy, &control ) == LBER_SEQUENCE ) { + struct restriction_entry *entry, needle = {}; + BerElementBuffer control_berbuf; + BerElement *control_ber = (BerElement *)&control_berbuf; + + ber_init2( control_ber, &control, 0 ); + + if ( ber_skip_element( control_ber, &needle.oid ) == LBER_ERROR ) { + res = LDAP_PROTOCOL_ERROR; + message = "invalid control"; + + operation_send_reject( op, res, message, 1 ); + goto fail; + } + + entry = ldap_tavl_find( + lload_control_actions, &needle, lload_restriction_cmp ); + if ( entry && op->o_restricted < entry->action ) { + op->o_restricted = entry->action; + } + } + } + if ( op->o_restricted < LLOAD_OP_RESTRICTED_WRITE && + lload_write_coherence && + op->o_tag != LDAP_REQ_SEARCH && + op->o_tag != LDAP_REQ_COMPARE ) { + op->o_restricted = LLOAD_OP_RESTRICTED_WRITE; + } + + if ( op->o_restricted == LLOAD_OP_RESTRICTED_REJECT ) { + res = LDAP_UNWILLING_TO_PERFORM; + message = "extended operation or control disallowed"; + + operation_send_reject( op, res, message, 1 ); + goto fail; + } + + CONNECTION_LOCK(client); + client_restricted = client->c_restricted; + if ( client_restricted ) { + if ( client_restricted == LLOAD_OP_RESTRICTED_WRITE && + client->c_restricted_inflight == 0 && + client->c_restricted_at >= 0 && + client->c_restricted_at + lload_write_coherence < + op->o_start ) { + Debug( LDAP_DEBUG_TRACE, "request_process: " + "connid=%lu write coherence to backend '%s' expired\n", + client->c_connid, client->c_backend->b_name.bv_val ); client->c_backend = NULL; + client_restricted = client->c_restricted = LLOAD_OP_NOT_RESTRICTED; } - CONNECTION_UNLOCK(client); + switch ( client_restricted ) { + case LLOAD_OP_NOT_RESTRICTED: + break; + case LLOAD_OP_RESTRICTED_WRITE: + case LLOAD_OP_RESTRICTED_BACKEND: + b = client->c_backend; + assert( b ); + break; + case LLOAD_OP_RESTRICTED_UPSTREAM: + case LLOAD_OP_RESTRICTED_ISOLATE: + upstream = client->c_linked_upstream; + assert( upstream ); + break; + default: + assert(0); + break; + } + } + if ( op->o_restricted < client_restricted ) { + op->o_restricted = client_restricted; } + CONNECTION_UNLOCK(client); - if ( b ) { + if ( upstream ) { + b = upstream->c_backend; + checked_lock( &b->b_mutex ); + if ( !try_upstream( b, NULL, op, upstream, &res, &message ) ) { + upstream = NULL; + } + checked_unlock( &b->b_mutex ); + } else if ( b ) { backend_select( b, op, &upstream, &res, &message ); } else { upstream_select( op, &upstream, &res, &message ); @@ -169,9 +247,18 @@ request_process( LloadConnection *client, LloadOperation *op ) } upstream->c_pendingber = output; + if ( client_restricted < LLOAD_OP_RESTRICTED_UPSTREAM && + op->o_restricted >= LLOAD_OP_RESTRICTED_UPSTREAM ) { + rc = ldap_tavl_insert( + &upstream->c_linked, client, lload_upstream_entry_cmp, + ldap_avl_dup_error ); + assert( rc == LDAP_SUCCESS ); + } + op->o_upstream_msgid = msgid = upstream->c_next_msgid++; rc = ldap_tavl_insert( &upstream->c_ops, op, operation_upstream_cmp, ldap_avl_dup_error ); + CONNECTION_UNLOCK(upstream); Debug( LDAP_DEBUG_TRACE, "request_process: " @@ -183,18 +270,27 @@ request_process( LloadConnection *client, LloadOperation *op ) lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_forwarded++; - if ( lload_write_coherence && !b && - op->o_tag != LDAP_REQ_SEARCH && - op->o_tag != LDAP_REQ_COMPARE ) { - /* - * TODO: There can't be more than one thread receiving a new request, - * so we could drop the lock. We'd still need some atomics for the - * counters. - */ + if ( op->o_restricted > client_restricted || + client_restricted == LLOAD_OP_RESTRICTED_WRITE ) { CONNECTION_LOCK(client); - client->c_backend = upstream->c_backend; - client->c_restricted_inflight++; - op->o_restricted = 1; + if ( op->o_restricted > client_restricted ) { + client->c_restricted = op->o_restricted; + } + if ( op->o_restricted == LLOAD_OP_RESTRICTED_WRITE ) { + client->c_restricted_inflight++; + } + if ( op->o_restricted >= LLOAD_OP_RESTRICTED_UPSTREAM ) { + if ( client_restricted < LLOAD_OP_RESTRICTED_UPSTREAM ) { + client->c_linked_upstream = upstream; + } + assert( client->c_linked_upstream == upstream ); + client->c_backend = NULL; + } else if ( op->o_restricted >= LLOAD_OP_RESTRICTED_WRITE ) { + if ( client_restricted < LLOAD_OP_RESTRICTED_WRITE ) { + client->c_backend = upstream->c_backend; + } + assert( client->c_backend == upstream->c_backend ); + } CONNECTION_UNLOCK(client); } @@ -540,6 +636,8 @@ client_reset( LloadConnection *c ) { TAvlnode *root; long freed = 0, executing; + LloadConnection *linked_upstream = NULL; + enum op_restriction restricted = c->c_restricted; CONNECTION_ASSERT_LOCKED(c); root = c->c_ops; @@ -555,6 +653,20 @@ client_reset( LloadConnection *c ) ch_free( c->c_sasl_bind_mech.bv_val ); BER_BVZERO( &c->c_sasl_bind_mech ); } + + if ( restricted && restricted < LLOAD_OP_RESTRICTED_ISOLATE ) { + if ( c->c_backend ) { + assert( c->c_restricted <= LLOAD_OP_RESTRICTED_BACKEND ); + assert( c->c_restricted_inflight == 0 ); + c->c_backend = NULL; + c->c_restricted_at = 0; + } else { + assert( c->c_restricted == LLOAD_OP_RESTRICTED_UPSTREAM ); + assert( c->c_linked_upstream != NULL ); + linked_upstream = c->c_linked_upstream; + c->c_linked_upstream = NULL; + } + } CONNECTION_UNLOCK(c); if ( root ) { @@ -565,6 +677,12 @@ client_reset( LloadConnection *c ) } assert( freed == executing ); + if ( linked_upstream && restricted == LLOAD_OP_RESTRICTED_UPSTREAM ) { + LloadConnection *removed = ldap_tavl_delete( + &linked_upstream->c_linked, c, lload_upstream_entry_cmp ); + assert( removed == c ); + } + CONNECTION_LOCK(c); CONNECTION_ASSERT_LOCKED(c); } @@ -586,6 +704,11 @@ client_unlink( LloadConnection *c ) state = c->c_state; c->c_state = LLOAD_C_DYING; + if ( c->c_restricted == LLOAD_OP_RESTRICTED_ISOLATE ) { + /* Allow upstream connection to be severed in client_reset() */ + c->c_restricted = LLOAD_OP_RESTRICTED_UPSTREAM; + } + read_event = c->c_read_event; write_event = c->c_write_event; CONNECTION_UNLOCK(c); diff --git a/servers/lloadd/config.c b/servers/lloadd/config.c index 3bdf6dac7a..16f08009d4 100644 --- a/servers/lloadd/config.c +++ b/servers/lloadd/config.c @@ -115,6 +115,7 @@ static ConfigDriver config_fname; static ConfigDriver config_generic; static ConfigDriver config_backend; static ConfigDriver config_bindconf; +static ConfigDriver config_restrict_oid; #ifdef LDAP_TCP_BUFFER static ConfigDriver config_tcp_buffer; #endif /* LDAP_TCP_BUFFER */ @@ -179,6 +180,8 @@ enum { CFG_MAX_PENDING_CONNS, CFG_STARTTLS, CFG_CLIENT_PENDING, + CFG_RESTRICT_EXOP, + CFG_RESTRICT_CONTROL, CFG_LAST }; @@ -635,12 +638,33 @@ static ConfigTable config_back_cf_table[] = { { "write_coherence", "seconds", 2, 2, 0, ARG_INT, &lload_write_coherence, - "( OLcfgBkAt:13.38 " + "( OLcfgBkAt:13.36 " "NAME 'olcBkLloadWriteCoherence' " "DESC 'Keep operations to the same backend after a write' " "EQUALITY integerMatch " "SYNTAX OMsInteger " "SINGLE-VALUE )", + NULL, + { .v_int = 0 } + }, + { "restrict_exop", "OID> oid.bv_val ); + ch_free( restriction ); +} + +static int +config_restrict_oid( ConfigArgs *c ) +{ + TAvlnode *node = NULL, **root = ( c->type == CFG_RESTRICT_EXOP ) ? + &lload_exop_actions : + &lload_control_actions; + struct restriction_entry *entry = NULL; + char *parsed_oid; + int i, rc = -1; + + if ( c->op == SLAP_CONFIG_EMIT ) { + struct berval bv = { .bv_val = c->cr_msg }; + + if ( c->type == CFG_RESTRICT_EXOP && lload_default_exop_action ) { + bv.bv_len = snprintf( bv.bv_val, sizeof(c->cr_msg), "1.1 %s", + restrictopts[lload_default_exop_action].name ); + value_add_one( &c->rvalue_vals, &bv ); + } + for ( node = ldap_tavl_end( *root, TAVL_DIR_LEFT ); + node; + node = ldap_tavl_next( node, TAVL_DIR_RIGHT ) ) { + entry = node->avl_data; + + bv.bv_len = snprintf( bv.bv_val, sizeof(c->cr_msg), "%s %s", + entry->oid.bv_val, restrictopts[entry->action].name ); + value_add_one( &c->rvalue_vals, &bv ); + } + + return LDAP_SUCCESS; + + } else if ( c->op == LDAP_MOD_DELETE ) { + if ( !c->line ) { + ldap_tavl_free( *root, (AVL_FREE)restriction_free ); + *root = NULL; + if ( c->type == CFG_RESTRICT_EXOP ) { + lload_default_exop_action = LLOAD_OP_NOT_RESTRICTED; + } + rc = LDAP_SUCCESS; + } else { + struct restriction_entry needle; + + parsed_oid = strchr( c->line, ' ' ); + if ( !parsed_oid ) { + return rc; + } + + memcpy( c->cr_msg, c->line, parsed_oid - c->line ); + c->cr_msg[parsed_oid - c->line] = '\0'; + + needle.oid.bv_val = oidm_find( c->cr_msg ); + needle.oid.bv_len = strlen( needle.oid.bv_val ); + + if ( !needle.oid.bv_val ) { + return rc; + } else if ( c->type == CFG_RESTRICT_EXOP && + !strcmp( needle.oid.bv_val, "1.1" ) ) { + lload_default_exop_action = LLOAD_OP_NOT_RESTRICTED; + } else { + /* back-config should have checked we have this value */ + entry = ldap_tavl_delete( root, &needle, + lload_restriction_cmp ); + assert( entry != NULL ); + } + rc = LDAP_SUCCESS; + } + return rc; + } + + parsed_oid = oidm_find( c->argv[1] ); + if ( !parsed_oid ) { + snprintf( c->cr_msg, sizeof(c->cr_msg), "Could not parse oid %s", + c->argv[1] ); + goto done; + } + + for ( i = 0; restrictopts[i].name; i++ ) { + if ( !strcasecmp( c->argv[2], restrictopts[i].name ) ) { + break; + } + } + + if ( !restrictopts[i].name ) { + snprintf( c->cr_msg, sizeof(c->cr_msg), "Could not parse action %s", + c->argv[2] ); + goto done; + } + + if ( !strcmp( parsed_oid, "1.1" ) ) { + if ( lload_default_exop_action ) { + snprintf( c->cr_msg, sizeof(c->cr_msg), "Default already set" ); + goto done; + } else { + lload_default_exop_action = i; + } + } + + entry = ch_malloc( sizeof(struct restriction_entry) ); + /* Copy only if a reference to argv[1] was returned */ + ber_str2bv( parsed_oid, 0, parsed_oid == c->argv[1], &entry->oid ); + entry->action = i; + + if ( ldap_tavl_insert( root, entry, lload_restriction_cmp, + ldap_avl_dup_error ) ) { + snprintf( c->cr_msg, sizeof(c->cr_msg), + "%s with OID %s already restricted", + c->type == CFG_RESTRICT_EXOP ? "Extended operation" : "Control", + c->argv[1] ); + goto done; + } + + rc = LDAP_SUCCESS; +done: + if ( rc ) { + Debug( LDAP_DEBUG_ANY, "%s: %s\n", c->log, c->cr_msg ); + if ( parsed_oid ) ch_free( parsed_oid ); + if ( entry ) ch_free( entry ); + } + + return rc; +} + static int config_fname( ConfigArgs *c ) { diff --git a/servers/lloadd/daemon.c b/servers/lloadd/daemon.c index d402dfe6b1..2c9a2f6e98 100644 --- a/servers/lloadd/daemon.c +++ b/servers/lloadd/daemon.c @@ -1429,6 +1429,41 @@ client_tls_cb( ldap_pvt_thread_start_t *start, void *startarg, void *arg ) } #endif /* HAVE_TLS */ +static int +detach_linked_backend_cb( LloadConnection *client, LloadBackend *b ) +{ + int rc = LDAP_SUCCESS; + + if ( client->c_backend != b ) { + return rc; + } + + Debug( LDAP_DEBUG_CONNS, "detach_linked_backend_cb: " + "detaching backend '%s' from connid=%lu%s\n", + b->b_name.bv_val, client->c_connid, + client->c_restricted == LLOAD_OP_RESTRICTED_BACKEND ? + " and closing the connection" : + "" ); + + /* We were approached from the connection list */ + assert( IS_ALIVE( client, c_refcnt ) ); + + assert( client->c_restricted == LLOAD_OP_RESTRICTED_WRITE || + client->c_restricted == LLOAD_OP_RESTRICTED_BACKEND ); + if ( client->c_restricted == LLOAD_OP_RESTRICTED_BACKEND ) { + int gentle = 1; + CONNECTION_LOCK(client); + rc = lload_connection_close( client, &gentle ); + CONNECTION_UNLOCK(client); + } + + client->c_restricted = LLOAD_OP_NOT_RESTRICTED; + client->c_restricted_at = 0; + client->c_restricted_inflight = 0; + + return rc; +} + void lload_handle_backend_invalidation( LloadChange *change ) { @@ -1458,6 +1493,13 @@ lload_handle_backend_invalidation( LloadChange *change ) &connection_pool, handle_pdus, backend_conn_cb, b ); ldap_pvt_thread_pool_walk( &connection_pool, upstream_bind, backend_conn_cb, b ); + + checked_lock( &clients_mutex ); + connections_walk( + &clients_mutex, &clients, + (CONNCB)detach_linked_backend_cb, b ); + checked_unlock( &clients_mutex ); + lload_backend_destroy( b ); return; } diff --git a/servers/lloadd/extended.c b/servers/lloadd/extended.c index 7475d87e02..975c175e41 100644 --- a/servers/lloadd/extended.c +++ b/servers/lloadd/extended.c @@ -125,6 +125,7 @@ int request_extended( LloadConnection *c, LloadOperation *op ) { ExopHandler *handler, needle = {}; + struct restriction_entry *restriction, rneedle = {}; BerElement *copy; struct berval bv; ber_tag_t tag; @@ -158,6 +159,15 @@ request_extended( LloadConnection *c, LloadOperation *op ) } ber_free( copy, 0 ); + rneedle.oid = bv; + restriction = ldap_tavl_find( lload_exop_actions, &rneedle, + lload_restriction_cmp ); + if ( restriction ) { + op->o_restricted = restriction->action; + } else { + op->o_restricted = lload_default_exop_action; + } + return request_process( c, op ); } diff --git a/servers/lloadd/lload.h b/servers/lloadd/lload.h index d9b84ec7e6..6db6179cbd 100644 --- a/servers/lloadd/lload.h +++ b/servers/lloadd/lload.h @@ -298,9 +298,18 @@ enum sc_io_state { /* Tracking whether an operation might cause a client to restrict which * upstreams are eligible */ enum op_restriction { - LLOAD_OP_NOT_RESTRICTED, /* operation didn't trigger any restriction */ - LLOAD_OP_RESTRICTED_BACKEND, /* operation restricts a client to a certain backend */ - LLOAD_OP_RESTRICTED_UPSTREAM, /* operation restricts a client to a certain upstream */ + LLOAD_OP_NOT_RESTRICTED, /* no restrictions in place */ + LLOAD_OP_RESTRICTED_WRITE, /* client is restricted to a certain backend with + * a timeout attached */ + LLOAD_OP_RESTRICTED_BACKEND, /* client is restricted to a certain backend, + * without a timeout */ + LLOAD_OP_RESTRICTED_UPSTREAM, /* client is restricted to a certain upstream */ + LLOAD_OP_RESTRICTED_ISOLATE, /* TODO: client is restricted to a certain + * upstream and removes the upstream from the + * pool */ + LLOAD_OP_RESTRICTED_REJECT, /* operation should not be forwarded to any + * backend, either it is processed internally + * or rejected */ }; /* @@ -410,9 +419,13 @@ struct LloadConnection { long c_n_ops_completed; /* num of ops completed */ lload_counters_t c_counters; /* per connection operation counters */ - LloadBackend *c_backend; + enum op_restriction c_restricted; uintptr_t c_restricted_inflight; time_t c_restricted_at; + LloadBackend *c_backend; + LloadConnection *c_linked_upstream; + + TAvlnode *c_linked; /* * Protected by the CIRCLEQ mutex: @@ -470,6 +483,11 @@ struct LloadOperation { BerValue o_request, o_ctrls; }; +struct restriction_entry { + struct berval oid; + enum op_restriction action; +}; + /* * listener; need to access it from monitor backend */ diff --git a/servers/lloadd/operation.c b/servers/lloadd/operation.c index 5b953841a8..95657b9204 100644 --- a/servers/lloadd/operation.c +++ b/servers/lloadd/operation.c @@ -21,6 +21,10 @@ ldap_pvt_thread_mutex_t lload_pin_mutex; unsigned long lload_next_pin = 1; +TAvlnode *lload_control_actions = NULL; +TAvlnode *lload_exop_actions = NULL; +enum op_restriction lload_default_exop_action = LLOAD_OP_NOT_RESTRICTED; + ber_tag_t slap_req2res( ber_tag_t tag ) { @@ -84,6 +88,13 @@ lload_msgtype2str( ber_tag_t tag ) return "unknown message"; } +int +lload_restriction_cmp( const void *left, const void *right ) +{ + const struct restriction_entry *l = left, *r = right; + return ber_bvcmp( &l->oid, &r->oid ); +} + int operation_client_cmp( const void *left, const void *right ) { @@ -276,7 +287,7 @@ operation_unlink_client( LloadOperation *op, LloadConnection *client ) assert( op == removed ); client->c_n_ops_executing--; - if ( op->o_restricted ) { + if ( op->o_restricted == LLOAD_OP_RESTRICTED_WRITE ) { if ( !--client->c_restricted_inflight && client->c_restricted_at >= 0 ) { if ( lload_write_coherence < 0 ) { client->c_restricted_at = -1; diff --git a/servers/lloadd/proto-lload.h b/servers/lloadd/proto-lload.h index 604c4a6b58..5f7f93d568 100644 --- a/servers/lloadd/proto-lload.h +++ b/servers/lloadd/proto-lload.h @@ -172,6 +172,10 @@ LDAP_SLAPD_F (int) lload_monitor_backend_init( BackendInfo *bi, LloadBackend *b */ LDAP_SLAPD_V (ldap_pvt_thread_mutex_t) lload_pin_mutex; LDAP_SLAPD_V (unsigned long) lload_next_pin; +LDAP_SLAPD_V (TAvlnode *) lload_control_actions; +LDAP_SLAPD_V (TAvlnode *) lload_exop_actions; +LDAP_SLAPD_V (enum op_restriction) lload_default_exop_action; +LDAP_SLAPD_F (int) lload_restriction_cmp( const void *left, const void *right ); LDAP_SLAPD_F (const char *) lload_msgtype2str( ber_tag_t tag ); LDAP_SLAPD_F (int) operation_upstream_cmp( const void *l, const void *r ); LDAP_SLAPD_F (int) operation_client_cmp( const void *l, const void *r ); @@ -192,6 +196,7 @@ LDAP_SLAPD_F (void) operation_update_global_rejected( LloadOperation *op ); /* * upstream.c */ +LDAP_SLAPD_F (int) lload_upstream_entry_cmp( const void *l, const void *r ); LDAP_SLAPD_F (int) forward_final_response( LloadConnection *client, LloadOperation *op, BerElement *ber ); LDAP_SLAPD_F (int) forward_response( LloadConnection *client, LloadOperation *op, BerElement *ber ); LDAP_SLAPD_F (void *) upstream_bind( void *ctx, void *arg ); diff --git a/servers/lloadd/upstream.c b/servers/lloadd/upstream.c index 375626f5f7..fac4ac32ce 100644 --- a/servers/lloadd/upstream.c +++ b/servers/lloadd/upstream.c @@ -40,6 +40,27 @@ static const sasl_callback_t client_callbacks[] = { static void upstream_unlink( LloadConnection *upstream ); +int +lload_upstream_entry_cmp( const void *l, const void *r ) +{ + return SLAP_PTRCMP( l, r ); +} + +static void +linked_upstream_lost( LloadConnection *client ) +{ + int gentle = 1; + + CONNECTION_LOCK(client); + assert( client->c_restricted >= LLOAD_OP_RESTRICTED_UPSTREAM ); + assert( client->c_linked_upstream ); + + client->c_restricted = LLOAD_OP_NOT_RESTRICTED; + client->c_linked_upstream = NULL; + CONNECTION_UNLOCK(client); + lload_connection_close( client, &gentle ); +} + int forward_response( LloadConnection *client, LloadOperation *op, BerElement *ber ) { @@ -996,7 +1017,7 @@ upstream_unlink( LloadConnection *c ) { LloadBackend *b = c->c_backend; struct event *read_event, *write_event; - TAvlnode *root; + TAvlnode *root, *linked_root; long freed, executing; Debug( LDAP_DEBUG_CONNS, "upstream_unlink: " @@ -1017,11 +1038,16 @@ upstream_unlink( LloadConnection *c ) executing = c->c_n_ops_executing; c->c_n_ops_executing = 0; + linked_root = c->c_linked; + c->c_linked = NULL; + CONNECTION_UNLOCK(c); freed = ldap_tavl_free( root, (AVL_FREE)operation_lost_upstream ); assert( freed == executing ); + ldap_tavl_free( linked_root, (AVL_FREE)linked_upstream_lost ); + /* * Avoid a deadlock: * event_del will block if the event is currently executing its callback,