#define atomic_exchange_acquire(o, v) \
atomic_exchange_explicit((o), (v), memory_order_acquire)
#define atomic_exchange_acq_rel(o, v) \
atomic_exchange_explicit((o), (v), memory_order_acq_rel)
+#define atomic_fetch_add_acq_rel(o, v) \
+ atomic_fetch_add_explicit((o), (v), memory_order_acq_rel)
#define atomic_fetch_sub_acq_rel(o, v) \
atomic_fetch_sub_explicit((o), (v), memory_order_acq_rel)
#define atomic_compare_exchange_weak_acq_rel(o, e, d) \
atomic_compare_exchange_weak_explicit((o), (e), (d), memory_order_acq_rel, memory_order_acquire)
unsigned int
isc_quota_getused(isc_quota_t *quota) {
REQUIRE(VALID_QUOTA(quota));
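+ /*
+ * Acquire ordering here pairs with the release half of the
+ * acq_rel read-modify-writes on quota->used, synchronizing
+ * this reader with the update it observes.
+ */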
- return atomic_load_relaxed(&quota->used);
+ return atomic_load_acquire(&quota->used);
}
void
isc_quota_release(isc_quota_t *quota) {
+ struct cds_wfcq_node *node;
/*
* We are using the cds_wfcq_dequeue_blocking() variant here that
* has an internal mutex because we need synchronization on the
* dequeue side.  The dequeue first checks the queue emptiness
* with cds_wfcq_empty() before acquiring the internal lock, so if
* there's nothing queued, the call should be very lightweight.
*/
- struct cds_wfcq_node *node =
- cds_wfcq_dequeue_blocking(&quota->jobs.head, &quota->jobs.tail);
+again:
+ node = cds_wfcq_dequeue_blocking(&quota->jobs.head, &quota->jobs.tail);
if (node == NULL) {
- uint_fast32_t used = atomic_fetch_sub_relaxed(&quota->used, 1);
+ uint_fast32_t used = atomic_fetch_sub_acq_rel(&quota->used, 1);
INSIST(used > 0);
+
+ /*
+ * If this was the last quota released and in the meantime a
+ * new job has appeared in the queue, then give it a chance
+ * to run; otherwise it could get stuck there until a new quota
+ * is acquired and released again.
+ */
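+ /*
+ * atomic_fetch_sub() returned the pre-decrement value, so
+ * used == 1 means we have just released the last quota.
+ * The race guarded against: we dequeued NULL above, an
+ * acquirer over the limit then enqueued its job, and our
+ * decrement dropped used to 0; without this re-check that
+ * job would be stranded until the next acquire/release.
+ */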
+ if (used == 1 &&
+ !cds_wfcq_empty(&quota->jobs.head, &quota->jobs.tail))
+ {
+ atomic_fetch_add_acq_rel(&quota->used, 1);
+ goto again;
+ }
+
return;
}
REQUIRE(VALID_QUOTA(quota));
REQUIRE(job == NULL || cb != NULL);
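+ /*
+ * The counter updates use acq_rel rather than relaxed
+ * ordering so they are ordered with respect to the queue
+ * operations around them; the stranded-job re-checks here
+ * and in isc_quota_release() depend on seeing the counter
+ * and the queue state in a consistent order.
+ */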
- uint_fast32_t used = atomic_fetch_add_relaxed(&quota->used, 1);
+ uint_fast32_t used = atomic_fetch_add_acq_rel(&quota->used, 1);
uint_fast32_t max = atomic_load_relaxed(&quota->max);
if (max != 0 && used >= max) {
- (void)atomic_fetch_sub_relaxed(&quota->used, 1);
+ (void)atomic_fetch_sub_acq_rel(&quota->used, 1);
if (job != NULL) {
job->cb = cb;
job->cbarg = cbarg;
*/
cds_wfcq_enqueue(&quota->jobs.head, &quota->jobs.tail,
&job->wfcq_node);
+
+ /*
+ * While we were initializing and enqueuing the new node,
+ * quotas might have been released, and if no quota is in
+ * use any more, our newly enqueued job won't get a chance
+ * to run until a new quota is acquired and released. To
+ * avoid that hangup, check quota->used again; if it's 0,
+ * simulate a quota acquire/release so the current job can
+ * run as soon as possible, although we will still return
+ * ISC_R_QUOTA to the caller.
+ */
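+ /*
+ * The compound literal &(uint_fast32_t){ 0 } supplies the
+ * writable "expected" lvalue the compare-exchange needs; on
+ * failure the current value is stored into it and discarded.
+ */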
+ if (atomic_compare_exchange_strong_acq_rel(
+ "a->used, &(uint_fast32_t){ 0 }, 1))
+ {
+ isc_quota_release(quota);
+ }
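+ /*
+ * The release above dequeues a pending job (possibly the
+ * one just enqueued) and hands it the slot claimed by the
+ * CAS, so the used counter stays balanced.
+ */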
}
return ISC_R_QUOTA;
}
REQUIRE(VALID_QUOTA(quota));
quota->magic = 0;
- INSIST(atomic_load(&quota->used) == 0);
+ INSIST(atomic_load_acquire(&quota->used) == 0);
INSIST(cds_wfcq_empty(&quota->jobs.head, &quota->jobs.tail));
cds_wfcq_destroy(&quota->jobs.head, &quota->jobs.tail);
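
For reference, a minimal self-contained C11 sketch of the release-side pattern introduced above: decrement with acq_rel ordering, and if the last slot was released while work is queued, take the slot back and retry the dequeue. The queue type and helpers (job_queue_t, queue_pop, queue_empty, run_job) are hypothetical stand-ins for the liburcu wfcqueue calls, not real API.

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins; only the shape of the API matters here. */
typedef struct job job_t;
typedef struct job_queue job_queue_t;

extern job_t *queue_pop(job_queue_t *q); /* NULL when queue is empty */
extern bool queue_empty(job_queue_t *q);
extern void run_job(job_t *job);

typedef struct {
	atomic_uint_fast32_t used;
	job_queue_t *jobs;
} quota_t;

void
quota_release(quota_t *quota) {
	job_t *job;

again:
	job = queue_pop(quota->jobs);
	if (job == NULL) {
		/* fetch_sub returns the pre-decrement value. */
		uint_fast32_t used = atomic_fetch_sub_explicit(
			&quota->used, 1, memory_order_acq_rel);

		/*
		 * Last slot released while a job slipped into the
		 * queue: take the slot back and retry the dequeue
		 * so the job is not stranded.
		 */
		if (used == 1 && !queue_empty(quota->jobs)) {
			atomic_fetch_add_explicit(&quota->used, 1,
						  memory_order_acq_rel);
			goto again;
		}
		return;
	}

	/* A queued job inherits the slot we were about to release. */
	run_job(job);
}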