From 6b66f3e4f6b22619bd551f8e33cbf7211c6c7cd0 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Sun, 14 Dec 2008 17:31:54 +0100 Subject: [PATCH] [MAJOR] implement autonomous inter-socket forwarding If an analyser sets buf->to_forward to a given value, that many data will be forwarded between the two stream interfaces attached to a buffer without waking the task up. The same applies once all analysers have been released. This saves a large amount of calls to process_session() and a number of task_dequeue/queue. --- include/common/defaults.h | 14 ++++++++++++-- include/proto/buffers.h | 2 ++ include/types/buffers.h | 1 + src/proto_uxst.c | 23 +++++++++++++++++++++-- src/session.c | 25 ++++++++++++++++++++----- src/stream_sock.c | 33 +++++++++++++++++++++++++++------ 6 files changed, 83 insertions(+), 15 deletions(-) diff --git a/include/common/defaults.h b/include/common/defaults.h index 05628e1763..acba5c6eed 100644 --- a/include/common/defaults.h +++ b/include/common/defaults.h @@ -2,8 +2,8 @@ include/common/defaults.h Miscellaneous default values. - Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu - + Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu + This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, version 2.1 @@ -40,6 +40,16 @@ #define MAXREWRITE (BUFSIZE / 2) #endif +/* FORWARD_DEFAULT_SIZE + * Indicates how many bytes may be forwarded at once in low-level stream-socks + * without waking the owner task up. This should be much larger than the buffer + * size. A few megabytes seem appropriate. + */ +#ifndef FORWARD_DEFAULT_SIZE +#define FORWARD_DEFAULT_SIZE (16*1024*1024) +#endif + + #define REQURI_LEN 1024 #define CAPTURE_LEN 64 diff --git a/include/proto/buffers.h b/include/proto/buffers.h index 64427963d2..ab0de118a9 100644 --- a/include/proto/buffers.h +++ b/include/proto/buffers.h @@ -46,6 +46,7 @@ int init_buffer(); static inline void buffer_init(struct buffer *buf) { buf->send_max = 0; + buf->to_forward = 0; buf->l = buf->total = 0; buf->analysers = 0; buf->cons = NULL; @@ -92,6 +93,7 @@ static inline void buffer_check_timeouts(struct buffer *b) static inline void buffer_flush(struct buffer *buf) { buf->send_max = 0; + buf->to_forward = 0; buf->r = buf->lr = buf->w = buf->data; buf->l = 0; buf->flags |= BF_EMPTY | BF_FULL; diff --git a/include/types/buffers.h b/include/types/buffers.h index d8f71188b4..f15d33daaa 100644 --- a/include/types/buffers.h +++ b/include/types/buffers.h @@ -130,6 +130,7 @@ struct buffer { char *r, *w, *lr; /* read ptr, write ptr, last read */ char *rlim; /* read limit, used for header rewriting */ unsigned int send_max; /* number of bytes the sender can consume */ + unsigned int to_forward; /* number of bytes that can send without a wake-up, >= send_max */ unsigned int analysers; /* bit field indicating what to do on the buffer */ int analyse_exp; /* expiration date for current analysers (if set) */ void (*hijacker)(struct session *, struct buffer *); /* alternative content producer */ diff --git a/src/proto_uxst.c b/src/proto_uxst.c index 6c1367936e..a6eed1332e 100644 --- a/src/proto_uxst.c +++ b/src/proto_uxst.c @@ -809,6 +809,17 @@ void uxst_process_session(struct task *t, int *next) if (!s->req->analysers && !(s->req->flags & BF_HIJACK)) s->req->send_max = s->req->l; + /* if noone is interested in analysing data, let's forward everything + * and only wake up every 1-2 MB. We still wake up when send_max is + * reached though. + */ + if (!s->req->send_max && s->req->prod->state >= SI_ST_EST && + !s->req->analysers && !(s->req->flags & BF_HIJACK)) { + if (s->req->to_forward < FORWARD_DEFAULT_SIZE) + s->req->to_forward += FORWARD_DEFAULT_SIZE; + s->req->send_max = s->req->l; + } + /* reflect what the L7 analysers have seen last */ rqf_last = s->req->flags; @@ -879,9 +890,17 @@ void uxst_process_session(struct task *t, int *next) resync = 1; } - /* if noone is interested in analysing data, let's forward everything */ - if (!s->rep->analysers && !(s->rep->flags & BF_HIJACK)) + /* if noone is interested in analysing data, let's forward everything + * and only wake up every 1-2 MB. We still wake up when send_max is + * reached though. + */ + if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST && + !s->rep->analysers && !(s->rep->flags & BF_HIJACK)) { + if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) { + s->rep->to_forward += FORWARD_DEFAULT_SIZE; + } s->rep->send_max = s->rep->l; + } /* reflect what the L7 analysers have seen last */ rpf_last = s->rep->flags; diff --git a/src/session.c b/src/session.c index a775ba5319..97ce822a1e 100644 --- a/src/session.c +++ b/src/session.c @@ -746,9 +746,16 @@ resync_stream_interface: resync = 1; } - /* if noone is interested in analysing data, let's forward everything */ - if (!s->req->analysers && !(s->req->flags & BF_HIJACK)) + /* if noone is interested in analysing data, let's forward everything + * and only wake up every 1-2 MB. We still wake up when send_max is + * reached though. + */ + if (!s->req->send_max && s->req->prod->state >= SI_ST_EST && + !s->req->analysers && !(s->req->flags & BF_HIJACK)) { + if (s->req->to_forward < FORWARD_DEFAULT_SIZE) + s->req->to_forward += FORWARD_DEFAULT_SIZE; s->req->send_max = s->req->l; + } /* reflect what the L7 analysers have seen last */ rqf_last = s->req->flags; @@ -855,9 +862,17 @@ resync_stream_interface: resync = 1; } - /* if noone is interested in analysing data, let's forward everything */ - if (!s->rep->analysers && !(s->rep->flags & BF_HIJACK)) + /* if noone is interested in analysing data, let's forward everything + * and only wake up every 1-2 MB. We still wake up when send_max is + * reached though. + */ + if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST && + !s->rep->analysers && !(s->rep->flags & BF_HIJACK)) { + if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) { + s->rep->to_forward += FORWARD_DEFAULT_SIZE; + } s->rep->send_max = s->rep->l; + } /* reflect what the L7 analysers have seen last */ rpf_last = s->rep->flags; @@ -870,7 +885,7 @@ resync_stream_interface: * FIXME: this is probably where we should produce error responses. */ - /* first, let's check if the request buffer needs to shutdown(write) */ + /* first, let's check if the response buffer needs to shutdown(write) */ if (unlikely((s->rep->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR))) buffer_shutw_now(s->rep); diff --git a/src/stream_sock.c b/src/stream_sock.c index fdd0dbdf3a..82e1055554 100644 --- a/src/stream_sock.c +++ b/src/stream_sock.c @@ -116,8 +116,8 @@ int stream_sock_read(int fd) { cur_read += ret; /* if noone is interested in analysing data, let's forward everything */ - if (!b->analysers) - b->send_max += ret; + if (b->to_forward > b->send_max) + b->send_max = MIN(b->to_forward, b->l); if (fdtab[fd].state == FD_STCONN) fdtab[fd].state = FD_STREADY; @@ -251,10 +251,17 @@ int stream_sock_read(int fd) { goto out_skip_wakeup; out_wakeup: /* the consumer might be waiting for data */ - if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL)) + if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL) && !(b->flags & BF_EMPTY)) b->cons->chk_snd(b->cons); - task_wakeup(si->owner, TASK_WOKEN_IO); + /* we have to wake up if there is a special event or if we don't have + * any more data to forward. + */ + if ((b->flags & (BF_READ_NULL|BF_READ_ERROR|BF_SHUTR)) || + !b->to_forward || + si->state != SI_ST_EST || + b->cons->state != SI_ST_EST) + task_wakeup(si->owner, TASK_WOKEN_IO); out_skip_wakeup: fdtab[fd].ev &= ~FD_POLL_IN; @@ -379,6 +386,13 @@ int stream_sock_write(int fd) { b->l -= ret; b->w += ret; b->send_max -= ret; + /* we can send up to send_max, we just want to know when + * to_forward has been reached. + */ + if ((signed)(b->to_forward - ret) >= 0) + b->to_forward -= ret; + else + b->to_forward = 0; if (fdtab[fd].state == FD_STCONN) fdtab[fd].state = FD_STREADY; @@ -453,10 +467,17 @@ int stream_sock_write(int fd) { goto out_skip_wakeup; out_wakeup: /* the producer might be waiting for more room to store data */ - if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL)) + if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL) && !(b->flags & BF_FULL)) b->prod->chk_rcv(b->prod); - task_wakeup(si->owner, TASK_WOKEN_IO); + /* we have to wake up if there is a special event or if we don't have + * any more data to forward. + */ + if ((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) || + !b->to_forward || + si->state != SI_ST_EST || + b->prod->state != SI_ST_EST) + task_wakeup(si->owner, TASK_WOKEN_IO); out_skip_wakeup: fdtab[fd].ev &= ~FD_POLL_OUT; -- 2.39.5