a connection with an outstanding write. This allows faster recovery from
various network hang conditions. An iotimeout of 0 disables this feature.
The default is 10000.
+.TP
+.B write_coherence <integer>
+Specify the number of seconds after a write operation is finished that
+.B lloadd
+will direct operations exclusively to the last selected backend. A write
+operation is anything not handled internally (certain exops, abandon),
+excepting search, compare and bind operations. Bind operations also reset this
+restriction. The default is 0, write operations do not restrict selection. When
+negative, the restriction is not time limited and will persist until the next
+bind.
.SH TLS OPTIONS
If
rc = ldap_tavl_insert( &client->c_ops, op, operation_client_cmp, ldap_avl_dup_error );
assert( rc == LDAP_SUCCESS );
client->c_n_ops_executing++;
+
+ if ( client->c_backend ) {
+ assert( client->c_restricted_inflight == 0 );
+ client->c_backend = NULL;
+ client->c_restricted_at = 0;
+ }
CONNECTION_UNLOCK(client);
if ( pin ) {
{
BerElement *output;
LloadConnection *upstream = NULL;
+ LloadBackend *b = NULL;
ber_int_t msgid;
int res = LDAP_UNAVAILABLE, rc = LDAP_SUCCESS;
char *message = "no connections available";
- upstream_select( op, &upstream, &res, &message );
+ if ( lload_write_coherence ) {
+ CONNECTION_LOCK(client);
+ if ( client->c_restricted_inflight || client->c_restricted_at < 0 ||
+ client->c_restricted_at + lload_write_coherence >= op->o_start ) {
+ b = client->c_backend;
+ } else {
+ client->c_backend = NULL;
+ }
+ CONNECTION_UNLOCK(client);
+ }
+
+ if ( b ) {
+ backend_select( b, op, &upstream, &res, &message );
+ } else {
+ upstream_select( op, &upstream, &res, &message );
+ }
+
if ( !upstream ) {
Debug( LDAP_DEBUG_STATS, "request_process: "
"connid=%lu, msgid=%d no available connection found\n",
lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_forwarded++;
+ if ( lload_write_coherence && !b &&
+ op->o_tag != LDAP_REQ_SEARCH &&
+ op->o_tag != LDAP_REQ_COMPARE ) {
+ /*
+ * TODO: There can't be more than one thread receiving a new request,
+ * so we could drop the lock. We'd still need some atomics for the
+ * counters.
+ */
+ CONNECTION_LOCK(client);
+ client->c_backend = upstream->c_backend;
+ client->c_restricted_inflight++;
+ op->o_restricted = 1;
+ CONNECTION_UNLOCK(client);
+ }
+
if ( (lload_features & LLOAD_FEATURE_PROXYAUTHZ) &&
client->c_type != LLOAD_C_PRIVILEGED ) {
CONNECTION_LOCK(client);
if ( upstream ) {
CONNECTION_LOCK_DESTROY(upstream);
+ /* We have not committed any restrictions in the end */
+ op->o_restricted = LLOAD_OP_NOT_RESTRICTED;
operation_send_reject( op, LDAP_OTHER, "internal error", 0 );
}
timeout_write_tv = { 10, 0 };
lload_features_t lload_features;
+int lload_write_coherence = 0;
ber_len_t sockbuf_max_incoming_client = LLOAD_SB_MAX_INCOMING_CLIENT;
ber_len_t sockbuf_max_incoming_upstream = LLOAD_SB_MAX_INCOMING_UPSTREAM;
NULL,
{ .v_uint = 0 }
},
+ { "write_coherence", "seconds", 2, 2, 0,
+ ARG_INT,
+ &lload_write_coherence,
+ "( OLcfgBkAt:13.38 "
+ "NAME 'olcBkLloadWriteCoherence' "
+ "DESC 'Keep operations to the same backend after a write' "
+ "EQUALITY integerMatch "
+ "SYNTAX OMsInteger "
+ "SINGLE-VALUE )",
+ NULL, NULL
+ },
/* cn=config only options */
#ifdef BALANCER_MODULE
* sent */
};
+/* Tracking whether an operation might cause a client to restrict which
+ * upstreams are eligible */
+enum op_restriction {
+ LLOAD_OP_NOT_RESTRICTED, /* operation didn't trigger any restriction */
+ LLOAD_OP_RESTRICTED_BACKEND, /* operation restricts a client to a certain backend */
+ LLOAD_OP_RESTRICTED_UPSTREAM, /* operation restricts a client to a certain upstream */
+};
+
/*
* represents a connection from an ldap client/to ldap server
*/
lload_counters_t c_counters; /* per connection operation counters */
LloadBackend *c_backend;
+ uintptr_t c_restricted_inflight;
+ time_t c_restricted_at;
/*
* Protected by the CIRCLEQ mutex:
unsigned long o_client_connid;
ber_int_t o_client_msgid;
ber_int_t o_saved_msgid;
+ enum op_restriction o_restricted;
LloadConnection *o_upstream;
unsigned long o_upstream_connid;
assert( op == removed );
client->c_n_ops_executing--;
+ if ( op->o_restricted ) {
+ if ( !--client->c_restricted_inflight && client->c_restricted_at >= 0 ) {
+ if ( lload_write_coherence < 0 ) {
+ client->c_restricted_at = -1;
+ } else if ( op->o_last_response ) {
+ client->c_restricted_at = op->o_last_response;
+ } else {
+ /* We have to default to o_start just in case we abandoned an
+ * operation that the backend actually processed */
+ client->c_restricted_at = op->o_start;
+ }
+ }
+ }
+
if ( op->o_tag == LDAP_REQ_BIND &&
client->c_state == LLOAD_C_BINDING ) {
client->c_state = LLOAD_C_READY;
LDAP_SLAPD_V (ber_len_t) sockbuf_max_incoming_upstream;
LDAP_SLAPD_V (int) lload_conn_max_pdus_per_cycle;
+LDAP_SLAPD_V (int) lload_write_coherence;
+
LDAP_SLAPD_V (lload_features_t) lload_features;
LDAP_SLAPD_V (slap_mask_t) global_allows;