]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
Implement pause handlers
authorOndřej Kuzník <okuznik@symas.com>
Wed, 7 Feb 2018 12:38:40 +0000 (12:38 +0000)
committerOndřej Kuzník <okuznik@symas.com>
Tue, 17 Nov 2020 17:58:14 +0000 (17:58 +0000)
servers/lloadd/daemon.c
servers/lloadd/init.c
servers/lloadd/lload.h
servers/lloadd/module_init.c
servers/lloadd/proto-lload.h

index ed1a7b7a56c2203f0297336b14a3298af1449897..bb8fc8e22af613b5a4bb170e3ad8f397a4bf8b50 100644 (file)
@@ -74,6 +74,10 @@ volatile sig_atomic_t slapd_abrupt_shutdown = 0;
 
 static int emfile;
 
+ldap_pvt_thread_mutex_t lload_wait_mutex;
+ldap_pvt_thread_cond_t lload_wait_cond;
+ldap_pvt_thread_cond_t lload_pause_cond;
+
 #ifndef SLAPD_MAX_DAEMON_THREADS
 #define SLAPD_MAX_DAEMON_THREADS 16
 #endif
@@ -139,6 +143,43 @@ lloadd_close( ber_socket_t s )
     tcp_close( s );
 }
 
+static int
+lload_base_dispatch( struct event_base *base )
+{
+    int rc;
+
+    while ( (rc = event_base_dispatch( base )) == 0 ) {
+        if ( event_base_got_exit( base ) ) {
+            break;
+        }
+
+        Debug( LDAP_DEBUG_TRACE, "lload_base_dispatch: "
+                "handling pause\n" );
+        /*
+         * We are pausing, signal the pausing thread we've finished and
+         * wait until the thread pool resumes operation.
+         *
+         * Do this in lockstep with the pausing thread.
+         */
+        ldap_pvt_thread_mutex_lock( &lload_wait_mutex );
+        ldap_pvt_thread_cond_signal( &lload_wait_cond );
+
+        /* Now wait until we resume */
+        ldap_pvt_thread_cond_wait( &lload_pause_cond, &lload_wait_mutex );
+        ldap_pvt_thread_mutex_unlock( &lload_wait_mutex );
+
+        Debug( LDAP_DEBUG_TRACE, "lload_base_dispatch: "
+                "resuming\n" );
+    }
+
+    if ( rc ) {
+        Debug( LDAP_DEBUG_ANY, "lload_base_dispatch: "
+                "event_base_dispatch() returned an error rc=%d\n",
+                rc );
+    }
+    return rc;
+}
+
 static void
 lload_free_listener_addresses( struct sockaddr **sal )
 {
@@ -946,7 +987,7 @@ lload_listener(
 static void *
 lload_listener_thread( void *ctx )
 {
-    int rc = event_base_dispatch( listener_base );
+    int rc = lload_base_dispatch( listener_base );
     Debug( LDAP_DEBUG_ANY, "lload_listener_thread: "
             "event loop finished: rc=%d\n",
             rc );
@@ -1214,7 +1255,7 @@ lloadd_io_task( void *ptr )
     lload_daemon[tid].wakeup_event = event;
 
     /* run */
-    rc = event_base_dispatch( base );
+    rc = lload_base_dispatch( base );
     Debug( LDAP_DEBUG_ANY, "lloadd_io_task: "
             "Daemon %d, event loop finished: rc=%d\n",
             tid, rc );
@@ -1305,7 +1346,7 @@ lloadd_daemon( struct event_base *daemon_base )
     }
 
     lloadd_inited = 1;
-    rc = event_base_dispatch( daemon_base );
+    rc = lload_base_dispatch( daemon_base );
     Debug( LDAP_DEBUG_ANY, "lloadd shutdown: "
             "Main event loop finished: rc=%d\n",
             rc );
@@ -1352,6 +1393,55 @@ daemon_wakeup_cb( evutil_socket_t sig, short what, void *arg )
     }
 }
 
+#ifdef BALANCER_MODULE
+/*
+ * Signal the event base to terminate processing as soon as it can and wait for
+ * lload_base_dispatch to notify us this has happened.
+ */
+static int
+lload_pause_base( struct event_base *base )
+{
+    int rc;
+
+    ldap_pvt_thread_mutex_lock( &lload_wait_mutex );
+    event_base_loopbreak( base );
+    rc = ldap_pvt_thread_cond_wait( &lload_wait_cond, &lload_wait_mutex );
+    ldap_pvt_thread_mutex_unlock( &lload_wait_mutex );
+
+    return rc;
+}
+
+void
+lload_pause_server( void )
+{
+    int i;
+
+    lload_pause_base( listener_base );
+    lload_pause_base( daemon_base );
+
+    for ( i = 0; i < lload_daemon_threads; i++ ) {
+        lload_pause_base( lload_daemon[i].base );
+    }
+}
+
+void
+lload_unpause_server( void )
+{
+    /*
+     * Make sure lloadd is completely ready to unpause by now:
+     *
+     * After the broadcast, we handle I/O and begin filling the thread pool, in
+     * high load conditions, we might hit the pool limits and start processing
+     * operations in the I/O threads (one PDU per socket at a time for fairness
+     * sake) even before a pause has finished from slapd's point of view!
+     *
+     * When (max_pdus_per_cycle == 0) we don't use the pool for these at all and
+     * most lload processing starts immediately making this even more prominent.
+     */
+    ldap_pvt_thread_cond_broadcast( &lload_pause_cond );
+}
+#endif /* BALANCER_MODULE */
+
 void
 lload_sig_shutdown( evutil_socket_t sig, short what, void *arg )
 {
index a2f95d0d1b5a6640eaf7824ee9525124acb0ef85..96481ccc410c9ff079addc8cdae063927875dbc6 100644 (file)
@@ -101,6 +101,10 @@ lload_init( int mode, const char *name )
             LDAP_STAILQ_INIT( &slapd_rq.task_list );
             LDAP_STAILQ_INIT( &slapd_rq.run_list );
 
+            ldap_pvt_thread_mutex_init( &lload_wait_mutex );
+            ldap_pvt_thread_cond_init( &lload_wait_cond );
+            ldap_pvt_thread_cond_init( &lload_pause_cond );
+
             ldap_pvt_thread_mutex_init( &backend_mutex );
             ldap_pvt_thread_mutex_init( &clients_mutex );
             ldap_pvt_thread_mutex_init( &lload_pin_mutex );
index 2c7519aaad929953653da14e6d0f1ce18f475a22..73fde464fe57a76ef16937828dfc64c0c439a741 100644 (file)
@@ -92,6 +92,11 @@ LDAP_SLAPD_V (LloadBackend *) current_backend;
 LDAP_SLAPD_V (struct slap_bindconf) bindconf;
 LDAP_SLAPD_V (struct berval) lloadd_identity;
 
+/* Used to coordinate server (un)pause, shutdown */
+LDAP_SLAPD_V (ldap_pvt_thread_mutex_t) lload_wait_mutex;
+LDAP_SLAPD_V (ldap_pvt_thread_cond_t) lload_pause_cond;
+LDAP_SLAPD_V (ldap_pvt_thread_cond_t) lload_wait_cond;
+
 typedef int lload_cf_aux_table_parse_x( struct berval *val,
         void *bc,
         slap_cf_aux_table *tab0,
index 7caa526a3b02dacbb6011092b26d53203ed8cbf0..4f8caff575c6a20ddad1cc90c3de59793e049e34 100644 (file)
@@ -64,6 +64,10 @@ lload_conn_pool_init()
 {
     int rc = 0;
 
+    ldap_pvt_thread_mutex_init( &lload_wait_mutex );
+    ldap_pvt_thread_cond_init( &lload_pause_cond );
+    ldap_pvt_thread_cond_init( &lload_wait_cond );
+
     ldap_pvt_thread_mutex_init( &backend_mutex );
     ldap_pvt_thread_mutex_init( &clients_mutex );
     ldap_pvt_thread_mutex_init( &lload_pin_mutex );
@@ -74,6 +78,20 @@ lload_conn_pool_init()
     return rc;
 }
 
+static int
+lload_pause_cb( BackendInfo *bi )
+{
+    lload_pause_server();
+    return 0;
+}
+
+static int
+lload_unpause_cb( BackendInfo *bi )
+{
+    lload_unpause_server();
+    return 0;
+}
+
 int
 lload_back_open( BackendInfo *bi )
 {
@@ -110,7 +128,10 @@ lload_back_close( BackendInfo *bi )
         return 0;
     }
 
+    ldap_pvt_thread_mutex_lock( &lload_wait_mutex );
     event_base_loopexit( daemon_base, NULL );
+    ldap_pvt_thread_cond_wait( &lload_wait_cond, &lload_wait_mutex );
+    ldap_pvt_thread_mutex_unlock( &lload_wait_mutex );
     ldap_pvt_thread_join( lloadd_main_thread, (void *)NULL );
 
     return 0;
@@ -122,6 +143,8 @@ lload_back_initialize( BackendInfo *bi )
     bi->bi_flags = SLAP_BFLAG_STANDALONE;
     bi->bi_open = lload_back_open;
     bi->bi_config = config_generic_wrapper;
+    bi->bi_pause = lload_pause_cb;
+    bi->bi_unpause = lload_unpause_cb;
     bi->bi_close = lload_back_close;
     bi->bi_destroy = 0;
 
index adfe96c145fd9d4060d13002c676e56d6e4d06fc..25baf8b77542a779fa80d25e2834228425e3e535 100644 (file)
@@ -104,6 +104,9 @@ LDAP_SLAPD_V (int) lload_daemon_mask;
 
 LDAP_SLAPD_F (void) lload_sig_shutdown( evutil_socket_t sig, short what, void *arg );
 
+LDAP_SLAPD_F (void) lload_pause_server( void );
+LDAP_SLAPD_F (void) lload_unpause_server( void );
+
 LDAP_SLAPD_V (struct event_base *) daemon_base;
 LDAP_SLAPD_V (struct evdns_base *) dnsbase;
 LDAP_SLAPD_V (volatile sig_atomic_t) slapd_shutdown;