c->c_connid, c->c_n_ops_executing );
assert( c->c_live );
- UPSTREAM_DESTROY(c);
+ CONNECTION_DESTROY(c);
}
while ( !LDAP_CIRCLEQ_EMPTY( &b->b_conns ) ) {
Connection *c = LDAP_CIRCLEQ_FIRST( &b->b_conns );
c->c_connid, c->c_n_ops_executing );
assert( c->c_live );
- UPSTREAM_DESTROY(c);
+ CONNECTION_DESTROY(c);
}
LDAP_CIRCLEQ_REMOVE( &backend, b, b_next );
ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
ber_free( copy, 0 );
- upstream_write_cb( -1, 0, upstream );
+ connection_write_cb( -1, 0, upstream );
return 0;
fail:
ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
ber_free( copy, 0 );
- upstream_write_cb( -1, 0, upstream );
+ connection_write_cb( -1, 0, upstream );
return 0;
}
CONNECTION_LOCK_DECREF(upstream);
- UPSTREAM_UNLOCK_OR_DESTROY(upstream);
+ CONNECTION_UNLOCK_OR_DESTROY(upstream);
CONNECTION_LOCK_DECREF(client);
if ( rc ) {
op->o_client_refcnt--;
operation_destroy_from_client( op );
- CLIENT_DESTROY(client);
+ CONNECTION_DESTROY(client);
return -1;
}
ldap_pvt_thread_mutex_t clients_mutex;
-typedef int (*RequestHandler)( Connection *c, Operation *op );
-
-static void
-client_read_cb( evutil_socket_t s, short what, void *arg )
-{
- Connection *c = arg;
- BerElement *ber;
- ber_tag_t tag;
- ber_len_t len;
-
- CONNECTION_LOCK(c);
- if ( !c->c_live ) {
- event_del( c->c_read_event );
- Debug( LDAP_DEBUG_CONNS, "client_read_cb: "
- "suspended read event on a dead connid=%lu\n",
- c->c_connid );
- CONNECTION_UNLOCK(c);
- return;
- }
-
- Debug( LDAP_DEBUG_CONNS, "client_read_cb: "
- "connection connid=%lu ready to read\n",
- c->c_connid );
-
- ber = c->c_currentber;
- if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
- Debug( LDAP_DEBUG_ANY, "client_read_cb: "
- "connid=%lu, ber_alloc failed\n",
- c->c_connid );
- CLIENT_DESTROY(c);
- return;
- }
- c->c_currentber = ber;
-
- tag = ber_get_next( c->c_sb, &len, ber );
- if ( tag != LDAP_TAG_MESSAGE ) {
- int err = sock_errno();
-
- if ( err != EWOULDBLOCK && err != EAGAIN ) {
- if ( err || tag == LBER_ERROR ) {
- char ebuf[128];
- Debug( LDAP_DEBUG_STATS, "client_read_cb: "
- "ber_get_next on fd=%d failed errno=%d (%s)\n",
- c->c_fd, err,
- sock_errstr( err, ebuf, sizeof(ebuf) ) );
- } else {
- Debug( LDAP_DEBUG_STATS, "client_read_cb: "
- "ber_get_next on fd=%d connid=%lu received "
- "a strange PDU tag=%lx\n",
- c->c_fd, c->c_connid, tag );
- }
-
- c->c_currentber = NULL;
- ber_free( ber, 1 );
-
- event_del( c->c_read_event );
- Debug( LDAP_DEBUG_CONNS, "client_read_cb: "
- "suspended read event on dying connid=%lu\n",
- c->c_connid );
- CLIENT_DESTROY(c);
- return;
- }
- event_add( c->c_read_event, NULL );
- Debug( LDAP_DEBUG_CONNS, "client_read_cb: "
- "re-enabled read event on connid=%lu\n",
- c->c_connid );
- CONNECTION_UNLOCK(c);
- return;
- }
-
- if ( !slap_conn_max_pdus_per_cycle ||
- ldap_pvt_thread_pool_submit(
- &connection_pool, handle_requests, c ) ) {
- /* If we're overloaded or configured as such, process one and resume in
- * the next cycle.
- *
- * handle_one_request re-locks the mutex in the
- * process, need to test it's still alive */
- if ( handle_one_request( c ) == LDAP_SUCCESS ) {
- CLIENT_UNLOCK_OR_DESTROY(c);
- }
- return;
- }
-
- event_del( c->c_read_event );
- Debug( LDAP_DEBUG_CONNS, "client_read_cb: "
- "suspended read event on connid=%lu\n",
- c->c_connid );
-
- /* We have scheduled a call to handle_requests which takes care of
- * handling further requests, just make sure the connection sticks around
- * for that */
- CONNECTION_UNLOCK_INCREF(c);
- return;
-}
-
-void *
-handle_requests( void *ctx, void *arg )
-{
- Connection *c = arg;
- int requests_handled = 0;
-
- CONNECTION_LOCK_DECREF(c);
- for ( ;; ) {
- BerElement *ber;
- ber_tag_t tag;
- ber_len_t len;
-
- /* handle_one_response may unlock the connection in the process, we
- * need to expect that might be our responsibility to destroy it */
- if ( handle_one_request( c ) ) {
- /* Error, connection is unlocked and might already have been
- * destroyed */
- return NULL;
- }
- /* Otherwise, handle_one_request leaves the connection locked */
-
- if ( ++requests_handled >= slap_conn_max_pdus_per_cycle ) {
- /* Do not read now, re-enable read event instead */
- break;
- }
-
- if ( (ber = ber_alloc()) == NULL ) {
- Debug( LDAP_DEBUG_ANY, "client_read_cb: "
- "connid=%lu, ber_alloc failed\n",
- c->c_connid );
- CLIENT_DESTROY(c);
- return NULL;
- }
- c->c_currentber = ber;
-
- tag = ber_get_next( c->c_sb, &len, ber );
- if ( tag != LDAP_TAG_MESSAGE ) {
- int err = sock_errno();
-
- if ( err != EWOULDBLOCK && err != EAGAIN ) {
- if ( err || tag == LBER_ERROR ) {
- char ebuf[128];
- Debug( LDAP_DEBUG_ANY, "handle_requests: "
- "ber_get_next on fd=%d failed errno=%d (%s)\n",
- c->c_fd, err,
- sock_errstr( err, ebuf, sizeof(ebuf) ) );
- } else {
- Debug( LDAP_DEBUG_STATS, "handle_requests: "
- "ber_get_next on fd=%d connid=%lu received "
- "a strange PDU tag=%lx\n",
- c->c_fd, c->c_connid, tag );
- }
-
- c->c_currentber = NULL;
- ber_free( ber, 1 );
- CLIENT_DESTROY(c);
- return NULL;
- }
- break;
- }
- }
-
- event_add( c->c_read_event, NULL );
- Debug( LDAP_DEBUG_CONNS, "handle_requests: "
- "re-enabled read event on connid=%lu\n",
- c->c_connid );
- CLIENT_UNLOCK_OR_DESTROY(c);
- return NULL;
-}
-
int
handle_one_request( Connection *c )
{
Debug( LDAP_DEBUG_ANY, "handle_one_request: "
"connid=%lu, operation_init failed\n",
c->c_connid );
- CLIENT_DESTROY(c);
+ CONNECTION_DESTROY(c);
ber_free( ber, 1 );
return -1;
}
Debug( LDAP_DEBUG_STATS, "handle_one_request: "
"received unbind, closing client connid=%lu\n",
c->c_connid );
- CLIENT_DESTROY(c);
+ CONNECTION_DESTROY(c);
return -1;
case LDAP_REQ_BIND:
handler = client_bind;
return handler( c, op );
}
-void
-client_write_cb( evutil_socket_t s, short what, void *arg )
-{
- Connection *c = arg;
-
- CONNECTION_LOCK(c);
- if ( !c->c_live ) {
- CONNECTION_UNLOCK(c);
- return;
- }
- CONNECTION_UNLOCK_INCREF(c);
-
- /* Before we acquire any locks */
- event_del( c->c_write_event );
-
- ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
- Debug( LDAP_DEBUG_CONNS, "client_write_cb: "
- "have something to write to client connid=%lu\n",
- c->c_connid );
-
- /* We might have been beaten to flushing the data by another thread */
- if ( c->c_pendingber && ber_flush( c->c_sb, c->c_pendingber, 1 ) ) {
- int err = sock_errno();
-
- if ( err != EWOULDBLOCK && err != EAGAIN ) {
- char ebuf[128];
- ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
- Debug( LDAP_DEBUG_ANY, "client_write_cb: "
- "ber_flush on fd=%d failed errno=%d (%s)\n",
- c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
- CLIENT_LOCK_DESTROY(c);
- return;
- }
- event_add( c->c_write_event, NULL );
- } else {
- c->c_pendingber = NULL;
- }
- ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
-
- CONNECTION_LOCK_DECREF(c);
- CLIENT_UNLOCK_OR_DESTROY(c);
-}
-
Connection *
client_init(
ber_socket_t s,
{
Connection *c;
struct event *event;
+ event_callback_fn read_cb = connection_read_cb,
+ write_cb = connection_write_cb;
assert( listener != NULL );
- c = connection_init( s, peername, flags );
+ if ( (c = connection_init( s, peername, flags )) == NULL ) {
+ return NULL;
+ }
{
ber_len_t max = sockbuf_max_incoming_client;
c->c_state = SLAP_C_READY;
- event = event_new( base, s, EV_READ|EV_PERSIST, client_read_cb, c );
+ event = event_new( base, s, EV_READ|EV_PERSIST, read_cb, c );
if ( !event ) {
Debug( LDAP_DEBUG_ANY, "client_init: "
"Read event could not be allocated\n" );
goto fail;
}
- event_add( event, NULL );
c->c_read_event = event;
+ event_add( c->c_read_event, NULL );
- event = event_new( base, s, EV_WRITE, client_write_cb, c );
+ event = event_new( base, s, EV_WRITE, write_cb, c );
if ( !event ) {
Debug( LDAP_DEBUG_ANY, "client_init: "
"Write event could not be allocated\n" );
c->c_write_event = event;
c->c_private = listener;
+ c->c_destroy = client_destroy;
+ c->c_pdu_cb = handle_one_request;
/* There should be no lock inversion yet since no other thread could
* approach it from clients side */
event_free( c->c_read_event );
c->c_read_event = NULL;
}
+
c->c_state = SLAP_C_INVALID;
- connection_destroy( c );
+ CONNECTION_DESTROY(c);
+ assert( c == NULL );
return NULL;
}
/* Upstream connections have already been destroyed, there should be no
* ops left */
assert( !c->c_ops );
- CLIENT_DESTROY(c);
+ CONNECTION_DESTROY(c);
ldap_pvt_thread_mutex_lock( &clients_mutex );
}
ldap_pvt_thread_mutex_unlock( &clients_mutex );
ldap_pvt_thread_mutex_unlock( &conn_nextid_mutex );
}
+/*
+ * We start off with the connection muted and c_currentber holding the pdu we
+ * received.
+ *
+ * We run c->c_pdu_cb for each pdu, stopping once we hit an error, have to wait
+ * on reading or after we process slap_conn_max_pdus_per_cycle pdus so as to
+ * maintain fairness and not hog the worker thread forever.
+ *
+ * If we've run out of pdus immediately available from the stream or hit the
+ * budget, we unmute the connection.
+ *
+ * c->c_pdu_cb might return an 'error' and not free the connection. That can
+ * happen when changing the state or when client is blocked on writing and
+ * already has a pdu pending on the same operation, it's their job to make sure
+ * we're woken up again.
+ */
+static void *
+handle_pdus( void *ctx, void *arg )
+{
+ Connection *c = arg;
+ int pdus_handled = 0;
+
+ CONNECTION_LOCK_DECREF(c);
+ for ( ;; ) {
+ BerElement *ber;
+ ber_tag_t tag;
+ ber_len_t len;
+
+ /* handle_one_response may unlock the connection in the process, we
+ * need to expect that might be our responsibility to destroy it */
+ if ( c->c_pdu_cb( c ) ) {
+ /* Error, connection is unlocked and might already have been
+ * destroyed */
+ return NULL;
+ }
+ /* Otherwise, handle_one_request leaves the connection locked */
+
+ if ( ++pdus_handled >= slap_conn_max_pdus_per_cycle ) {
+ /* Do not read now, re-enable read event instead */
+ break;
+ }
+
+ if ( (ber = ber_alloc()) == NULL ) {
+ Debug( LDAP_DEBUG_ANY, "handle_pdus: "
+ "connid=%lu, ber_alloc failed\n",
+ c->c_connid );
+ CONNECTION_DESTROY(c);
+ return NULL;
+ }
+ c->c_currentber = ber;
+
+ tag = ber_get_next( c->c_sb, &len, ber );
+ if ( tag != LDAP_TAG_MESSAGE ) {
+ int err = sock_errno();
+
+ if ( err != EWOULDBLOCK && err != EAGAIN ) {
+ if ( err || tag == LBER_ERROR ) {
+ char ebuf[128];
+ Debug( LDAP_DEBUG_ANY, "handle_pdus: "
+ "ber_get_next on fd=%d failed errno=%d (%s)\n",
+ c->c_fd, err,
+ sock_errstr( err, ebuf, sizeof(ebuf) ) );
+ } else {
+ Debug( LDAP_DEBUG_STATS, "handle_pdus: "
+ "ber_get_next on fd=%d connid=%lu received "
+ "a strange PDU tag=%lx\n",
+ c->c_fd, c->c_connid, tag );
+ }
+
+ c->c_currentber = NULL;
+ ber_free( ber, 1 );
+ CONNECTION_DESTROY(c);
+ return NULL;
+ }
+ break;
+ }
+ }
+
+ event_add( c->c_read_event, NULL );
+ Debug( LDAP_DEBUG_CONNS, "handle_pdus: "
+ "re-enabled read event on connid=%lu\n",
+ c->c_connid );
+ CONNECTION_UNLOCK_OR_DESTROY(c);
+ return NULL;
+}
+
+/*
+ * Initial read on the connection, if we get an LDAP PDU, submit the
+ * processing of this and successive ones to the work queue.
+ *
+ * If we can't submit it to the queue (overload), process this one and return
+ * to the event loop immediately after.
+ */
+void
+connection_read_cb( evutil_socket_t s, short what, void *arg )
+{
+ Connection *c = arg;
+ BerElement *ber;
+ ber_tag_t tag;
+ ber_len_t len;
+
+ CONNECTION_LOCK(c);
+ if ( !c->c_live ) {
+ event_del( c->c_read_event );
+ Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
+ "suspended read event on a dead connid=%lu\n",
+ c->c_connid );
+ CONNECTION_UNLOCK(c);
+ return;
+ }
+
+ Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
+ "connection connid=%lu ready to read\n",
+ c->c_connid );
+
+ ber = c->c_currentber;
+ if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
+ Debug( LDAP_DEBUG_ANY, "connection_read_cb: "
+ "connid=%lu, ber_alloc failed\n",
+ c->c_connid );
+ CONNECTION_DESTROY(c);
+ return;
+ }
+ c->c_currentber = ber;
+
+ tag = ber_get_next( c->c_sb, &len, ber );
+ if ( tag != LDAP_TAG_MESSAGE ) {
+ int err = sock_errno();
+
+ if ( err != EWOULDBLOCK && err != EAGAIN ) {
+ if ( err || tag == LBER_ERROR ) {
+ char ebuf[128];
+ Debug( LDAP_DEBUG_STATS, "connection_read_cb: "
+ "ber_get_next on fd=%d failed errno=%d (%s)\n",
+ c->c_fd, err,
+ sock_errstr( err, ebuf, sizeof(ebuf) ) );
+ } else {
+ Debug( LDAP_DEBUG_STATS, "connection_read_cb: "
+ "ber_get_next on fd=%d connid=%lu received "
+ "a strange PDU tag=%lx\n",
+ c->c_fd, c->c_connid, tag );
+ }
+
+ c->c_currentber = NULL;
+ ber_free( ber, 1 );
+
+ event_del( c->c_read_event );
+ Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
+ "suspended read event on dying connid=%lu\n",
+ c->c_connid );
+ CONNECTION_DESTROY(c);
+ return;
+ }
+ event_add( c->c_read_event, NULL );
+ Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
+ "re-enabled read event on connid=%lu\n",
+ c->c_connid );
+ CONNECTION_UNLOCK(c);
+ return;
+ }
+
+ if ( !slap_conn_max_pdus_per_cycle ||
+ ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) {
+ /* If we're overloaded or configured as such, process one and resume in
+ * the next cycle.
+ *
+ * handle_one_request re-locks the mutex in the
+ * process, need to test it's still alive */
+ if ( c->c_pdu_cb( c ) == LDAP_SUCCESS ) {
+ CONNECTION_UNLOCK_OR_DESTROY(c);
+ }
+ return;
+ }
+
+ event_del( c->c_read_event );
+ Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
+ "suspended read event on connid=%lu\n",
+ c->c_connid );
+
+ /* We have scheduled a call to handle_requests which takes care of
+ * handling further requests, just make sure the connection sticks around
+ * for that */
+ CONNECTION_UNLOCK_INCREF(c);
+ return;
+}
+
+void
+connection_write_cb( evutil_socket_t s, short what, void *arg )
+{
+ Connection *c = arg;
+
+ CONNECTION_LOCK(c);
+ if ( !c->c_live ) {
+ CONNECTION_UNLOCK(c);
+ return;
+ }
+ CONNECTION_UNLOCK_INCREF(c);
+
+ /* Before we acquire any locks */
+ event_del( c->c_write_event );
+
+ ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
+ Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
+ "have something to write to connection connid=%lu\n",
+ c->c_connid );
+
+ /* We might have been beaten to flushing the data by another thread */
+ if ( c->c_pendingber && ber_flush( c->c_sb, c->c_pendingber, 1 ) ) {
+ int err = sock_errno();
+
+ if ( err != EWOULDBLOCK && err != EAGAIN ) {
+ char ebuf[128];
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+ Debug( LDAP_DEBUG_ANY, "connection_write_cb: "
+ "ber_flush on fd=%d failed errno=%d (%s)\n",
+ c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
+ CONNECTION_LOCK_DESTROY(c);
+ return;
+ }
+ event_add( c->c_write_event, NULL );
+ } else {
+ c->c_pendingber = NULL;
+ }
+ ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
+
+ CONNECTION_LOCK_DECREF(c);
+ CONNECTION_UNLOCK_OR_DESTROY(c);
+}
+
void
connection_destroy( Connection *c )
{
c->c_next_msgid = 1;
c->c_refcnt = c->c_live = 1;
+ c->c_destroy = connection_destroy;
LDAP_CIRCLEQ_ENTRY_INIT( c, c_next );
ldap_pvt_thread_mutex_unlock( &op->o_mutex );
assert( upstream != NULL );
- UPSTREAM_UNLOCK_OR_DESTROY(upstream);
+ CONNECTION_UNLOCK_OR_DESTROY(upstream);
CONNECTION_LOCK_DECREF(client);
return;
}
upstream->c_n_ops_executing--;
b = (Backend *)upstream->c_private;
}
- UPSTREAM_UNLOCK_OR_DESTROY(upstream);
+ CONNECTION_UNLOCK_OR_DESTROY(upstream);
if ( b ) {
ldap_pvt_thread_mutex_lock( &b->b_mutex );
ldap_pvt_thread_mutex_unlock( &op->o_mutex );
assert( client != NULL );
- CLIENT_UNLOCK_OR_DESTROY(client);
+ CONNECTION_UNLOCK_OR_DESTROY(client);
CONNECTION_LOCK_DECREF(upstream);
return;
}
/* 7. Remove from the operation map and TODO adjust the pending op count */
if ( client ) {
tavl_delete( &client->c_ops, op, operation_client_cmp );
- CLIENT_UNLOCK_OR_DESTROY(client);
+ CONNECTION_UNLOCK_OR_DESTROY(client);
}
/* 8. Release the operation */
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
if ( rc != -1 ) {
- upstream_write_cb( -1, 0, c );
+ connection_write_cb( -1, 0, c );
}
CONNECTION_LOCK_DECREF(c);
if ( !c->c_live ) {
operation_destroy_from_upstream( op );
}
- UPSTREAM_UNLOCK_OR_DESTROY(c);
+ CONNECTION_UNLOCK_OR_DESTROY(c);
done:
c = op->o_client;
operation_send_reject(
op, LDAP_PROTOCOL_ERROR, "invalid PDU received", 0 );
CONNECTION_LOCK_DECREF(c);
- CLIENT_DESTROY(c);
+ CONNECTION_DESTROY(c);
return -1;
}
"not sending msgid=%d, client connid=%lu is dead\n",
op->o_client_msgid, op->o_client_connid );
operation_destroy_from_upstream( op );
- UPSTREAM_UNLOCK_OR_DESTROY(c);
+ CONNECTION_UNLOCK_OR_DESTROY(c);
return;
}
CONNECTION_LOCK(c);
c->c_connid );
CONNECTION_LOCK_DECREF(c);
operation_destroy_from_client( op );
- CLIENT_DESTROY(c);
+ CONNECTION_DESTROY(c);
return;
}
c->c_pendingber = ber;
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
- client_write_cb( -1, 0, c );
+ connection_write_cb( -1, 0, c );
CONNECTION_LOCK_DECREF(c);
done:
operation_destroy_from_client( op );
- CLIENT_UNLOCK_OR_DESTROY(c);
+ CONNECTION_UNLOCK_OR_DESTROY(c);
}
/*
}
ldap_pvt_thread_mutex_unlock( &upstream->c_io_mutex );
- upstream_write_cb( -1, 0, upstream );
+ connection_write_cb( -1, 0, upstream );
CONNECTION_LOCK_DECREF(upstream);
- UPSTREAM_UNLOCK_OR_DESTROY(upstream);
+ CONNECTION_UNLOCK_OR_DESTROY(upstream);
CONNECTION_LOCK_DECREF(client);
if ( !--op->o_client_refcnt ) {
CONNECTION_LOCK_DECREF(upstream);
upstream->c_n_ops_executing--;
b = (Backend *)upstream->c_private;
- UPSTREAM_UNLOCK_OR_DESTROY(upstream);
+ CONNECTION_UNLOCK_OR_DESTROY(upstream);
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_n_ops_executing--;
op->o_client_refcnt--;
operation_destroy_from_client( op );
if ( rc ) {
- CLIENT_DESTROY(client);
+ CONNECTION_DESTROY(client);
}
return rc;
}
/*
* client.c
*/
-LDAP_SLAPD_F (void *) handle_requests( void *ctx, void *arg );
LDAP_SLAPD_F (int) handle_one_request( Connection *c );
LDAP_SLAPD_F (Connection *) client_init( ber_socket_t s, Listener *url, const char *peername, struct event_base *base, int use_tls );
-LDAP_SLAPD_F (void) client_write_cb( evutil_socket_t s, short what, void *arg );
LDAP_SLAPD_F (void) client_destroy( Connection *c );
LDAP_SLAPD_F (void) clients_destroy( void );
* connection.c
*/
LDAP_SLAPD_V (ldap_pvt_thread_mutex_t) clients_mutex;
+LDAP_SLAPD_F (void) connection_write_cb( evutil_socket_t s, short what, void *arg );
+LDAP_SLAPD_F (void) connection_read_cb( evutil_socket_t s, short what, void *arg );
LDAP_SLAPD_F (Connection *) connection_init( ber_socket_t s, const char *peername, int use_tls );
LDAP_SLAPD_F (void) connection_destroy( Connection *c );
/*
* upstream.c
*/
-LDAP_SLAPD_F (void) upstream_write_cb( evutil_socket_t s, short what, void *arg );
-LDAP_SLAPD_F (void) upstream_read_cb( evutil_socket_t s, short what, void *arg );
LDAP_SLAPD_F (Connection *) upstream_init( ber_socket_t s, Backend *b );
LDAP_SLAPD_F (void) upstream_destroy( Connection *c );
};
typedef int (*OperationHandler)( Operation *op, BerElement *ber );
+typedef int (*RequestHandler)( Connection *c, Operation *op );
+
+typedef int (*CONNECTION_PDU_CB)( Connection *c );
+typedef void (*CONNECTION_DESTROY_CB)( Connection *c );
/* connection state (protected by c_mutex) */
enum sc_state {
* to use CONNECTION_UNLOCK_INCREF, they are then responsible that
* CONNECTION_LOCK_DECREF+CONNECTION_UNLOCK_OR_DESTROY is used when they are
* done with it
- * - when a connection is considered dead, use (UPSTREAM|CLIENT)_DESTROY on a
- * locked connection, it might get disposed of or if anyone still holds a
- * token, it just gets unlocked and it's the last token holder's
- * responsibility to run *_UNLOCK_OR_DESTROY
- * - (UPSTREAM|CLIENT)_LOCK_DESTROY is a shorthand for locking, decreasing
- * refcount and (UPSTREAM|CLIENT)_DESTROY
+ * - when a connection is considered dead, use CONNECTION_DESTROY on a locked
+ * connection, it might get disposed of or if anyone still holds a token, it
+ * just gets unlocked and it's the last token holder's responsibility to run
+ * CONNECTION_UNLOCK_OR_DESTROY
+ * - CONNECTION_LOCK_DESTROY is a shorthand for locking, decreasing refcount
+ * and CONNECTION_DESTROY
*/
ldap_pvt_thread_mutex_t c_mutex; /* protect the connection */
int c_refcnt, c_live;
+ CONNECTION_DESTROY_CB c_destroy;
+ CONNECTION_PDU_CB c_pdu_cb;
#define CONNECTION_LOCK(c) ldap_pvt_thread_mutex_lock( &(c)->c_mutex )
#define CONNECTION_UNLOCK(c) ldap_pvt_thread_mutex_unlock( &(c)->c_mutex )
#define CONNECTION_LOCK_DECREF(c) \
(c)->c_refcnt++; \
CONNECTION_UNLOCK(c); \
} while (0)
-#define CONNECTION_UNLOCK_OR_DESTROY(type, c) \
+#define CONNECTION_UNLOCK_OR_DESTROY(c) \
do { \
assert( (c)->c_refcnt >= 0 ); \
if ( !( c )->c_refcnt ) { \
- Debug( LDAP_DEBUG_TRACE, "%s: destroying " #type " connection connid=%lu\n", \
+ Debug( LDAP_DEBUG_TRACE, "%s: destroying connection connid=%lu\n", \
__func__, (c)->c_connid ); \
- type##_destroy( (c) ); \
+ (c)->c_destroy( (c) ); \
(c) = NULL; \
} else { \
CONNECTION_UNLOCK(c); \
} \
} while (0)
-#define CONNECTION_DESTROY(type, c) \
+#define CONNECTION_DESTROY(c) \
do { \
(c)->c_refcnt -= (c)->c_live; \
(c)->c_live = 0; \
- CONNECTION_UNLOCK_OR_DESTROY(type, c); \
+ CONNECTION_UNLOCK_OR_DESTROY(c); \
} while (0)
-
-#define UPSTREAM_UNLOCK_OR_DESTROY(c) \
- CONNECTION_UNLOCK_OR_DESTROY(upstream, c);
-#define UPSTREAM_DESTROY(c) CONNECTION_DESTROY(upstream, c)
-#define UPSTREAM_LOCK_DESTROY(c) \
- do { \
- CONNECTION_LOCK_DECREF(c); \
- UPSTREAM_DESTROY(c); \
- } while (0);
-
-#define CLIENT_UNLOCK_OR_DESTROY(c) CONNECTION_UNLOCK_OR_DESTROY(client, c);
-#define CLIENT_DESTROY(c) CONNECTION_DESTROY(client, c)
-#define CLIENT_LOCK_DESTROY(c) \
+#define CONNECTION_LOCK_DESTROY(c) \
do { \
CONNECTION_LOCK_DECREF(c); \
- CLIENT_DESTROY(c); \
+ CONNECTION_DESTROY(c); \
} while (0);
Sockbuf *c_sb; /* ber connection stuff */
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
ber_free( ber, 1 );
- client_write_cb( -1, 0, c );
+ connection_write_cb( -1, 0, c );
return 0;
}
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
if ( rc >= 0 ) {
- client_write_cb( -1, 0, c );
+ connection_write_cb( -1, 0, c );
rc = 0;
}
done:
CONNECTION_LOCK_DECREF(c);
operation_destroy_from_client( op );
- CLIENT_UNLOCK_OR_DESTROY(c);
+ CONNECTION_UNLOCK_OR_DESTROY(c);
ber_free( ber, 1 );
return rc;
}
"teardown for upstream connection connid=%lu\n",
c->c_connid );
- UPSTREAM_DESTROY(c);
+ CONNECTION_DESTROY(c);
ber_free( ber, 1 );
return -1;
* operation to be removed)
*
* If the worker pool is overloaded, we might be called directly from
- * upstream_read_cb, at that point, the connection hasn't been muted.
+ * the read callback, at that point, the connection hasn't been muted.
*
* TODO: when the client already has data pending on write, we should mute the
* upstream.
if ( !op->o_client_refcnt ) {
operation_destroy_from_client( op );
}
- CLIENT_UNLOCK_OR_DESTROY(client);
+ CONNECTION_UNLOCK_OR_DESTROY(client);
} else {
ber_free( ber, 1 );
}
"error on processing a response (%s) on upstream connection "
"connid=%ld, tag=%lx\n",
slap_msgtype2str( tag ), c->c_connid, tag );
- UPSTREAM_DESTROY(c);
+ CONNECTION_DESTROY(c);
}
/* We leave the connection locked */
return rc;
}
-/*
- * We start off with the upstream muted and c_currentber holding the response
- * we received.
- *
- * We run handle_one_response on each response, stopping once we hit an error,
- * have to wait on reading or process slap_conn_max_pdus_per_cycle responses so
- * as to maintain fairness and not hog the worker thread forever.
- *
- * If we've run out of responses from the upstream or hit the budget, we unmute
- * the connection and run handle_one_response, it might return an 'error' when
- * the client is blocked on writing, it's that client's job to wake us again.
- */
-static void *
-handle_responses( void *ctx, void *arg )
-{
- Connection *c = arg;
- int responses_handled = 0;
-
- CONNECTION_LOCK_DECREF(c);
- for ( ;; ) {
- BerElement *ber;
- ber_tag_t tag;
- ber_len_t len;
-
- /* handle_one_response may unlock the connection in the process, we
- * need to expect that might be our responsibility to destroy it */
- if ( handle_one_response( c ) ) {
- /* Error, connection is unlocked and might already have been
- * destroyed */
- return NULL;
- }
- /* Otherwise, handle_one_response leaves the connection locked */
-
- if ( ++responses_handled >= slap_conn_max_pdus_per_cycle ) {
- /* Do not read now, re-enable read event instead */
- break;
- }
-
- if ( (ber = ber_alloc()) == NULL ) {
- Debug( LDAP_DEBUG_ANY, "handle_responses: "
- "ber_alloc failed\n" );
- UPSTREAM_DESTROY(c);
- return NULL;
- }
- c->c_currentber = ber;
-
- tag = ber_get_next( c->c_sb, &len, ber );
- if ( tag != LDAP_TAG_MESSAGE ) {
- int err = sock_errno();
-
- if ( err != EWOULDBLOCK && err != EAGAIN ) {
- if ( err || tag == LBER_ERROR ) {
- char ebuf[128];
- Debug( LDAP_DEBUG_ANY, "handle_responses: "
- "ber_get_next on fd=%d failed errno=%d (%s)\n",
- c->c_fd, err,
- sock_errstr( err, ebuf, sizeof(ebuf) ) );
- } else {
- Debug( LDAP_DEBUG_STATS, "handle_responses: "
- "ber_get_next on fd=%d connid=%lu received "
- "a strange PDU tag=%lx\n",
- c->c_fd, c->c_connid, tag );
- }
-
- c->c_currentber = NULL;
- ber_free( ber, 1 );
- UPSTREAM_DESTROY(c);
- return NULL;
- }
- break;
- }
- }
-
- event_add( c->c_read_event, NULL );
- Debug( LDAP_DEBUG_CONNS, "handle_responses: "
- "re-enabled read event on connid=%lu\n",
- c->c_connid );
- UPSTREAM_UNLOCK_OR_DESTROY(c);
- return NULL;
-}
-
-/*
- * Initial read on the upstream connection, if we get an LDAP PDU, submit the
- * processing of this and successive ones to the work queue.
- *
- * If we can't submit it to the queue (overload), process this one and return
- * to the event loop immediately after.
- */
-void
-upstream_read_cb( evutil_socket_t s, short what, void *arg )
-{
- Connection *c = arg;
- BerElement *ber;
- ber_tag_t tag;
- ber_len_t len;
-
- CONNECTION_LOCK(c);
- if ( !c->c_live ) {
- event_del( c->c_read_event );
- Debug( LDAP_DEBUG_CONNS, "upstream_read_cb: "
- "suspended read event on a dead connid=%lu\n",
- c->c_connid );
- CONNECTION_UNLOCK(c);
- return;
- }
- Debug( LDAP_DEBUG_CONNS, "upstream_read_cb: "
- "connection connid=%lu ready to read\n",
- c->c_connid );
-
- ber = c->c_currentber;
- if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
- Debug( LDAP_DEBUG_ANY, "upstream_read_cb: "
- "ber_alloc failed\n" );
- UPSTREAM_DESTROY(c);
- return;
- }
- c->c_currentber = ber;
-
- tag = ber_get_next( c->c_sb, &len, ber );
- if ( tag != LDAP_TAG_MESSAGE ) {
- int err = sock_errno();
-
- if ( err != EWOULDBLOCK && err != EAGAIN ) {
- if ( err || tag == LBER_ERROR ) {
- char ebuf[128];
- Debug( LDAP_DEBUG_ANY, "upstream_read_cb: "
- "ber_get_next on fd=%d failed errno=%d (%s)\n",
- c->c_fd, err,
- sock_errstr( err, ebuf, sizeof(ebuf) ) );
- } else {
- Debug( LDAP_DEBUG_STATS, "upstream_read_cb: "
- "ber_get_next on fd=%d connid=%lu received "
- "a strange PDU tag=%lx\n",
- c->c_fd, c->c_connid, tag );
- }
-
- c->c_currentber = NULL;
- ber_free( ber, 1 );
-
- event_del( c->c_read_event );
- Debug( LDAP_DEBUG_CONNS, "upstream_read_cb: "
- "suspended read event on dying connid=%lu\n",
- c->c_connid );
- UPSTREAM_DESTROY(c);
- return;
- }
- event_add( c->c_read_event, NULL );
- Debug( LDAP_DEBUG_CONNS, "upstream_read_cb: "
- "re-enabled read event on connid=%lu\n",
- c->c_connid );
- CONNECTION_UNLOCK(c);
- return;
- }
-
- if ( !slap_conn_max_pdus_per_cycle ||
- ldap_pvt_thread_pool_submit(
- &connection_pool, handle_responses, c ) ) {
- /* If we're overloaded or configured as such, process one and resume in
- * the next cycle.
- *
- * handle_one_response re-locks the mutex in the
- * process, need to test it's still alive */
- if ( handle_one_response( c ) == LDAP_SUCCESS ) {
- UPSTREAM_UNLOCK_OR_DESTROY(c);
- }
- return;
- }
-
- /* We have scheduled a call to handle_responses which takes care of
- * handling further requests, just make sure the connection sticks around
- * for that */
- event_del( c->c_read_event );
- CONNECTION_UNLOCK_INCREF(c);
- return;
-}
-
int
upstream_finish( Connection *c )
{
base = slap_get_base( s );
- event = event_new( base, s, EV_READ|EV_PERSIST, upstream_read_cb, c );
+ event = event_new( base, s, EV_READ|EV_PERSIST, connection_read_cb, c );
if ( !event ) {
Debug( LDAP_DEBUG_ANY, "upstream_finish: "
"Read event could not be allocated\n" );
"suspended read event on dying connid=%lu\n",
c->c_connid );
ber_free( ber, 1 );
- UPSTREAM_DESTROY(c);
-}
-
-void
-upstream_write_cb( evutil_socket_t s, short what, void *arg )
-{
- Connection *c = arg;
-
- CONNECTION_LOCK(c);
- if ( !c->c_live ) {
- CONNECTION_UNLOCK(c);
- return;
- }
- CONNECTION_UNLOCK_INCREF(c);
-
- /* Before we acquire any locks */
- event_del( c->c_write_event );
-
- ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
- Debug( LDAP_DEBUG_CONNS, "upstream_write_cb: "
- "have something to write to upstream connid=%lu\n",
- c->c_connid );
-
- /* We might have been beaten to flushing the data by another thread */
- if ( c->c_pendingber && ber_flush( c->c_sb, c->c_pendingber, 1 ) ) {
- int err = sock_errno();
-
- if ( err != EWOULDBLOCK && err != EAGAIN ) {
- char ebuf[128];
- Debug( LDAP_DEBUG_ANY, "upstream_write_cb: "
- "ber_flush on fd=%d failed errno=%d (%s)\n",
- c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
- ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
- UPSTREAM_LOCK_DESTROY(c);
- return;
- }
- event_add( c->c_write_event, NULL );
- } else {
- c->c_pendingber = NULL;
- }
- ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
-
- CONNECTION_LOCK_DECREF(c);
- UPSTREAM_UNLOCK_OR_DESTROY(c);
+ CONNECTION_DESTROY(c);
}
void *
if ( !event ) {
Debug( LDAP_DEBUG_ANY, "upstream_bind: "
"Read event could not be allocated\n" );
- UPSTREAM_DESTROY(c);
+ CONNECTION_DESTROY(c);
return NULL;
}
event_add( event, NULL );
c->c_pendingber = ber;
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
- upstream_write_cb( -1, 0, c );
+ connection_write_cb( -1, 0, c );
CONNECTION_LOCK_DECREF(c);
- UPSTREAM_UNLOCK_OR_DESTROY(c);
+ CONNECTION_UNLOCK_OR_DESTROY(c);
return NULL;
}
assert( b != NULL );
flags = (b->b_tls == LLOAD_LDAPS) ? CONN_IS_TLS : 0;
- c = connection_init( s, b->b_host, flags );
+ if ( (c = connection_init( s, b->b_host, flags )) == NULL ) {
+ return NULL;
+ }
+
c->c_private = b;
+ c->c_is_tls = b->b_tls;
+ c->c_pdu_cb = handle_one_response;
{
ber_len_t max = sockbuf_max_incoming_upstream;
ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_MAX_INCOMING, &max );
}
- event = event_new( base, s, EV_WRITE, upstream_write_cb, c );
+ event = event_new( base, s, EV_READ|EV_PERSIST, connection_read_cb, c );
+ if ( !event ) {
+ Debug( LDAP_DEBUG_ANY, "upstream_init: "
+ "Read event could not be allocated\n" );
+ goto fail;
+ }
+ c->c_read_event = event;
+
+ event = event_new( base, s, EV_WRITE, connection_write_cb, c );
if ( !event ) {
Debug( LDAP_DEBUG_ANY, "upstream_init: "
"Write event could not be allocated\n" );
goto fail;
}
- /* We only register the write event when we have data pending */
+ /* We only add the write event when we have data pending */
c->c_write_event = event;
/* Unless we are configured to use the VC exop, consider allocating the
b->b_active++;
}
+ c->c_destroy = upstream_destroy;
CONNECTION_UNLOCK(c);
return c;
event_del( c->c_read_event );
event_free( c->c_read_event );
}
- UPSTREAM_DESTROY(c);
+
+ c->c_state = SLAP_C_INVALID;
+ CONNECTION_DESTROY(c);
+ assert( c == NULL );
+
return NULL;
}
struct event *read_event, *write_event;
TAvlnode *root;
long freed, executing;
+ enum sc_state state;
Debug( LDAP_DEBUG_CONNS, "upstream_destroy: "
"freeing connection connid=%lu\n",
c->c_connid );
+ assert( c->c_state != SLAP_C_INVALID );
+ state = c->c_state;
c->c_state = SLAP_C_INVALID;
root = c->c_ops;
event_del( write_event );
}
- ldap_pvt_thread_mutex_lock( &b->b_mutex );
- if ( c->c_type == SLAP_C_BIND ) {
- LDAP_CIRCLEQ_REMOVE( &b->b_bindconns, c, c_next );
- b->b_bindavail--;
- } else {
- LDAP_CIRCLEQ_REMOVE( &b->b_conns, c, c_next );
- b->b_active--;
+ /* Remove from the backend on first pass */
+ if ( state != SLAP_C_CLOSING ) {
+ ldap_pvt_thread_mutex_lock( &b->b_mutex );
+ if ( c->c_type == SLAP_C_BIND ) {
+ LDAP_CIRCLEQ_REMOVE( &b->b_bindconns, c, c_next );
+ b->b_bindavail--;
+ } else {
+ LDAP_CIRCLEQ_REMOVE( &b->b_conns, c, c_next );
+ b->b_active--;
+ }
+ b->b_n_ops_executing -= executing;
+ ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+ backend_retry( b );
}
- b->b_n_ops_executing -= executing;
- ldap_pvt_thread_mutex_unlock( &b->b_mutex );
- backend_retry( b );
CONNECTION_LOCK_DECREF(c);