#include <common/mini-clist.h>
#include <types/applet.h>
#include <proto/connection.h>
+#include <proto/task.h>
extern unsigned int nb_applets;
-extern unsigned long active_applets_mask;
-extern unsigned int applets_active_queue;
-__decl_hathreads(extern HA_SPINLOCK_T applet_active_lock);
-extern struct list applet_active_queue;
-void applet_run_active();
+struct task *task_run_applet(struct task *t, void *context, unsigned short state);
static int inline appctx_res_wakeup(struct appctx *appctx);
appctx->chunk = NULL;
appctx->io_release = NULL;
appctx->thread_mask = thread_mask;
- appctx->state = APPLET_SLEEPING;
+ appctx->state = 0;
}
/* Tries to allocate a new appctx and initialize its main fields. The appctx
appctx->obj_type = OBJ_TYPE_APPCTX;
appctx->applet = applet;
appctx_init(appctx, thread_mask);
- LIST_INIT(&appctx->runq);
+ appctx->t = task_new(thread_mask);
+ if (unlikely(appctx->t == NULL)) {
+ pool_free(pool_head_connection, appctx);
+ return NULL;
+ }
+ appctx->t->process = task_run_applet;
+ appctx->t->context = appctx;
LIST_INIT(&appctx->buffer_wait.list);
appctx->buffer_wait.target = appctx;
appctx->buffer_wait.wakeup_cb = (int (*)(void *))appctx_res_wakeup;
*/
static inline void __appctx_free(struct appctx *appctx)
{
- if (!LIST_ISEMPTY(&appctx->runq)) {
- LIST_DEL(&appctx->runq);
- applets_active_queue--;
- }
-
+ task_delete(appctx->t);
+ task_free(appctx->t);
if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) {
HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_DEL(&appctx->buffer_wait.list);
pool_free(pool_head_connection, appctx);
HA_ATOMIC_SUB(&nb_applets, 1);
}
+
static inline void appctx_free(struct appctx *appctx)
{
- HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
- if (appctx->state & APPLET_RUNNING) {
+ /* The task is supposed to be run on this thread, so we can just
+ * check if it's running already (or about to run) or not
+ */
+ if (!(appctx->t->state & TASK_RUNNING))
+ __appctx_free(appctx);
+ else {
+ /* if it's running, or about to run, defer the freeing
+ * until the callback is called.
+ */
appctx->state |= APPLET_WANT_DIE;
- HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
- return;
+ task_wakeup(appctx->t, TASK_WOKEN_OTHER);
}
- __appctx_free(appctx);
- HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
}
/* wakes up an applet when conditions have changed */
-static inline void __appctx_wakeup(struct appctx *appctx)
-{
- if (LIST_ISEMPTY(&appctx->runq)) {
- LIST_ADDQ(&applet_active_queue, &appctx->runq);
- applets_active_queue++;
- active_applets_mask |= appctx->thread_mask;
- }
-}
-
static inline void appctx_wakeup(struct appctx *appctx)
{
- HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
- if (appctx->state & APPLET_RUNNING) {
- appctx->state |= APPLET_WOKEN_UP;
- HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
- return;
- }
- __appctx_wakeup(appctx);
- HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
+ task_wakeup(appctx->t, TASK_WOKEN_OTHER);
}
/* Callback used to wake up an applet when a buffer is available. The applet
* requested */
static inline int appctx_res_wakeup(struct appctx *appctx)
{
- HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
- if (appctx->state & APPLET_RUNNING) {
- if (appctx->state & APPLET_WOKEN_UP) {
- HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
- return 0;
- }
- appctx->state |= APPLET_WOKEN_UP;
- HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
- return 1;
- }
- __appctx_wakeup(appctx);
- HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
- return 1;
+ int ret;
+
+ /* To detect if we have already been waken or not, we now that
+ * if the state contains TASK_RUNNING, but not just TASK_RUNNING.
+ * This is racy, but that's OK. At worst we will wake a little more
+ * tasks than necessary when a buffer is available.
+ */
+ ret = ((appctx->state & TASK_RUNNING) != 0) &&
+ ((appctx->state != TASK_RUNNING));
+ task_wakeup(appctx->t, TASK_WOKEN_OTHER);
+ return ret;
}
#include <types/stream.h>
#include <types/stream_interface.h>
-#include <proto/applet.h>
#include <proto/task.h>
/* perform minimal intializations, report 0 in case of error, 1 if OK. */
{
if (chn->buf->size && buffer_empty(chn->buf)) {
b_free(&chn->buf);
- offer_buffers(wait->target, tasks_run_queue + applets_active_queue);
+ offer_buffers(wait->target, tasks_run_queue);
}
}
unsigned int timeout; /* execution timeout. */
};
-#define APPLET_SLEEPING 0x00 /* applet is currently sleeping or pending in active queue */
-#define APPLET_RUNNING 0x01 /* applet is currently running */
-#define APPLET_WOKEN_UP 0x02 /* applet was running and requested to woken up again */
-#define APPLET_WANT_DIE 0x04 /* applet was running and requested to die */
+#define APPLET_WANT_DIE 0x01 /* applet was running and requested to die */
#define APPCTX_CLI_ST1_PROMPT (1 << 0)
#define APPCTX_CLI_ST1_PAYLOAD (1 << 1)
/* Context of a running applet. */
struct appctx {
- struct list runq; /* chaining in the applet run queue */
enum obj_type obj_type; /* OBJ_TYPE_APPCTX */
/* 3 unused bytes here */
unsigned short state; /* Internal appctx state */
int cli_severity_output; /* used within the cli_io_handler to format severity output of informational feedback */
struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */
unsigned long thread_mask; /* mask of thread IDs authorized to process the applet */
+ struct task *t; /* task associated to the applet */
union {
struct {
unsigned int loops; // complete loops in run_poll_loop()
unsigned int wake_cache; // active fd_cache prevented poll() from sleeping
unsigned int wake_tasks; // active tasks prevented poll() from sleeping
- unsigned int wake_applets; // active applets prevented poll() from sleeping
unsigned int wake_signal; // pending signal prevented poll() from sleeping
unsigned int poll_exp; // number of times poll() sees an expired timeout (includes wake_*)
unsigned int poll_drop; // poller dropped a dead FD from the update list
#include <proto/channel.h>
#include <proto/stream.h>
#include <proto/stream_interface.h>
+#include <proto/task.h>
unsigned int nb_applets = 0;
-unsigned long active_applets_mask = 0;
-unsigned int applets_active_queue = 0;
-__decl_hathreads(HA_SPINLOCK_T applet_active_lock); /* spin lock related to applet active queue */
-struct list applet_active_queue = LIST_HEAD_INIT(applet_active_queue);
-
-void applet_run_active()
+struct task *task_run_applet(struct task *t, void *context, unsigned short state)
{
- struct appctx *curr, *next;
- struct stream_interface *si;
- struct list applet_cur_queue = LIST_HEAD_INIT(applet_cur_queue);
- int max_processed;
-
- max_processed = applets_active_queue;
- if (max_processed > 200)
- max_processed = 200;
+ struct appctx *app = context;
+ struct stream_interface *si = app->owner;
- HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
- if (!(active_applets_mask & tid_bit)) {
- HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
- return;
- }
- active_applets_mask &= ~tid_bit;
- curr = LIST_NEXT(&applet_active_queue, typeof(curr), runq);
- while (&curr->runq != &applet_active_queue) {
- next = LIST_NEXT(&curr->runq, typeof(next), runq);
- if (curr->thread_mask & tid_bit) {
- LIST_DEL(&curr->runq);
- curr->state = APPLET_RUNNING;
- LIST_ADDQ(&applet_cur_queue, &curr->runq);
- applets_active_queue--;
- max_processed--;
- }
- curr = next;
- if (max_processed <= 0) {
- active_applets_mask |= tid_bit;
- break;
- }
+ if (app->state & APPLET_WANT_DIE) {
+ __appctx_free(app);
+ return NULL;
}
- HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-
- /* The list is only scanned from the head. This guarantees that if any
- * applet removes another one, there is no side effect while walking
- * through the list.
+ /* Now we'll try to allocate the input buffer. We wake up the
+ * applet in all cases. So this is the applet responsibility to
+ * check if this buffer was allocated or not. This let a chance
+ * for applets to do some other processing if needed. */
+ if (!channel_alloc_buffer(si_ic(si), &app->buffer_wait))
+ si_applet_cant_put(si);
+
+ /* We always pretend the applet can't get and doesn't want to
+ * put, it's up to it to change this if needed. This ensures
+ * that one applet which ignores any event will not spin.
*/
- while (!LIST_ISEMPTY(&applet_cur_queue)) {
- curr = LIST_ELEM(applet_cur_queue.n, typeof(curr), runq);
- si = curr->owner;
-
- /* Now we'll try to allocate the input buffer. We wake up the
- * applet in all cases. So this is the applet responsibility to
- * check if this buffer was allocated or not. This let a chance
- * for applets to do some other processing if needed. */
- if (!channel_alloc_buffer(si_ic(si), &curr->buffer_wait))
- si_applet_cant_put(si);
+ si_applet_cant_get(si);
+ si_applet_stop_put(si);
- /* We always pretend the applet can't get and doesn't want to
- * put, it's up to it to change this if needed. This ensures
- * that one applet which ignores any event will not spin.
- */
- si_applet_cant_get(si);
- si_applet_stop_put(si);
-
- curr->applet->fct(curr);
- si_applet_wake_cb(si);
- channel_release_buffer(si_ic(si), &curr->buffer_wait);
-
- if (applet_cur_queue.n == &curr->runq) {
- /* curr was left in the list, move it back to the active list */
- LIST_DEL(&curr->runq);
- LIST_INIT(&curr->runq);
- HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
- if (curr->state & APPLET_WANT_DIE) {
- curr->state = APPLET_SLEEPING;
- __appctx_free(curr);
- }
- else {
- if (curr->state & APPLET_WOKEN_UP) {
- curr->state = APPLET_SLEEPING;
- __appctx_wakeup(curr);
- }
- else {
- curr->state = APPLET_SLEEPING;
- }
- }
- HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
- }
- }
+ app->applet->fct(app);
+ si_applet_wake_cb(si);
+ channel_release_buffer(si_ic(si), &app->buffer_wait);
+ return t;
}
-__attribute__((constructor))
-static void __applet_init(void)
-{
- HA_SPIN_INIT(&applet_active_lock);
-}
#include <proto/stick_table.h>
#include <proto/task.h>
#include <proto/tcp_rules.h>
+#include <proto/connection.h>
/* This is the SSLv3 CLIENT HELLO packet used in conjunction with the
chunk_appendf(&trash, "\nloops:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].loops);
chunk_appendf(&trash, "\nwake_cache:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_cache);
chunk_appendf(&trash, "\nwake_tasks:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_tasks);
- chunk_appendf(&trash, "\nwake_applets:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_applets);
chunk_appendf(&trash, "\nwake_signal:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_signal);
chunk_appendf(&trash, "\npoll_exp:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].poll_exp);
chunk_appendf(&trash, "\npoll_drop:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].poll_drop);
/* Release the buffer if needed */
if ((*buf)->size) {
b_free(buf);
- offer_buffers(buffer_wait->target,
- tasks_run_queue + applets_active_queue);
+ offer_buffers(buffer_wait->target, tasks_run_queue);
}
}
#include <types/peers.h>
#include <proto/acl.h>
-#include <proto/applet.h>
#include <proto/arg.h>
#include <proto/auth.h>
#include <proto/backend.h>
activity[tid].wake_cache++;
else if (active_tasks_mask & tid_bit)
activity[tid].wake_tasks++;
- else if (active_applets_mask & tid_bit)
- activity[tid].wake_applets++;
else if (signal_queue_len)
activity[tid].wake_signal++;
else
/* The poller will ensure it returns around <next> */
cur_poller.poll(&cur_poller, exp);
fd_process_cached_events();
- applet_run_active();
/* Synchronize all polling loops */
#include <common/hpack-enc.h>
#include <common/hpack-tbl.h>
#include <common/net_helper.h>
-#include <proto/applet.h>
#include <proto/connection.h>
#include <proto/h1.h>
#include <proto/stream.h>
{
if ((*bptr)->size) {
b_free(bptr);
- offer_buffers(h2c->buf_wait.target,
- tasks_run_queue + applets_active_queue);
+ offer_buffers(h2c->buf_wait.target, tasks_run_queue);
}
}
if (s->req.buf->size || s->res.buf->size) {
b_drop(&s->req.buf);
b_drop(&s->res.buf);
- offer_buffers(NULL, tasks_run_queue + applets_active_queue);
+ offer_buffers(NULL, tasks_run_queue);
}
hlua_ctx_destroy(s->hlua);
* someone waiting, we can wake up a waiter and offer them.
*/
if (offer)
- offer_buffers(s, tasks_run_queue + applets_active_queue);
+ offer_buffers(s, tasks_run_queue);
}
/* perform minimal intializations, report 0 in case of error, 1 if OK. */