return 1;
}
-void *
-client_reset( void *ctx, void *arg )
+void
+client_reset( Connection *c )
{
- Operation *op = arg;
- Connection *c = op->o_client;
TAvlnode *root;
- int freed, destroy = 1;
+ int freed;
- CONNECTION_LOCK(c);
root = c->c_ops;
c->c_ops = NULL;
- c->c_state = SLAP_C_CLOSING;
- if ( op->o_tag == LDAP_REQ_BIND ) {
- c->c_state = SLAP_C_BINDING;
- destroy = 0;
- }
if ( !BER_BVISNULL( &c->c_auth ) ) {
ch_free( c->c_auth.bv_val );
BER_BVZERO( &c->c_auth );
}
CONNECTION_UNLOCK_INCREF(c);
- tavl_delete( &root, op, operation_client_cmp );
freed = tavl_free( root, (AVL_FREE)operation_abandon );
Debug( LDAP_DEBUG_TRACE, "client_reset: "
"dropped %d operations\n",
freed );
- if ( destroy ) {
- operation_destroy( op );
- CLIENT_LOCK_DESTROY(c);
- } else {
- CONNECTION_LOCK_DECREF(c);
- CLIENT_UNLOCK_OR_DESTROY(c);
- }
-
- return NULL;
+ CONNECTION_LOCK_DECREF(c);
}
-void *
-client_bind( void *ctx, void *arg )
+int
+client_bind( Connection *client, Operation *op )
{
- Operation *op = arg;
- Connection *upstream, *client = op->o_client;
- int rc = 0;
+ Connection *upstream;
+ int rc = LDAP_SUCCESS;
- CONNECTION_LOCK(client);
- CONNECTION_UNLOCK_INCREF(client);
+ /* protect the Bind operation */
+ tavl_delete( &client->c_ops, op, operation_client_cmp );
+ client->c_state = SLAP_C_BINDING;
- client_reset( ctx, arg );
+ client_reset( client );
+ CONNECTION_UNLOCK_INCREF(client);
upstream = backend_select( op );
if ( !upstream ) {
operation_send_reject(
op, LDAP_UNAVAILABLE, "no connections available", 1 );
CONNECTION_LOCK_DECREF(client);
- CLIENT_UNLOCK_OR_DESTROY(client);
- return NULL;
+ return rc;
}
op->o_upstream = upstream;
CONNECTION_LOCK_DECREF(upstream);
UPSTREAM_UNLOCK_OR_DESTROY(upstream);
+ CONNECTION_LOCK_DECREF(client);
if ( rc ) {
- CLIENT_LOCK_DESTROY(client);
- return NULL;
+ CLIENT_DESTROY(client);
+ return -1;
}
- CONNECTION_LOCK_DECREF(client);
rc = tavl_insert( &client->c_ops, op, operation_client_cmp, avl_dup_error );
assert( rc == LDAP_SUCCESS );
CLIENT_UNLOCK_OR_DESTROY(client);
- return NULL;
+ return rc;
}
#include "lutil.h"
#include "slap.h"
+typedef int (*RequestHandler)( Connection *c, Operation *op );
+
static void
client_read_cb( evutil_socket_t s, short what, void *arg )
{
Connection *c = arg;
BerElement *ber;
- Operation *op = NULL;
ber_tag_t tag;
ber_len_t len;
- int rc = 0;
/* What if the shutdown is already in progress and we get to lock the
* connection? */
Debug( LDAP_DEBUG_ANY, "client_read_cb: "
"ber_alloc failed\n" );
CLIENT_DESTROY(c);
- goto fail;
+ return;
}
+ c->c_currentber = ber;
tag = ber_get_next( c->c_sb, &len, ber );
if ( tag != LDAP_TAG_MESSAGE ) {
c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
c->c_currentber = NULL;
+ ber_free( ber, 1 );
CLIENT_DESTROY(c);
- goto fail;
+ return;
}
- c->c_currentber = ber;
+ event_add( c->c_read_event, NULL );
CONNECTION_UNLOCK(c);
return;
}
+ if ( !slap_conn_max_pdus_per_cycle ||
+ ldap_pvt_thread_pool_submit(
+ &connection_pool, handle_requests, c ) ) {
+ /* If we're overloaded or configured as such, process one and resume in
+ * the next cycle.
+ *
+ * handle_one_request re-locks the mutex in the
+ * process, need to test it's still alive */
+ if ( handle_one_request( c ) == LDAP_SUCCESS ) {
+ CLIENT_UNLOCK_OR_DESTROY(c);
+ }
+ return;
+ }
+ event_del( c->c_read_event );
+
+ CONNECTION_UNLOCK(c);
+ return;
+}
+
+void *
+handle_requests( void *ctx, void *arg )
+{
+ Connection *c = arg;
+ int requests_handled = 0;
+
+ CONNECTION_LOCK(c);
+ for ( ; requests_handled < slap_conn_max_pdus_per_cycle;
+ requests_handled++ ) {
+ BerElement *ber;
+ ber_tag_t tag;
+ ber_len_t len;
+
+ /* handle_one_response may unlock the connection in the process, we
+ * need to expect that might be our responsibility to destroy it */
+ if ( handle_one_request( c ) ) {
+ /* Error, connection is unlocked and might already have been
+ * destroyed */
+ return NULL;
+ }
+ /* Otherwise, handle_one_request leaves the connection locked */
+
+ if ( (ber = ber_alloc()) == NULL ) {
+ Debug( LDAP_DEBUG_ANY, "client_read_cb: "
+ "ber_alloc failed\n" );
+ CLIENT_DESTROY(c);
+ return NULL;
+ }
+ c->c_currentber = ber;
+
+ tag = ber_get_next( c->c_sb, &len, ber );
+ if ( tag != LDAP_TAG_MESSAGE ) {
+ int err = sock_errno();
+
+ if ( err != EWOULDBLOCK && err != EAGAIN ) {
+ char ebuf[128];
+ Debug( LDAP_DEBUG_ANY, "handle_requests: "
+ "ber_get_next on fd %d failed errno=%d (%s)\n",
+ c->c_fd, err,
+ sock_errstr( err, ebuf, sizeof(ebuf) ) );
+
+ c->c_currentber = NULL;
+ ber_free( ber, 1 );
+ CLIENT_DESTROY(c);
+ return NULL;
+ }
+ break;
+ }
+ }
+
+ event_add( c->c_read_event, NULL );
+ CLIENT_UNLOCK_OR_DESTROY(c);
+ return NULL;
+}
+
+int
+handle_one_request( Connection *c )
+{
+ BerElement *ber;
+ Operation *op = NULL;
+ RequestHandler handler = NULL;
+
+ ber = c->c_currentber;
c->c_currentber = NULL;
op = operation_init( c, ber );
if ( !op ) {
- Debug( LDAP_DEBUG_ANY, "client_read_cb: "
+ Debug( LDAP_DEBUG_ANY, "handle_one_request: "
"operation_init failed\n" );
CLIENT_DESTROY(c);
- goto fail;
+ ber_free( ber, 1 );
+ return -1;
}
switch ( op->o_tag ) {
case LDAP_REQ_UNBIND:
- /* We do not expect anything more from the client. Also, we are the
- * read event, so don't need to unlock */
- event_del( c->c_read_event );
-
- rc = ldap_pvt_thread_pool_submit(
- &connection_pool, client_reset, op );
- if ( rc ) {
- CONNECTION_UNLOCK(c);
- client_reset( NULL, op );
- return;
- }
- break;
+ c->c_state = SLAP_C_CLOSING;
+ CLIENT_DESTROY(c);
+ return -1;
case LDAP_REQ_BIND:
- rc = ldap_pvt_thread_pool_submit(
- &connection_pool, client_bind, op );
+ handler = client_bind;
+ break;
+ case LDAP_REQ_ABANDON:
+ /* FIXME: We need to be able to abandon a Bind request, handling
+ * ExOps (esp. Cancel) will be different */
+ handler = request_abandon;
break;
default:
if ( c->c_state == SLAP_C_BINDING ) {
- CONNECTION_UNLOCK(c);
+ CONNECTION_UNLOCK_INCREF(c);
operation_send_reject(
op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 );
- return;
+ CONNECTION_LOCK_DECREF(c);
+ return LDAP_SUCCESS;
}
- rc = ldap_pvt_thread_pool_submit(
- &connection_pool, request_process, op );
+ handler = request_process;
break;
}
- /* FIXME: unlocks in this function need more thought when we refcount
- * operations */
- CONNECTION_UNLOCK(c);
-
- if ( !rc ) {
- return;
- }
-
-fail:
- if ( op ) {
- operation_send_reject(
- op, LDAP_OTHER, "server error or overloaded", 1 );
- operation_destroy( op );
- } else if ( ber ) {
- ber_free( ber, 1 );
- }
-
- return;
+ return handler( c, op );
}
void
void
operation_abandon( Operation *op )
{
+ Connection *c = op->o_upstream;
+ BerElement *ber;
+ Backend *b;
int rc;
- if ( op->o_upstream ) {
- Connection *c = op->o_upstream;
- BerElement *ber;
- Backend *b;
+ if ( !c ) {
+ c = op->o_client;
CONNECTION_LOCK(c);
- rc = ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) == NULL );
- if ( !rc ) {
- c->c_n_ops_executing--;
- }
- b = (Backend *)c->c_private;
- CONNECTION_UNLOCK_INCREF(c);
+ CLIENT_UNLOCK_OR_DESTROY(c);
+ operation_destroy( op );
+ return;
+ }
- if ( rc ) {
- /* The operation has already been abandoned or finished */
- goto done;
- }
+ CONNECTION_LOCK(c);
+ if ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) == NULL ) {
+ /* The operation has already been abandoned or finished */
+ goto done;
+ }
+ c->c_n_ops_executing--;
+ b = (Backend *)c->c_private;
+ CONNECTION_UNLOCK_INCREF(c);
- ldap_pvt_thread_mutex_lock( &b->b_mutex );
- b->b_n_ops_executing--;
- ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+ ldap_pvt_thread_mutex_lock( &b->b_mutex );
+ b->b_n_ops_executing--;
+ ldap_pvt_thread_mutex_unlock( &b->b_mutex );
- ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
+ ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
- ber = c->c_pendingber;
- if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
- Debug( LDAP_DEBUG_ANY, "operation_abandon: "
- "ber_alloc failed\n" );
- CONNECTION_LOCK_DECREF(c);
- ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
- UPSTREAM_UNLOCK_OR_DESTROY(c);
- goto done;
- }
- c->c_pendingber = ber;
+ ber = c->c_pendingber;
+ if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
+ Debug( LDAP_DEBUG_ANY, "operation_abandon: "
+ "ber_alloc failed\n" );
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+ CONNECTION_LOCK_DECREF(c);
+ goto done;
+ }
+ c->c_pendingber = ber;
- rc = ber_printf( ber, "t{titi}", LDAP_TAG_MESSAGE,
- LDAP_TAG_MSGID, c->c_next_msgid++,
- LDAP_REQ_ABANDON, op->o_upstream_msgid );
+ rc = ber_printf( ber, "t{titi}", LDAP_TAG_MESSAGE,
+ LDAP_TAG_MSGID, c->c_next_msgid++,
+ LDAP_REQ_ABANDON, op->o_upstream_msgid );
- if ( rc == -1 ) {
- ber_free( ber, 1 );
- }
+ if ( rc == -1 ) {
+ ber_free( ber, 1 );
+ c->c_pendingber = NULL;
+ }
- CONNECTION_LOCK_DECREF(c);
- ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
- UPSTREAM_UNLOCK_OR_DESTROY(c);
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
- if ( rc != -1 ) {
- upstream_write_cb( -1, 0, c );
- }
+ if ( rc != -1 ) {
+ upstream_write_cb( -1, 0, c );
}
+ CONNECTION_LOCK_DECREF(c);
done:
+ UPSTREAM_UNLOCK_OR_DESTROY(c);
operation_destroy( op );
}
+int
+request_abandon( Connection *c, Operation *op )
+{
+ Operation *request, needle = { .o_client = c };
+ ber_tag_t tag;
+ int rc = -1;
+
+ tag = ber_get_int( op->o_ber, &needle.o_client_msgid );
+ if ( tag != LDAP_REQ_ABANDON ) {
+ /* How would that happen if we already got the tag for the op? */
+ assert(0);
+ goto done;
+ }
+
+ request = tavl_find( c->c_ops, &needle, operation_client_cmp );
+ if ( !request ) {
+ goto done;
+ }
+
+ CONNECTION_UNLOCK_INCREF(c);
+ operation_abandon( request );
+ CONNECTION_LOCK_DECREF(c);
+
+ rc = LDAP_SUCCESS;
+done:
+ operation_destroy( op );
+ return rc;
+}
+
void
operation_send_reject(
Operation *op,
operation_destroy( op );
}
-void *
-request_process( void *ctx, void *arg )
+int
+request_process( Connection *client, Operation *op )
{
- Operation *op = arg;
BerElement *output;
- Connection *client = op->o_client, *upstream;
+ Connection *upstream;
ber_int_t msgid;
- int rc;
+ int rc = LDAP_SUCCESS;
+
+ CONNECTION_UNLOCK_INCREF(client);
upstream = backend_select( op );
if ( !upstream ) {
rc = tavl_insert(
&upstream->c_ops, op, operation_upstream_cmp, avl_dup_error );
CONNECTION_UNLOCK_INCREF(upstream);
+
+ Debug( LDAP_DEBUG_TRACE, "request_process: "
+ "client connid=%lu added %s msgid=%d to upstream connid=%lu as "
+ "msgid=%d\n",
+ op->o_client_connid, slap_msgtype2str( op->o_tag ),
+ op->o_client_msgid, op->o_upstream_connid, op->o_upstream_msgid );
assert( rc == LDAP_SUCCESS );
if ( lload_features & LLOAD_FEATURE_PROXYAUTHZ ) {
- CONNECTION_LOCK(client);
+ CONNECTION_LOCK_DECREF(client);
Debug( LDAP_DEBUG_TRACE, "request_process: "
"proxying identity %s to upstream\n",
client->c_auth.bv_val );
op->o_tag, &op->o_request,
LDAP_TAG_CONTROLS,
LDAP_CONTROL_PROXY_AUTHZ, 1, &client->c_auth );
- CONNECTION_UNLOCK(client);
+ CONNECTION_UNLOCK_INCREF(client);
if ( !BER_BVISNULL( &op->o_ctrls ) ) {
BerElement *control_ber = ber_alloc();
CONNECTION_LOCK_DECREF(upstream);
UPSTREAM_UNLOCK_OR_DESTROY(upstream);
- return NULL;
+ CONNECTION_LOCK_DECREF(client);
+ return rc;
fail:
if ( upstream ) {
UPSTREAM_UNLOCK_OR_DESTROY(upstream);
}
operation_send_reject( op, LDAP_OTHER, "internal error", 0 );
- return NULL;
+ CONNECTION_LOCK_DECREF(client);
+ return rc;
}
/*
* bind.c
*/
-LDAP_SLAPD_F (void *) client_reset( void *ctx, void *arg );
-LDAP_SLAPD_F (void *) client_bind( void *ctx, void *arg );
+LDAP_SLAPD_F (void) client_reset( Connection *c );
+LDAP_SLAPD_F (int) client_bind( Connection *c, Operation *op );
/*
* client.c
*/
+LDAP_SLAPD_F (void *) handle_requests( void *ctx, void *arg );
+LDAP_SLAPD_F (int) handle_one_request( Connection *c );
LDAP_SLAPD_F (Connection *) client_init( ber_socket_t s, Listener *url, const char *peername, struct event_base *base, int use_tls );
LDAP_SLAPD_F (void) client_write_cb( evutil_socket_t s, short what, void *arg );
LDAP_SLAPD_F (void) client_destroy( Connection *c );
LDAP_SLAPD_F (void) operation_send_reject( Operation *op, int result, const char *msg, int send_anyway );
LDAP_SLAPD_F (void) operation_lost_upstream( Operation *op );
LDAP_SLAPD_F (void) operation_destroy( Operation *op );
-LDAP_SLAPD_F (void *) request_process( void *ctx, void *arg );
+LDAP_SLAPD_F (int) request_abandon( Connection *c, Operation *op );
+LDAP_SLAPD_F (int) request_process( Connection *c, Operation *op );
/*
* sl_malloc.c