void *data;
};
+struct placeholder
+{
+ int executed;
+ pthread_cond_t cond;
+ pthread_mutex_t mutex;
+};
+
+struct asyncwait_info
+{
+ struct placeholder *placeholderp;
+};
+
+enum entry_type
+{
+ KERNEL_LAUNCH,
+ CALLBACK,
+ ASYNC_WAIT,
+ ASYNC_PLACEHOLDER
+};
+
struct queue_entry
{
- int type;
+ enum entry_type type;
union {
struct kernel_launch launch;
struct callback callback;
+ struct asyncwait_info asyncwait;
+ struct placeholder placeholder;
} u;
};
return propval;
}
+static void
+wait_for_queue_nonfull (struct goacc_asyncqueue *aq)
+{
+ if (aq->queue_n == ASYNC_QUEUE_SIZE)
+ {
+ pthread_mutex_lock (&aq->mutex);
+
+ /* Queue is full. Wait for it to not be full. */
+ while (aq->queue_n == ASYNC_QUEUE_SIZE)
+ pthread_cond_wait (&aq->queue_cond_out, &aq->mutex);
+
+ pthread_mutex_unlock (&aq->mutex);
+ }
+}
+
static void
queue_push_launch (struct goacc_asyncqueue *aq, struct kernel_info *kernel,
void *vars, struct GOMP_kernel_launch_attributes *kla)
{
assert (aq->agent == kernel->agent);
- if (aq->queue_n == ASYNC_QUEUE_SIZE)
- GOMP_PLUGIN_fatal ("ran out of async queue in thread %d:%d",
- aq->agent->device_id, aq->id);
+ wait_for_queue_nonfull (aq);
pthread_mutex_lock (&aq->mutex);
HSA_DEBUG ("queue_push_launch %d:%d: at %i\n", aq->agent->device_id,
aq->id, queue_last);
- aq->queue[queue_last].type = 0;
+ aq->queue[queue_last].type = KERNEL_LAUNCH;
aq->queue[queue_last].u.launch.kernel = kernel;
aq->queue[queue_last].u.launch.vars = vars;
aq->queue[queue_last].u.launch.kla = *kla;
queue_push_callback (struct goacc_asyncqueue *aq, void (*fn)(void *),
void *data)
{
- if (aq->queue_n == ASYNC_QUEUE_SIZE)
- {
- pthread_mutex_lock (&aq->mutex);
-
- /* Queue is full. Wait for it to not be full. */
- while (aq->queue_n == ASYNC_QUEUE_SIZE)
- pthread_cond_wait (&aq->queue_cond_out, &aq->mutex);
-
- pthread_mutex_unlock (&aq->mutex);
- }
+ wait_for_queue_nonfull (aq);
pthread_mutex_lock (&aq->mutex);
HSA_DEBUG ("queue_push_callback %d:%d: at %i\n", aq->agent->device_id,
aq->id, queue_last);
- aq->queue[queue_last].type = 1;
+ aq->queue[queue_last].type = CALLBACK;
aq->queue[queue_last].u.callback.fn = fn;
aq->queue[queue_last].u.callback.data = data;
pthread_mutex_unlock (&aq->mutex);
}
+/* Push an entry on AQ to wait for the event described by PLACEHOLDERP (on
+ another queue) to execute. */
+
+static void
+queue_push_asyncwait (struct goacc_asyncqueue *aq,
+ struct placeholder *placeholderp)
+{
+ wait_for_queue_nonfull (aq);
+
+ pthread_mutex_lock (&aq->mutex);
+
+ int queue_last = ((aq->queue_first + aq->queue_n) % ASYNC_QUEUE_SIZE);
+ if (DEBUG_QUEUES)
+ HSA_DEBUG ("queue_push_asyncwait %d:%d: at %i\n", aq->agent->device_id,
+ aq->id, queue_last);
+
+ aq->queue[queue_last].type = ASYNC_WAIT;
+ aq->queue[queue_last].u.asyncwait.placeholderp = placeholderp;
+
+ aq->queue_n++;
+
+ if (DEBUG_THREAD_SIGNAL)
+ HSA_DEBUG ("signalling async thread %d:%d: cond_in\n",
+ aq->agent->device_id, aq->id);
+ pthread_cond_signal (&aq->queue_cond_in);
+
+ pthread_mutex_unlock (&aq->mutex);
+}
+
+static struct placeholder *
+queue_push_placeholder (struct goacc_asyncqueue *aq)
+{
+ struct placeholder *placeholderp;
+
+ wait_for_queue_nonfull (aq);
+
+ pthread_mutex_lock (&aq->mutex);
+
+ int queue_last = ((aq->queue_first + aq->queue_n) % ASYNC_QUEUE_SIZE);
+ if (DEBUG_QUEUES)
+ HSA_DEBUG ("queue_push_placeholder %d:%d: at %i\n", aq->agent->device_id,
+ aq->id, queue_last);
+
+ aq->queue[queue_last].type = ASYNC_PLACEHOLDER;
+ placeholderp = &aq->queue[queue_last].u.placeholder;
+
+ if (pthread_mutex_init (&placeholderp->mutex, NULL))
+ {
+ pthread_mutex_unlock (&aq->mutex);
+ GOMP_PLUGIN_error ("Failed to initialize serialization mutex");
+ }
+
+ if (pthread_cond_init (&placeholderp->cond, NULL))
+ {
+ pthread_mutex_unlock (&aq->mutex);
+ GOMP_PLUGIN_error ("Failed to initialize serialization cond");
+ }
+
+ placeholderp->executed = 0;
+
+ aq->queue_n++;
+
+ if (DEBUG_THREAD_SIGNAL)
+ HSA_DEBUG ("signalling async thread %d:%d: cond_in\n",
+ aq->agent->device_id, aq->id);
+ pthread_cond_signal (&aq->queue_cond_in);
+
+ pthread_mutex_unlock (&aq->mutex);
+
+ return placeholderp;
+}
+
static void run_kernel (struct kernel_info *kernel, void *vars,
struct GOMP_kernel_launch_attributes *kla,
struct goacc_asyncqueue *aq, bool module_locked);
+static void wait_queue (struct goacc_asyncqueue *aq);
+
static void
execute_queue_entry (struct goacc_asyncqueue *aq, int index)
{
struct queue_entry *entry = &aq->queue[index];
- if (entry->type == 0)
+
+ switch (entry->type)
{
+ case KERNEL_LAUNCH:
if (DEBUG_QUEUES)
HSA_DEBUG ("Async thread %d:%d: Executing launch entry (%d)\n",
aq->agent->device_id, aq->id, index);
if (DEBUG_QUEUES)
HSA_DEBUG ("Async thread %d:%d: Executing launch entry (%d) done\n",
aq->agent->device_id, aq->id, index);
- }
- else if (entry->type == 1)
- {
+ break;
+
+ case CALLBACK:
if (DEBUG_QUEUES)
HSA_DEBUG ("Async thread %d:%d: Executing callback entry (%d)\n",
aq->agent->device_id, aq->id, index);
if (DEBUG_QUEUES)
HSA_DEBUG ("Async thread %d:%d: Executing callback entry (%d) done\n",
aq->agent->device_id, aq->id, index);
+ break;
+
+ case ASYNC_WAIT:
+ {
+ struct placeholder *placeholderp = entry->u.asyncwait.placeholderp;
+
+ if (DEBUG_QUEUES)
+ HSA_DEBUG ("Async thread %d:%d: Executing async wait entry (%d)\n",
+ aq->agent->device_id, aq->id, index);
+
+ pthread_mutex_lock (&placeholderp->mutex);
+
+ while (!placeholderp->executed)
+ pthread_cond_wait (&placeholderp->cond, &placeholderp->mutex);
+
+ pthread_mutex_unlock (&placeholderp->mutex);
+
+ if (pthread_cond_destroy (&placeholderp->cond))
+ GOMP_PLUGIN_error ("Failed to destroy serialization cond");
+
+ if (pthread_mutex_destroy (&placeholderp->mutex))
+ GOMP_PLUGIN_error ("Failed to destroy serialization mutex");
+
+ if (DEBUG_QUEUES)
+ HSA_DEBUG ("Async thread %d:%d: Executing async wait "
+ "entry (%d) done\n", aq->agent->device_id, aq->id, index);
+ }
+ break;
+
+ case ASYNC_PLACEHOLDER:
+ pthread_mutex_lock (&entry->u.placeholder.mutex);
+ entry->u.placeholder.executed = 1;
+ pthread_cond_signal (&entry->u.placeholder.cond);
+ pthread_mutex_unlock (&entry->u.placeholder.mutex);
+ break;
+
+ default:
+ GOMP_PLUGIN_fatal ("Unknown queue element");
}
- else
- GOMP_PLUGIN_fatal ("Unknown queue element");
}
static void *
GOMP_OFFLOAD_openacc_async_serialize (struct goacc_asyncqueue *aq1,
struct goacc_asyncqueue *aq2)
{
- /* FIXME: what should happen here???? */
- wait_queue (aq1);
- wait_queue (aq2);
+ /* For serialize, stream aq2 waits for aq1 to complete work that has been
+ scheduled to run on it up to this point. */
+ if (aq1 != aq2)
+ {
+ struct placeholder *placeholderp = queue_push_placeholder (aq1);
+ queue_push_asyncwait (aq2, placeholderp);
+ }
return true;
}