From: Mark Michelson Date: Fri, 9 Nov 2012 22:28:10 +0000 (+0000) Subject: Genericize the allocation and destruction of taskprocessor listeners. X-Git-Tag: 13.0.0-beta1~2194^2~177 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d5716ecae2238c9e96576870c2146606ab46bc6c;p=thirdparty%2Fasterisk.git Genericize the allocation and destruction of taskprocessor listeners. The goal of this is to take the responsibility away from individual listeners to be sure to properly unref the taskprocessor. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@376121 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index df66f59f07..a92e1f31c2 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -63,10 +63,14 @@ enum ast_tps_options { struct ast_taskprocessor_listener; struct ast_taskprocessor_listener_callbacks { + /*! Allocate the listener's private data */ + void *(*alloc)(struct ast_taskprocessor_listener *listener); /*! Indicates a task was pushed to the processor */ void (*task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty); /*! Indicates the task processor has become empty */ void (*emptied)(struct ast_taskprocessor_listener *listener); + /*! Destroy the listener's private data */ + void (*destroy)(void *private_data); }; struct ast_taskprocessor_listener { @@ -75,6 +79,9 @@ struct ast_taskprocessor_listener { void *private_data; }; +struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps, + struct ast_taskprocessor_listener_callbacks *callbacks); + /*! * \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary * diff --git a/main/astobj2.c b/main/astobj2.c index 082dfc0384..b36cee837e 100644 --- a/main/astobj2.c +++ b/main/astobj2.c @@ -431,6 +431,7 @@ static int internal_ao2_ref(void *user_data, int delta, const char *file, int li int ret; if (obj == NULL) { + ast_backtrace(); ast_assert(0); return -1; } diff --git a/main/taskprocessor.c b/main/taskprocessor.c index bd94103d25..4ca01f9ca2 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -131,17 +131,11 @@ static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, ast_cond_signal(&pvt->cond); } -static void default_listener_destroy(void *obj) +static void listener_destroy(void *obj) { struct ast_taskprocessor_listener *listener = obj; - struct default_taskprocessor_listener_pvt *pvt = listener->private_data; - default_tps_wake_up(pvt, 1); - pthread_join(pvt->poll_thread, NULL); - pvt->poll_thread = AST_PTHREADT_NULL; - ast_mutex_destroy(&pvt->lock); - ast_cond_destroy(&pvt->cond); - ast_free(pvt); + listener->callbacks->destroy(listener->private_data); ao2_ref(listener->tps, -1); listener->tps = NULL; @@ -173,6 +167,35 @@ static void *tps_processing_function(void *data) return NULL; } +static void *default_listener_alloc(struct ast_taskprocessor_listener *listener) +{ + struct default_taskprocessor_listener_pvt *pvt; + + pvt = ast_calloc(1, sizeof(*pvt)); + if (!pvt) { + return NULL; + } + ast_cond_init(&pvt->cond, NULL); + ast_mutex_init(&pvt->lock); + pvt->poll_thread = AST_PTHREADT_NULL; + if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) { + return NULL; + } + return pvt; +} + +static void default_listener_destroy(void *obj) +{ + struct default_taskprocessor_listener_pvt *pvt = obj; + + default_tps_wake_up(pvt, 1); + pthread_join(pvt->poll_thread, NULL); + pvt->poll_thread = AST_PTHREADT_NULL; + ast_mutex_destroy(&pvt->lock); + ast_cond_destroy(&pvt->cond); + ast_free(pvt); +} + static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) { struct default_taskprocessor_listener_pvt *pvt = listener->private_data; @@ -188,8 +211,10 @@ static void default_emptied(struct ast_taskprocessor_listener *listener) } static struct ast_taskprocessor_listener_callbacks default_listener_callbacks = { + .alloc = default_listener_alloc, .task_pushed = default_task_pushed, .emptied = default_emptied, + .destroy = default_listener_destroy, }; /*! \internal \brief Clean up resources on Asterisk shutdown */ @@ -432,29 +457,22 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps) return tps->name; } -static struct ast_taskprocessor_listener *default_listener_alloc(void) +struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps, + struct ast_taskprocessor_listener_callbacks *callbacks) { - struct ast_taskprocessor_listener *listener; - struct default_taskprocessor_listener_pvt *pvt; - - listener = ao2_alloc(sizeof(*listener), default_listener_destroy); + RAII_VAR(struct ast_taskprocessor_listener *, listener, + ao2_alloc(sizeof(*listener), listener_destroy), ao2_cleanup); + if (!listener) { return NULL; } - pvt = ast_calloc(1, sizeof(*pvt)); - if (!pvt) { - ao2_ref(listener, -1); - return NULL; - } - listener->callbacks = &default_listener_callbacks; - listener->private_data = pvt; - ast_cond_init(&pvt->cond, NULL); - ast_mutex_init(&pvt->lock); - pvt->poll_thread = AST_PTHREADT_NULL; - if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) { - ao2_ref(listener, -1); + listener->callbacks = callbacks; + listener->private_data = listener->callbacks->alloc(listener); + if (!listener->private_data) { return NULL; } + + ao2_ref(listener, +1); return listener; } @@ -480,9 +498,18 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o return NULL; } /* Create a new taskprocessor. Start by creating a default listener */ - listener = default_listener_alloc(); + listener = ast_taskprocessor_listener_alloc(p, &default_listener_callbacks); + if (!listener) { + return NULL; + } p = ast_taskprocessor_create_with_listener(name, listener); + if (!p) { + ao2_ref(listener, -1); + return NULL; + } + + /* Unref listener here since the taskprocessor has gained a reference to the listener */ ao2_ref(listener, -1); return p;