off / s; \
})
+
+static void
+lfjour_return_cleanup_token(struct lfjour *j, struct lfjour_recipient *r)
+{
+ /* Eligible for requesting cleanup */
+ if (!(atomic_fetch_and_explicit(&r->recipient_flags, ~LFJOUR_R_LAST_RUNNER, memory_order_acq_rel) & LFJOUR_R_LAST_RUNNER))
+ return;
+
+ /* Return the lastrunner's token. */
+ u64 pings = atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel);
+
+ /* Is this the last token? */
+ if (pings > 1)
+ return;
+
+ ASSERT_DIE(pings != 0);
+
+ /* No more cleanup tokens issued, request cleanup. */
+ lfjour_schedule_cleanup(j);
+}
+
struct lfjour_item *
lfjour_push_prepare(struct lfjour *j)
{
/* Releasing this export for cleanup routine */
if (pos + 1 == end)
- {
+{
lfjour_debug("lfjour(%p)_release(recipient=%p) of %p, seq=%lu (end)",
j, r, r->cur, r->cur->seq);
}
/* The last block may be available to free */
if ((pos + 1 == end) || last && (last_block != block))
- lfjour_schedule_cleanup(j);
+ lfjour_return_cleanup_token(j, r);
r->first_holding_seq = 0;
r->cur = NULL;
ASSERT_DIE(!r->cur);
lfjour_recipient_add_tail(&j->recipients, r);
+
+ if (j->issued_tokens < j->max_tokens)
+ {
+ /* Cleanup hook does not run, so we are sure j->issued_tokens is not increasing */
+ atomic_fetch_add_explicit(&j->issued_tokens, 1, memory_order_acq_rel);
+ atomic_fetch_or_explicit(&r->recipient_flags, LFJOUR_R_LAST_RUNNER, memory_order_acq_rel);
+ }
}
void
lfjour_release(r, r->cur);
lfjour_recipient_rem_node(&j->recipients, r);
- lfjour_schedule_cleanup(j);
+ lfjour_return_cleanup_token(j, r);
}
static inline void lfjour_cleanup_unlock_helper(struct domain_generic **dg)
DG_UNLOCK(*dg);
}
+static bool
+lfjour_issue_cleanup_token(struct lfjour *j, struct lfjour_recipient* rec)
+{
+ /* This function keeps the invariant that the total number of issued
+ * LFJOUR_R_LAST_RUNNER flags is the same as j->issued_tokens.
+ *
+ * Returs true if the token was successfully issued.
+ */
+
+ /* The journal is not empty and the recipient is on seq_max,
+ * so it won't call cleanup */
+ const struct lfjour_item *last = atomic_load_explicit(&rec->last, memory_order_acquire);
+ if (last && last->seq == (j->next_seq - 1))
+ return false;
+
+ /* Take a token from the pile */
+ if (atomic_fetch_add_explicit(&j->issued_tokens, 1, memory_order_acq_rel) >= j->max_tokens)
+ {
+ /* No more tokens to issue, already at max */
+ atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel);
+ return false;
+ }
+
+ /* Trying to give the token */
+ if (atomic_fetch_or_explicit(&rec->recipient_flags, LFJOUR_R_LAST_RUNNER, memory_order_acq_rel) & LFJOUR_R_LAST_RUNNER)
+ {
+ /* This recipient already has the token */
+ atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel);
+ return false;
+ }
+
+ /* Has't the recipient finished inbetween? (Recheck.) */
+ last = atomic_load_explicit(&rec->last, memory_order_acquire);
+ if (last && last->seq == (j->next_seq - 1))
+ {
+ /* It has! Retreat! */
+ if (atomic_fetch_and_explicit(&rec->recipient_flags, ~LFJOUR_R_LAST_RUNNER, memory_order_acq_rel) & LFJOUR_R_LAST_RUNNER)
+ {
+ /* The flag was still set, we managed to get it back safely.
+ * Now the recipient won't call the cleanup. */
+ atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel);
+ return false;
+ }
+
+ /* The recipient was quick enough to finish the task and
+ * grab the flag. Now we are going to get pinged anyway
+ * so we consider the flag to be issued. */
+ }
+
+ /* Now the token is issued. */
+ return true;
+}
+
+static void
+lfjour_cleanup_done(struct lfjour *j)
+{
+ /* Returning the cleanup token. */
+ u64 pings = atomic_fetch_sub_explicit(&j->issued_tokens, 1, memory_order_acq_rel);
+
+ /* Somebody else is also holding a token, nothing to do.
+ * They'll call us when the time comes. */
+ if (pings > 1)
+ return;
+
+ ASSERT_DIE(pings == 1);
+
+ /* We'll need to know whether the journal is empty or not. */
+ struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire);
+
+ /* No recipients, schedule one more cleanup if not empty */
+ if (EMPTY_TLIST(lfjour_recipient, &j->recipients))
+ {
+ if (first)
+ lfjour_schedule_cleanup(j);
+ return;
+ }
+
+ /* There are some recipients but nothing to clean.
+ * Somebody is going to wake us, let's just throw
+ * the token to the crowd. */
+ if (!first)
+ {
+ WALK_TLIST(lfjour_recipient, r, &j->recipients)
+ /* If we failed to issue a cleanup token, there is definitely
+ * somebody holding it, no more tokens needed. */
+ if (! lfjour_issue_cleanup_token(j, r))
+ {
+ ASSERT_DIE(atomic_load_explicit(&j->issued_tokens, memory_order_acquire) > 0);
+ return;
+ }
+ }
+
+ /* We have to find some recipient which has not
+ * yet finished. When we find it, it will happily
+ * accept the token so we are done. */
+ WALK_TLIST(lfjour_recipient, r, &j->recipients)
+ if (lfjour_issue_cleanup_token(j, r))
+ return;
+
+ /* Nobody needs anything, but the journal is not empty.
+ * Run cleanup again. */
+ lfjour_schedule_cleanup(j);
+}
+
static void
lfjour_cleanup_hook(void *_j)
{
CLEANUP(lfjour_cleanup_unlock_helper) struct domain_generic *_locked = j->domain;
if (_locked) DG_LOCK(_locked);
+ /* We consider ourselves holding one token to keep recipients
+ * from scheduling us too early. */
+ atomic_fetch_add_explicit(&j->issued_tokens, 1, memory_order_acq_rel);
+
u64 min_seq = ~((u64) 0);
const struct lfjour_item *last_item_to_free = NULL;
struct lfjour_item *first = atomic_load_explicit(&j->first, memory_order_acquire);
if (!first)
{
+ lfjour_cleanup_done(j);
+
/* Nothing to cleanup, actually, just call the done callback */
ASSERT_DIE(EMPTY_TLIST(lfjour_block, &j->pending));
CALL(j->cleanup_done, j, 0, ~((u64) 0));
+
return;
}
const struct lfjour_item *last = atomic_load_explicit(&r->last, memory_order_acquire);
if (!last)
- /* No last export means that the channel has exported nothing since last cleanup */
+ {
+ /* No last export means that the channel has exported nothing since
+ * last cleanup. Let's try to give them the token,
+ * and we're done anyway. */
+ lfjour_issue_cleanup_token(j, r);
+ lfjour_cleanup_done(j);
return;
+ }
else if (min_seq > last->seq)
{
memory_order_acq_rel, memory_order_acquire))
{
lfjour_debug("lfjour(%p)_cleanup(recipient=%p): store last=NULL", j, r);
+
+ /* The most-behind-one gets the cleanup token */
+ lfjour_issue_cleanup_token(j, r);
}
else
{
first = next;
}
+ lfjour_cleanup_done(j);
+
CALL(j->cleanup_done, j, orig_first_seq, first ? first->seq : ~((u64) 0));
}
.hook = lfjour_cleanup_hook,
.data = j,
};
+ j->max_tokens = 20;
}