int tracelevel;
char *capture_server;
int rewrite_multicasted_fs_path;
+ int presence_flush;
};
extern struct mod_sofia_globals mod_sofia_globals;
int ireg_seconds;
sofia_paid_type_t paid_type;
uint32_t rtp_digit_delay;
+ switch_queue_t *event_queue;
};
struct private_object {
void sofia_glue_set_name(private_object_t *tech_pvt, const char *channame);
private_object_t *sofia_glue_new_pvt(switch_core_session_t *session);
switch_status_t sofia_init(void);
+void sofia_glue_fire_events(sofia_profile_t *profile);
+void sofia_event_fire(sofia_profile_t *profile, switch_event_t **event);
/* For Emacs:
* Local Variables:
{
sofia_dispatch_event_t *de = *dep;
switch_memory_pool_t *pool;
- sofia_profile_t *profile = (*dep)->profile;
+ //sofia_profile_t *profile = (*dep)->profile;
switch_thread_data_t *td;
switch_core_new_memory_pool(&pool);
td->func = sofia_msg_thread_run_once;
td->obj = de;
- switch_mutex_lock(profile->ireg_mutex);
switch_thread_pool_launch_thread(&td);
- switch_mutex_unlock(profile->ireg_mutex);
-}
-
-#if 0
-void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep)
-{
- sofia_dispatch_event_t *de = *dep;
- switch_threadattr_t *thd_attr = NULL;
- switch_memory_pool_t *pool;
- switch_thread_t *thread;
- sofia_profile_t *profile = (*dep)->profile;
- switch_status_t status;
-
- switch_core_new_memory_pool(&pool);
-
- *dep = NULL;
- de->pool = pool;
-
- switch_mutex_lock(profile->ireg_mutex);
- switch_threadattr_create(&thd_attr, de->pool);
- switch_threadattr_detach_set(thd_attr, 1);
- switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
- status = switch_thread_create(&thread,
- thd_attr,
- sofia_msg_thread_run_once,
- de,
- de->pool);
- switch_mutex_unlock(profile->ireg_mutex);
-
- if (status != SWITCH_STATUS_SUCCESS) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Cannot create threads!\n");
- sofia_process_dispatch_event(&de);
- }
}
-#endif
void sofia_process_dispatch_event(sofia_dispatch_event_t **dep)
{
sofia_queue_message(de);
end:
+ //switch_cond_next();
return;
}
contact_str = fixed_contact_str;
}
- switch_mutex_lock(profile->ireg_mutex);
+
sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
switch_find_local_ip(guess_ip4, sizeof(guess_ip4), NULL, AF_INET);
sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Propagating registration for %s@%s->%s\n", from_user, from_host, contact_str);
}
- switch_mutex_unlock(profile->ireg_mutex);
+
if (profile) {
sofia_glue_release_profile(profile);
switch_mutex_init(&profile->ireg_mutex, SWITCH_MUTEX_NESTED, profile->pool);
switch_mutex_init(&profile->gateway_mutex, SWITCH_MUTEX_NESTED, profile->pool);
+ switch_queue_create(&profile->event_queue, SOFIA_QUEUE_SIZE, profile->pool);
+
switch_snprintf(qname, sizeof(qname), "sofia:%s", profile->name);
switch_sql_queue_manager_init_name(qname,
switch_cache_db_release_db_handle(&dbh);
+
+ sofia_glue_fire_events(profile);
+
return ret;
}
return s;
}
+void sofia_glue_fire_events(sofia_profile_t *profile)
+{
+ void *pop = NULL;
+
+ while (profile->event_queue && switch_queue_trypop(profile->event_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
+ switch_event_t *event = (switch_event_t *) pop;
+ switch_event_fire(&event);
+ }
+
+}
+
+void sofia_event_fire(sofia_profile_t *profile, switch_event_t **event)
+{
+ switch_queue_push(profile->event_queue, *event);
+ *event = NULL;
+}
+
/* For Emacs:
* Local Variables:
switch_safe_free(dup_domain);
}
-static void actual_sofia_presence_event_handler(switch_event_t *event)
+static switch_event_t *actual_sofia_presence_event_handler(switch_event_t *event)
{
sofia_profile_t *profile = NULL;
char *from = switch_event_get_header(event, "from");
switch_console_callback_match_t *matches;
struct presence_helper helper = { 0 };
int hup = 0;
-
+ switch_event_t *s_event = NULL;
if (!mod_sofia_globals.running) {
- return;
+ goto done;
}
if (zstr(proto) || !strcasecmp(proto, "any")) {
if (!mod_sofia_globals.profile_hash) {
- return;
+ goto done;
}
if (from) {
}
switch_safe_free(sql);
- return;
+ goto done;
}
if (zstr(event_type)) {
}
} else {
switch_safe_free(user);
- return;
+ goto done;
}
if ((euser = strchr(user, '+'))) {
euser++;
euser = user;
}
} else {
- return;
+ goto done;
}
switch (event->event_id) {
if (hup && dh.hits < 1) {
/* so many phones get confused when whe hangup we have to reprobe to get them all to reset to absolute states so the lights stay correct */
- switch_event_t *s_event;
-
+
if (switch_event_create(&s_event, SWITCH_EVENT_PRESENCE_PROBE) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "proto", SOFIA_CHAT_PROTO);
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "login", profile->name);
switch_event_add_header(s_event, SWITCH_STACK_BOTTOM, "to", "%s@%s", euser, host);
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "event_type", "presence");
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "alt_event_type", "dialog");
- switch_event_fire(&s_event);
}
}
-
+
if (!zstr((char *) helper.stream.data)) {
char *this_sql = (char *) helper.stream.data;
switch_safe_free(sql);
switch_safe_free(user);
+
+ return s_event;
}
static int EVENT_THREAD_RUNNING = 0;
static int EVENT_THREAD_STARTED = 0;
+static void do_flush(void)
+{
+ void *pop = NULL;
+
+ while (mod_sofia_globals.presence_queue && switch_queue_trypop(mod_sofia_globals.presence_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
+ switch_event_t *event = (switch_event_t *) pop;
+ switch_event_destroy(&event);
+ }
+
+}
+
void *SWITCH_THREAD_FUNC sofia_presence_event_thread_run(switch_thread_t *thread, void *obj)
{
void *pop;
break;
}
+ if (mod_sofia_globals.presence_flush) {
+ switch_mutex_lock(mod_sofia_globals.mutex);
+ if (mod_sofia_globals.presence_flush) {
+ do_flush();
+ mod_sofia_globals.presence_flush = 0;
+ }
+ switch_mutex_unlock(mod_sofia_globals.mutex);
+ }
+
switch(event->event_id) {
case SWITCH_EVENT_MESSAGE_WAITING:
actual_sofia_presence_mwi_event_handler(event);
conference_data_event_handler(event);
break;
default:
- actual_sofia_presence_event_handler(event);
+ do {
+ switch_event_t *ievent = event;
+ event = actual_sofia_presence_event_handler(ievent);
+ switch_event_destroy(&ievent);
+ } while (event);
break;
}
}
}
- while (switch_queue_trypop(mod_sofia_globals.presence_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
- switch_event_t *event = (switch_event_t *) pop;
- switch_event_destroy(&event);
- }
+ do_flush();
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread Ended\n");
{
switch_event_t *cloned_event;
+ if (!EVENT_THREAD_STARTED) {
+ sofia_presence_event_thread_start();
+ switch_yield(500000);
+ }
+
switch_event_dup(&cloned_event, event);
switch_assert(cloned_event);
- switch_queue_push(mod_sofia_globals.presence_queue, cloned_event);
- if (!EVENT_THREAD_STARTED) {
- sofia_presence_event_thread_start();
+ if (switch_queue_trypush(mod_sofia_globals.presence_queue, cloned_event) != SWITCH_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Presence queue overloaded.... Flushing queue\n");
+ switch_mutex_lock(mod_sofia_globals.mutex);
+ mod_sofia_globals.presence_flush = 1;
+ switch_mutex_unlock(mod_sofia_globals.mutex);
+ switch_event_destroy(&cloned_event);
}
+
+
}
}
- switch_event_fire(&event);
+ sofia_event_fire(profile, &event);
}
return 0;
}
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "event_subtype", "probe");
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "proto-specific-event-name", event_name);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "expires", expires);
- switch_event_fire(&event);
+ sofia_event_fire(profile, &event);
}
return 0;
}
}
- switch_event_fire(&event);
+ sofia_event_fire(profile, &event);
}
switch_safe_free(free_me);
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "contact", argv[3]);
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "expires", argv[6]);
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "user-agent", argv[7]);
- switch_event_fire(&s_event);
+ sofia_event_fire(profile, &s_event);
}
if (switch_event_create(&s_event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) {
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "status", "Unregistered");
switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "event_type", "presence");
- switch_event_fire(&s_event);
+ sofia_event_fire(profile, &s_event);
}
}
sql = switch_mprintf("delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
-
sql = switch_mprintf("delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname);
sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE);
if (switch_queue_trypush(l->log_queue, dnode) == SWITCH_STATUS_SUCCESS) {
if (l->lost_logs) {
int ll = l->lost_logs;
- switch_event_t *event;
l->lost_logs = 0;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Lost %d log lines!\n", ll);
- if (switch_event_create(&event, SWITCH_EVENT_TRAP) == SWITCH_STATUS_SUCCESS) {
- switch_event_add_header(event, SWITCH_STACK_BOTTOM, "info", "lost %d log lines", ll);
- switch_event_fire(&event);
- }
}
} else {
switch_log_node_free(&dnode);
int le = l->lost_events;
l->lost_events = 0;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_CRIT, "Lost %d events!\n", le);
- clone = NULL;
- if (switch_event_create(&clone, SWITCH_EVENT_TRAP) == SWITCH_STATUS_SUCCESS) {
- switch_event_add_header(clone, SWITCH_STACK_BOTTOM, "info", "lost %d events", le);
- switch_event_fire(&clone);
- }
}
} else {
if (++l->lost_events > MAX_MISSED) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n");
-
for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) {
switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL);
}
}
x = 0;
- while (x < 10000 && THREAD_COUNT) {
- switch_cond_next();
+ while (x < 100 && THREAD_COUNT) {
+ switch_yield(100000);
if (THREAD_COUNT == last) {
x++;
}