]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
Dump cache of published events when a node joins the cluster.
authorRussell Bryant <russell@russellbryant.com>
Tue, 13 Mar 2012 23:46:23 +0000 (23:46 +0000)
committerRussell Bryant <russell@russellbryant.com>
Tue, 13 Mar 2012 23:46:23 +0000 (23:46 +0000)
Also use a more reliable method for stopping the poll() thread.
........

Merged revisions 359053 from http://svn.asterisk.org/svn/asterisk/branches/1.8

git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/10@359054 65c4cc65-6c06-0410-ace0-fbb531ad65f3

res/ais/ais.h
res/ais/clm.c
res/ais/evt.c
res/res_ais.c

index 6aaeadf15f92341711180d22f2b698a1ec6ec408..648348935e8c0fb7e4b9b71d00eb9d54f349db9f 100644 (file)
@@ -45,4 +45,13 @@ int ast_ais_evt_unload_module(void);
 
 const char *ais_err2str(SaAisErrorT error);
 
+void ast_ais_evt_membership_changed(void);
+
+enum ast_ais_cmd {
+       AST_AIS_CMD_EXIT,
+       AST_AIS_CMD_MEMBERSHIP_CHANGED,
+};
+
+int ast_ais_cmd(enum ast_ais_cmd cmd);
+
 #endif /* RES_AIS_AIS_H */
index d290ee2cd3738d08152dbd12c35c9a506c6eaa3a..640f212591a4886796f010203b0a1f5b65a9757d 100644 (file)
@@ -67,7 +67,24 @@ static void clm_node_get_cb(SaInvocationT invocation,
 static void clm_track_cb(const SaClmClusterNotificationBufferT *notif_buffer,
        SaUint32T num_members, SaAisErrorT error)
 {
+       unsigned int i;
+       unsigned int node_joined = 0;
 
+       ast_debug(1, "Cluster membership changed. Number of members: %u\n", num_members);
+
+       for (i = 0; i < notif_buffer->numberOfItems; i++) {
+               SaClmClusterNotificationT *notif = notif_buffer->notification + i;
+
+               if (notif->clusterChange == SA_CLM_NODE_JOINED) {
+                       node_joined = 1;
+                       break;
+               }
+       }
+
+       if (node_joined) {
+               ast_debug(1, "A node has joined the cluster, dumping event cache.\n");
+               ast_ais_cmd(AST_AIS_CMD_MEMBERSHIP_CHANGED);
+       }
 }
 
 static char *ais_clm_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
@@ -135,6 +152,8 @@ static struct ast_cli_entry ais_cli[] = {
 
 int ast_ais_clm_load_module(void)
 {
+       SaAisErrorT ais_res;
+
        clm_init_res = saClmInitialize(&clm_handle, &clm_callbacks, &ais_version);
        if (clm_init_res != SA_AIS_OK) {
                ast_log(LOG_ERROR, "Could not initialize cluster membership service: %s\n",
@@ -142,6 +161,11 @@ int ast_ais_clm_load_module(void)
                return -1;
        }
 
+       ais_res = saClmClusterTrack(clm_handle, SA_TRACK_CHANGES, NULL);
+       if (ais_res != SA_AIS_OK) {
+               ast_log(LOG_ERROR, "Error starting tracking of cluster membership changes.\n");
+       }
+
        ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli));
 
        return 0;
index 8d11c647312c81c4cc2dc835cd2eea05a0628e24..872f1e58e67373ded45542ea76412fed0af55774 100644 (file)
@@ -232,8 +232,15 @@ static void ast_event_cb(const struct ast_event *ast_event, void *data)
                goto return_event_free;
        }
 
-       ais_res = saEvtEventPublish(event_handle,
-               ast_event, ast_event_get_size(ast_event), &event_id);
+       for (;;) {
+               ais_res = saEvtEventPublish(event_handle,
+                       ast_event, ast_event_get_size(ast_event), &event_id);
+               if (ais_res != SA_AIS_ERR_TRY_AGAIN) {
+                       break;
+               }
+               sched_yield();
+       }
+
        if (ais_res != SA_AIS_OK) {
                ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(ais_res));
                goto return_event_free;
@@ -305,6 +312,22 @@ static struct ast_cli_entry ais_cli[] = {
        AST_CLI_DEFINE(ais_evt_show_event_channels, "Show configured event channels"),
 };
 
+void ast_ais_evt_membership_changed(void)
+{
+       struct event_channel *ec;
+
+       AST_RWLIST_RDLOCK(&event_channels);
+       AST_RWLIST_TRAVERSE(&event_channels, ec, entry) {
+               struct publish_event *pe;
+
+               AST_LIST_TRAVERSE(&ec->publish_events, pe, entry) {
+                       ast_debug(1, "Dumping cache for event channel '%s'\n", ec->name);
+                       ast_event_dump_cache(pe->sub);
+               }
+       }
+       AST_RWLIST_UNLOCK(&event_channels);
+}
+
 static void add_publish_event(struct event_channel *event_channel, const char *event_type)
 {
        int i;
index c64e8e674269f7946f70074faffb07f98249dd70..9d378158608780966f7b727be12f3f1acefb3e3c 100644 (file)
@@ -60,9 +60,11 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
 
 static struct {
        pthread_t id;
+       int alert_pipe[2];
        unsigned int stop:1;
 } dispatch_thread = {
        .id = AST_PTHREADT_NULL,
+       .alert_pipe = { -1, -1 },
 };
 
 SaVersionT ais_version = { 'B', 1, 1 };
@@ -116,7 +118,11 @@ static void *dispatch_thread_handler(void *data)
 {
        SaSelectionObjectT clm_fd, evt_fd;
        int res;
-       struct pollfd pfd[2] = { { .events = POLLIN, }, { .events = POLLIN, } };
+       struct pollfd pfd[3] = {
+               { .events = POLLIN, },
+               { .events = POLLIN, },
+               { .events = POLLIN, },
+       };
        SaAisErrorT ais_res;
 
        ais_res = saClmSelectionObjectGet(clm_handle, &clm_fd);
@@ -135,12 +141,14 @@ static void *dispatch_thread_handler(void *data)
 
        pfd[0].fd = clm_fd;
        pfd[1].fd = evt_fd;
+       pfd[2].fd = dispatch_thread.alert_pipe[0];
 
        while (!dispatch_thread.stop) {
                pfd[0].revents = 0;
                pfd[1].revents = 0;
+               pfd[2].revents = 0;
 
-               res = ast_poll(pfd, 2, -1);
+               res = ast_poll(pfd, ARRAY_LEN(pfd), -1);
                if (res == -1 && errno != EINTR && errno != EAGAIN) {
                        ast_log(LOG_ERROR, "Select error (%s) dispatch thread going away now, "
                                "and the module will no longer operate.\n", strerror(errno));
@@ -153,15 +161,45 @@ static void *dispatch_thread_handler(void *data)
                if (pfd[1].revents & POLLIN) {
                        saEvtDispatch(evt_handle,   SA_DISPATCH_ALL);
                }
+               if (pfd[2].revents & POLLIN) {
+                       enum ast_ais_cmd cmd;
+                       ast_debug(1, "Got a command in the poll() loop\n");
+                       if (read(dispatch_thread.alert_pipe[0], &cmd, sizeof(cmd)) != -1) {
+                               switch (cmd) {
+                               case AST_AIS_CMD_MEMBERSHIP_CHANGED:
+                                       ast_ais_evt_membership_changed();
+                                       break;
+                               case AST_AIS_CMD_EXIT:
+                                       break;
+                               }
+                       }
+               }
        }
 
        return NULL;
 }
 
+int ast_ais_cmd(enum ast_ais_cmd cmd)
+{
+       int res;
+
+       res = write(dispatch_thread.alert_pipe[1], (char *) &cmd, sizeof(cmd));
+
+       ast_debug(1, "AIS cmd: %d, res: %d\n", cmd, res);
+
+       return res;
+}
+
 static int load_module(void)
 {
-       if (ast_ais_clm_load_module())
+       if (pipe(dispatch_thread.alert_pipe) == -1) {
+               ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
+                               strerror(errno), errno);
                goto return_error;
+       }
+
+       if (ast_ais_clm_load_module())
+               goto clm_failed;
 
        if (ast_ais_evt_load_module())
                goto evt_failed;
@@ -178,6 +216,9 @@ dispatch_failed:
        ast_ais_evt_unload_module();
 evt_failed:
        ast_ais_clm_unload_module();
+clm_failed:
+       close(dispatch_thread.alert_pipe[0]);
+       close(dispatch_thread.alert_pipe[1]);
 return_error:
        return AST_MODULE_LOAD_DECLINE;
 }
@@ -189,10 +230,23 @@ static int unload_module(void)
 
        if (dispatch_thread.id != AST_PTHREADT_NULL) {
                dispatch_thread.stop = 1;
-               pthread_kill(dispatch_thread.id, SIGURG); /* poke! */
+               if (ast_ais_cmd(AST_AIS_CMD_EXIT) == -1) {
+                       ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n",
+                                       strerror(errno), errno);
+               }
                pthread_join(dispatch_thread.id, NULL);
        }
 
+       if (dispatch_thread.alert_pipe[0] != -1) {
+               close(dispatch_thread.alert_pipe[0]);
+               dispatch_thread.alert_pipe[0] = -1;
+       }
+
+       if (dispatch_thread.alert_pipe[1] != -1) {
+               close(dispatch_thread.alert_pipe[1]);
+               dispatch_thread.alert_pipe[1] = -1;
+       }
+
        return 0;
 }