#define VM_EVENT_MAINT "vm::maintenance"
#define VM_MAX_GREETINGS 9
+#define VM_EVENT_QUEUE_SIZE 50000
static switch_status_t voicemail_inject(const char *data, switch_core_session_t *session);
switch_hash_t *profile_hash;
int debug;
int message_query_exact_match;
+ int32_t threads;
+ int32_t running;
+ switch_queue_t *event_queue;
switch_mutex_t *mutex;
switch_memory_pool_t *pool;
} globals;
}
-static void message_query_handler(switch_event_t *event)
+static void actual_message_query_handler(switch_event_t *event)
{
char *account = switch_event_get_header(event, "message-account");
int created = 0;
}
+static int EVENT_THREAD_RUNNING = 0;
+static int EVENT_THREAD_STARTED = 0;
+
+void *SWITCH_THREAD_FUNC vm_event_thread_run(switch_thread_t *thread, void *obj)
+{
+ void *pop;
+ int done = 0;
+
+ switch_mutex_lock(globals.mutex);
+ if (!EVENT_THREAD_RUNNING) {
+ EVENT_THREAD_RUNNING++;
+ globals.threads++;
+ } else {
+ done = 1;
+ }
+ switch_mutex_unlock(globals.mutex);
+
+ if (done) {
+ return NULL;
+ }
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread Started\n");
+
+ while (globals.running == 1) {
+ int count = 0;
+
+ if (switch_queue_trypop(globals.event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
+ switch_event_t *event = (switch_event_t *) pop;
+
+ if (!pop) {
+ break;
+ }
+ actual_message_query_handler(event);
+ switch_event_destroy(&event);
+ count++;
+ }
+
+ if (!count) {
+ switch_yield(100000);
+ }
+ }
+
+ while (switch_queue_trypop(globals.event_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
+ switch_event_t *event = (switch_event_t *) pop;
+ switch_event_destroy(&event);
+ }
+
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread Ended\n");
+
+ switch_mutex_lock(globals.mutex);
+ globals.threads--;
+ EVENT_THREAD_RUNNING = EVENT_THREAD_STARTED = 0;
+ switch_mutex_unlock(globals.mutex);
+
+ return NULL;
+}
+
+void vm_event_thread_start(void)
+{
+ switch_thread_t *thread;
+ switch_threadattr_t *thd_attr = NULL;
+ int done = 0;
+
+ switch_mutex_lock(globals.mutex);
+ if (!EVENT_THREAD_STARTED) {
+ EVENT_THREAD_STARTED++;
+ } else {
+ done = 1;
+ }
+ switch_mutex_unlock(globals.mutex);
+
+ if (done) {
+ return;
+ }
+
+ switch_threadattr_create(&thd_attr, globals.pool);
+ switch_threadattr_detach_set(thd_attr, 1);
+ switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+ switch_threadattr_priority_increase(thd_attr);
+ switch_thread_create(&thread, thd_attr, vm_event_thread_run, NULL, globals.pool);
+}
+
+void vm_event_handler(switch_event_t *event)
+{
+ switch_event_t *cloned_event;
+
+ switch_event_dup(&cloned_event, event);
+ switch_assert(cloned_event);
+ switch_queue_push(globals.event_queue, cloned_event);
+
+ if (!EVENT_THREAD_STARTED) {
+ vm_event_thread_start();
+ }
+}
struct holder {
vm_profile_t *profile;
switch_core_hash_init(&globals.profile_hash, globals.pool);
switch_mutex_init(&globals.mutex, SWITCH_MUTEX_NESTED, globals.pool);
+ switch_mutex_lock(globals.mutex);
+ globals.running = 1;
+ switch_mutex_unlock(globals.mutex);
+
+ switch_queue_create(&globals.event_queue, VM_EVENT_QUEUE_SIZE, globals.pool);
if ((status = load_config()) != SWITCH_STATUS_SUCCESS) {
+ globals.running = 0;
return status;
}
/* connect my internal structure to the blank pointer passed to me */
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
- if (switch_event_bind(modname, SWITCH_EVENT_MESSAGE_QUERY, SWITCH_EVENT_SUBCLASS_ANY, message_query_handler, NULL)
+ if (switch_event_bind(modname, SWITCH_EVENT_MESSAGE_QUERY, SWITCH_EVENT_SUBCLASS_ANY, vm_event_handler, NULL)
!= SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
return SWITCH_STATUS_GENERR;
void *val = NULL;
const void *key;
switch_ssize_t keylen;
+ int sanity = 0;
+
+ switch_mutex_lock(globals.mutex);
+ if (globals.running == 1) {
+ globals.running = 0;
+ }
+ switch_mutex_unlock(globals.mutex);
switch_event_free_subclass(VM_EVENT_MAINT);
- switch_event_unbind_callback(message_query_handler);
+ switch_event_unbind_callback(vm_event_handler);
+
+ while (globals.threads) {
+ switch_cond_next();
+ if (++sanity >= 60000) {
+ break;
+ }
+ }
switch_mutex_lock(globals.mutex);
while ((hi = switch_hash_first(NULL, globals.profile_hash))) {