#define LOCAL_DEBUG
+_Thread_local struct lfuc_unlock_queue *lfuc_unlock_queue;
+
+/*
+ * lfuc_unlock_deferred - flush a queue of deferred usecount unlocks
+ * @_q: the per-thread &struct lfuc_unlock_queue to process (event data)
+ *
+ * Event hook, runs on the thread which filled the queue. Performs every
+ * stored unlock, then releases the queue page back to the allocator.
+ */
+void lfuc_unlock_deferred(void *_q)
+{
+ struct lfuc_unlock_queue *q = _q;
+ for (u32 i = 0; i < q->pos; i++)
+ lfuc_unlock_immediately(q->block[i].c, q->block[i].el, q->block[i].ev);
+
+ /* Drop the thread-local pointer only if it still refers to this queue.
+ * If this queue filled up earlier, lfuc_unlock() has already switched to
+ * a fresh one; resetting unconditionally would abandon that newer queue
+ * and force a needless page allocation on the next unlock. */
+ if (lfuc_unlock_queue == q)
+ lfuc_unlock_queue = NULL;
+
+ free_page(q);
+}
+
#if 0
#define lfjour_debug(...) log(L_TRACE __VA_ARGS__)
#define lfjour_debug_detailed(...) log(L_TRACE __VA_ARGS__)
#include "lib/rcu.h"
#include "lib/settle.h"
#include "lib/tlists.h"
+#include "lib/io-loop.h"
#include <stdatomic.h>
}
/**
- * lfuc_unlock - decrease an atomic usecount
+ * lfuc_unlock_immediately - decrease an atomic usecount
* @c: the usecount structure
* @el: prune event list
* @ev: prune event itself
* If the usecount reaches zero, a prune event is run to possibly free the object.
* The prune event MUST use lfuc_finished() to check the object state.
*/
-static inline u64 lfuc_unlock(struct lfuc *c, event_list *el, event *ev)
+static inline void lfuc_unlock_immediately(struct lfuc *c, event_list *el, event *ev)
{
/* Unlocking is tricky. We do it lockless so at the same time, the prune
* event may be running, therefore if the unlock gets us to zero, it must be
* RCU synchronization instead of a busy loop. */
rcu_read_unlock();
- return uc - LFUC_IN_PROGRESS - 1;
+// return uc - LFUC_IN_PROGRESS - 1;
+/* NOTE(review): the remaining-usecount return value was dropped in the
+ * rename from lfuc_unlock(); callers now go through the deferring
+ * lfuc_unlock() wrapper and no longer inspect the count (see
+ * mpls_unlock_fec). Confirm no other caller relied on the old return. */
+}
+
+/* Per-thread queue of deferred usecount unlocks, allocated as a single
+ * page; NULL until the first lfuc_unlock() call on this thread. */
+extern _Thread_local struct lfuc_unlock_queue {
+ event e; /* deferred-flush event, hooked to lfuc_unlock_deferred() */
+ u32 pos; /* number of filled entries in block[] */
+ struct lfuc_unlock_queue_block {
+ struct lfuc *c; /* the usecount to unlock */
+ event_list *el; /* prune event list */
+ event *ev; /* prune event itself */
+ } block[0]; /* trailing array filling the rest of the page */
+} *lfuc_unlock_queue;
+
+/* Event hook: run all queued unlocks, then free the queue page. */
+void lfuc_unlock_deferred(void *queue);
+
+/**
+ * lfuc_unlock - enqueue a deferred decrease of an atomic usecount
+ * @c: the usecount structure
+ * @el: prune event list
+ * @ev: prune event itself
+ *
+ * Instead of unlocking immediately, the triple is stored into a per-thread
+ * page-sized queue; lfuc_unlock_deferred() later runs all stored unlocks
+ * from an event on this thread and frees the page.
+ */
+static inline void lfuc_unlock(struct lfuc *c, event_list *el, event *ev)
+{
+ /* How many entries fit into one queue page. Cached per thread: a plain
+ * function-static here would be written by every thread on first use,
+ * which is a C11 data race even though all writers store the same value. */
+ static _Thread_local u32 queue_items = 0;
+ if (queue_items == 0)
+ {
+ ASSERT_DIE((u64) page_size > sizeof(struct lfuc_unlock_queue) + sizeof(struct lfuc_unlock_queue_block));
+ queue_items = (page_size - OFFSETOF(struct lfuc_unlock_queue, block))
+ / sizeof lfuc_unlock_queue->block[0];
+ }
+
+ /* Start a fresh queue when none exists or the current one is full;
+ * its flush event is sent to this thread right away. */
+ if (!lfuc_unlock_queue || (lfuc_unlock_queue->pos >= queue_items))
+ {
+ lfuc_unlock_queue = alloc_page();
+ *lfuc_unlock_queue = (struct lfuc_unlock_queue) {
+ .e = {
+ .hook = lfuc_unlock_deferred,
+ .data = lfuc_unlock_queue,
+ },
+ };
+
+ ev_send_this_thread(&lfuc_unlock_queue->e);
+ }
+
+ /* Record the unlock request for the deferred flush. */
+ lfuc_unlock_queue->block[lfuc_unlock_queue->pos++] = (struct lfuc_unlock_queue_block) {
+ .c = c,
+ .el = el,
+ .ev = ev,
+ };
+}
/**
inline void mpls_unlock_fec(struct mpls_fec *fec)
{
- UNUSED u64 s = lfuc_unlock(&fec->uc, birdloop_event_list(fec->map->loop), fec->map->cleanup_event);
- DBGL("Unlocked FEC %p %u, now %lu", fec, fec->label, s);
+ /* The unlock is now deferred via the per-thread queue, so the remaining
+ * usecount is no longer available to log at this point. */
+ lfuc_unlock(&fec->uc, birdloop_event_list(fec->map->loop), fec->map->cleanup_event);
+ DBGL("Unlocked FEC %p %u (deferred)", fec, fec->label);
}
static inline void