]> git.ipfire.org Git - thirdparty/gcc.git/blob - libgfortran/io/async.c
Update copyright years.
[thirdparty/gcc.git] / libgfortran / io / async.c
1 /* Copyright (C) 2018-2020 Free Software Foundation, Inc.
2 Contributed by Nicolas Koenig
3
4 This file is part of the GNU Fortran runtime library (libgfortran).
5
6 Libgfortran is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3, or (at your option)
9 any later version.
10
11 Libgfortran is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 Under Section 7 of GPL version 3, you are granted additional
17 permissions described in the GCC Runtime Library Exception, version
18 3.1, as published by the Free Software Foundation.
19
20 You should have received a copy of the GNU General Public License and
21 a copy of the GCC Runtime Library Exception along with this program;
22 see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
23 <http://www.gnu.org/licenses/>. */
24
25 #include "libgfortran.h"
26
27 #define _GTHREAD_USE_COND_INIT_FUNC
28 #include "../../libgcc/gthr.h"
29 #include "io.h"
30 #include "fbuf.h"
31 #include "format.h"
32 #include "unix.h"
33 #include <string.h>
34 #include <assert.h>
35
36 #include <sys/types.h>
37
38 #include "async.h"
39 #if ASYNC_IO
40
/* Debugging state.  Each definition is wrapped in DEBUG_LINE, which is
   defined outside this file (presumably in async.h, and presumably
   compiled out in non-debug builds -- TODO confirm).  aio_prefix is a
   per-thread string; async_io switches it to TPREFIX for the worker.  */
DEBUG_LINE (__thread const char *aio_prefix = MPREFIX);

DEBUG_LINE (__gthread_mutex_t debug_queue_lock = __GTHREAD_MUTEX_INIT;)
DEBUG_LINE (aio_lock_debug *aio_debug_head = NULL;)

/* Current unit for asynchronous I/O.  Needed for error reporting.
   Thread-local; the worker thread sets it to the unit it serves at the
   start of async_io.  */

__thread gfc_unit *thread_unit = NULL;
49
/* One entry of the singly linked work queue of an asynchronous unit.
   Entries are appended by the enqueue_* functions and consumed (and
   freed) by async_io.  */
typedef struct transfer_queue
{
  /* Which operation the worker has to perform for this entry.  */
  enum aio_do type;
  /* Next entry in the queue, or NULL for the current tail.  */
  struct transfer_queue *next;
  /* Heap copy of the caller's PDT; only filled in by
     enqueue_data_transfer_init and handed to update_pdt by the worker.  */
  struct st_parameter_dt *new_pdt;
  /* Arguments of a scalar or array transfer (AIO_TRANSFER_SCALAR /
     AIO_TRANSFER_ARRAY entries).  */
  transfer_args arg;
  /* Nonzero if the entry was queued by enqueue_done_id; the worker then
     advances au->id.low after handling it.  */
  _Bool has_id;
  /* Passed to data_transfer_init_worker for AIO_DATA_TRANSFER_INIT.  */
  int read_flag;
} transfer_queue;
60
/* Pairs a PDT with an ID.  NOTE(review): nothing in the visible part of
   this file refers to this type -- confirm whether it is still used
   elsewhere before relying on it.  */
struct error {
  st_parameter_dt *dtp;
  int id;
};
65
66 /* Helper function to exchange the old vs. a new PDT. */
67
68 static void
69 update_pdt (st_parameter_dt **old, st_parameter_dt *new) {
70 st_parameter_dt *temp;
71 NOTE ("Changing pdts, current_unit = %p", (void *) (new->u.p.current_unit));
72 temp = *old;
73 *old = new;
74 if (temp)
75 free (temp);
76 }
77
/* Destroy an adv_cond structure: first its mutex (AC->lock), then its
   condition variable (AC->signal).  Both calls go through T_ERROR,
   which is defined outside this file.  */

static void
destroy_adv_cond (struct adv_cond *ac)
{
  T_ERROR (__gthread_mutex_destroy, &ac->lock);
  T_ERROR (__gthread_cond_destroy, &ac->signal);
}
86
87 /* Function invoked as start routine for a new asynchronous I/O unit.
88 Contains the main loop for accepting requests and handling them. */
89
90 static void *
91 async_io (void *arg)
92 {
93 DEBUG_LINE (aio_prefix = TPREFIX);
94 transfer_queue *ctq = NULL, *prev = NULL;
95 gfc_unit *u = (gfc_unit *) arg;
96 async_unit *au = u->au;
97 LOCK (&au->lock);
98 thread_unit = u;
99 au->thread = __gthread_self ();
100 while (true)
101 {
102 /* Main loop. At this point, au->lock is always held. */
103 WAIT_SIGNAL_MUTEX (&au->work, au->tail != NULL, &au->lock);
104 LOCK (&au->lock);
105 ctq = au->head;
106 prev = NULL;
107 /* Loop over the queue entries until they are finished. */
108 while (ctq)
109 {
110 if (prev)
111 free (prev);
112 prev = ctq;
113 if (!au->error.has_error)
114 {
115 UNLOCK (&au->lock);
116
117 switch (ctq->type)
118 {
119 case AIO_WRITE_DONE:
120 NOTE ("Finalizing write");
121 st_write_done_worker (au->pdt);
122 UNLOCK (&au->io_lock);
123 break;
124
125 case AIO_READ_DONE:
126 NOTE ("Finalizing read");
127 st_read_done_worker (au->pdt);
128 UNLOCK (&au->io_lock);
129 break;
130
131 case AIO_DATA_TRANSFER_INIT:
132 NOTE ("Data transfer init");
133 LOCK (&au->io_lock);
134 update_pdt (&au->pdt, ctq->new_pdt);
135 data_transfer_init_worker (au->pdt, ctq->read_flag);
136 break;
137
138 case AIO_TRANSFER_SCALAR:
139 NOTE ("Starting scalar transfer");
140 ctq->arg.scalar.transfer (au->pdt, ctq->arg.scalar.arg_bt,
141 ctq->arg.scalar.data,
142 ctq->arg.scalar.i,
143 ctq->arg.scalar.s1,
144 ctq->arg.scalar.s2);
145 break;
146
147 case AIO_TRANSFER_ARRAY:
148 NOTE ("Starting array transfer");
149 NOTE ("ctq->arg.array.desc = %p",
150 (void *) (ctq->arg.array.desc));
151 transfer_array_inner (au->pdt, ctq->arg.array.desc,
152 ctq->arg.array.kind,
153 ctq->arg.array.charlen);
154 free (ctq->arg.array.desc);
155 break;
156
157 case AIO_CLOSE:
158 NOTE ("Received AIO_CLOSE");
159 goto finish_thread;
160
161 default:
162 internal_error (NULL, "Invalid queue type");
163 break;
164 }
165 LOCK (&au->lock);
166 if (unlikely (au->error.has_error))
167 au->error.last_good_id = au->id.low - 1;
168 }
169 else
170 {
171 if (ctq->type == AIO_WRITE_DONE || ctq->type == AIO_READ_DONE)
172 {
173 UNLOCK (&au->io_lock);
174 }
175 else if (ctq->type == AIO_CLOSE)
176 {
177 NOTE ("Received AIO_CLOSE during error condition");
178 UNLOCK (&au->lock);
179 goto finish_thread;
180 }
181 }
182
183 NOTE ("Next ctq, current id: %d", au->id.low);
184 if (ctq->has_id && au->id.waiting == au->id.low++)
185 SIGNAL (&au->id.done);
186
187 ctq = ctq->next;
188 }
189 au->tail = NULL;
190 au->head = NULL;
191 au->empty = 1;
192 UNLOCK (&au->lock);
193 SIGNAL (&au->emptysignal);
194 LOCK (&au->lock);
195 }
196 finish_thread:
197 au->tail = NULL;
198 au->head = NULL;
199 au->empty = 1;
200 SIGNAL (&au->emptysignal);
201 free (ctq);
202 return NULL;
203 }
204
205 /* Free an asynchronous unit. */
206
207 static void
208 free_async_unit (async_unit *au)
209 {
210 if (au->tail)
211 internal_error (NULL, "Trying to free nonempty asynchronous unit");
212
213 destroy_adv_cond (&au->work);
214 destroy_adv_cond (&au->emptysignal);
215 destroy_adv_cond (&au->id.done);
216 T_ERROR (__gthread_mutex_destroy, &au->lock);
217 free (au);
218 }
219
/* Initialize an adv_cond structure: clear its pending flag and set up
   its mutex (AC->lock) and condition variable (AC->signal).  */

static void
init_adv_cond (struct adv_cond *ac)
{
  ac->pending = 0;
  __GTHREAD_MUTEX_INIT_FUNCTION (&ac->lock);
  __GTHREAD_COND_INIT_FUNCTION (&ac->signal);
}
229
230 /* Initialize an asyncronous unit, returning zero on success,
231 nonzero on failure. It also sets u->au. */
232
233 void
234 init_async_unit (gfc_unit *u)
235 {
236 async_unit *au;
237 if (!__gthread_active_p ())
238 {
239 u->au = NULL;
240 return;
241 }
242
243 au = (async_unit *) xmalloc (sizeof (async_unit));
244 u->au = au;
245 init_adv_cond (&au->work);
246 init_adv_cond (&au->emptysignal);
247 __GTHREAD_MUTEX_INIT_FUNCTION (&au->lock);
248 __GTHREAD_MUTEX_INIT_FUNCTION (&au->io_lock);
249 LOCK (&au->lock);
250 T_ERROR (__gthread_create, &au->thread, &async_io, (void *) u);
251 au->pdt = NULL;
252 au->head = NULL;
253 au->tail = NULL;
254 au->empty = true;
255 au->id.waiting = -1;
256 au->id.low = 0;
257 au->id.high = 0;
258 au->error.fatal_error = 0;
259 au->error.has_error = 0;
260 au->error.last_good_id = 0;
261 init_adv_cond (&au->id.done);
262 UNLOCK (&au->lock);
263 }
264
265 /* Enqueue a transfer statement. */
266
267 void
268 enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
269 {
270 transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
271 tq->arg = *arg;
272 tq->type = type;
273 tq->has_id = 0;
274 LOCK (&au->lock);
275 if (!au->tail)
276 au->head = tq;
277 else
278 au->tail->next = tq;
279 au->tail = tq;
280 REVOKE_SIGNAL (&(au->emptysignal));
281 au->empty = false;
282 UNLOCK (&au->lock);
283 SIGNAL (&au->work);
284 }
285
286 /* Enqueue an st_write_done or st_read_done which contains an ID. */
287
288 int
289 enqueue_done_id (async_unit *au, enum aio_do type)
290 {
291 int ret;
292 transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
293
294 tq->type = type;
295 tq->has_id = 1;
296 LOCK (&au->lock);
297 if (!au->tail)
298 au->head = tq;
299 else
300 au->tail->next = tq;
301 au->tail = tq;
302 REVOKE_SIGNAL (&(au->emptysignal));
303 au->empty = false;
304 ret = au->id.high++;
305 NOTE ("Enqueue id: %d", ret);
306 UNLOCK (&au->lock);
307 SIGNAL (&au->work);
308 return ret;
309 }
310
311 /* Enqueue an st_write_done or st_read_done without an ID. */
312
313 void
314 enqueue_done (async_unit *au, enum aio_do type)
315 {
316 transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
317 tq->type = type;
318 tq->has_id = 0;
319 LOCK (&au->lock);
320 if (!au->tail)
321 au->head = tq;
322 else
323 au->tail->next = tq;
324 au->tail = tq;
325 REVOKE_SIGNAL (&(au->emptysignal));
326 au->empty = false;
327 UNLOCK (&au->lock);
328 SIGNAL (&au->work);
329 }
330
331 /* Enqueue a CLOSE statement. */
332
333 void
334 enqueue_close (async_unit *au)
335 {
336 transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
337
338 tq->type = AIO_CLOSE;
339 LOCK (&au->lock);
340 if (!au->tail)
341 au->head = tq;
342 else
343 au->tail->next = tq;
344 au->tail = tq;
345 REVOKE_SIGNAL (&(au->emptysignal));
346 au->empty = false;
347 UNLOCK (&au->lock);
348 SIGNAL (&au->work);
349 }
350
351 /* The asynchronous unit keeps the currently active PDT around.
352 This function changes that to the current one. */
353
354 void
355 enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
356 {
357 st_parameter_dt *new = xmalloc (sizeof (st_parameter_dt));
358 transfer_queue *tq = xmalloc (sizeof (transfer_queue));
359
360 memcpy ((void *) new, (void *) dt, sizeof (st_parameter_dt));
361
362 NOTE ("dt->internal_unit_desc = %p", dt->internal_unit_desc);
363 NOTE ("common.flags & mask = %d", dt->common.flags & IOPARM_LIBRETURN_MASK);
364 tq->next = NULL;
365 tq->type = AIO_DATA_TRANSFER_INIT;
366 tq->read_flag = read_flag;
367 tq->has_id = 0;
368 tq->new_pdt = new;
369 LOCK (&au->lock);
370
371 if (!au->tail)
372 au->head = tq;
373 else
374 au->tail->next = tq;
375 au->tail = tq;
376 REVOKE_SIGNAL (&(au->emptysignal));
377 au->empty = 0;
378 UNLOCK (&au->lock);
379 SIGNAL (&au->work);
380 }
381
382 /* Collect the errors that may have happened asynchronously. Return true if
383 an error has been encountered. */
384
385 bool
386 collect_async_errors (st_parameter_common *cmp, async_unit *au)
387 {
388 bool has_error = au->error.has_error;
389
390 if (has_error)
391 {
392 if (generate_error_common (cmp, au->error.family, au->error.message))
393 {
394 au->error.has_error = 0;
395 au->error.cmp = NULL;
396 }
397 else
398 {
399 /* The program will exit later. */
400 au->error.fatal_error = true;
401 }
402 }
403 return has_error;
404 }
405
406 /* Perform a wait operation on an asynchronous unit with an ID specified,
407 which means collecting the errors that may have happened asynchronously.
408 Return true if an error has been encountered. */
409
410 bool
411 async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
412 {
413 bool ret;
414
415 if (au == NULL)
416 return false;
417
418 if (cmp == NULL)
419 cmp = au->error.cmp;
420
421 if (au->error.has_error)
422 {
423 if (i <= au->error.last_good_id)
424 return false;
425
426 return collect_async_errors (cmp, au);
427 }
428
429 LOCK (&au->lock);
430 NOTE ("Waiting for id %d", i);
431 if (au->id.waiting < i)
432 au->id.waiting = i;
433 UNLOCK (&au->lock);
434 SIGNAL (&(au->work));
435 LOCK (&au->lock);
436 WAIT_SIGNAL_MUTEX (&(au->id.done),
437 (au->id.low >= au->id.waiting || au->empty), &au->lock);
438 LOCK (&au->lock);
439 ret = collect_async_errors (cmp, au);
440 UNLOCK (&au->lock);
441 return ret;
442 }
443
444 /* Perform a wait operation an an asynchronous unit without an ID. */
445
446 bool
447 async_wait (st_parameter_common *cmp, async_unit *au)
448 {
449 bool ret;
450
451 if (au == NULL)
452 return false;
453
454 if (cmp == NULL)
455 cmp = au->error.cmp;
456
457 SIGNAL (&(au->work));
458 LOCK (&(au->lock));
459
460 if (au->empty)
461 {
462 ret = collect_async_errors (cmp, au);
463 UNLOCK (&au->lock);
464 return ret;
465 }
466
467 WAIT_SIGNAL_MUTEX (&(au->emptysignal), (au->empty), &au->lock);
468 ret = collect_async_errors (cmp, au);
469 return ret;
470 }
471
472 /* Close an asynchronous unit. */
473
474 void
475 async_close (async_unit *au)
476 {
477 if (au == NULL)
478 return;
479
480 NOTE ("Closing async unit");
481 enqueue_close (au);
482 T_ERROR (__gthread_join, au->thread, NULL);
483 free_async_unit (au);
484 }
485
486 #else
487
488 /* Only set u->au to NULL so no async I/O will happen. */
489
490 void
491 init_async_unit (gfc_unit *u)
492 {
493 u->au = NULL;
494 return;
495 }
496
497 /* Do-nothing function, which will not be called. */
498
499 void
500 enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
501 {
502 return;
503 }
504
505 /* Do-nothing function, which will not be called. */
506
507 int
508 enqueue_done_id (async_unit *au, enum aio_do type)
509 {
510 return 0;
511 }
512
513 /* Do-nothing function, which will not be called. */
514
515 void
516 enqueue_done (async_unit *au, enum aio_do type)
517 {
518 return;
519 }
520
521 /* Do-nothing function, which will not be called. */
522
523 void
524 enqueue_close (async_unit *au)
525 {
526 return;
527 }
528
529 /* Do-nothing function, which will not be called. */
530
531 void
532 enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
533 {
534 return;
535 }
536
537 /* Do-nothing function, which will not be called. */
538
539 bool
540 collect_async_errors (st_parameter_common *cmp, async_unit *au)
541 {
542 return false;
543 }
544
545 /* Do-nothing function, which will not be called. */
546
547 bool
548 async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
549 {
550 return false;
551 }
552
553 /* Do-nothing function, which will not be called. */
554
555 bool
556 async_wait (st_parameter_common *cmp, async_unit *au)
557 {
558 return false;
559 }
560
561 /* Do-nothing function, which will not be called. */
562
563 void
564 async_close (async_unit *au)
565 {
566 return;
567 }
568
569 #endif