]> git.ipfire.org Git - thirdparty/kernel/linux.git/commitdiff
rcu-tasks: Use workqueues for multiple rcu_tasks_invoke_cbs() invocations
authorPaul E. McKenney <paulmck@kernel.org>
Wed, 10 Nov 2021 23:56:40 +0000 (15:56 -0800)
committerPaul E. McKenney <paulmck@kernel.org>
Thu, 9 Dec 2021 18:52:00 +0000 (10:52 -0800)
If there is a flood of callbacks, it is necessary to put multiple
CPUs to work invoking those callbacks.  This commit therefore uses a
workqueue-flooding approach to parallelize RCU Tasks callback execution.

Reported-by: Martin Lau <kafai@fb.com>
Cc: Neeraj Upadhyay <neeraj.iitr10@gmail.com>
Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
kernel/rcu/tasks.h

index 6b3406a477205a133543c84dc6ebd479568e767f..97bd7a667e94917568e7e5d27311bc9d2423d274 100644 (file)
@@ -24,10 +24,14 @@ typedef void (*postgp_func_t)(struct rcu_tasks *rtp);
  * struct rcu_tasks_percpu - Per-CPU component of definition for a Tasks-RCU-like mechanism.
  * @cblist: Callback list.
  * @lock: Lock protecting per-CPU callback list.
+ * @rtp_work: Work queue for invoking callbacks.
  */
 struct rcu_tasks_percpu {
        struct rcu_segcblist cblist;
        raw_spinlock_t __private lock;
+       struct work_struct rtp_work;
+       int cpu;
+       struct rcu_tasks *rtpp;
 };
 
 /**
@@ -146,6 +150,8 @@ static const char * const rcu_tasks_gp_state_names[] = {
 //
 // Generic code.
 
+static void rcu_tasks_invoke_cbs_wq(struct work_struct *wp);
+
 /* Record grace-period phase and time. */
 static void set_tasks_gp_state(struct rcu_tasks *rtp, int newstate)
 {
@@ -185,6 +191,9 @@ static void cblist_init_generic(struct rcu_tasks *rtp)
                raw_spin_lock_rcu_node(rtpcp); // irqs already disabled.
                if (rcu_segcblist_empty(&rtpcp->cblist))
                        rcu_segcblist_init(&rtpcp->cblist);
+               INIT_WORK(&rtpcp->rtp_work, rcu_tasks_invoke_cbs_wq);
+               rtpcp->cpu = cpu;
+               rtpcp->rtpp = rtp;
                raw_spin_unlock_rcu_node(rtpcp); // irqs remain disabled.
        }
        raw_spin_unlock_irqrestore(&rtp->cbs_gbl_lock, flags);
@@ -256,35 +265,55 @@ static int rcu_tasks_need_gpcb(struct rcu_tasks *rtp)
 }
 
 // Advance callbacks and invoke any that are ready.
-static void rcu_tasks_invoke_cbs(struct rcu_tasks *rtp)
+static void rcu_tasks_invoke_cbs(struct rcu_tasks *rtp, struct rcu_tasks_percpu *rtpcp)
 {
        int cpu;
+       int cpunext;
        unsigned long flags;
        int len;
-       struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
        struct rcu_head *rhp;
-
-       for (cpu = 0; cpu < rtp->percpu_enqueue_lim; cpu++) {
-               struct rcu_tasks_percpu *rtpcp = per_cpu_ptr(rtp->rtpcpu, cpu);
-
-               if (rcu_segcblist_empty(&rtpcp->cblist))
-                       continue;
-               raw_spin_lock_irqsave_rcu_node(rtpcp, flags);
-               rcu_segcblist_advance(&rtpcp->cblist, rcu_seq_current(&rtp->tasks_gp_seq));
-               rcu_segcblist_extract_done_cbs(&rtpcp->cblist, &rcl);
-               raw_spin_unlock_irqrestore_rcu_node(rtpcp, flags);
-               len = rcl.len;
-               for (rhp = rcu_cblist_dequeue(&rcl); rhp; rhp = rcu_cblist_dequeue(&rcl)) {
-                       local_bh_disable();
-                       rhp->func(rhp);
-                       local_bh_enable();
-                       cond_resched();
+       struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
+       struct rcu_tasks_percpu *rtpcp_next;
+
+       cpu = rtpcp->cpu;
+       cpunext = cpu * 2 + 1;
+       if (cpunext < rtp->percpu_enqueue_lim) {
+               rtpcp_next = per_cpu_ptr(rtp->rtpcpu, cpunext);
+               queue_work_on(cpunext, system_wq, &rtpcp_next->rtp_work);
+               cpunext++;
+               if (cpunext < rtp->percpu_enqueue_lim) {
+                       rtpcp_next = per_cpu_ptr(rtp->rtpcpu, cpunext);
+                       queue_work_on(cpunext, system_wq, &rtpcp_next->rtp_work);
                }
-               raw_spin_lock_irqsave_rcu_node(rtpcp, flags);
-               rcu_segcblist_add_len(&rtpcp->cblist, -len);
-               (void)rcu_segcblist_accelerate(&rtpcp->cblist, rcu_seq_snap(&rtp->tasks_gp_seq));
-               raw_spin_unlock_irqrestore_rcu_node(rtpcp, flags);
        }
+
+       if (rcu_segcblist_empty(&rtpcp->cblist))
+               return;
+       raw_spin_lock_irqsave_rcu_node(rtpcp, flags);
+       rcu_segcblist_advance(&rtpcp->cblist, rcu_seq_current(&rtp->tasks_gp_seq));
+       rcu_segcblist_extract_done_cbs(&rtpcp->cblist, &rcl);
+       raw_spin_unlock_irqrestore_rcu_node(rtpcp, flags);
+       len = rcl.len;
+       for (rhp = rcu_cblist_dequeue(&rcl); rhp; rhp = rcu_cblist_dequeue(&rcl)) {
+               local_bh_disable();
+               rhp->func(rhp);
+               local_bh_enable();
+               cond_resched();
+       }
+       raw_spin_lock_irqsave_rcu_node(rtpcp, flags);
+       rcu_segcblist_add_len(&rtpcp->cblist, -len);
+       (void)rcu_segcblist_accelerate(&rtpcp->cblist, rcu_seq_snap(&rtp->tasks_gp_seq));
+       raw_spin_unlock_irqrestore_rcu_node(rtpcp, flags);
+}
+
+// Workqueue flood to advance callbacks and invoke any that are ready.
+static void rcu_tasks_invoke_cbs_wq(struct work_struct *wp)
+{
+       struct rcu_tasks *rtp;
+       struct rcu_tasks_percpu *rtpcp = container_of(wp, struct rcu_tasks_percpu, rtp_work);
+
+       rtp = rtpcp->rtpp;
+       rcu_tasks_invoke_cbs(rtp, rtpcp);
 }
 
 /* RCU-tasks kthread that detects grace periods and invokes callbacks. */
@@ -320,7 +349,7 @@ static int __noreturn rcu_tasks_kthread(void *arg)
 
                /* Invoke callbacks. */
                set_tasks_gp_state(rtp, RTGS_INVOKE_CBS);
-               rcu_tasks_invoke_cbs(rtp);
+               rcu_tasks_invoke_cbs(rtp, per_cpu_ptr(rtp->rtpcpu, 0));
 
                /* Paranoid sleep to keep this from entering a tight loop */
                schedule_timeout_idle(rtp->gp_sleep);