MINOR: activity: add a new "show tasks" command to list currently active tasks
author    Willy Tarreau <w@1wt.eu>
          Fri, 29 Jan 2021 10:32:55 +0000 (11:32 +0100)
committer Willy Tarreau <w@1wt.eu>
          Fri, 29 Jan 2021 11:12:28 +0000 (12:12 +0100)
This finally adds the long-awaited solution to inspect the run queues
and figure out what is eating the CPU or causing latencies. We can even
see the latencies experienced by tasks when profiling is enabled.
Example on a saturated process:

> show tasks
Running tasks: 14983 (4 threads)
  function                     places     %    lat_tot   lat_avg
  process_stream                 4948   33.0   5.840m    70.82ms
  h1_io_cb                       2535   16.9      -         -
  main+0x9e670                   2508   16.7   2.930m    70.10ms
  ssl_sock_io_cb                 2499   16.6      -         -
  si_cs_io_cb                    2493   16.6      -         -
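
Reading the table: "places" is the number of queued occurrences of each
function, and lat_avg is simply lat_tot divided by places. For instance for
process_stream above, 5.840m = 350.4s, and 350.4s / 4948 ~= 70.8ms, which
matches the lat_avg column. Pure tasklets such as h1_io_cb carry no call
date, hence the "-" in their latency columns.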

doc/management.txt
src/activity.c

index e71849a525b0541522ad1e6c9b7d1533e606e0ba..de3fbf6076b86bf80a5451e72f42273733efd66d 100644 (file)
@@ -2761,6 +2761,17 @@ show table <name> [ data.<type> <operator> <value> [data.<type> ...]] | [ key <k
           | fgrep 'key=' | cut -d' ' -f2 | cut -d= -f2 > abusers-ip.txt
           ( or | awk '/key/{ print a[split($2,a,"=")]; }' )
 
+show tasks
+  Dumps the number of tasks currently in the run queue, with the number of
+  occurrences for each function, and their average latency when it is known
+  (for pure tasks with task profiling enabled). The dump is a snapshot of
+  the instant it is taken, so the output may vary depending on which tasks
+  are left in the queue at that moment, especially in mono-thread mode where
+  there is less chance that I/Os can refill the queue (unless the queue is
+  full). This command takes exclusive access to the process and can cause
+  minor but measurable latencies when issued on a highly loaded process, so
+  it must not be abused by monitoring bots.
+
 show threads
   Dumps some internal states and structures for each thread, that may be useful
   to help developers understand a problem. The output tries to be readable by
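
For reference, a typical way to issue the new command on a running process
is through the stats socket, e.g. (assuming the socket is bound at
/var/run/haproxy.sock; the actual path depends on the local "stats socket"
configuration):

    $ echo "show tasks" | socat stdio /var/run/haproxy.sock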
index ffad3701d6b7c778e56bc122fba7eab819dbaf98..96f5008f32a9f9bdd483954197871a1c975f44be 100644 (file)
@@ -182,6 +182,141 @@ static int cli_io_handler_show_profiling(struct appctx *appctx)
        return 1;
 }
 
+/* This function scans all threads' run queues and collects statistics about
+ * running tasks. It returns 0 if the output buffer is full and it needs to be
+ * called again, otherwise non-zero.
+ */
+static int cli_io_handler_show_tasks(struct appctx *appctx)
+{
+       struct sched_activity tmp_activity[256] __attribute__((aligned(64)));
+       struct stream_interface *si = appctx->owner;
+       struct buffer *name_buffer = get_trash_chunk();
+       struct sched_activity *entry;
+       const struct tasklet *tl;
+       const struct task *t;
+       uint64_t now_ns, lat;
+       struct eb32sc_node *rqnode;
+       uint64_t tot_calls;
+       int thr, queue;
+       int i, max;
+
+       if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
+               return 1;
+
+       /* It's not possible to scan queues in small chunks and yield in the
+        * middle of the dump and come back again. So what we're doing instead
+        * is to freeze all threads and inspect their queues at once as fast as
+        * possible, using a sched_activity array to collect metrics with
+        * limited collision, then we'll report statistics only. The tasks'
+        * #calls will reflect the number of occurrences, and the lat_time will
+        * reflect the latency when set.
+        */
+
+       now_ns = now_mono_time();
+       memset(tmp_activity, 0, sizeof(tmp_activity));
+
+       thread_isolate();
+
+       /* 1. global run queue */
+
+#ifdef USE_THREAD
+       rqnode = eb32sc_first(&rqueue, ~0UL);
+       while (rqnode) {
+               t = eb32sc_entry(rqnode, struct task, rq);
+               entry = sched_activity_entry(tmp_activity, t->process);
+               if (t->call_date) {
+                       lat = now_ns - t->call_date;
+                       entry->lat_time += lat;
+               }
+               entry->calls++;
+               rqnode = eb32sc_next(rqnode, ~0UL);
+       }
+#endif
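+       /* note: the global run queue above only exists in multi-threaded
+        * builds (hence the USE_THREAD guard); without threads, all tasks
+        * sit in the per-thread queues scanned below.
+        */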
+       /* 2. all threads' local run queues */
+       for (thr = 0; thr < global.nbthread; thr++) {
+               /* task run queue */
+               rqnode = eb32sc_first(&task_per_thread[thr].rqueue, ~0UL);
+               while (rqnode) {
+                       t = eb32sc_entry(rqnode, struct task, rq);
+                       entry = sched_activity_entry(tmp_activity, t->process);
+                       if (t->call_date) {
+                               lat = now_ns - t->call_date;
+                               entry->lat_time += lat;
+                       }
+                       entry->calls++;
+                       rqnode = eb32sc_next(rqnode, ~0UL);
+               }
+
+               /* shared tasklet list */
+               list_for_each_entry(tl, mt_list_to_list(&task_per_thread[thr].shared_tasklet_list), list) {
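+                       /* tasklets share the initial fields of struct task,
+                        * so the cast below is valid for reading ->process
+                        * and for the TASK_IS_TASKLET() check.
+                        */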
+                       t = (const struct task *)tl;
+                       entry = sched_activity_entry(tmp_activity, t->process);
+                       if (!TASK_IS_TASKLET(t) && t->call_date) {
+                               lat = now_ns - t->call_date;
+                               entry->lat_time += lat;
+                       }
+                       entry->calls++;
+               }
+
+               /* classful tasklets */
+               for (queue = 0; queue < TL_CLASSES; queue++) {
+                       list_for_each_entry(tl, &task_per_thread[thr].tasklets[queue], list) {
+                               t = (const struct task *)tl;
+                               entry = sched_activity_entry(tmp_activity, t->process);
+                               if (!TASK_IS_TASKLET(t) && t->call_date) {
+                                       lat = now_ns - t->call_date;
+                                       entry->lat_time += lat;
+                               }
+                               entry->calls++;
+                       }
+               }
+       }
+
+       /* hopefully we're done */
+       thread_release();
+
+       chunk_reset(&trash);
+
+       tot_calls = 0;
+       for (i = 0; i < 256; i++)
+               tot_calls += tmp_activity[i].calls;
+
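+       /* sort buckets by use count so the busiest functions come first */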
+       qsort(tmp_activity, 256, sizeof(tmp_activity[0]), cmp_sched_activity);
+
+       chunk_appendf(&trash, "Running tasks: %d (%d threads)\n"
+                     "  function                     places     %%    lat_tot   lat_avg\n",
+                     (int)tot_calls, global.nbthread);
+
+       for (i = 0; i < 256 && tmp_activity[i].calls; i++) {
+               chunk_reset(name_buffer);
+
+               if (!tmp_activity[i].func)
+                       chunk_printf(name_buffer, "other");
+               else
+                       resolve_sym_name(name_buffer, "", tmp_activity[i].func);
+
+               /* reserve 35 chars for name+' '+#calls, knowing that longer names
+                * are often used for less often called functions.
+                */
+               max = 35 - name_buffer->data;
+               if (max < 1)
+                       max = 1;
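+               /* fixed-point percentage of total: e.g. 4948 calls out of
+                * 14983 yield 100*4948/14983 = 33 and (1000*4948/14983)%10
+                * = 0, printed as "33.0".
+                */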
+               chunk_appendf(&trash, "  %s%*llu  %3d.%1d",
+                             name_buffer->area, max, (unsigned long long)tmp_activity[i].calls,
+                             (int)(100ULL * tmp_activity[i].calls / tot_calls),
+                             (int)((1000ULL * tmp_activity[i].calls / tot_calls)%10));
+               print_time_short(&trash, "   ", tmp_activity[i].lat_time, "");
+               print_time_short(&trash, "   ", tmp_activity[i].lat_time / tmp_activity[i].calls, "\n");
+       }
+
+       if (ci_putchk(si_ic(si), &trash) == -1) {
+               /* failed, try again */
+               si_rx_room_blk(si);
+               return 0;
+       }
+       return 1;
+}
+
 /* config keyword parsers */
 static struct cfg_kw_list cfg_kws = {ILH, {
        { CFG_GLOBAL, "profiling.tasks",      cfg_parse_prof_tasks      },
@@ -193,6 +328,7 @@ INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws);
 /* register cli keywords */
 static struct cli_kw_list cli_kws = {{ },{
        { { "show", "profiling", NULL }, "show profiling : show CPU profiling options",   NULL, cli_io_handler_show_profiling, NULL },
+       { { "show", "tasks", NULL },     "show tasks     : show running tasks",           NULL, cli_io_handler_show_tasks,     NULL },
        { { "set",  "profiling", NULL }, "set  profiling : enable/disable CPU profiling", cli_parse_set_profiling,  NULL },
        {{},}
 }};
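
Note on the aggregation above: the per-function statistics rely on
sched_activity_entry() mapping a task's function pointer to one of the 256
tmp_activity buckets, and that helper is not part of this diff. Below is a
minimal, self-contained sketch of how such a fixed-size, collision-tolerant
accumulator can work; the names, hash mix and collision policy here are
illustrative assumptions, not HAProxy's actual implementation.

#include <stdint.h>

#define ACT_HASH_BUCKETS 256  /* matches tmp_activity[256] above */

/* illustrative stand-in for HAProxy's struct sched_activity */
struct act_entry {
	const void *func;   /* function owning this bucket, NULL if free */
	uint64_t calls;     /* number of queued occurrences observed */
	uint64_t lat_time;  /* cumulated latency in nanoseconds, when known */
};

/* hypothetical equivalent of sched_activity_entry(): hash the function
 * pointer down to a bucket index. On a collision the bucket keeps its
 * first owner and colliding samples are merged into it, which is
 * acceptable for coarse "what is hogging the run queue" statistics.
 */
static struct act_entry *act_entry(struct act_entry *tbl, const void *func)
{
	uintptr_t h = (uintptr_t)func;

	h ^= h >> 17;              /* fold high address bits into low ones */
	h ^= h >> 7;
	tbl += h % ACT_HASH_BUCKETS;
	if (!tbl->func)
		tbl->func = func;  /* claim a free bucket for this function */
	return tbl;
}

With such a scheme, every queued task costs only a pointer hash and a couple
of additions while the threads are frozen, which is what keeps the
thread_isolate() window short even with ~15k queued tasks.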