SIGNALS_LOCK,
STK_TABLE_LOCK,
STK_SESS_LOCK,
+ APPLETS_LOCK,
LOCK_LABELS
};
struct lock_stat {
const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
"TASK_RQ", "TASK_WQ", "POOL",
"LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
- "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS" };
+ "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
+ "APPLETS" };
int lbl;
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
extern unsigned int nb_applets;
extern unsigned int applets_active_queue;
-
+#ifdef USE_THREAD
+extern HA_SPINLOCK_T applet_active_lock;
+#endif
extern struct list applet_active_queue;
void applet_run_active();
* minimum acceptable initialization for an appctx. This means only the
* 3 integer states st0, st1, st2 are zeroed.
*/
-static inline void appctx_init(struct appctx *appctx)
+static inline void appctx_init(struct appctx *appctx, unsigned long thread_mask)
{
appctx->st0 = appctx->st1 = appctx->st2 = 0;
appctx->io_release = NULL;
+ appctx->process_mask = thread_mask;
appctx->state = APPLET_SLEEPING;
}
* pool_free2(connection) or appctx_free(), since it's allocated from the
* connection pool. <applet> is assigned as the applet, but it can be NULL.
*/
-static inline struct appctx *appctx_new(struct applet *applet)
+static inline struct appctx *appctx_new(struct applet *applet, unsigned long thread_mask)
{
struct appctx *appctx;
if (likely(appctx != NULL)) {
appctx->obj_type = OBJ_TYPE_APPCTX;
appctx->applet = applet;
- appctx_init(appctx);
+ appctx_init(appctx, thread_mask);
LIST_INIT(&appctx->runq);
LIST_INIT(&appctx->buffer_wait.list);
appctx->buffer_wait.target = appctx;
appctx->buffer_wait.wakeup_cb = (int (*)(void *))appctx_res_wakeup;
- nb_applets++;
+ HA_ATOMIC_ADD(&nb_applets, 1);
}
return appctx;
}
LIST_DEL(&appctx->runq);
applets_active_queue--;
}
+
if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) {
LIST_DEL(&appctx->buffer_wait.list);
LIST_INIT(&appctx->buffer_wait.list);
}
+
pool_free2(pool2_connection, appctx);
- nb_applets--;
+ HA_ATOMIC_SUB(&nb_applets, 1);
}
static inline void appctx_free(struct appctx *appctx)
{
+ SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
if (appctx->state & APPLET_RUNNING) {
appctx->state |= APPLET_WANT_DIE;
+ SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
return;
}
__appctx_free(appctx);
+ SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
}
/* wakes up an applet when conditions have changed */
static inline void appctx_wakeup(struct appctx *appctx)
{
+ SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
if (appctx->state & APPLET_RUNNING) {
appctx->state |= APPLET_WOKEN_UP;
+ SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
return;
}
__appctx_wakeup(appctx);
+ SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
}
/* Callback used to wake up an applet when a buffer is available. The applet
* requested */
static inline int appctx_res_wakeup(struct appctx *appctx)
{
+ SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
if (appctx->state & APPLET_RUNNING) {
if (appctx->state & APPLET_WOKEN_UP) {
+ SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
return 0;
}
appctx->state |= APPLET_WOKEN_UP;
+ SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
return 1;
}
if (!LIST_ISEMPTY(&appctx->runq)) {
+ SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
return 0;
}
__appctx_wakeup(appctx);
+ SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
return 1;
}
unsigned int nb_applets = 0;
unsigned int applets_active_queue = 0;
+#ifdef USE_THREAD
+HA_SPINLOCK_T applet_active_lock; /* spin lock related to applet active queue */
+#endif
+
struct list applet_active_queue = LIST_HEAD_INIT(applet_active_queue);
void applet_run_active()
if (!applets_active_queue)
return;
+ SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
+
curr = LIST_NEXT(&applet_active_queue, typeof(curr), runq);
while (&curr->runq != &applet_active_queue) {
next = LIST_NEXT(&curr->runq, typeof(next), runq);
- LIST_DEL(&curr->runq);
- curr->state = APPLET_RUNNING;
- LIST_ADDQ(&applet_cur_queue, &curr->runq);
- applets_active_queue--;
+ if (curr->process_mask & (1UL << tid)) {
+ LIST_DEL(&curr->runq);
+ curr->state = APPLET_RUNNING;
+ LIST_ADDQ(&applet_cur_queue, &curr->runq);
+ applets_active_queue--;
+ }
curr = next;
}
+ SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
+
/* The list is only scanned from the head. This guarantees that if any
* applet removes another one, there is no side effect while walking
* through the list.
/* curr was left in the list, move it back to the active list */
LIST_DEL(&curr->runq);
LIST_INIT(&curr->runq);
+ SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
if (curr->state & APPLET_WANT_DIE) {
curr->state = APPLET_SLEEPING;
__appctx_free(curr);
curr->state = APPLET_SLEEPING;
}
}
+ SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
}
}
}
+
+__attribute__((constructor))
+static void __applet_init(void)
+{
+ SPIN_INIT(&applet_active_lock);
+}