1 /* Copyright (C) 2018-2020 Free Software Foundation, Inc.
2 Contributed by Nicolas Koenig
4 This file is part of the GNU Fortran runtime library (libgfortran).
6 Libgfortran is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3, or (at your option)
11 Libgfortran is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 Under Section 7 of GPL version 3, you are granted additional
17 permissions described in the GCC Runtime Library Exception, version
18 3.1, as published by the Free Software Foundation.
20 You should have received a copy of the GNU General Public License and
21 a copy of the GCC Runtime Library Exception along with this program;
22 see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
23 <http://www.gnu.org/licenses/>. */
25 #include "libgfortran.h"
27 #define _GTHREAD_USE_COND_INIT_FUNC
28 #include "../../libgcc/gthr.h"
36 #include <sys/types.h>
41 DEBUG_LINE (__thread
const char *aio_prefix
= MPREFIX
);
43 DEBUG_LINE (__gthread_mutex_t debug_queue_lock
= __GTHREAD_MUTEX_INIT
;)
44 DEBUG_LINE (aio_lock_debug
*aio_debug_head
= NULL
;)
46 /* Current unit for asynchronous I/O. Needed for error reporting. */
48 __thread gfc_unit
*thread_unit
= NULL
;
50 /* Queue entry for the asynchronous I/O entry. */
51 typedef struct transfer_queue
54 struct transfer_queue
*next
;
55 struct st_parameter_dt
*new_pdt
;
66 /* Helper function to exchange the old vs. a new PDT. */
69 update_pdt (st_parameter_dt
**old
, st_parameter_dt
*new) {
70 st_parameter_dt
*temp
;
71 NOTE ("Changing pdts, current_unit = %p", (void *) (new->u
.p
.current_unit
));
78 /* Destroy an adv_cond structure. */
81 destroy_adv_cond (struct adv_cond
*ac
)
83 T_ERROR (__gthread_cond_destroy
, &ac
->signal
);
86 /* Function invoked as start routine for a new asynchronous I/O unit.
87 Contains the main loop for accepting requests and handling them. */
92 DEBUG_LINE (aio_prefix
= TPREFIX
);
93 transfer_queue
*ctq
= NULL
, *prev
= NULL
;
94 gfc_unit
*u
= (gfc_unit
*) arg
;
95 async_unit
*au
= u
->au
;
98 au
->thread
= __gthread_self ();
101 /* Main loop. At this point, au->lock is always held. */
102 WAIT_SIGNAL_MUTEX (&au
->work
, au
->tail
!= NULL
, &au
->lock
);
106 /* Loop over the queue entries until they are finished. */
112 if (!au
->error
.has_error
)
119 NOTE ("Finalizing write");
120 st_write_done_worker (au
->pdt
);
121 UNLOCK (&au
->io_lock
);
125 NOTE ("Finalizing read");
126 st_read_done_worker (au
->pdt
);
127 UNLOCK (&au
->io_lock
);
130 case AIO_DATA_TRANSFER_INIT
:
131 NOTE ("Data transfer init");
133 update_pdt (&au
->pdt
, ctq
->new_pdt
);
134 data_transfer_init_worker (au
->pdt
, ctq
->read_flag
);
137 case AIO_TRANSFER_SCALAR
:
138 NOTE ("Starting scalar transfer");
139 ctq
->arg
.scalar
.transfer (au
->pdt
, ctq
->arg
.scalar
.arg_bt
,
140 ctq
->arg
.scalar
.data
,
146 case AIO_TRANSFER_ARRAY
:
147 NOTE ("Starting array transfer");
148 NOTE ("ctq->arg.array.desc = %p",
149 (void *) (ctq
->arg
.array
.desc
));
150 transfer_array_inner (au
->pdt
, ctq
->arg
.array
.desc
,
152 ctq
->arg
.array
.charlen
);
153 free (ctq
->arg
.array
.desc
);
157 NOTE ("Received AIO_CLOSE");
162 internal_error (NULL
, "Invalid queue type");
166 if (unlikely (au
->error
.has_error
))
167 au
->error
.last_good_id
= au
->id
.low
- 1;
171 if (ctq
->type
== AIO_WRITE_DONE
|| ctq
->type
== AIO_READ_DONE
)
173 UNLOCK (&au
->io_lock
);
175 else if (ctq
->type
== AIO_CLOSE
)
177 NOTE ("Received AIO_CLOSE during error condition");
182 NOTE ("Next ctq, current id: %d", au
->id
.low
);
183 if (ctq
->has_id
&& au
->id
.waiting
== au
->id
.low
++)
184 SIGNAL (&au
->id
.done
);
191 SIGNAL (&au
->emptysignal
);
197 SIGNAL (&au
->emptysignal
);
203 /* Free an asynchronous unit. */
206 free_async_unit (async_unit
*au
)
209 internal_error (NULL
, "Trying to free nonempty asynchronous unit");
211 destroy_adv_cond (&au
->work
);
212 destroy_adv_cond (&au
->emptysignal
);
213 destroy_adv_cond (&au
->id
.done
);
214 T_ERROR (__gthread_mutex_destroy
, &au
->lock
);
218 /* Initialize an adv_cond structure. */
221 init_adv_cond (struct adv_cond
*ac
)
224 __GTHREAD_COND_INIT_FUNCTION (&ac
->signal
);
227 /* Initialize an asyncronous unit, returning zero on success,
228 nonzero on failure. It also sets u->au. */
231 init_async_unit (gfc_unit
*u
)
234 if (!__gthread_active_p ())
240 au
= (async_unit
*) xmalloc (sizeof (async_unit
));
242 init_adv_cond (&au
->work
);
243 init_adv_cond (&au
->emptysignal
);
244 __GTHREAD_MUTEX_INIT_FUNCTION (&au
->lock
);
245 __GTHREAD_MUTEX_INIT_FUNCTION (&au
->io_lock
);
247 T_ERROR (__gthread_create
, &au
->thread
, &async_io
, (void *) u
);
255 au
->error
.fatal_error
= 0;
256 au
->error
.has_error
= 0;
257 au
->error
.last_good_id
= 0;
258 init_adv_cond (&au
->id
.done
);
262 /* Enqueue a transfer statement. */
265 enqueue_transfer (async_unit
*au
, transfer_args
*arg
, enum aio_do type
)
267 transfer_queue
*tq
= calloc (sizeof (transfer_queue
), 1);
277 REVOKE_SIGNAL (&(au
->emptysignal
));
283 /* Enqueue an st_write_done or st_read_done which contains an ID. */
286 enqueue_done_id (async_unit
*au
, enum aio_do type
)
289 transfer_queue
*tq
= calloc (sizeof (transfer_queue
), 1);
299 REVOKE_SIGNAL (&(au
->emptysignal
));
302 NOTE ("Enqueue id: %d", ret
);
308 /* Enqueue an st_write_done or st_read_done without an ID. */
311 enqueue_done (async_unit
*au
, enum aio_do type
)
313 transfer_queue
*tq
= calloc (sizeof (transfer_queue
), 1);
322 REVOKE_SIGNAL (&(au
->emptysignal
));
328 /* Enqueue a CLOSE statement. */
331 enqueue_close (async_unit
*au
)
333 transfer_queue
*tq
= calloc (sizeof (transfer_queue
), 1);
335 tq
->type
= AIO_CLOSE
;
342 REVOKE_SIGNAL (&(au
->emptysignal
));
348 /* The asynchronous unit keeps the currently active PDT around.
349 This function changes that to the current one. */
352 enqueue_data_transfer_init (async_unit
*au
, st_parameter_dt
*dt
, int read_flag
)
354 st_parameter_dt
*new = xmalloc (sizeof (st_parameter_dt
));
355 transfer_queue
*tq
= xmalloc (sizeof (transfer_queue
));
357 memcpy ((void *) new, (void *) dt
, sizeof (st_parameter_dt
));
359 NOTE ("dt->internal_unit_desc = %p", dt
->internal_unit_desc
);
360 NOTE ("common.flags & mask = %d", dt
->common
.flags
& IOPARM_LIBRETURN_MASK
);
362 tq
->type
= AIO_DATA_TRANSFER_INIT
;
363 tq
->read_flag
= read_flag
;
373 REVOKE_SIGNAL (&(au
->emptysignal
));
379 /* Collect the errors that may have happened asynchronously. Return true if
380 an error has been encountered. */
383 collect_async_errors (st_parameter_common
*cmp
, async_unit
*au
)
385 bool has_error
= au
->error
.has_error
;
389 if (generate_error_common (cmp
, au
->error
.family
, au
->error
.message
))
391 au
->error
.has_error
= 0;
392 au
->error
.cmp
= NULL
;
396 /* The program will exit later. */
397 au
->error
.fatal_error
= true;
403 /* Perform a wait operation on an asynchronous unit with an ID specified,
404 which means collecting the errors that may have happened asynchronously.
405 Return true if an error has been encountered. */
408 async_wait_id (st_parameter_common
*cmp
, async_unit
*au
, int i
)
418 if (au
->error
.has_error
)
420 if (i
<= au
->error
.last_good_id
)
423 return collect_async_errors (cmp
, au
);
427 NOTE ("Waiting for id %d", i
);
428 if (au
->id
.waiting
< i
)
430 SIGNAL (&(au
->work
));
431 WAIT_SIGNAL_MUTEX (&(au
->id
.done
),
432 (au
->id
.low
>= au
->id
.waiting
|| au
->empty
), &au
->lock
);
434 ret
= collect_async_errors (cmp
, au
);
439 /* Perform a wait operation an an asynchronous unit without an ID. */
442 async_wait (st_parameter_common
*cmp
, async_unit
*au
)
453 SIGNAL (&(au
->work
));
457 ret
= collect_async_errors (cmp
, au
);
462 WAIT_SIGNAL_MUTEX (&(au
->emptysignal
), (au
->empty
), &au
->lock
);
463 ret
= collect_async_errors (cmp
, au
);
467 /* Close an asynchronous unit. */
470 async_close (async_unit
*au
)
475 NOTE ("Closing async unit");
477 T_ERROR (__gthread_join
, au
->thread
, NULL
);
478 free_async_unit (au
);
483 /* Only set u->au to NULL so no async I/O will happen. */
486 init_async_unit (gfc_unit
*u
)
492 /* Do-nothing function, which will not be called. */
495 enqueue_transfer (async_unit
*au
, transfer_args
*arg
, enum aio_do type
)
500 /* Do-nothing function, which will not be called. */
503 enqueue_done_id (async_unit
*au
, enum aio_do type
)
508 /* Do-nothing function, which will not be called. */
511 enqueue_done (async_unit
*au
, enum aio_do type
)
516 /* Do-nothing function, which will not be called. */
519 enqueue_close (async_unit
*au
)
524 /* Do-nothing function, which will not be called. */
527 enqueue_data_transfer_init (async_unit
*au
, st_parameter_dt
*dt
, int read_flag
)
532 /* Do-nothing function, which will not be called. */
535 collect_async_errors (st_parameter_common
*cmp
, async_unit
*au
)
540 /* Do-nothing function, which will not be called. */
543 async_wait_id (st_parameter_common
*cmp
, async_unit
*au
, int i
)
548 /* Do-nothing function, which will not be called. */
551 async_wait (st_parameter_common
*cmp
, async_unit
*au
)
556 /* Do-nothing function, which will not be called. */
559 async_close (async_unit
*au
)