]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
Lload cn=monitor initial implementation
authorNadezhda Ivanova <nivanova@symas.com>
Thu, 25 Jan 2018 11:19:05 +0000 (13:19 +0200)
committerOndřej Kuzník <okuznik@symas.com>
Tue, 17 Nov 2020 17:58:14 +0000 (17:58 +0000)
14 files changed:
servers/lloadd/Makefile_module.in
servers/lloadd/backend.c
servers/lloadd/bind.c
servers/lloadd/client.c
servers/lloadd/daemon.c
servers/lloadd/extended.c
servers/lloadd/init.c
servers/lloadd/lload.h
servers/lloadd/module_init.c
servers/lloadd/monitor.c [new file with mode: 0644]
servers/lloadd/operation.c
servers/lloadd/proto-lload.h
servers/lloadd/upstream.c
servers/slapd/back-monitor/init.c

index bbe62e6bd76e6ccae0737d4761bf0ea352cd06bd..7d5b77bdf35691c9287e459062c1a8737adcb00c 100644 (file)
@@ -19,7 +19,7 @@ XSRCS = version.c
 NT_SRCS = ../slapd/nt_svc.c
 NT_OBJS = ../slapd/nt_svc.o ../../libraries/liblutil/slapdmsg.res
 
-SRCS   += module_init.c
+SRCS   += module_init.c monitor.c
 
 OBJS   = $(patsubst %.c,%.lo,$(SRCS)) $(@PLAT@_OBJS)
 
index 110b90d905def44eefc527f7d78647ae78054176..1cb04a032ae95617d9307afdbdabf123b4820687 100644 (file)
@@ -245,7 +245,13 @@ backend_select( LloadOperation *op, int *res )
                 ldap_pvt_thread_mutex_unlock( &backend_mutex );
 
                 b->b_n_ops_executing++;
+                if ( op->o_tag == LDAP_REQ_BIND ) {
+                    b->b_counters[LLOAD_STATS_OPS_BIND].lc_ops_received++;
+                } else {
+                    b->b_counters[LLOAD_STATS_OPS_OTHER].lc_ops_received++;
+                }
                 c->c_n_ops_executing++;
+                c->c_counters.lc_ops_received++;
                 CONNECTION_UNLOCK_INCREF(c);
 
                 ldap_pvt_thread_mutex_unlock( &b->b_mutex );
index 6565d33c47f3111d0510c01aa090877fb99ea1a8..75820bdf19ce18449c93d93b7861dbe69892d965 100644 (file)
@@ -39,6 +39,8 @@ bind_mech_external(
     client->c_state = LLOAD_C_READY;
     client->c_type = LLOAD_C_OPEN;
 
+    op->o_res = LLOAD_OP_COMPLETED;
+
     /*
      * We only support implicit assertion.
      *
@@ -225,6 +227,11 @@ request_bind( LloadConnection *client, LloadOperation *op )
              * lose the client lock in operation_destroy_from_client temporarily
              */
             pinned_op->o_client_refcnt++;
+            op->o_res = LLOAD_OP_COMPLETED;
+
+            /* We didn't start a new operation, just continuing an existing one */
+            lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_received--;
+
             operation_destroy_from_client( op );
             pinned_op->o_client_refcnt--;
 
@@ -380,6 +387,10 @@ request_bind( LloadConnection *client, LloadOperation *op )
     }
     upstream->c_pendingber = ber;
 
+    if ( !pin ) {
+        lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_forwarded++;
+    }
+
     CONNECTION_LOCK(upstream);
     if ( pin ) {
         tavl_delete( &upstream->c_ops, op, operation_upstream_cmp );
index ba20bc975643e4eec19da77feb85d49fdbd3c4ee..1044341c615a315fe221f7f8dca7c1993a25c7bb 100644 (file)
@@ -34,6 +34,8 @@ request_abandon( LloadConnection *c, LloadOperation *op )
     LloadOperation *request, needle = { .o_client_connid = c->c_connid };
     int rc = LDAP_SUCCESS;
 
+    op->o_res = LLOAD_OP_COMPLETED;
+
     if ( ber_decode_int( &op->o_request, &needle.o_client_msgid ) ) {
         Debug( LDAP_DEBUG_STATS, "request_abandon: "
                 "connid=%lu msgid=%d invalid integer sent in abandon request\n",
@@ -125,6 +127,8 @@ request_process( LloadConnection *client, LloadOperation *op )
             op->o_client_msgid, op->o_upstream_connid, op->o_upstream_msgid );
     assert( rc == LDAP_SUCCESS );
 
+    lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_forwarded++;
+
     if ( (lload_features & LLOAD_FEATURE_PROXYAUTHZ) &&
             client->c_type != LLOAD_C_PRIVILEGED ) {
         CONNECTION_LOCK_DECREF(client);
@@ -209,7 +213,9 @@ handle_one_request( LloadConnection *c )
 
     switch ( op->o_tag ) {
         case LDAP_REQ_UNBIND:
+            lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_received++;
             /* There is never a response for this operation */
+            op->o_res = LLOAD_OP_COMPLETED;
             operation_destroy_from_client( op );
             Debug( LDAP_DEBUG_STATS, "handle_one_request: "
                     "received unbind, closing client connid=%lu\n",
@@ -217,14 +223,17 @@ handle_one_request( LloadConnection *c )
             CONNECTION_DESTROY(c);
             return -1;
         case LDAP_REQ_BIND:
+            lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_received++;
             handler = request_bind;
             break;
         case LDAP_REQ_ABANDON:
+            lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_received++;
             /* We can't send a response to abandon requests even if a bind is
              * currently in progress */
             handler = request_abandon;
             break;
         case LDAP_REQ_EXTENDED:
+            lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_received++;
             handler = request_extended;
             break;
         default:
@@ -232,6 +241,7 @@ handle_one_request( LloadConnection *c )
                 return operation_send_reject_locked(
                         op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 );
             }
+            lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_received++;
             handler = request_process;
             break;
     }
@@ -563,3 +573,20 @@ clients_destroy( void )
     }
     ldap_pvt_thread_mutex_unlock( &clients_mutex );
 }
+
+void
+clients_walk( CONNECTION_CLIENT_WALK apply, void *argv )
+{
+    LloadConnection *c;
+    ldap_pvt_thread_mutex_lock( &clients_mutex );
+    if ( LDAP_CIRCLEQ_EMPTY( &clients ) ) {
+        ldap_pvt_thread_mutex_unlock( &clients_mutex );
+        return;
+    }
+
+    /* Todo is it possible to do this without holding this lock? */
+    LDAP_CIRCLEQ_FOREACH ( c, &clients, c_next ) {
+        apply( c, argv );
+    }
+    ldap_pvt_thread_mutex_unlock( &clients_mutex );
+}
index 9aff43c25dacb614cba79449614daa0d9ad8b621..a03c3ea3f9664e75542278f7497e241028adfab2 100644 (file)
@@ -85,6 +85,12 @@ struct evdns_base *dnsbase;
 
 struct event *lload_timeout_event;
 
+/*
+ * global lload statistics. Not mutex protected to preserve performance -
+ * increment is atomic, at most we risk a bit of inconsistency
+ */
+lload_global_stats_t lload_stats;
+
 #ifndef SLAPD_LISTEN_BACKLOG
 #define SLAPD_LISTEN_BACKLOG 1024
 #endif /* ! SLAPD_LISTEN_BACKLOG */
@@ -1431,3 +1437,10 @@ lload_resume_listeners( void )
         evconnlistener_enable( lload_listeners[i]->listener );
     }
 }
+
+/* we need this in a file that compiles for both module and server */
+void
+lload_counters_init()
+{
+    memset( &lload_stats, 0, sizeof(lload_global_stats_t) );
+}
index 26b689f5898671314502d085244446b1c1f5a5e0..b7a230ebc4ad3d758175e21414feab555330e4fe 100644 (file)
@@ -96,6 +96,7 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
     /* We already have something to write */
     event_add( c->c_write_event, lload_write_timeout );
 
+    op->o_res = LLOAD_OP_COMPLETED;
     operation_destroy_from_client( op );
     CONNECTION_UNLOCK_INCREF(c);
 
index 6cfe725590cbc470482f73483ae8d44cb6bc7111..52b76fadc6465d01f66d36d46933e3b9d1cbf75c 100644 (file)
@@ -94,6 +94,8 @@ lload_init( int mode, const char *name )
             ldap_pvt_thread_pool_init_q( &connection_pool, connection_pool_max,
                     0, connection_pool_queues );
 
+            lload_counters_init();
+
             ldap_pvt_thread_mutex_init( &slapd_rq.rq_mutex );
             LDAP_STAILQ_INIT( &slapd_rq.task_list );
             LDAP_STAILQ_INIT( &slapd_rq.run_list );
index 70aeade9f5f28034549c88605ed5d70e19653cf6..2c7519aaad929953653da14e6d0f1ce18f475a22 100644 (file)
@@ -43,6 +43,7 @@
 #include "avl.h"
 
 #include "../servers/slapd/slap.h"
+#include "../slapd/back-monitor/back-monitor.h"
 
 #ifndef ldap_debug
 #define ldap_debug slap_debug
@@ -123,6 +124,26 @@ struct LloadPendingConnection {
     LDAP_LIST_ENTRY(LloadPendingConnection) next;
 };
 
+typedef struct lload_counters_t {
+    ldap_pvt_mp_t lc_ops_completed;
+    ldap_pvt_mp_t lc_ops_received;
+    ldap_pvt_mp_t lc_ops_forwarded;
+    ldap_pvt_mp_t lc_ops_rejected;
+    ldap_pvt_mp_t lc_ops_failed;
+} lload_counters_t;
+
+enum {
+    LLOAD_STATS_OPS_BIND = 0,
+    LLOAD_STATS_OPS_OTHER,
+    LLOAD_STATS_OPS_LAST
+};
+
+typedef struct lload_global_stats_t {
+    ldap_pvt_mp_t global_incoming;
+    ldap_pvt_mp_t global_outgoing;
+    lload_counters_t counters[LLOAD_STATS_OPS_LAST];
+} lload_global_stats_t;
+
 /* Can hold mutex when locking a linked connection */
 struct LloadBackend {
     ldap_pvt_thread_mutex_t b_mutex;
@@ -145,6 +166,8 @@ struct LloadBackend {
     long b_max_pending, b_max_conn_pending;
     long b_n_ops_executing;
 
+    lload_counters_t b_counters[LLOAD_STATS_OPS_LAST];
+
     LDAP_CIRCLEQ_ENTRY(LloadBackend) b_next;
 };
 
@@ -276,8 +299,9 @@ struct LloadConnection {
     enum lload_tls_type c_is_tls; /* true if this LDAP over raw TLS */
 #endif
 
-    long c_n_ops_executing; /* num of ops currently executing */
-    long c_n_ops_completed; /* num of ops completed */
+    long c_n_ops_executing;      /* num of ops currently executing */
+    long c_n_ops_completed;      /* num of ops completed */
+    lload_counters_t c_counters; /* per connection operation counters */
 
     /*
      * Protected by the CIRCLEQ mutex:
@@ -296,6 +320,14 @@ enum op_state {
     LLOAD_OP_DETACHING_UPSTREAM = 1 << 2,
     LLOAD_OP_DETACHING_CLIENT = 1 << 3,
 };
+
+/* operation result for monitoring purposes */
+enum op_result {
+    LLOAD_OP_REJECTED,  /* operation was not forwarded */
+    LLOAD_OP_COMPLETED, /* operation sent and response received */
+    LLOAD_OP_FAILED, /* operation was forwarded, but no response was received */
+};
+
 #define LLOAD_OP_FREEING_MASK \
     ( LLOAD_OP_FREEING_UPSTREAM | LLOAD_OP_FREEING_CLIENT )
 #define LLOAD_OP_DETACHING_MASK \
@@ -326,6 +358,7 @@ struct LloadOperation {
     time_t o_start;
     unsigned long o_pin_id;
 
+    enum op_result o_res;
     BerElement *o_ber;
     BerValue o_request, o_ctrls;
 };
@@ -354,6 +387,13 @@ struct LloadListener {
 #endif
 };
 
+typedef int (*CONNECTION_CLIENT_WALK)( LloadConnection *c, void *argv );
+
+struct lload_monitor_conn_arg {
+    Operation *op;
+    monitor_subsys_t *ms;
+    Entry **ep;
+};
 LDAP_END_DECL
 
 #include "proto-lload.h"
index 3f042f6b38aaf032efaa43d71187baa26c65d16e..e38e8888c7e40acc52fbf41fddeba2b66779cc5e 100644 (file)
 
 #include "ldap_rq.h"
 
-int
-lload_start_daemon()
+ldap_pvt_thread_t lloadd_main_thread;
+
+void *
+lload_start_daemon( void *arg )
 {
     struct event_base *daemon_base = event_base_new();
     int rc = 0, i;
@@ -48,11 +50,11 @@ lload_start_daemon()
         Debug( LDAP_DEBUG_ANY, "lload_start_daemon: "
                 "main event base allocation failed\n" );
         rc = 1;
-        return rc;
+        return (void *)(uintptr_t)rc;
     }
 
     rc = lloadd_daemon( daemon_base );
-    return rc;
+    return (void *)(uintptr_t)rc;
 }
 
 /* from init.c */
@@ -71,16 +73,90 @@ lload_conn_pool_init()
     return rc;
 }
 
+static int
+lload_module_incoming_count( LloadConnection *conn, void *argv )
+{
+    lload_global_stats_t *tmp_stats = argv;
+    tmp_stats->global_incoming++;
+    return 0;
+}
+
+/* update all global statistics other than rejected and received,
+ * these are updated in real time */
+void *
+lload_module_update_global_stats( void *ctx, void *arg )
+{
+    struct re_s *rtask = arg;
+    lload_global_stats_t tmp_stats = {};
+    LloadBackend *b;
+    int i;
+
+    Debug( LDAP_DEBUG_TRACE, "lload_module_update_global_stats: "
+            "updating stats\n" );
+    /* count incoming connections */
+    clients_walk( lload_module_incoming_count, &tmp_stats );
+
+    LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
+        LloadConnection *c;
+
+        ldap_pvt_thread_mutex_lock( &b->b_mutex );
+        tmp_stats.global_outgoing += b->b_active + b->b_bindavail;
+
+        /* merge completed and failed stats */
+        for ( i = 0; i < LLOAD_STATS_OPS_LAST; i++ ) {
+            tmp_stats.counters[i].lc_ops_completed +=
+                    b->b_counters[i].lc_ops_completed;
+            tmp_stats.counters[i].lc_ops_failed +=
+                    b->b_counters[i].lc_ops_failed;
+        }
+        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+    }
+
+    /* update lload_stats */
+    lload_stats.global_outgoing = tmp_stats.global_outgoing;
+    lload_stats.global_incoming = tmp_stats.global_incoming;
+    for ( i = 0; i < LLOAD_STATS_OPS_LAST; i++ ) {
+        lload_stats.counters[i].lc_ops_completed =
+                tmp_stats.counters[i].lc_ops_completed;
+        lload_stats.counters[i].lc_ops_failed =
+                tmp_stats.counters[i].lc_ops_failed;
+    }
+
+    /* reschedule */
+    ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+    ldap_pvt_runqueue_stoptask( &slapd_rq, rtask );
+    ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+    return NULL;
+}
+
 void *
 lload_module_start_daemon( void *ctx, void *arg )
 {
-    lload_start_daemon();
+    lload_counters_init();
+    lload_monitor_mss_init();
+
+    if ( ldap_pvt_thread_create(
+                 &lloadd_main_thread, 0, lload_start_daemon, NULL ) ) {
+        return NULL;
+    }
+
+    ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+    ldap_pvt_runqueue_insert( &slapd_rq, 1, lload_module_update_global_stats,
+            NULL, "lload_module_update_global_stats", "lloadd" );
+    ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+
     return NULL;
 }
 
 int
 init_module( int argc, char *argv[] )
 {
+    if ( argc != 2 ) {
+        Debug( LDAP_DEBUG_CONFIG, "lloadd: "
+                "incorrect number of arguments to module\n" );
+        return -1;
+    }
+
     if ( slapMode & SLAP_TOOL_MODE ) {
         return 0;
     }
@@ -103,9 +179,10 @@ init_module( int argc, char *argv[] )
         return -1;
     }
 
-    ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
-    ldap_pvt_runqueue_insert( &slapd_rq, 0, lload_module_start_daemon, NULL,
-            "lload_module_start_daemon", "lloadd" );
-    ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
-    return 0;
+    if ( lload_monitor_initialize() != 0 ) {
+        return -1;
+    }
+
+    return ldap_pvt_thread_pool_submit(
+            &connection_pool, lload_module_start_daemon, NULL );
 }
diff --git a/servers/lloadd/monitor.c b/servers/lloadd/monitor.c
new file mode 100644 (file)
index 0000000..b448a5d
--- /dev/null
@@ -0,0 +1,1033 @@
+/* init.c - initialize various things */
+/* $OpenLDAP$ */
+/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
+ *
+ * Copyright 1998-2020 The OpenLDAP Foundation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted only as authorized by the OpenLDAP
+ * Public License.
+ *
+ * A copy of this license is available in the file LICENSE in the
+ * top-level directory of the distribution or, alternatively, at
+ * <http://www.OpenLDAP.org/license.html>.
+ */
+/* Portions Copyright (c) 1995 Regents of the University of Michigan.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms are permitted
+ * provided that this notice is preserved and that due credit is given
+ * to the University of Michigan at Ann Arbor. The name of the University
+ * may not be used to endorse or promote products derived from this
+ * software without specific prior written permission. This software
+ * is provided ``as is'' without express or implied warranty.
+ */
+
+#include "portable.h"
+
+#include <stdio.h>
+
+#include <ac/socket.h>
+#include <ac/string.h>
+#include <ac/time.h>
+
+#include "lload.h"
+#include "lber_pvt.h"
+
+#include "ldap_rq.h"
+#include "config.h"
+#include "../slapd/back-monitor/back-monitor.h"
+
+#define LLOAD_MONITOR_BALANCER_NAME "Load Balancer"
+#define LLOAD_MONITOR_BALANCER_RDN \
+    SLAPD_MONITOR_AT "=" LLOAD_MONITOR_BALANCER_NAME
+#define LLOAD_MONITOR_BALANCER_DN \
+    LLOAD_MONITOR_BALANCER_RDN "," SLAPD_MONITOR_BACKEND_DN
+
+#define LLOAD_MONITOR_INCOMING_NAME "Incoming Connections"
+#define LLOAD_MONITOR_INCOMING_RDN \
+    SLAPD_MONITOR_AT "=" LLOAD_MONITOR_INCOMING_NAME
+#define LLOAD_MONITOR_INCOMING_DN \
+    LLOAD_MONITOR_INCOMING_RDN "," LLOAD_MONITOR_BALANCER_DN
+
+#define LLOAD_MONITOR_OPERATIONS_NAME "Operations"
+#define LLOAD_MONITOR_OPERATIONS_RDN \
+    SLAPD_MONITOR_AT "=" LLOAD_MONITOR_OPERATIONS_NAME
+#define LLOAD_MONITOR_OPERATIONS_DN \
+    LLOAD_MONITOR_OPERATIONS_RDN "," LLOAD_MONITOR_BALANCER_DN
+
+#define LLOAD_MONITOR_BACKENDS_NAME "Backend Servers"
+#define LLOAD_MONITOR_BACKENDS_RDN \
+    SLAPD_MONITOR_AT "=" LLOAD_MONITOR_BACKENDS_NAME
+#define LLOAD_MONITOR_BACKENDS_DN \
+    LLOAD_MONITOR_BACKENDS_RDN "," LLOAD_MONITOR_BALANCER_DN
+
+struct lload_monitor_ops_t {
+    struct berval rdn;
+} lload_monitor_op[] = {
+    { BER_BVC("cn=Bind") },
+    { BER_BVC("cn=Other") },
+
+    { BER_BVNULL }
+};
+
+static ObjectClass *oc_olmBalancer;
+static ObjectClass *oc_olmBalancerServer;
+static ObjectClass *oc_olmBalancerConnection;
+static ObjectClass *oc_olmBalancerOperation;
+
+static ObjectClass *oc_monitorContainer;
+static ObjectClass *oc_monitorCounterObject;
+
+static AttributeDescription *ad_olmServerURI;
+static AttributeDescription *ad_olmReceivedOps;
+static AttributeDescription *ad_olmForwardedOps;
+static AttributeDescription *ad_olmRejectedOps;
+static AttributeDescription *ad_olmCompletedOps;
+static AttributeDescription *ad_olmFailedOps;
+static AttributeDescription *ad_olmConnectionType;
+static AttributeDescription *ad_olmPendingOps;
+static AttributeDescription *ad_olmPendingConnections;
+static AttributeDescription *ad_olmActiveConnections;
+static AttributeDescription *ad_olmIncomingConnections;
+static AttributeDescription *ad_olmOutgoingConnections;
+
+static struct {
+    char *name;
+    char *oid;
+} s_oid[] = {
+    { "olmBalancerAttributes", "olmModuleAttributes:1" },
+    { "olmBalancerObjectClasses", "olmModuleObjectClasses:1" },
+
+    { NULL }
+};
+
+static struct {
+    char *desc;
+    AttributeDescription **ad;
+} s_at[] = {
+    { "( olmBalancerAttributes:1 "
+      "NAME ( 'olmServerURI' ) "
+      "DESC 'URI of a backend server' "
+      "SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 "
+      "EQUALITY caseIgnoreMatch "
+      "NO-USER-MODIFICATION "
+      "USAGE dSAOperation )",
+        &ad_olmServerURI },
+    { "( olmBalancerAttributes:2 "
+      "NAME ( 'olmReceivedOps' ) "
+      "DESC 'monitor received operations' "
+      "SUP monitorCounter "
+      "NO-USER-MODIFICATION "
+      "USAGE dSAOperation )",
+        &ad_olmReceivedOps },
+    { "( olmBalancerAttributes:3 "
+      "NAME ( 'olmForwardedOps' ) "
+      "DESC 'monitor forwarded operations' "
+      "SUP monitorCounter "
+      "NO-USER-MODIFICATION "
+      "USAGE dSAOperation )",
+        &ad_olmForwardedOps },
+    { "( olmBalancerAttributes:4 "
+      "NAME ( 'olmRejectedOps' ) "
+      "DESC 'monitor rejected operations' "
+      "SUP monitorCounter "
+      "NO-USER-MODIFICATION "
+      "USAGE dSAOperation )",
+        &ad_olmRejectedOps },
+    { "( olmBalancerAttributes:5 "
+      "NAME ( 'olmCompletedOps' ) "
+      "DESC 'monitor completed operations' "
+      "SUP monitorCounter "
+      "NO-USER-MODIFICATION "
+      "USAGE dSAOperation )",
+        &ad_olmCompletedOps },
+    { "( olmBalancerAttributes:6 "
+      "NAME ( 'olmFailedOps' ) "
+      "DESC 'monitor failed operations' "
+      "SUP monitorCounter "
+      "NO-USER-MODIFICATION "
+      "USAGE dSAOperation )",
+        &ad_olmFailedOps },
+    { "( olmBalancerAttributes:7 "
+      "NAME ( 'olmPendingOps' ) "
+      "DESC 'monitor number of pending operations' "
+      "EQUALITY integerMatch "
+      "SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 "
+      "NO-USER-MODIFICATION "
+      "USAGE dSAOperation )",
+        &ad_olmPendingOps },
+    { "( olmBalancerAttributes:8 "
+      "NAME ( 'olmPendingConnections' ) "
+      "DESC 'monitor number of pending connections' "
+      "EQUALITY integerMatch "
+      "SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 "
+      "NO-USER-MODIFICATION "
+      "USAGE dSAOperation )",
+        &ad_olmPendingConnections },
+    { "( olmBalancerAttributes:9 "
+      "NAME ( 'olmActiveConnections' ) "
+      "DESC 'monitor number of active connections' "
+      "EQUALITY integerMatch "
+      "SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 "
+      "NO-USER-MODIFICATION "
+      "USAGE dSAOperation )",
+        &ad_olmActiveConnections },
+    { "( olmBalancerAttributes:10 "
+      "NAME ( 'olmConnectionType' ) "
+      "DESC 'Connection type' "
+      "EQUALITY integerMatch "
+      "SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 "
+      "NO-USER-MODIFICATION "
+      "USAGE dSAOperation )",
+        &ad_olmConnectionType },
+    { "( olmBalancerAttributes:11 "
+      "NAME ( 'olmIncomingConnections' ) "
+      "DESC 'monitor number of incoming connections' "
+      "EQUALITY integerMatch "
+      "SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 "
+      "NO-USER-MODIFICATION "
+      "USAGE dSAOperation )",
+        &ad_olmIncomingConnections },
+    { "( olmBalancerAttributes:12 "
+      "NAME ( 'olmOutgoingConnections' ) "
+      "DESC 'monitor number of active connections' "
+      "EQUALITY integerMatch "
+      "SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 "
+      "NO-USER-MODIFICATION "
+      "USAGE dSAOperation )",
+        &ad_olmOutgoingConnections },
+
+    { NULL }
+};
+
+static struct {
+    char *name;
+    ObjectClass **oc;
+} s_moc[] = {
+    { "monitorContainer", &oc_monitorContainer },
+    { "monitorCounterObject", &oc_monitorCounterObject },
+
+    { NULL }
+};
+
+static struct {
+    char *desc;
+    ObjectClass **oc;
+} s_oc[] = {
+    { "( olmBalancerObjectClasses:1 "
+      "NAME ( 'olmBalancer' ) "
+      "SUP top STRUCTURAL "
+      "MAY ( "
+      "olmIncomingConnections "
+      "$ olmOutgoingConnections "
+      ") )",
+        &oc_olmBalancer },
+    { "( olmBalancerObjectClasses:2 "
+      "NAME ( 'olmBalancerServer' ) "
+      "SUP top STRUCTURAL "
+      "MAY ( "
+      "olmServerURI "
+      "$ olmActiveConnections "
+      "$ olmPendingConnections "
+      "$ olmPendingOps"
+      "$ olmReceivedOps "
+      "$ olmCompletedOps "
+      "$ olmFailedOps "
+      ") )",
+        &oc_olmBalancerServer },
+
+    { "( olmBalancerObjectClasses:3 "
+      "NAME ( 'olmBalancerOperation' ) "
+      "SUP top STRUCTURAL "
+      "MAY ( "
+      "olmReceivedOps "
+      "$ olmForwardedOps "
+      "$ olmRejectedOps "
+      "$ olmCompletedOps "
+      "$ olmFailedOps "
+      ") )",
+        &oc_olmBalancerOperation },
+    { "( olmBalancerObjectClasses:4 "
+      "NAME ( 'olmBalancerConnection' ) "
+      "SUP top STRUCTURAL "
+      "MAY ( "
+      "olmConnectionType "
+      "$ olmPendingOps "
+      "$ olmReceivedOps "
+      "$ olmCompletedOps "
+      "$ olmFailedOps "
+      ") )",
+        &oc_olmBalancerConnection },
+    { NULL }
+};
+
+int
+lload_monitor_initialize( void )
+{
+    int i, code;
+    static int lload_monitor_initialized = 0;
+    ConfigArgs c;
+    char *argv[3];
+    /* set to 0 when successfully initialized; otherwise, remember failure */
+    static int lload_monitor_initialized_failure = 1;
+
+    /* register schema here; if compiled as dynamic object,
+     * must be loaded __after__ back_monitor.la */
+
+    if ( lload_monitor_initialized++ ) {
+        return lload_monitor_initialized_failure;
+    }
+
+    if ( backend_info( "monitor" ) == NULL ) {
+        return -1;
+    }
+
+    argv[0] = "lload monitor";
+    c.argv = argv;
+    c.argc = 3;
+    c.fname = argv[0];
+    for ( i = 0; s_oid[i].name; i++ ) {
+        argv[1] = s_oid[i].name;
+        argv[2] = s_oid[i].oid;
+
+        if ( parse_oidm( &c, 0, NULL ) != 0 ) {
+            Debug( LDAP_DEBUG_ANY, "lload_monitor_initialize: "
+                    "unable to add objectIdentifier \"%s=%s\"\n",
+                    s_oid[i].name, s_oid[i].oid );
+            return 2;
+        }
+    }
+
+    for ( i = 0; s_at[i].desc != NULL; i++ ) {
+        code = register_at( s_at[i].desc, s_at[i].ad, 1 );
+        if ( code != LDAP_SUCCESS ) {
+            Debug( LDAP_DEBUG_ANY, "lload_monitor_initialize: "
+                    "register_at failed for attributeType (%s)\n",
+                    s_at[i].desc );
+            return 3;
+
+        } else {
+            (*s_at[i].ad)->ad_type->sat_flags |= SLAP_AT_HIDE;
+        }
+    }
+
+    for ( i = 0; s_oc[i].desc != NULL; i++ ) {
+        code = register_oc( s_oc[i].desc, s_oc[i].oc, 1 );
+        if ( code != LDAP_SUCCESS ) {
+            Debug( LDAP_DEBUG_ANY, "lload_monitor_initialize: "
+                    "register_oc failed for objectClass (%s)\n",
+                    s_oc[i].desc );
+            return 4;
+
+        } else {
+            (*s_oc[i].oc)->soc_flags |= SLAP_OC_HIDE;
+        }
+    }
+
+    for ( i = 0; s_moc[i].name != NULL; i++ ) {
+        *s_moc[i].oc = oc_find( s_moc[i].name );
+        if ( !*s_moc[i].oc ) {
+            Debug( LDAP_DEBUG_ANY, "lload_monitor_initialize: "
+                    "failed to find objectClass (%s)\n",
+                    s_moc[i].name );
+            return 5;
+        }
+    }
+
+    return (lload_monitor_initialized_failure = LDAP_SUCCESS);
+}
+
+static int
+lload_monitor_subsystem_destroy( BackendDB *be, monitor_subsys_t *ms )
+{
+    return LDAP_SUCCESS;
+}
+
+static void
+lload_monitor_balancer_dispose( void **priv )
+{
+    return;
+}
+
+static int
+lload_monitor_balancer_free( Entry *e, void **priv )
+{
+    return LDAP_SUCCESS;
+}
+
+static int
+lload_monitor_balancer_update(
+        Operation *op,
+        SlapReply *rs,
+        Entry *e,
+        void *priv )
+{
+    Attribute *a;
+
+    a = attr_find( e->e_attrs, ad_olmIncomingConnections );
+    assert( a != NULL );
+
+    UI2BV( &a->a_vals[0], lload_stats.global_incoming );
+
+    a = attr_find( e->e_attrs, ad_olmOutgoingConnections );
+    assert( a != NULL );
+
+    UI2BV( &a->a_vals[0], lload_stats.global_outgoing );
+    return SLAP_CB_CONTINUE;
+}
+
+static int
+lload_monitor_ops_update( Operation *op, SlapReply *rs, Entry *e, void *priv )
+{
+    Attribute *a;
+    lload_counters_t *counters = (lload_counters_t *)priv;
+
+    a = attr_find( e->e_attrs, ad_olmReceivedOps );
+    assert( a != NULL );
+    UI2BV( &a->a_vals[0], counters->lc_ops_received );
+
+    a = attr_find( e->e_attrs, ad_olmForwardedOps );
+    assert( a != NULL );
+    UI2BV( &a->a_vals[0], counters->lc_ops_forwarded );
+
+    a = attr_find( e->e_attrs, ad_olmRejectedOps );
+    assert( a != NULL );
+    UI2BV( &a->a_vals[0], counters->lc_ops_rejected );
+
+    a = attr_find( e->e_attrs, ad_olmCompletedOps );
+    assert( a != NULL );
+    UI2BV( &a->a_vals[0], counters->lc_ops_completed );
+
+    a = attr_find( e->e_attrs, ad_olmFailedOps );
+    assert( a != NULL );
+    UI2BV( &a->a_vals[0], counters->lc_ops_failed );
+
+    return SLAP_CB_CONTINUE;
+}
+
+static void
+lload_monitor_ops_dispose( void **priv )
+{
+    return;
+}
+
+static int
+lload_monitor_ops_free( Entry *e, void **priv )
+{
+    return LDAP_SUCCESS;
+}
+
+static int
+lload_monitor_balancer_init( BackendDB *be, monitor_subsys_t *ms )
+{
+    monitor_extra_t *mbe;
+    Entry *e;
+    int rc;
+    monitor_callback_t *cb;
+    struct berval value = BER_BVC("0");
+
+    assert( be != NULL );
+
+    mbe = (monitor_extra_t *)be->bd_info->bi_extra;
+
+    dnNormalize( 0, NULL, NULL, &ms->mss_dn, &ms->mss_ndn, NULL );
+
+    e = mbe->entry_stub( &ms->mss_dn, &ms->mss_ndn, &ms->mss_rdn,
+            oc_olmBalancer, NULL, NULL );
+    if ( e == NULL ) {
+        Debug( LDAP_DEBUG_ANY, "lload_monitor_balancer_init: "
+                "unable to create entry \"%s,%s\"\n",
+                ms->mss_rdn.bv_val, ms->mss_ndn.bv_val );
+        return -1;
+    }
+
+    ch_free( ms->mss_ndn.bv_val );
+    ber_dupbv( &ms->mss_dn, &e->e_name );
+    ber_dupbv( &ms->mss_ndn, &e->e_nname );
+
+    cb = ch_calloc( sizeof(monitor_callback_t), 1 );
+    cb->mc_update = lload_monitor_balancer_update;
+    cb->mc_free = lload_monitor_balancer_free;
+    cb->mc_dispose = lload_monitor_balancer_dispose;
+    cb->mc_private = NULL;
+
+    attr_merge_normalize_one( e, ad_olmIncomingConnections, &value, NULL );
+    attr_merge_normalize_one( e, ad_olmOutgoingConnections, &value, NULL );
+
+    rc = mbe->register_entry( e, cb, ms, 0 );
+    if ( rc != LDAP_SUCCESS ) {
+        Debug( LDAP_DEBUG_ANY, "lload_monitor_balancer_init: "
+                "unable to register entry \"%s\" for monitoring\n",
+                e->e_name.bv_val );
+        goto done;
+    }
+
+done:
+    entry_free( e );
+
+    return rc;
+}
+
+static int
+lload_monitor_ops_init( BackendDB *be, monitor_subsys_t *ms )
+{
+    monitor_extra_t *mbe;
+    Entry *e, *parent;
+    int rc;
+    int i;
+    struct berval value = BER_BVC("0");
+
+    assert( be != NULL );
+
+    mbe = (monitor_extra_t *)be->bd_info->bi_extra;
+
+    dnNormalize( 0, NULL, NULL, &ms->mss_dn, &ms->mss_ndn, NULL );
+    ms->mss_destroy = lload_monitor_subsystem_destroy;
+
+    parent = mbe->entry_stub( &ms->mss_dn, &ms->mss_ndn, &ms->mss_rdn,
+            oc_monitorContainer, NULL, NULL );
+    if ( parent == NULL ) {
+        Debug( LDAP_DEBUG_ANY, "lload_monitor_ops_init: "
+                "unable to create entry \"%s,%s\"\n",
+                ms->mss_rdn.bv_val, ms->mss_ndn.bv_val );
+        return -1;
+    }
+    ch_free( ms->mss_ndn.bv_val );
+    ber_dupbv( &ms->mss_dn, &parent->e_name );
+    ber_dupbv( &ms->mss_ndn, &parent->e_nname );
+
+    rc = mbe->register_entry( parent, NULL, ms, MONITOR_F_PERSISTENT_CH );
+    if ( rc != LDAP_SUCCESS ) {
+        Debug( LDAP_DEBUG_ANY, "lload_monitor_ops_init: "
+                "unable to register entry \"%s\" for monitoring\n",
+                parent->e_name.bv_val );
+        goto done;
+    }
+
+    for ( i = 0; lload_monitor_op[i].rdn.bv_val != NULL; i++ ) {
+        monitor_callback_t *cb;
+        e = mbe->entry_stub( &parent->e_name, &parent->e_nname,
+                &lload_monitor_op[i].rdn, oc_olmBalancerOperation, NULL, NULL );
+        if ( e == NULL ) {
+            Debug( LDAP_DEBUG_ANY, "lload_monitor_ops_init: "
+                    "unable to create entry \"%s,%s\"\n",
+                    lload_monitor_op[i].rdn.bv_val, parent->e_nname.bv_val );
+            return -1;
+        }
+
+        /* attr_merge_normalize_one( e, ad_olmDbOperations, &value, NULL ); */
+
+        /*
+         * We cannot share a single callback between entries.
+         *
+         * monitor_cache_destroy() tries to free all callbacks and it's called
+         * before mss_destroy() so we have no chance of handling it ourselves
+         */
+        cb = ch_calloc( sizeof(monitor_callback_t), 1 );
+        cb->mc_update = lload_monitor_ops_update;
+        cb->mc_free = lload_monitor_ops_free;
+        cb->mc_dispose = lload_monitor_ops_dispose;
+        cb->mc_private = &lload_stats.counters[i];
+
+        attr_merge_normalize_one( e, ad_olmReceivedOps, &value, NULL );
+        attr_merge_normalize_one( e, ad_olmForwardedOps, &value, NULL );
+        attr_merge_normalize_one( e, ad_olmRejectedOps, &value, NULL );
+        attr_merge_normalize_one( e, ad_olmCompletedOps, &value, NULL );
+        attr_merge_normalize_one( e, ad_olmFailedOps, &value, NULL );
+
+        rc = mbe->register_entry( e, cb, ms, 0 );
+
+        entry_free( e );
+
+        if ( rc != LDAP_SUCCESS ) {
+            Debug( LDAP_DEBUG_ANY, "lload_monitor_ops_init: "
+                    "unable to register entry \"%s\" for monitoring\n",
+                    e->e_name.bv_val );
+            ch_free( cb );
+            break;
+        }
+    }
+
+done:
+    entry_free( parent );
+    return rc;
+}
+
+static int
+lload_monitor_in_conn_entry( LloadConnection *conn, void *argv )
+{
+    Entry *e;
+    monitor_entry_t *mp;
+    struct lload_monitor_conn_arg *arg = argv;
+    monitor_extra_t *mbe = arg->op->o_bd->bd_info->bi_extra;
+    char buf[SLAP_TEXT_BUFLEN];
+    struct berval bv;
+
+    bv.bv_val = buf;
+    bv.bv_len = snprintf(
+            bv.bv_val, SLAP_TEXT_BUFLEN, "cn=Connection %lu", conn->c_connid );
+
+    e = mbe->entry_stub( &arg->ms->mss_dn, &arg->ms->mss_ndn, &bv,
+            oc_olmBalancerConnection, NULL, NULL );
+
+    mp = mbe->entrypriv_create();
+    e->e_private = mp;
+    mp->mp_info = arg->ms;
+    mp->mp_flags = MONITOR_F_SUB | MONITOR_F_VOLATILE;
+
+    *arg->ep = e;
+    arg->ep = &mp->mp_next;
+
+    return 0;
+}
+
+static int
+lload_monitor_in_conn_create(
+        Operation *op,
+        SlapReply *rs,
+        struct berval *ndn,
+        Entry *e_parent,
+        Entry **ep )
+{
+    monitor_entry_t *mp_parent;
+    struct lload_monitor_conn_arg arg = {
+        .op = op,
+        .ep = ep,
+    };
+
+    assert( e_parent->e_private != NULL );
+
+    mp_parent = e_parent->e_private;
+    arg.ms = (monitor_subsys_t *)mp_parent->mp_info;
+
+    clients_walk( lload_monitor_in_conn_entry, &arg );
+
+    return 0;
+}
+
+static int
+lload_monitor_up_conn_entry( LloadConnection *c, void *argv )
+{
+    Entry *e;
+    monitor_entry_t *mp;
+    struct lload_monitor_conn_arg *arg = argv;
+    monitor_extra_t *mbe = arg->op->o_bd->bd_info->bi_extra;
+    char buf[SLAP_TEXT_BUFLEN];
+    struct berval bv_rdn,
+        bv_type = BER_BVNULL,
+        bv_pending = BER_BVNULL,
+        bv_received = BER_BVNULL,
+        bv_completed = BER_BVNULL,
+        bv_failed = BER_BVNULL;
+
+    bv_rdn.bv_val = buf;
+    bv_rdn.bv_len = snprintf(
+            bv_rdn.bv_val, SLAP_TEXT_BUFLEN, "cn=Connection %lu", c->c_connid );
+
+    e = mbe->entry_stub( &arg->ms->mss_dn, &arg->ms->mss_ndn, &bv_rdn,
+            oc_olmBalancerConnection, NULL, NULL );
+
+    UI2BV( &bv_type, (long long unsigned int)c->c_type );
+    UI2BV( &bv_pending, (long long unsigned int)c->c_n_ops_executing );
+    UI2BV( &bv_received, c->c_counters.lc_ops_received );
+    UI2BV( &bv_completed, c->c_counters.lc_ops_completed );
+    UI2BV( &bv_failed, c->c_counters.lc_ops_failed );
+
+    attr_merge_normalize_one( e, ad_olmConnectionType, &bv_type, NULL );
+    attr_merge_normalize_one( e, ad_olmPendingOps, &bv_pending, NULL );
+    attr_merge_normalize_one( e, ad_olmReceivedOps, &bv_received, NULL );
+    attr_merge_normalize_one( e, ad_olmCompletedOps, &bv_completed, NULL );
+    attr_merge_normalize_one( e, ad_olmFailedOps, &bv_failed, NULL );
+
+    ch_free( bv_type.bv_val );
+    ch_free( bv_pending.bv_val );
+    ch_free( bv_received.bv_val );
+    ch_free( bv_completed.bv_val );
+    ch_free( bv_failed.bv_val );
+    mp = mbe->entrypriv_create();
+    e->e_private = mp;
+    mp->mp_info = arg->ms;
+    mp->mp_flags = MONITOR_F_SUB | MONITOR_F_VOLATILE;
+
+    *arg->ep = e;
+    arg->ep = &mp->mp_next;
+
+    return 0;
+}
+
+static int
+lload_monitor_up_conn_create(
+        Operation *op,
+        SlapReply *rs,
+        struct berval *ndn,
+        Entry *e_parent,
+        Entry **ep )
+{
+    monitor_entry_t *mp_parent;
+    monitor_subsys_t *ms;
+    LloadConnection *c;
+    LloadBackend *b;
+    struct lload_monitor_conn_arg *arg;
+
+    assert( e_parent->e_private != NULL );
+
+    mp_parent = e_parent->e_private;
+    ms = (monitor_subsys_t *)mp_parent->mp_info;
+    b = ms->mss_private;
+
+    if ( !b ) {
+        return -1;
+    }
+
+    arg = ch_calloc( 1, sizeof(struct lload_monitor_conn_arg) );
+    arg->op = op;
+    arg->ep = ep;
+    arg->ms = ms;
+
+    /* How to avoid this long lock? */
+    ldap_pvt_thread_mutex_lock( &b->b_mutex );
+    LDAP_CIRCLEQ_FOREACH ( c, &b->b_conns, c_next ) {
+        lload_monitor_up_conn_entry( c, arg );
+    }
+
+    LDAP_CIRCLEQ_FOREACH ( c, &b->b_bindconns, c_next ) {
+        lload_monitor_up_conn_entry( c, arg );
+    }
+    ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+
+    ch_free( arg );
+    return 0;
+}
+
+int
+lload_monitor_incoming_conn_init( BackendDB *be, monitor_subsys_t *ms )
+{
+    monitor_extra_t *mbe;
+    Entry *e;
+    int rc;
+
+    assert( be != NULL );
+    mbe = (monitor_extra_t *)be->bd_info->bi_extra;
+
+    ms->mss_create = lload_monitor_in_conn_create;
+    ms->mss_destroy = lload_monitor_subsystem_destroy;
+
+    dnNormalize( 0, NULL, NULL, &ms->mss_dn, &ms->mss_ndn, NULL );
+
+    e = mbe->entry_stub( &ms->mss_dn, &ms->mss_ndn, &ms->mss_rdn,
+            oc_monitorContainer, NULL, NULL );
+    if ( e == NULL ) {
+        Debug( LDAP_DEBUG_ANY, "lload_monitor_incoming_conn_init: "
+                "unable to create entry \"%s,%s\"\n",
+                ms->mss_rdn.bv_val, ms->mss_ndn.bv_val );
+        return -1;
+    }
+    ch_free( ms->mss_ndn.bv_val );
+    ber_dupbv( &ms->mss_dn, &e->e_name );
+    ber_dupbv( &ms->mss_ndn, &e->e_nname );
+
+    rc = mbe->register_entry( e, NULL, ms, MONITOR_F_VOLATILE_CH );
+
+    if ( rc != LDAP_SUCCESS ) {
+        Debug( LDAP_DEBUG_ANY, "lload_monitor_incoming_conn_init: "
+                "unable to register entry \"%s\" for monitoring\n",
+                e->e_name.bv_val );
+        goto done;
+    }
+done:
+    entry_free( e );
+
+    return rc;
+}
+
+static struct monitor_subsys_t *servers_subsys;
+/* Not sure if this has to be a subsystem at all, perhaps just entries? */
+
+static int
+lload_monitor_server_update(
+        Operation *op,
+        SlapReply *rs,
+        Entry *e,
+        void *priv )
+{
+    Attribute *a;
+    LloadBackend *b = priv;
+    LloadConnection *c;
+    LloadPendingConnection *pc;
+    ldap_pvt_mp_t active = 0, pending = 0, received = 0, completed = 0,
+                  failed = 0;
+    int i;
+
+    ldap_pvt_thread_mutex_lock( &b->b_mutex );
+    active = b->b_active + b->b_bindavail;
+
+    LDAP_CIRCLEQ_FOREACH ( c, &b->b_preparing, c_next ) {
+        pending++;
+    }
+
+    LDAP_LIST_FOREACH( pc, &b->b_connecting, next ) {
+        pending++;
+    }
+
+    for ( i = 0; i < LLOAD_STATS_OPS_LAST; i++ ) {
+        received += b->b_counters[i].lc_ops_received;
+        completed += b->b_counters[i].lc_ops_completed;
+        failed += b->b_counters[i].lc_ops_failed;
+    }
+
+    a = attr_find( e->e_attrs, ad_olmPendingOps );
+    assert( a != NULL );
+    UI2BV( &a->a_vals[0], (long long unsigned int)b->b_n_ops_executing );
+
+    ldap_pvt_thread_mutex_unlock( &b->b_mutex );
+
+    a = attr_find( e->e_attrs, ad_olmActiveConnections );
+    assert( a != NULL );
+    UI2BV( &a->a_vals[0], active );
+
+    a = attr_find( e->e_attrs, ad_olmPendingConnections );
+    assert( a != NULL );
+    UI2BV( &a->a_vals[0], pending );
+
+    a = attr_find( e->e_attrs, ad_olmReceivedOps );
+    assert( a != NULL );
+    UI2BV( &a->a_vals[0], received );
+
+    a = attr_find( e->e_attrs, ad_olmCompletedOps );
+    assert( a != NULL );
+    UI2BV( &a->a_vals[0], completed );
+
+    a = attr_find( e->e_attrs, ad_olmFailedOps );
+    assert( a != NULL );
+    UI2BV( &a->a_vals[0], failed );
+
+    return SLAP_CB_CONTINUE;
+}
+
+static int
+lload_monitor_backend_open( BackendDB *be, monitor_subsys_t *ms )
+{
+    Entry *e;
+    struct berval value = BER_BVC("0");
+    monitor_extra_t *mbe;
+    monitor_callback_t *cb;
+    LloadBackend *b = ms->mss_private;
+    int rc;
+
+    assert( be != NULL );
+    mbe = (monitor_extra_t *)be->bd_info->bi_extra;
+
+    dnNormalize( 0, NULL, NULL, &ms->mss_dn, &ms->mss_ndn, NULL );
+    e = mbe->entry_stub( &ms->mss_dn, &ms->mss_ndn, &ms->mss_rdn,
+            oc_olmBalancerServer, NULL, NULL );
+    if ( e == NULL ) {
+        Debug( LDAP_DEBUG_ANY, "lload_monitor_backend_open: "
+                "unable to create entry \"%s,%s\"\n",
+                ms->mss_rdn.bv_val, ms->mss_ndn.bv_val );
+        return -1;
+    }
+
+    ch_free( ms->mss_ndn.bv_val );
+    ber_dupbv( &ms->mss_dn, &e->e_name );
+    ber_dupbv( &ms->mss_ndn, &e->e_nname );
+
+    cb = ch_calloc( sizeof(monitor_callback_t), 1 );
+    cb->mc_update = lload_monitor_server_update;
+    cb->mc_free = NULL;
+    cb->mc_dispose = NULL;
+    cb->mc_private = b;
+
+    attr_merge_normalize_one( e, ad_olmServerURI, &b->b_uri, NULL );
+    attr_merge_normalize_one( e, ad_olmActiveConnections, &value, NULL );
+    attr_merge_normalize_one( e, ad_olmPendingConnections, &value, NULL );
+    attr_merge_normalize_one( e, ad_olmPendingOps, &value, NULL );
+    attr_merge_normalize_one( e, ad_olmReceivedOps, &value, NULL );
+    attr_merge_normalize_one( e, ad_olmCompletedOps, &value, NULL );
+    attr_merge_normalize_one( e, ad_olmFailedOps, &value, NULL );
+
+    rc = mbe->register_entry( e, cb, ms, MONITOR_F_VOLATILE_CH );
+
+    if ( rc != LDAP_SUCCESS ) {
+        Debug( LDAP_DEBUG_ANY, "lload_monitor_backend_open: "
+                "unable to register entry \"%s\" for monitoring\n",
+                e->e_name.bv_val );
+        goto done;
+    }
+done:
+    entry_free( e );
+    return LDAP_SUCCESS;
+}
+
+int
+lload_monitor_backends_init( BackendDB *be, monitor_subsys_t *ms )
+{
+    monitor_extra_t *mbe;
+    Entry *e;
+    unsigned int i = 1, j = 0;
+    int rc;
+    LloadBackend *b;
+
+    assert( be != NULL );
+    mbe = (monitor_extra_t *)be->bd_info->bi_extra;
+
+    dnNormalize( 0, NULL, NULL, &ms->mss_dn, &ms->mss_ndn, NULL );
+
+    e = mbe->entry_stub( &ms->mss_dn, &ms->mss_ndn, &ms->mss_rdn,
+            oc_monitorContainer, NULL, NULL );
+    if ( e == NULL ) {
+        Debug( LDAP_DEBUG_ANY, "lload_monitor_incoming_conn_init: "
+                "unable to create entry \"%s,%s\"\n",
+                ms->mss_rdn.bv_val, ms->mss_ndn.bv_val );
+        return -1;
+    }
+    ch_free( ms->mss_ndn.bv_val );
+    ber_dupbv( &ms->mss_dn, &e->e_name );
+    ber_dupbv( &ms->mss_ndn, &e->e_nname );
+
+    rc = mbe->register_entry( e, NULL, ms, MONITOR_F_PERSISTENT_CH );
+
+    if ( rc != LDAP_SUCCESS ) {
+        Debug( LDAP_DEBUG_ANY, "lload_monitor_backends_init: "
+                "unable to register entry \"%s\" for monitoring\n",
+                e->e_name.bv_val );
+        goto done;
+    }
+
+    LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
+        j++;
+    }
+
+    servers_subsys = ch_calloc( j, sizeof(monitor_subsys_t) );
+    LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
+        monitor_subsys_t *bk_mss;
+        struct berval bv;
+
+        bv.bv_len = sizeof( "cn=Server 4294967295" );
+        bv.bv_val = ch_malloc( bv.bv_len );
+        bv.bv_len = snprintf( bv.bv_val, bv.bv_len, "cn=Server %u", i );
+
+        bk_mss = &servers_subsys[i - 1];
+        bk_mss->mss_name = bv.bv_val;
+        ber_str2bv( LLOAD_MONITOR_BACKENDS_DN, 0, 0, &bk_mss->mss_dn );
+        ber_dupbv( &bk_mss->mss_rdn, &bv );
+        bk_mss->mss_flags = MONITOR_F_VOLATILE_CH;
+        bk_mss->mss_open = lload_monitor_backend_open;
+        bk_mss->mss_create = lload_monitor_up_conn_create;
+        bk_mss->mss_destroy = lload_monitor_subsystem_destroy;
+        bk_mss->mss_update = NULL;
+        bk_mss->mss_private = b;
+
+        if ( mbe->register_subsys_late( bk_mss ) ) {
+            Debug( LDAP_DEBUG_ANY, "lload_monitor_backends_init: "
+                    "failed to register %s subsystem",
+                    bk_mss->mss_name );
+            return -1;
+        }
+        i++;
+    }
+done:
+    entry_free( e );
+
+    return rc;
+}
+
+static char *lload_subsys_rdn[] = {
+    LLOAD_MONITOR_BALANCER_RDN,
+    LLOAD_MONITOR_INCOMING_RDN,
+    LLOAD_MONITOR_OPERATIONS_RDN,
+    LLOAD_MONITOR_BACKENDS_RDN,
+    NULL
+};
+
+static struct monitor_subsys_t balancer_subsys[] = {
+    {
+        LLOAD_MONITOR_BALANCER_NAME,
+        BER_BVNULL,
+        BER_BVC(SLAPD_MONITOR_BACKEND_DN),
+        BER_BVNULL,
+        { BER_BVC("Load Balancer information"),
+          BER_BVNULL },
+        MONITOR_F_PERSISTENT_CH,
+        lload_monitor_balancer_init,
+        lload_monitor_subsystem_destroy, /* destroy */
+        NULL,   /* update */
+        NULL,   /* create */
+        NULL    /* modify */
+    },
+    {
+        LLOAD_MONITOR_INCOMING_NAME,
+        BER_BVNULL,
+        BER_BVC(LLOAD_MONITOR_BALANCER_DN),
+        BER_BVNULL,
+        { BER_BVC("Load Balancer incoming connections"),
+          BER_BVNULL },
+        MONITOR_F_VOLATILE_CH,
+        lload_monitor_incoming_conn_init,
+        lload_monitor_subsystem_destroy, /* destroy */
+        NULL,   /* update */
+        NULL,   /* create */
+        NULL    /* modify */
+    },
+    {
+        LLOAD_MONITOR_OPERATIONS_NAME,
+        BER_BVNULL,
+        BER_BVC(LLOAD_MONITOR_BALANCER_DN),
+        BER_BVNULL,
+        { BER_BVC("Load Balancer global operation statistics"),
+          BER_BVNULL },
+        MONITOR_F_PERSISTENT_CH,
+        lload_monitor_ops_init,
+        lload_monitor_subsystem_destroy, /* destroy */
+        NULL,   /* update */
+        NULL,   /* create */
+        NULL    /* modify */
+    },
+    {
+        LLOAD_MONITOR_BACKENDS_NAME,
+        BER_BVNULL,
+        BER_BVC(LLOAD_MONITOR_BALANCER_DN),
+        BER_BVNULL,
+        { BER_BVC("Load Balancer Backends information"),
+          BER_BVNULL },
+        MONITOR_F_PERSISTENT_CH,
+        lload_monitor_backends_init,
+        lload_monitor_subsystem_destroy, /* destroy */
+        NULL,   /* update */
+        NULL,   /* create */
+        NULL    /* modify */
+    },
+    { NULL }
+};
+
+int
+lload_monitor_mss_init()
+{
+    BackendInfo *mi;
+    monitor_extra_t *mbe;
+    monitor_subsys_t *mss;
+    char **rdn;
+    int rc = 0;
+
+    mi = backend_info( "monitor" );
+    if ( !mi ) {
+        Debug( LDAP_DEBUG_CONFIG, "lload_monitor_mss_init: "
+                "not registering, monitor backend unavailable\n" );
+        return 0;
+    }
+    mbe = mi->bi_extra;
+
+    /* register the subsystems - Servers are registered in backends_init */
+    for ( mss = balancer_subsys, rdn = lload_subsys_rdn; mss->mss_name;
+            mss++, rdn++ ) {
+        ber_str2bv( *rdn, 0, 1, &mss->mss_rdn );
+        if ( mbe->register_subsys( mss ) ) {
+            Debug( LDAP_DEBUG_ANY, "lload_monitor_mss_init: "
+                    "failed to register %s subsystem",
+                    mss->mss_name );
+            return -1;
+        }
+    }
+    return rc;
+}
index fdcbd59bdd326b6bfe22bb6030373c4d1d46a34c..06c75be4f58fcca2460d7a23ed1268b302b0e199 100644 (file)
@@ -223,6 +223,9 @@ operation_destroy_from_client( LloadOperation *op )
         return;
     }
 
+    /* it seems we will be destroying the operation,
+     * so update the global rejected cunter if needed */
+    operation_update_global_rejected( op );
     /* 5. If we raced the upstream side and won, reclaim the token */
     ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
     if ( !(race_state & LLOAD_OP_DETACHING_UPSTREAM) ) {
@@ -281,6 +284,7 @@ operation_destroy_from_client( LloadOperation *op )
     if ( upstream ) {
         if ( tavl_delete( &upstream->c_ops, op, operation_upstream_cmp ) ) {
             upstream->c_n_ops_executing--;
+            operation_update_conn_counters( op );
             b = (LloadBackend *)upstream->c_private;
         }
         CONNECTION_UNLOCK_OR_DESTROY(upstream);
@@ -288,6 +292,7 @@ operation_destroy_from_client( LloadOperation *op )
         if ( b ) {
             ldap_pvt_thread_mutex_lock( &b->b_mutex );
             b->b_n_ops_executing--;
+            operation_update_backend_counters( op, b );
             ldap_pvt_thread_mutex_unlock( &b->b_mutex );
         }
     }
@@ -331,9 +336,13 @@ operation_destroy_from_upstream( LloadOperation *op )
         return;
     }
 
+    /* it seems we will be destroying the operation,
+     * so update the global rejected cunter if needed */
+    operation_update_global_rejected( op );
     /* 2. Remove from the operation map and adjust the pending op count */
     if ( tavl_delete( &upstream->c_ops, op, operation_upstream_cmp ) ) {
         upstream->c_n_ops_executing--;
+        operation_update_conn_counters( op );
         b = (LloadBackend *)upstream->c_private;
     }
 
@@ -357,6 +366,7 @@ operation_destroy_from_upstream( LloadOperation *op )
     if ( b ) {
         ldap_pvt_thread_mutex_lock( &b->b_mutex );
         b->b_n_ops_executing--;
+        operation_update_backend_counters( op, b );
         ldap_pvt_thread_mutex_unlock( &b->b_mutex );
     }
 
@@ -590,6 +600,10 @@ operation_abandon( LloadOperation *op )
     int rc = LDAP_SUCCESS;
 
     ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
+    /* for now consider all abandoned operations completed,
+     * perhaps add a separate counter later */
+    op->o_res = LLOAD_OP_COMPLETED;
+
     c = op->o_upstream;
     if ( !c ) {
         ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
@@ -620,6 +634,7 @@ operation_abandon( LloadOperation *op )
 
     ldap_pvt_thread_mutex_lock( &b->b_mutex );
     b->b_n_ops_executing--;
+    operation_update_backend_counters( op, b );
     ldap_pvt_thread_mutex_unlock( &b->b_mutex );
 
     if ( operation_send_abandon( op ) == LDAP_SUCCESS ) {
@@ -791,6 +806,7 @@ operation_lost_upstream( LloadOperation *op )
 
     CONNECTION_LOCK_DECREF(c);
     op->o_upstream_refcnt--;
+    op->o_res = LLOAD_OP_FAILED;
     operation_destroy_from_upstream( op );
     CONNECTION_UNLOCK(c);
 }
@@ -817,6 +833,7 @@ connection_timeout( LloadConnection *upstream, time_t threshold )
         }
 
         op->o_upstream_refcnt++;
+        op->o_res = LLOAD_OP_FAILED;
         found_op = tavl_delete( &upstream->c_ops, op, operation_upstream_cmp );
         assert( op == found_op );
 
@@ -982,3 +999,43 @@ done:
             "timeout task finished\n" );
     evtimer_add( self, lload_timeout_api );
 }
+
+void
+operation_update_global_rejected( LloadOperation *op )
+{
+    if ( op->o_res == LLOAD_OP_REJECTED && op->o_upstream_connid == 0 ) {
+        switch ( op->o_tag ) {
+            case LDAP_REQ_BIND:
+                lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_rejected++;
+                break;
+            default:
+                lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_rejected++;
+                break;
+        }
+    }
+}
+
+void
+operation_update_conn_counters( LloadOperation *op )
+{
+    assert( op->o_upstream != NULL );
+    if ( op->o_res == LLOAD_OP_COMPLETED ) {
+        op->o_upstream->c_counters.lc_ops_completed++;
+    } else {
+        op->o_upstream->c_counters.lc_ops_failed++;
+    }
+}
+
+void
+operation_update_backend_counters( LloadOperation *op, LloadBackend *b )
+{
+    int stat_type = op->o_tag == LDAP_REQ_BIND ? LLOAD_STATS_OPS_BIND :
+                                                 LLOAD_STATS_OPS_OTHER;
+
+    assert( b != NULL );
+    if ( op->o_res == LLOAD_OP_COMPLETED ) {
+        b->b_counters[stat_type].lc_ops_completed++;
+    } else {
+        b->b_counters[stat_type].lc_ops_failed++;
+    }
+}
index 6efcac44a757f6d91cc3489c6b8166859fbe696b..22213d44e32e68877ae8a9b3c1e58920dfdc2b97 100644 (file)
@@ -65,6 +65,7 @@ LDAP_SLAPD_F (LloadConnection *) client_init( ber_socket_t s, LloadListener *url
 LDAP_SLAPD_F (void) client_reset( LloadConnection *c );
 LDAP_SLAPD_F (void) client_destroy( LloadConnection *c );
 LDAP_SLAPD_F (void) clients_destroy( void );
+LDAP_SLAPD_F (void) clients_walk( CONNECTION_CLIENT_WALK apply, void *argv );
 
 /*
  * config.c
@@ -126,6 +127,7 @@ LDAP_SLAPD_F (int) lload_exop_init( void );
  */
 LDAP_SLAPD_F (int) lload_init( int mode, const char *name );
 LDAP_SLAPD_F (int) lload_destroy( void );
+LDAP_SLAPD_F (void) lload_counters_init( void );
 
 /*
  * libevent_support.c
@@ -133,6 +135,13 @@ LDAP_SLAPD_F (int) lload_destroy( void );
 LDAP_SLAPD_F (int) lload_libevent_init( void );
 LDAP_SLAPD_F (void) lload_libevent_destroy( void );
 
+#ifdef BALANCER_MODULE
+/*
+ * monitor.c
+ */
+LDAP_SLAPD_F (int) lload_monitor_initialize( void );
+#endif /* BALANCER_MODULE */
+
 /*
  * operation.c
  */
@@ -150,7 +159,9 @@ LDAP_SLAPD_F (void) operation_lost_upstream( LloadOperation *op );
 LDAP_SLAPD_F (void) operation_destroy_from_client( LloadOperation *op );
 LDAP_SLAPD_F (void) operation_destroy_from_upstream( LloadOperation *op );
 LDAP_SLAPD_F (void) operations_timeout( evutil_socket_t s, short what, void *arg );
-
+LDAP_SLAPD_F (void) operation_update_conn_counters( LloadOperation *op );
+LDAP_SLAPD_F (void) operation_update_backend_counters( LloadOperation *op, LloadBackend *b );
+LDAP_SLAPD_F (void) operation_update_global_rejected( LloadOperation *op );
 /*
  * upstream.c
  */
@@ -181,6 +192,8 @@ LDAP_SLAPD_V (char *) global_host;
 LDAP_SLAPD_V (int) lber_debug;
 LDAP_SLAPD_V (int) ldap_syslog;
 
+LDAP_SLAPD_V (lload_global_stats_t) lload_stats;
+
 LDAP_END_DECL
 
 #endif /* PROTO_LLOAD_H */
index 7510e80200781a0552bde49b46027274bbecf177..41eaf779862495ffc158594fba209bd7f6ea5e0f 100644 (file)
@@ -89,6 +89,7 @@ forward_final_response(
             op->o_upstream_connid, op->o_upstream_msgid, op->o_client_connid );
     rc = forward_response( client, op, ber );
     CONNECTION_LOCK(op->o_upstream);
+    op->o_res = LLOAD_OP_COMPLETED;
     if ( !op->o_pin_id || !op->o_upstream_refcnt-- ) {
         operation_destroy_from_upstream( op );
     }
index ac8a50409b35c6d94eb395518912251fc87ec67f..e158aff0175e43439897c82368f5e463c7b10fc0 100644 (file)
@@ -1963,6 +1963,7 @@ monitor_back_initialize(
                { "olmGenericAttributes",               "olmSubSystemAttributes:0" },
                { "olmDatabaseAttributes",              "olmSubSystemAttributes:1" },
                { "olmOverlayAttributes",               "olmSubSystemAttributes:2" },
+               { "olmModuleAttributes",                "olmSubSystemAttributes:3" },
 
                /* for example, back-mdb specific attrs
                 * are in "olmDatabaseAttributes:12"
@@ -1975,6 +1976,7 @@ monitor_back_initialize(
                { "olmGenericObjectClasses",            "olmSubSystemObjectClasses:0" },
                { "olmDatabaseObjectClasses",           "olmSubSystemObjectClasses:1" },
                { "olmOverlayObjectClasses",            "olmSubSystemObjectClasses:2" },
+               { "olmModuleObjectClasses",                     "olmSubSystemObjectClasses:3" },
 
                /* for example, back-mdb specific objectClasses
                 * are in "olmDatabaseObjectClasses:12"