switch_mutex_t *task_mutex;
uint32_t task_id;
int task_thread_running;
+ switch_queue_t *event_queue;
switch_memory_pool_t *memory_pool;
} globals;
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Desc", tp->desc);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Group", switch_str_nil(tp->task.group));
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-Runtime", "%" SWITCH_INT64_T_FMT, tp->task.runtime);
- switch_event_fire(&event);
+ switch_queue_push(globals.event_queue, event);
+ event = NULL;
}
} else {
- if (switch_event_create(&event, SWITCH_EVENT_DEL_SCHEDULE) == SWITCH_STATUS_SUCCESS) {
- switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-ID", "%u", tp->task.task_id);
- switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Desc", tp->desc);
- switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Group", switch_str_nil(tp->task.group));
- switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-Runtime", "%" SWITCH_INT64_T_FMT, tp->task.runtime);
- switch_event_fire(&event);
- }
tp->destroyed = 1;
}
}
switch_mutex_lock(globals.task_mutex);
for (tp = globals.task_list; tp;) {
if (tp->destroyed && !tp->in_thread) {
+ switch_event_t *event;
+
tofree = tp;
tp = tp->next;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Deleting task %u %s (%s)\n",
tofree->task.task_id, tofree->desc, switch_str_nil(tofree->task.group));
+
+
+ if (switch_event_create(&event, SWITCH_EVENT_DEL_SCHEDULE) == SWITCH_STATUS_SUCCESS) {
+ switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-ID", "%u", tofree->task.task_id);
+ switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Desc", tofree->desc);
+ switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Group", switch_str_nil(tofree->task.group));
+ switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-Runtime", "%" SWITCH_INT64_T_FMT, tofree->task.runtime);
+ switch_queue_push(globals.event_queue, event);
+ event = NULL;
+ }
+
if (last) {
last->next = tofree->next;
} else {
static void *SWITCH_THREAD_FUNC switch_scheduler_task_thread(switch_thread_t *thread, void *obj)
{
-
+ void *pop;
globals.task_thread_running = 1;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Starting task thread\n");
if (task_thread_loop(0)) {
break;
}
- switch_yield(500000);
+ if (switch_queue_pop_timeout(globals.event_queue, &pop, 500) == SWITCH_STATUS_SUCCESS) {
+ switch_event_t *event = (switch_event_t *) pop;
+ switch_event_fire(&event);
+ }
}
task_thread_loop(1);
switch_scheduler_task_container_t *container, *tp;
switch_event_t *event;
switch_time_t now = switch_epoch_time_now(NULL);
+ switch_ssize_t hlen = -1;
switch_mutex_lock(globals.task_mutex);
switch_zmalloc(container, sizeof(*container));
container->task.cmd_arg = cmd_arg;
container->flags = flags;
container->desc = strdup(desc ? desc : "none");
+ container->task.hash = switch_ci_hashfunc_default(container->task.group, &hlen);
for (tp = globals.task_list; tp && tp->next; tp = tp->next);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Desc", tp->desc);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Group", switch_str_nil(tp->task.group));
switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-Runtime", "%" SWITCH_INT64_T_FMT, tp->task.runtime);
- switch_event_fire(&event);
+ switch_queue_push(globals.event_queue, event);
+ event = NULL;
}
return container->task.task_id;
}
SWITCH_DECLARE(uint32_t) switch_scheduler_del_task_id(uint32_t task_id)
{
switch_scheduler_task_container_t *tp;
- switch_event_t *event;
uint32_t delcnt = 0;
switch_mutex_lock(globals.task_mutex);
}
tp->destroyed++;
- if (switch_event_create(&event, SWITCH_EVENT_DEL_SCHEDULE) == SWITCH_STATUS_SUCCESS) {
- switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-ID", "%u", tp->task.task_id);
- switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Desc", tp->desc);
- switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Group", switch_str_nil(tp->task.group));
- switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-Runtime", "%" SWITCH_INT64_T_FMT, tp->task.runtime);
- switch_event_fire(&event);
- }
delcnt++;
break;
}
SWITCH_DECLARE(uint32_t) switch_scheduler_del_task_group(const char *group)
{
switch_scheduler_task_container_t *tp;
- switch_event_t *event;
uint32_t delcnt = 0;
+ switch_ssize_t hlen = -1;
+ unsigned long hash = 0;
+
+ if (zstr(group)) {
+ return 0;
+ }
+
+ hash = switch_ci_hashfunc_default(group, &hlen);
switch_mutex_lock(globals.task_mutex);
for (tp = globals.task_list; tp; tp = tp->next) {
- if (!zstr(group) && !strcmp(tp->task.group, group)) {
+ if (tp->destroyed) {
+ continue;
+ }
+ if (hash == tp->task.hash && !strcmp(tp->task.group, group)) {
if (switch_test_flag(tp, SSHF_NO_DEL)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Attempt made to delete undeletable task #%u (group %s)\n",
tp->task.task_id, group);
continue;
}
- if (switch_event_create(&event, SWITCH_EVENT_DEL_SCHEDULE) == SWITCH_STATUS_SUCCESS) {
- switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-ID", "%u", tp->task.task_id);
- switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Desc", tp->desc);
- switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Group", switch_str_nil(tp->task.group));
- switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-Runtime", "%" SWITCH_INT64_T_FMT, tp->task.runtime);
- switch_event_fire(&event);
- }
tp->destroyed++;
delcnt++;
}
switch_core_new_memory_pool(&globals.memory_pool);
switch_threadattr_create(&thd_attr, globals.memory_pool);
switch_mutex_init(&globals.task_mutex, SWITCH_MUTEX_NESTED, globals.memory_pool);
+ switch_queue_create(&globals.event_queue, 250000, globals.memory_pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_thread_create(&task_thread_p, thd_attr, switch_scheduler_task_thread, NULL, globals.memory_pool);