#include "asterisk/stasis.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/stasis_system.h"
+#include "asterisk/taskprocessor.h"
AST_RWLOCK_DEFINE_STATIC(event_types_lock);
+AST_RWLOCK_DEFINE_STATIC(init_cpg_lock);
+
+/*! \brief Timeout, in milliseconds, passed to ast_poll() by the Corosync dispatch thread */
+#define COROSYNC_POLL_TIMEOUT (10 * 1000)
static void publish_mwi_to_stasis(struct ast_event *event);
static void publish_device_state_to_stasis(struct ast_event *event);
static void publish_cluster_discovery_to_stasis(struct ast_event *event);
+/*! \brief Non-zero while this node is joined to the Corosync CPG group */
+static int corosync_node_joined = 0;
+
/*! \brief All the nodes that we're aware of */
static struct ao2_container *nodes;
{
struct corosync_ping_payload *payload;
struct stasis_message *message;
+ struct ast_eid *event_eid;
ast_assert(ast_event_get_type(event) == AST_EVENT_PING);
ast_assert(event != NULL);
if (!payload) {
return;
}
- payload->event = event;
+ event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+ payload->event = ast_event_new(AST_EVENT_PING,
+ AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
+ AST_EVENT_IE_END);
message = stasis_message_create(corosync_ping_message_type(), payload);
if (!message) {
ast_assert(ast_event_get_type(event) == AST_EVENT_CLUSTER_DISCOVERY);
event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
- if (!ast_eid_cmp(&ast_eid_default, event_eid)) {
+ if (!event_eid || !ast_eid_cmp(&ast_eid_default, event_eid)) {
/* Don't feed events back in that originated locally. */
return;
}
struct ast_event *event;
void (*publish_handler)(struct ast_event *) = NULL;
enum ast_event_type event_type;
+ struct ast_eid *event_eid;
if (msg_len < ast_event_minimum_length()) {
ast_debug(1, "Ignoring event that's too small. %u < %u\n",
return;
}
- if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(msg, AST_EVENT_IE_EID))) {
+ event_eid = (struct ast_eid *)ast_event_get_ie_raw(msg, AST_EVENT_IE_EID);
+ if (!event_eid || !ast_eid_cmp(&ast_eid_default, event_eid)) {
/* Don't feed events back in that originated locally. */
return;
}
}
ast_rwlock_rdlock(&event_types_lock);
+ ast_debug(5, "cpg_deliver_cb rdlock\n");
publish_handler = event_types[event_type].publish_to_stasis;
if (!event_types[event_type].subscribe || !publish_handler) {
/* We are not configured to subscribe to these events or
we have no way to publish it internally. */
ast_rwlock_unlock(&event_types_lock);
+ ast_debug(5, "cpg_deliver_cb unlock\n");
return;
}
ast_rwlock_unlock(&event_types_lock);
+ ast_debug(5, "cpg_deliver_cb unlock\n");
if (!(event = ast_malloc(msg_len))) {
return;
const struct ast_eid *eid;
char buf[128] = "";
- eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+ eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf);
}
ast_debug(5, "Publishing event %s (%u) to stasis\n",
ast_event_get_type_name(event), event_type);
publish_handler(event);
+ ast_free(event);
}
static void publish_event_to_corosync(struct ast_event *event)
/* The stasis subscription will only exist if we are configured to publish
* these events, so just send away. */
- if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
- ast_log(LOG_WARNING, "CPG mcast failed (%u) for event %s (%u)\n",
- cs_err, ast_event_get_type_name(event), ast_event_get_type(event));
+ if (corosync_node_joined && !ast_rwlock_tryrdlock(&init_cpg_lock)) {
+ ast_debug(5, "publish_event_to_corosync rdlock\n");
+ if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
+ ast_log(LOG_WARNING, "CPG mcast failed (%u) for event %s (%u)\n",
+ cs_err, ast_event_get_type_name(event), ast_event_get_type(event));
+ }
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "publish_event_to_corosync unlock\n");
+ } else {
+ ast_log(LOG_WARNING, "CPG mcast not executed for event %s (%u): initializing CPG.\n",
+ ast_event_get_type_name(event), ast_event_get_type(event));
}
}
static void publish_to_corosync(struct stasis_message *message)
{
struct ast_event *event;
+ struct ast_eid *event_eid;
event = stasis_message_to_event(message);
if (!event) {
return;
}
- if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
+ event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+ if (!event_eid || ast_eid_cmp(&ast_eid_default, event_eid)) {
/* If the event didn't originate from this server, don't send it back out. */
ast_event_destroy(event);
return;
const struct ast_eid *eid;
char buf[128] = "";
- eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
+ eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf);
}
publish_event_to_corosync(event);
+ ast_event_destroy(event);
}
static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
return 0;
}
+/*!
+ * \brief ao2_callback handler that publishes a cache-clear message for one
+ *        cached stasis message that did not originate on this server.
+ *
+ * \param obj   The cached stasis message being visited.
+ * \param arg   The stasis topic the cache-clear message is published to.
+ * \param flags Unused.
+ *
+ * Messages that carry no EID, or whose EID equals ast_eid_default, are
+ * left untouched.
+ *
+ * \return Always 0.
+ */
+static int clear_node_cache(void *obj, void *arg, int flags)
+{
+	struct stasis_message *cached_msg = obj;
+	struct stasis_topic *topic = arg;
+	struct stasis_message *clear_msg;
+	/* Only read and compared, so keep the pointer const instead of casting it away. */
+	const struct ast_eid *msg_eid;
+
+	if (!cached_msg) {
+		return 0;
+	}
+
+	msg_eid = stasis_message_eid(cached_msg);
+	if (!msg_eid || !ast_eid_cmp(&ast_eid_default, msg_eid)) {
+		/* No origin EID, or the message is our own: nothing to clear. */
+		return 0;
+	}
+
+	clear_msg = stasis_cache_clear_create(cached_msg);
+	if (clear_msg) {
+		stasis_publish(topic, clear_msg);
+		/* Drop our reference now that the message has been handed to stasis_publish(). */
+		ao2_cleanup(clear_msg);
+	}
+
+	return 0;
+}
+
static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
for (i = 0; i < left_list_entries; i++) {
const struct cpg_address *cpg_node = &left_list[i];
struct corosync_node* node;
+ unsigned int j;
node = ao2_find(nodes, &cpg_node->nodeid, OBJ_UNLINK | OBJ_SEARCH_KEY);
if (!node) {
continue;
}
+ for (j = 0; j < ARRAY_LEN(event_types); j++) {
+ struct ao2_container *messages;
+ int messages_count;
+
+ ast_rwlock_rdlock(&event_types_lock);
+ ast_debug(5, "cpg_confchg_cb rdlock\n");
+ if (!event_types[j].subscribe) {
+ ast_rwlock_unlock(&event_types_lock);
+ ast_debug(5, "cpg_confchg_cb unlock\n");
+ continue;
+ }
+
+ if (!event_types[j].cache_fn || !event_types[j].message_type_fn) {
+ ast_rwlock_unlock(&event_types_lock);
+ ast_debug(5, "cpg_confchg_cb unlock\n");
+ continue;
+ }
+ ast_rwlock_unlock(&event_types_lock);
+ ast_debug(5, "cpg_confchg_cb unlock\n");
+
+ messages = stasis_cache_dump_by_eid(event_types[j].cache_fn(), event_types[j].message_type_fn(), &node->eid);
+
+ messages_count = ao2_container_count(messages);
+ ast_log(LOG_NOTICE, "Clearing %i events of type %s of node %i from stasis cache.\n", messages_count, event_types[j].name, node->id);
+ ao2_callback(messages, OBJ_NODATA, clear_node_cache, event_types[j].topic_fn());
+ ast_log(LOG_NOTICE, "Cleared events of type %s from stasis cache.\n", event_types[j].name);
+
+ ao2_t_ref(messages, -1, "Dispose of flushed cache");
+ }
+
publish_cluster_discovery_to_stasis_full(node, 0);
ao2_ref(node, -1);
}
for (i = 0; i < ARRAY_LEN(event_types); i++) {
struct ao2_container *messages;
+ int messages_count;
ast_rwlock_rdlock(&event_types_lock);
+ ast_debug(5, "cpg_confchg_cb rdlock\n");
if (!event_types[i].publish) {
ast_rwlock_unlock(&event_types_lock);
+ ast_debug(5, "cpg_confchg_cb unlock\n");
continue;
}
if (!event_types[i].cache_fn || !event_types[i].message_type_fn) {
ast_rwlock_unlock(&event_types_lock);
+ ast_debug(5, "cpg_confchg_cb unlock\n");
continue;
}
-
- messages = stasis_cache_dump_by_eid(event_types[i].cache_fn(),
- event_types[i].message_type_fn(),
- &ast_eid_default);
ast_rwlock_unlock(&event_types_lock);
+ ast_debug(5, "cpg_confchg_cb unlock\n");
+
+ messages = stasis_cache_dump_by_eid(event_types[i].cache_fn(), event_types[i].message_type_fn(), &ast_eid_default);
+ messages_count = ao2_container_count(messages);
+ ast_log(LOG_NOTICE, "Sending %i events of type %s to corosync.\n", messages_count, event_types[i].name);
ao2_callback(messages, OBJ_NODATA, dump_cache_cb, NULL);
+ ast_log(LOG_NOTICE, "Sent events of type %s to corosync.\n", event_types[i].name);
ao2_t_ref(messages, -1, "Dispose of dumped cache");
}
char buf[128];
int res;
- if ((cs_err = corosync_cfg_local_get(cfg_handle, &node_id)) != CS_OK) {
- ast_log(LOG_WARNING, "Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n");
- return;
- }
+ if (!ast_rwlock_tryrdlock(&init_cpg_lock)) {
+ ast_debug(5, "send_cluster_notify rdlock\n");
- if (((cs_err = corosync_cfg_get_node_addrs(cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) {
- ast_log(LOG_WARNING, "Failed to get local Corosync address. Not informing cluster of existance.\n");
- return;
+ if ((cs_err = corosync_cfg_local_get(cfg_handle, &node_id)) != CS_OK) {
+ ast_log(LOG_WARNING, "Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n");
+ return;
+ }
+
+ if (((cs_err = corosync_cfg_get_node_addrs(cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) {
+ ast_log(LOG_WARNING, "Failed to get local Corosync address. Not informing cluster of existance.\n");
+ return;
+ }
+
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "send_cluster_notify unlock\n");
}
sa = (struct sockaddr *)corosync_addr.address;
AST_EVENT_IE_LOCAL_ADDR, AST_EVENT_IE_PLTYPE_STR, buf,
AST_EVENT_IE_END);
publish_event_to_corosync(event);
- ast_free(event);
+ ast_event_destroy(event);
}
static void *dispatch_thread_handler(void *data)
{ .events = POLLIN, },
};
- if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to get CPG fd. This module is now broken.\n");
- return NULL;
- }
+ if (!ast_rwlock_tryrdlock(&init_cpg_lock)) {
+ ast_debug(5, "dispatch_thread_handler rdlock\n");
+ if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
+ ast_log(LOG_ERROR, "Failed to get CPG fd. This module is now broken.\n");
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "dispatch_thread_handler unlock\n");
+ return NULL;
+ }
+
+ if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
+ ast_log(LOG_ERROR, "Failed to get CFG fd. This module is now broken.\n");
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "dispatch_thread_handler unlock\n");
+ return NULL;
+ }
- if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to get CFG fd. This module is now broken.\n");
+ pfd[2].fd = dispatch_thread.alert_pipe[0];
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "dispatch_thread_handler unlock\n");
+ } else {
+ ast_log(LOG_ERROR, "Failed to get fd: initiliazing CPG. This module is now broken.\n");
return NULL;
}
-
- pfd[2].fd = dispatch_thread.alert_pipe[0];
-
send_cluster_notify();
while (!dispatch_thread.stop) {
int res;
pfd[1].revents = 0;
pfd[2].revents = 0;
- res = ast_poll(pfd, ARRAY_LEN(pfd), -1);
+ res = ast_poll(pfd, ARRAY_LEN(pfd), COROSYNC_POLL_TIMEOUT);
if (res == -1 && errno != EINTR && errno != EAGAIN) {
ast_log(LOG_ERROR, "poll() error: %s (%d)\n", strerror(errno), errno);
- continue;
- }
-
- if (pfd[0].revents & POLLIN) {
- if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
- ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err);
+ cs_err = CS_ERR_BAD_HANDLE;
+ } else if (res == 0) {
+ unsigned int local_nodeid;
+
+ if (!ast_rwlock_tryrdlock(&init_cpg_lock)) {
+ ast_debug(5, "dispatch_thread_handler rdlock\n");
+ if ((cs_err = cpg_local_get(cpg_handle, &local_nodeid)) == CS_OK) {
+ struct cpg_name name;
+ struct cpg_address address[CPG_MEMBERS_MAX];
+ int entries = CPG_MEMBERS_MAX;
+
+ ast_copy_string(name.value, "asterisk", sizeof(name.value));
+ name.length = strlen(name.value);
+ if ((cs_err = cpg_membership_get(cpg_handle, &name, address, &entries)) == CS_OK) {
+ int i;
+ int found = 0;
+
+ ast_debug(1, "CPG group has %i node membership\n", entries);
+ for (i = 0; (i < entries) && !found; i++) {
+ if (address[i].nodeid == local_nodeid)
+ found = 1;
+ }
+ if (!found) {
+ ast_log(LOG_WARNING, "Failed to check CPG node membership\n");
+ corosync_node_joined = 0;
+ cs_err = CS_ERR_BAD_HANDLE;
+ }
+ } else {
+ ast_log(LOG_WARNING, "Failed to get CPG node membership: %u\n", cs_err);
+ corosync_node_joined = 0;
+ cs_err = CS_ERR_BAD_HANDLE;
+ }
+ } else {
+ ast_log(LOG_WARNING, "Failed to get CPG local node id: %u\n", cs_err);
+ corosync_node_joined = 0;
+ cs_err = CS_ERR_BAD_HANDLE;
+ }
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "dispatch_thread_handler unlock\n");
+ } else {
+ ast_log(LOG_WARNING, "Failed to check CPG node membership: initializing CPG.\n");
+ corosync_node_joined = 0;
+ cs_err = CS_ERR_BAD_HANDLE;
}
- }
-
- if (pfd[1].revents & POLLIN) {
- if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
- ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err);
+ } else {
+ if (!ast_rwlock_tryrdlock(&init_cpg_lock)) {
+ ast_debug(5, "dispatch_thread_handler rdlock\n");
+ if (pfd[0].revents & POLLIN) {
+ if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
+ ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err);
+ }
+ }
+
+ if (pfd[1].revents & POLLIN) {
+ if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
+ ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err);
+ }
+ }
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "dispatch_thread_handler unlock\n");
+ } else {
+ ast_log(LOG_WARNING, "Failed to dispatch: initializing CPG.\n");
}
}
-
if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
- struct cpg_name name;
/* If corosync gets restarted out from under Asterisk, try to recover. */
ast_log(LOG_NOTICE, "Attempting to recover from corosync failure.\n");
- if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
- sleep(5);
- continue;
+ if (!ast_rwlock_trywrlock(&init_cpg_lock)) {
+ struct cpg_name name;
+ ast_debug(5, "dispatch_thread_handler wrlock\n");
+
+ corosync_node_joined = 0;
+ if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
+ ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
+ }
+
+ if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
+ ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
+ }
+
+ if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
+ ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "dispatch_thread_handler unlock\n");
+ sleep(5);
+ continue;
+ }
+
+				/*
+				 * Assign the status first and compare it afterwards, so cs_err
+				 * holds the real error code rather than the comparison result.
+				 */
+				if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
+					ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
+					ast_rwlock_unlock(&init_cpg_lock);
+					ast_debug(5, "dispatch_thread_handler unlock\n");
+					sleep(5);
+					continue;
+				}
+
+ if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
+ ast_log(LOG_ERROR, "Failed to get CPG fd.\n");
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "dispatch_thread_handler unlock\n");
+ sleep(5);
+ continue;
+ }
+
+ if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
+ ast_log(LOG_ERROR, "Failed to get CFG fd.\n");
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "dispatch_thread_handler unlock\n");
+ sleep(5);
+ continue;
+ }
+
+ ast_copy_string(name.value, "asterisk", sizeof(name.value));
+ name.length = strlen(name.value);
+ if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
+ ast_log(LOG_ERROR, "Failed to join cpg (%d)\n", (int) cs_err);
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "dispatch_thread_handler unlock\n");
+ sleep(5);
+ continue;
+ }
+ corosync_node_joined = 1;
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "dispatch_thread_handler unlock\n");
+ ast_log(LOG_NOTICE, "Corosync recovery complete.\n");
+ send_cluster_notify();
+ } else {
+ ast_log(LOG_NOTICE, "Failed to recover from corosync failure: initializing CPG.\n");
}
-
- if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks) != CS_OK)) {
- ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
- sleep(5);
- continue;
- }
-
- if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to get CPG fd.\n");
- sleep(5);
- continue;
- }
-
- if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to get CFG fd.\n");
- sleep(5);
- continue;
- }
-
- ast_copy_string(name.value, "asterisk", sizeof(name.value));
- name.length = strlen(name.value);
- if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to join cpg (%d)\n", (int) cs_err);
- sleep(5);
- continue;
- }
-
- ast_log(LOG_NOTICE, "Corosync recovery complete.\n");
- send_cluster_notify();
}
}
return CLI_SHOWUSAGE;
}
- cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter);
-
- if (cs_err != CS_OK) {
- ast_cli(a->fd, "Failed to initialize CPG iterator.\n");
- return CLI_FAILURE;
- }
+ if (!ast_rwlock_tryrdlock(&init_cpg_lock)) {
+ ast_debug(5, "corosync_show_members rdlock\n");
+ cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter);
- ast_cli(a->fd, "\n"
- "=============================================================\n"
- "=== Cluster members =========================================\n"
- "=============================================================\n"
- "===\n");
-
- for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
- cs_err == CS_OK;
- cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
-#ifdef HAVE_COROSYNC_CFG_STATE_TRACK
- corosync_cfg_node_address_t addrs[8];
- int num_addrs = 0;
- unsigned int j;
-#endif
-
- ast_cli(a->fd, "=== Node %u\n", i);
- ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
-
-#ifdef HAVE_COROSYNC_CFG_STATE_TRACK
- /*
- * Corosync 2.x cfg lib needs to allocate 1M on stack after calling
- * corosync_cfg_get_node_addrs. netconsole thread has allocated only 0.5M
- * resulting in crash.
- */
- cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid,
- ARRAY_LEN(addrs), &num_addrs, addrs);
if (cs_err != CS_OK) {
- ast_log(LOG_WARNING, "Failed to get node addresses\n");
- continue;
+			ast_cli(a->fd, "Failed to initialize CPG iterator: %u.\n", cs_err);
+			/*
+			 * cpg_iter is only known to be valid after a successful
+			 * cpg_iteration_initialize(), so it is not finalized here.
+			 */
+			ast_rwlock_unlock(&init_cpg_lock);
+			ast_debug(5, "corosync_show_members unlock\n");
+			return CLI_FAILURE;
}
- for (j = 0; j < num_addrs; j++) {
- struct sockaddr *sa = (struct sockaddr *) addrs[j].address;
- size_t sa_len = (size_t) addrs[j].address_length;
- char buf[128];
+ ast_cli(a->fd, "\n"
+ "=============================================================\n"
+ "=== Cluster members =========================================\n"
+ "=============================================================\n"
+ "===\n");
+
+ for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
+ cs_err == CS_OK;
+ cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
+ #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
+ corosync_cfg_node_address_t addrs[8];
+ int num_addrs = 0;
+ unsigned int j;
+ #endif
+
+ ast_cli(a->fd, "=== Node %u\n", i);
+ ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
+
+ #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
+ /*
+ * Corosync 2.x cfg lib needs to allocate 1M on stack after calling
+ * corosync_cfg_get_node_addrs. netconsole thread has allocated only 0.5M
+ * resulting in crash.
+ */
+ cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid,
+ ARRAY_LEN(addrs), &num_addrs, addrs);
+ if (cs_err != CS_OK) {
+ ast_log(LOG_WARNING, "Failed to get node addresses\n");
+ continue;
+ }
+
+ for (j = 0; j < num_addrs; j++) {
+ struct sockaddr *sa = (struct sockaddr *) addrs[j].address;
+ size_t sa_len = (size_t) addrs[j].address_length;
+ char buf[128];
- getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
+ getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
- ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf);
+ ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf);
+ }
+ #else
+ ast_cli(a->fd, "=== --> Nodeid: %"PRIu32"\n", cpg_desc.nodeid);
+ #endif
}
-#else
- ast_cli(a->fd, "=== --> Nodeid: %"PRIu32"\n", cpg_desc.nodeid);
-#endif
- }
- ast_cli(a->fd, "===\n"
- "=============================================================\n"
- "\n");
+ ast_cli(a->fd, "===\n"
+ "=============================================================\n"
+ "\n");
- cpg_iteration_finalize(cpg_iter);
+ cpg_iteration_finalize(cpg_iter);
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "corosync_show_members unlock\n");
+ } else {
+ ast_cli(a->fd, "Failed to initialize CPG iterator: initializing CPG.\n");
+ }
return CLI_SUCCESS;
}
return CLI_FAILURE;
}
- ast_rwlock_rdlock(&event_types_lock);
event_types[AST_EVENT_PING].publish_to_stasis(event);
- ast_rwlock_unlock(&event_types_lock);
+ ast_event_destroy(event);
return CLI_SUCCESS;
}
"===\n");
ast_rwlock_rdlock(&event_types_lock);
+ ast_debug(5, "corosync_show_config rdlock\n");
for (i = 0; i < ARRAY_LEN(event_types); i++) {
if (event_types[i].publish) {
ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
}
}
ast_rwlock_unlock(&event_types_lock);
+ ast_debug(5, "corosync_show_config unlock\n");
ast_cli(a->fd, "===\n"
"=============================================================\n"
unsigned int i;
ast_rwlock_wrlock(&event_types_lock);
+ ast_debug(5, "load_general_config wrlock\n");
for (i = 0; i < ARRAY_LEN(event_types); i++) {
event_types[i].publish = event_types[i].publish_default;
}
ast_rwlock_unlock(&event_types_lock);
+ ast_debug(5, "load_general_config unlock\n");
return res;
}
if (stasis_router) {
/* Unsubscribe all topic forwards and cancel all message routes */
- ast_rwlock_wrlock(&event_types_lock);
for (i = 0; i < ARRAY_LEN(event_types); i++) {
+ struct ao2_container *messages = NULL;
+ int messages_count;
+ unsigned char subscribe = 0;
+
+ ast_rwlock_wrlock(&event_types_lock);
+ ast_debug(5, "cleanup_module wrlock\n");
+ subscribe = event_types[i].subscribe;
+
if (event_types[i].sub) {
event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
- stasis_message_router_remove(stasis_router,
- event_types[i].message_type_fn());
+ stasis_message_router_remove(stasis_router, event_types[i].message_type_fn());
}
event_types[i].publish = 0;
event_types[i].subscribe = 0;
+ ast_rwlock_unlock(&event_types_lock);
+ ast_debug(5, "cleanup_module unlock\n");
+
+ if (subscribe && event_types[i].cache_fn && event_types[i].message_type_fn) {
+ messages = stasis_cache_dump_all(event_types[i].cache_fn(), event_types[i].message_type_fn());
+ messages_count = ao2_container_count(messages);
+ ast_log(LOG_NOTICE, "Clearing %i events of type %s of other nodes from stasis cache.\n", messages_count, event_types[i].name);
+ ao2_callback(messages, OBJ_NODATA, clear_node_cache, event_types[i].topic_fn());
+ ast_log(LOG_NOTICE, "Cleared events of type %s from stasis cache.\n", event_types[i].name);
+ ao2_t_ref(messages, -1, "Dispose of flushed cache");
+ }
}
- ast_rwlock_unlock(&event_types_lock);
stasis_message_router_unsubscribe_and_join(stasis_router);
stasis_router = NULL;
dispatch_thread.alert_pipe[1] = -1;
}
- if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
- }
- cpg_handle = 0;
+ if (!ast_rwlock_trywrlock(&init_cpg_lock)) {
+ ast_debug(5, "cleanup_module wrlock\n");
+ if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
+ ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
+ }
+ cpg_handle = 0;
- if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
+ if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
+ ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
+ }
+ cfg_handle = 0;
+ corosync_node_joined = 0;
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "cleanup_module unlock\n");
}
- cfg_handle = 0;
-
ao2_cleanup(nodes);
nodes = NULL;
}
ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n");
goto failed;
}
+ stasis_message_router_set_congestion_limits(stasis_router, -1,
+ 10 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) {
ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n");
goto failed;
}
- if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err);
- goto failed;
- }
+ if (!ast_rwlock_trywrlock(&init_cpg_lock)) {
+ corosync_node_joined = 0;
+ ast_debug(5, "load_module wrlock\n");
+ if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
+ ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err);
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "load_module unlock\n");
+ goto failed;
+ }
- if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err);
- goto failed;
- }
+ if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
+ ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err);
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "load_module unlock\n");
+ goto failed;
+ }
- ast_copy_string(name.value, "asterisk", sizeof(name.value));
- name.length = strlen(name.value);
+ ast_copy_string(name.value, "asterisk", sizeof(name.value));
+ name.length = strlen(name.value);
- if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
- ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err);
- goto failed;
- }
+ if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
+ ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err);
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "load_module unlock\n");
+ goto failed;
+ }
- if (pipe(dispatch_thread.alert_pipe) == -1) {
- ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
- strerror(errno), errno);
- goto failed;
- }
+ if (pipe(dispatch_thread.alert_pipe) == -1) {
+ ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
+ strerror(errno), errno);
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "load_module unlock\n");
+ goto failed;
+ }
+ corosync_node_joined = 1;
+
+ ast_rwlock_unlock(&init_cpg_lock);
+ ast_debug(5, "load_module unlock\n");
+ if (corosync_pthread_create_background(&dispatch_thread.id, NULL,
+ dispatch_thread_handler, NULL)) {
+ ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n");
+ goto failed;
+ }
- if (corosync_pthread_create_background(&dispatch_thread.id, NULL,
- dispatch_thread_handler, NULL)) {
- ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n");
+ ast_cli_register_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
+ } else {
goto failed;
}
- ast_cli_register_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
-
-
return AST_MODULE_LOAD_SUCCESS;
failed: