]> git.ipfire.org Git - thirdparty/openldap.git/commitdiff
ITS#7926 dynamic changes to olcListenerThreads
authorHoward Chu <hyc@openldap.org>
Tue, 18 Aug 2020 19:14:38 +0000 (20:14 +0100)
committerHoward Chu <hyc@openldap.org>
Tue, 18 Aug 2020 21:37:50 +0000 (22:37 +0100)
Reallocates sockets from old to new listener threads

include/ldap_pvt_thread.h
libraries/libldap/tpool.c
servers/slapd/bconfig.c
servers/slapd/connection.c
servers/slapd/daemon.c
servers/slapd/proto-slap.h

index 1a0a012c3131b4e191f705a7ba78318495902014..67f7ede3eedb7af43fb474178ba62213e7836694 100644 (file)
@@ -282,6 +282,10 @@ LDAP_F( int )
 ldap_pvt_thread_pool_pausecheck LDAP_P((
        ldap_pvt_thread_pool_t *pool ));
 
+LDAP_F( int )
+ldap_pvt_thread_pool_pausecheck_native LDAP_P((
+       ldap_pvt_thread_pool_t *pool ));
+
 LDAP_F( int )
 ldap_pvt_thread_pool_pause LDAP_P((
        ldap_pvt_thread_pool_t *pool ));
index 5df3dd76af639cbf5b517f5cd4113bc28056a3a6..67507c0ae61998443b70cd0f66dae6c1565b4a77 100644 (file)
@@ -1239,6 +1239,32 @@ ldap_pvt_thread_pool_pausecheck( ldap_pvt_thread_pool_t *tpool )
        return handle_pause(tpool, PAUSE_ARG(CHECK_PAUSE));
 }
 
+/*
+ * Wait for a pause, from a non-pooled thread.
+ */
+int
+ldap_pvt_thread_pool_pausecheck_native( ldap_pvt_thread_pool_t *tpool )
+{
+       struct ldap_int_thread_pool_s *pool;
+
+       if (tpool == NULL)
+               return(-1);
+
+       pool = *tpool;
+
+       if (pool == NULL)
+               return(0);
+
+       if (!pool->ltp_pause)
+               return(0);
+
+       ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
+       while (pool->ltp_pause)
+                       ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
+       ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
+       return 1;
+}
+
 /*
  * Pause the pool.  The calling task must be active, not idle.
  * Return when all other tasks are paused or idle.
index ead975b3be59651d2f97337168329d5df045ff06..c8bcdbadd287ba78ddc7c1eaa1a92d7ee856bce1 100644 (file)
@@ -1043,6 +1043,14 @@ typedef struct ADlist {
 
 static ADlist *sortVals;
 
+static int new_daemon_threads;
+
+static int
+config_resize_lthreads(ConfigArgs *c)
+{
+       return slapd_daemon_resize( new_daemon_threads );
+}
+
 static int
 config_generic(ConfigArgs *c) {
        int i;
@@ -1806,7 +1814,7 @@ config_generic(ConfigArgs *c) {
                case CFG_THREADQS:
                        if ( c->value_int < 1 ) {
                                snprintf( c->cr_msg, sizeof( c->cr_msg ),
-                                       "threadqueuess=%d smaller than minimum value 1",
+                                       "threadqueues=%d smaller than minimum value 1",
                                        c->value_int );
                                Debug(LDAP_DEBUG_ANY, "%s: %s.\n",
                                        c->log, c->cr_msg );
@@ -1824,6 +1832,14 @@ config_generic(ConfigArgs *c) {
                        break;
 
                case CFG_LTHREADS:
+                       if ( c->value_uint < 1 ) {
+                               snprintf( c->cr_msg, sizeof( c->cr_msg ),
+                                       "listenerthreads=%u smaller than minimum value 1",
+                                       c->value_uint );
+                               Debug(LDAP_DEBUG_ANY, "%s: %s.\n",
+                                       c->log, c->cr_msg );
+                               return 1;
+                       }
                        { int mask = 0;
                        /* use a power of two */
                        while (c->value_uint > 1) {
@@ -1831,8 +1847,8 @@ config_generic(ConfigArgs *c) {
                                mask <<= 1;
                                mask |= 1;
                        }
-                       slapd_daemon_mask = mask;
-                       slapd_daemon_threads = mask+1;
+                       new_daemon_threads = mask+1;
+                       config_push_cleanup( c, config_resize_lthreads );
                        }
                        break;
 
@@ -4195,11 +4211,11 @@ config_tls_option(ConfigArgs *c) {
        if (c->op == SLAP_CONFIG_EMIT) {
                return ldap_pvt_tls_get_option( ld, flag, berval ? (void *)&c->value_bv : (void *)&c->value_string );
        } else if ( c->op == LDAP_MOD_DELETE ) {
-               c->cleanup = config_tls_cleanup;
+               config_push_cleanup( c, config_tls_cleanup );
                return ldap_pvt_tls_set_option( ld, flag, NULL );
        }
        if ( !berval ) ch_free(c->value_string);
-       c->cleanup = config_tls_cleanup;
+       config_push_cleanup( c, config_tls_cleanup );
        rc = ldap_pvt_tls_set_option(ld, flag, berval ? (void *)&c->value_bv : (void *)c->argv[1]);
        if ( berval ) ch_free(c->value_bv.bv_val);
        return rc;
@@ -4223,11 +4239,11 @@ config_tls_config(ConfigArgs *c) {
                return slap_tls_get_config( slap_tls_ld, flag, &c->value_string );
        } else if ( c->op == LDAP_MOD_DELETE ) {
                int i = 0;
-               c->cleanup = config_tls_cleanup;
+               config_push_cleanup( c, config_tls_cleanup );
                return ldap_pvt_tls_set_option( slap_tls_ld, flag, &i );
        }
        ch_free( c->value_string );
-       c->cleanup = config_tls_cleanup;
+       config_push_cleanup( c, config_tls_cleanup );
        if ( isdigit( (unsigned char)c->argv[1][0] ) && c->type != CFG_TLS_PROTOCOL_MIN ) {
                if ( lutil_atoi( &i, c->argv[1] ) != 0 ) {
                        Debug(LDAP_DEBUG_ANY, "%s: "
@@ -5613,8 +5629,8 @@ ok:
                                rc = ca->bi->bi_db_open( ca->be, &ca->reply );
                                ca->be->bd_info = bi_orig;
                        }
-               } else if ( ca->cleanup ) {
-                       rc = ca->cleanup( ca );
+               } else if ( ca->num_cleanups ) {
+                       rc = config_run_cleanup( ca );
                }
                if ( rc ) {
                        if (ca->cr_msg[0] == '\0')
@@ -5684,8 +5700,8 @@ done:
                        overlay_destroy_one( ca->be, (slap_overinst *)ca->bi );
                } else if ( coptr->co_type == Cft_Schema ) {
                        schema_destroy_one( ca, colst, nocs, last );
-               } else if ( ca->cleanup ) {
-                       ca->cleanup( ca );
+               } else if ( ca->num_cleanups ) {
+                       config_run_cleanup( ca );
                }
        }
 done_noop:
@@ -6224,8 +6240,8 @@ out:
                ca->reply = msg;
        }
 
-       if ( ca->cleanup ) {
-               i = ca->cleanup( ca );
+       if ( ca->num_cleanups ) {
+               i = config_run_cleanup( ca );
                if (rc == LDAP_SUCCESS)
                        rc = i;
        }
index 6bb48b87b9a2f43c82d1e900ff090b07a1e4cdce..e9130a9d0bf2a4bf430122449cffd88da1b8b337 100644 (file)
@@ -706,6 +706,17 @@ connection_destroy( Connection *c )
        }
 }
 
+int connection_is_active( ber_socket_t s )
+{
+       Connection *c;
+
+       assert( s < dtblsize );
+       c = &connections[s];
+       return c->c_conn_state == SLAP_C_CLOSING ||
+               c->c_conn_state == SLAP_C_BINDING ||
+               c->c_conn_state == SLAP_C_ACTIVE ;
+}
+
 int connection_valid( Connection *c )
 {
        /* c_mutex must be locked by caller */
index 050a62ce3ef29dc0feae4ad01e4ec19d8af4a05d..48d502136e203e9d83bbee6966aa33c123a28c26 100644 (file)
@@ -81,9 +81,6 @@ ber_socket_t dtblsize;
 slap_ssf_t local_ssf = LDAP_PVT_SASL_LOCAL_SSF;
 struct runqueue_s slapd_rq;
 
-#ifndef SLAPD_MAX_DAEMON_THREADS
-#define SLAPD_MAX_DAEMON_THREADS       16
-#endif
 int slapd_daemon_threads = 1;
 int slapd_daemon_mask;
 
@@ -94,7 +91,6 @@ int slapd_tcp_wmem;
 
 Listener **slap_listeners = NULL;
 static volatile sig_atomic_t listening = 1; /* 0 when slap_listeners closed */
-static ldap_pvt_thread_t *listener_tid;
 
 #ifndef SLAPD_LISTEN_BACKLOG
 #define SLAPD_LISTEN_BACKLOG 2048
@@ -102,7 +98,9 @@ static ldap_pvt_thread_t *listener_tid;
 
 #define        DAEMON_ID(fd)   (fd & slapd_daemon_mask)
 
-static ber_socket_t wake_sds[SLAPD_MAX_DAEMON_THREADS][2];
+typedef ber_socket_t sdpair[2];
+
+static sdpair *wake_sds;
 static ldap_pvt_thread_mutex_t emfile_mutex;
 static int emfile;
 
@@ -136,6 +134,7 @@ typedef struct slap_daemon_st {
        ber_socket_t            sd_nactives;
        int                     sd_nwriters;
        int                     sd_nfds;
+       ldap_pvt_thread_t       sd_tid;
 
 #if defined(HAVE_KQUEUE)
        uint8_t*        sd_fdmodes; /* indexed by fd */
@@ -173,7 +172,7 @@ typedef struct slap_daemon_st {
 #endif /* ! kqueue && ! epoll && ! /dev/poll */
 } slap_daemon_st;
 
-static slap_daemon_st slap_daemon[SLAPD_MAX_DAEMON_THREADS];
+static slap_daemon_st *slap_daemon;
 
 /*
  * NOTE: naming convention for macros:
@@ -1881,11 +1880,13 @@ slapd_daemon_init( const char *urls )
        Debug( LDAP_DEBUG_ARGS, "daemon_init: %s\n",
                urls ? urls : "<null>" );
 
-       for ( i=0; i<SLAPD_MAX_DAEMON_THREADS; i++ ) {
+       wake_sds = ch_malloc( slapd_daemon_threads * sizeof( sdpair ));
+       for ( i=0; i<slapd_daemon_threads; i++ ) {
                wake_sds[i][0] = AC_SOCKET_INVALID;
                wake_sds[i][1] = AC_SOCKET_INVALID;
        }
 
+       slap_daemon = ch_calloc( slapd_daemon_threads, sizeof( slap_daemon_st ));
        ldap_pvt_thread_mutex_init( &slap_daemon[0].sd_mutex );
 #ifdef HAVE_TCPD
        ldap_pvt_thread_mutex_init( &sd_tcpd_mutex );
@@ -1972,6 +1973,61 @@ slapd_daemon_init( const char *urls )
        return !i;
 }
 
+/* transfer control of active sockets from old to new listener threads */
+static void
+slapd_socket_realloc( int newnum )
+{
+       int i, j, oldid, newid;
+       int newmask = newnum - 1;
+       Listener *sl;
+       int num_listeners;
+
+       for ( i=0; slap_listeners[i] != NULL; i++ ) ;
+       num_listeners = i;
+
+       for ( i=0; i<dtblsize; i++ ) {
+               int skip = 0;
+
+               /* don't bother with wake_sds, they're assigned independent of mask */
+               for (j=0; j<slapd_daemon_threads; j++) {
+                       if ( i == wake_sds[j][0] || i == wake_sds[j][1] ) {
+                               skip = 1;
+                               break;
+                       }
+               }
+               if ( skip ) continue;
+
+               oldid = DAEMON_ID(i);
+               newid = i & newmask;
+               if ( oldid == newid ) continue;
+               if ( !SLAP_SOCK_IS_ACTIVE( oldid, i )) continue;
+               sl = NULL;
+               if ( num_listeners ) {
+                       for ( j=0; slap_listeners[j] != NULL; j++ ) {
+                               if ( slap_listeners[j]->sl_sd == i ) {
+                                       sl = slap_listeners[j];
+                                       num_listeners--;
+                                       break;
+                               }
+                       }
+               }
+               SLAP_SOCK_ADD( newid, i, sl );
+               if ( SLAP_SOCK_IS_READ( oldid, i )) {
+                       SLAP_SOCK_SET_READ( newid, i );
+               }
+               if ( SLAP_SOCK_IS_WRITE( oldid, i )) {
+                       SLAP_SOCK_SET_WRITE( newid, i );
+                       slap_daemon[oldid].sd_nwriters--;
+                       slap_daemon[newid].sd_nwriters++;
+               }
+               if ( connection_is_active( i )) {
+                       slap_daemon[oldid].sd_nactives--;
+                       slap_daemon[newid].sd_nactives++;
+               }
+               SLAP_SOCK_DEL( oldid, i );
+       }
+}
+
 
 int
 slapd_daemon_destroy( void )
@@ -2409,7 +2465,8 @@ slapd_daemon_task(
        int l;
        time_t last_idle_check = 0;
        int ebadf = 0;
-       int tid = (ldap_pvt_thread_t *) ptr - listener_tid;
+       int tid = (slap_daemon_st *) ptr - slap_daemon;
+       int old_threads = slapd_daemon_threads;
 
 #define SLAPD_IDLE_CHECK_LIMIT 4
 
@@ -2783,6 +2840,8 @@ loop:
                                continue;
                        }
 
+                       if ( DAEMON_ID( lr->sl_sd ) != tid ) continue;
+
                        if ( lr->sl_mute ) {
                                Debug( LDAP_DEBUG_CONNS,
                                        "daemon: " SLAP_EVENT_FNAME ": "
@@ -2870,6 +2929,7 @@ loop:
 
                        if ( ns <= 0 ) break;
                        if ( slap_listeners[l]->sl_sd == AC_SOCKET_INVALID ) continue;
+                       if ( DAEMON_ID( slap_listeners[l]->sl_sd ) != tid ) continue;
 #ifdef LDAP_CONNECTIONLESS
                        if ( slap_listeners[l]->sl_is_udp ) continue;
 #endif /* LDAP_CONNECTIONLESS */
@@ -3088,6 +3148,13 @@ loop:
                }
 #endif /* SLAP_EVENTS_ARE_INDEXED */
 
+               /* Was number of listener threads decreased? */
+               if ( ldap_pvt_thread_pool_pausecheck_native( &connection_pool )) {
+                       /* decreased, let this thread finish */
+                       if ( tid >= slapd_daemon_threads )
+                               break;
+               }
+
 #ifndef HAVE_YIELDING_SELECT
                ldap_pvt_thread_yield();
 #endif /* ! HAVE_YIELDING_SELECT */
@@ -3136,6 +3203,107 @@ loop:
        return NULL;
 }
 
+typedef struct slap_tid_waiter {
+       int num_tids;
+       ldap_pvt_thread_t tids[0];
+} slap_tid_waiter;
+
+static void *
+slapd_daemon_tid_cleanup(
+       void *ctx,
+       void *ptr )
+{
+       slap_tid_waiter *tids = ptr;
+       int i;
+
+       for ( i=0; i<tids->num_tids; i++ )
+               ldap_pvt_thread_join( tids->tids[i], (void *)NULL );
+       ch_free( ptr );
+       return NULL;
+}
+
+int
+slapd_daemon_resize( int newnum )
+{
+       int i, rc;
+
+       if ( newnum == slapd_daemon_threads )
+               return 0;
+
+       /* wake up all current listener threads */
+       for ( i=0; i<slapd_daemon_threads; i++ )
+               WAKE_LISTENER(i,1);
+
+       /* mutexes may not survive realloc, so destroy & recreate later */
+       for ( i=0; i<slapd_daemon_threads; i++ )
+               ldap_pvt_thread_mutex_destroy( &slap_daemon[i].sd_mutex );
+
+       if ( newnum > slapd_daemon_threads ) {
+               wake_sds = ch_realloc( wake_sds, newnum * sizeof( sdpair ));
+               slap_daemon = ch_realloc( slap_daemon, newnum * sizeof( slap_daemon_st ));
+
+               for ( i=slapd_daemon_threads; i<newnum; i++ )
+               {
+                       memset( &slap_daemon[i], 0, sizeof( slap_daemon_st ));
+                       if( (rc = lutil_pair( wake_sds[i] )) < 0 ) {
+                               Debug( LDAP_DEBUG_ANY,
+                                       "daemon: lutil_pair() failed rc=%d\n", rc );
+                               return rc;
+                       }
+                       ber_pvt_socket_set_nonblock( wake_sds[i][1], 1 );
+
+                       SLAP_SOCK_INIT(i);
+               }
+
+               for ( i=0; i<newnum; i++ )
+                       ldap_pvt_thread_mutex_init( &slap_daemon[i].sd_mutex );
+
+               slapd_socket_realloc( newnum );
+
+               for ( i=slapd_daemon_threads; i<newnum; i++ )
+               {
+                       /* listener as a separate THREAD */
+                       rc = ldap_pvt_thread_create( &slap_daemon[i].sd_tid,
+                               0, slapd_daemon_task, &slap_daemon[i] );
+
+                       if ( rc != 0 ) {
+                               Debug( LDAP_DEBUG_ANY,
+                               "listener ldap_pvt_thread_create failed (%d)\n", rc );
+                               return rc;
+                       }
+               }
+       } else {
+               int j;
+               slap_tid_waiter *tids = ch_malloc( sizeof(slap_tid_waiter) +
+                       ((slapd_daemon_threads - newnum) * sizeof(ldap_pvt_thread_t )));
+               slapd_socket_realloc( newnum );
+               tids->num_tids = slapd_daemon_threads - newnum;
+               for ( i=newnum, j=0; i<slapd_daemon_threads; i++, j++ ) {
+                       tids->tids[j] = slap_daemon[i].sd_tid;
+#ifdef HAVE_WINSOCK
+                       if ( wake_sds[i][1] != INVALID_SOCKET &&
+                               SLAP_FD2SOCK( wake_sds[i][1] ) != SLAP_FD2SOCK( wake_sds[i][0] ))
+#endif /* HAVE_WINSOCK */
+                               tcp_close( SLAP_FD2SOCK(wake_sds[i][1]) );
+#ifdef HAVE_WINSOCK
+                       if ( wake_sds[i][0] != INVALID_SOCKET )
+#endif /* HAVE_WINSOCK */
+                               tcp_close( SLAP_FD2SOCK(wake_sds[i][0]) );
+
+                       SLAP_SOCK_DESTROY( i );
+               }
+
+               wake_sds = ch_realloc( wake_sds, newnum * sizeof( sdpair ));
+               slap_daemon = ch_realloc( slap_daemon, newnum * sizeof( slap_daemon_st ));
+               for ( i=0; i<newnum; i++ )
+                       ldap_pvt_thread_mutex_init( &slap_daemon[i].sd_mutex );
+               ldap_pvt_thread_pool_submit( &connection_pool,
+                       slapd_daemon_tid_cleanup, (void *) tids );
+       }
+       slapd_daemon_threads = newnum;
+       slapd_daemon_mask = newnum - 1;
+       return 0;
+}
 
 #ifdef LDAP_CONNECTIONLESS
 static int
@@ -3177,11 +3345,6 @@ slapd_daemon( void )
        connectionless_init();
 #endif /* LDAP_CONNECTIONLESS */
 
-       if ( slapd_daemon_threads > SLAPD_MAX_DAEMON_THREADS )
-               slapd_daemon_threads = SLAPD_MAX_DAEMON_THREADS;
-
-       listener_tid = ch_malloc(slapd_daemon_threads * sizeof(ldap_pvt_thread_t));
-
        SLAP_SOCK_INIT2();
 
        /* daemon_init only inits element 0 */
@@ -3202,8 +3365,8 @@ slapd_daemon( void )
        for ( i=0; i<slapd_daemon_threads; i++ )
        {
                /* listener as a separate THREAD */
-               rc = ldap_pvt_thread_create( &listener_tid[i],
-                       0, slapd_daemon_task, &listener_tid[i] );
+               rc = ldap_pvt_thread_create( &slap_daemon[i].sd_tid,
+                       0, slapd_daemon_task, &slap_daemon[i] );
 
                if ( rc != 0 ) {
                        Debug( LDAP_DEBUG_ANY,
@@ -3214,11 +3377,9 @@ slapd_daemon( void )
 
        /* wait for the listener threads to complete */
        for ( i=0; i<slapd_daemon_threads; i++ )
-               ldap_pvt_thread_join( listener_tid[i], (void *)NULL );
+               ldap_pvt_thread_join( slap_daemon[i].sd_tid, (void *)NULL );
 
        destroy_listeners();
-       ch_free( listener_tid );
-       listener_tid = NULL;
 
        return 0;
 }
index 628f165fedcf57dd58479657b299498cfeee0e1b..470ebf31c581eb5de1d64db5a6e8e9f2f24303e1 100644 (file)
@@ -802,6 +802,7 @@ LDAP_SLAPD_F (Connection *) connection_init LDAP_P((
 
 LDAP_SLAPD_F (void) connection_closing LDAP_P((
        Connection *c, const char *why ));
+LDAP_SLAPD_F (int) connection_is_active LDAP_P(( ber_socket_t s ));
 LDAP_SLAPD_F (int) connection_valid LDAP_P(( Connection *c ));
 LDAP_SLAPD_F (const char *) connection_state2str LDAP_P(( int state ))
        LDAP_GCCATTR((const));
@@ -876,6 +877,7 @@ LDAP_SLAPD_F (void) slap_queue_csn LDAP_P(( Operation *, struct berval * ));
  */
 LDAP_SLAPD_F (void) slapd_add_internal(ber_socket_t s, int isactive);
 LDAP_SLAPD_F (int) slapd_daemon_init( const char *urls );
+LDAP_SLAPD_F (int) slapd_daemon_resize( int newnum );
 LDAP_SLAPD_F (int) slapd_daemon_destroy(void);
 LDAP_SLAPD_F (int) slapd_daemon(void);
 LDAP_SLAPD_F (Listener **)     slapd_get_listeners LDAP_P((void));