if (supervisor_main_loop (argc, argv, &exit_code))
return;
+
+ thread_support_cleanup ();
shared_memory_cleanup (&local->sm);
/* Free pseudo tokens and memory to allow main process to survive caf_init.
caf_static_list = tmp;
}
free (local);
+
exit (exit_code);
}
caf_teams_formed = NULL;
free (local);
+
+ thread_support_cleanup ();
}
int
{
lock_t *addr;
bool created;
+ size_t alloc_size;
allocator_lock (&local->ai.alloc);
- /* Allocate enough space for the metadata infront of the lock
- array. */
- addr
- = alloc_get_memory_by_id_created (&local->ai, size * sizeof (lock_t),
- next_memid, &created);
+#if defined(WIN32) || defined(__CYGWIN__)
+ /* On Windows mutexes are not an object stored in the shmem but
+ identified by an id. */
+ alloc_size = size * caf_current_team->u.image_info->image_count.count;
+#else
+ alloc_size = size;
+#endif
+ addr = alloc_get_memory_by_id_created (&local->ai,
+ alloc_size * sizeof (lock_t),
+ next_memid, &created);
if (created)
{
/* Initialize the mutex only, when the memory was allocated for the
first time. */
- for (size_t c = 0; c < size; ++c)
+ for (size_t c = 0; c < alloc_size; ++c)
initialize_shared_errorcheck_mutex (&addr[c]);
}
size *= sizeof (lock_t);
default: \
caf_runtime_error ("" #name \
" not available for type/kind combination"); \
+ opr = NULL; /* Prevent false warnings. */ \
} \
break; \
}
default: \
caf_runtime_error ("" #name \
" not available for type/kind combination"); \
+ opr = NULL; /* Prevent false warning. */ \
} \
break; \
default: \
caf_runtime_error ("" #name " not available for type/kind combination"); \
+ opr = NULL; /* Prevent false warning. */ \
}
void
}
void
-_gfortran_caf_lock (caf_token_t token, size_t index,
- int image_index __attribute__ ((unused)),
+_gfortran_caf_lock (caf_token_t token, size_t index, int image_index,
int *acquired_lock, int *stat, char *errmsg,
size_t errmsg_len)
{
const char *msg = "Already locked";
- lock_t *lock = &((lock_t *) MEMTOK (token))[index];
+#if defined(WIN32) || defined(__CYGWIN__)
+ const size_t lock_index
+ = image_index * caf_current_team->u.image_info->image_count.count + index;
+#else
+ const size_t lock_index = index;
+ (void) image_index; // Prevent unused warnings.
+#endif
+ lock_t *lock = &((lock_t *) MEMTOK (token))[lock_index];
int res;
- res
- = acquired_lock ? pthread_mutex_trylock (lock) : pthread_mutex_lock (lock);
+ res = acquired_lock ? caf_shmem_mutex_trylock (lock)
+ : caf_shmem_mutex_lock (lock);
if (stat)
*stat = res == EBUSY ? GFC_STAT_LOCKED : 0;
{
if (errmsg_len > 0)
{
- size_t len = (sizeof (msg) > errmsg_len) ? errmsg_len
- : sizeof (msg);
+ size_t len = (sizeof (msg) > errmsg_len) ? errmsg_len : sizeof (msg);
memcpy (errmsg, msg, len);
if (errmsg_len > len)
- memset (&errmsg[len], ' ', errmsg_len-len);
+ memset (&errmsg[len], ' ', errmsg_len - len);
}
return;
}
_gfortran_caf_error_stop_str (msg, strlen (msg), false);
}
-
void
-_gfortran_caf_unlock (caf_token_t token, size_t index,
- int image_index __attribute__ ((unused)),
+_gfortran_caf_unlock (caf_token_t token, size_t index, int image_index,
int *stat, char *errmsg, size_t errmsg_len)
{
const char *msg = "Variable is not locked";
- lock_t *lock = &((lock_t *) MEMTOK (token))[index];
+#if defined(WIN32) || defined(__CYGWIN__)
+ const size_t lock_index
+ = image_index * caf_current_team->u.image_info->image_count.count + index;
+#else
+ const size_t lock_index = index;
+ (void) image_index; // Prevent unused warnings.
+#endif
+ lock_t *lock = &((lock_t *) MEMTOK (token))[lock_index];
int res;
- res = pthread_mutex_unlock (lock);
+ res = caf_shmem_mutex_unlock (lock);
if (res == 0)
{
{
/* res == EPERM means that the lock is locked. Now figure, if by us by
trying to lock it or by other image, which fails. */
- res = pthread_mutex_trylock (lock);
+ res = caf_shmem_mutex_trylock (lock);
if (res == EBUSY)
*stat = GFC_STAT_LOCKED_OTHER_IMAGE;
else
{
*stat = GFC_STAT_UNLOCKED;
- pthread_mutex_unlock (lock);
+ caf_shmem_mutex_unlock (lock);
}
if (errmsg_len > 0)
{
- size_t len = (sizeof (msg) > errmsg_len) ? errmsg_len
- : sizeof (msg);
+ size_t len = (sizeof (msg) > errmsg_len) ? errmsg_len : sizeof (msg);
memcpy (errmsg, msg, len);
if (errmsg_len > len)
- memset (&errmsg[len], ' ', errmsg_len-len);
+ memset (&errmsg[len], ' ', errmsg_len - len);
}
return;
}
_gfortran_caf_error_stop_str (msg, strlen (msg), false);
}
-
/* Reference the libraries implementation. */
extern void _gfortran_random_seed_i4 (int32_t *size, gfc_array_i4 *put,
gfc_array_i4 *get);
-void _gfortran_caf_random_init (bool repeatable, bool image_distinct)
+void
+_gfortran_caf_random_init (bool repeatable, bool image_distinct)
{
static struct
{
++i)
t->u.image_info->image_map[i] = -1;
}
- counter_barrier_add (&t->u.image_info->image_count, 1);
- counter_barrier_add (&t->u.image_info->collsub.barrier, 1);
+ counter_barrier_init_add (&t->u.image_info->image_count, 1);
+ counter_barrier_init_add (&t->u.image_info->collsub.barrier, 1);
allocator_unlock (&local->ai.alloc);
if (new_index)
#include "../caf_error.h"
#include "supervisor.h"
#include "shared_memory.h"
+#include "thread_support.h"
#include <assert.h>
-#include <pthread.h>
#include <string.h>
/* Worker's part to initialize the alloc interface. */
void
allocator_lock (allocator *a)
{
- pthread_mutex_lock (&a->s->lock);
+ caf_shmem_mutex_lock (&a->s->lock);
}
void
allocator_unlock (allocator *a)
{
- pthread_mutex_unlock (&a->s->lock);
+ caf_shmem_mutex_unlock (&a->s->lock);
}
#define ALLOCATOR_HDR
#include "shared_memory.h"
+#include "thread_support.h"
#include <stddef.h>
-#include <pthread.h>
/* The number of bits a void pointer has. */
#define VOIDP_BITS (__CHAR_BIT__ * sizeof (void *))
/* The shared memory part of the allocator. */
typedef struct {
- pthread_mutex_t lock;
+ caf_shmem_mutex lock;
shared_mem_ptr free_bucket_head[VOIDP_BITS];
} allocator_shared;
{
void *ret;
- pthread_mutex_lock (&caf_current_team->u.image_info->collsub.mutex);
+ caf_shmem_mutex_lock (&caf_current_team->u.image_info->collsub.mutex);
/* curr_size is always at least sizeof(double), so we don't need to worry
about size == 0. */
if (size > caf_current_team->u.image_info->collsub.curr_size)
ret = SHMPTR_AS (void *, caf_current_team->u.image_info->collsub.collsub_buf,
&local->sm);
- pthread_mutex_unlock (&caf_current_team->u.image_info->collsub.mutex);
+ caf_shmem_mutex_unlock (&caf_current_team->u.image_info->collsub.mutex);
return ret;
}
size_t curr_size;
shared_mem_ptr collsub_buf;
counter_barrier barrier;
- pthread_mutex_t mutex;
+ caf_shmem_mutex mutex;
} collsub_shared;
void collsub_init_supervisor (collsub_shared *, allocator *,
static inline void
lock_counter_barrier (counter_barrier *b)
{
- pthread_mutex_lock (&b->mutex);
+ caf_shmem_mutex_lock (&b->mutex);
}
/* Unlock the associated counter of this barrier. */
static inline void
unlock_counter_barrier (counter_barrier *b)
{
- pthread_mutex_unlock (&b->mutex);
+ caf_shmem_mutex_unlock (&b->mutex);
}
void
counter_barrier_init (counter_barrier *b, int val)
{
- *b = (counter_barrier) {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,
- val, 0, val};
- initialize_shared_condition (&b->cond);
+ *b = (counter_barrier) {CAF_SHMEM_MUTEX_INITIALIZER,
+ CAF_SHMEM_COND_INITIALIZER, val, 0, val};
+ initialize_shared_condition (&b->cond, val);
initialize_shared_mutex (&b->mutex);
}
int wait_group_beginning;
lock_counter_barrier (b);
-
wait_group_beginning = b->curr_wait_group;
if ((--b->wait_count) <= 0)
- pthread_cond_broadcast (&b->cond);
+ caf_shmem_cond_broadcast (&b->cond);
else
{
while (b->wait_count > 0 && b->curr_wait_group == wait_group_beginning)
- pthread_cond_wait (&b->cond, &b->mutex);
+ caf_shmem_cond_wait (&b->cond, &b->mutex);
}
if (b->wait_count <= 0)
unlock_counter_barrier (b);
}
-
static inline void
change_internal_barrier_count (counter_barrier *b, int val)
{
b->wait_count += val;
if (b->wait_count <= 0)
- pthread_cond_broadcast (&b->cond);
+ caf_shmem_cond_broadcast (&b->cond);
}
int
counter_barrier_add (counter_barrier *c, int val)
{
int ret;
- pthread_mutex_lock (&c->mutex);
+ caf_shmem_mutex_lock (&c->mutex);
ret = counter_barrier_add_locked (c, val);
- pthread_mutex_unlock (&c->mutex);
+ caf_shmem_mutex_unlock (&c->mutex);
return ret;
}
+void
+counter_barrier_init_add (counter_barrier *b, int val)
+{
+ b->count += val;
+ b->wait_count += val;
+ caf_shmem_cond_update_count (&b->cond, val);
+}
+
int
counter_barrier_get_count (counter_barrier *c)
{
int ret;
- pthread_mutex_lock (&c->mutex);
+ caf_shmem_mutex_lock (&c->mutex);
ret = c->count;
- pthread_mutex_unlock (&c->mutex);
+ caf_shmem_mutex_unlock (&c->mutex);
return ret;
}
#ifndef COUNTER_BARRIER_HDR
#define COUNTER_BARRIER_HDR
-#include <pthread.h>
+#include "thread_support.h"
/* Usable as counter barrier and as waitable counter.
This "class" allows to sync all images acting as a barrier. For this the
typedef struct
{
- pthread_mutex_t mutex;
- pthread_cond_t cond;
+ caf_shmem_mutex mutex;
+ caf_shmem_condvar cond;
volatile int wait_count;
volatile int curr_wait_group;
volatile int count;
int counter_barrier_add (counter_barrier *, int);
+/* Add the given number to the counter barrier. This version does not signal.
+ The mutex needs to be locked for this routine to be safe. */
+void counter_barrier_init_add (counter_barrier *, int);
+
/* Get the count of the barrier. */
int counter_barrier_get_count (counter_barrier *);
see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
<http://www.gnu.org/licenses/>. */
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
#include "libgfortran.h"
#include "allocator.h"
#include "shared_memory.h"
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
+#ifdef HAVE_SYS_MMAN_H
#include <sys/mman.h>
+#elif defined(WIN32)
+#include <Windows.h>
+#include <Memoryapi.h>
+#endif
#include <unistd.h>
/* This implements shared memory based on POSIX mmap. We start with
char buffer[bufsize];
snprintf (buffer, bufsize, "%d", pid);
+#ifdef HAVE_SETENV
setenv (ENV_PPID, buffer, 1);
+#else
+ SetEnvironmentVariable (ENV_PPID, buffer);
+#endif
#undef bufsize
}
shared_memory_get_master (shared_memory_act *mem, size_t size, size_t align)
{
if (mem->glbl.meta->master)
- return (shared_mem_ptr) {mem->glbl.meta->master};
+ return (shared_mem_ptr) {mem->glbl.meta->master};
else
{
ptrdiff_t loc = mem->glbl.meta->used;
char shm_name[NAME_MAX];
const char *env_val = getenv (ENV_PPID), *base = getenv (ENV_BASE);
pid_t ppid = getpid ();
- int shm_fd, res;
void *base_ptr;
if (env_val)
if (!env_val)
{
- shm_fd = shm_open (shm_name, O_CREAT | O_RDWR | O_EXCL, 0600);
- if (shm_fd == -1)
+#ifdef HAVE_MMAP
+ int res;
+
+ mem->shm_fd = shm_open (shm_name, O_CREAT | O_RDWR | O_EXCL, 0600);
+ if (mem->shm_fd == -1)
{
perror ("creating shared memory segment failed.");
exit (1);
}
- res = ftruncate (shm_fd, size);
+ res = ftruncate (mem->shm_fd, size);
if (res == -1)
{
perror ("resizing shared memory segment failed.");
exit (1);
}
+#elif defined(WIN32)
+ mem->shm_fd
+ = CreateFileMapping (INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE,
+ size >> (sizeof (DWORD) * 8),
+ (DWORD) (size & ~((DWORD) 0)), shm_name);
+ if (mem->shm_fd == NULL)
+ {
+ LPVOID lpMsgBuf;
+ DWORD dw = GetLastError ();
+
+ if (FormatMessage (FORMAT_MESSAGE_ALLOCATE_BUFFER
+ | FORMAT_MESSAGE_FROM_SYSTEM
+ | FORMAT_MESSAGE_IGNORE_INSERTS,
+ NULL, dw,
+ MAKELANGID (LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPTSTR) &lpMsgBuf, 0, NULL)
+ == 0)
+ {
+ fprintf (stderr, "formatting the error message failed.\n");
+ ExitProcess (dw);
+ }
+
+ fprintf (stderr, "creating shared memory segment failed: %d, %s\n",
+ dw, (LPCTSTR) lpMsgBuf);
+
+ LocalFree (lpMsgBuf);
+ exit (1);
+ }
+#else
+#error "no way to map shared memory."
+#endif
}
else
{
- shm_fd = shm_open (shm_name, O_RDWR, 0);
- if (shm_fd == -1)
+#ifdef HAVE_MMAP
+ mem->shm_fd = shm_open (shm_name, O_RDWR, 0);
+ if (mem->shm_fd == -1)
{
perror ("opening shared memory segment failed.");
exit (1);
}
+#elif defined(WIN32)
+ mem->shm_fd = OpenFileMapping (FILE_MAP_ALL_ACCESS, FALSE, shm_name);
+ if (mem->shm_fd == NULL)
+ {
+ perror ("opening shared memory segment failed.");
+ exit (1);
+ }
+#endif
}
-
+#ifdef HAVE_MMAP
mem->glbl.base
- = mmap (base_ptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
- res = close (shm_fd);
+ = mmap (base_ptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, mem->shm_fd, 0);
if (mem->glbl.base == MAP_FAILED)
{
perror ("mmap failed");
exit (1);
}
+#elif defined(WIN32)
+ mem->glbl.base
+ = (LPTSTR) MapViewOfFileExNuma (mem->shm_fd, FILE_MAP_ALL_ACCESS, 0, 0,
+ size, base_ptr, NUMA_NO_PREFERRED_NODE);
+ if (mem->glbl.base == NULL)
+ {
+ perror ("MapViewOfFile failed");
+ exit (1);
+ }
+#endif
if (!base_ptr)
{
#define bufsize 20
char buffer[bufsize];
snprintf (buffer, bufsize, "%p", mem->glbl.base);
+#ifdef HAVE_SETENV
setenv (ENV_BASE, buffer, 1);
+#else
+ SetEnvironmentVariable (ENV_BASE, buffer);
+#endif
#undef bufsize
}
- if (res)
- { // from close()
- perror ("closing shm file handle failed. Trying to continue...");
- }
mem->size = size;
if (!env_val)
*mem->glbl.meta
= (global_shared_memory_meta) {sizeof (global_shared_memory_meta), 0};
-
}
void
-shared_memory_cleanup (shared_memory_act *)
+shared_memory_cleanup (shared_memory_act *mem)
{
char shm_name[NAME_MAX];
- int res;
snprintf (shm_name, NAME_MAX, "/gfor-shm-%s", shared_memory_get_env ());
+#ifdef HAVE_MMAP
+ int res = munmap (mem->glbl.base, mem->size);
+ if (res)
+ {
+ perror ("unmapping shared memory segment failed");
+ }
+ res = close (mem->shm_fd);
+ if (res)
+ {
+ perror ("closing shm file handle failed. Trying to continue...");
+ }
res = shm_unlink (shm_name);
if (res == -1)
{
perror ("shm_unlink failed");
exit (1);
}
+#elif defined(WIN32)
+ if (!UnmapViewOfFile (mem->glbl.base))
+ {
+ perror ("unmapping shared memory segment failed");
+ }
+ CloseHandle (mem->shm_fd);
+#endif
}
#undef NAME_MAX
#ifndef SHARED_MEMORY_H
#define SHARED_MEMORY_H
+#include "thread_support.h"
+
#include <stdlib.h>
#include <stddef.h>
#include <unistd.h>
global_shared_memory_meta *meta;
} glbl;
size_t size; // const
+ caf_shmem_fd shm_fd;
} shared_memory_act;
/* A struct to serve as shared memory object. */
see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
<http://www.gnu.org/licenses/>. */
-#include "config.h"
-
#include "../caf_error.h"
#include "supervisor.h"
#include "teams_mgmt.h"
#elif HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif
+#if !defined(_SC_PAGE_SIZE) && defined(WIN32)
+#include <windows.h>
+#endif
#define GFORTRAN_ENV_NUM_IMAGES "GFORTRAN_NUM_IMAGES"
#define GFORTRAN_ENV_SHARED_MEMORY_SIZE "GFORTRAN_SHARED_MEMORY_SIZE"
int nimages;
num_images_char = getenv (GFORTRAN_ENV_NUM_IMAGES);
if (!num_images_char)
- return sysconf (_SC_NPROCESSORS_ONLN); /* TODO: Make portable. */
- /* TODO: Error checking. */
+#ifdef _SC_NPROCESSORS_ONLN
+ return sysconf (_SC_NPROCESSORS_ONLN);
+#elif defined(WIN32)
+ num_images_char = getenv ("NUMBER_OF_PROCESSORS");
+#else
+#error "Unsupported system: No known way to get number of cores!"
+#endif
nimages = atoi (num_images_char);
return nimages;
}
if (sizeof (size_t) == 4)
sz = ((size_t) 1) << 28;
else
+#ifndef WIN32
sz = ((size_t) 1) << 34;
+#else
+ /* Use 1GB on Windows. */
+ sz = ((size_t) 1) << 30;
+#endif
}
return sz;
}
return;
local = malloc (sizeof (image_local));
+#if defined(_SC_PAGE_SIZE)
pagesize = sysconf (_SC_PAGE_SIZE);
+#elif defined(WIN32)
+ {
+ SYSTEM_INFO si;
+ GetNativeSystemInfo (&si);
+ pagesize = si.dwAllocationGranularity;
+ }
+#else
+#warning \
+ "Unsupported system: No known way to get memory page size. Assuming 4k!"
+ pagesize = 4096;
+#endif
shmem_size = round_to_pagesize (get_memory_size_from_envvar ());
local->total_num_images = get_image_num_from_envvar ();
shared_memory_init (&local->sm, shmem_size);
{
this_image = (image) {-1, get_supervisor ()};
this_image.supervisor->magic_number = SUPERVISOR_MAGIC_NUM;
+ thread_support_init_supervisor ();
counter_barrier_init (&this_image.supervisor->num_active_images,
local->total_num_images);
alloc_init_supervisor (&local->ai, &local->sm);
}
}
+#if !defined(environ)
extern char **environ;
+#endif
+/* argc and argv may not be used on certain OSes. Flag them unused therefore.
+ */
int
-supervisor_main_loop (int *argc __attribute__ ((unused)), char ***argv,
- int *exit_code)
+supervisor_main_loop (int *argc __attribute__ ((unused)),
+ char ***argv __attribute__ ((unused)), int *exit_code)
{
supervisor *m;
- pid_t new_pid, finished_pid;
image im;
+#if defined(WIN32) && !defined(HAVE_FORK)
+ HANDLE *process_handles = malloc (sizeof (HANDLE) * local->total_num_images),
+ *thread_handles = malloc (sizeof (HANDLE) * local->total_num_images),
+ *waiting_handles = malloc (sizeof (HANDLE) * local->total_num_images);
+ int count_waiting = local->total_num_images;
+ LPTCH *envs = malloc (sizeof (LPTCH) * local->total_num_images);
+ LPTSTR currentDir;
+ DWORD cdLen = GetCurrentDirectory (0, NULL);
+ currentDir = malloc (cdLen);
+ GetCurrentDirectory (cdLen, currentDir);
+#else
int chstatus;
+#endif
*exit_code = 0;
shared_memory_set_env (getpid ());
for (im.image_num = 0; im.image_num < local->total_num_images; im.image_num++)
{
+#ifdef HAVE_FORK
+ caf_shmem_pid new_pid;
if ((new_pid = fork ()))
{
if (new_pid == -1)
execve ((*argv)[0], *argv, new_env);
return 1;
}
+#elif defined(WIN32)
+ LPTCH new_env;
+ size_t n = 0, es;
+ STARTUPINFO si;
+ DWORD dwFlags = 0;
+ PROCESS_INFORMATION pi;
+ LPTCH env = GetEnvironmentStrings ();
+
+ ZeroMemory (&si, sizeof (si));
+ si.cb = sizeof (si);
+ ZeroMemory (&pi, sizeof (pi));
+
+ /* Count the number of characters in the current environment. */
+ for (LPTSTR e = (LPTSTR) env; *e; es = lstrlen (e) + 1, e += es, n += es)
+ ;
+ new_env = (LPCH) malloc (n + 32 * sizeof (TCHAR));
+ memcpy (new_env, env, n);
+ snprintf (&((TCHAR *) new_env)[n], 32, "%s=%d%c", GFORTRAN_ENV_IMAGE_NUM,
+ im.image_num, (char) 0);
+ if (!CreateProcessA (NULL, GetCommandLine (), NULL, NULL, FALSE, dwFlags,
+ new_env, currentDir, &si, &pi))
+ {
+ LPVOID lpMsgBuf;
+ DWORD dw = GetLastError ();
+
+ if (FormatMessage (FORMAT_MESSAGE_ALLOCATE_BUFFER
+ | FORMAT_MESSAGE_FROM_SYSTEM
+ | FORMAT_MESSAGE_IGNORE_INSERTS,
+ NULL, dw,
+ MAKELANGID (LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPTSTR) &lpMsgBuf, 0, NULL)
+ == 0)
+ {
+ fprintf (stderr, "formatting the error message failed.\n");
+ ExitProcess (dw);
+ }
+
+ fprintf (stderr, "error spawning child: %ld, %s\n", dw,
+ (LPCTSTR) lpMsgBuf);
+
+ LocalFree (lpMsgBuf);
+ exit (1);
+ }
+ m->images[im.image_num] = (image_tracker) {pi.hProcess, IMAGE_OK};
+ process_handles[im.image_num] = waiting_handles[im.image_num]
+ = pi.hProcess;
+ thread_handles[im.image_num] = pi.hThread;
+ envs[im.image_num] = new_env;
+#else
+#error "no way known to start child processes."
+#endif
}
- for (int j, i = 0; i < local->total_num_images; i++)
+ for (int i = 0; i < local->total_num_images; i++)
{
- finished_pid = wait (&chstatus);
+#ifdef HAVE_FORK
+ caf_shmem_pid finished_pid = wait (&chstatus);
+ int j;
if (WIFEXITED (chstatus) && !WEXITSTATUS (chstatus))
{
for (j = 0;
}
/* Trigger waiting sync images aka sync_table. */
for (j = 0; j < local->total_num_images; j++)
- pthread_cond_signal (&SHMPTR_AS (pthread_cond_t *,
- m->sync_shared.sync_images_cond_vars,
- &local->sm)[j]);
+ caf_shmem_cond_signal (&SHMPTR_AS (caf_shmem_condvar *,
+ m->sync_shared.sync_images_cond_vars,
+ &local->sm)[j]);
counter_barrier_add (&m->num_active_images, -1);
+#elif defined(WIN32)
+ DWORD res = WaitForMultipleObjects (count_waiting, waiting_handles, FALSE,
+ INFINITE);
+ HANDLE cand;
+ bool progress = false;
+ DWORD process_exit_code;
+ if (res == WAIT_FAILED)
+ caf_runtime_error ("waiting for process termination failed.");
+ int index = res - WAIT_OBJECT_0, finished_process;
+ bool fail;
+
+ do
+ {
+ cand = waiting_handles[index];
+ for (finished_process = 0;
+ finished_process < local->total_num_images
+ && cand != process_handles[finished_process];
+ ++finished_process)
+ ;
+
+ GetExitCodeProcess (cand, &process_exit_code);
+ fail = process_exit_code != 0;
+ fprintf (stderr, "terminating process %d with fail status %d (%ld)\n",
+ finished_process, fail, process_exit_code);
+ if (finished_process < local->total_num_images)
+ {
+ CloseHandle (process_handles[finished_process]);
+ process_handles[finished_process] = NULL;
+ CloseHandle (thread_handles[finished_process]);
+ FreeEnvironmentStrings (envs[finished_process]);
+ if (fail)
+ {
+ m->images[finished_process].status = IMAGE_FAILED;
+ atomic_fetch_add (&m->failed_images, 1);
+ if (*exit_code < process_exit_code)
+ *exit_code = process_exit_code;
+ }
+ else
+ {
+ m->images[finished_process].status = IMAGE_SUCCESS;
+ atomic_fetch_add (&m->finished_images, 1);
+ }
+ }
+ memmove (&waiting_handles[index], &waiting_handles[index + 1],
+ sizeof (HANDLE) * (count_waiting - index - 1));
+ --count_waiting;
+ counter_barrier_add (&m->num_active_images, -1);
+
+ /* Check if more than one process has terminated already. */
+ progress = false;
+ for (index = 0; index < count_waiting; ++index)
+ if (WaitForSingleObject (waiting_handles[index], 0)
+ == WAIT_OBJECT_0)
+ {
+ progress = true;
+ ++i;
+ break;
+ }
+ }
+ while (progress && count_waiting > 0);
+#endif
}
+
+#if defined(WIN32) && !defined(HAVE_FORK)
+ free (process_handles);
+ free (thread_handles);
+ free (envs);
+#endif
return 0;
}
#ifndef SUPERVISOR_H
#define SUPERVISOR_H
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
#include "caf/libcaf.h"
#include "alloc.h"
#include "collective_subroutine.h"
typedef struct
{
- pid_t pid;
+ caf_shmem_pid pid;
image_status status;
} image_tracker;
atomic_int failed_images;
atomic_int finished_images;
counter_barrier num_active_images;
- pthread_mutex_t image_tracker_lock;
+ caf_shmem_mutex image_tracker_lock;
+#ifdef WIN32
+ size_t global_used_handles;
+#endif
image_tracker images[];
} supervisor;
static inline void
lock_table (sync_t *si)
{
- pthread_mutex_lock (&si->cis->sync_images_table_lock);
+ caf_shmem_mutex_lock (&si->cis->sync_images_table_lock);
}
static inline void
unlock_table (sync_t *si)
{
- pthread_mutex_unlock (&si->cis->sync_images_table_lock);
+ caf_shmem_mutex_unlock (&si->cis->sync_images_table_lock);
}
void
*si = (sync_t) {
&this_image.supervisor->sync_shared,
SHMPTR_AS (int *, this_image.supervisor->sync_shared.sync_images_table, sm),
- SHMPTR_AS (pthread_cond_t *,
+ SHMPTR_AS (caf_shmem_condvar *,
this_image.supervisor->sync_shared.sync_images_cond_vars, sm)};
}
si->cis = &this_image.supervisor->sync_shared;
initialize_shared_mutex (&si->cis->event_lock);
- initialize_shared_condition (&si->cis->event_cond);
+ initialize_shared_condition (&si->cis->event_cond, num_images);
initialize_shared_mutex (&si->cis->sync_images_table_lock);
= allocator_shared_malloc (alloc_get_allocator (ai), table_size_in_bytes);
si->cis->sync_images_cond_vars
= allocator_shared_malloc (alloc_get_allocator (ai),
- sizeof (pthread_cond_t) * num_images);
+ sizeof (caf_shmem_condvar) * num_images);
si->table = SHMPTR_AS (int *, si->cis->sync_images_table, ai->mem);
si->triggers
- = SHMPTR_AS (pthread_cond_t *, si->cis->sync_images_cond_vars, ai->mem);
+ = SHMPTR_AS (caf_shmem_condvar *, si->cis->sync_images_cond_vars, ai->mem);
for (int i = 0; i < num_images; i++)
- initialize_shared_condition (&si->triggers[i]);
+ initialize_shared_condition (&si->triggers[i], num_images);
memset (si->table, 0, table_size_in_bytes);
}
for (i = 0; i < size; ++i)
{
++table[images[i] + img_c * this_image.image_num];
- pthread_cond_signal (&si->triggers[images[i]]);
+ caf_shmem_cond_signal (&si->triggers[images[i]]);
}
for (;;)
{
break;
if (i == size)
break;
- pthread_cond_wait (&si->triggers[this_image.image_num],
+ caf_shmem_cond_wait (&si->triggers[this_image.image_num],
&si->cis->sync_images_table_lock);
}
}
if (this_image.supervisor->images[map[i]].status != IMAGE_OK)
continue;
++table[map[i] + size * this_image.image_num];
- pthread_cond_signal (&si->triggers[map[i]]);
+ caf_shmem_cond_signal (&si->triggers[map[i]]);
}
for (;;)
{
break;
if (i == size)
break;
- pthread_cond_wait (&si->triggers[this_image.image_num],
+ caf_shmem_cond_wait (&si->triggers[this_image.image_num],
&si->cis->sync_images_table_lock);
}
}
void
lock_event (sync_t *si)
{
- pthread_mutex_lock (&si->cis->event_lock);
+ caf_shmem_mutex_lock (&si->cis->event_lock);
}
void
unlock_event (sync_t *si)
{
- pthread_mutex_unlock (&si->cis->event_lock);
+ caf_shmem_mutex_unlock (&si->cis->event_lock);
}
void
event_post (sync_t *si)
{
- pthread_cond_broadcast (&si->cis->event_cond);
+ caf_shmem_cond_broadcast (&si->cis->event_cond);
}
void
event_wait (sync_t *si)
{
- pthread_cond_wait (&si->cis->event_cond, &si->cis->event_lock);
+ caf_shmem_cond_wait (&si->cis->event_cond, &si->cis->event_lock);
}
#include "alloc.h"
#include "counter_barrier.h"
-#include <pthread.h>
-
typedef struct {
/* Mutex and condition variable needed for signaling events. */
- pthread_mutex_t event_lock;
- pthread_cond_t event_cond;
- pthread_mutex_t sync_images_table_lock;
+ caf_shmem_mutex event_lock;
+ caf_shmem_condvar event_cond;
+ caf_shmem_mutex sync_images_table_lock;
shared_mem_ptr sync_images_table;
shared_mem_ptr sync_images_cond_vars;
} sync_shared;
typedef struct {
sync_shared *cis;
int *table; // we can cache the table and the trigger pointers here
- pthread_cond_t *triggers;
+ caf_shmem_condvar *triggers;
} sync_t;
-typedef pthread_mutex_t lock_t;
+typedef caf_shmem_mutex lock_t;
typedef int event_t;
void
update_teams_images (caf_shmem_team_t team)
{
- pthread_mutex_lock (&team->u.image_info->image_count.mutex);
+ caf_shmem_mutex_lock (&team->u.image_info->image_count.mutex);
if (team->u.image_info->num_term_images
!= this_image.supervisor->finished_images
+ this_image.supervisor->failed_images)
old_num
- team->u.image_info->num_term_images);
}
- pthread_mutex_unlock (&team->u.image_info->image_count.mutex);
+ caf_shmem_mutex_unlock (&team->u.image_info->image_count.mutex);
}
void
#include <stdlib.h>
#include <stdio.h>
+#if !defined(WIN32) && !defined(__CYGWIN__)
+#include <pthread.h>
+
#define ERRCHECK(a) \
do \
{ \
while (0)
void
-initialize_shared_mutex (pthread_mutex_t *mutex)
+initialize_shared_mutex (caf_shmem_mutex *mutex)
{
pthread_mutexattr_t mattr;
ERRCHECK (pthread_mutexattr_init (&mattr));
}
void
-initialize_shared_errorcheck_mutex (pthread_mutex_t *mutex)
+initialize_shared_errorcheck_mutex (caf_shmem_mutex *mutex)
{
pthread_mutexattr_t mattr;
ERRCHECK (pthread_mutexattr_init (&mattr));
- ERRCHECK (pthread_mutexattr_setpshared (&mattr, PTHREAD_PROCESS_SHARED));
ERRCHECK (pthread_mutexattr_settype (&mattr, PTHREAD_MUTEX_ERRORCHECK));
+ ERRCHECK (pthread_mutexattr_setpshared (&mattr, PTHREAD_PROCESS_SHARED));
ERRCHECK (pthread_mutex_init (mutex, &mattr));
ERRCHECK (pthread_mutexattr_destroy (&mattr));
}
void
-initialize_shared_condition (pthread_cond_t *cond)
+initialize_shared_condition (caf_shmem_condvar *cond, const int)
{
pthread_condattr_t cattr;
ERRCHECK (pthread_condattr_init (&cattr));
ERRCHECK (pthread_cond_init (cond, &cattr));
ERRCHECK (pthread_condattr_destroy (&cattr));
}
+#else
+#include "../caf_error.h"
+#include "supervisor.h"
+#include "teams_mgmt.h"
+#include <windows.h>
+#include <assert.h>
+
+static HANDLE *handles = NULL;
+static size_t cap_handles = 0;
+
+static const int ULONGBITS = sizeof (unsigned long) << 3; // *8
+
+static size_t
+smax (size_t a, size_t b)
+{
+ return a < b ? b : a;
+}
+
+static HANDLE
+get_handle (const size_t id, const char t)
+{
+ const int add = t == 'c' ? 1 : 0;
+ while (id + add >= cap_handles)
+ {
+ cap_handles += 1024;
+ if (handles)
+ handles = realloc (handles, sizeof (HANDLE) * cap_handles);
+ else
+ handles = malloc (sizeof (HANDLE) * cap_handles);
+ if (!handles)
+ caf_runtime_error (
+ "can not get buffer for synchronication objects, aborting");
+
+ memset (&handles[cap_handles - 1024], 0, sizeof (HANDLE) * 1024);
+ }
+ if (!handles[id])
+ {
+ static char *pid = NULL;
+ char name[MAX_PATH];
+
+ if (!pid)
+ pid = shared_memory_get_env ();
+ snprintf (name, MAX_PATH, "Global_gfortran-%s-%c-%zd", pid, t, id);
+ switch (t)
+ {
+ case 'm':
+ handles[id] = CreateMutex (NULL, false, name);
+ break;
+ case 'c':
+ {
+ handles[id] = CreateSemaphore (NULL, 0, __INT_MAX__, name);
+ snprintf (name, MAX_PATH, "Global_gfortran-%s-%c-%zd_lock", pid, t,
+ id);
+ handles[id + 1] = CreateSemaphore (NULL, 1, 1, name);
+ this_image.supervisor->global_used_handles
+ = smax (this_image.supervisor->global_used_handles, id + 2);
+ break;
+ }
+ default:
+ caf_runtime_error ("Unknown handle type %c", t);
+ exit (1);
+ }
+ if (handles[id] == NULL)
+ {
+ caf_runtime_error (
+ "Could not create synchronisation object, error: %d",
+ GetLastError ());
+ return NULL;
+ }
+
+ this_image.supervisor->global_used_handles
+ = smax (this_image.supervisor->global_used_handles, id + 1);
+ }
+
+ return handles[id];
+}
+
+static HANDLE
+get_mutex (caf_shmem_mutex *m)
+{
+ return get_handle (m->id, 'm');
+}
+
+static HANDLE
+get_condvar (caf_shmem_condvar *cv)
+{
+ return get_handle (cv->id, 'c');
+}
+
+void
+thread_support_init_supervisor (void)
+{
+ if (local->total_num_images > ULONGBITS * MAX_NUM_SIGNALED)
+ caf_runtime_error ("Maximum number of supported images is %zd.",
+ ULONGBITS * MAX_NUM_SIGNALED);
+ this_image.supervisor->global_used_handles = 0;
+}
+
+int
+caf_shmem_mutex_lock (caf_shmem_mutex *m)
+{
+ HANDLE mutex = get_mutex (m);
+ DWORD res = WaitForSingleObject (mutex, INFINITE);
+
+ /* Return zero on success. */
+ return res != WAIT_OBJECT_0;
+}
+
+int
+caf_shmem_mutex_trylock (caf_shmem_mutex *m)
+{
+ HANDLE mutex = get_mutex (m);
+ DWORD res = WaitForSingleObject (mutex, 0);
+
+ return res == WAIT_OBJECT_0 ? 0 : EBUSY;
+}
+
+int
+caf_shmem_mutex_unlock (caf_shmem_mutex *m)
+{
+ HANDLE mutex = get_mutex (m);
+ BOOL res = ReleaseMutex (mutex);
+
+ if (!res)
+ {
+ LPVOID lpMsgBuf;
+ DWORD dw = GetLastError ();
+
+ if (FormatMessage (FORMAT_MESSAGE_ALLOCATE_BUFFER
+ | FORMAT_MESSAGE_FROM_SYSTEM
+ | FORMAT_MESSAGE_IGNORE_INSERTS,
+ NULL, dw, MAKELANGID (LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPTSTR) &lpMsgBuf, 0, NULL)
+ == 0)
+ {
+ fprintf (stderr, "%d: formatting the error message failed.\n",
+ this_image.image_num);
+ ExitProcess (dw);
+ }
+
+ fprintf (stderr, "%d: unlock mutex failed: %d, %s\n",
+ this_image.image_num, dw, (LPCTSTR) lpMsgBuf);
+
+ LocalFree (lpMsgBuf);
+ }
+ return res ? 0 : EPERM;
+}
+
+static bool
+bm_is_set (volatile unsigned long mask[], const int b)
+{
+ return (mask[b / ULONGBITS] & (1UL << (b % ULONGBITS))) != 0;
+}
+
+static void
+bm_clear_bit (volatile unsigned long mask[], const int b)
+{
+ mask[b / ULONGBITS] &= ~(1UL << (b % ULONGBITS));
+}
+
+static void
+bm_set_mask (volatile unsigned long mask[], const int size)
+{
+ const int entries = size / ULONGBITS;
+ const int rem = size % ULONGBITS;
+ int i = 0;
+ assert (entries >= 0);
+
+ for (; i < entries; ++i)
+ mask[i] = ~0UL;
+ if (rem != 0)
+ mask[i] = ~0UL >> (ULONGBITS - rem);
+}
+
+__attribute_used__ static bool
+bm_is_none (volatile unsigned long mask[], const int size)
+{
+ const int entries = size / ULONGBITS;
+ const int rem = size % ULONGBITS;
+ int i = 0;
+ for (; i < entries; ++i)
+ if (mask[i] != 0)
+ return false;
+
+ return rem == 0 || ((mask[i] & (~0UL >> (ULONGBITS - rem))) == 0);
+}
+
+void
+caf_shmem_cond_wait (caf_shmem_condvar *cv, caf_shmem_mutex *m)
+{
+ HANDLE mutex = get_mutex (m), condvar = get_condvar (cv),
+ lock = get_handle (cv->id + 1, 'c');
+ HANDLE entry[3] = {mutex, condvar, lock};
+ int res;
+
+ WaitForSingleObject (lock, INFINITE);
+ for (;;)
+ {
+ if (bm_is_set (cv->signaled, this_image.image_num) || cv->any)
+ {
+ break;
+ }
+ ReleaseMutex (mutex);
+ ReleaseSemaphore (lock, 1, NULL);
+ res = WaitForMultipleObjects (3, entry, true, INFINITE);
+ if (res != WAIT_OBJECT_0)
+ {
+ fprintf (stderr, "%d: failed to get all wait for: %d\n",
+ this_image.image_num, res);
+ fflush (stderr);
+ }
+ ReleaseSemaphore (condvar, 1, NULL);
+ }
+ res = WaitForSingleObject (condvar, INFINITE);
+ if (res != WAIT_OBJECT_0)
+ {
+ fprintf (stderr, "%d: failed to get condvar: %d\n", this_image.image_num,
+ res);
+ fflush (stderr);
+ }
+
+ bm_clear_bit (cv->signaled, this_image.image_num);
+ cv->any = 0;
+ ReleaseSemaphore (lock, 1, NULL);
+}
+
+void
+caf_shmem_cond_broadcast (caf_shmem_condvar *cv)
+{
+ HANDLE condvar = get_condvar (cv), lock = get_handle (cv->id + 1, 'c');
+
+ WaitForSingleObject (lock, INFINITE);
+ bm_set_mask (cv->signaled, cv->size);
+ bm_clear_bit (cv->signaled, this_image.image_num);
+
+ ReleaseSemaphore (condvar, cv->size, NULL);
+ ReleaseSemaphore (lock, 1, NULL);
+}
+
+void
+caf_shmem_cond_signal (caf_shmem_condvar *cv)
+{
+ HANDLE condvar = get_condvar (cv), lock = get_handle (cv->id + 1, 'c');
+
+ if (caf_current_team)
+ {
+ WaitForSingleObject (lock, INFINITE);
+ }
+ else
+ return;
+ /* The first image is zero, which wouldn't allow it to signal. */
+ cv->any = this_image.image_num + 1;
+ ReleaseSemaphore (condvar, 1, NULL);
+ ReleaseSemaphore (lock, 1, NULL);
+}
+
+void
+caf_shmem_cond_update_count (caf_shmem_condvar *cv, int val)
+{
+ cv->size += val;
+}
+
+void
+initialize_shared_mutex (caf_shmem_mutex *m)
+{
+ *m = (caf_shmem_mutex) {this_image.supervisor->global_used_handles};
+
+ get_mutex (m);
+}
+
+void
+initialize_shared_errorcheck_mutex (caf_shmem_mutex *m)
+{
+ *m = (caf_shmem_mutex) {this_image.supervisor->global_used_handles};
+
+ get_mutex (m);
+}
+
+void
+initialize_shared_condition (caf_shmem_condvar *cv, const int size)
+{
+ *cv = (caf_shmem_condvar) {this_image.supervisor->global_used_handles,
+ 0,
+ size,
+ {}};
+
+ memset ((void *) cv->signaled, 0, sizeof (unsigned long) * MAX_NUM_SIGNALED);
+ get_condvar (cv);
+ assert (bm_is_none (cv->signaled, cv->size));
+}
+
+void
+thread_support_cleanup (void)
+{
+ for (size_t i = 0; i < this_image.supervisor->global_used_handles; ++i)
+ if (handles[i])
+ CloseHandle (handles[i]);
+}
+#endif
#ifndef THREAD_SUPPORT_H
#define THREAD_SUPPORT_H
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#ifndef WIN32
+#include <sys/types.h>
+
+typedef pid_t caf_shmem_pid;
+typedef int caf_shmem_fd;
+#else
+#include <handleapi.h>
+
+typedef HANDLE caf_shmem_pid;
+typedef HANDLE caf_shmem_fd;
+#endif
+
+#if !defined(WIN32) && !defined(__CYGWIN__)
#include <pthread.h>
+typedef pthread_mutex_t caf_shmem_mutex;
+typedef pthread_cond_t caf_shmem_condvar;
+
+#define CAF_SHMEM_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER
+#define CAF_SHMEM_COND_INITIALIZER PTHREAD_COND_INITIALIZER
+
+#define thread_support_init_supervisor() (void) 0
+
+#define caf_shmem_mutex_lock pthread_mutex_lock
+#define caf_shmem_mutex_trylock pthread_mutex_trylock
+#define caf_shmem_mutex_unlock pthread_mutex_unlock
+
+#define caf_shmem_cond_wait pthread_cond_wait
+#define caf_shmem_cond_broadcast pthread_cond_broadcast
+#define caf_shmem_cond_signal pthread_cond_signal
+#define caf_shmem_cond_update_count(c, v) (void) 0
+
+#define thread_support_cleanup() (void) 0
+#else
+#include <synchapi.h>
+#include <stddef.h>
+
+typedef struct caf_shmem_mutex
+{
+ size_t id;
+} caf_shmem_mutex;
+
+#define MAX_NUM_SIGNALED 8
+
+typedef struct caf_shmem_condvar
+{
+ size_t id;
+ volatile int any;
+ int size;
+ volatile unsigned long signaled[MAX_NUM_SIGNALED];
+} caf_shmem_condvar;
+
+#define CAF_SHMEM_MUTEX_INITIALIZER (caf_shmem_mutex){0}
+#define CAF_SHMEM_COND_INITIALIZER \
+ (caf_shmem_condvar) \
+ { \
+ 0, 0, 0, {} \
+ }
+
+void thread_support_init_supervisor (void);
+
+int caf_shmem_mutex_lock (caf_shmem_mutex *);
+int caf_shmem_mutex_trylock (caf_shmem_mutex *);
+int caf_shmem_mutex_unlock (caf_shmem_mutex *);
+
+void caf_shmem_cond_wait (caf_shmem_condvar *, caf_shmem_mutex *);
+void caf_shmem_cond_broadcast (caf_shmem_condvar *);
+void caf_shmem_cond_signal (caf_shmem_condvar *);
+void caf_shmem_cond_update_count (caf_shmem_condvar *, int);
+
+void thread_support_cleanup (void);
+#endif
+
/* Support routines to setup pthread structs in shared memory. */
-void initialize_shared_mutex (pthread_mutex_t *);
+void initialize_shared_mutex (caf_shmem_mutex *);
-void initialize_shared_errorcheck_mutex (pthread_mutex_t *);
+void initialize_shared_errorcheck_mutex (caf_shmem_mutex *);
-void initialize_shared_condition (pthread_cond_t *);
+void initialize_shared_condition (caf_shmem_condvar *, const int size);
#endif