switch_event_node_t *node;
char hostname[256];
char *dbname;
- char *odbc_dsn;
+ char odbc_dsn[1024];
char *odbc_user;
char *odbc_pass;
int node_thread_running;
return NULL;
}
+
switch_core_new_memory_pool(&pool);
node = switch_core_alloc(pool, sizeof(*node));
node->outbound_strategy = default_strategy;
node->name = switch_core_strdup(node->pool, name);
for (x = 0; x < MAX_PRI; x++) {
- fifo_queue_create(&node->fifo_list[x], SWITCH_CORE_QUEUE_LEN, node->pool);
+ fifo_queue_create(&node->fifo_list[x], 1000, node->pool);
switch_assert(node->fifo_list[x]);
}
switch_memory_pool_t *pool;
};
-#define MAX_ROWS 2048
+#define MAX_ROWS 25
struct callback_helper {
int need;
switch_memory_pool_t *pool;
struct call_helper *rows[MAX_ROWS] = { 0 };
int rowcount = 0;
switch_memory_pool_t *pool;
-
- if (!globals.running) return NULL;
-
- switch_uuid_get(&uuid);
- switch_uuid_format(uuid_str, &uuid);
switch_mutex_lock(globals.mutex);
globals.threads++;
switch_mutex_unlock(globals.mutex);
+ if (!globals.running) goto dpool;
+
+ switch_uuid_get(&uuid);
+ switch_uuid_format(uuid_str, &uuid);
+
if (!cbh->rowcount) {
goto end;
}
switch_event_destroy(&pop_dup);
}
+ dpool:
+
pool = cbh->pool;
switch_core_destroy_memory_pool(&pool);
}
-static void dec_use_count(switch_core_session_t *session)
+static void dec_use_count(switch_core_session_t *session, switch_bool_t send_event)
{
char *sql;
const char *outbound_id;
fifo_execute_sql(sql, globals.sql_mutex);
switch_safe_free(sql);
}
-
- if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
- switch_channel_event_set_data(channel, event);
- switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
- switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-consumer-stop");
- switch_event_fire(&event);
+
+ if (send_event) {
+ if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
+ switch_channel_event_set_data(channel, event);
+ switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", MANUAL_QUEUE_NAME);
+ switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", "bridge-consumer-stop");
+ switch_event_fire(&event);
+ }
}
}
switch_channel_state_t state = switch_channel_get_state(channel);
if (state == CS_HANGUP) {
- dec_use_count(session);
+ dec_use_count(session, SWITCH_TRUE);
switch_core_event_hook_remove_state_change(session, hanguphook);
}
const char *serviced_uuid = NULL;
if (switch_core_event_hook_remove_receive_message(session, messagehook) == SWITCH_STATUS_SUCCESS) {
- dec_use_count(session);
+ dec_use_count(session, SWITCH_FALSE);
switch_core_event_hook_remove_state_change(session, hanguphook);
}
switch_mutex_lock(globals.mutex);
for (hi = switch_hash_first(NULL, hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, &var, NULL, &val);
- stream->write_function(stream, " %s: %s\n", (char *)var, (char *)val);
+ stream->write_function(stream, " %s\n", (char *)var);
}
switch_mutex_unlock(globals.mutex);
}
if (!strcasecmp(argv[0], "reparse")) {
load_config(1, argv[1] && !strcasecmp(argv[1], "del_all"));
+ stream->write_function(stream, "+OK\n");
goto done;
}
if (!strcasecmp(var, "odbc-dsn") && !zstr(val)) {
if (switch_odbc_available()) {
- globals.odbc_dsn = switch_core_strdup(globals.pool, val);
+ switch_set_string(globals.odbc_dsn, val);
if ((globals.odbc_user = strchr(globals.odbc_dsn, ':'))) {
*globals.odbc_user++ = '\0';
if ((globals.odbc_pass = strchr(globals.odbc_user, ':'))) {
return SWITCH_STATUS_GENERR;
}
- switch_core_new_memory_pool(&globals.pool);
+ globals.pool = pool;
switch_core_hash_init(&globals.fifo_hash, globals.pool);
switch_core_hash_init(&globals.caller_orig_hash, globals.pool);
switch_event_unbind(&globals.node);
switch_event_free_subclass(FIFO_EVENT);
switch_core_hash_destroy(&globals.fifo_hash);
- switch_core_destroy_memory_pool(&globals.pool);
return status;
}
void *val;
switch_event_t *pop = NULL;
fifo_node_t *node;
- switch_memory_pool_t *pool = globals.pool;
switch_mutex_t *mutex = globals.mutex;
switch_event_unbind(&globals.node);
switch_core_hash_destroy(&globals.fifo_hash);
memset(&globals, 0, sizeof(globals));
switch_mutex_unlock(mutex);
- switch_core_destroy_memory_pool(&pool);
+
return SWITCH_STATUS_SUCCESS;
}
*/
#include <switch.h>
#define CMD_BUFLEN 1024 * 1000
-
+#define MAX_QUEUE_LEN 5000
+#define MAX_MISSED 200
SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load);
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_event_socket_shutdown);
SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime);
}
static void remove_listener(listener_t *listener);
+static void kill_listener(listener_t *l);
static void kill_all_listeners(void);
static uint32_t next_id(void)
}
} else {
switch_log_node_free(&dnode);
- l->lost_logs++;
+ if (++l->lost_logs > MAX_MISSED) {
+ kill_listener(l);
+ }
}
}
}
}
}
} else {
- l->lost_events++;
+ if (++l->lost_events > MAX_MISSED) {
+ kill_listener(l);
+ }
switch_event_destroy(&clone);
}
} else {
}
switch_thread_rwlock_create(&listener->rwlock, switch_core_session_get_pool(session));
- switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session));
- switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, switch_core_session_get_pool(session));
+ switch_queue_create(&listener->event_queue, MAX_QUEUE_LEN, switch_core_session_get_pool(session));
+ switch_queue_create(&listener->log_queue, MAX_QUEUE_LEN, switch_core_session_get_pool(session));
listener->sock = new_sock;
listener->pool = switch_core_session_get_pool(session);
switch_mutex_unlock(globals.listener_mutex);
}
+static void kill_listener(listener_t *l)
+{
+ switch_clear_flag(l, LFLAG_RUNNING);
+ if (l->sock) {
+ switch_socket_shutdown(l->sock, SWITCH_SHUTDOWN_READWRITE);
+ switch_socket_close(l->sock);
+ }
+
+}
static void kill_all_listeners(void)
{
switch_mutex_lock(globals.listener_mutex);
for (l = listen_list.listeners; l; l = l->next) {
- switch_clear_flag(l, LFLAG_RUNNING);
- if (l->sock) {
- switch_socket_shutdown(l->sock, SWITCH_SHUTDOWN_READWRITE);
- switch_socket_close(l->sock);
- }
+ kill_listener(l);
}
switch_mutex_unlock(globals.listener_mutex);
}
switch_set_flag(listener, LFLAG_AUTHED);
switch_set_flag(listener, LFLAG_STATEFUL);
switch_set_flag(listener, LFLAG_ALLOW_LOG);
- switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener->pool);
- switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, listener->pool);
+ switch_queue_create(&listener->event_queue, MAX_QUEUE_LEN, listener->pool);
+ switch_queue_create(&listener->log_queue, MAX_QUEUE_LEN, listener->pool);
if (loglevel) {
switch_log_level_t ltype = switch_log_str2level(loglevel);
}
switch_thread_rwlock_create(&listener->rwlock, listener_pool);
- switch_queue_create(&listener->event_queue, SWITCH_CORE_QUEUE_LEN, listener_pool);
- switch_queue_create(&listener->log_queue, SWITCH_CORE_QUEUE_LEN, listener_pool);
+ switch_queue_create(&listener->event_queue, MAX_QUEUE_LEN, listener_pool);
+ switch_queue_create(&listener->log_queue, MAX_QUEUE_LEN, listener_pool);
listener->sock = inbound_socket;
listener->pool = listener_pool;