]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9820 #resolve [Add thread_pool to libks]
authorAnthony Minessale <anthm@freeswitch.org>
Tue, 6 Dec 2016 22:46:08 +0000 (16:46 -0600)
committerAnthony Minessale <anthm@freeswitch.org>
Tue, 6 Dec 2016 22:46:08 +0000 (16:46 -0600)
libs/libks/Makefile.am
libs/libks/src/include/ks.h
libs/libks/src/include/ks_thread_pool.h [new file with mode: 0644]
libs/libks/src/include/ks_types.h
libs/libks/src/ks_mutex.c
libs/libks/src/ks_thread_pool.c [new file with mode: 0644]
libs/libks/test/Makefile.am
libs/libks/test/testq.c

index b04069b419efe8e2533ed5b566a83bdc3797ac5a..897236bd1cc3af3b9bfc4cdb0be80092b7e85aaf 100644 (file)
@@ -7,7 +7,7 @@ AM_CFLAGS    += -I$(top_srcdir)/src -I$(top_srcdir)/src/include -I$(top_srcdir)/
 AM_CPPFLAGS  = $(AM_CFLAGS)
 
 lib_LTLIBRARIES          = libks.la
-libks_la_SOURCES  = src/ks.c src/ks_string.c src/ks_json.c src/ks_thread.c src/ks_mutex.c src/ks_config.c
+libks_la_SOURCES  = src/ks.c src/ks_string.c src/ks_json.c src/ks_thread.c src/ks_thread_pool.c src/ks_mutex.c src/ks_config.c
 libks_la_SOURCES += src/ks_log.c src/ks_socket.c src/ks_buffer.c src/ks_pool.c src/simclist.c
 libks_la_SOURCES += src/ks_time.c src/ks_printf.c src/ks_hash.c src/ks_q.c src/ks_dso.c src/ks_dht.c
 libks_la_SOURCES += src/ks_ssl.c src/kws.c src/ks_rng.c
@@ -22,6 +22,7 @@ libks_la_LDFLAGS  = $(AM_LDFLAGS) -version-info 0:1:0 -lncurses -lpthread -lm
 
 library_includedir     = $(prefix)/include
 library_include_HEADERS = src/include/ks_config.h src/include/ks.h src/include/ks_threadmutex.h src/include/ks_json.h src/include/ks_buffer.h
+library_include_HEADERS += src/include/ks_thread_pool.h
 library_include_HEADERS += src/include/ks_pool.h src/include/simclist.h src/include/ks_time.h src/include/ks_q.h src/include/ks_socket.h
 library_include_HEADERS += src/include/ks_dso.h src/include/ks_dht.h src/include/ks_platform.h src/include/ks_types.h # src/include/ks_rng.h 
 library_include_HEADERS += src/include/ks_printf.h src/include/ks_hash.h src/include/ks_ssl.h src/include/kws.h
index 90c06879eb380b9e6b3039196a04659e169824e2..7839e3f61f05285f3715dc3073ebfdeced5536c5 100644 (file)
@@ -115,6 +115,7 @@ KS_DECLARE(void) ks_random_string(char *buf, uint16_t len, char *set);
 #include "ks_printf.h"
 #include "ks_json.h"
 #include "ks_threadmutex.h"
+#include "ks_thread_pool.h"
 #include "ks_hash.h"
 #include "ks_config.h"
 #include "ks_q.h"
diff --git a/libs/libks/src/include/ks_thread_pool.h b/libs/libks/src/include/ks_thread_pool.h
new file mode 100644 (file)
index 0000000..35ea849
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2007-2014, Anthony Minessale II
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#ifndef _KS_THREAD_POOL_H_
+#define _KS_THREAD_POOL_H_
+
+KS_BEGIN_EXTERN_C
+KS_DECLARE(ks_status_t) ks_thread_pool_create(ks_thread_pool_t **tp, uint32_t min, uint32_t max, size_t stack_size, 
+                                                                                         ks_thread_priority_t priority, uint32_t idle_sec);
+KS_DECLARE(ks_status_t) ks_thread_pool_destroy(ks_thread_pool_t **tp);
+KS_DECLARE(ks_status_t) ks_thread_pool_add_job(ks_thread_pool_t *tp, ks_thread_function_t func, void *data);
+KS_DECLARE(ks_size_t) ks_thread_pool_backlog(ks_thread_pool_t *tp);
+
+KS_END_EXTERN_C
+
+#endif                                                 /* defined(_KS_THREAD_POOL_H_) */
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
index f10a0229a1a9ef5403c9d8a378bb0bfe063c6be2..8c088bdd4005964cece2f0d6a13b8a46adcb469e 100644 (file)
@@ -210,6 +210,8 @@ struct ks_q_s;
 typedef struct ks_q_s ks_q_t;
 typedef void (*ks_flush_fn_t)(ks_q_t *q, void *ptr, void *flush_data);
 
+typedef struct ks_thread_pool_s ks_thread_pool_t;
+
 KS_END_EXTERN_C
 
 #endif                                                 /* defined(_KS_TYPES_H_) */
index 6c14f249584f6297e19cbc1e5ffe0a2efba0c888..aab2db845c2c04c3af857b8da537b9523ef7ca0b 100644 (file)
@@ -393,10 +393,14 @@ KS_DECLARE(ks_status_t) ks_cond_timedwait(ks_cond_t *cond, ks_time_t ms)
 #else
        struct timespec ts;
        ks_time_t n = ks_time_now() + (ms * 1000);
+       int r = 0;
+
        ts.tv_sec   = ks_time_sec(n);
        ts.tv_nsec  = ks_time_nsec(n);
-       if (pthread_cond_timedwait(&cond->cond, &cond->mutex->mutex, &ts)) {
-               switch(errno) {
+       r = pthread_cond_timedwait(&cond->cond, &cond->mutex->mutex, &ts);
+
+       if (r) {
+               switch(r) {
                case ETIMEDOUT:
                        return KS_STATUS_TIMEOUT;
                default:
diff --git a/libs/libks/src/ks_thread_pool.c b/libs/libks/src/ks_thread_pool.c
new file mode 100644 (file)
index 0000000..aa0ff29
--- /dev/null
@@ -0,0 +1,255 @@
+/*
+ * Copyright (c) 2007-2014, Anthony Minessale II
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <ks.h>
+
+#define TP_MAX_QLEN 1024
+
+typedef enum {
+       TP_STATE_DOWN = 0,
+       TP_STATE_RUNNING = 1
+} ks_thread_pool_state_t;
+
+struct ks_thread_pool_s {
+       uint32_t min;
+       uint32_t max;
+       uint32_t idle_sec;
+       size_t stack_size;
+       ks_thread_priority_t priority;
+       ks_q_t *q;
+       uint32_t thread_count;
+       uint32_t busy_thread_count;
+       uint32_t running_thread_count;
+       uint32_t dying_thread_count;
+       ks_thread_pool_state_t state;
+       ks_mutex_t *mutex;
+       ks_pool_t *pool;
+};
+
+typedef struct ks_thread_job_s {
+       ks_thread_function_t func;
+       void *data;
+} ks_thread_job_t;
+
+
+static void *worker_thread(ks_thread_t *thread, void *data);
+
+static int check_queue(ks_thread_pool_t *tp, ks_bool_t adding)
+{
+       ks_thread_t *thread;
+       int need = 0;
+
+       ks_mutex_lock(tp->mutex);
+
+       if (tp->state != TP_STATE_RUNNING) {
+               ks_mutex_unlock(tp->mutex);
+               return 1;
+       }
+
+
+       if (tp->thread_count < tp->min) {
+               need = tp->min - tp->thread_count;
+       }
+
+
+       if (adding) {
+               if (!need && tp->busy_thread_count >= tp->running_thread_count - tp->dying_thread_count && 
+                       (tp->thread_count - tp->dying_thread_count + 1 <= tp->max)) {
+                       need = 1;
+               }
+       }
+
+       tp->thread_count += need;
+
+       ks_mutex_unlock(tp->mutex);
+
+       while(need > 0) {
+               if (ks_thread_create_ex(&thread, worker_thread, tp, KS_THREAD_FLAG_DETATCHED, tp->stack_size, tp->priority, tp->pool) != KS_STATUS_SUCCESS) {
+                       ks_mutex_lock(tp->mutex);
+                       tp->thread_count--;
+                       ks_mutex_unlock(tp->mutex);
+               }
+               
+               need--;
+       }
+
+       ks_log(KS_LOG_DEBUG, "WORKER check: adding %d need %d running %d dying %d total %d max %d\n", 
+                  adding, need, tp->running_thread_count, tp->dying_thread_count, tp->thread_count, tp->max);
+
+       return need;
+}
+
+static uint32_t TID = 0;
+
+static void *worker_thread(ks_thread_t *thread, void *data)
+{
+       ks_thread_pool_t *tp = (ks_thread_pool_t *) data;
+       uint32_t idle_sec = 0;
+       uint32_t my_id = 0;
+       int die = 0;
+
+       ks_mutex_lock(tp->mutex);
+       tp->running_thread_count++;
+       my_id = ++TID;
+       ks_mutex_unlock(tp->mutex);
+
+       while(tp->state == TP_STATE_RUNNING) {
+               ks_thread_job_t *job;
+               void *pop = NULL;
+               ks_status_t status;
+               
+               status = ks_q_pop_timeout(tp->q, &pop, 1000);
+
+               ks_log(KS_LOG_DEBUG, "WORKER %d idle_sec %d running %d dying %d total %d max %d\n", 
+                          my_id, idle_sec, tp->running_thread_count, tp->dying_thread_count, tp->thread_count, tp->max);               
+               
+               check_queue(tp, KS_FALSE);
+               
+               if (status == KS_STATUS_TIMEOUT) {
+                       idle_sec++;
+
+                       if (idle_sec >= tp->idle_sec) {
+
+                               ks_mutex_lock(tp->mutex);
+                               if (tp->running_thread_count - tp->dying_thread_count - tp->busy_thread_count > tp->min) {
+                                       tp->dying_thread_count++;
+                                       die = 1;
+                               }
+                               ks_mutex_unlock(tp->mutex);
+
+                               if (die) {
+                                       ks_log(KS_LOG_DEBUG, "WORKER %d IDLE TIMEOUT\n", my_id);
+                                       break;
+                               }
+                       }
+
+                       continue;
+               }
+
+               if ((status != KS_STATUS_SUCCESS && status != KS_STATUS_BREAK) || !pop) {
+                       ks_log(KS_LOG_DEBUG, "WORKER %d POP FAIL %d %p\n", my_id, status, (void *)pop);
+                       break;
+               }
+
+               job = (ks_thread_job_t *) pop;
+
+               ks_mutex_lock(tp->mutex);
+               tp->busy_thread_count++;
+               ks_mutex_unlock(tp->mutex);
+               
+               idle_sec = 0;
+               job->func(thread, job->data);
+               
+               ks_pool_free(tp->pool, job);
+
+               ks_mutex_lock(tp->mutex);
+               tp->busy_thread_count--;
+               ks_mutex_unlock(tp->mutex);
+       }
+
+       ks_mutex_lock(tp->mutex);
+       tp->running_thread_count--;
+       tp->thread_count--;
+       if (die) {
+               tp->dying_thread_count--;
+       }
+       ks_mutex_unlock(tp->mutex);
+
+       return NULL;
+}
+
+KS_DECLARE(ks_status_t) ks_thread_pool_create(ks_thread_pool_t **tp, uint32_t min, uint32_t max, size_t stack_size, 
+                                                                                         ks_thread_priority_t priority, uint32_t idle_sec)
+{
+       ks_pool_t *pool;
+
+       ks_pool_open(&pool);
+
+       *tp = (ks_thread_pool_t *) ks_pool_alloc(pool, sizeof(ks_thread_t));
+
+       (*tp)->min = min;
+       (*tp)->max = max;
+       (*tp)->pool = pool;
+       (*tp)->stack_size = stack_size;
+       (*tp)->priority = priority;
+       (*tp)->state = TP_STATE_RUNNING;
+       (*tp)->idle_sec = idle_sec;
+
+       ks_mutex_create(&(*tp)->mutex, KS_MUTEX_FLAG_DEFAULT, (*tp)->pool);
+       ks_q_create(&(*tp)->q, (*tp)->pool, TP_MAX_QLEN);
+
+       check_queue(*tp, KS_FALSE);
+
+       return KS_STATUS_SUCCESS;
+
+}
+
+
+KS_DECLARE(ks_status_t) ks_thread_pool_destroy(ks_thread_pool_t **tp)
+{
+       ks_pool_t *pool;
+
+       ks_assert(tp);
+
+       (*tp)->state = TP_STATE_DOWN;
+
+       while((*tp)->thread_count) {
+               ks_sleep(100000);
+       }
+
+       pool = (*tp)->pool;
+       ks_pool_close(&pool);
+
+       return KS_STATUS_SUCCESS;
+}
+
+
+KS_DECLARE(ks_status_t) ks_thread_pool_add_job(ks_thread_pool_t *tp, ks_thread_function_t func, void *data)
+{
+       ks_thread_job_t *job = (ks_thread_job_t *) ks_pool_alloc(tp->pool, sizeof(*job));
+
+       job->func = func;
+       job->data = data;
+       ks_q_push(tp->q, job);
+
+       check_queue(tp, KS_TRUE);
+
+       return KS_STATUS_SUCCESS;
+}
+
+
+
+KS_DECLARE(ks_size_t) ks_thread_pool_backlog(ks_thread_pool_t *tp)
+{
+       return ks_q_size(tp->q);
+}
index c7d3d8fd55f100bc506c24822c63348f0f9b7d51..5c64a37fc6442501419c3ab50133c5da990fd7df 100644 (file)
@@ -9,6 +9,11 @@ testpools_SOURCES = testpools.c tap.c
 testpools_CFLAGS = $(AM_CFLAGS)
 testpools_LDADD = $(TEST_LDADD)
 
+check_PROGRAMS += test_thread_pools
+test_thread_pools_SOURCES = test_thread_pools.c tap.c
+test_thread_pools_CFLAGS = $(AM_CFLAGS)
+test_thread_pools_LDADD = $(TEST_LDADD)
+
 check_PROGRAMS += testthreadmutex
 testthreadmutex_SOURCES = testthreadmutex.c tap.c
 testthreadmutex_CFLAGS = $(AM_CFLAGS)
index 90f129755aaab40c52cb380e508b71a245199d5d..9f5936e027afb02032f56b05eb4a320b734e0e75 100644 (file)
@@ -29,12 +29,19 @@ int qtest1(int loops)
        ks_q_t *q;
        ks_pool_t *pool;
        int i;
+       int r = 1;
+       void *pop;
 
        ks_pool_open(&pool);
        ks_q_create(&q, pool, loops);
        
        ks_thread_create(&thread, test1_thread, q, pool);
 
+       if (ks_q_pop_timeout(q, &pop, 500) != KS_STATUS_TIMEOUT) {
+               r = 0;
+               goto end;
+       }
+
        for (i = 0; i < 10000; i++) {
                int *val = (int *)ks_pool_alloc(pool, sizeof(int));
                *val = i;
@@ -57,11 +64,13 @@ int qtest1(int loops)
                ks_q_push(q, val);
        }
 
+ end:
+
        ks_q_destroy(&q);
 
        ks_pool_close(&pool);
 
-       return 1;
+       return r;
 
 }