1 From: Gerald Schaefer <geraldsc@de.ibm.com>
2 Subject: qdio: rework inbound buffer acknowledgement
5 Symptom: Inbound network traffic stall.
6 Problem: Under high load the qdio buffer that specifies the acknowledge
7 mode can be overwritten.
8 Solution: Use automatic acknowledgement of incoming buffers in QEBSM mode
9 to improve performance with QIOASSIST.
10 Move ACK for non-QEBSM mode always to the newest buffer to prevent
11 a race with qdio_stop_polling.
12 Remove the polling spinlock, the upper layer drivers return new
13 buffers in the same code path and could not run in parallel.
14 Don't flood the error log in case of no-target-buffer-empty.
15 In handle_inbound we check if we would overwrite an ACK'ed buffer,
16 if so advance the pointer to the oldest ACK'ed buffer so we don't
17 overwrite an empty buffer in qdio_stop_polling.
19 Acked-by: John Jolly <jjolly@suse.de>
22 drivers/s390/cio/qdio.h | 19 ++--
23 drivers/s390/cio/qdio_debug.c | 7 +
24 drivers/s390/cio/qdio_main.c | 181 ++++++++++++++++++++++++++--------------
25 drivers/s390/cio/qdio_perf.c | 2
26 drivers/s390/cio/qdio_perf.h | 1
27 drivers/s390/cio/qdio_setup.c | 1
28 drivers/s390/cio/qdio_thinint.c | 2
29 7 files changed, 139 insertions(+), 74 deletions(-)
31 Index: linux-sles11/drivers/s390/cio/qdio.h
32 ===================================================================
33 --- linux-sles11.orig/drivers/s390/cio/qdio.h
34 +++ linux-sles11/drivers/s390/cio/qdio.h
35 @@ -112,12 +112,12 @@ static inline int do_sqbs(u64 token, uns
38 static inline int do_eqbs(u64 token, unsigned char *state, int queue,
39 - int *start, int *count)
40 + int *start, int *count, int ack)
42 register unsigned long _ccq asm ("0") = *count;
43 register unsigned long _token asm ("1") = token;
44 unsigned long _queuestart = ((unsigned long)queue << 32) | *start;
45 - unsigned long _state = 0;
46 + unsigned long _state = (unsigned long)ack << 63;
49 " .insn rrf,0xB99c0000,%1,%2,0,0"
50 @@ -134,7 +134,7 @@ static inline int do_eqbs(u64 token, uns
51 static inline int do_sqbs(u64 token, unsigned char state, int queue,
52 int *start, int *count) { return 0; }
53 static inline int do_eqbs(u64 token, unsigned char *state, int queue,
54 - int *start, int *count) { return 0; }
55 + int *start, int *count, int ack) { return 0; }
56 #endif /* CONFIG_64BIT */
59 @@ -187,11 +187,11 @@ struct qdio_input_q {
60 /* input buffer acknowledgement flag */
63 + /* how much sbals are acknowledged with qebsm */
66 /* last time of noticing incoming data */
69 - /* lock for clearing the acknowledgement */
73 struct qdio_output_q {
74 @@ -348,10 +348,13 @@ static inline unsigned long long get_use
75 ((bufnr + 1) & QDIO_MAX_BUFFERS_MASK)
76 #define add_buf(bufnr, inc) \
77 ((bufnr + inc) & QDIO_MAX_BUFFERS_MASK)
78 +#define sub_buf(bufnr, dec) \
79 + ((bufnr - dec) & QDIO_MAX_BUFFERS_MASK)
81 /* prototypes for thin interrupt */
82 void qdio_sync_after_thinint(struct qdio_q *q);
83 -int get_buf_state(struct qdio_q *q, unsigned int bufnr, unsigned char *state);
84 +int get_buf_state(struct qdio_q *q, unsigned int bufnr, unsigned char *state,
86 void qdio_check_outbound_after_thinint(struct qdio_q *q);
87 int qdio_inbound_q_moved(struct qdio_q *q);
88 void qdio_kick_inbound_handler(struct qdio_q *q);
89 @@ -385,6 +388,8 @@ int qdio_setup_irq(struct qdio_initializ
90 void qdio_print_subchannel_info(struct qdio_irq *irq_ptr,
91 struct ccw_device *cdev);
92 void qdio_release_memory(struct qdio_irq *irq_ptr);
93 +int qdio_setup_create_sysfs(struct ccw_device *cdev);
94 +void qdio_setup_destroy_sysfs(struct ccw_device *cdev);
95 int qdio_setup_init(void);
96 void qdio_setup_exit(void);
98 Index: linux-sles11/drivers/s390/cio/qdio_debug.c
99 ===================================================================
100 --- linux-sles11.orig/drivers/s390/cio/qdio_debug.c
101 +++ linux-sles11/drivers/s390/cio/qdio_debug.c
102 @@ -59,16 +59,18 @@ static int qstat_show(struct seq_file *m
106 - seq_printf(m, "device state indicator: %d\n", *q->irq_ptr->dsci);
107 + seq_printf(m, "device state indicator: %d\n", *(u32 *)q->irq_ptr->dsci);
108 seq_printf(m, "nr_used: %d\n", atomic_read(&q->nr_buf_used));
109 seq_printf(m, "ftc: %d\n", q->first_to_check);
110 seq_printf(m, "last_move_ftc: %d\n", q->last_move_ftc);
111 seq_printf(m, "polling: %d\n", q->u.in.polling);
112 + seq_printf(m, "ack count: %d\n", q->u.in.ack_count);
113 seq_printf(m, "slsb buffer states:\n");
114 + seq_printf(m, "|0 |8 |16 |24 |32 |40 |48 |56 63|\n");
117 for (i = 0; i < QDIO_MAX_BUFFERS_PER_Q; i++) {
118 - get_buf_state(q, i, &state);
119 + get_buf_state(q, i, &state, 0);
121 case SLSB_P_INPUT_NOT_INIT:
122 case SLSB_P_OUTPUT_NOT_INIT:
123 @@ -100,6 +102,7 @@ static int qstat_show(struct seq_file *m
127 + seq_printf(m, "|64 |72 |80 |88 |96 |104 |112 | 127|\n");
131 Index: linux-sles11/drivers/s390/cio/qdio_main.c
132 ===================================================================
133 --- linux-sles11.orig/drivers/s390/cio/qdio_main.c
134 +++ linux-sles11/drivers/s390/cio/qdio_main.c
135 @@ -112,12 +112,13 @@ static inline int qdio_check_ccq(struct
136 * @state: state of the extracted buffers
137 * @start: buffer number to start at
138 * @count: count of buffers to examine
139 + * @auto_ack: automatically acknowledge buffers
141 * Returns the number of successfull extracted equal buffer states.
142 * Stops processing if a state is different from the last buffers state.
144 static int qdio_do_eqbs(struct qdio_q *q, unsigned char *state,
145 - int start, int count)
146 + int start, int count, int auto_ack)
148 unsigned int ccq = 0;
149 int tmp_count = count, tmp_start = start;
150 @@ -129,7 +130,8 @@ static int qdio_do_eqbs(struct qdio_q *q
152 nr += q->irq_ptr->nr_input_qs;
154 - ccq = do_eqbs(q->irq_ptr->sch_token, state, nr, &tmp_start, &tmp_count);
155 + ccq = do_eqbs(q->irq_ptr->sch_token, state, nr, &tmp_start, &tmp_count,
157 rc = qdio_check_ccq(q, ccq);
159 /* At least one buffer was processed, return and extract the remaining
160 @@ -172,6 +174,9 @@ static int qdio_do_sqbs(struct qdio_q *q
167 BUG_ON(!q->irq_ptr->sch_token);
170 @@ -197,7 +202,8 @@ again:
172 /* returns number of examined buffers and their common state in *state */
173 static inline int get_buf_states(struct qdio_q *q, unsigned int bufnr,
174 - unsigned char *state, unsigned int count)
175 + unsigned char *state, unsigned int count,
178 unsigned char __state = 0;
180 @@ -206,7 +212,7 @@ static inline int get_buf_states(struct
181 BUG_ON(count > QDIO_MAX_BUFFERS_PER_Q);
184 - return qdio_do_eqbs(q, state, bufnr, count);
185 + return qdio_do_eqbs(q, state, bufnr, count, auto_ack);
187 for (i = 0; i < count; i++) {
189 @@ -220,9 +226,9 @@ static inline int get_buf_states(struct
192 inline int get_buf_state(struct qdio_q *q, unsigned int bufnr,
193 - unsigned char *state)
194 + unsigned char *state, int auto_ack)
196 - return get_buf_states(q, bufnr, state, 1);
197 + return get_buf_states(q, bufnr, state, 1, auto_ack);
200 /* wrap-around safe setting of slsb states, returns number of changed buffers */
201 @@ -367,29 +373,91 @@ void qdio_sync_after_thinint(struct qdio
203 inline void qdio_stop_polling(struct qdio_q *q)
205 - spin_lock_bh(&q->u.in.lock);
206 - if (!q->u.in.polling) {
207 - spin_unlock_bh(&q->u.in.lock);
208 + if (!q->u.in.polling)
213 qdio_perf_stat_inc(&perf_stats.debug_stop_polling);
215 /* show the card that we are not polling anymore */
216 - set_buf_state(q, q->last_move_ftc, SLSB_P_INPUT_NOT_INIT);
217 - spin_unlock_bh(&q->u.in.lock);
219 + set_buf_states(q, q->last_move_ftc, SLSB_P_INPUT_NOT_INIT,
220 + q->u.in.ack_count);
221 + q->u.in.ack_count = 0;
223 + set_buf_state(q, q->last_move_ftc, SLSB_P_INPUT_NOT_INIT);
226 -static void announce_buffer_error(struct qdio_q *q)
227 +static void announce_buffer_error(struct qdio_q *q, int count)
229 + q->qdio_error = QDIO_ERROR_SLSB_STATE;
231 + /* special handling for no target buffer empty */
232 + if ((!q->is_input_q &&
233 + (q->sbal[q->first_to_check]->element[15].flags & 0xff) == 0x10)) {
234 + qdio_perf_stat_inc(&perf_stats.outbound_target_full);
235 + DBF_DEV_EVENT(DBF_INFO, q->irq_ptr, "OUTFULL FTC:%3d",
236 + q->first_to_check);
240 DBF_ERROR("%4x BUF ERROR", SCH_NO(q));
241 DBF_ERROR((q->is_input_q) ? "IN:%2d" : "OUT:%2d", q->nr);
242 - DBF_ERROR("FTC:%3d", q->first_to_check);
243 + DBF_ERROR("FTC:%3d C:%3d", q->first_to_check, count);
244 DBF_ERROR("F14:%2x F15:%2x",
245 q->sbal[q->first_to_check]->element[14].flags & 0xff,
246 q->sbal[q->first_to_check]->element[15].flags & 0xff);
249 - q->qdio_error = QDIO_ERROR_SLSB_STATE;
250 +static inline void inbound_primed(struct qdio_q *q, int count)
254 + DBF_DEV_EVENT(DBF_INFO, q->irq_ptr, "in prim: %3d", count);
256 + /* for QEBSM the ACK was already set by EQBS */
258 + if (!q->u.in.polling) {
259 + q->u.in.polling = 1;
260 + q->u.in.ack_count = count;
261 + q->last_move_ftc = q->first_to_check;
265 + /* delete the previous ACK's */
266 + set_buf_states(q, q->last_move_ftc, SLSB_P_INPUT_NOT_INIT,
267 + q->u.in.ack_count);
268 + q->u.in.ack_count = count;
269 + q->last_move_ftc = q->first_to_check;
274 + * ACK the newest buffer. The ACK will be removed in qdio_stop_polling
275 + * or by the next inbound run.
277 + new = add_buf(q->first_to_check, count - 1);
278 + if (q->u.in.polling) {
279 + /* reset the previous ACK but first set the new one */
280 + set_buf_state(q, new, SLSB_P_INPUT_ACK);
281 + set_buf_state(q, q->last_move_ftc, SLSB_P_INPUT_NOT_INIT);
284 + q->u.in.polling = 1;
285 + set_buf_state(q, q->first_to_check, SLSB_P_INPUT_ACK);
288 + q->last_move_ftc = new;
294 + * Need to change all PRIMED buffers to NOT_INIT, otherwise
295 + * we're loosing initiative in the thinint code.
297 + set_buf_states(q, next_buf(q->first_to_check), SLSB_P_INPUT_NOT_INIT,
301 static int get_inbound_buffer_frontier(struct qdio_q *q)
302 @@ -398,13 +466,6 @@ static int get_inbound_buffer_frontier(s
306 - * If we still poll don't update last_move_ftc, keep the
307 - * previously ACK buffer there.
309 - if (!q->u.in.polling)
310 - q->last_move_ftc = q->first_to_check;
313 * Don't check 128 buffers, as otherwise qdio_inbound_q_moved
316 @@ -424,34 +485,13 @@ check_next:
317 if (q->first_to_check == stop)
320 - count = get_buf_states(q, q->first_to_check, &state, count);
321 + count = get_buf_states(q, q->first_to_check, &state, count, 1);
326 case SLSB_P_INPUT_PRIMED:
327 - DBF_DEV_EVENT(DBF_INFO, q->irq_ptr, "in prim: %3d", count);
330 - * Only ACK the first buffer. The ACK will be removed in
331 - * qdio_stop_polling.
333 - if (q->u.in.polling)
334 - state = SLSB_P_INPUT_NOT_INIT;
336 - q->u.in.polling = 1;
337 - state = SLSB_P_INPUT_ACK;
339 - set_buf_state(q, q->first_to_check, state);
342 - * Need to change all PRIMED buffers to NOT_INIT, otherwise
343 - * we're loosing initiative in the thinint code.
346 - set_buf_states(q, next_buf(q->first_to_check),
347 - SLSB_P_INPUT_NOT_INIT, count - 1);
349 + inbound_primed(q, count);
351 * No siga-sync needed for non-qebsm here, as the inbound queue
352 * will be synced on the next siga-r, resp.
353 @@ -461,7 +501,7 @@ check_next:
354 atomic_sub(count, &q->nr_buf_used);
356 case SLSB_P_INPUT_ERROR:
357 - announce_buffer_error(q);
358 + announce_buffer_error(q, count);
359 /* process the buffer, the upper layer will take care of it */
360 q->first_to_check = add_buf(q->first_to_check, count);
361 atomic_sub(count, &q->nr_buf_used);
362 @@ -507,7 +547,7 @@ static int qdio_inbound_q_done(struct qd
366 - get_buf_state(q, q->first_to_check, &state);
367 + get_buf_state(q, q->first_to_check, &state, 0);
368 if (state == SLSB_P_INPUT_PRIMED)
369 /* we got something to do */
371 @@ -610,7 +650,7 @@ check_next:
372 if (q->first_to_check == stop)
373 return q->first_to_check;
375 - count = get_buf_states(q, q->first_to_check, &state, count);
376 + count = get_buf_states(q, q->first_to_check, &state, count, 0);
378 return q->first_to_check;
380 @@ -629,7 +669,7 @@ check_next:
383 case SLSB_P_OUTPUT_ERROR:
384 - announce_buffer_error(q);
385 + announce_buffer_error(q, count);
386 /* process the buffer, the upper layer will take care of it */
387 q->first_to_check = add_buf(q->first_to_check, count);
388 atomic_sub(count, &q->nr_buf_used);
389 @@ -1441,23 +1481,38 @@ static inline int buf_in_between(int buf
390 static void handle_inbound(struct qdio_q *q, unsigned int callflags,
391 int bufnr, int count)
393 - unsigned long flags;
395 + int used, rc, diff;
398 - * do_QDIO could run in parallel with the queue tasklet so the
399 - * upper-layer programm could empty the ACK'ed buffer here.
400 - * If that happens we must clear the polling flag, otherwise
401 - * qdio_stop_polling() could set the buffer to NOT_INIT after
402 - * it was set to EMPTY which would kill us.
404 - spin_lock_irqsave(&q->u.in.lock, flags);
405 - if (q->u.in.polling)
406 - if (buf_in_between(q->last_move_ftc, bufnr, count))
407 + if (!q->u.in.polling)
410 + /* protect against stop polling setting an ACK for an emptied slsb */
411 + if (count == QDIO_MAX_BUFFERS_PER_Q) {
412 + /* overwriting everything, just delete polling status */
413 + q->u.in.polling = 0;
414 + q->u.in.ack_count = 0;
416 + } else if (buf_in_between(q->last_move_ftc, bufnr, count)) {
418 + /* partial overwrite, just update last_move_ftc */
419 + diff = add_buf(bufnr, count);
420 + diff = sub_buf(diff, q->last_move_ftc);
421 + q->u.in.ack_count -= diff;
422 + if (q->u.in.ack_count <= 0) {
423 + q->u.in.polling = 0;
424 + q->u.in.ack_count = 0;
425 + /* TODO: must we set last_move_ftc to something meaningful? */
428 + q->last_move_ftc = add_buf(q->last_move_ftc, diff);
431 + /* the only ACK will be deleted, so stop polling */
436 count = set_buf_states(q, bufnr, SLSB_CU_INPUT_EMPTY, count);
437 - spin_unlock_irqrestore(&q->u.in.lock, flags);
439 used = atomic_add_return(count, &q->nr_buf_used) - count;
440 BUG_ON(used + count > QDIO_MAX_BUFFERS_PER_Q);
441 @@ -1516,7 +1571,7 @@ static void handle_outbound(struct qdio_
444 /* try to fast requeue buffers */
445 - get_buf_state(q, prev_buf(bufnr), &state);
446 + get_buf_state(q, prev_buf(bufnr), &state, 0);
447 if (state != SLSB_CU_OUTPUT_PRIMED)
448 qdio_kick_outbound_q(q);
450 Index: linux-sles11/drivers/s390/cio/qdio_perf.c
451 ===================================================================
452 --- linux-sles11.orig/drivers/s390/cio/qdio_perf.c
453 +++ linux-sles11/drivers/s390/cio/qdio_perf.c
454 @@ -74,6 +74,8 @@ static int qdio_perf_proc_show(struct se
456 seq_printf(m, "Number of fast requeues (outg. SBAL w/o SIGA)\t: %li\n",
457 (long)atomic_long_read(&perf_stats.fast_requeue));
458 + seq_printf(m, "Number of outbound target full condition\t: %li\n",
459 + (long)atomic_long_read(&perf_stats.outbound_target_full));
460 seq_printf(m, "Number of outbound tasklet mod_timer calls\t: %li\n",
461 (long)atomic_long_read(&perf_stats.debug_tl_out_timer));
462 seq_printf(m, "Number of stop polling calls\t\t\t: %li\n",
463 Index: linux-sles11/drivers/s390/cio/qdio_perf.h
464 ===================================================================
465 --- linux-sles11.orig/drivers/s390/cio/qdio_perf.h
466 +++ linux-sles11/drivers/s390/cio/qdio_perf.h
467 @@ -36,6 +36,7 @@ struct qdio_perf_stats {
468 atomic_long_t inbound_handler;
469 atomic_long_t outbound_handler;
470 atomic_long_t fast_requeue;
471 + atomic_long_t outbound_target_full;
474 atomic_long_t debug_tl_out_timer;
475 Index: linux-sles11/drivers/s390/cio/qdio_setup.c
476 ===================================================================
477 --- linux-sles11.orig/drivers/s390/cio/qdio_setup.c
478 +++ linux-sles11/drivers/s390/cio/qdio_setup.c
479 @@ -167,7 +167,6 @@ static void setup_queues(struct qdio_irq
480 setup_queues_misc(q, irq_ptr, qdio_init->input_handler, i);
483 - spin_lock_init(&q->u.in.lock);
484 setup_storage_lists(q, irq_ptr, input_sbal_array, i);
485 input_sbal_array += QDIO_MAX_BUFFERS_PER_Q;
487 Index: linux-sles11/drivers/s390/cio/qdio_thinint.c
488 ===================================================================
489 --- linux-sles11.orig/drivers/s390/cio/qdio_thinint.c
490 +++ linux-sles11/drivers/s390/cio/qdio_thinint.c
491 @@ -131,7 +131,7 @@ static inline int tiqdio_inbound_q_done(
495 - get_buf_state(q, q->first_to_check, &state);
496 + get_buf_state(q, q->first_to_check, &state, 0);
498 if (state == SLSB_P_INPUT_PRIMED)
499 /* more work coming */