git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
[MAJOR] replace the wait-queue linked list with an rbtree.
author Willy Tarreau <w@1wt.eu>
Sat, 6 Jan 2007 23:38:00 +0000 (00:38 +0100)
committer Willy Tarreau <w@1wt.eu>
Sun, 7 Jan 2007 01:14:23 +0000 (02:14 +0100)
This patch from Sin Yu makes use of an rbtree for the wait queue,
which will solve the slowdown problem encountered when timeouts
are heterogeneous in the configuration. The next step will be to
turn maintain_proxies() into a per-proxy task so that we won't
have to scan them all after each poll() loop.

Makefile
Makefile.bsd
include/proto/task.h
include/types/task.h
src/appsession.c
src/cfgparse.c
src/client.c
src/haproxy.c
src/task.c

index 195e6a9fca195cfc73fdfd407e4ad52f0d288836..5e23a0593b23b36372fc0086e80cd959bdd3ac26 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -182,7 +182,7 @@ OBJS = src/haproxy.o src/list.o src/chtbl.o src/hashpjw.o src/base64.o \
        src/time.o src/fd.o src/regex.o src/cfgparse.o src/server.o \
        src/checks.o src/queue.o src/capture.o src/client.o src/proxy.o \
        src/proto_http.o src/stream_sock.o src/appsession.o src/backend.o \
-       src/session.o src/hdr_idx.o
+       src/session.o src/hdr_idx.o src/rbtree.o
 
 haproxy: $(OBJS)
        $(LD) $(LDFLAGS) -o $@ $^ $(LIBS)
index 7cde1949cab75255dce87f3f92ffda33e8f84a9b..40229fab16ef30509c6c13603248367aafeace13 100644 (file)
@@ -87,7 +87,7 @@ OBJS = src/haproxy.o src/list.o src/chtbl.o src/hashpjw.o src/base64.o \
        src/time.o src/fd.o src/regex.o src/cfgparse.o src/server.o \
        src/checks.o src/queue.o src/capture.o src/client.o src/proxy.o \
        src/proto_http.o src/stream_sock.o src/appsession.o src/backend.o \
-       src/session.o src/hdr_idx.o
+       src/session.o src/hdr_idx.o src/rbtree.o
 
 all: haproxy
 
index 70abb8296fc039c5ada3065c7f4cbac7657f98be..8bd973a513fbcce64c1b58f1eb6e9050d7f8bc94 100644 (file)
@@ -61,8 +61,8 @@ static inline struct task *task_sleep(struct task **q, struct task *t)
  */
 static inline struct task *task_delete(struct task *t)
 {
-       t->prev->next = t->next;
-       t->next->prev = t->prev;
+       rb_erase(&t->rb_node, t->wq);
+       t->wq = NULL;
        return t;
 }
 
index 6b1df226cdbbe94ca5d13adbc95c90eeb65ce1ef..d09efae2c8ab5df435856ac2dd58525f45e09034 100644 (file)
@@ -25,6 +25,7 @@
 #include <sys/time.h>
 
 #include <common/config.h>
+#include <common/rbtree.h>
 
 /* values for task->state */
 #define TASK_IDLE      0
@@ -32,9 +33,9 @@
 
 /* The base for all tasks */
 struct task {
-       struct task *next, *prev;               /* chaining ... */
+       struct rb_node rb_node;
+       struct rb_root *wq;
        struct task *rqnext;            /* chaining in run queue ... */
-       struct task *wq;                        /* the wait queue this task is in */
        int state;                              /* task state : IDLE or RUNNING */
        struct timeval expire;          /* next expiration time for this task, use only for fast sorting */
        int (*process)(struct task *t); /* the function which processes the task */
@@ -44,7 +45,7 @@ struct task {
 #define sizeof_task     sizeof(struct task)
 extern void **pool_task;
 
-extern struct task wait_queue[2];
+extern struct rb_root wait_queue[2];
 extern struct task *rq;
 
 
index a63116d9cb94c0e5e24a240ba0281e286831c3ea..62b096ff48f856605749eee0b5e8a4299c6ec1fa 100644 (file)
@@ -113,8 +113,8 @@ int appsession_task_init(void)
        if (!initialized) {
                if ((t = pool_alloc(task)) == NULL)
                        return -1;
-               t->next = t->prev = t->rqnext = NULL;
-               t->wq = LIST_HEAD(wait_queue[0]);
+               t->wq = NULL;
+               t->rqnext = NULL;
                t->state = TASK_IDLE;
                t->context = NULL;
                tv_delayfrom(&t->expire, &now, TBLCHKINT);
index 5017d50d01ada488a9cf1695cfcd1429008f3ada..7b517d7716dad6c3311734eb36f2d6ff35136b7e 100644 (file)
@@ -2293,8 +2293,8 @@ int readcfgfile(const char *file)
                                        return -1;
                                }
                
-                               t->next = t->prev = t->rqnext = NULL; /* task not in run queue yet */
-                               t->wq = LIST_HEAD(wait_queue[1]); /* already assigned to the eternity queue */
+                               t->rqnext = NULL;
+                               t->wq = NULL;
                                t->state = TASK_IDLE;
                                t->process = process_srv_queue;
                                t->context = newsrv;
@@ -2340,8 +2340,8 @@ int readcfgfile(const char *file)
                                                return -1;
                                        }
                
-                                       t->next = t->prev = t->rqnext = NULL; /* task not in run queue yet */
-                                       t->wq = LIST_HEAD(wait_queue[0]); /* but already has a wait queue assigned */
+                                       t->wq = NULL;
+                                       t->rqnext = NULL;
                                        t->state = TASK_IDLE;
                                        t->process = process_chk;
                                        t->context = newsrv;
index f0e698d4c6a531b611790fd207aaacd4e7f39686..a8aad8e3cee6b3bbcb1d55e676a5255a6a3d3552 100644 (file)
@@ -150,8 +150,8 @@ int event_accept(int fd) {
                if (p->options & PR_O_TCP_CLI_KA)
                        setsockopt(cfd, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one));
 
-               t->next = t->prev = t->rqnext = NULL; /* task not in run queue yet */
-               t->wq = LIST_HEAD(wait_queue[0]); /* but already has a wait queue assigned */
+               t->wq = NULL;
+               t->rqnext = NULL;
                t->state = TASK_IDLE;
                t->process = process_session;
                t->context = s;
index 047136422a3921e3c58d79538479eb1fff3faea7..4a303573007ecc4ad672e99dcae5af759fddea7c 100644 (file)
@@ -255,12 +255,13 @@ void sig_dump_state(int sig)
 
 void dump(int sig)
 {
-       struct task *t, *tnext;
+       struct task *t;
        struct session *s;
+       struct rb_node *node;
 
-       tnext = ((struct task *)LIST_HEAD(wait_queue[0]))->next;
-       while ((t = tnext) != LIST_HEAD(wait_queue[0])) { /* we haven't looped ? */
-               tnext = t->next;
+       for(node = rb_first(&wait_queue[0]);
+               node != NULL; node = rb_next(node)) {
+               t = rb_entry(node, struct task, rb_node);
                s = t->context;
                qfprintf(stderr,"[dump] wq: task %p, still %ld ms, "
                         "cli=%d, srv=%d, cr=%d, cw=%d, sr=%d, sw=%d, "
index 1f567b9240f6fadcbb977613560aa436f8e9c5db..beddb27e3d98e644f0277c41063e49aa0061e688 100644 (file)
@@ -22,112 +22,86 @@ extern int maintain_proxies(void);
 
 void **pool_task= NULL;
 struct task *rq = NULL;                /* global run queue */
-struct task wait_queue[2] = {  /* global wait queue */
-    {
-       prev:LIST_HEAD(wait_queue[0]),  /* expirable tasks */
-       next:LIST_HEAD(wait_queue[0]),
-    },
-    {
-       prev:LIST_HEAD(wait_queue[1]),  /* non-expirable tasks */
-       next:LIST_HEAD(wait_queue[1]),
-    },
+
+struct rb_root wait_queue[2] = {
+       RB_ROOT,
+       RB_ROOT,
 };
 
 
-/* inserts <task> into its assigned wait queue, where it may already be. In this case, it
- * may be only moved or left where it was, depending on its timing requirements.
- * <task> is returned.
- */
+static inline void __rb_insert_task_queue(struct task *newtask)
+{
+       struct rb_node **p = &newtask->wq->rb_node;
+       struct rb_node *parent = NULL;
+       struct task * task;
+
+       while (*p)
+       {
+               parent = *p;
+               task = rb_entry(parent, struct task, rb_node);
+               if (tv_cmp2(&task->expire, &newtask->expire) >= 0)
+                       p = &(*p)->rb_left;
+               else
+                       p = &(*p)->rb_right;
+       }
+       rb_link_node(&newtask->rb_node, parent, p);
+}
+
+static inline void rb_insert_task_queue(struct task *newtask)
+{
+       __rb_insert_task_queue(newtask);
+       rb_insert_color(&newtask->rb_node, newtask->wq);
+}
+
+
 struct task *task_queue(struct task *task)
 {
-       struct task *list = task->wq;
-       struct task *start_from;
+       struct rb_node *node;
+       struct task *next, *prev;
 
-       /* This is a very dirty hack to queue non-expirable tasks in another queue
-        * in order to avoid pulluting the tail of the standard queue. This will go
-        * away with the new O(log(n)) scheduler anyway.
-        */
        if (tv_iseternity(&task->expire)) {
-               /* if the task was queued in the standard wait queue, we must dequeue it */
-               if (task->prev) {
-                       if (task->wq == LIST_HEAD(wait_queue[1]))
+               if (task->wq) {
+                       if (task->wq == &wait_queue[1])
                                return task;
-                       else {
+                       else
                                task_delete(task);
-                               task->prev = NULL;
-                       }
                }
-               list = task->wq = LIST_HEAD(wait_queue[1]);
+               task->wq = &wait_queue[1];
+               rb_insert_task_queue(task);
+               return task;
        } else {
-               /* if the task was queued in the eternity queue, we must dequeue it */
-               if (task->prev && (task->wq == LIST_HEAD(wait_queue[1]))) {
-                       task_delete(task);
-                       task->prev = NULL;
-                       list = task->wq = LIST_HEAD(wait_queue[0]);
-               }
-       }
-
-       /* next, test if the task was already in a list */
-       if (task->prev == NULL) {
-               //      start_from = list;
-               start_from = list->prev;
-               /* insert the unlinked <task> into the list, searching back from the last entry */
-               while (start_from != list && tv_cmp2(&task->expire, &start_from->expire) < 0) {
-                       start_from = start_from->prev;
-               }
-       
-               //        while (start_from->next != list && tv_cmp2(&task->expire, &start_from->next->expire) > 0) {
-               //            start_from = start_from->next;
-               //            stats_tsk_nsrch++;
-               //        }
-       }       
-       else if (task->prev == list ||
-                tv_cmp2(&task->expire, &task->prev->expire) >= 0) { /* walk right */
-               start_from = task->next;
-               if (start_from == list || tv_cmp2(&task->expire, &start_from->expire) <= 0) {
-                       return task; /* it's already in the right place */
+               if (task->wq != &wait_queue[0]) {
+                       if (task->wq)
+                               task_delete(task);
+                       task->wq = &wait_queue[0];
+                       rb_insert_task_queue(task);
+                       return task;
                }
 
-               /* if the task is not at the right place, there's little chance that
-                * it has only shifted a bit, and it will nearly always be queued
-                * at the end of the list because of constant timeouts
-                * (observed in real case).
-                */
-#ifndef WE_REALLY_THINK_THAT_THIS_TASK_MAY_HAVE_SHIFTED
-               start_from = list->prev; /* assume we'll queue to the end of the list */
-               while (start_from != list && tv_cmp2(&task->expire, &start_from->expire) < 0) {
-                       start_from = start_from->prev;
-               }
-#else /* WE_REALLY_... */
-               /* insert the unlinked <task> into the list, searching after position <start_from> */
-               while (start_from->next != list && tv_cmp2(&task->expire, &start_from->next->expire) > 0) {
-                       start_from = start_from->next;
+               // check whether task should be re insert
+               node = rb_prev(&task->rb_node);
+               if (node) {
+                       prev = rb_entry(node, struct task, rb_node);
+                       if (tv_cmp2(&prev->expire, &task->expire) >= 0) {
+                               task_delete(task);
+                               task->wq = &wait_queue[0];
+                               rb_insert_task_queue(task);
+                               return task;
+                       }
                }
-#endif /* WE_REALLY_... */
 
-               /* we need to unlink it now */
-               task_delete(task);
-       }
-       else { /* walk left. */
-#ifdef LEFT_TO_TOP     /* not very good */
-               start_from = list;
-               while (start_from->next != list && tv_cmp2(&task->expire, &start_from->next->expire) > 0) {
-                       start_from = start_from->next;
-               }
-#else
-               start_from = task->prev->prev; /* valid because of the previous test above */
-               while (start_from != list && tv_cmp2(&task->expire, &start_from->expire) < 0) {
-                       start_from = start_from->prev;
+               node = rb_next(&task->rb_node);
+               if (node) {
+                       next = rb_entry(node, struct task, rb_node);
+                       if (tv_cmp2(&task->expire, &next->expire) > 0) {
+                               task_delete(task);
+                               task->wq = &wait_queue[0];
+                               rb_insert_task_queue(task);
+                               return task;
+                       }
                }
-#endif
-               /* we need to unlink it now */
-               task_delete(task);
+               return task;
        }
-       task->prev = start_from;
-       task->next = start_from->next;
-       task->next->prev = task;
-       start_from->next = task;
-       return task;
 }
 
 /*
@@ -136,37 +110,26 @@ struct task *task_queue(struct task *task)
  *   - call all runnable tasks
  *   - call maintain_proxies() to enable/disable the listeners
  *   - return the delay till next event in ms, -1 = wait indefinitely
- * Note: this part should be rewritten with the O(ln(n)) scheduler.
  *
  */
-
 int process_runnable_tasks()
 {
        int next_time;
        int time2;
-       struct task *t, *tnext;
+       struct task *t;
+       struct rb_node *node;
 
-       next_time = TIME_ETERNITY; /* set the timer to wait eternally first */
-
-       /* look for expired tasks and add them to the run queue.
-        */
-       tnext = ((struct task *)LIST_HEAD(wait_queue[0]))->next;
-       while ((t = tnext) != LIST_HEAD(wait_queue[0])) { /* we haven't looped ? */
-               tnext = t->next;
+       next_time = TIME_ETERNITY;
+       for (node = rb_first(&wait_queue[0]);
+               node != NULL; node = rb_next(node)) {
+               t = rb_entry(node, struct task, rb_node);
                if (t->state & TASK_RUNNING)
                        continue;
-      
                if (tv_iseternity(&t->expire))
                        continue;
-
-               /* wakeup expired entries. It doesn't matter if they are
-                * already running because of a previous event
-                */
                if (tv_cmp_ms(&t->expire, &now) <= 0) {
                        task_wakeup(&rq, t);
-               }
-               else {
-                       /* first non-runnable task. Use its expiration date as an upper bound */
+               } else {
                        int temp_time = tv_remain(&now, &t->expire);
                        if (temp_time)
                                next_time = temp_time;
@@ -177,7 +140,7 @@ int process_runnable_tasks()
        /* process each task in the run queue now. Each task may be deleted
         * since we only use the run queue's head. Note that any task can be
         * woken up by any other task and it will be processed immediately
-        * after as it will be queued on the run queue's head.
+        * after as it will be queued on the run queue's head !
         */
        while ((t = rq) != NULL) {
                int temp_time;
@@ -186,13 +149,14 @@ int process_runnable_tasks()
                temp_time = t->process(t);
                next_time = MINTIME(temp_time, next_time);
        }
-  
-       /* maintain all proxies in a consistent state. This should quickly become a task */
+
+       /* maintain all proxies in a consistent state. This should quickly
+        * become a task because it becomes expensive when there are huge
+        * numbers of proxies. */
        time2 = maintain_proxies();
        return MINTIME(time2, next_time);
 }
 
-
 /*
  * Local variables:
  *  c-indent-level: 8