From: Russell Bryant Date: Tue, 13 Mar 2012 23:46:23 +0000 (+0000) Subject: Dump cache of published events when a node joins the cluster. X-Git-Tag: 10.3.0~49 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=267af08c88b945a3a33b9373e959bb4965dd58fa;p=thirdparty%2Fasterisk.git Dump cache of published events when a node joins the cluster. Also use a more reliable method for stopping the poll() thread. ........ Merged revisions 359053 from http://svn.asterisk.org/svn/asterisk/branches/1.8 git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/10@359054 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- diff --git a/res/ais/ais.h b/res/ais/ais.h index 6aaeadf15f..648348935e 100644 --- a/res/ais/ais.h +++ b/res/ais/ais.h @@ -45,4 +45,13 @@ int ast_ais_evt_unload_module(void); const char *ais_err2str(SaAisErrorT error); +void ast_ais_evt_membership_changed(void); + +enum ast_ais_cmd { + AST_AIS_CMD_EXIT, + AST_AIS_CMD_MEMBERSHIP_CHANGED, +}; + +int ast_ais_cmd(enum ast_ais_cmd cmd); + #endif /* RES_AIS_AIS_H */ diff --git a/res/ais/clm.c b/res/ais/clm.c index d290ee2cd3..640f212591 100644 --- a/res/ais/clm.c +++ b/res/ais/clm.c @@ -67,7 +67,24 @@ static void clm_node_get_cb(SaInvocationT invocation, static void clm_track_cb(const SaClmClusterNotificationBufferT *notif_buffer, SaUint32T num_members, SaAisErrorT error) { + unsigned int i; + unsigned int node_joined = 0; + ast_debug(1, "Cluster membership changed. Number of members: %u\n", num_members); + + for (i = 0; i < notif_buffer->numberOfItems; i++) { + SaClmClusterNotificationT *notif = notif_buffer->notification + i; + + if (notif->clusterChange == SA_CLM_NODE_JOINED) { + node_joined = 1; + break; + } + } + + if (node_joined) { + ast_debug(1, "A node has joined the cluster, dumping event cache.\n"); + ast_ais_cmd(AST_AIS_CMD_MEMBERSHIP_CHANGED); + } } static char *ais_clm_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) @@ -135,6 +152,8 @@ static struct ast_cli_entry ais_cli[] = { int ast_ais_clm_load_module(void) { + SaAisErrorT ais_res; + clm_init_res = saClmInitialize(&clm_handle, &clm_callbacks, &ais_version); if (clm_init_res != SA_AIS_OK) { ast_log(LOG_ERROR, "Could not initialize cluster membership service: %s\n", @@ -142,6 +161,11 @@ int ast_ais_clm_load_module(void) return -1; } + ais_res = saClmClusterTrack(clm_handle, SA_TRACK_CHANGES, NULL); + if (ais_res != SA_AIS_OK) { + ast_log(LOG_ERROR, "Error starting tracking of cluster membership changes.\n"); + } + ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli)); return 0; diff --git a/res/ais/evt.c b/res/ais/evt.c index 8d11c64731..872f1e58e6 100644 --- a/res/ais/evt.c +++ b/res/ais/evt.c @@ -232,8 +232,15 @@ static void ast_event_cb(const struct ast_event *ast_event, void *data) goto return_event_free; } - ais_res = saEvtEventPublish(event_handle, - ast_event, ast_event_get_size(ast_event), &event_id); + for (;;) { + ais_res = saEvtEventPublish(event_handle, + ast_event, ast_event_get_size(ast_event), &event_id); + if (ais_res != SA_AIS_ERR_TRY_AGAIN) { + break; + } + sched_yield(); + } + if (ais_res != SA_AIS_OK) { ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(ais_res)); goto return_event_free; @@ -305,6 +312,22 @@ static struct ast_cli_entry ais_cli[] = { AST_CLI_DEFINE(ais_evt_show_event_channels, "Show configured event channels"), }; +void ast_ais_evt_membership_changed(void) +{ + struct event_channel *ec; + + AST_RWLIST_RDLOCK(&event_channels); + AST_RWLIST_TRAVERSE(&event_channels, ec, entry) { + struct publish_event *pe; + + AST_LIST_TRAVERSE(&ec->publish_events, pe, entry) { + ast_debug(1, "Dumping cache for event channel '%s'\n", ec->name); + ast_event_dump_cache(pe->sub); + } + } + AST_RWLIST_UNLOCK(&event_channels); +} + static void add_publish_event(struct event_channel *event_channel, const char *event_type) { int i; diff --git a/res/res_ais.c b/res/res_ais.c index c64e8e6742..9d37815860 100644 --- a/res/res_ais.c +++ b/res/res_ais.c @@ -60,9 +60,11 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); static struct { pthread_t id; + int alert_pipe[2]; unsigned int stop:1; } dispatch_thread = { .id = AST_PTHREADT_NULL, + .alert_pipe = { -1, -1 }, }; SaVersionT ais_version = { 'B', 1, 1 }; @@ -116,7 +118,11 @@ static void *dispatch_thread_handler(void *data) { SaSelectionObjectT clm_fd, evt_fd; int res; - struct pollfd pfd[2] = { { .events = POLLIN, }, { .events = POLLIN, } }; + struct pollfd pfd[3] = { + { .events = POLLIN, }, + { .events = POLLIN, }, + { .events = POLLIN, }, + }; SaAisErrorT ais_res; ais_res = saClmSelectionObjectGet(clm_handle, &clm_fd); @@ -135,12 +141,14 @@ static void *dispatch_thread_handler(void *data) pfd[0].fd = clm_fd; pfd[1].fd = evt_fd; + pfd[2].fd = dispatch_thread.alert_pipe[0]; while (!dispatch_thread.stop) { pfd[0].revents = 0; pfd[1].revents = 0; + pfd[2].revents = 0; - res = ast_poll(pfd, 2, -1); + res = ast_poll(pfd, ARRAY_LEN(pfd), -1); if (res == -1 && errno != EINTR && errno != EAGAIN) { ast_log(LOG_ERROR, "Select error (%s) dispatch thread going away now, " "and the module will no longer operate.\n", strerror(errno)); @@ -153,15 +161,45 @@ static void *dispatch_thread_handler(void *data) if (pfd[1].revents & POLLIN) { saEvtDispatch(evt_handle, SA_DISPATCH_ALL); } + if (pfd[2].revents & POLLIN) { + enum ast_ais_cmd cmd; + ast_debug(1, "Got a command in the poll() loop\n"); + if (read(dispatch_thread.alert_pipe[0], &cmd, sizeof(cmd)) != -1) { + switch (cmd) { + case AST_AIS_CMD_MEMBERSHIP_CHANGED: + ast_ais_evt_membership_changed(); + break; + case AST_AIS_CMD_EXIT: + break; + } + } + } } return NULL; } +int ast_ais_cmd(enum ast_ais_cmd cmd) +{ + int res; + + res = write(dispatch_thread.alert_pipe[1], (char *) &cmd, sizeof(cmd)); + + ast_debug(1, "AIS cmd: %d, res: %d\n", cmd, res); + + return res; +} + static int load_module(void) { - if (ast_ais_clm_load_module()) + if (pipe(dispatch_thread.alert_pipe) == -1) { + ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n", + strerror(errno), errno); goto return_error; + } + + if (ast_ais_clm_load_module()) + goto clm_failed; if (ast_ais_evt_load_module()) goto evt_failed; @@ -178,6 +216,9 @@ dispatch_failed: ast_ais_evt_unload_module(); evt_failed: ast_ais_clm_unload_module(); +clm_failed: + close(dispatch_thread.alert_pipe[0]); + close(dispatch_thread.alert_pipe[1]); return_error: return AST_MODULE_LOAD_DECLINE; } @@ -189,10 +230,23 @@ static int unload_module(void) if (dispatch_thread.id != AST_PTHREADT_NULL) { dispatch_thread.stop = 1; - pthread_kill(dispatch_thread.id, SIGURG); /* poke! */ + if (ast_ais_cmd(AST_AIS_CMD_EXIT) == -1) { + ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n", + strerror(errno), errno); + } pthread_join(dispatch_thread.id, NULL); } + if (dispatch_thread.alert_pipe[0] != -1) { + close(dispatch_thread.alert_pipe[0]); + dispatch_thread.alert_pipe[0] = -1; + } + + if (dispatch_thread.alert_pipe[1] != -1) { + close(dispatch_thread.alert_pipe[1]); + dispatch_thread.alert_pipe[1] = -1; + } + return 0; }