int wto; /* write timeout, in ticks */
int cto; /* connect timeout, in ticks */
unsigned int l; /* data length */
- unsigned int splice_len; /* number of bytes remaining in splice, out of buffer */
char *r, *w, *lr; /* read ptr, write ptr, last read */
unsigned int max_len; /* read limit, used to keep room for header rewriting */
unsigned int send_max; /* number of bytes the sender can consume om this buffer, <= l */
unsigned long long total; /* total data read */
struct stream_interface *prod; /* producer attached to this buffer */
struct stream_interface *cons; /* consumer attached to this buffer */
- struct {
-#if defined(CONFIG_HAP_LINUX_SPLICE)
- int prod; /* -1 or fd of the pipe's end towards the producer */
- int cons; /* -1 or fd of the pipe's end towards the consumer */
-#endif
- } splice;
+ struct pipe *pipe; /* non-NULL only when data present */
char data[BUFSIZE];
};
split in two parts :
- the visible data (->data, for ->l bytes)
- the invisible data, typically in kernel buffers forwarded directly from
- the source stream sock to the destination stream sock (->splice_len
+ the source stream sock to the destination stream sock (->pipe->data
bytes). Those are used only during forward.
In order not to mix data streams, the producer may only feed the invisible
buffer is expected to reach the destination file descriptor, by any means.
However, it's the consumer's responsibility to ensure that the invisible
data has been entirely consumed before consuming visible data. This must be
- reflected by ->splice_len. This is very important as this and only this can
+ reflected by ->pipe->data. This is very important as this and only this can
ensure strict ordering of data between buffers.
The producer is responsible for decreasing ->to_forward and increasing
well as any data forwarded through the visible buffer.
The consumer is responsible for decreasing ->send_max when it sends data
- from the visible buffer, and ->splice_len when it sends data from the
+ from the visible buffer, and ->pipe->data when it sends data from the
invisible buffer.
A real-world example consists in part in an HTTP response waiting in a
#include <proto/hdr_idx.h>
#include <proto/log.h>
#include <proto/session.h>
+#include <proto/pipe.h>
#include <proto/proto_http.h>
#include <proto/proto_tcp.h>
#include <proto/queue.h>
sess_change_server(s, NULL);
}
-#if defined(CONFIG_HAP_LINUX_SPLICE)
- if (s->req->splice.prod >= 0)
- close(s->req->splice.prod);
- if (s->req->splice.cons >= 0)
- close(s->req->splice.cons);
-
- if (s->req->splice.prod >= 0 || s->req->splice.cons >= 0)
- usedpipes--;
-
- if (s->rep->splice.prod >= 0)
- close(s->rep->splice.prod);
- if (s->rep->splice.cons >= 0)
- close(s->rep->splice.cons);
-
- if (s->rep->splice.prod >= 0 || s->rep->splice.cons >= 0)
- usedpipes--;
-#endif
+ if (s->req->pipe)
+ put_pipe(s->req->pipe);
+
+ if (s->rep->pipe)
+ put_pipe(s->rep->pipe);
pool_free2(pool2_buffer, s->req);
pool_free2(pool2_buffer, s->rep);
!s->req->analysers && !(s->req->flags & BF_HIJACK)) {
/* check if it is wise to enable kernel splicing on the request buffer */
if (!(s->req->flags & BF_KERN_SPLICING) &&
- (usedpipes < global.maxpipes) &&
+ (pipes_used < global.maxpipes) &&
(((s->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) ||
(((s->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
(s->req->flags & BF_STREAMER_FAST))))
!s->rep->analysers && !(s->rep->flags & BF_HIJACK)) {
/* check if it is wise to enable kernel splicing on the response buffer */
if (!(s->rep->flags & BF_KERN_SPLICING) &&
- (usedpipes < global.maxpipes) &&
+ (pipes_used < global.maxpipes) &&
(((s->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) ||
(((s->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
(s->rep->flags & BF_STREAMER_FAST))))
/*
* Functions operating on SOCK_STREAM and buffers.
*
- * Copyright 2000-2008 Willy Tarreau <w@1wt.eu>
+ * Copyright 2000-2009 Willy Tarreau <w@1wt.eu>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
#include <proto/buffers.h>
#include <proto/client.h>
#include <proto/fd.h>
+#include <proto/pipe.h>
#include <proto/stream_sock.h>
#include <proto/task.h>
* SI_FL_ERR
* SI_FL_WAIT_ROOM
* (SI_FL_WAIT_RECV)
+ *
+ * This function automatically allocates a pipe from the pipe pool. It also
+ * carefully ensures to clear b->pipe whenever it leaves the pipe empty.
*/
static int stream_sock_splice_in(struct buffer *b, struct stream_interface *si)
{
return 1;
}
- if (unlikely(b->splice.prod == -1)) {
- int pipefd[2];
- if (usedpipes >= global.maxpipes || pipe(pipefd) < 0) {
+ if (unlikely(b->pipe == NULL)) {
+ if (pipes_used >= global.maxpipes || !(b->pipe = get_pipe())) {
b->flags &= ~BF_KERN_SPLICING;
return -1;
}
- usedpipes++;
- b->splice.prod = pipefd[1];
- b->splice.cons = pipefd[0];
}
+ /* At this point, b->pipe is valid */
+
while (1) {
max = b->to_forward;
if (max <= 0) {
break;
}
- ret = splice(fd, NULL, b->splice.prod, NULL, max,
+ ret = splice(fd, NULL, b->pipe->prod, NULL, max,
SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
if (ret <= 0) {
* will almost always fill/empty the pipe.
*/
- if (b->splice_len > 0) {
+ if (b->pipe->data) {
si->flags |= SI_FL_WAIT_ROOM;
retval = 1;
break;
b->to_forward -= ret;
total += ret;
b->total += ret;
- b->splice_len += ret;
+ b->pipe->data += ret;
b->flags |= BF_READ_PARTIAL;
b->flags &= ~BF_EMPTY; /* to prevent shutdowns */
- if (b->splice_len >= SPLICE_FULL_HINT) {
+ if (b->pipe->data >= SPLICE_FULL_HINT) {
/* We've read enough of it for this time. */
retval = 1;
break;
}
} /* while */
+ if (unlikely(!b->pipe->data)) {
+ put_pipe(b->pipe);
+ b->pipe = NULL;
+ }
+
return retval;
}
out_wakeup:
/* We might have some data the consumer is waiting for */
- if ((b->send_max || b->splice_len) && (b->cons->flags & SI_FL_WAIT_DATA)) {
- int last_len = b->splice_len;
+ if ((b->send_max || b->pipe) && (b->cons->flags & SI_FL_WAIT_DATA)) {
+ int last_len = b->pipe ? b->pipe->data : 0;
b->cons->chk_snd(b->cons);
/* check if the consumer has freed some space */
- if (!(b->flags & BF_FULL) && (!last_len || b->splice_len < last_len))
+ if (!(b->flags & BF_FULL) &&
+ (!last_len || !b->pipe || b->pipe->data < last_len))
si->flags &= ~SI_FL_WAIT_ROOM;
}
/*
* This function is called to send buffer data to a stream socket.
* It returns -1 in case of unrecoverable error, 0 if the caller needs to poll
- * before calling it again, otherwise 1.
+ * before calling it again, otherwise 1. If a pipe was associated with the
+ * buffer and it empties it, it releases it as well.
*/
static int stream_sock_write_loop(struct stream_interface *si, struct buffer *b)
{
int ret, max;
#if defined(CONFIG_HAP_LINUX_SPLICE)
- while (b->splice_len) {
- ret = splice(b->splice.cons, NULL, si->fd, NULL, b->splice_len,
+ while (b->pipe) {
+ ret = splice(b->pipe->cons, NULL, si->fd, NULL, b->pipe->data,
SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
if (ret <= 0) {
if (ret == 0 || errno == EAGAIN) {
}
b->flags |= BF_WRITE_PARTIAL;
- b->splice_len -= ret;
+ b->pipe->data -= ret;
- if (!b->splice_len)
+ if (!b->pipe->data) {
+ put_pipe(b->pipe);
+ b->pipe = NULL;
break;
+ }
if (--write_poll <= 0)
return retval;
if (likely(!b->l)) {
/* optimize data alignment in the buffer */
b->r = b->w = b->lr = b->data;
- if (likely(!b->splice_len))
+ if (likely(!b->pipe))
b->flags |= BF_EMPTY;
}
*/
}
- if (!b->splice_len && !b->send_max) {
+ if (!b->pipe && !b->send_max) {
/* the connection is established but we can't write. Either the
* buffer is empty, or we just refrain from sending because the
* send_max limit was reached. Maybe we just wrote the last
out_may_wakeup:
if (b->flags & BF_WRITE_ACTIVITY) {
/* update timeout if we have written something */
- if ((b->send_max || b->splice_len) &&
+ if ((b->send_max || b->pipe) &&
(b->flags & (BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL)
b->wex = tick_add_ifset(now_ms, b->wto);
* any more data to forward and it's not planned to send any more.
*/
if (likely((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
- (!b->to_forward && !b->send_max && !b->splice_len) ||
+ (!b->to_forward && !b->send_max && !b->pipe) ||
si->state != SI_ST_EST ||
b->prod->state != SI_ST_EST))
task_wakeup(si->owner, TASK_WOKEN_IO);
/* Check if we need to close the write side */
if (!(ob->flags & BF_SHUTW)) {
/* Write not closed, update FD status and timeout for writes */
- if ((ob->send_max == 0 && ob->splice_len == 0) ||
+ if ((ob->send_max == 0 && !ob->pipe) ||
(ob->flags & BF_EMPTY) ||
(ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) {
/* stop writing */
if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
(fdtab[si->fd].ev & FD_POLL_OUT) || /* we'll be called anyway */
- !(ob->send_max || ob->splice_len) || /* called with nothing to send ! */
+ !(ob->send_max || ob->pipe) || /* called with nothing to send ! */
!(ob->flags & (BF_HIJACK|BF_WRITE_ENA))) /* we may not write */
return;
goto out_wakeup;
}
- if (retval > 0 || (ob->send_max == 0 && ob->splice_len == 0)) {
+ if (retval > 0 || (ob->send_max == 0 && !ob->pipe)) {
/* the connection is established but we can't write. Either the
* buffer is empty, or we just refrain from sending because the
* send_max limit was reached. Maybe we just wrote the last
* have to notify the task.
*/
if (likely((ob->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
- (!ob->to_forward && !ob->send_max && !ob->splice_len) ||
+ (!ob->to_forward && !ob->send_max && !ob->pipe) ||
si->state != SI_ST_EST)) {
out_wakeup:
task_wakeup(si->owner, TASK_WOKEN_IO);