c->c_currentber = ber;
checked_lock( &c->c_io_mutex );
+ if ( (lload_features & LLOAD_FEATURE_PAUSE) &&
+ (c->c_io_state & LLOAD_C_READ_PAUSE) ) {
+ goto pause;
+ }
tag = ber_get_next( c->c_sb, &len, ber );
checked_unlock( &c->c_io_mutex );
if ( tag != LDAP_TAG_MESSAGE ) {
assert( IS_ALIVE( c, c_refcnt ) );
}
- event_add( c->c_read_event, c->c_read_timeout );
- Debug( LDAP_DEBUG_CONNS, "handle_pdus: "
- "re-enabled read event on connid=%lu\n",
- c->c_connid );
+ checked_lock( &c->c_io_mutex );
+ if ( !(lload_features & LLOAD_FEATURE_PAUSE) ||
+ !(c->c_io_state & LLOAD_C_READ_PAUSE) ) {
+ event_add( c->c_read_event, c->c_read_timeout );
+ Debug( LDAP_DEBUG_CONNS, "handle_pdus: "
+ "re-enabled read event on connid=%lu\n",
+ c->c_connid );
+ }
+pause:
+ c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
+ checked_unlock( &c->c_io_mutex );
+
done:
RELEASE_REF( c, c_refcnt, c->c_destroy );
epoch_leave( epoch );
ber_tag_t tag;
ber_len_t len;
epoch_t epoch;
+ int pause;
if ( !IS_ALIVE( c, c_live ) ) {
event_del( c->c_read_event );
c->c_currentber = ber;
checked_lock( &c->c_io_mutex );
+ assert( !(c->c_io_state & LLOAD_C_READ_HANDOVER) );
tag = ber_get_next( c->c_sb, &len, ber );
+ pause = c->c_io_state & LLOAD_C_READ_PAUSE;
checked_unlock( &c->c_io_mutex );
if ( tag != LDAP_TAG_MESSAGE ) {
CONNECTION_LOCK_DESTROY(c);
goto out;
}
- event_add( c->c_read_event, c->c_read_timeout );
- Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
- "re-enabled read event on connid=%lu\n",
- c->c_connid );
+ if ( !(lload_features & LLOAD_FEATURE_PAUSE) || !pause ) {
+ event_add( c->c_read_event, c->c_read_timeout );
+ Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
+ "re-enabled read event on connid=%lu\n",
+ c->c_connid );
+ }
goto out;
}
+ checked_lock( &c->c_io_mutex );
+ c->c_io_state |= LLOAD_C_READ_HANDOVER;
+ checked_unlock( &c->c_io_mutex );
event_del( c->c_read_event );
+
if ( !lload_conn_max_pdus_per_cycle ||
ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) {
/* If we're overloaded or configured as such, process one and resume in
* the next cycle. */
- event_add( c->c_read_event, c->c_read_timeout );
- c->c_pdu_cb( c );
+ int rc = c->c_pdu_cb( c );
+
+ checked_lock( &c->c_io_mutex );
+ c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
+ if ( rc == LDAP_SUCCESS &&
+ ( !(lload_features & LLOAD_FEATURE_PAUSE) ||
+ !(c->c_io_state & LLOAD_C_READ_PAUSE) ) ) {
+ event_add( c->c_read_event, c->c_read_timeout );
+ }
+ checked_unlock( &c->c_io_mutex );
goto out;
}
CONNECTION_LOCK_DESTROY(c);
goto done;
}
+
+ if ( !(c->c_io_state & LLOAD_C_READ_PAUSE) ) {
+ Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
+ "connection connid=%lu blocked on writing, marking "
+ "paused\n",
+ c->c_connid );
+ }
+ c->c_io_state |= LLOAD_C_READ_PAUSE;
+
+ /* TODO: Do not reset write timeout unless we wrote something */
event_add( c->c_write_event, lload_write_timeout );
} else {
c->c_pendingber = NULL;
+ if ( c->c_io_state & LLOAD_C_READ_PAUSE ) {
+ c->c_io_state ^= LLOAD_C_READ_PAUSE;
+ Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
+ "Unpausing connection connid=%lu\n",
+ c->c_connid );
+ if ( !(c->c_io_state & LLOAD_C_READ_HANDOVER) ) {
+ event_add( c->c_read_event, c->c_read_timeout );
+ }
+ }
}
checked_unlock( &c->c_io_mutex );
LLOAD_FEATURE_VC = 1 << 0,
#endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */
LLOAD_FEATURE_PROXYAUTHZ = 1 << 1,
+ LLOAD_FEATURE_PAUSE = 1 << 2,
} lload_features_t;
#ifdef BALANCER_MODULE
LLOAD_C_CLOSING, /* closing */
LLOAD_C_ACTIVE, /* exclusive operation (tls setup, ...) in progress */
LLOAD_C_BINDING, /* binding */
- LLOAD_C_DYING, /* part-processed dead but someone still holds a reference */
+ LLOAD_C_DYING, /* part-processed dead waiting to be freed, someone
+ * might still be observing it */
};
enum sc_type {
LLOAD_C_OPEN = 0, /* regular connection */
LLOAD_C_BIND, /* connection used to handle bind client requests if VC not enabled */
LLOAD_C_PRIVILEGED, /* connection can override proxyauthz control */
};
+enum sc_io_state {
+ LLOAD_C_OPERATIONAL = 0, /* all is good */
+ LLOAD_C_READ_HANDOVER = 1 << 0, /* A task to process PDUs is scheduled or
+ * running, do not re-enable c_read_event */
+ LLOAD_C_READ_PAUSE = 1 << 1, /* We want to pause reading until the client
+ * has sufficiently caught up with what we
+ * sent */
+};
+
/*
* represents a connection from an ldap client/to ldap server
*/
struct LloadConnection {
enum sc_state c_state; /* connection state */
enum sc_type c_type;
+ enum sc_io_state c_io_state;
ber_socket_t c_fd;
/*
goto fail;
}
+ checked_lock( &c->c_io_mutex );
+ c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
+ checked_unlock( &c->c_io_mutex );
event_add( c->c_read_event, c->c_read_timeout );
ber_free( ber, 1 );
return -1;
}
#endif /* HAVE_CYRUS_SASL */
}
+ /* TODO: can we be paused at this point? Then we'd have to move this line
+ * after connection_write_cb */
+ c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
checked_unlock( &c->c_io_mutex );
connection_write_cb( -1, 0, c );
ber_free( ber, 1 );
CONNECTION_UNLOCK(c);
+ checked_lock( &c->c_io_mutex );
+ c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
+ checked_unlock( &c->c_io_mutex );
+
return rc;
}
base = event_get_base( c->c_read_event );
+ c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
event_del( c->c_read_event );
event_del( c->c_write_event );