.B lloadd
will direct operations exclusively to the last selected backend. A write
operation is anything not handled internally (certain exops, abandon),
-excepting search, compare and bind operations. Bind operations also reset this
+except search, compare and bind operations. Bind operations also reset this
restriction. The default is 0, write operations do not restrict selection. When
negative, the restriction is not time limited and will persist until the next
bind.
+.TP
+.B restrict_exop <OID> <action>
+Tell
+.B lloadd
+that extended operation with a given OID should be handled in a specific way.
+OID
+.B 1.1
+is special, setting a default (only for operations not handled internally).
+The meaning of the
+.B <action>
+argument is the same as in
+.B restrict_control
+below.
+.TP
+.B restrict_control <OID> <action>
+Tell
+.B lloadd
+that a control with a given OID attached to any operation should be handled in
+a specific way according to the
+.B <action>
+argument. At the moment, only operations passed intact are inspected in
+this way, in particular, controls on bind and extended operations are
+.B not
+checked.
+
+In order of descending priority (the control with highest priority action
+wins), this is the action made:
+.RS
+.RS
+.PD 0
+.TP
+.B reject
+operations that carry this control will be rejected.
+.TP
+.B connection
+once an upstream is selected, every future operation from this client will be
+directed to the same connection. Useful when state is shared between client and
+upstream that the load balancer doesn't track.
+.TP
+.B backend
+like
+.B write
+except this does not time out.
+.TP
+.B write
+this is treated like a write operation (see
+.BR write_coherence )
+above.
+.TP
+.B ignore
+does not influence restrictions, useful when changing the global exop default.
+This is the default handling for exops/controls not handled by the load balancer
+internally.
+.PD
+.RE
.SH TLS OPTIONS
If
argsfile LOCALSTATEDIR/run/lloadd.args
pidfile LOCALSTATEDIR/run/lloadd.pid
+# cancel not supported yet
+restrict_exop 1.3.6.1.1.8 reject
+
+# turn not supported
+restrict_exop 1.3.6.1.1.19 reject
+
+# TXN Exop if desired, otherwise reject
+restrict_exop 1.3.6.1.1.21.1 connection
+
+# Paged results control
+restrict_control 1.2.840.113556.1.4.319 connection
+
+# VLV control
+restrict_control 2.16.840.1.113730.3.4.9 connection
+
bindconf
bindmethod=simple
binddn=cn=test
unsigned long pin;
int res = LDAP_UNAVAILABLE, rc = LDAP_SUCCESS;
char *message = "no connections available";
+ enum op_restriction client_restricted;
CONNECTION_LOCK(client);
pin = client->c_pin_id;
assert( rc == LDAP_SUCCESS );
client->c_n_ops_executing++;
- if ( client->c_backend ) {
- assert( client->c_restricted_inflight == 0 );
- client->c_backend = NULL;
- client->c_restricted_at = 0;
- }
+ client_restricted = client->c_restricted;
CONNECTION_UNLOCK(client);
if ( pin ) {
checked_lock( &op->o_link_mutex );
upstream = op->o_upstream;
checked_unlock( &op->o_link_mutex );
+ }
- if ( upstream ) {
- checked_lock( &upstream->c_io_mutex );
- CONNECTION_LOCK(upstream);
- if ( !IS_ALIVE( upstream, c_live ) ) {
- CONNECTION_UNLOCK(upstream);
- checked_unlock( &upstream->c_io_mutex );
- upstream = NULL;
- }
+ if ( upstream ) {
+ checked_lock( &upstream->c_io_mutex );
+ CONNECTION_LOCK(upstream);
+ if ( !IS_ALIVE( upstream, c_live ) ) {
+ CONNECTION_UNLOCK(upstream);
+ checked_unlock( &upstream->c_io_mutex );
+ upstream = NULL;
}
}
* have to reject the op and clear pin */
if ( upstream ) {
/* No need to do anything */
- } else if ( !pin ) {
+ } else if ( !pin && client_restricted != LLOAD_OP_RESTRICTED_ISOLATE ) {
upstream_select( op, &upstream, &res, &message );
} else {
Debug( LDAP_DEBUG_STATS, "request_bind: "
ber_int_t msgid;
int res = LDAP_UNAVAILABLE, rc = LDAP_SUCCESS;
char *message = "no connections available";
+ enum op_restriction client_restricted;
- if ( lload_write_coherence ) {
- CONNECTION_LOCK(client);
- if ( client->c_restricted_inflight || client->c_restricted_at < 0 ||
- client->c_restricted_at + lload_write_coherence >= op->o_start ) {
- b = client->c_backend;
- } else {
+ if ( lload_control_actions && !BER_BVISNULL( &op->o_ctrls ) ) {
+ BerElementBuffer copy_berbuf;
+ BerElement *copy = (BerElement *)©_berbuf;
+ struct berval control;
+
+ ber_init2( copy, &op->o_ctrls, 0 );
+
+ while ( ber_skip_element( copy, &control ) == LBER_SEQUENCE ) {
+ struct restriction_entry *entry, needle = {};
+ BerElementBuffer control_berbuf;
+ BerElement *control_ber = (BerElement *)&control_berbuf;
+
+ ber_init2( control_ber, &control, 0 );
+
+ if ( ber_skip_element( control_ber, &needle.oid ) == LBER_ERROR ) {
+ res = LDAP_PROTOCOL_ERROR;
+ message = "invalid control";
+
+ operation_send_reject( op, res, message, 1 );
+ goto fail;
+ }
+
+ entry = ldap_tavl_find(
+ lload_control_actions, &needle, lload_restriction_cmp );
+ if ( entry && op->o_restricted < entry->action ) {
+ op->o_restricted = entry->action;
+ }
+ }
+ }
+ if ( op->o_restricted < LLOAD_OP_RESTRICTED_WRITE &&
+ lload_write_coherence &&
+ op->o_tag != LDAP_REQ_SEARCH &&
+ op->o_tag != LDAP_REQ_COMPARE ) {
+ op->o_restricted = LLOAD_OP_RESTRICTED_WRITE;
+ }
+
+ if ( op->o_restricted == LLOAD_OP_RESTRICTED_REJECT ) {
+ res = LDAP_UNWILLING_TO_PERFORM;
+ message = "extended operation or control disallowed";
+
+ operation_send_reject( op, res, message, 1 );
+ goto fail;
+ }
+
+ CONNECTION_LOCK(client);
+ client_restricted = client->c_restricted;
+ if ( client_restricted ) {
+ if ( client_restricted == LLOAD_OP_RESTRICTED_WRITE &&
+ client->c_restricted_inflight == 0 &&
+ client->c_restricted_at >= 0 &&
+ client->c_restricted_at + lload_write_coherence <
+ op->o_start ) {
+ Debug( LDAP_DEBUG_TRACE, "request_process: "
+ "connid=%lu write coherence to backend '%s' expired\n",
+ client->c_connid, client->c_backend->b_name.bv_val );
client->c_backend = NULL;
+ client_restricted = client->c_restricted = LLOAD_OP_NOT_RESTRICTED;
}
- CONNECTION_UNLOCK(client);
+ switch ( client_restricted ) {
+ case LLOAD_OP_NOT_RESTRICTED:
+ break;
+ case LLOAD_OP_RESTRICTED_WRITE:
+ case LLOAD_OP_RESTRICTED_BACKEND:
+ b = client->c_backend;
+ assert( b );
+ break;
+ case LLOAD_OP_RESTRICTED_UPSTREAM:
+ case LLOAD_OP_RESTRICTED_ISOLATE:
+ upstream = client->c_linked_upstream;
+ assert( upstream );
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ }
+ if ( op->o_restricted < client_restricted ) {
+ op->o_restricted = client_restricted;
}
+ CONNECTION_UNLOCK(client);
- if ( b ) {
+ if ( upstream ) {
+ b = upstream->c_backend;
+ checked_lock( &b->b_mutex );
+ if ( !try_upstream( b, NULL, op, upstream, &res, &message ) ) {
+ upstream = NULL;
+ }
+ checked_unlock( &b->b_mutex );
+ } else if ( b ) {
backend_select( b, op, &upstream, &res, &message );
} else {
upstream_select( op, &upstream, &res, &message );
}
upstream->c_pendingber = output;
+ if ( client_restricted < LLOAD_OP_RESTRICTED_UPSTREAM &&
+ op->o_restricted >= LLOAD_OP_RESTRICTED_UPSTREAM ) {
+ rc = ldap_tavl_insert(
+ &upstream->c_linked, client, lload_upstream_entry_cmp,
+ ldap_avl_dup_error );
+ assert( rc == LDAP_SUCCESS );
+ }
+
op->o_upstream_msgid = msgid = upstream->c_next_msgid++;
rc = ldap_tavl_insert(
&upstream->c_ops, op, operation_upstream_cmp, ldap_avl_dup_error );
+
CONNECTION_UNLOCK(upstream);
Debug( LDAP_DEBUG_TRACE, "request_process: "
lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_forwarded++;
- if ( lload_write_coherence && !b &&
- op->o_tag != LDAP_REQ_SEARCH &&
- op->o_tag != LDAP_REQ_COMPARE ) {
- /*
- * TODO: There can't be more than one thread receiving a new request,
- * so we could drop the lock. We'd still need some atomics for the
- * counters.
- */
+ if ( op->o_restricted > client_restricted ||
+ client_restricted == LLOAD_OP_RESTRICTED_WRITE ) {
CONNECTION_LOCK(client);
- client->c_backend = upstream->c_backend;
- client->c_restricted_inflight++;
- op->o_restricted = 1;
+ if ( op->o_restricted > client_restricted ) {
+ client->c_restricted = op->o_restricted;
+ }
+ if ( op->o_restricted == LLOAD_OP_RESTRICTED_WRITE ) {
+ client->c_restricted_inflight++;
+ }
+ if ( op->o_restricted >= LLOAD_OP_RESTRICTED_UPSTREAM ) {
+ if ( client_restricted < LLOAD_OP_RESTRICTED_UPSTREAM ) {
+ client->c_linked_upstream = upstream;
+ }
+ assert( client->c_linked_upstream == upstream );
+ client->c_backend = NULL;
+ } else if ( op->o_restricted >= LLOAD_OP_RESTRICTED_WRITE ) {
+ if ( client_restricted < LLOAD_OP_RESTRICTED_WRITE ) {
+ client->c_backend = upstream->c_backend;
+ }
+ assert( client->c_backend == upstream->c_backend );
+ }
CONNECTION_UNLOCK(client);
}
{
TAvlnode *root;
long freed = 0, executing;
+ LloadConnection *linked_upstream = NULL;
+ enum op_restriction restricted = c->c_restricted;
CONNECTION_ASSERT_LOCKED(c);
root = c->c_ops;
ch_free( c->c_sasl_bind_mech.bv_val );
BER_BVZERO( &c->c_sasl_bind_mech );
}
+
+ if ( restricted && restricted < LLOAD_OP_RESTRICTED_ISOLATE ) {
+ if ( c->c_backend ) {
+ assert( c->c_restricted <= LLOAD_OP_RESTRICTED_BACKEND );
+ assert( c->c_restricted_inflight == 0 );
+ c->c_backend = NULL;
+ c->c_restricted_at = 0;
+ } else {
+ assert( c->c_restricted == LLOAD_OP_RESTRICTED_UPSTREAM );
+ assert( c->c_linked_upstream != NULL );
+ linked_upstream = c->c_linked_upstream;
+ c->c_linked_upstream = NULL;
+ }
+ }
CONNECTION_UNLOCK(c);
if ( root ) {
}
assert( freed == executing );
+ if ( linked_upstream && restricted == LLOAD_OP_RESTRICTED_UPSTREAM ) {
+ LloadConnection *removed = ldap_tavl_delete(
+ &linked_upstream->c_linked, c, lload_upstream_entry_cmp );
+ assert( removed == c );
+ }
+
CONNECTION_LOCK(c);
CONNECTION_ASSERT_LOCKED(c);
}
state = c->c_state;
c->c_state = LLOAD_C_DYING;
+ if ( c->c_restricted == LLOAD_OP_RESTRICTED_ISOLATE ) {
+ /* Allow upstream connection to be severed in client_reset() */
+ c->c_restricted = LLOAD_OP_RESTRICTED_UPSTREAM;
+ }
+
read_event = c->c_read_event;
write_event = c->c_write_event;
CONNECTION_UNLOCK(c);
static ConfigDriver config_generic;
static ConfigDriver config_backend;
static ConfigDriver config_bindconf;
+static ConfigDriver config_restrict_oid;
#ifdef LDAP_TCP_BUFFER
static ConfigDriver config_tcp_buffer;
#endif /* LDAP_TCP_BUFFER */
CFG_MAX_PENDING_CONNS,
CFG_STARTTLS,
CFG_CLIENT_PENDING,
+ CFG_RESTRICT_EXOP,
+ CFG_RESTRICT_CONTROL,
CFG_LAST
};
{ "write_coherence", "seconds", 2, 2, 0,
ARG_INT,
&lload_write_coherence,
- "( OLcfgBkAt:13.38 "
+ "( OLcfgBkAt:13.36 "
"NAME 'olcBkLloadWriteCoherence' "
"DESC 'Keep operations to the same backend after a write' "
"EQUALITY integerMatch "
"SYNTAX OMsInteger "
"SINGLE-VALUE )",
+ NULL,
+ { .v_int = 0 }
+ },
+ { "restrict_exop", "OID> <action", 3, 3, 0,
+ ARG_MAGIC|CFG_RESTRICT_EXOP,
+ &config_restrict_oid,
+ "( OLcfgBkAt:13.37 "
+ "NAME 'olcBkLloadRestrictExop' "
+ "DESC 'Restrict upstream selection after forwarding an extended operation' "
+ "EQUALITY caseIgnoreMatch "
+ "SYNTAX OMsDirectoryString )",
+ NULL, NULL
+ },
+ { "restrict_control", "OID> <action", 3, 3, 0,
+ ARG_MAGIC|CFG_RESTRICT_CONTROL,
+ &config_restrict_oid,
+ "( OLcfgBkAt:13.38 "
+ "NAME 'olcBkLloadRestrictControl' "
+ "DESC 'Restrict upstream selection after forwarding a control' "
+ "EQUALITY caseIgnoreMatch "
+ "SYNTAX OMsDirectoryString )",
NULL, NULL
},
"$ olcBkLloadTLSCRLFile "
"$ olcBkLloadTLSShareSlapdCTX "
"$ olcBkLloadClientMaxPending "
+ "$ olcBkLloadWriteCoherence "
+ "$ olcBkLloadRestrictExop "
+ "$ olcBkLloadRestrictControl "
") )",
Cft_Backend, config_back_cf_table,
NULL,
return 0;
}
+#ifndef BALANCER_MODULE
+char *
+oidm_find( char *oid )
+{
+ if ( OID_LEADCHAR( *oid ) ) {
+ return oid;
+ }
+ Debug( LDAP_DEBUG_ANY, "oidm_find: "
+ "full OID parsing only available when compiled as a module\n" );
+ return NULL;
+}
+#endif /* !BALANCER_MODULE */
+
+static struct {
+ const char *name;
+ enum op_restriction action;
+} restrictopts[] = {
+ { "ignore", LLOAD_OP_NOT_RESTRICTED },
+ { "write", LLOAD_OP_RESTRICTED_WRITE },
+ { "backend", LLOAD_OP_RESTRICTED_BACKEND },
+ { "connection", LLOAD_OP_RESTRICTED_UPSTREAM },
+ { "isolate", LLOAD_OP_RESTRICTED_ISOLATE },
+ { "reject", LLOAD_OP_RESTRICTED_REJECT },
+ { NULL }
+};
+
+static void
+restriction_free( struct restriction_entry *restriction )
+{
+ ch_free( restriction->oid.bv_val );
+ ch_free( restriction );
+}
+
+static int
+config_restrict_oid( ConfigArgs *c )
+{
+ TAvlnode *node = NULL, **root = ( c->type == CFG_RESTRICT_EXOP ) ?
+ &lload_exop_actions :
+ &lload_control_actions;
+ struct restriction_entry *entry = NULL;
+ char *parsed_oid;
+ int i, rc = -1;
+
+ if ( c->op == SLAP_CONFIG_EMIT ) {
+ struct berval bv = { .bv_val = c->cr_msg };
+
+ if ( c->type == CFG_RESTRICT_EXOP && lload_default_exop_action ) {
+ bv.bv_len = snprintf( bv.bv_val, sizeof(c->cr_msg), "1.1 %s",
+ restrictopts[lload_default_exop_action].name );
+ value_add_one( &c->rvalue_vals, &bv );
+ }
+ for ( node = ldap_tavl_end( *root, TAVL_DIR_LEFT );
+ node;
+ node = ldap_tavl_next( node, TAVL_DIR_RIGHT ) ) {
+ entry = node->avl_data;
+
+ bv.bv_len = snprintf( bv.bv_val, sizeof(c->cr_msg), "%s %s",
+ entry->oid.bv_val, restrictopts[entry->action].name );
+ value_add_one( &c->rvalue_vals, &bv );
+ }
+
+ return LDAP_SUCCESS;
+
+ } else if ( c->op == LDAP_MOD_DELETE ) {
+ if ( !c->line ) {
+ ldap_tavl_free( *root, (AVL_FREE)restriction_free );
+ *root = NULL;
+ if ( c->type == CFG_RESTRICT_EXOP ) {
+ lload_default_exop_action = LLOAD_OP_NOT_RESTRICTED;
+ }
+ rc = LDAP_SUCCESS;
+ } else {
+ struct restriction_entry needle;
+
+ parsed_oid = strchr( c->line, ' ' );
+ if ( !parsed_oid ) {
+ return rc;
+ }
+
+ memcpy( c->cr_msg, c->line, parsed_oid - c->line );
+ c->cr_msg[parsed_oid - c->line] = '\0';
+
+ needle.oid.bv_val = oidm_find( c->cr_msg );
+ needle.oid.bv_len = strlen( needle.oid.bv_val );
+
+ if ( !needle.oid.bv_val ) {
+ return rc;
+ } else if ( c->type == CFG_RESTRICT_EXOP &&
+ !strcmp( needle.oid.bv_val, "1.1" ) ) {
+ lload_default_exop_action = LLOAD_OP_NOT_RESTRICTED;
+ } else {
+ /* back-config should have checked we have this value */
+ entry = ldap_tavl_delete( root, &needle,
+ lload_restriction_cmp );
+ assert( entry != NULL );
+ }
+ rc = LDAP_SUCCESS;
+ }
+ return rc;
+ }
+
+ parsed_oid = oidm_find( c->argv[1] );
+ if ( !parsed_oid ) {
+ snprintf( c->cr_msg, sizeof(c->cr_msg), "Could not parse oid %s",
+ c->argv[1] );
+ goto done;
+ }
+
+ for ( i = 0; restrictopts[i].name; i++ ) {
+ if ( !strcasecmp( c->argv[2], restrictopts[i].name ) ) {
+ break;
+ }
+ }
+
+ if ( !restrictopts[i].name ) {
+ snprintf( c->cr_msg, sizeof(c->cr_msg), "Could not parse action %s",
+ c->argv[2] );
+ goto done;
+ }
+
+ if ( !strcmp( parsed_oid, "1.1" ) ) {
+ if ( lload_default_exop_action ) {
+ snprintf( c->cr_msg, sizeof(c->cr_msg), "Default already set" );
+ goto done;
+ } else {
+ lload_default_exop_action = i;
+ }
+ }
+
+ entry = ch_malloc( sizeof(struct restriction_entry) );
+ /* Copy only if a reference to argv[1] was returned */
+ ber_str2bv( parsed_oid, 0, parsed_oid == c->argv[1], &entry->oid );
+ entry->action = i;
+
+ if ( ldap_tavl_insert( root, entry, lload_restriction_cmp,
+ ldap_avl_dup_error ) ) {
+ snprintf( c->cr_msg, sizeof(c->cr_msg),
+ "%s with OID %s already restricted",
+ c->type == CFG_RESTRICT_EXOP ? "Extended operation" : "Control",
+ c->argv[1] );
+ goto done;
+ }
+
+ rc = LDAP_SUCCESS;
+done:
+ if ( rc ) {
+ Debug( LDAP_DEBUG_ANY, "%s: %s\n", c->log, c->cr_msg );
+ if ( parsed_oid ) ch_free( parsed_oid );
+ if ( entry ) ch_free( entry );
+ }
+
+ return rc;
+}
+
static int
config_fname( ConfigArgs *c )
{
}
#endif /* HAVE_TLS */
+static int
+detach_linked_backend_cb( LloadConnection *client, LloadBackend *b )
+{
+ int rc = LDAP_SUCCESS;
+
+ if ( client->c_backend != b ) {
+ return rc;
+ }
+
+ Debug( LDAP_DEBUG_CONNS, "detach_linked_backend_cb: "
+ "detaching backend '%s' from connid=%lu%s\n",
+ b->b_name.bv_val, client->c_connid,
+ client->c_restricted == LLOAD_OP_RESTRICTED_BACKEND ?
+ " and closing the connection" :
+ "" );
+
+ /* We were approached from the connection list */
+ assert( IS_ALIVE( client, c_refcnt ) );
+
+ assert( client->c_restricted == LLOAD_OP_RESTRICTED_WRITE ||
+ client->c_restricted == LLOAD_OP_RESTRICTED_BACKEND );
+ if ( client->c_restricted == LLOAD_OP_RESTRICTED_BACKEND ) {
+ int gentle = 1;
+ CONNECTION_LOCK(client);
+ rc = lload_connection_close( client, &gentle );
+ CONNECTION_UNLOCK(client);
+ }
+
+ client->c_restricted = LLOAD_OP_NOT_RESTRICTED;
+ client->c_restricted_at = 0;
+ client->c_restricted_inflight = 0;
+
+ return rc;
+}
+
void
lload_handle_backend_invalidation( LloadChange *change )
{
&connection_pool, handle_pdus, backend_conn_cb, b );
ldap_pvt_thread_pool_walk(
&connection_pool, upstream_bind, backend_conn_cb, b );
+
+ checked_lock( &clients_mutex );
+ connections_walk(
+ &clients_mutex, &clients,
+ (CONNCB)detach_linked_backend_cb, b );
+ checked_unlock( &clients_mutex );
+
lload_backend_destroy( b );
return;
}
request_extended( LloadConnection *c, LloadOperation *op )
{
ExopHandler *handler, needle = {};
+ struct restriction_entry *restriction, rneedle = {};
BerElement *copy;
struct berval bv;
ber_tag_t tag;
}
ber_free( copy, 0 );
+ rneedle.oid = bv;
+ restriction = ldap_tavl_find( lload_exop_actions, &rneedle,
+ lload_restriction_cmp );
+ if ( restriction ) {
+ op->o_restricted = restriction->action;
+ } else {
+ op->o_restricted = lload_default_exop_action;
+ }
+
return request_process( c, op );
}
/* Tracking whether an operation might cause a client to restrict which
* upstreams are eligible */
enum op_restriction {
- LLOAD_OP_NOT_RESTRICTED, /* operation didn't trigger any restriction */
- LLOAD_OP_RESTRICTED_BACKEND, /* operation restricts a client to a certain backend */
- LLOAD_OP_RESTRICTED_UPSTREAM, /* operation restricts a client to a certain upstream */
+ LLOAD_OP_NOT_RESTRICTED, /* no restrictions in place */
+ LLOAD_OP_RESTRICTED_WRITE, /* client is restricted to a certain backend with
+ * a timeout attached */
+ LLOAD_OP_RESTRICTED_BACKEND, /* client is restricted to a certain backend,
+ * without a timeout */
+ LLOAD_OP_RESTRICTED_UPSTREAM, /* client is restricted to a certain upstream */
+ LLOAD_OP_RESTRICTED_ISOLATE, /* TODO: client is restricted to a certain
+ * upstream and removes the upstream from the
+ * pool */
+ LLOAD_OP_RESTRICTED_REJECT, /* operation should not be forwarded to any
+ * backend, either it is processed internally
+ * or rejected */
};
/*
long c_n_ops_completed; /* num of ops completed */
lload_counters_t c_counters; /* per connection operation counters */
- LloadBackend *c_backend;
+ enum op_restriction c_restricted;
uintptr_t c_restricted_inflight;
time_t c_restricted_at;
+ LloadBackend *c_backend;
+ LloadConnection *c_linked_upstream;
+
+ TAvlnode *c_linked;
/*
* Protected by the CIRCLEQ mutex:
BerValue o_request, o_ctrls;
};
+struct restriction_entry {
+ struct berval oid;
+ enum op_restriction action;
+};
+
/*
* listener; need to access it from monitor backend
*/
ldap_pvt_thread_mutex_t lload_pin_mutex;
unsigned long lload_next_pin = 1;
+TAvlnode *lload_control_actions = NULL;
+TAvlnode *lload_exop_actions = NULL;
+enum op_restriction lload_default_exop_action = LLOAD_OP_NOT_RESTRICTED;
+
ber_tag_t
slap_req2res( ber_tag_t tag )
{
return "unknown message";
}
+int
+lload_restriction_cmp( const void *left, const void *right )
+{
+ const struct restriction_entry *l = left, *r = right;
+ return ber_bvcmp( &l->oid, &r->oid );
+}
+
int
operation_client_cmp( const void *left, const void *right )
{
assert( op == removed );
client->c_n_ops_executing--;
- if ( op->o_restricted ) {
+ if ( op->o_restricted == LLOAD_OP_RESTRICTED_WRITE ) {
if ( !--client->c_restricted_inflight && client->c_restricted_at >= 0 ) {
if ( lload_write_coherence < 0 ) {
client->c_restricted_at = -1;
*/
LDAP_SLAPD_V (ldap_pvt_thread_mutex_t) lload_pin_mutex;
LDAP_SLAPD_V (unsigned long) lload_next_pin;
+LDAP_SLAPD_V (TAvlnode *) lload_control_actions;
+LDAP_SLAPD_V (TAvlnode *) lload_exop_actions;
+LDAP_SLAPD_V (enum op_restriction) lload_default_exop_action;
+LDAP_SLAPD_F (int) lload_restriction_cmp( const void *left, const void *right );
LDAP_SLAPD_F (const char *) lload_msgtype2str( ber_tag_t tag );
LDAP_SLAPD_F (int) operation_upstream_cmp( const void *l, const void *r );
LDAP_SLAPD_F (int) operation_client_cmp( const void *l, const void *r );
/*
* upstream.c
*/
+LDAP_SLAPD_F (int) lload_upstream_entry_cmp( const void *l, const void *r );
LDAP_SLAPD_F (int) forward_final_response( LloadConnection *client, LloadOperation *op, BerElement *ber );
LDAP_SLAPD_F (int) forward_response( LloadConnection *client, LloadOperation *op, BerElement *ber );
LDAP_SLAPD_F (void *) upstream_bind( void *ctx, void *arg );
static void upstream_unlink( LloadConnection *upstream );
+int
+lload_upstream_entry_cmp( const void *l, const void *r )
+{
+ return SLAP_PTRCMP( l, r );
+}
+
+static void
+linked_upstream_lost( LloadConnection *client )
+{
+ int gentle = 1;
+
+ CONNECTION_LOCK(client);
+ assert( client->c_restricted >= LLOAD_OP_RESTRICTED_UPSTREAM );
+ assert( client->c_linked_upstream );
+
+ client->c_restricted = LLOAD_OP_NOT_RESTRICTED;
+ client->c_linked_upstream = NULL;
+ CONNECTION_UNLOCK(client);
+ lload_connection_close( client, &gentle );
+}
+
int
forward_response( LloadConnection *client, LloadOperation *op, BerElement *ber )
{
{
LloadBackend *b = c->c_backend;
struct event *read_event, *write_event;
- TAvlnode *root;
+ TAvlnode *root, *linked_root;
long freed, executing;
Debug( LDAP_DEBUG_CONNS, "upstream_unlink: "
executing = c->c_n_ops_executing;
c->c_n_ops_executing = 0;
+ linked_root = c->c_linked;
+ c->c_linked = NULL;
+
CONNECTION_UNLOCK(c);
freed = ldap_tavl_free( root, (AVL_FREE)operation_lost_upstream );
assert( freed == executing );
+ ldap_tavl_free( linked_root, (AVL_FREE)linked_upstream_lost );
+
/*
* Avoid a deadlock:
* event_del will block if the event is currently executing its callback,