From: Willy Tarreau Date: Sun, 29 Apr 2007 08:41:56 +0000 (+0200) Subject: [MAJOR] replaced rbtree with ul2tree. X-Git-Tag: v1.3.10~31 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=96bcfd75aad6f410d7c8c0311d76799cf1ad87a5;p=thirdparty%2Fhaproxy.git [MAJOR] replaced rbtree with ul2tree. The rbtree-based wait queue consumes a lot of CPU. Use the ul2tree instead. Lots of cleanups and code reorganizations made it possible to reduce the task struct and simplify the code a bit. --- diff --git a/include/proto/task.h b/include/proto/task.h index 8bd973a513..424a46c6d0 100644 --- a/include/proto/task.h +++ b/include/proto/task.h @@ -2,7 +2,7 @@ include/proto/task.h Functions for task management. - Copyright (C) 2000-2006 Willy Tarreau - w@1wt.eu + Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -27,42 +27,66 @@ #include #include +#include +#include + #include +extern void *run_queue; + +/* needed later */ +void *tree_delete(void *node); /* puts the task in run queue , and returns */ -static inline struct task *task_wakeup(struct task **q, struct task *t) +static inline struct task *task_wakeup(struct task *t) { if (t->state == TASK_RUNNING) return t; - else { - t->rqnext = *q; - t->state = TASK_RUNNING; - return *q = t; + + if (t->qlist.p != NULL) + DLIST_DEL(&t->qlist); + + DLIST_ADD(run_queue, &t->qlist); + t->state = TASK_RUNNING; + + if (likely(t->wq)) { + tree_delete(t->wq); + t->wq = NULL; } + + return t; } -/* removes the task from the queue - * MUST be 's first task. - * set the run queue to point to the next one, and return it +/* removes the task from the run queue if it was in it. + * returns . */ -static inline struct task *task_sleep(struct task **q, struct task *t) +static inline struct task *task_sleep(struct task *t) { if (t->state == TASK_RUNNING) { - *q = t->rqnext; - t->state = TASK_IDLE; /* tell that s has left the run queue */ + DLIST_DEL(&t->qlist); + t->qlist.p = NULL; + t->state = TASK_IDLE; } - return *q; /* return next running task */ + return t; } /* - * removes the task from its wait queue. It must have already been removed - * from the run queue. A pointer to the task itself is returned. + * unlinks the task from wherever it is queued : + * - eternity_queue, run_queue + * - wait queue : wq not null => remove carrier node too + * A pointer to the task itself is returned. */ static inline struct task *task_delete(struct task *t) { - rb_erase(&t->rb_node, t->wq); - t->wq = NULL; + if (t->qlist.p != NULL) { + DLIST_DEL(&t->qlist); + t->qlist.p = NULL; + } + + if (t->wq) { + tree_delete(t->wq); + t->wq = NULL; + } return t; } diff --git a/include/types/task.h b/include/types/task.h index d09efae2c8..1fd78be56b 100644 --- a/include/types/task.h +++ b/include/types/task.h @@ -25,7 +25,8 @@ #include #include -#include +#include +#include /* values for task->state */ #define TASK_IDLE 0 @@ -33,10 +34,9 @@ /* The base for all tasks */ struct task { - struct rb_node rb_node; - struct rb_root *wq; - struct task *rqnext; /* chaining in run queue ... 
*/ - int state; /* task state : IDLE or RUNNING */ + struct list qlist; /* chaining in the same queue; bidirectionnal but not circular */ + struct ultree *wq; /* NULL if unqueued, or back ref to the carrier node in the WQ */ + int state; /* task state : IDLE or RUNNING */ struct timeval expire; /* next expiration time for this task, use only for fast sorting */ int (*process)(struct task *t); /* the function which processes the task */ void *context; /* the task's context */ @@ -45,10 +45,6 @@ struct task { #define sizeof_task sizeof(struct task) extern void **pool_task; -extern struct rb_root wait_queue[2]; -extern struct task *rq; - - #endif /* _TYPES_TASK_H */ /* diff --git a/src/appsession.c b/src/appsession.c index 62b096ff48..0adbc332c3 100644 --- a/src/appsession.c +++ b/src/appsession.c @@ -114,7 +114,7 @@ int appsession_task_init(void) if ((t = pool_alloc(task)) == NULL) return -1; t->wq = NULL; - t->rqnext = NULL; + t->qlist.p = NULL; t->state = TASK_IDLE; t->context = NULL; tv_delayfrom(&t->expire, &now, TBLCHKINT); diff --git a/src/backend.c b/src/backend.c index 0eb97ed066..f096874f20 100644 --- a/src/backend.c +++ b/src/backend.c @@ -572,7 +572,7 @@ int srv_count_retry_down(struct session *t, int conn_err) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(&rq, t->srv->queue_mgt); + task_wakeup(t->srv->queue_mgt); return 1; } return 0; @@ -611,7 +611,7 @@ int srv_retryable_connect(struct session *t) t->be->failed_conns++; /* release other sessions waiting for this server */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(&rq, t->srv->queue_mgt); + task_wakeup(t->srv->queue_mgt); return 1; } /* ensure that we have enough retries left */ @@ -625,7 +625,7 @@ int srv_retryable_connect(struct session *t) */ /* let's try to offer this slot to anybody */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(&rq, t->srv->queue_mgt); + task_wakeup(t->srv->queue_mgt); if (t->srv) t->srv->failed_conns++; @@ -691,7 +691,7 @@ int srv_redispatch_connect(struct session *t) /* release other sessions waiting for this server */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(&rq, t->srv->queue_mgt); + task_wakeup(t->srv->queue_mgt); return 1; } /* if we get here, it's because we got SRV_STATUS_OK, which also diff --git a/src/cfgparse.c b/src/cfgparse.c index 33670f1556..289d634112 100644 --- a/src/cfgparse.c +++ b/src/cfgparse.c @@ -2415,7 +2415,7 @@ int readcfgfile(const char *file) return -1; } - t->rqnext = NULL; + t->qlist.p = NULL; t->wq = NULL; t->state = TASK_IDLE; t->process = process_srv_queue; @@ -2463,7 +2463,7 @@ int readcfgfile(const char *file) } t->wq = NULL; - t->rqnext = NULL; + t->qlist.p = NULL; t->state = TASK_IDLE; t->process = process_chk; t->context = newsrv; diff --git a/src/checks.c b/src/checks.c index 309d0c4a76..f9bc5c5a9c 100644 --- a/src/checks.c +++ b/src/checks.c @@ -75,7 +75,7 @@ static void set_server_down(struct server *s) sess->srv = NULL; /* it's left to the dispatcher to choose a server */ http_flush_cookie_flags(&sess->txn); pendconn_free(pc); - task_wakeup(&rq, sess->task); + task_wakeup(sess->task); xferred++; } } @@ -167,7 +167,7 @@ static int event_srv_chk_w(int fd) } } out_wakeup: - task_wakeup(&rq, t); + task_wakeup(t); out_nowake: EV_FD_CLR(fd, DIR_WR); /* nothing more to write */ fdtab[fd].ev &= ~FD_POLL_WR; @@ -237,7 +237,7 @@ static int event_srv_chk_r(int fd) out_wakeup: EV_FD_CLR(fd, DIR_RD); - task_wakeup(&rq, t); + task_wakeup(t); fdtab[fd].ev &= ~FD_POLL_RD; 
return 1; } @@ -436,7 +436,7 @@ int process_chk(struct task *t) p->sess->srv = s; sess = p->sess; pendconn_free(p); - task_wakeup(&rq, sess->task); + task_wakeup(sess->task); } sprintf(trash, diff --git a/src/client.c b/src/client.c index 521b8604b3..028282ef1c 100644 --- a/src/client.c +++ b/src/client.c @@ -152,7 +152,7 @@ int event_accept(int fd) { setsockopt(cfd, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one)); t->wq = NULL; - t->rqnext = NULL; + t->qlist.p = NULL; t->state = TASK_IDLE; t->process = process_session; t->context = s; @@ -422,7 +422,7 @@ int event_accept(int fd) { task_queue(t); if (p->mode != PR_MODE_HEALTH) - task_wakeup(&rq, t); + task_wakeup(t); p->feconn++; /* beconn will be increased later */ if (p->feconn > p->feconn_max) diff --git a/src/haproxy.c b/src/haproxy.c index f8946f9368..5776d18461 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -269,6 +269,7 @@ void sig_dump_state(int sig) void dump(int sig) { +#if 0 struct task *t; struct session *s; struct rb_node *node; @@ -290,6 +291,7 @@ void dump(int sig) s->req->l, s->rep?s->rep->l:0, s->cli_fd ); } +#endif } #ifdef DEBUG_MEMORY diff --git a/src/proto_http.c b/src/proto_http.c index e9b3b56a30..00277f5ca7 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -2353,7 +2353,7 @@ int process_srv(struct session *t) */ /* let's try to offer this slot to anybody */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(&rq, t->srv->queue_mgt); + task_wakeup(t->srv->queue_mgt); if (t->srv) t->srv->failed_conns++; @@ -2539,7 +2539,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (t->srv && may_dequeue_tasks(t->srv, t->be)) - task_wakeup(&rq, t->srv->queue_mgt); + task_wakeup(t->srv->queue_mgt); return 1; } @@ -2581,7 +2581,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (t->srv && may_dequeue_tasks(t->srv, t->be)) - task_wakeup(&rq, t->srv->queue_mgt); + task_wakeup(t->srv->queue_mgt); return 1; } @@ -2753,7 +2753,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (t->srv && may_dequeue_tasks(t->srv, cur_proxy)) - task_wakeup(&rq, t->srv->queue_mgt); + task_wakeup(t->srv->queue_mgt); return 1; } } @@ -2988,7 +2988,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(&rq, t->srv->queue_mgt); + task_wakeup(t->srv->queue_mgt); return 1; } @@ -3101,7 +3101,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(&rq, t->srv->queue_mgt); + task_wakeup(t->srv->queue_mgt); return 1; } @@ -3117,7 +3117,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(&rq, t->srv->queue_mgt); + task_wakeup(t->srv->queue_mgt); return 1; } @@ -3137,7 +3137,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(&rq, t->srv->queue_mgt); + task_wakeup(t->srv->queue_mgt); return 1; } @@ -3182,7 +3182,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. 
*/ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(&rq, t->srv->queue_mgt); + task_wakeup(t->srv->queue_mgt); return 1; } @@ -3198,7 +3198,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(&rq, t->srv->queue_mgt); + task_wakeup(t->srv->queue_mgt); return 1; } @@ -3218,7 +3218,7 @@ int process_srv(struct session *t) * we have to inform the server that it may be used by another session. */ if (may_dequeue_tasks(t->srv, t->be)) - task_wakeup(&rq, t->srv->queue_mgt); + task_wakeup(t->srv->queue_mgt); return 1; } diff --git a/src/queue.c b/src/queue.c index 54fd6a3b28..9b53687de1 100644 --- a/src/queue.c +++ b/src/queue.c @@ -62,7 +62,7 @@ int process_srv_queue(struct task *t) sess = pendconn_get_next_sess(s, p); if (sess == NULL) break; - task_wakeup(&rq, sess->task); + task_wakeup(sess->task); } return TIME_ETERNITY; diff --git a/src/stream_sock.c b/src/stream_sock.c index 2a7275baa5..77eda02c83 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -156,7 +156,7 @@ int stream_sock_read(int fd) { else tv_eternity(&b->rex); - task_wakeup(&rq, fdtab[fd].owner); + task_wakeup(fdtab[fd].owner); } fdtab[fd].ev &= ~FD_POLL_RD; @@ -291,7 +291,7 @@ int stream_sock_write(int fd) { } } - task_wakeup(&rq, fdtab[fd].owner); + task_wakeup(fdtab[fd].owner); fdtab[fd].ev &= ~FD_POLL_WR; return retval; } diff --git a/src/task.c b/src/task.c index ca262d634e..a0bf7af60b 100644 --- a/src/task.c +++ b/src/task.c @@ -1,7 +1,7 @@ /* * Task management functions. * - * Copyright 2000-2006 Willy Tarreau + * Copyright 2000-2007 Willy Tarreau * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -13,95 +13,117 @@ #include #include #include +#include #include +#include + +// FIXME: check 8bitops.c for faster FLS +#include +#include /* FIXME : this should be removed very quickly ! */ extern int maintain_proxies(void); void **pool_task= NULL; -struct task *rq = NULL; /* global run queue */ - -struct rb_root wait_queue[2] = { - RB_ROOT, - RB_ROOT, -}; +void **pool_tree64 = NULL; +static struct ultree *stack[LLONGBITS]; +UL2TREE_HEAD(timer_wq); +void *eternity_queue = NULL; +void *run_queue = NULL; -static inline void __rb_insert_task_queue(struct task *newtask) +struct ultree *ul2tree_insert(struct ultree *root, unsigned long h, unsigned long l) { - struct rb_node **p = &newtask->wq->rb_node; - struct rb_node *parent = NULL; - struct task * task; - - while (*p) - { - parent = *p; - task = rb_entry(parent, struct task, rb_node); - if (tv_cmp_ge2(&task->expire, &newtask->expire)) - p = &(*p)->rb_left; - else - p = &(*p)->rb_right; - } - rb_link_node(&newtask->rb_node, parent, p); + return __ul2tree_insert(root, h, l); } -static void rb_insert_task_queue(struct task *newtask) -{ - __rb_insert_task_queue(newtask); - rb_insert_color(&newtask->rb_node, newtask->wq); +void *tree_delete(void *node) { + return __tree_delete(node); } - +/* + * task_queue() + * + * Inserts a task into the wait queue at the position given by its expiration + * date. 
+ * + */ struct task *task_queue(struct task *task) { - struct rb_node *node; - struct task *next, *prev; - - if (tv_iseternity(&task->expire)) { - if (task->wq) { - if (task->wq == &wait_queue[1]) - return task; - else - task_delete(task); - } - task->wq = &wait_queue[1]; - rb_insert_task_queue(task); + if (unlikely(task->qlist.p != NULL)) { + DLIST_DEL(&task->qlist); + task->qlist.p = NULL; + } + + if (unlikely(task->wq)) { + tree_delete(task->wq); + task->wq = NULL; + } + + if (unlikely(tv_iseternity(&task->expire))) { + task->wq = NULL; + DLIST_ADD(eternity_queue, &task->qlist); return task; - } else { - if (task->wq != &wait_queue[0]) { - if (task->wq) - task_delete(task); - task->wq = &wait_queue[0]; - rb_insert_task_queue(task); - return task; - } + } + + task->wq = ul2tree_insert(&timer_wq, task->expire.tv_sec, task->expire.tv_usec); + DLIST_ADD(task->wq->data, &task->qlist); + return task; +} + + +/* + * Extract all expired timers from the wait queue, and wakes up all + * associated tasks. + * Returns the time to wait for next task (next_time). + * + * FIXME: Use an alternative queue for ETERNITY tasks. + * + */ +int wake_expired_tasks() +{ + int slen; + struct task *task; + void *data; + int next_time; + + /* + * Hint: tasks are *rarely* expired. So we can try to optimize + * by not scanning the tree at all in most cases. + */ + + if (likely(timer_wq.data != NULL)) { + task = LIST_ELEM(timer_wq.data, struct task *, qlist); + if (likely(tv_cmp_ge(&task->expire, &now) > 0)) + return tv_remain(&now, &task->expire); + } + + /* OK we lose. Let's scan the tree then. */ + next_time = TIME_ETERNITY; - // check whether task should be re insert - node = rb_prev(&task->rb_node); - if (node) { - prev = rb_entry(node, struct task, rb_node); - if (tv_cmp_ge(&prev->expire, &task->expire)) { - task_delete(task); - task->wq = &wait_queue[0]; - rb_insert_task_queue(task); - return task; - } + tree64_foreach(&timer_wq, data, stack, slen) { + task = LIST_ELEM(data, struct task *, qlist); + + if (unlikely(tv_cmp_ge(&task->expire, &now) > 0)) { + next_time = tv_remain(&now, &task->expire); + break; } - node = rb_next(&task->rb_node); - if (node) { - next = rb_entry(node, struct task, rb_node); - if (tv_cmp_ge(&task->expire, &next->expire)) { - task_delete(task); - task->wq = &wait_queue[0]; - rb_insert_task_queue(task); - return task; - } + /* + * OK, all tasks linked to this node will be unlinked, as well + * as the node itself, so we do not need to care about correct + * unlinking. + */ + foreach_dlist_item(task, data, struct task *, qlist) { + DLIST_DEL(&task->qlist); + task->wq = NULL; + DLIST_ADD(run_queue, &task->qlist); + task->state = TASK_RUNNING; } - return task; } + return next_time; } /* @@ -117,35 +139,23 @@ int process_runnable_tasks() int next_time; int time2; struct task *t; - struct rb_node *node; - - next_time = TIME_ETERNITY; - for (node = rb_first(&wait_queue[0]); - node != NULL; node = rb_next(node)) { - t = rb_entry(node, struct task, rb_node); - if (t->state & TASK_RUNNING) - continue; - if (tv_iseternity(&t->expire)) - continue; - if (tv_cmp_ms(&t->expire, &now) <= 0) { - task_wakeup(&rq, t); - } else { - int temp_time = tv_remain(&now, &t->expire); - if (temp_time) - next_time = temp_time; - break; - } - } + void *queue; + next_time = wake_expired_tasks(); /* process each task in the run queue now. Each task may be deleted * since we only use the run queue's head. 
Note that any task can be * woken up by any other task and it will be processed immediately * after as it will be queued on the run queue's head ! */ - while ((t = rq) != NULL) { + + queue = run_queue; + foreach_dlist_item(t, queue, struct task *, qlist) { int temp_time; - task_sleep(&rq, t); + DLIST_DEL(&t->qlist); + t->qlist.p = NULL; + + t->state = TASK_IDLE; temp_time = t->process(t); next_time = MINTIME(temp_time, next_time); }
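
For readers who want to see the run-queue mechanics of this patch in isolation, here is a small self-contained sketch. It is illustrative only: the qlink, q_del, q_push and simplified task_wakeup below are made-up stand-ins for the mini-clist DLIST_ADD/DLIST_DEL calls the patch actually uses, not haproxy code. The idea it demonstrates is the one introduced above: waking a task unlinks it from wherever it currently sits and pushes it onto the head of a doubly linked, non-circular run queue, which the scheduler then drains head-first.

/* Minimal sketch of a head-first run queue (illustrative, not the haproxy API). */
#include <stdio.h>

enum task_state { TASK_IDLE, TASK_RUNNING };

struct qlink {
	struct qlink *next, *prev;	/* bidirectional, not circular */
};

struct task {
	struct qlink qlist;		/* chaining in the run queue; must stay first (see cast below) */
	int state;
	const char *name;
};

static struct qlink *run_queue;		/* head of the run queue, NULL when empty */

/* unlink <e> from the run queue; harmless if it is not queued */
static void q_del(struct qlink *e)
{
	if (e->prev)
		e->prev->next = e->next;
	else if (run_queue == e)
		run_queue = e->next;
	if (e->next)
		e->next->prev = e->prev;
	e->next = e->prev = NULL;
}

/* push <e> at the head of the run queue */
static void q_push(struct qlink *e)
{
	e->prev = NULL;
	e->next = run_queue;
	if (run_queue)
		run_queue->prev = e;
	run_queue = e;
}

/* same spirit as the new task_wakeup(): requeue the task at the queue head */
static void task_wakeup(struct task *t)
{
	if (t->state == TASK_RUNNING)
		return;
	q_del(&t->qlist);
	q_push(&t->qlist);
	t->state = TASK_RUNNING;
}

int main(void)
{
	struct task a = { .name = "a" }, b = { .name = "b" };

	task_wakeup(&a);
	task_wakeup(&b);

	/* drain the run queue head-first: prints "b" then "a" */
	while (run_queue) {
		struct task *t = (struct task *)run_queue; /* valid: qlist is the first member */
		q_del(&t->qlist);
		t->state = TASK_IDLE;
		printf("running task %s\n", t->name);
	}
	return 0;
}

Note that the patch itself relies on qlist.p == NULL as the "not queued anywhere" marker (hence the repeated t->qlist.p = NULL after each DLIST_DEL); this sketch glosses over that detail by testing the queue head directly.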