]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
Unify manager behind a single event queue
authorMark Spencer <markster@digium.com>
Sun, 2 Apr 2006 19:59:55 +0000 (19:59 +0000)
committerMark Spencer <markster@digium.com>
Sun, 2 Apr 2006 19:59:55 +0000 (19:59 +0000)
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@16957 65c4cc65-6c06-0410-ace0-fbb531ad65f3

include/asterisk/manager.h
manager.c

index fb4514d62045d0b40698ed86bf84adb6d7fe8df2..4541e7f173777e0466dd4a4aeee90a3ee1fd0c69 100644 (file)
 #define AST_MAX_MANHEADERS 80
 #define AST_MAX_MANHEADER_LEN 256
 
-struct eventqent {
-       struct eventqent *next;
-       char eventdata[1];
-};
-
 struct mansession;
 
 struct message {
index 7615a4bc8a6d4b96644963500a110a941e35df33..915d83b9e23baebe534e1f3feccbfd2a0ce6c41e 100644 (file)
--- a/manager.c
+++ b/manager.c
@@ -83,6 +83,14 @@ struct fast_originate_helper {
        struct ast_variable *vars;
 };
 
+struct eventqent {
+       int usecount;
+       int category;
+       ast_mutex_t lock;
+       struct eventqent *next;
+       char eventdata[1];
+};
+
 static int enabled = 0;
 static int portno = DEFAULT_MANAGER_PORT;
 static int asock = -1;
@@ -93,6 +101,8 @@ static int httptimeout = 60;
 static pthread_t t;
 AST_MUTEX_DEFINE_STATIC(sessionlock);
 static int block_sockets = 0;
+static int num_sessions = 0;
+struct eventqent *master_eventq = NULL;
 
 static struct permalias {
        int num;
@@ -472,6 +482,23 @@ static int handle_showmanconn(int fd, int argc, char *argv[])
        return RESULT_SUCCESS;
 }
 
+/*! \brief  handle_showmanconn: CLI command show manager connected */
+/* Should change to "manager show connected" */
+static int handle_showmaneventq(int fd, int argc, char *argv[])
+{
+       struct eventqent *s;
+       ast_mutex_lock(&sessionlock);
+       s = master_eventq;
+       while (s) {
+               ast_cli(fd, "Usecount: %d\n",s->usecount);
+               ast_cli(fd, "Category: %d\n", s->category);
+               ast_cli(fd, "Event:\n%s", s->eventdata);
+               s = s->next;
+       }
+       ast_mutex_unlock(&sessionlock);
+       return RESULT_SUCCESS;
+}
+
 static char showmancmd_help[] = 
 "Usage: show manager command <actionname>\n"
 "      Shows the detailed description for a specific Asterisk manager interface command.\n";
@@ -485,6 +512,11 @@ static char showmanconn_help[] =
 "      Prints a listing of the users that are currently connected to the\n"
 "Asterisk manager interface.\n";
 
+static char showmaneventq_help[] = 
+"Usage: show manager eventq\n"
+"      Prints a listing of all events pending in the Asterisk manger\n"
+"event queue.\n";
+
 static struct ast_cli_entry show_mancmd_cli =
        { { "show", "manager", "command", NULL },
        handle_showmancmd, "Show a manager interface command", showmancmd_help, complete_show_mancmd };
@@ -497,6 +529,24 @@ static struct ast_cli_entry show_manconn_cli =
        { { "show", "manager", "connected", NULL },
        handle_showmanconn, "Show connected manager interface users", showmanconn_help };
 
+static struct ast_cli_entry show_maneventq_cli =
+       { { "show", "manager", "eventq", NULL },
+       handle_showmaneventq, "Show manager interface queued events", showmaneventq_help };
+
+static void unuse_eventqent(struct eventqent *e)
+{
+       /* XXX Need to atomically decrement the users.  Change this to atomic_dec
+              one day when we have such a beast XXX */
+       int val;
+       ast_mutex_lock(&e->lock);
+       e->usecount--;
+       val = e->usecount && e->next;
+       ast_mutex_unlock(&e->lock);
+       /* Wake up sleeping beauty */
+       if (val)
+               pthread_kill(t, SIGURG);
+}
+
 static void free_session(struct mansession *s)
 {
        struct eventqent *eqe;
@@ -508,7 +558,7 @@ static void free_session(struct mansession *s)
        while(s->eventq) {
                eqe = s->eventq;
                s->eventq = s->eventq->next;
-               free(eqe);
+               unuse_eventqent(eqe);
        }
        free(s);
 }
@@ -530,6 +580,7 @@ static void destroy_session(struct mansession *s)
                else
                        sessions = cur->next;
                free_session(s);
+               num_sessions--;
        } else
                ast_log(LOG_WARNING, "Trying to delete nonexistent session %p?\n", s);
        ast_mutex_unlock(&sessionlock);
@@ -876,7 +927,7 @@ static int action_waitevent(struct mansession *s, struct message *m)
                ast_log(LOG_DEBUG, "Starting waiting for an event!\n");
        for (x=0;((x<timeout) || (timeout < 0)); x++) {
                ast_mutex_lock(&s->__lock);
-               if (s->eventq)
+               if (s->eventq && s->eventq->next)
                        needexit = 1;
                if (s->waiting_thread != pthread_self())
                        needexit = 1;
@@ -898,11 +949,14 @@ static int action_waitevent(struct mansession *s, struct message *m)
        if (s->waiting_thread == pthread_self()) {
                astman_send_response(s, m, "Success", "Waiting for Event...");
                /* Only show events if we're the most recent waiter */
-               while(s->eventq) {
-                       astman_append(s, "%s", s->eventq->eventdata);
-                       eqe = s->eventq;
-                       s->eventq = s->eventq->next;
-                       free(eqe);
+               while(s->eventq->next) {
+                       eqe = s->eventq->next;
+                       if (((s->readperm & eqe->category) == eqe->category) &&
+                           ((s->send_events & eqe->category) == eqe->category)) {
+                               astman_append(s, "%s", eqe->eventdata);
+                       }
+                       unuse_eventqent(s->eventq);
+                       s->eventq = eqe;
                }
                astman_append(s,
                        "Event: WaitEventComplete\r\n"
@@ -1566,6 +1620,30 @@ static int action_timeout(struct mansession *s, struct message *m)
        return 0;
 }
 
+static int process_events(struct mansession *s)
+{
+       struct eventqent *eqe;
+       int ret = 0;
+       ast_mutex_lock(&s->__lock);
+       if (s->fd > -1) {
+               s->busy--;
+               if (!s->eventq)
+                       s->eventq = master_eventq;
+               while(s->eventq->next) {
+                       eqe = s->eventq->next;
+                       if ((s->authenticated && (s->readperm & eqe->category) == eqe->category) &&
+                           ((s->send_events & eqe->category) == eqe->category)) {
+                               if (!ret && ast_carefulwrite(s->fd, eqe->eventdata, strlen(eqe->eventdata), s->writetimeout) < 0)
+                                       ret = -1;
+                       }
+                       unuse_eventqent(s->eventq);
+                       s->eventq = eqe;
+               }
+       }
+       ast_mutex_unlock(&s->__lock);
+       return ret;
+}
+
 static int process_message(struct mansession *s, struct message *m)
 {
        char action[80] = "";
@@ -1573,6 +1651,7 @@ static int process_message(struct mansession *s, struct message *m)
        char *id = astman_get_header(m,"ActionID");
        char idText[256] = "";
        char iabuf[INET_ADDRSTRLEN];
+       int ret = 0;
 
        ast_copy_string(action, astman_get_header(m, "Action"), sizeof(action));
        ast_log( LOG_DEBUG, "Manager received command '%s'\n", action );
@@ -1581,9 +1660,9 @@ static int process_message(struct mansession *s, struct message *m)
                astman_send_error(s, m, "Missing action in request");
                return 0;
        }
-        if (!ast_strlen_zero(id)) {
-                snprintf(idText, sizeof(idText), "ActionID: %s\r\n",id);
-        }
+       if (!ast_strlen_zero(id)) {
+               snprintf(idText, sizeof(idText), "ActionID: %s\r\n",id);
+       }
        if (!s->authenticated) {
                if (!strcasecmp(action, "Challenge")) {
                        char *authtype;
@@ -1623,8 +1702,6 @@ static int process_message(struct mansession *s, struct message *m)
                } else
                        astman_send_error(s, m, "Authentication Required");
        } else {
-               int ret=0;
-               struct eventqent *eqe;
                ast_mutex_lock(&s->__lock);
                s->busy++;
                ast_mutex_unlock(&s->__lock);
@@ -1642,23 +1719,10 @@ static int process_message(struct mansession *s, struct message *m)
                }
                if (!tmp)
                        astman_send_error(s, m, "Invalid/unknown command");
-               ast_mutex_lock(&s->__lock);
-               if (s->fd > -1) {
-                       s->busy--;
-                       while(s->eventq) {
-                               if (ast_carefulwrite(s->fd, s->eventq->eventdata, strlen(s->eventq->eventdata), s->writetimeout) < 0) {
-                                       ret = -1;
-                                       break;
-                               }
-                               eqe = s->eventq;
-                               s->eventq = s->eventq->next;
-                               free(eqe);
-                       }
-               }
-               ast_mutex_unlock(&s->__lock);
-               return ret;
        }
-       return 0;
+       if (ret)
+               return ret;
+       return process_events(s);
 }
 
 static int get_input(struct mansession *s, char *output)
@@ -1687,12 +1751,20 @@ static int get_input(struct mansession *s, char *output)
        fds[0].fd = s->fd;
        fds[0].events = POLLIN;
        do {
+               ast_mutex_lock(&s->__lock);
+               s->waiting_thread = pthread_self();
+               ast_mutex_unlock(&s->__lock);
+
                res = poll(fds, 1, -1);
+
+               ast_mutex_lock(&s->__lock);
+               s->waiting_thread = AST_PTHREADT_NULL;
+               ast_mutex_unlock(&s->__lock);
                if (res < 0) {
                        if (errno == EINTR) {
                                if (s->dead)
                                        return -1;
-                               continue;
+                               return 0;
                        }
                        ast_log(LOG_WARNING, "Select returned error: %s\n", strerror(errno));
                        return -1;
@@ -1734,8 +1806,12 @@ static void *session_do(void *data)
                                memset(&m, 0, sizeof(m));
                        } else if (m.hdrcount < AST_MAX_MANHEADERS - 1)
                                m.hdrcount++;
-               } else if (res < 0)
+               } else if (res < 0) {
                        break;
+               } else if (s->eventq->next) {
+                       if (process_events(s))
+                               break;
+               }
        }
        if (s->authenticated) {
                if (option_verbose > 1) {
@@ -1759,6 +1835,7 @@ static void *accept_thread(void *ignore)
        int as;
        struct sockaddr_in sin;
        socklen_t sinlen;
+       struct eventqent *eqe;
        struct mansession *s, *prev=NULL, *next;
        struct protoent *p;
        int arg = 1;
@@ -1779,6 +1856,7 @@ static void *accept_thread(void *ignore)
                while(s) {
                        next = s->next;
                        if (s->sessiontimeout && (now > s->sessiontimeout) && !s->inuse) {
+                               num_sessions--;
                                if (prev)
                                        prev->next = next;
                                else
@@ -1792,6 +1870,14 @@ static void *accept_thread(void *ignore)
                                prev = s;
                        s = next;
                }
+               /* Purge master event queue of old, unused events, but make sure we
+                  always keep at least one in the queue */
+               eqe = master_eventq;
+               while (master_eventq->next && !master_eventq->usecount) {
+                       eqe = master_eventq;
+                       master_eventq = master_eventq->next;
+                       free(eqe);
+               }
                ast_mutex_unlock(&sessionlock);
 
                sinlen = sizeof(sin);
@@ -1831,8 +1917,17 @@ static void *accept_thread(void *ignore)
                s->fd = as;
                s->send_events = -1;
                ast_mutex_lock(&sessionlock);
+               num_sessions++;
                s->next = sessions;
                sessions = s;
+               /* Find the last place in the master event queue and hook ourselves
+                  in there */
+               s->eventq = master_eventq;
+               while(s->eventq->next)
+                       s->eventq = s->eventq->next;
+               ast_mutex_lock(&s->eventq->lock);
+               s->eventq->usecount++;
+               ast_mutex_unlock(&s->eventq->lock);
                ast_mutex_unlock(&sessionlock);
                if (ast_pthread_create(&s->t, &attr, session_do, s))
                        destroy_session(s);
@@ -1841,21 +1936,24 @@ static void *accept_thread(void *ignore)
        return NULL;
 }
 
-static int append_event(struct mansession *s, const char *str)
+static int append_event(const char *str, int category)
 {
        struct eventqent *tmp, *prev=NULL;
        tmp = malloc(sizeof(struct eventqent) + strlen(str));
        if (tmp) {
+               ast_mutex_init(&tmp->lock);
                tmp->next = NULL;
+               tmp->category = category;
                strcpy(tmp->eventdata, str);
-               if (s->eventq) {
-                       prev = s->eventq;
+               if (master_eventq) {
+                       prev = master_eventq;
                        while(prev->next) 
                                prev = prev->next;
                        prev->next = tmp;
                } else {
-                       s->eventq = tmp;
+                       master_eventq = tmp;
                }
+               tmp->usecount = num_sessions;
                return 0;
        }
        return -1;
@@ -1870,45 +1968,33 @@ int manager_event(int category, const char *event, const char *fmt, ...)
        char *tmp_next = tmp;
        size_t tmp_left = sizeof(tmp) - 2;
        va_list ap;
+       struct timeval now;
 
+       /* Abort if there aren't any manager sessions */
+       if (!num_sessions)
+               return 0;
+
+       ast_build_string(&tmp_next, &tmp_left, "Event: %s\r\nPrivilege: %s\r\n",
+                        event, authority_to_str(category, auth, sizeof(auth)));
+       if (timestampevents) {
+               now = ast_tvnow();
+               ast_build_string(&tmp_next, &tmp_left, "Timestamp: %ld.%06lu\r\n",
+                                now.tv_sec, (unsigned long) now.tv_usec);
+       }
+       va_start(ap, fmt);
+       ast_build_string_va(&tmp_next, &tmp_left, fmt, ap);
+       va_end(ap);
+       *tmp_next++ = '\r';
+       *tmp_next++ = '\n';
+       *tmp_next = '\0';
+       
        ast_mutex_lock(&sessionlock);
+       /* Append even to master list and wake up any sleeping sessions */
+       append_event(tmp, category);
        for (s = sessions; s; s = s->next) {
-               if ((s->readperm & category) != category)
-                       continue;
-
-               if ((s->send_events & category) != category)
-                       continue;
-
-               if (ast_strlen_zero(tmp)) {
-                       struct timeval now;
-
-                       ast_build_string(&tmp_next, &tmp_left, "Event: %s\r\nPrivilege: %s\r\n",
-                                        event, authority_to_str(category, auth, sizeof(auth)));
-                       if (timestampevents) {
-                               now = ast_tvnow();
-                               ast_build_string(&tmp_next, &tmp_left, "Timestamp: %ld.%06lu\r\n",
-                                                now.tv_sec, (unsigned long) now.tv_usec);
-                       }
-                       va_start(ap, fmt);
-                       ast_build_string_va(&tmp_next, &tmp_left, fmt, ap);
-                       va_end(ap);
-                       *tmp_next++ = '\r';
-                       *tmp_next++ = '\n';
-                       *tmp_next = '\0';
-               }
-
                ast_mutex_lock(&s->__lock);
-               if (s->busy) {
-                       append_event(s, tmp);
-                       if (s->waiting_thread != AST_PTHREADT_NULL)
-                               pthread_kill(s->waiting_thread, SIGURG);
-               } else if (!s->dead && !s->sessiontimeout) {
-                       if (ast_carefulwrite(s->fd, tmp, tmp_next - tmp, s->writetimeout) < 0) {
-                               ast_log(LOG_WARNING, "Disconnecting slow (or gone) manager session!\n");
-                               s->dead = 1;
-                               pthread_kill(s->t, SIGURG);
-                       }
-               }
+               if (s->waiting_thread != AST_PTHREADT_NULL)
+                       pthread_kill(s->waiting_thread, SIGURG);
                ast_mutex_unlock(&s->__lock);
        }
        ast_mutex_unlock(&sessionlock);
@@ -2084,12 +2170,23 @@ static char *generic_http_callback(int format, struct sockaddr_in *requestor, co
                s->managerid = rand() | (unsigned long)s;
                s->next = sessions;
                sessions = s;
+               num_sessions++;
+               /* Hook into the last spot in the event queue */
+               s->eventq = master_eventq;
+               while(s->eventq->next)
+                       s->eventq = s->eventq->next;
+               ast_mutex_lock(&s->eventq->lock);
+               s->eventq->usecount++;
+               ast_mutex_unlock(&s->eventq->lock);
                ast_mutex_unlock(&sessionlock);
        }
 
-       /* Reset HTTP timeout */
+       /* Reset HTTP timeout.  If we're not yet authenticated, keep it extremely short */
        time(&s->sessiontimeout);
-       s->sessiontimeout += httptimeout;
+       if (!s->authenticated && (httptimeout > 5))
+               s->sessiontimeout += 5;
+       else
+               s->sessiontimeout += httptimeout;
        ast_mutex_unlock(&s->__lock);
        
        memset(&m, 0, sizeof(m));
@@ -2248,8 +2345,11 @@ int init_manager(void)
                ast_cli_register(&show_mancmd_cli);
                ast_cli_register(&show_mancmds_cli);
                ast_cli_register(&show_manconn_cli);
+               ast_cli_register(&show_maneventq_cli);
                ast_extension_state_add(NULL, NULL, manager_state_cb, NULL);
                registered = 1;
+               /* Append placeholder event so master_eventq never runs dry */
+               append_event("Event: Placeholder\r\n\r\n", 0);
        }
        portno = DEFAULT_MANAGER_PORT;
        displayconnects = 1;