From: Willy Tarreau Date: Sun, 8 Mar 2009 08:38:41 +0000 (+0100) Subject: [MEDIUM] minor update to the task api: let the scheduler queue itself X-Git-Tag: v1.3.16-rc1~13 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=26c250683f29c74b347d05609a9cff2785106893;p=thirdparty%2Fhaproxy.git [MEDIUM] minor update to the task api: let the scheduler queue itself All the tasks callbacks had to requeue the task themselves, and update a global timeout. This was not convenient at all. Now the API has been simplified. The tasks callbacks only have to update their expire timer, and return either a pointer to the task or NULL if the task has been deleted. The scheduler will take care of requeuing the task at the proper place in the wait queue. --- diff --git a/include/common/appsession.h b/include/common/appsession.h index 616766f8e2..6c129261ca 100644 --- a/include/common/appsession.h +++ b/include/common/appsession.h @@ -38,7 +38,7 @@ int match_str(const void *key1, const void *key2); /* Callback for destroy */ void destroy(appsess *data); -void appsession_refresh(struct task *t, int *next); +struct task *appsession_refresh(struct task *t); int appsession_task_init(void); int appsession_init(void); void appsession_cleanup(void); diff --git a/include/proto/checks.h b/include/proto/checks.h index 8499175714..6f0aa8b3b9 100644 --- a/include/proto/checks.h +++ b/include/proto/checks.h @@ -2,7 +2,7 @@ include/proto/checks.h Functions prototypes for the checks. - Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu + Copyright (C) 2000-2009 Willy Tarreau - w@1wt.eu This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -25,7 +25,7 @@ #include #include -void process_chk(struct task *t, struct timeval *next); +struct task *process_chk(struct task *t); int start_checks(); #endif /* _PROTO_CHECKS_H */ diff --git a/include/proto/proto_uxst.h b/include/proto/proto_uxst.h index e7709420f2..bf487b6a91 100644 --- a/include/proto/proto_uxst.h +++ b/include/proto/proto_uxst.h @@ -28,8 +28,7 @@ int uxst_event_accept(int fd); void uxst_add_listener(struct listener *listener); -void process_uxst_stats(struct task *t, int *next); -void uxst_process_session(struct task *t, int *next); +struct task *uxst_process_session(struct task *t); #endif /* _PROTO_PROTO_UXST_H */ diff --git a/include/proto/session.h b/include/proto/session.h index ad0a57ea34..7cc50f6b01 100644 --- a/include/proto/session.h +++ b/include/proto/session.h @@ -36,7 +36,7 @@ int init_session(); void session_process_counters(struct session *s); void sess_change_server(struct session *sess, struct server *newsrv); -void process_session(struct task *t, int *next); +struct task *process_session(struct task *t); static void inline trace_term(struct session *s, unsigned int code) { diff --git a/include/types/protocols.h b/include/types/protocols.h index 5c3b60847e..e91fdb3ade 100644 --- a/include/types/protocols.h +++ b/include/types/protocols.h @@ -85,7 +85,7 @@ struct listener { struct listener *next; /* next address for the same proxy, or NULL */ struct list proto_list; /* list in the protocol header */ int (*accept)(int fd); /* accept() function passed to fdtab[] */ - void (*handler)(struct task *t, int *next); /* protocol handler */ + struct task * (*handler)(struct task *t); /* protocol handler. It is a task */ int *timeout; /* pointer to client-side timeout */ void *private; /* any private data which may be used by accept() */ unsigned int analysers; /* bitmap of required protocol analysers */ diff --git a/include/types/task.h b/include/types/task.h index 4302ec092a..1cc12a9f8f 100644 --- a/include/types/task.h +++ b/include/types/task.h @@ -50,11 +50,18 @@ struct task { struct eb32_node rq; /* ebtree node used to hold the task in the run queue */ int state; /* task state : bit field of TASK_* */ unsigned int expire; /* next expiration time for this task */ - void (*process)(struct task *t, int *next); /* the function which processes the task */ + struct task * (*process)(struct task *t); /* the function which processes the task */ void *context; /* the task's context */ int nice; /* the task's current nice value from -1024 to +1024 */ }; +/* + * The task callback (->process) is responsible for updating ->expire. It must + * return a pointer to the task itself, except if the task has been deleted, in + * which case it returns NULL so that the scheduler knows it must not check the + * expire timer. The scheduler will requeue the task at the proper location. + */ + #endif /* _TYPES_TASK_H */ /* diff --git a/src/appsession.c b/src/appsession.c index 45050b5513..a1c1c886aa 100644 --- a/src/appsession.c +++ b/src/appsession.c @@ -2,7 +2,7 @@ * AppSession functions. * * Copyright 2004-2006 Alexander Lazic, Klaus Wagner - * Copyright 2006-2007 Willy Tarreau + * Copyright 2006-2009 Willy Tarreau * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -100,7 +100,7 @@ int appsession_task_init(void) return 0; } -void appsession_refresh(struct task *t, int *next) +struct task *appsession_refresh(struct task *t) { struct proxy *p = proxy; struct appsession_hash *htbl; @@ -131,8 +131,7 @@ void appsession_refresh(struct task *t, int *next) p = p->next; } t->expire = tick_add(now_ms, MS_TO_TICKS(TBLCHKINT)); /* check expiration every 5 seconds */ - task_queue(t); - *next = t->expire; + return t; } /* end appsession_refresh */ int match_str(const void *key1, const void *key2) diff --git a/src/checks.c b/src/checks.c index 37e0c29616..f1e8120a79 100644 --- a/src/checks.c +++ b/src/checks.c @@ -1,7 +1,7 @@ /* * Health-checks functions. * - * Copyright 2000-2008 Willy Tarreau + * Copyright 2000-2009 Willy Tarreau * Copyright 2007-2008 Krzysztof Piotr Oledzki * * This program is free software; you can redistribute it and/or @@ -522,9 +522,8 @@ static int event_srv_chk_r(int fd) * manages a server health-check. Returns * the time the task accepts to wait, or TIME_ETERNITY for infinity. */ -void process_chk(struct task *t, int *next) +struct task *process_chk(struct task *t) { - __label__ new_chk, out; struct server *s = t->context; struct sockaddr_in sa; int fd; @@ -536,11 +535,8 @@ void process_chk(struct task *t, int *next) fd = s->curfd; if (fd < 0) { /* no check currently running */ //fprintf(stderr, "process_chk: 2\n"); - if (!tick_is_expired(t->expire, now_ms)) { /* not good time yet */ - task_queue(t); /* restore t to its place in the task list */ - *next = t->expire; - goto out; - } + if (!tick_is_expired(t->expire, now_ms)) /* woke up too early */ + return t; /* we don't send any health-checks when the proxy is stopped or when * the server should not be checked. @@ -548,9 +544,7 @@ void process_chk(struct task *t, int *next) if (!(s->state & SRV_CHECKED) || s->proxy->state == PR_STSTOPPED) { while (tick_is_expired(t->expire, now_ms)) t->expire = tick_add(t->expire, MS_TO_TICKS(s->inter)); - task_queue(t); /* restore t to its place in the task list */ - *next = t->expire; - goto out; + return t; } /* we'll initiate a new check */ @@ -674,10 +668,7 @@ void process_chk(struct task *t, int *next) int t_con = tick_add(now_ms, s->proxy->timeout.connect); t->expire = tick_first(t->expire, t_con); } - - task_queue(t); /* restore t to its place in the task list */ - *next = t->expire; - return; + return t; } else if (errno != EALREADY && errno != EISCONN && errno != EAGAIN) { s->result |= SRV_CHK_ERROR; /* a real error */ @@ -797,10 +788,7 @@ void process_chk(struct task *t, int *next) } //fprintf(stderr, "process_chk: 11\n"); s->result = SRV_CHK_UNKNOWN; - task_queue(t); /* restore t to its place in the task list */ - *next = t->expire; - out: - return; + return t; } /* diff --git a/src/proto_uxst.c b/src/proto_uxst.c index 4b24209a64..6351ef109d 100644 --- a/src/proto_uxst.c +++ b/src/proto_uxst.c @@ -708,7 +708,7 @@ int uxst_req_analyser_stats(struct session *s, struct buffer *req) * still exists but remains in SI_ST_INI state forever, so that any call is a * NOP. */ -void uxst_process_session(struct task *t, int *next) +struct task *uxst_process_session(struct task *t) { struct session *s = t->context; int resync; @@ -969,11 +969,7 @@ void uxst_process_session(struct task *t, int *next) if (s->si[0].exp) t->expire = tick_first(t->expire, s->si[0].exp); - /* restore t to its place in the task list */ - task_queue(t); - - *next = t->expire; - return; /* nothing more to do */ + return t; } actconn--; @@ -988,10 +984,10 @@ void uxst_process_session(struct task *t, int *next) } /* the task MUST not be in the run queue anymore */ - task_delete(t); session_free(s); + task_delete(t); task_free(t); - *next = TICK_ETERNITY; + return NULL; } __attribute__((constructor)) diff --git a/src/session.c b/src/session.c index bcfa5c12e1..7d7b4b6d4b 100644 --- a/src/session.c +++ b/src/session.c @@ -554,7 +554,7 @@ static void sess_prepare_conn_req(struct session *s, struct stream_interface *si * and each function is called only if at least another function has changed at * least one flag it is interested in. */ -void process_session(struct task *t, int *next) +struct task *process_session(struct task *t) { struct session *s = t->context; int resync; @@ -1029,16 +1029,13 @@ resync_stream_interface: fprintf(stderr, "[%u] queuing with exp=%u req->rex=%u req->wex=%u req->ana_exp=%u rep->rex=%u rep->wex=%u, cs=%d, ss=%d\n", now_ms, t->expire, s->req->rex, s->req->wex, s->req->analyse_exp, s->rep->rex, s->rep->wex, s->si[0].state, s->si[1].state); #endif - /* restore t to its place in the task list */ - task_queue(t); #ifdef DEBUG_DEV /* this may only happen when no timeout is set or in case of an FSM bug */ if (!t->expire) ABORT_NOW(); #endif - *next = t->expire; - return; /* nothing more to do */ + return t; /* nothing more to do */ } s->fe->feconn--; @@ -1066,10 +1063,10 @@ resync_stream_interface: } /* the task MUST not be in the run queue anymore */ - task_delete(t); session_free(s); + task_delete(t); task_free(t); - *next = TICK_ETERNITY; + return NULL; } /* diff --git a/src/task.c b/src/task.c index fde9112211..844862e3d4 100644 --- a/src/task.c +++ b/src/task.c @@ -295,11 +295,11 @@ void wake_expired_tasks(int *next) */ void process_runnable_tasks(int *next) { - int temp; struct task *t; struct eb32_node *eb; unsigned int tree, stop; unsigned int max_processed; + int expire; if (!run_queue) return; @@ -315,6 +315,7 @@ void process_runnable_tasks(int *next) stop = (tree + TIMER_TREES / 2) & TIMER_TREE_MASK; tree = (tree - 1) & TIMER_TREE_MASK; + expire = *next; do { eb = eb32_first(&rqueue[tree]); while (eb) { @@ -325,15 +326,19 @@ void process_runnable_tasks(int *next) __task_unlink_rq(t); t->state |= TASK_RUNNING; - t->process(t, &temp); - t->state &= ~TASK_RUNNING; - *next = tick_first(*next, temp); + if (likely(t->process(t) != NULL)) { + t->state &= ~TASK_RUNNING; + expire = tick_first(expire, t->expire); + task_queue(t); + } if (!--max_processed) - return; + goto out; } tree = (tree + 1) & TIMER_TREE_MASK; } while (tree != stop); + out: + *next = expire; } /* perform minimal intializations, report 0 in case of error, 1 if OK. */