git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
taskprocessors: Implement high/low water mark alerts. 52/2952/3
author Richard Mudgett <rmudgett@digium.com>
Thu, 2 Jun 2016 21:08:19 +0000 (16:08 -0500)
committer Richard Mudgett <rmudgett@digium.com>
Tue, 7 Jun 2016 23:57:36 +0000 (18:57 -0500)
When taskprocessors get backed up, there is a good chance that we are
being overloaded and need to defer adding new work to the system.

* Implemented a high/low water alert mechanism for modules to check if the
system is being overloaded and take appropriate action.  When a
taskprocessor is created it has default congestion levels set.  A
taskprocessor can later have those congestion levels altered for specific
needs if stress testing shows that the taskprocessor is a symptom of
overloading or needs to handle bursty activity without triggering an
overload alert.

* Add CLI "core show taskprocessor" low/high water columns.

* Fixed __allocate_taskprocessor() to not use RAII_VAR().  RAII_VAR() was
never a good thing to use when creating a taskprocessor because of the
nature of how its references needed to be cleaned up on a partial
creation.

* Made res_pjsip's distributor check if the taskprocessor overload alert
is active before placing a message representing brand new work onto a
distributor serializer.

ASTERISK-26088
Reported by:  Richard Mudgett

Change-Id: I182f1be603529cd665958661c4c05ff9901825fa

include/asterisk/taskprocessor.h
main/taskprocessor.c
res/res_pjsip/pjsip_distributor.c

index af3ce747f6382d7a6d5f8c0ec9373ec3fa1c30b0..e51122269d4248df4f720246e391a82489c9db1a 100644 (file)
@@ -59,6 +59,7 @@ struct ast_taskprocessor;
 /*! \brief Suggested maximum taskprocessor name length (less null terminator). */
 #define AST_TASKPROCESSOR_MAX_NAME     45
 
+/*! Default taskprocessor high water level alert trigger */
 #define AST_TASKPROCESSOR_HIGH_WATER_LEVEL 500
 
 /*!
@@ -297,4 +298,26 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps);
  */
 long ast_taskprocessor_size(struct ast_taskprocessor *tps);
 
+/*!
+ * \brief Get the current taskprocessor high water alert count.
+ * \since 13.10.0
+ *
+ * \retval 0 if no taskprocessors are in high water alert.
+ * \retval non-zero if some taskprocessors are in high water alert.
+ */
+unsigned int ast_taskprocessor_alert_get(void);
+
+/*!
+ * \brief Set the high and low alert water marks of the given taskprocessor queue.
+ * \since 13.10.0
+ *
+ * \param tps Taskprocessor to update queue water marks.
+ * \param low_water New queue low water mark. (Negative, e.g. -1, to set as 90% of high_water)
+ * \param high_water New queue high water mark. (Must not be negative or below low_water)
+ *
+ * \retval 0 on success.
+ * \retval -1 on error: NULL tps or invalid high_water (water marks not changed).
+ */
+int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water);
+
 #endif /* __AST_TASKPROCESSOR_H__ */
index 1ba0c8a2fb693a89d149f2dfe1d6beaad17ccb33..7ce3e4f16f3c69e61c17a7f3728debd4529b8835 100644 (file)
@@ -76,6 +76,10 @@ struct ast_taskprocessor {
        void *local_data;
        /*! \brief Taskprocessor current queue size */
        long tps_queue_size;
+       /*! \brief Taskprocessor low water clear alert level */
+       long tps_queue_low;
+       /*! \brief Taskprocessor high water alert trigger level */
+       long tps_queue_high;
        /*! \brief Taskprocessor queue */
        AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
        struct ast_taskprocessor_listener *listener;
@@ -85,6 +89,8 @@ struct ast_taskprocessor {
        unsigned int executing:1;
        /*! Indicates that a high water warning has been issued on this task processor */
        unsigned int high_water_warned:1;
+       /*! Indicates that a high water alert is active on this taskprocessor */
+       unsigned int high_water_alert:1;
 };
 
 /*!
@@ -121,15 +127,9 @@ static int tps_hash_cb(const void *obj, const int flags);
 /*! \brief The astobj2 compare callback for taskprocessors */
 static int tps_cmp_cb(void *obj, void *arg, int flags);
 
-/*! \brief Destroy the taskprocessor when its refcount reaches zero */
-static void tps_taskprocessor_destroy(void *tps);
-
 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
 static int tps_ping_handler(void *datap);
 
-/*! \brief Remove the front task off the taskprocessor queue */
-static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
-
 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 
@@ -472,8 +472,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
        struct ao2_container *sorted_tps;
        struct ast_taskprocessor *tps;
        struct ao2_iterator iter;
-#define FMT_HEADERS            "%-45s %10s %10s %10s\n"
-#define FMT_FIELDS             "%-45s %10lu %10lu %10lu\n"
+#define FMT_HEADERS            "%-45s %10s %10s %10s %10s %10s\n"
+#define FMT_FIELDS             "%-45s %10lu %10lu %10lu %10lu %10lu\n"
 
        switch (cmd) {
        case CLI_INIT:
@@ -498,7 +498,7 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
                return CLI_FAILURE;
        }
 
-       ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth");
+       ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
        tcount = 0;
        iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
        while ((tps = ao2_iterator_next(&iter))) {
@@ -511,7 +511,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
                        maxqsize = 0;
                        processed = 0;
                }
-               ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize);
+               ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize,
+                       tps->tps_queue_low, tps->tps_queue_high);
                ast_taskprocessor_unreference(tps);
                ++tcount;
        }
@@ -539,28 +540,106 @@ static int tps_cmp_cb(void *obj, void *arg, int flags)
        return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
 }
 
+/*! Count of the number of taskprocessors in high water alert. */
+static unsigned int tps_alert_count;
+
+/*! Access protection for tps_alert_count */
+AST_RWLOCK_DEFINE_STATIC(tps_alert_lock);
+
+/*!
+ * \internal
+ * \brief Add a delta to tps_alert_count with protection.
+ * \since 13.10.0
+ *
+ * \param tps Taskprocessor updating queue water mark alert trigger.
+ * \param delta The amount to add to tps_alert_count.
+ *
+ * \return Nothing
+ */
+static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
+{
+       unsigned int old;
+
+       ast_rwlock_wrlock(&tps_alert_lock);
+       old = tps_alert_count; /* keep the prior count so a zero/non-zero transition can be detected */
+       tps_alert_count += delta;
+       if (DEBUG_ATLEAST(3)
+               /* and the global count flipped between zero and non-zero (not on every change) */
+               && !old != !tps_alert_count) {
+               ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
+                       tps->name, tps_alert_count ? "triggered" : "cleared"); /* tps is only used here, for its name */
+       }
+       ast_rwlock_unlock(&tps_alert_lock);
+}
+
+unsigned int ast_taskprocessor_alert_get(void)
+{
+       unsigned int count; /* snapshot of tps_alert_count returned to the caller */
+
+       ast_rwlock_rdlock(&tps_alert_lock); /* read lock: tps_alert_add() updates the count under the write lock */
+       count = tps_alert_count;
+       ast_rwlock_unlock(&tps_alert_lock);
+
+       return count; /* value as of the read above; it may change after the lock is released */
+}
+
+int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
+{
+       if (!tps || high_water < 0 || high_water < low_water) {
+               return -1; /* rejected before tps is touched, so the water marks stay unchanged */
+       }
+
+       if (low_water < 0) {
+               /* Any negative low_water selects the default: 90% of high_water (integer math) */
+               low_water = (high_water * 9) / 10;
+       }
+
+       ao2_lock(tps); /* levels, alert flag, and queue size are examined and updated under the tps lock */
+
+       tps->tps_queue_low = low_water;
+       tps->tps_queue_high = high_water;
+
+       if (tps->high_water_alert) {
+               if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
+                       /* Queue is empty or below the new low mark: clear now. NOTE(review): '<' here, tps_taskprocessor_pop() uses '<=' - confirm */
+                       tps->high_water_alert = 0;
+                       tps_alert_add(tps, -1);
+               }
+       } else {
+               if (high_water <= tps->tps_queue_size) {
+                       /* Queue is already at or above the new high mark: raise the alert now */
+                       tps->high_water_alert = 1;
+                       tps_alert_add(tps, +1);
+               }
+       }
+
+       ao2_unlock(tps);
+
+       return 0;
+}
+
 /* destroy the taskprocessor */
-static void tps_taskprocessor_destroy(void *tps)
+static void tps_taskprocessor_dtor(void *tps)
 {
        struct ast_taskprocessor *t = tps;
        struct tps_task *task;
 
-       if (!tps) {
-               ast_log(LOG_ERROR, "missing taskprocessor\n");
-               return;
+       while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
+               tps_task_free(task);
        }
-       ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
-       /* free it */
+       t->tps_queue_size = 0;
+
+       if (t->high_water_alert) {
+               t->high_water_alert = 0;
+               tps_alert_add(t, -1);
+       }
+
        ast_free(t->stats);
        t->stats = NULL;
        ast_free((char *) t->name);
-       if (t->listener) {
-               ao2_ref(t->listener, -1);
-               t->listener = NULL;
-       }
-       while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
-               tps_task_free(task);
-       }
+       t->name = NULL;
+       ao2_cleanup(t->listener);
+       t->listener = NULL;
 }
 
 /* pop the front task and return it */
@@ -569,7 +648,11 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
        struct tps_task *task;
 
        if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
-               tps->tps_queue_size--;
+               --tps->tps_queue_size;
+               if (tps->high_water_alert && tps->tps_queue_size <= tps->tps_queue_low) {
+                       tps->high_water_alert = 0;
+                       tps_alert_add(tps, -1);
+               }
        }
        return task;
 }
@@ -648,19 +731,22 @@ static void *default_listener_pvt_alloc(void)
 
 static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
 {
-       RAII_VAR(struct ast_taskprocessor *, p,
-                       ao2_alloc(sizeof(*p), tps_taskprocessor_destroy), ao2_cleanup);
+       struct ast_taskprocessor *p;
 
+       p = ao2_alloc(sizeof(*p), tps_taskprocessor_dtor);
        if (!p) {
                ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
                return NULL;
        }
 
-       if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
-               ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
-               return NULL;
-       }
-       if (!(p->name = ast_strdup(name))) {
+       /* Set default congestion water level alert triggers. */
+       p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
+       p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
+
+       p->stats = ast_calloc(1, sizeof(*p->stats));
+       p->name = ast_strdup(name);
+       if (!p->stats || !p->name) {
+               ao2_ref(p, -1);
                return NULL;
        }
 
@@ -675,22 +761,18 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru
        if (!(ao2_link(tps_singletons, p))) {
                ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
                listener->tps = NULL;
-               ao2_ref(p, -1);
+               ao2_ref(p, -2);
                return NULL;
        }
 
        if (p->listener->callbacks->start(p->listener)) {
-               ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name);
+               ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
+                       p->name);
                ast_taskprocessor_unreference(p);
                return NULL;
        }
 
-       /* RAII_VAR will decrement the refcount at the end of the function.
-        * Since we want to pass back a reference to p, we bump the refcount
-        */
-       ao2_ref(p, +1);
        return p;
-
 }
 
 /* Provide a reference to a taskprocessor.  Create the taskprocessor if necessary, but don't
@@ -799,10 +881,16 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
        AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
        previous_size = tps->tps_queue_size++;
 
-       if (previous_size >= AST_TASKPROCESSOR_HIGH_WATER_LEVEL && !tps->high_water_warned) {
-               ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n",
-                       tps->name, previous_size);
-               tps->high_water_warned = 1;
+       if (previous_size >= tps->tps_queue_high) {
+               if (!tps->high_water_warned) {
+                       tps->high_water_warned = 1;
+                       ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n",
+                               tps->name, previous_size);
+               }
+               if (!tps->high_water_alert) {
+                       tps->high_water_alert = 1;
+                       tps_alert_add(tps, +1);
+               }
        }
 
        /* The currently executing task counts as still in queue */
index 528ccb627d51f07288cdae751d0209ce736083b5..e8ed89361ea01f3038042a0b806160fff22283b2 100644 (file)
@@ -369,8 +369,6 @@ static pjsip_module endpoint_mod = {
        .on_rx_request = endpoint_lookup,
 };
 
-#define SIP_MAX_QUEUE (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 3)
-
 static pj_bool_t distributor(pjsip_rx_data *rdata)
 {
        pjsip_dialog *dlg;
@@ -408,6 +406,13 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
                        pjsip_rx_data_get_info(rdata));
                serializer = find_request_serializer(rdata);
                if (!serializer) {
+                       if (ast_taskprocessor_alert_get()) {
+                               /* We're overloaded, ignore the unmatched response. */
+                               ast_debug(3, "Taskprocessor overload alert: Ignoring unmatched '%s'.\n",
+                                       pjsip_rx_data_get_info(rdata));
+                               return PJ_TRUE;
+                       }
+
                        /*
                         * Pick a serializer for the unmatched response.  Maybe
                         * the stack can figure out what it is for, or we really
@@ -422,6 +427,21 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
                        PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
                return PJ_TRUE;
        } else {
+               if (ast_taskprocessor_alert_get()) {
+                       /*
+                        * When taskprocessors get backed up, there is a good chance that
+                        * we are being overloaded and need to defer adding new work to
+                        * the system.  To defer the work we will ignore the request and
+                        * rely on the peer's transport layer to retransmit the message.
+                        * We usually work off the overload within a few seconds.  The
+                        * alternative is to send back a 503 response to these requests
+                        * and be done with it.
+                        */
+                       ast_debug(3, "Taskprocessor overload alert: Ignoring '%s'.\n",
+                               pjsip_rx_data_get_info(rdata));
+                       return PJ_TRUE;
+               }
+
                /* Pick a serializer for the out-of-dialog request. */
                serializer = ast_sip_get_distributor_serializer(rdata);
        }
@@ -432,21 +452,9 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
                clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint);
        }
 
-       if (ast_sip_threadpool_queue_size() > SIP_MAX_QUEUE) {
-               /* When the threadpool is backed up this much, there is a good chance that we have encountered
-                * some sort of terrible condition and don't need to be adding more work to the threadpool.
-                * It's in our best interest to send back a 503 response and be done with it.
-                */
-               if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
-                       pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
-               }
+       if (ast_sip_push_task(serializer, distribute, clone)) {
                ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
                pjsip_rx_data_free_cloned(clone);
-       } else {
-               if (ast_sip_push_task(serializer, distribute, clone)) {
-                       ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
-                       pjsip_rx_data_free_cloned(clone);
-               }
        }
 
        ast_taskprocessor_unreference(serializer);