static void threadpool_active_thread_idle(struct ast_threadpool *pool,
struct worker_thread *worker)
{
- struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
+ struct thread_worker_pair *pair;
+ SCOPED_AO2LOCK(lock, pool);
+ if (pool->shutting_down) {
+ return;
+ }
+ pair = thread_worker_pair_alloc(pool, worker);
if (!pair) {
return;
}
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
struct worker_thread *worker)
{
- struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
+ struct thread_worker_pair *pair;
+ SCOPED_AO2LOCK(lock, pool);
+ if (pool->shutting_down) {
+ return;
+ }
+ pair = thread_worker_pair_alloc(pool, worker);
if (!pair) {
return;
}
*/
static int threadpool_execute(struct ast_threadpool *pool)
{
+ ao2_lock(pool);
if (!pool->shutting_down) {
+ ao2_unlock(pool);
return ast_taskprocessor_execute(pool->tps);
}
+ ao2_unlock(pool);
return 0;
}
int was_empty)
{
struct ast_threadpool *pool = listener->private_data;
- struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);
+ struct task_pushed_data *tpd;
+ SCOPED_AO2LOCK(lock, pool);
+ if (pool->shutting_down) {
+ return;
+ }
+ tpd = task_pushed_data_alloc(pool, was_empty);
if (!tpd) {
return;
}
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
{
struct ast_threadpool *pool = listener->private_data;
+ SCOPED_AO2LOCK(lock, pool);
+
+ if (pool->shutting_down) {
+ return;
+ }
ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
}
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
{
struct set_size_data *ssd;
+ SCOPED_AO2LOCK(lock, pool);
+ if (pool->shutting_down) {
+ return;
+ }
ssd = set_size_data_alloc(pool, size);
if (!ssd) {
int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
{
+ SCOPED_AO2LOCK(lock, pool);
if (!pool->shutting_down) {
return ast_taskprocessor_push(pool->tps, task, data);
}
/* Shut down the taskprocessors and everything else just
* takes care of itself via the taskprocessor callbacks
*/
- ast_atomic_fetchadd_int(&pool->shutting_down, +1);
+ ao2_lock(pool);
+ pool->shutting_down = 1;
+ ao2_unlock(pool);
ast_taskprocessor_unreference(pool->control_tps);
ast_taskprocessor_unreference(pool->tps);
}
static void worker_thread_destroy(void *obj)
{
struct worker_thread *worker = obj;
+ ast_log(LOG_NOTICE, "Worker dying\n");
worker_shutdown(worker);
ast_mutex_destroy(&worker->lock);
ast_cond_destroy(&worker->cond);