From: Willy Tarreau Date: Sat, 29 Jul 2006 14:59:06 +0000 (+0200) Subject: [MEDIUM] started the changes towards I/O completion callbacks X-Git-Tag: v1.3.2~7 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5446940e37c5cbf905421b69bdab71a06a38d449;p=thirdparty%2Fhaproxy.git [MEDIUM] started the changes towards I/O completion callbacks Now the event_* functions find their buffer in the fdtab itself. --- diff --git a/include/proto/buffers.h b/include/proto/buffers.h index 29a2abc8be..6dbc9569ad 100644 --- a/include/proto/buffers.h +++ b/include/proto/buffers.h @@ -25,6 +25,16 @@ #include #include +/* Initializes all fields in the buffer. The ->rlim field is initialized last + * so that the compiler can optimize it away if changed immediately after the + * call to this function. + */ +static inline void buffer_init(struct buffer *buf) +{ + buf->l = buf->total = buf->flags = 0; + buf->rlim = buf->h = buf->r = buf->lr = buf->w = buf->data; +} + /* returns 1 if the buffer is empty, 0 otherwise */ static inline int buffer_isempty(struct buffer *buf) { diff --git a/include/types/buffers.h b/include/types/buffers.h index b5bd661cb1..bc8e8f20ba 100644 --- a/include/types/buffers.h +++ b/include/types/buffers.h @@ -25,6 +25,24 @@ #include #include +#include + +/* The BF_* macros designate Buffer Flags, which may be ORed in the bit field + * member 'flags' in struct buffer. + */ +#define BF_SHUTR_PENDING 1 +#define BF_SHUTR_DONE 2 +#define BF_SHUTW_PENDING 4 +#define BF_SHUTW_DONE 8 +#define BF_PARTIAL_READ 16 +#define BF_COMPLETE_READ 32 +#define BF_READ_ERROR 64 +#define BF_PARTIAL_WRITE 128 +#define BF_COMPLETE_WRITE 256 +#define BF_WRITE_ERROR 512 + + + /* describes a chunk of string */ struct chunk { char *str; /* beginning of the string itself. Might not be 0-terminated */ @@ -32,6 +50,7 @@ struct chunk { }; struct buffer { + u_int32_t flags; unsigned int l; /* data length */ char *r, *w, *h, *lr; /* read ptr, write ptr, last header ptr, last read */ char *rlim; /* read limit, used for header rewriting */ diff --git a/include/types/fd.h b/include/types/fd.h index d0219dc6f4..ae8872bee5 100644 --- a/include/types/fd.h +++ b/include/types/fd.h @@ -28,6 +28,7 @@ #include #include +#include /* different possible states for the fd */ #define FD_STCLOSE 0 @@ -36,13 +37,20 @@ #define FD_STREADY 3 #define FD_STERROR 4 +enum { + DIR_RD=0, + DIR_WR=1, + DIR_SIZE +}; /* info about one given fd */ struct fdtab { - int (*read)(int fd); /* read function */ - int (*write)(int fd); /* write function */ - struct task *owner; /* the session (or proxy) associated with this fd */ - int state; /* the state of this fd */ + struct { + int (*f)(int fd); /* read/write function */ + struct buffer *b; /* read/write buffer */ + } cb[DIR_SIZE]; + struct task *owner; /* the session (or proxy) associated with this fd */ + int state; /* the state of this fd */ }; extern struct fdtab *fdtab; /* array of all the file descriptors */ diff --git a/src/backend.c b/src/backend.c index ff91ee3d01..3c60388db6 100644 --- a/src/backend.c +++ b/src/backend.c @@ -426,9 +426,11 @@ int connect_server(struct session *s) } fdtab[fd].owner = s->task; - fdtab[fd].read = &event_srv_read; - fdtab[fd].write = &event_srv_write; fdtab[fd].state = FD_STCONN; /* connection in progress */ + fdtab[fd].cb[DIR_RD].f = &event_srv_read; + fdtab[fd].cb[DIR_RD].b = s->rep; + fdtab[fd].cb[DIR_WR].f = &event_srv_write; + fdtab[fd].cb[DIR_WR].b = s->req; FD_SET(fd, StaticWriteEvent); /* for connect status */ #if defined(DEBUG_FULL) && defined(ENABLE_EPOLL) diff --git a/src/checks.c b/src/checks.c index 817afc20de..5fc9e285bf 100644 --- a/src/checks.c +++ b/src/checks.c @@ -280,8 +280,10 @@ int process_chk(struct task *t) s->curfd = fd; /* that's how we know a test is in progress ;-) */ fdtab[fd].owner = t; - fdtab[fd].read = &event_srv_chk_r; - fdtab[fd].write = &event_srv_chk_w; + fdtab[fd].cb[DIR_RD].f = &event_srv_chk_r; + fdtab[fd].cb[DIR_RD].b = NULL; + fdtab[fd].cb[DIR_WR].f = &event_srv_chk_w; + fdtab[fd].cb[DIR_WR].b = NULL; fdtab[fd].state = FD_STCONN; /* connection in progress */ FD_SET(fd, StaticWriteEvent); /* for connect status */ #ifdef DEBUG_FULL diff --git a/src/client.c b/src/client.c index f4f02e3a69..3aeb3c20ce 100644 --- a/src/client.c +++ b/src/client.c @@ -33,6 +33,7 @@ #include #include +#include #include #include #include @@ -309,10 +310,8 @@ int event_accept(int fd) { return 0; } - s->req->l = 0; - s->req->total = 0; - s->req->h = s->req->r = s->req->lr = s->req->w = s->req->data; /* r and w will be reset further */ - s->req->rlim = s->req->data + BUFSIZE; + buffer_init(s->req); + s->req->rlim += BUFSIZE; if (s->cli_state == CL_STHEADERS) /* reserve some space for header rewriting */ s->req->rlim -= MAXREWRITE; @@ -327,14 +326,15 @@ int event_accept(int fd) { pool_free(session, s); return 0; } - s->rep->l = 0; - s->rep->total = 0; - s->rep->h = s->rep->r = s->rep->lr = s->rep->w = s->rep->rlim = s->rep->data; - fdtab[cfd].read = &event_cli_read; - fdtab[cfd].write = &event_cli_write; + buffer_init(s->rep); + fdtab[cfd].owner = t; fdtab[cfd].state = FD_STREADY; + fdtab[cfd].cb[DIR_RD].f = &event_cli_read; + fdtab[cfd].cb[DIR_RD].b = s->req; + fdtab[cfd].cb[DIR_WR].f = &event_cli_write; + fdtab[cfd].cb[DIR_WR].b = s->rep; if ((p->mode == PR_MODE_HTTP && (s->flags & SN_MONITOR)) || (p->mode == PR_MODE_HEALTH && (p->options & PR_O_HTTP_CHK))) diff --git a/src/fd.c b/src/fd.c index ec0607ebaf..2804a46934 100644 --- a/src/fd.c +++ b/src/fd.c @@ -222,14 +222,14 @@ int epoll_loop(int action) if (fdtab[fd].state == FD_STCLOSE) continue; if (epoll_events[count].events & ( EPOLLIN | EPOLLERR | EPOLLHUP )) - fdtab[fd].read(fd); + fdtab[fd].cb[DIR_RD].f(fd); } if (FD_ISSET(fd, StaticWriteEvent)) { if (fdtab[fd].state == FD_STCLOSE) continue; if (epoll_events[count].events & ( EPOLLOUT | EPOLLERR | EPOLLHUP )) - fdtab[fd].write(fd); + fdtab[fd].cb[DIR_WR].f(fd); } } } @@ -334,14 +334,14 @@ int poll_loop(int action) if (fdtab[fd].state == FD_STCLOSE) continue; if (poll_events[count].revents & ( POLLIN | POLLERR | POLLHUP )) - fdtab[fd].read(fd); + fdtab[fd].cb[DIR_RD].f(fd); } if (FD_ISSET(fd, StaticWriteEvent)) { if (fdtab[fd].state == FD_STCLOSE) continue; if (poll_events[count].revents & ( POLLOUT | POLLERR | POLLHUP )) - fdtab[fd].write(fd); + fdtab[fd].cb[DIR_WR].f(fd); } } } @@ -451,13 +451,13 @@ int select_loop(int action) if (FD_ISSET(fd, ReadEvent)) { if (fdtab[fd].state == FD_STCLOSE) continue; - fdtab[fd].read(fd); + fdtab[fd].cb[DIR_RD].f(fd); } if (FD_ISSET(fd, WriteEvent)) { if (fdtab[fd].state == FD_STCLOSE) continue; - fdtab[fd].write(fd); + fdtab[fd].cb[DIR_WR].f(fd); } } } diff --git a/src/polling.c b/src/polling.c deleted file mode 100644 index 9264a50b6d..0000000000 --- a/src/polling.c +++ /dev/null @@ -1,455 +0,0 @@ -/* - * File descriptors management functions. - * - * Copyright 2000-2006 Willy Tarreau - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version - * 2 of the License, or (at your option) any later version. - * - */ - -#include -#include -#include - -#include -#include -#include - -#include -#include -#include - -#include - -fd_set *StaticReadEvent, *StaticWriteEvent; -int cfg_polling_mechanism = 0; /* POLL_USE_{SELECT|POLL|EPOLL} */ - -/* - * FIXME: this is dirty, but at the moment, there's no other solution to remove - * the old FDs from outside the loop. Perhaps we should export a global 'poll' - * structure with pointers to functions such as init_fd() and close_fd(), plus - * a private structure with several pointers to places such as below. - */ - -#if defined(ENABLE_EPOLL) -fd_set *PrevReadEvent = NULL, *PrevWriteEvent = NULL; - -#if defined(USE_MY_EPOLL) -_syscall1 (int, epoll_create, int, size); -_syscall4 (int, epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event *, event); -_syscall4 (int, epoll_wait, int, epfd, struct epoll_event *, events, int, maxevents, int, timeout); -#endif - -#endif - - -#if defined(ENABLE_EPOLL) -/* - * Main epoll() loop. - * does 3 actions : - * 0 (POLL_LOOP_ACTION_INIT) : initializes necessary private structures - * 1 (POLL_LOOP_ACTION_RUN) : runs the loop - * 2 (POLL_LOOP_ACTION_CLEAN) : cleans up - * - * returns 0 if initialization failed, !0 otherwise. - */ - -int epoll_loop(int action) -{ - int next_time; - int status; - int fd; - - int fds, count; - int pr, pw, sr, sw; - unsigned rn, ro, wn, wo; /* read new, read old, write new, write old */ - struct epoll_event ev; - - /* private data */ - static struct epoll_event *epoll_events = NULL; - static int epoll_fd; - - if (action == POLL_LOOP_ACTION_INIT) { - epoll_fd = epoll_create(global.maxsock + 1); - if (epoll_fd < 0) - return 0; - else { - epoll_events = (struct epoll_event*) - calloc(1, sizeof(struct epoll_event) * global.maxsock); - PrevReadEvent = (fd_set *) - calloc(1, sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE); - PrevWriteEvent = (fd_set *) - calloc(1, sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE); - } - return 1; - } - else if (action == POLL_LOOP_ACTION_CLEAN) { - if (PrevWriteEvent) free(PrevWriteEvent); - if (PrevReadEvent) free(PrevReadEvent); - if (epoll_events) free(epoll_events); - close(epoll_fd); - epoll_fd = 0; - return 1; - } - - /* OK, it's POLL_LOOP_ACTION_RUN */ - - tv_now(&now); - - while (1) { - next_time = process_runnable_tasks(); - - /* stop when there's no connection left and we don't allow them anymore */ - if (!actconn && listeners == 0) - break; - - for (fds = 0; (fds << INTBITS) < maxfd; fds++) { - - rn = ((int*)StaticReadEvent)[fds]; ro = ((int*)PrevReadEvent)[fds]; - wn = ((int*)StaticWriteEvent)[fds]; wo = ((int*)PrevWriteEvent)[fds]; - - if ((ro^rn) | (wo^wn)) { - for (count = 0, fd = fds << INTBITS; count < (1<> count) & 1; - pw = (wo >> count) & 1; - sr = (rn >> count) & 1; - sw = (wn >> count) & 1; -#else - pr = FD_ISSET(fd&((1<> count) & 1; - sw = (wn >> count) & 1; -#else - sr = FD_ISSET(fd&((1< 0 && count < nbfd; count++) { - fd = poll_events[count].fd; - - if (!(poll_events[count].revents & ( POLLOUT | POLLIN | POLLERR | POLLHUP ))) - continue; - - /* ok, we found one active fd */ - status--; - - if (FD_ISSET(fd, StaticReadEvent)) { - if (fdtab[fd].state == FD_STCLOSE) - continue; - if (poll_events[count].revents & ( POLLIN | POLLERR | POLLHUP )) - fdtab[fd].read(fd); - } - - if (FD_ISSET(fd, StaticWriteEvent)) { - if (fdtab[fd].state == FD_STCLOSE) - continue; - if (poll_events[count].revents & ( POLLOUT | POLLERR | POLLHUP )) - fdtab[fd].write(fd); - } - } - } - return 1; -} -#endif - - - -/* - * Main select() loop. - * does 3 actions : - * 0 (POLL_LOOP_ACTION_INIT) : initializes necessary private structures - * 1 (POLL_LOOP_ACTION_RUN) : runs the loop - * 2 (POLL_LOOP_ACTION_CLEAN) : cleans up - * - * returns 0 if initialization failed, !0 otherwise. - */ - - -int select_loop(int action) -{ - int next_time; - int status; - int fd,i; - struct timeval delta; - int readnotnull, writenotnull; - static fd_set *ReadEvent = NULL, *WriteEvent = NULL; - - if (action == POLL_LOOP_ACTION_INIT) { - ReadEvent = (fd_set *) - calloc(1, sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE); - WriteEvent = (fd_set *) - calloc(1, sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE); - return 1; - } - else if (action == POLL_LOOP_ACTION_CLEAN) { - if (WriteEvent) free(WriteEvent); - if (ReadEvent) free(ReadEvent); - return 1; - } - - /* OK, it's POLL_LOOP_ACTION_RUN */ - - tv_now(&now); - - while (1) { - next_time = process_runnable_tasks(); - - /* stop when there's no connection left and we don't allow them anymore */ - if (!actconn && listeners == 0) - break; - - if (next_time > 0) { /* FIXME */ - /* Convert to timeval */ - /* to avoid eventual select loops due to timer precision */ - next_time += SCHEDULER_RESOLUTION; - delta.tv_sec = next_time / 1000; - delta.tv_usec = (next_time % 1000) * 1000; - } - else if (next_time == 0) { /* allow select to return immediately when needed */ - delta.tv_sec = delta.tv_usec = 0; - } - - - /* let's restore fdset state */ - - readnotnull = 0; writenotnull = 0; - for (i = 0; i < (maxfd + FD_SETSIZE - 1)/(8*sizeof(int)); i++) { - readnotnull |= (*(((int*)ReadEvent)+i) = *(((int*)StaticReadEvent)+i)) != 0; - writenotnull |= (*(((int*)WriteEvent)+i) = *(((int*)StaticWriteEvent)+i)) != 0; - } - - // /* just a verification code, needs to be removed for performance */ - // for (i=0; i= 0) ? &delta : NULL); - - /* this is an experiment on the separation of the select work */ - // status = (readnotnull ? select(maxfd, ReadEvent, NULL, NULL, (next_time >= 0) ? &delta : NULL) : 0); - // status |= (writenotnull ? select(maxfd, NULL, WriteEvent, NULL, (next_time >= 0) ? &delta : NULL) : 0); - - tv_now(&now); - - if (status > 0) { /* must proceed with events */ - - int fds; - char count; - - for (fds = 0; (fds << INTBITS) < maxfd; fds++) - if ((((int *)(ReadEvent))[fds] | ((int *)(WriteEvent))[fds]) != 0) - for (count = 1<fd = fd; /* the function for the accept() event */ - fdtab[fd].read = &event_accept; - fdtab[fd].write = NULL; /* never called */ + fdtab[fd].cb[DIR_RD].f = &event_accept; + fdtab[fd].cb[DIR_WR].f = NULL; /* never called */ + fdtab[fd].cb[DIR_RD].b = fdtab[fd].cb[DIR_WR].b = NULL; fdtab[fd].owner = (struct task *)curproxy; /* reference the proxy instead of a task */ fdtab[fd].state = FD_STLISTEN; FD_SET(fd, StaticReadEvent); diff --git a/src/stream_sock.c b/src/stream_sock.c index 1c7fa5916f..5e01da248a 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -46,8 +46,8 @@ */ int event_cli_read(int fd) { struct task *t = fdtab[fd].owner; + struct buffer *b = fdtab[fd].cb[DIR_RD].b; struct session *s = t->context; - struct buffer *b = s->req; int ret, max; #ifdef DEBUG_FULL @@ -151,8 +151,8 @@ int event_cli_read(int fd) { */ int event_cli_write(int fd) { struct task *t = fdtab[fd].owner; + struct buffer *b = fdtab[fd].cb[DIR_WR].b; struct session *s = t->context; - struct buffer *b = s->rep; int ret, max; #ifdef DEBUG_FULL @@ -243,8 +243,8 @@ int event_cli_write(int fd) { */ int event_srv_read(int fd) { struct task *t = fdtab[fd].owner; + struct buffer *b = fdtab[fd].cb[DIR_RD].b; struct session *s = t->context; - struct buffer *b = s->rep; int ret, max; #ifdef DEBUG_FULL @@ -348,8 +348,8 @@ int event_srv_read(int fd) { */ int event_srv_write(int fd) { struct task *t = fdtab[fd].owner; + struct buffer *b = fdtab[fd].cb[DIR_WR].b; struct session *s = t->context; - struct buffer *b = s->req; int ret, max; #ifdef DEBUG_FULL