]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
[MAJOR] replaced rbtree with ul2tree.
authorWilly Tarreau <w@1wt.eu>
Sun, 29 Apr 2007 08:41:56 +0000 (10:41 +0200)
committerWilly Tarreau <w@1wt.eu>
Sun, 29 Apr 2007 11:43:53 +0000 (13:43 +0200)
The rbtree-based wait queue consumes a lot of CPU. Use the ul2tree
instead. Lots of cleanups and code reorganizations made it possible
to reduce the task struct and simplify the code a bit.

12 files changed:
include/proto/task.h
include/types/task.h
src/appsession.c
src/backend.c
src/cfgparse.c
src/checks.c
src/client.c
src/haproxy.c
src/proto_http.c
src/queue.c
src/stream_sock.c
src/task.c

index 8bd973a513fbcce64c1b58f1eb6e9050d7f8bc94..424a46c6d0b02db4c2b92ee67903f8e4878ef679 100644 (file)
@@ -2,7 +2,7 @@
   include/proto/task.h
   Functions for task management.
 
-  Copyright (C) 2000-2006 Willy Tarreau - w@1wt.eu
+  Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu
   
   This library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
 
 #include <common/config.h>
 #include <common/memory.h>
+#include <common/mini-clist.h>
+#include <common/standard.h>
+
 #include <types/task.h>
 
+extern void *run_queue;
+
+/* needed later */
+void *tree_delete(void *node);
 
 /* puts the task <t> in run queue <q>, and returns <t> */
-static inline struct task *task_wakeup(struct task **q, struct task *t)
+static inline struct task *task_wakeup(struct task *t)
 {
        if (t->state == TASK_RUNNING)
                return t;
-       else {
-               t->rqnext = *q;
-               t->state = TASK_RUNNING;
-               return *q = t;
+
+       if (t->qlist.p != NULL)
+               DLIST_DEL(&t->qlist);
+
+       DLIST_ADD(run_queue, &t->qlist);
+       t->state = TASK_RUNNING;
+
+       if (likely(t->wq)) {
+               tree_delete(t->wq);
+               t->wq = NULL;
        }
+
+       return t;
 }
 
-/* removes the task <t> from the queue <q>
- * <s> MUST be <q>'s first task.
- * set the run queue to point to the next one, and return it
+/* removes the task <t> from the run queue if it was in it.
+ * returns <t>.
  */
-static inline struct task *task_sleep(struct task **q, struct task *t)
+static inline struct task *task_sleep(struct task *t)
 {
        if (t->state == TASK_RUNNING) {
-               *q = t->rqnext;
-               t->state = TASK_IDLE; /* tell that s has left the run queue */
+               DLIST_DEL(&t->qlist);
+               t->qlist.p = NULL;
+               t->state = TASK_IDLE;
        }
-       return *q; /* return next running task */
+       return t;
 }
 
 /*
- * removes the task <t> from its wait queue. It must have already been removed
- * from the run queue. A pointer to the task itself is returned.
+ * unlinks the task from wherever it is queued :
+ *  - eternity_queue, run_queue
+ *  - wait queue : wq not null => remove carrier node too
+ * A pointer to the task itself is returned.
  */
 static inline struct task *task_delete(struct task *t)
 {
-       rb_erase(&t->rb_node, t->wq);
-       t->wq = NULL;
+       if (t->qlist.p != NULL) {
+               DLIST_DEL(&t->qlist);
+               t->qlist.p = NULL;
+       }
+
+       if (t->wq) {
+               tree_delete(t->wq);
+               t->wq = NULL;
+       }
        return t;
 }
 
index d09efae2c8ab5df435856ac2dd58525f45e09034..1fd78be56bb99e8582478ef18318e27359776b4a 100644 (file)
@@ -25,7 +25,8 @@
 #include <sys/time.h>
 
 #include <common/config.h>
-#include <common/rbtree.h>
+#include <common/mini-clist.h>
+#include <import/tree.h>
 
 /* values for task->state */
 #define TASK_IDLE      0
 
 /* The base for all tasks */
 struct task {
-       struct rb_node rb_node;
-       struct rb_root *wq;
-       struct task *rqnext;            /* chaining in run queue ... */
-       int state;                              /* task state : IDLE or RUNNING */
+       struct list qlist;              /* chaining in the same queue; bidirectionnal but not circular */
+       struct ultree *wq;              /* NULL if unqueued, or back ref to the carrier node in the WQ */
+       int state;                      /* task state : IDLE or RUNNING */
        struct timeval expire;          /* next expiration time for this task, use only for fast sorting */
        int (*process)(struct task *t); /* the function which processes the task */
        void *context;                  /* the task's context */
@@ -45,10 +45,6 @@ struct task {
 #define sizeof_task     sizeof(struct task)
 extern void **pool_task;
 
-extern struct rb_root wait_queue[2];
-extern struct task *rq;
-
-
 #endif /* _TYPES_TASK_H */
 
 /*
index 62b096ff48f856605749eee0b5e8a4299c6ec1fa..0adbc332c3ae4d4ec9d2614d57e0b5cc65ac5a42 100644 (file)
@@ -114,7 +114,7 @@ int appsession_task_init(void)
                if ((t = pool_alloc(task)) == NULL)
                        return -1;
                t->wq = NULL;
-               t->rqnext = NULL;
+               t->qlist.p = NULL;
                t->state = TASK_IDLE;
                t->context = NULL;
                tv_delayfrom(&t->expire, &now, TBLCHKINT);
index 0eb97ed066526b35f224d050b432a5f73f3bcecb..f096874f206b09c2132100e888c46a7a49fca0ee 100644 (file)
@@ -572,7 +572,7 @@ int srv_count_retry_down(struct session *t, int conn_err)
                 * we have to inform the server that it may be used by another session.
                 */
                if (may_dequeue_tasks(t->srv, t->be))
-                       task_wakeup(&rq, t->srv->queue_mgt);
+                       task_wakeup(t->srv->queue_mgt);
                return 1;
        }
        return 0;
@@ -611,7 +611,7 @@ int srv_retryable_connect(struct session *t)
                        t->be->failed_conns++;
                        /* release other sessions waiting for this server */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(&rq, t->srv->queue_mgt);
+                               task_wakeup(t->srv->queue_mgt);
                        return 1;
                }
                /* ensure that we have enough retries left */
@@ -625,7 +625,7 @@ int srv_retryable_connect(struct session *t)
         */
        /* let's try to offer this slot to anybody */
        if (may_dequeue_tasks(t->srv, t->be))
-               task_wakeup(&rq, t->srv->queue_mgt);
+               task_wakeup(t->srv->queue_mgt);
 
        if (t->srv)
                t->srv->failed_conns++;
@@ -691,7 +691,7 @@ int srv_redispatch_connect(struct session *t)
 
                /* release other sessions waiting for this server */
                if (may_dequeue_tasks(t->srv, t->be))
-                       task_wakeup(&rq, t->srv->queue_mgt);
+                       task_wakeup(t->srv->queue_mgt);
                return 1;
        }
        /* if we get here, it's because we got SRV_STATUS_OK, which also
index 33670f1556c8ec0fdcbac44e6a5314f4f4eaeb26..289d634112ca7208b69e727e678ba71276a44dcc 100644 (file)
@@ -2415,7 +2415,7 @@ int readcfgfile(const char *file)
                                        return -1;
                                }
                
-                               t->rqnext = NULL;
+                               t->qlist.p = NULL;
                                t->wq = NULL;
                                t->state = TASK_IDLE;
                                t->process = process_srv_queue;
@@ -2463,7 +2463,7 @@ int readcfgfile(const char *file)
                                        }
                
                                        t->wq = NULL;
-                                       t->rqnext = NULL;
+                                       t->qlist.p = NULL;
                                        t->state = TASK_IDLE;
                                        t->process = process_chk;
                                        t->context = newsrv;
index 309d0c4a76902b45be2a78d55131aa625834dc13..f9bc5c5a9c8f4093ffd0ac14d729d2163b87e2d1 100644 (file)
@@ -75,7 +75,7 @@ static void set_server_down(struct server *s)
                                sess->srv = NULL; /* it's left to the dispatcher to choose a server */
                                http_flush_cookie_flags(&sess->txn);
                                pendconn_free(pc);
-                               task_wakeup(&rq, sess->task);
+                               task_wakeup(sess->task);
                                xferred++;
                        }
                }
@@ -167,7 +167,7 @@ static int event_srv_chk_w(int fd)
                }
        }
  out_wakeup:
-       task_wakeup(&rq, t);
+       task_wakeup(t);
  out_nowake:
        EV_FD_CLR(fd, DIR_WR);   /* nothing more to write */
        fdtab[fd].ev &= ~FD_POLL_WR;
@@ -237,7 +237,7 @@ static int event_srv_chk_r(int fd)
 
  out_wakeup:
        EV_FD_CLR(fd, DIR_RD);
-       task_wakeup(&rq, t);
+       task_wakeup(t);
        fdtab[fd].ev &= ~FD_POLL_RD;
        return 1;
 }
@@ -436,7 +436,7 @@ int process_chk(struct task *t)
                                                p->sess->srv = s;
                                                sess = p->sess;
                                                pendconn_free(p);
-                                               task_wakeup(&rq, sess->task);
+                                               task_wakeup(sess->task);
                                        }
 
                                        sprintf(trash,
index 521b8604b345b54284ebb74bcd3b763c6cca1665..028282ef1c3057e8acd9d907739c03eebf817aef 100644 (file)
@@ -152,7 +152,7 @@ int event_accept(int fd) {
                        setsockopt(cfd, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one));
 
                t->wq = NULL;
-               t->rqnext = NULL;
+               t->qlist.p = NULL;
                t->state = TASK_IDLE;
                t->process = process_session;
                t->context = s;
@@ -422,7 +422,7 @@ int event_accept(int fd) {
                task_queue(t);
 
                if (p->mode != PR_MODE_HEALTH)
-                       task_wakeup(&rq, t);
+                       task_wakeup(t);
 
                p->feconn++;  /* beconn will be increased later */
                if (p->feconn > p->feconn_max)
index f8946f93680a181246914a0e8492af7df35b7810..5776d184615b10a03361277709c5d3bc1d00d210 100644 (file)
@@ -269,6 +269,7 @@ void sig_dump_state(int sig)
 
 void dump(int sig)
 {
+#if 0
        struct task *t;
        struct session *s;
        struct rb_node *node;
@@ -290,6 +291,7 @@ void dump(int sig)
                         s->req->l, s->rep?s->rep->l:0, s->cli_fd
                         );
        }
+#endif
 }
 
 #ifdef DEBUG_MEMORY
index e9b3b56a30793f034911cf5978273cdaa49be440..00277f5ca7ae05af6fb005bb41dec309be9f9797 100644 (file)
@@ -2353,7 +2353,7 @@ int process_srv(struct session *t)
                                 */
                                /* let's try to offer this slot to anybody */
                                if (may_dequeue_tasks(t->srv, t->be))
-                                       task_wakeup(&rq, t->srv->queue_mgt);
+                                       task_wakeup(t->srv->queue_mgt);
 
                                if (t->srv)
                                        t->srv->failed_conns++;
@@ -2539,7 +2539,7 @@ int process_srv(struct session *t)
                                 * we have to inform the server that it may be used by another session.
                                 */
                                if (t->srv && may_dequeue_tasks(t->srv, t->be))
-                                       task_wakeup(&rq, t->srv->queue_mgt);
+                                       task_wakeup(t->srv->queue_mgt);
 
                                return 1;
                        }
@@ -2581,7 +2581,7 @@ int process_srv(struct session *t)
                                 * we have to inform the server that it may be used by another session.
                                 */
                                if (t->srv && may_dequeue_tasks(t->srv, t->be))
-                                       task_wakeup(&rq, t->srv->queue_mgt);
+                                       task_wakeup(t->srv->queue_mgt);
                                return 1;
                        }
 
@@ -2753,7 +2753,7 @@ int process_srv(struct session *t)
                                         * we have to inform the server that it may be used by another session.
                                         */
                                        if (t->srv && may_dequeue_tasks(t->srv, cur_proxy))
-                                               task_wakeup(&rq, t->srv->queue_mgt);
+                                               task_wakeup(t->srv->queue_mgt);
                                        return 1;
                                }
                        }
@@ -2988,7 +2988,7 @@ int process_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(&rq, t->srv->queue_mgt);
+                               task_wakeup(t->srv->queue_mgt);
 
                        return 1;
                }
@@ -3101,7 +3101,7 @@ int process_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(&rq, t->srv->queue_mgt);
+                               task_wakeup(t->srv->queue_mgt);
 
                        return 1;
                }
@@ -3117,7 +3117,7 @@ int process_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(&rq, t->srv->queue_mgt);
+                               task_wakeup(t->srv->queue_mgt);
 
                        return 1;
                }
@@ -3137,7 +3137,7 @@ int process_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(&rq, t->srv->queue_mgt);
+                               task_wakeup(t->srv->queue_mgt);
 
                        return 1;
                }
@@ -3182,7 +3182,7 @@ int process_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(&rq, t->srv->queue_mgt);
+                               task_wakeup(t->srv->queue_mgt);
 
                        return 1;
                }
@@ -3198,7 +3198,7 @@ int process_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(&rq, t->srv->queue_mgt);
+                               task_wakeup(t->srv->queue_mgt);
 
                        return 1;
                }
@@ -3218,7 +3218,7 @@ int process_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(&rq, t->srv->queue_mgt);
+                               task_wakeup(t->srv->queue_mgt);
 
                        return 1;
                }
index 54fd6a3b289a1af182d51e87278eee45a9a678e3..9b53687de10b5aedffbb8276aa39a5517e840b81 100644 (file)
@@ -62,7 +62,7 @@ int process_srv_queue(struct task *t)
                sess = pendconn_get_next_sess(s, p);
                if (sess == NULL)
                        break;
-               task_wakeup(&rq, sess->task);
+               task_wakeup(sess->task);
        }
 
        return TIME_ETERNITY;
index 2a7275baa50b6d8ca8c8b2852698827903126d51..77eda02c83effb01cc54cb2a835c7be15fbeed6a 100644 (file)
@@ -156,7 +156,7 @@ int stream_sock_read(int fd) {
                else
                        tv_eternity(&b->rex);
        
-               task_wakeup(&rq, fdtab[fd].owner);
+               task_wakeup(fdtab[fd].owner);
        }
 
        fdtab[fd].ev &= ~FD_POLL_RD;
@@ -291,7 +291,7 @@ int stream_sock_write(int fd) {
                }
        }
 
-       task_wakeup(&rq, fdtab[fd].owner);
+       task_wakeup(fdtab[fd].owner);
        fdtab[fd].ev &= ~FD_POLL_WR;
        return retval;
 }
index ca262d634e5a4df0af8f2611b1f3513a16808350..a0bf7af60bf193fd3f604d74c49b3c84fa5de69a 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Task management functions.
  *
- * Copyright 2000-2006 Willy Tarreau <w@1wt.eu>
+ * Copyright 2000-2007 Willy Tarreau <w@1wt.eu>
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
 #include <common/config.h>
 #include <common/mini-clist.h>
 #include <common/time.h>
+#include <common/standard.h>
 
 #include <proto/task.h>
+#include <types/task.h>
+
+// FIXME: check 8bitops.c for faster FLS
+#include <import/bitops.h>
+#include <import/tree.h>
 
 
 /* FIXME : this should be removed very quickly ! */
 extern int maintain_proxies(void);
 
 void **pool_task= NULL;
-struct task *rq = NULL;                /* global run queue */
-
-struct rb_root wait_queue[2] = {
-       RB_ROOT,
-       RB_ROOT,
-};
+void **pool_tree64 = NULL;
+static struct ultree *stack[LLONGBITS];
 
+UL2TREE_HEAD(timer_wq);
+void *eternity_queue = NULL;
+void *run_queue = NULL;
 
-static inline void __rb_insert_task_queue(struct task *newtask)
+struct ultree *ul2tree_insert(struct ultree *root, unsigned long h, unsigned long l)
 {
-       struct rb_node **p = &newtask->wq->rb_node;
-       struct rb_node *parent = NULL;
-       struct task * task;
-
-       while (*p)
-       {
-               parent = *p;
-               task = rb_entry(parent, struct task, rb_node);
-               if (tv_cmp_ge2(&task->expire, &newtask->expire))
-                       p = &(*p)->rb_left;
-               else
-                       p = &(*p)->rb_right;
-       }
-       rb_link_node(&newtask->rb_node, parent, p);
+       return __ul2tree_insert(root, h, l);
 }
 
-static void rb_insert_task_queue(struct task *newtask)
-{
-       __rb_insert_task_queue(newtask);
-       rb_insert_color(&newtask->rb_node, newtask->wq);
+void *tree_delete(void *node) {
+    return __tree_delete(node);
 }
 
-
+/*
+ * task_queue()
+ *
+ * Inserts a task into the wait queue at the position given by its expiration
+ * date.
+ *
+ */
 struct task *task_queue(struct task *task)
 {
-       struct rb_node *node;
-       struct task *next, *prev;
-
-       if (tv_iseternity(&task->expire)) {
-               if (task->wq) {
-                       if (task->wq == &wait_queue[1])
-                               return task;
-                       else
-                               task_delete(task);
-               }
-               task->wq = &wait_queue[1];
-               rb_insert_task_queue(task);
+       if (unlikely(task->qlist.p != NULL)) {
+               DLIST_DEL(&task->qlist);
+               task->qlist.p = NULL;
+       }
+
+       if (unlikely(task->wq)) {
+               tree_delete(task->wq);
+               task->wq = NULL;
+       }
+
+       if (unlikely(tv_iseternity(&task->expire))) {
+               task->wq = NULL;
+               DLIST_ADD(eternity_queue, &task->qlist);
                return task;
-       } else {
-               if (task->wq != &wait_queue[0]) {
-                       if (task->wq)
-                               task_delete(task);
-                       task->wq = &wait_queue[0];
-                       rb_insert_task_queue(task);
-                       return task;
-               }
+       }
+
+       task->wq = ul2tree_insert(&timer_wq, task->expire.tv_sec, task->expire.tv_usec);
+       DLIST_ADD(task->wq->data, &task->qlist);
+       return task;
+}
+
+
+/*
+ * Extract all expired timers from the wait queue, and wakes up all
+ * associated tasks.
+ * Returns the time to wait for next task (next_time).
+ *
+ * FIXME: Use an alternative queue for ETERNITY tasks.
+ *
+ */
+int wake_expired_tasks()
+{
+       int slen;
+       struct task *task;
+       void *data;
+       int next_time;
+
+       /*
+        * Hint: tasks are *rarely* expired. So we can try to optimize
+        * by not scanning the tree at all in most cases.
+        */
+
+       if (likely(timer_wq.data != NULL)) {
+               task = LIST_ELEM(timer_wq.data, struct task *, qlist);
+               if (likely(tv_cmp_ge(&task->expire, &now) > 0))
+                       return tv_remain(&now, &task->expire);
+       }
+
+       /* OK we lose. Let's scan the tree then. */
+       next_time = TIME_ETERNITY;
 
-               // check whether task should be re insert
-               node = rb_prev(&task->rb_node);
-               if (node) {
-                       prev = rb_entry(node, struct task, rb_node);
-                       if (tv_cmp_ge(&prev->expire, &task->expire)) {
-                               task_delete(task);
-                               task->wq = &wait_queue[0];
-                               rb_insert_task_queue(task);
-                               return task;
-                       }
+       tree64_foreach(&timer_wq, data, stack, slen) {
+               task = LIST_ELEM(data, struct task *, qlist);
+
+               if (unlikely(tv_cmp_ge(&task->expire, &now) > 0)) {
+                       next_time = tv_remain(&now, &task->expire);
+                       break;
                }
 
-               node = rb_next(&task->rb_node);
-               if (node) {
-                       next = rb_entry(node, struct task, rb_node);
-                       if (tv_cmp_ge(&task->expire, &next->expire)) {
-                               task_delete(task);
-                               task->wq = &wait_queue[0];
-                               rb_insert_task_queue(task);
-                               return task;
-                       }
+               /*
+                * OK, all tasks linked to this node will be unlinked, as well
+                * as the node itself, so we do not need to care about correct
+                * unlinking.
+                */
+               foreach_dlist_item(task, data, struct task *, qlist) {
+                       DLIST_DEL(&task->qlist);
+                       task->wq = NULL;
+                       DLIST_ADD(run_queue, &task->qlist);
+                       task->state = TASK_RUNNING;
                }
-               return task;
        }
+       return next_time;
 }
 
 /*
@@ -117,35 +139,23 @@ int process_runnable_tasks()
        int next_time;
        int time2;
        struct task *t;
-       struct rb_node *node;
-
-       next_time = TIME_ETERNITY;
-       for (node = rb_first(&wait_queue[0]);
-               node != NULL; node = rb_next(node)) {
-               t = rb_entry(node, struct task, rb_node);
-               if (t->state & TASK_RUNNING)
-                       continue;
-               if (tv_iseternity(&t->expire))
-                       continue;
-               if (tv_cmp_ms(&t->expire, &now) <= 0) {
-                       task_wakeup(&rq, t);
-               } else {
-                       int temp_time = tv_remain(&now, &t->expire);
-                       if (temp_time)
-                               next_time = temp_time;
-                       break;
-               }
-       }
+       void *queue;
 
+       next_time = wake_expired_tasks();
        /* process each task in the run queue now. Each task may be deleted
         * since we only use the run queue's head. Note that any task can be
         * woken up by any other task and it will be processed immediately
         * after as it will be queued on the run queue's head !
         */
-       while ((t = rq) != NULL) {
+
+       queue = run_queue;
+       foreach_dlist_item(t, queue, struct task *, qlist) {
                int temp_time;
 
-               task_sleep(&rq, t);
+               DLIST_DEL(&t->qlist);
+               t->qlist.p = NULL;
+
+               t->state = TASK_IDLE;
                temp_time = t->process(t);
                next_time = MINTIME(temp_time, next_time);
        }