--- /dev/null
+From: Gerald Schaefer <geraldsc@de.ibm.com>
+Subject: stp/etr: smp_call_function races.
+References: bnc#450096
+
+Symptom: System hangs when setting stp/etr online.
+Problem: The stp/etr code uses smp_call_function() with a function
+ that blocks on the signalled cpus until the clock is synced.
+ This is prone to deadlocks.
+Solution: Replace the smp_call_function() based synchronization with
+ one based on stop_machine().
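+
+For reference, the new synchronization roughly follows the pattern below
+(a simplified sketch; sync_fn stands in for the etr_sync_clock() and
+stp_sync_clock() functions added by the diff, which contain the real
+implementation):
+
+    static int sync_fn(void *data)
+    {
+        /* Run on every online cpu by stop_machine(); the first cpu
+         * through the xchg() acts as master, the others as slaves. */
+        static int first;
+        struct clock_sync_data *sync = data;
+
+        if (xchg(&first, 1) == 1) {
+            /* Slave: decrement sync->cpus and spin in clock_sync_cpu()
+             * until the master sets sync->in_sync. */
+            clock_sync_cpu(sync);
+            return 0;
+        }
+        /* Master: wait until all slaves have checked in. */
+        while (atomic_read(&sync->cpus) != 0)
+            cpu_relax();
+        /* ... adjust the TOD clock and set sync->in_sync ... */
+        xchg(&first, 0);
+        return 0;
+    }
+
+    /* Caller, running from the time_sync_wq worker (process context): */
+    get_online_cpus();
+    atomic_set(&sync.cpus, num_online_cpus() - 1);
+    stop_machine(sync_fn, &sync, &cpu_online_map);
+    put_online_cpus();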
+
+Acked-by: John Jolly <jjolly@suse.de>
+---
+ arch/s390/kernel/time.c | 210 +++++++++++++++++++++++++++++-------------------
+ 1 file changed, 130 insertions(+), 80 deletions(-)
+
+Index: linux-2.6.27/arch/s390/kernel/time.c
+===================================================================
+--- linux-2.6.27.orig/arch/s390/kernel/time.c
++++ linux-2.6.27/arch/s390/kernel/time.c
+@@ -22,6 +22,8 @@
+ #include <linux/string.h>
+ #include <linux/mm.h>
+ #include <linux/interrupt.h>
++#include <linux/cpu.h>
++#include <linux/stop_machine.h>
+ #include <linux/time.h>
+ #include <linux/sysdev.h>
+ #include <linux/delay.h>
+@@ -365,6 +367,15 @@ static void enable_sync_clock(void)
+ atomic_set_mask(0x80000000, sw_ptr);
+ }
+
++/* Single threaded workqueue used for etr and stp sync events */
++static struct workqueue_struct *time_sync_wq;
++
++static void __init time_init_wq(void)
++{
++ if (!time_sync_wq)
++ time_sync_wq = create_singlethread_workqueue("timesync");
++}
++
+ /*
+ * External Time Reference (ETR) code.
+ */
+@@ -457,17 +468,18 @@ static int __init etr_init(void)
+
+ if (!test_bit(CLOCK_SYNC_HAS_ETR, &clock_sync_flags))
+ return 0;
++ time_init_wq();
+ /* Check if this machine has the steai instruction. */
+ if (etr_steai(&aib, ETR_STEAI_STEPPING_PORT) == 0)
+ etr_steai_available = 1;
+ setup_timer(&etr_timer, etr_timeout, 0UL);
+ if (etr_port0_online) {
+ set_bit(ETR_EVENT_PORT0_CHANGE, &etr_events);
+- schedule_work(&etr_work);
++ queue_work(time_sync_wq, &etr_work);
+ }
+ if (etr_port1_online) {
+ set_bit(ETR_EVENT_PORT1_CHANGE, &etr_events);
+- schedule_work(&etr_work);
++ queue_work(time_sync_wq, &etr_work);
+ }
+ return 0;
+ }
+@@ -494,7 +506,7 @@ void etr_switch_to_local(void)
+ if (test_bit(CLOCK_SYNC_ETR, &clock_sync_flags))
+ disable_sync_clock(NULL);
+ set_bit(ETR_EVENT_SWITCH_LOCAL, &etr_events);
+- schedule_work(&etr_work);
++ queue_work(time_sync_wq, &etr_work);
+ }
+
+ /*
+@@ -510,7 +522,7 @@ void etr_sync_check(void)
+ if (test_bit(CLOCK_SYNC_ETR, &clock_sync_flags))
+ disable_sync_clock(NULL);
+ set_bit(ETR_EVENT_SYNC_CHECK, &etr_events);
+- schedule_work(&etr_work);
++ queue_work(time_sync_wq, &etr_work);
+ }
+
+ /*
+@@ -534,13 +546,13 @@ static void etr_timing_alert(struct etr_
+ * Both ports are not up-to-date now.
+ */
+ set_bit(ETR_EVENT_PORT_ALERT, &etr_events);
+- schedule_work(&etr_work);
++ queue_work(time_sync_wq, &etr_work);
+ }
+
+ static void etr_timeout(unsigned long dummy)
+ {
+ set_bit(ETR_EVENT_UPDATE, &etr_events);
+- schedule_work(&etr_work);
++ queue_work(time_sync_wq, &etr_work);
+ }
+
+ /*
+@@ -647,14 +659,16 @@ static int etr_aib_follows(struct etr_ai
+ }
+
+ struct clock_sync_data {
++ atomic_t cpus;
+ int in_sync;
+ unsigned long long fixup_cc;
++ int etr_port;
++ struct etr_aib *etr_aib;
+ };
+
+-static void clock_sync_cpu_start(void *dummy)
++static void clock_sync_cpu(struct clock_sync_data *sync)
+ {
+- struct clock_sync_data *sync = dummy;
+-
++ atomic_dec(&sync->cpus);
+ enable_sync_clock();
+ /*
+ * This looks like a busy wait loop but it isn't. etr_sync_cpus
+@@ -680,39 +694,35 @@ static void clock_sync_cpu_start(void *d
+ fixup_clock_comparator(sync->fixup_cc);
+ }
+
+-static void clock_sync_cpu_end(void *dummy)
+-{
+-}
+-
+ /*
+ * Sync the TOD clock using the port refered to by aibp. This port
+ * has to be enabled and the other port has to be disabled. The
+ * last eacr update has to be more than 1.6 seconds in the past.
+ */
+-static int etr_sync_clock(struct etr_aib *aib, int port)
++static int etr_sync_clock(void *data)
+ {
+- struct etr_aib *sync_port;
+- struct clock_sync_data etr_sync;
++ static int first;
+ unsigned long long clock, old_clock, delay, delta;
+- int follows;
++ struct clock_sync_data *etr_sync;
++ struct etr_aib *sync_port, *aib;
++ int port;
+ int rc;
+
+- /* Check if the current aib is adjacent to the sync port aib. */
+- sync_port = (port == 0) ? &etr_port0 : &etr_port1;
+- follows = etr_aib_follows(sync_port, aib, port);
+- memcpy(sync_port, aib, sizeof(*aib));
+- if (!follows)
+- return -EAGAIN;
++ etr_sync = data;
+
+- /*
+- * Catch all other cpus and make them wait until we have
+- * successfully synced the clock. smp_call_function will
+- * return after all other cpus are in etr_sync_cpu_start.
+- */
+- memset(&etr_sync, 0, sizeof(etr_sync));
+- preempt_disable();
+- smp_call_function(clock_sync_cpu_start, &etr_sync, 0);
+- local_irq_disable();
++ if (xchg(&first, 1) == 1) {
++ /* Slave */
++ clock_sync_cpu(etr_sync);
++ return 0;
++ }
++
++ /* Wait until all other cpus entered the sync function. */
++ while (atomic_read(&etr_sync->cpus) != 0)
++ cpu_relax();
++
++ port = etr_sync->etr_port;
++ aib = etr_sync->etr_aib;
++ sync_port = (port == 0) ? &etr_port0 : &etr_port1;
+ enable_sync_clock();
+
+ /* Set clock to next OTE. */
+@@ -729,16 +739,16 @@ static int etr_sync_clock(struct etr_aib
+ delay = (unsigned long long)
+ (aib->edf2.etv - sync_port->edf2.etv) << 32;
+ delta = adjust_time(old_clock, clock, delay);
+- etr_sync.fixup_cc = delta;
++ etr_sync->fixup_cc = delta;
+ fixup_clock_comparator(delta);
+ /* Verify that the clock is properly set. */
+ if (!etr_aib_follows(sync_port, aib, port)) {
+ /* Didn't work. */
+ disable_sync_clock(NULL);
+- etr_sync.in_sync = -EAGAIN;
++ etr_sync->in_sync = -EAGAIN;
+ rc = -EAGAIN;
+ } else {
+- etr_sync.in_sync = 1;
++ etr_sync->in_sync = 1;
+ rc = 0;
+ }
+ } else {
+@@ -746,12 +756,33 @@ static int etr_sync_clock(struct etr_aib
+ __ctl_clear_bit(0, 29);
+ __ctl_clear_bit(14, 21);
+ disable_sync_clock(NULL);
+- etr_sync.in_sync = -EAGAIN;
++ etr_sync->in_sync = -EAGAIN;
+ rc = -EAGAIN;
+ }
+- local_irq_enable();
+- smp_call_function(clock_sync_cpu_end, NULL, 0);
+- preempt_enable();
++ xchg(&first, 0);
++ return rc;
++}
++
++static int etr_sync_clock_stop(struct etr_aib *aib, int port)
++{
++ struct clock_sync_data etr_sync;
++ struct etr_aib *sync_port;
++ int follows;
++ int rc;
++
++ /* Check if the current aib is adjacent to the sync port aib. */
++ sync_port = (port == 0) ? &etr_port0 : &etr_port1;
++ follows = etr_aib_follows(sync_port, aib, port);
++ memcpy(sync_port, aib, sizeof(*aib));
++ if (!follows)
++ return -EAGAIN;
++ memset(&etr_sync, 0, sizeof(etr_sync));
++ etr_sync.etr_aib = aib;
++ etr_sync.etr_port = port;
++ get_online_cpus();
++ atomic_set(&etr_sync.cpus, num_online_cpus() - 1);
++ rc = stop_machine(etr_sync_clock, &etr_sync, &cpu_online_map);
++ put_online_cpus();
+ return rc;
+ }
+
+@@ -908,7 +939,7 @@ static void etr_update_eacr(struct etr_e
+ }
+
+ /*
+- * ETR tasklet. In this function you'll find the main logic. In
++ * ETR work. In this function you'll find the main logic. In
+ * particular this is the only function that calls etr_update_eacr(),
+ * it "controls" the etr control register.
+ */
+@@ -1041,7 +1072,7 @@ static void etr_work_fn(struct work_stru
+ etr_update_eacr(eacr);
+ set_bit(CLOCK_SYNC_ETR, &clock_sync_flags);
+ if (now < etr_tolec + (1600000 << 12) ||
+- etr_sync_clock(&aib, sync_port) != 0) {
++ etr_sync_clock_stop(&aib, sync_port) != 0) {
+ /* Sync failed. Try again in 1/2 second. */
+ eacr.es = 0;
+ etr_update_eacr(eacr);
+@@ -1130,13 +1161,13 @@ static ssize_t etr_online_store(struct s
+ return count; /* Nothing to do. */
+ etr_port0_online = value;
+ set_bit(ETR_EVENT_PORT0_CHANGE, &etr_events);
+- schedule_work(&etr_work);
++ queue_work(time_sync_wq, &etr_work);
+ } else {
+ if (etr_port1_online == value)
+ return count; /* Nothing to do. */
+ etr_port1_online = value;
+ set_bit(ETR_EVENT_PORT1_CHANGE, &etr_events);
+- schedule_work(&etr_work);
++ queue_work(time_sync_wq, &etr_work);
+ }
+ return count;
+ }
+@@ -1371,8 +1402,12 @@ static void __init stp_reset(void)
+
+ static int __init stp_init(void)
+ {
+- if (test_bit(CLOCK_SYNC_HAS_STP, &clock_sync_flags) && stp_online)
+- schedule_work(&stp_work);
++ if (!test_bit(CLOCK_SYNC_HAS_STP, &clock_sync_flags))
++ return 0;
++ time_init_wq();
++ if (!stp_online)
++ return 0;
++ queue_work(time_sync_wq, &stp_work);
+ return 0;
+ }
+
+@@ -1389,7 +1424,7 @@ arch_initcall(stp_init);
+ static void stp_timing_alert(struct stp_irq_parm *intparm)
+ {
+ if (intparm->tsc || intparm->lac || intparm->tcpc)
+- schedule_work(&stp_work);
++ queue_work(time_sync_wq, &stp_work);
+ }
+
+ /*
+@@ -1417,46 +1452,34 @@ void stp_island_check(void)
+ if (!test_bit(CLOCK_SYNC_STP, &clock_sync_flags))
+ return;
+ disable_sync_clock(NULL);
+- schedule_work(&stp_work);
++ queue_work(time_sync_wq, &stp_work);
+ }
+
+-/*
+- * STP tasklet. Check for the STP state and take over the clock
+- * synchronization if the STP clock source is usable.
+- */
+-static void stp_work_fn(struct work_struct *work)
++
++static int stp_sync_clock(void *data)
+ {
+- struct clock_sync_data stp_sync;
++ static int first;
+ unsigned long long old_clock, delta;
++ struct clock_sync_data *stp_sync;
+ int rc;
+
+- if (!stp_online) {
+- chsc_sstpc(stp_page, STP_OP_CTRL, 0x0000);
+- return;
+- }
++ stp_sync = data;
+
+- rc = chsc_sstpc(stp_page, STP_OP_CTRL, 0xb0e0);
+- if (rc)
+- return;
++ if (xchg(&first, 1) == 1) {
++ /* Slave */
++ clock_sync_cpu(stp_sync);
++ return 0;
++ }
+
+- rc = chsc_sstpi(stp_page, &stp_info, sizeof(struct stp_sstpi));
+- if (rc || stp_info.c == 0)
+- return;
++ /* Wait until all other cpus entered the sync function. */
++ while (atomic_read(&stp_sync->cpus) != 0)
++ cpu_relax();
+
+- /*
+- * Catch all other cpus and make them wait until we have
+- * successfully synced the clock. smp_call_function will
+- * return after all other cpus are in clock_sync_cpu_start.
+- */
+- memset(&stp_sync, 0, sizeof(stp_sync));
+- preempt_disable();
+- smp_call_function(clock_sync_cpu_start, &stp_sync, 0);
+- local_irq_disable();
+ enable_sync_clock();
+
+ set_bit(CLOCK_SYNC_STP, &clock_sync_flags);
+ if (test_and_clear_bit(CLOCK_SYNC_ETR, &clock_sync_flags))
+- schedule_work(&etr_work);
++ queue_work(time_sync_wq, &etr_work);
+
+ rc = 0;
+ if (stp_info.todoff[0] || stp_info.todoff[1] ||
+@@ -1475,16 +1498,43 @@ static void stp_work_fn(struct work_stru
+ }
+ if (rc) {
+ disable_sync_clock(NULL);
+- stp_sync.in_sync = -EAGAIN;
++ stp_sync->in_sync = -EAGAIN;
+ clear_bit(CLOCK_SYNC_STP, &clock_sync_flags);
+ if (etr_port0_online || etr_port1_online)
+- schedule_work(&etr_work);
++ queue_work(time_sync_wq, &etr_work);
+ } else
+- stp_sync.in_sync = 1;
++ stp_sync->in_sync = 1;
++ xchg(&first, 0);
++ return 0;
++}
+
+- local_irq_enable();
+- smp_call_function(clock_sync_cpu_end, NULL, 0);
+- preempt_enable();
++/*
++ * STP work. Check for the STP state and take over the clock
++ * synchronization if the STP clock source is usable.
++ */
++static void stp_work_fn(struct work_struct *work)
++{
++ struct clock_sync_data stp_sync;
++ int rc;
++
++ if (!stp_online) {
++ chsc_sstpc(stp_page, STP_OP_CTRL, 0x0000);
++ return;
++ }
++
++ rc = chsc_sstpc(stp_page, STP_OP_CTRL, 0xb0e0);
++ if (rc)
++ return;
++
++ rc = chsc_sstpi(stp_page, &stp_info, sizeof(struct stp_sstpi));
++ if (rc || stp_info.c == 0)
++ return;
++
++ memset(&stp_sync, 0, sizeof(stp_sync));
++ get_online_cpus();
++ atomic_set(&stp_sync.cpus, num_online_cpus() - 1);
++ stop_machine(stp_sync_clock, &stp_sync, &cpu_online_map);
++ put_online_cpus();
+ }
+
+ /*
+@@ -1593,7 +1643,7 @@ static ssize_t stp_online_store(struct s
+ if (!test_bit(CLOCK_SYNC_HAS_STP, &clock_sync_flags))
+ return -EOPNOTSUPP;
+ stp_online = value;
+- schedule_work(&stp_work);
++ queue_work(time_sync_wq, &stp_work);
+ return count;
+ }
+