]> git.ipfire.org Git - thirdparty/gcc.git/blob - libgfortran/io/async.c
Update copyright years.
[thirdparty/gcc.git] / libgfortran / io / async.c
1 /* Copyright (C) 2018-2024 Free Software Foundation, Inc.
2 Contributed by Nicolas Koenig
3
4 This file is part of the GNU Fortran runtime library (libgfortran).
5
6 Libgfortran is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3, or (at your option)
9 any later version.
10
11 Libgfortran is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 Under Section 7 of GPL version 3, you are granted additional
17 permissions described in the GCC Runtime Library Exception, version
18 3.1, as published by the Free Software Foundation.
19
20 You should have received a copy of the GNU General Public License and
21 a copy of the GCC Runtime Library Exception along with this program;
22 see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
23 <http://www.gnu.org/licenses/>. */
24
25 #include "libgfortran.h"
26
27 #define _GTHREAD_USE_COND_INIT_FUNC
28 #include "../../libgcc/gthr.h"
29 #include "io.h"
30 #include "fbuf.h"
31 #include "format.h"
32 #include "unix.h"
33 #include <string.h>
34 #include <assert.h>
35
36 #include <sys/types.h>
37
38 #include "async.h"
39 #if ASYNC_IO
40
41 DEBUG_LINE (__thread const char *aio_prefix = MPREFIX);
42
43 DEBUG_LINE (__gthread_mutex_t debug_queue_lock = __GTHREAD_MUTEX_INIT;)
44 DEBUG_LINE (aio_lock_debug *aio_debug_head = NULL;)
45 #ifdef __GTHREAD_RWLOCK_INIT
46 DEBUG_LINE (aio_rwlock_debug *aio_rwlock_debug_head = NULL;)
47 DEBUG_LINE (__gthread_rwlock_t debug_queue_rwlock = __GTHREAD_RWLOCK_INIT;)
48 #endif
49
50 /* Current unit for asynchronous I/O. Needed for error reporting. */
51
52 __thread gfc_unit *thread_unit = NULL;
53
54 /* Queue entry for the asynchronous I/O entry. */
55 typedef struct transfer_queue
56 {
57 enum aio_do type;
58 struct transfer_queue *next;
59 struct st_parameter_dt *new_pdt;
60 transfer_args arg;
61 _Bool has_id;
62 int read_flag;
63 } transfer_queue;
64
65 struct error {
66 st_parameter_dt *dtp;
67 int id;
68 };
69
70 /* Helper function to exchange the old vs. a new PDT. */
71
72 static void
73 update_pdt (st_parameter_dt **old, st_parameter_dt *new) {
74 st_parameter_dt *temp;
75 NOTE ("Changing pdts, current_unit = %p", (void *) (new->u.p.current_unit));
76 temp = *old;
77 *old = new;
78 free (temp);
79 }
80
81 /* Destroy an adv_cond structure. */
82
83 static void
84 destroy_adv_cond (struct adv_cond *ac)
85 {
86 T_ERROR (__gthread_cond_destroy, &ac->signal);
87 }
88
89 /* Function invoked as start routine for a new asynchronous I/O unit.
90 Contains the main loop for accepting requests and handling them. */
91
92 static void *
93 async_io (void *arg)
94 {
95 DEBUG_LINE (aio_prefix = TPREFIX);
96 transfer_queue *ctq = NULL, *prev = NULL;
97 gfc_unit *u = (gfc_unit *) arg;
98 async_unit *au = u->au;
99 LOCK (&au->lock);
100 thread_unit = u;
101 au->thread = __gthread_self ();
102 while (true)
103 {
104 /* Main loop. At this point, au->lock is always held. */
105 WAIT_SIGNAL_MUTEX (&au->work, au->tail != NULL, &au->lock);
106 LOCK (&au->lock);
107 ctq = au->head;
108 prev = NULL;
109 /* Loop over the queue entries until they are finished. */
110 while (ctq)
111 {
112 free (prev);
113 prev = ctq;
114 if (!au->error.has_error)
115 {
116 UNLOCK (&au->lock);
117
118 switch (ctq->type)
119 {
120 case AIO_WRITE_DONE:
121 NOTE ("Finalizing write");
122 st_write_done_worker (au->pdt, false);
123 UNLOCK (&au->io_lock);
124 break;
125
126 case AIO_READ_DONE:
127 NOTE ("Finalizing read");
128 st_read_done_worker (au->pdt, false);
129 UNLOCK (&au->io_lock);
130 break;
131
132 case AIO_DATA_TRANSFER_INIT:
133 NOTE ("Data transfer init");
134 LOCK (&au->io_lock);
135 update_pdt (&au->pdt, ctq->new_pdt);
136 data_transfer_init_worker (au->pdt, ctq->read_flag);
137 break;
138
139 case AIO_TRANSFER_SCALAR:
140 NOTE ("Starting scalar transfer");
141 ctq->arg.scalar.transfer (au->pdt, ctq->arg.scalar.arg_bt,
142 ctq->arg.scalar.data,
143 ctq->arg.scalar.i,
144 ctq->arg.scalar.s1,
145 ctq->arg.scalar.s2);
146 break;
147
148 case AIO_TRANSFER_ARRAY:
149 NOTE ("Starting array transfer");
150 NOTE ("ctq->arg.array.desc = %p",
151 (void *) (ctq->arg.array.desc));
152 transfer_array_inner (au->pdt, ctq->arg.array.desc,
153 ctq->arg.array.kind,
154 ctq->arg.array.charlen);
155 free (ctq->arg.array.desc);
156 break;
157
158 case AIO_CLOSE:
159 NOTE ("Received AIO_CLOSE");
160 LOCK (&au->lock);
161 goto finish_thread;
162
163 default:
164 internal_error (NULL, "Invalid queue type");
165 break;
166 }
167 LOCK (&au->lock);
168 if (unlikely (au->error.has_error))
169 au->error.last_good_id = au->id.low - 1;
170 }
171 else
172 {
173 if (ctq->type == AIO_WRITE_DONE || ctq->type == AIO_READ_DONE)
174 {
175 UNLOCK (&au->io_lock);
176 }
177 else if (ctq->type == AIO_CLOSE)
178 {
179 NOTE ("Received AIO_CLOSE during error condition");
180 goto finish_thread;
181 }
182 }
183
184 NOTE ("Next ctq, current id: %d", au->id.low);
185 if (ctq->has_id && au->id.waiting == au->id.low++)
186 SIGNAL (&au->id.done);
187
188 ctq = ctq->next;
189 }
190 au->tail = NULL;
191 au->head = NULL;
192 au->empty = 1;
193 SIGNAL (&au->emptysignal);
194 }
195 finish_thread:
196 au->tail = NULL;
197 au->head = NULL;
198 au->empty = 1;
199 SIGNAL (&au->emptysignal);
200 free (ctq);
201 UNLOCK (&au->lock);
202 return NULL;
203 }
204
205 /* Free an asynchronous unit. */
206
207 static void
208 free_async_unit (async_unit *au)
209 {
210 if (au->tail)
211 internal_error (NULL, "Trying to free nonempty asynchronous unit");
212
213 destroy_adv_cond (&au->work);
214 destroy_adv_cond (&au->emptysignal);
215 destroy_adv_cond (&au->id.done);
216 T_ERROR (__gthread_mutex_destroy, &au->lock);
217 free (au);
218 }
219
220 /* Initialize an adv_cond structure. */
221
222 static void
223 init_adv_cond (struct adv_cond *ac)
224 {
225 ac->pending = 0;
226 __GTHREAD_COND_INIT_FUNCTION (&ac->signal);
227 }
228
229 /* Initialize an asyncronous unit, returning zero on success,
230 nonzero on failure. It also sets u->au. */
231
232 void
233 init_async_unit (gfc_unit *u)
234 {
235 async_unit *au;
236 if (!__gthread_active_p ())
237 {
238 u->au = NULL;
239 return;
240 }
241
242 au = (async_unit *) xmalloc (sizeof (async_unit));
243 u->au = au;
244 init_adv_cond (&au->work);
245 init_adv_cond (&au->emptysignal);
246 __GTHREAD_MUTEX_INIT_FUNCTION (&au->lock);
247 __GTHREAD_MUTEX_INIT_FUNCTION (&au->io_lock);
248 LOCK (&au->lock);
249 T_ERROR (__gthread_create, &au->thread, &async_io, (void *) u);
250 au->pdt = NULL;
251 au->head = NULL;
252 au->tail = NULL;
253 au->empty = true;
254 au->id.waiting = -1;
255 au->id.low = 0;
256 au->id.high = 0;
257 au->error.fatal_error = 0;
258 au->error.has_error = 0;
259 au->error.last_good_id = 0;
260 init_adv_cond (&au->id.done);
261 UNLOCK (&au->lock);
262 }
263
264 /* Enqueue a transfer statement. */
265
266 void
267 enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
268 {
269 transfer_queue *tq = calloc (1, sizeof (transfer_queue));
270 tq->arg = *arg;
271 tq->type = type;
272 tq->has_id = 0;
273 LOCK (&au->lock);
274 if (!au->tail)
275 au->head = tq;
276 else
277 au->tail->next = tq;
278 au->tail = tq;
279 REVOKE_SIGNAL (&(au->emptysignal));
280 au->empty = false;
281 SIGNAL (&au->work);
282 UNLOCK (&au->lock);
283 }
284
285 /* Enqueue an st_write_done or st_read_done which contains an ID. */
286
287 int
288 enqueue_done_id (async_unit *au, enum aio_do type)
289 {
290 int ret;
291 transfer_queue *tq = calloc (1, sizeof (transfer_queue));
292
293 tq->type = type;
294 tq->has_id = 1;
295 LOCK (&au->lock);
296 if (!au->tail)
297 au->head = tq;
298 else
299 au->tail->next = tq;
300 au->tail = tq;
301 REVOKE_SIGNAL (&(au->emptysignal));
302 au->empty = false;
303 ret = au->id.high++;
304 NOTE ("Enqueue id: %d", ret);
305 SIGNAL (&au->work);
306 UNLOCK (&au->lock);
307 return ret;
308 }
309
310 /* Enqueue an st_write_done or st_read_done without an ID. */
311
312 void
313 enqueue_done (async_unit *au, enum aio_do type)
314 {
315 transfer_queue *tq = calloc (1, sizeof (transfer_queue));
316 tq->type = type;
317 tq->has_id = 0;
318 LOCK (&au->lock);
319 if (!au->tail)
320 au->head = tq;
321 else
322 au->tail->next = tq;
323 au->tail = tq;
324 REVOKE_SIGNAL (&(au->emptysignal));
325 au->empty = false;
326 SIGNAL (&au->work);
327 UNLOCK (&au->lock);
328 }
329
330 /* Enqueue a CLOSE statement. */
331
332 void
333 enqueue_close (async_unit *au)
334 {
335 transfer_queue *tq = calloc (1, sizeof (transfer_queue));
336
337 tq->type = AIO_CLOSE;
338 LOCK (&au->lock);
339 if (!au->tail)
340 au->head = tq;
341 else
342 au->tail->next = tq;
343 au->tail = tq;
344 REVOKE_SIGNAL (&(au->emptysignal));
345 au->empty = false;
346 SIGNAL (&au->work);
347 UNLOCK (&au->lock);
348 }
349
350 /* The asynchronous unit keeps the currently active PDT around.
351 This function changes that to the current one. */
352
353 void
354 enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
355 {
356 st_parameter_dt *new = xmalloc (sizeof (st_parameter_dt));
357 transfer_queue *tq = xmalloc (sizeof (transfer_queue));
358
359 memcpy ((void *) new, (void *) dt, sizeof (st_parameter_dt));
360
361 NOTE ("dt->internal_unit_desc = %p", dt->internal_unit_desc);
362 NOTE ("common.flags & mask = %d", dt->common.flags & IOPARM_LIBRETURN_MASK);
363 tq->next = NULL;
364 tq->type = AIO_DATA_TRANSFER_INIT;
365 tq->read_flag = read_flag;
366 tq->has_id = 0;
367 tq->new_pdt = new;
368 LOCK (&au->lock);
369
370 if (!au->tail)
371 au->head = tq;
372 else
373 au->tail->next = tq;
374 au->tail = tq;
375 REVOKE_SIGNAL (&(au->emptysignal));
376 au->empty = false;
377 SIGNAL (&au->work);
378 UNLOCK (&au->lock);
379 }
380
381 /* Collect the errors that may have happened asynchronously. Return true if
382 an error has been encountered. */
383
384 bool
385 collect_async_errors (st_parameter_common *cmp, async_unit *au)
386 {
387 bool has_error = au->error.has_error;
388
389 if (has_error)
390 {
391 if (generate_error_common (cmp, au->error.family, au->error.message))
392 {
393 au->error.has_error = 0;
394 au->error.cmp = NULL;
395 }
396 else
397 {
398 /* The program will exit later. */
399 au->error.fatal_error = true;
400 }
401 }
402 return has_error;
403 }
404
405 /* Perform a wait operation on an asynchronous unit with an ID specified,
406 which means collecting the errors that may have happened asynchronously.
407 Return true if an error has been encountered. */
408
409 bool
410 async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
411 {
412 bool ret;
413
414 if (au == NULL)
415 return false;
416
417 if (cmp == NULL)
418 cmp = au->error.cmp;
419
420 if (au->error.has_error)
421 {
422 if (i <= au->error.last_good_id)
423 return false;
424
425 return collect_async_errors (cmp, au);
426 }
427
428 LOCK (&au->lock);
429 if (i > au->id.high)
430 {
431 generate_error_common (cmp, LIBERROR_BAD_WAIT_ID, NULL);
432 UNLOCK (&au->lock);
433 return true;
434 }
435
436 NOTE ("Waiting for id %d", i);
437 if (au->id.waiting < i)
438 au->id.waiting = i;
439 SIGNAL (&(au->work));
440 WAIT_SIGNAL_MUTEX (&(au->id.done),
441 (au->id.low >= au->id.waiting || au->empty), &au->lock);
442 LOCK (&au->lock);
443 ret = collect_async_errors (cmp, au);
444 UNLOCK (&au->lock);
445 return ret;
446 }
447
448 /* Perform a wait operation an an asynchronous unit without an ID. */
449
450 bool
451 async_wait (st_parameter_common *cmp, async_unit *au)
452 {
453 bool ret;
454
455 if (au == NULL)
456 return false;
457
458 if (cmp == NULL)
459 cmp = au->error.cmp;
460
461 LOCK (&(au->lock));
462 SIGNAL (&(au->work));
463
464 if (au->empty)
465 {
466 ret = collect_async_errors (cmp, au);
467 UNLOCK (&au->lock);
468 return ret;
469 }
470
471 WAIT_SIGNAL_MUTEX (&(au->emptysignal), (au->empty), &au->lock);
472 ret = collect_async_errors (cmp, au);
473 return ret;
474 }
475
476 /* Close an asynchronous unit. */
477
478 void
479 async_close (async_unit *au)
480 {
481 if (au == NULL)
482 return;
483
484 NOTE ("Closing async unit");
485 enqueue_close (au);
486 T_ERROR (__gthread_join, au->thread, NULL);
487 free_async_unit (au);
488 }
489
490 #else
491
492 /* Only set u->au to NULL so no async I/O will happen. */
493
494 void
495 init_async_unit (gfc_unit *u)
496 {
497 u->au = NULL;
498 return;
499 }
500
501 /* Do-nothing function, which will not be called. */
502
503 void
504 enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
505 {
506 return;
507 }
508
509 /* Do-nothing function, which will not be called. */
510
511 int
512 enqueue_done_id (async_unit *au, enum aio_do type)
513 {
514 return 0;
515 }
516
517 /* Do-nothing function, which will not be called. */
518
519 void
520 enqueue_done (async_unit *au, enum aio_do type)
521 {
522 return;
523 }
524
525 /* Do-nothing function, which will not be called. */
526
527 void
528 enqueue_close (async_unit *au)
529 {
530 return;
531 }
532
533 /* Do-nothing function, which will not be called. */
534
535 void
536 enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
537 {
538 return;
539 }
540
541 /* Do-nothing function, which will not be called. */
542
543 bool
544 collect_async_errors (st_parameter_common *cmp, async_unit *au)
545 {
546 return false;
547 }
548
549 /* Do-nothing function, which will not be called. */
550
551 bool
552 async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
553 {
554 return false;
555 }
556
557 /* Do-nothing function, which will not be called. */
558
559 bool
560 async_wait (st_parameter_common *cmp, async_unit *au)
561 {
562 return false;
563 }
564
565 /* Do-nothing function, which will not be called. */
566
567 void
568 async_close (async_unit *au)
569 {
570 return;
571 }
572
573 #endif