client_read_cb( evutil_socket_t s, short what, void *arg )
{
Connection *c = arg;
+ BerElement *ber;
+ Operation *op;
+ ber_tag_t tag;
+ ber_len_t len;
ldap_pvt_thread_mutex_lock( &c->c_mutex );
Debug( LDAP_DEBUG_CONNS, "client_read_cb: "
"connection %lu ready to read\n",
c->c_connid );
+
+ ber = c->c_currentber;
+ if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
+ Debug( LDAP_DEBUG_ANY, "ber_alloc failed\n" );
+ goto fail;
+ }
+
+ tag = ber_get_next( c->c_sb, &len, ber );
+ if ( tag != LDAP_TAG_MESSAGE ) {
+ int err = sock_errno();
+
+ if ( err != EWOULDBLOCK && err != EAGAIN ) {
+ char ebuf[128];
+ Debug( LDAP_DEBUG_ANY, "ber_get_next on fd %d failed errno=%d (%s)\n", c->c_fd,
+ err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
+
+ c->c_currentber = NULL;
+ goto fail;
+ }
+ c->c_currentber = ber;
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ return;
+ }
+
+ c->c_currentber = NULL;
+
+ op = operation_init( c, ber );
+ if ( !op ) {
+ Debug( LDAP_DEBUG_ANY, "operation_init failed\n" );
+ goto fail;
+ }
+
+ if ( ldap_pvt_thread_pool_submit(
+ &connection_pool, operation_process, op ) ) {
+ /* what could have happened? */
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ operation_destroy( op );
+ ldap_pvt_thread_mutex_lock( &c->c_mutex );
+ goto fail;
+ }
+
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ return;
+fail:
client_destroy( c );
+
+ if ( ber ) {
+ ber_free( ber, 1 );
+ }
+ return;
}
void
upstream_read_cb( evutil_socket_t s, short what, void *arg )
{
Connection *c = arg;
+ BerElement *ber;
+ ber_tag_t tag;
+ Operation *op, needle = { .o_upstream = c };
+ ber_len_t len;
+ int finished = 0;
ldap_pvt_thread_mutex_lock( &c->c_mutex );
+ Debug( LDAP_DEBUG_CONNS, "upstream_read_cb: "
+ "connection %lu ready to read\n",
+ c->c_connid );
+
+ ber = c->c_currentber;
+ if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
+ Debug( LDAP_DEBUG_ANY, "ber_alloc failed\n" );
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ return;
+ }
+
+ tag = ber_get_next( c->c_sb, &len, ber );
+ if ( tag != LDAP_TAG_MESSAGE ) {
+ int err = sock_errno();
+
+ if ( err != EWOULDBLOCK && err != EAGAIN ) {
+ char ebuf[128];
+ Debug( LDAP_DEBUG_ANY, "ber_get_next on fd %d failed errno=%d (%s)\n", c->c_fd,
+ err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
+
+ c->c_currentber = NULL;
+ goto fail;
+ }
+ c->c_currentber = ber;
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+ return;
+ }
+
+ c->c_currentber = NULL;
+
+ tag = ber_get_int( ber, &needle.o_upstream_msgid );
+ if ( tag != LDAP_TAG_MSGID || needle.o_upstream_msgid == 0 ) {
+ goto fail;
+ }
+
+ op = tavl_find( c->c_ops, &needle, operation_upstream_cmp );
+ if ( !op ) {
+ ber_free( ber, 1 );
+ } else {
+ Connection *client = op->o_client;
+ BerElement *output;
+ BerValue response, controls;
+ ber_tag_t type;
+
+ type = ber_skip_element( ber, &response );
+ switch ( type ) {
+ case LDAP_RES_SEARCH_ENTRY:
+ case LDAP_RES_SEARCH_REFERENCE:
+ case LDAP_RES_INTERMEDIATE:
+ break;
+ default:
+ finished = 1;
+ tavl_delete( &c->c_ops, op, operation_upstream_cmp );
+ break;
+ }
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+
+ tag = ber_peek_tag( ber, &len );
+ if ( tag == LDAP_TAG_CONTROLS ) {
+ tag = ber_skip_element( ber, &controls );
+ }
+
+ output = ber_alloc();
+ if ( !output ) {
+ goto fail;
+ }
+
+ ber_start_seq( output, LDAP_TAG_MESSAGE );
+ ber_put_int( output, op->o_client_msgid, LDAP_TAG_MSGID );
+ ber_put_berval( output, &response, type );
+ if ( tag == LDAP_TAG_CONTROLS ) {
+ ber_put_berval( output, &controls, LDAP_TAG_CONTROLS );
+ }
+ ber_put_seq( output );
+
+ if ( finished ) {
+ ldap_pvt_thread_mutex_lock( &client->c_mutex );
+ tavl_delete( &client->c_ops, op, operation_client_cmp );
+ ldap_pvt_thread_mutex_unlock( &client->c_mutex );
+ operation_destroy( op );
+ }
+
+ ldap_pvt_thread_mutex_lock( &client->c_io_mutex );
+ client->c_pendingber = output;
+ ldap_pvt_thread_mutex_unlock( &client->c_io_mutex );
+
+ client_write_cb( -1, 0, client );
+ return;
+ }
+
+ ldap_pvt_thread_mutex_unlock( &c->c_mutex );
+
+ return;
+fail:
+ Debug( LDAP_DEBUG_ANY, "upstream_read_cb: "
+ "error on processing a response on upstream connection %ld\n",
+ c->c_connid );
+ ber_free( ber, 1 );
upstream_destroy( c );
}