]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
WIP: httpterm: add httpterm sources
authorFrederic Lecaille <flecaille@haproxy.com>
Fri, 16 Jan 2026 10:06:10 +0000 (11:06 +0100)
committerFrederic Lecaille <flecaille@haproxy.com>
Wed, 28 Jan 2026 15:09:40 +0000 (16:09 +0100)
13 files changed:
Makefile
include/haproxy/hstream-t.h [new file with mode: 0644]
include/haproxy/hstream.h [new file with mode: 0644]
include/haproxy/obj_type-t.h
include/haproxy/obj_type.h
include/haproxy/proxy-t.h
include/haproxy/stconn.h
include/haproxy/stream.h
src/cfgparse-listen.c
src/httpterm.c [new file with mode: 0644]
src/proxy.c
src/stconn.c
src/stream.c

index 0bb4557f7f538729c0f85526de80fdfbeaab5456..88ba6ad5cc48a3f4d45f30265d9e7ae68d45b368 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -951,11 +951,12 @@ all:
        @echo
        @exit 1
 else
-all: dev/flags/flags haproxy $(EXTRA)
+all: dev/flags/flags haproxy httpterm $(EXTRA)
 endif # obsolete targets
 endif # TARGET
 
 OBJS =
+HTTPTERM_OBJS =
 
 ifneq ($(EXTRA_OBJS),)
   OBJS += $(EXTRA_OBJS)
@@ -1002,12 +1003,15 @@ OBJS += src/mux_h2.o src/mux_h1.o src/mux_fcgi.o src/log.o              \
         src/ebsttree.o src/freq_ctr.o src/systemd.o src/init.o         \
         src/http_acl.o src/dict.o src/dgram.o src/pipe.o               \
         src/hpack-huff.o src/hpack-enc.o src/ebtree.o src/hash.o       \
-        src/httpclient_cli.o src/version.o src/ncbmbuf.o src/ech.o
+        src/httpclient_cli.o src/version.o src/ncbmbuf.o src/ech.o \
+        src/httpterm.o
 
 ifneq ($(TRACE),)
   OBJS += src/calltrace.o
 endif
 
+HTTPTERM_OBJS += $(OBJS)
+
 # Used only for forced dependency checking. May be cleared during development.
 INCLUDES = $(wildcard include/*/*.h)
 DEP = $(INCLUDES) .build_opts
@@ -1055,6 +1059,9 @@ endif # non-empty target
 haproxy: $(OPTIONS_OBJS) $(OBJS)
        $(cmd_LD) $(ARCH_FLAGS) $(LDFLAGS) -o $@ $^ $(LDOPTS)
 
+httpterm: $(OPTIONS_OBJS) $(HTTPTERM_OBJS)
+       $(cmd_LD) $(ARCH_FLAGS) $(LDFLAGS) -o $@ $^ $(LDOPTS)
+
 objsize: haproxy
        $(Q)objdump -t $^|grep ' g '|grep -F '.text'|awk '{print $$5 FS $$6}'|sort
 
diff --git a/include/haproxy/hstream-t.h b/include/haproxy/hstream-t.h
new file mode 100644 (file)
index 0000000..5919c1f
--- /dev/null
@@ -0,0 +1,35 @@
+#ifndef _HAPROXY_HSTREAM_T_H
+#define _HAPROXY_HSTREAM_T_H
+
+#include <haproxy/dynbuf-t.h>
+#include <haproxy/http-t.h>
+#include <haproxy/obj_type-t.h>
+
+/* httpterm stream */
+struct hstream {
+       enum obj_type obj_type;
+       struct session *sess;
+
+       struct stconn *sc;
+       struct task *task;
+
+       struct buffer req;
+       struct buffer res;
+       unsigned long long to_write;    /* #of response data bytes to write after headers */
+       /* Wait list for buffer allocation */
+       struct buffer_wait buf_wait;
+
+       int flags;
+
+       int ka;                         /* .0: keep-alive  .1: forced  .2: http/1.1, .3: was_reused */
+       unsigned long long req_size;    /* values passed in the URI to override the server's */
+       unsigned long long req_body;    /* remaining body to be consumed from the request */
+       int req_code;
+       int req_cache, req_time;
+       int req_chunked;
+       int req_random;
+       int res_before_req;
+       enum http_meth_t req_meth;
+};
+
+#endif /* _HAPROXY_HSTREAM_T_H */
diff --git a/include/haproxy/hstream.h b/include/haproxy/hstream.h
new file mode 100644 (file)
index 0000000..97e4d60
--- /dev/null
@@ -0,0 +1,13 @@
+#ifndef _HAPROXY_HSTREAM_H
+#define  _HAPROXY_HSTREAM_H
+
+#include <haproxy/cfgparse.h>
+#include <haproxy/hstream-t.h>
+
+void init_httpterm_cfg(int argc, char **argv, struct cfgfile *cfg);
+struct task *sc_hstream_io_cb(struct task *t, void *ctx, unsigned int state);
+int hstream_wake(struct stconn *sc);
+void hstream_shutdown(struct stconn *sc);
+void *hstream_new(struct session *sess, struct stconn *sc, struct buffer *input);
+
+#endif /* _HAPROXY_HSTREAM_H */
index da2efbf86f3de8b8de788ece2a462d062b5a9139..37c4a358b703a03cc119d7035f8ff521fa382902 100644 (file)
@@ -46,6 +46,7 @@ enum obj_type {
 #ifdef USE_QUIC
        OBJ_TYPE_DGRAM,        /* object is a struct quic_dgram */
 #endif
+       OBJ_TYPE_HTTPTERM,     /* object is a struct hstream */
        OBJ_TYPE_ENTRIES       /* last one : number of entries */
 } __attribute__((packed)) ;
 
index 233f9d7ada096649f6d96aad432c6ddc35562b1c..cc01c54f4003c7d8fcde86f32513094850b20e01 100644 (file)
@@ -26,6 +26,7 @@
 #include <haproxy/applet-t.h>
 #include <haproxy/check-t.h>
 #include <haproxy/connection-t.h>
+#include <haproxy/hstream-t.h>
 #include <haproxy/listener-t.h>
 #include <haproxy/obj_type-t.h>
 #include <haproxy/pool.h>
@@ -189,6 +190,19 @@ static inline struct check *objt_check(enum obj_type *t)
        return __objt_check(t);
 }
 
+static inline struct hstream *__objt_hstream(enum obj_type *t)
+{
+       return container_of(t, struct hstream, obj_type);
+}
+
+static inline struct hstream *objt_hstream(enum obj_type *t)
+{
+       if (!t || *t != OBJ_TYPE_HTTPTERM)
+               return NULL;
+
+       return __objt_hstream(t);
+}
+
 #ifdef USE_QUIC
 static inline struct quic_dgram *__objt_dgram(enum obj_type *t)
 {
index 0b8ce6d389db5961035c36dd13f5612c839af5b5..cb33d0dee84af63e25e3c6a0b89db71c85fc2895 100644 (file)
@@ -411,6 +411,7 @@ struct proxy {
        int redispatch_after;                   /* number of retries before redispatch */
        unsigned down_time;                     /* total time the proxy was down */
        int (*accept)(struct stream *s);       /* application layer's accept() */
+       void *(*stream_new_from_sc)(struct session *sess, struct stconn *sc, struct buffer *in); /* stream connector creator function from mux stream connector */
        struct conn_src conn_src;               /* connection source settings */
        enum obj_type *default_target;          /* default target to use for accepted streams or NULL */
        struct proxy *next;
index 61ce17c395af99fb98d42485e497eddbfb318178..226bc21d137451618cefaf426ec122cc6a45754a 100644 (file)
@@ -24,6 +24,7 @@
 
 #include <haproxy/api.h>
 #include <haproxy/connection.h>
+#include <haproxy/hstream-t.h>
 #include <haproxy/htx-t.h>
 #include <haproxy/obj_type.h>
 #include <haproxy/stconn-t.h>
@@ -45,10 +46,12 @@ void se_shutdown(struct sedesc *sedesc, enum se_shut_mode mode);
 struct stconn *sc_new_from_endp(struct sedesc *sedesc, struct session *sess, struct buffer *input);
 struct stconn *sc_new_from_strm(struct stream *strm, unsigned int flags);
 struct stconn *sc_new_from_check(struct check *check, unsigned int flags);
+struct stconn *sc_new_from_httpterm(struct sedesc *sd, struct session *sess, struct buffer *input);
 void sc_free(struct stconn *sc);
 
 int sc_attach_mux(struct stconn *sc, void *target, void *ctx);
 int sc_attach_strm(struct stconn *sc, struct stream *strm);
+int sc_attach_hstream(struct stconn *sc, struct hstream *hs);
 
 void sc_destroy(struct stconn *sc);
 int sc_reset_endp(struct stconn *sc);
@@ -331,6 +334,21 @@ static inline struct check *sc_check(const struct stconn *sc)
        return NULL;
 }
 
+/* Returns the httpterm stream from a sc if the application is a
+ * httpterm stream. Otherwise NULL is returned. __sc_hstream() returns the httpterm
+ * stream without any control while sc_hstream() check the application type.
+ */
+static inline struct hstream *__sc_hstream(const struct stconn *sc)
+{
+       return __objt_hstream(sc->app);
+}
+static inline struct hstream *sc_hstream(const struct stconn *sc)
+{
+       if (obj_type(sc->app) == OBJ_TYPE_HTTPTERM)
+               return __objt_hstream(sc->app);
+       return NULL;
+}
+
 /* Returns the name of the application layer's name for the stconn,
  * or "NONE" when none is attached.
  */
index 78d828fb230871845eb6e0e0510a7e8a75c7f460..46464f8601147db9bef406b7dcbb0f22e36643e7 100644 (file)
@@ -59,7 +59,7 @@ extern struct pool_head *pool_head_uniqueid;
 
 extern struct data_cb sess_conn_cb;
 
-struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer *input);
+void *stream_new(struct session *sess, struct stconn *sc, struct buffer *input);
 void stream_free(struct stream *s);
 int stream_upgrade_from_sc(struct stconn *sc, struct buffer *input);
 int stream_set_http_mode(struct stream *s, const struct mux_proto_list *mux_proto);
index 68bf937312a943c2ed3fb0fd8485b3a67bef0f79..9ae7b183cc0e53022e8da5ed7c8f84e6b96e1750 100644 (file)
@@ -18,6 +18,7 @@
 #include <haproxy/compression-t.h>
 #include <haproxy/connection.h>
 #include <haproxy/extcheck.h>
+#include <haproxy/hstream.h>
 #include <haproxy/http_ana.h>
 #include <haproxy/http_htx.h>
 #include <haproxy/http_ext.h>
@@ -705,6 +706,10 @@ int cfg_parse_listen(const char *file, int linenum, char **args, int kwm)
                        err_code |= ERR_ALERT | ERR_FATAL;
                        goto out;
                }
+               else if (strcmp(args[1], "httpterm") == 0 && (curproxy->cap & PR_CAP_FE)) {
+                       curproxy->mode = PR_MODE_HTTP;
+                       curproxy->stream_new_from_sc = hstream_new;
+               }
                else {
                        ha_alert("parsing [%s:%d] : unknown proxy mode '%s'.\n", file, linenum, args[1]);
                        err_code |= ERR_ALERT | ERR_FATAL;
diff --git a/src/httpterm.c b/src/httpterm.c
new file mode 100644 (file)
index 0000000..6faa46d
--- /dev/null
@@ -0,0 +1,1126 @@
+#include <haproxy/buf.h>
+#include <haproxy/cfgparse.h>
+#include <haproxy/chunk.h>
+#include <haproxy/global.h>
+#include <haproxy/hstream-t.h>
+#include <haproxy/http_htx.h>
+#include <haproxy/http.h>
+#include <haproxy/pool.h>
+#include <haproxy/proxy-t.h>
+#include <haproxy/stconn-t.h>
+#include <haproxy/stream.h>
+#include <haproxy/task-t.h>
+#include <haproxy/trace.h>
+
+#include <haproxy/sc_strm.h>
+
+DECLARE_TYPED_POOL(pool_head_hstream, "hstream", struct hstream);
+
+#define HTTPTERM_VERSION "1.7.9"
+#define HTTPTERM_DATE   "2020/06/28"
+
+#define HS_ST_IN_ALLOC          0x0001
+#define HS_ST_OUT_ALLOC         0x0002
+#define HS_ST_CONN_ERROR        0x0004
+#define HS_ST_HTTP_GOT_HDRS     0x0008
+#define HS_ST_HTTP_HELP         0x0010
+#define HS_ST_HTTP_EXPECT       0x0020
+#define HS_ST_HTTP_RESP_SL_SENT 0x0040
+#define HS_ST_HTTP_RESP_SENT    0x0080
+
+static int httpterm_debug;
+const char *HTTP_HELP =
+        "HTTPTerm-" HTTPTERM_VERSION " - " HTTPTERM_DATE "\n"
+        "All integer argument values are in the form [digits]*[kmgr] (r=random(0..1)).\n"
+        "The following arguments are supported to override the default objects :\n"
+        " - /?s=<size>        return <size> bytes.\n"
+        "                     E.g. /?s=20k\n"
+        " - /?r=<retcode>     present <retcode> as the HTTP return code.\n"
+        "                     E.g. /?r=404\n"
+        " - /?c=<cache>       set the return as not cacheable if <1.\n"
+        "                     E.g. /?c=0\n"
+        " - /?A=<req-before>  drain the request body before sending the response.\n"
+        "                     E.g. /?A=1\n"
+        " - /?C=<close>       force the response to use close if >0.\n"
+        "                     E.g. /?C=1\n"
+        " - /?K=<keep-alive>  force the response to use keep-alive if >0.\n"
+        "                     E.g. /?K=1\n"
+        " - /?t=<time>        wait <time> milliseconds before responding.\n"
+        "                     E.g. /?t=500\n"
+        " - /?k=<enable>      Enable transfer encoding chunked with only one chunk if >0.\n"
+        " - /?R=<enable>      Enable sending random data if >0 (disables splicing).\n"
+        "\n"
+        "Note that those arguments may be cumulated on one line separated by a set of\n"
+        "delimitors among [&?,;/] :\n"
+        " -  GET /?s=20k&c=1&t=700&K=30r HTTP/1.0\n"
+        " -  GET /?r=500?s=0?c=0?t=1000 HTTP/1.0\n"
+        "\n";
+
+#define RESPSIZE 16384
+/* Number of bytes by body response line for common response */
+#define HS_COMMON_RESPONSE_LINE_SZ 50
+static char common_response[RESPSIZE];
+static char common_chunk_resp[RESPSIZE];
+static char *random_resp;
+static int random_resp_len = RESPSIZE;
+
+#define TRACE_SOURCE &trace_httpterm
+struct trace_source trace_httpterm;
+static void hterm_trace(enum trace_level level, uint64_t mask, const struct trace_source *src,
+                        const struct ist where, const struct ist func,
+                        const void *a1, const void *a2, const void *a3, const void *a4);
+
+static const struct  name_desc hterm_trace_logon_args[4] = {
+       /* arg1 */ { /* already used by the httpterm stream */ },
+       /* arg2 */ {
+               .name="httpterm",
+               .desc="httpterm server",
+       },
+       /* arg3 */ { },
+       /* arg4 */ { }
+};
+
+static const struct trace_event hterm_trace_events[] = {
+#define HS_EV_HSTRM_NEW      (1ULL << 0)
+       { .mask = HS_EV_HSTRM_NEW,      .name = "hstrm_new",      .desc = "new httpterm stream" },
+#define HS_EV_PROCESS_HSTRM  (1ULL << 1)
+       { .mask = HS_EV_PROCESS_HSTRM,  .name = "process_hstrm",  .desc = "httpterm stream processing" },
+#define HS_EV_HSTRM_SEND     (1ULL << 2)
+       { .mask = HS_EV_HSTRM_SEND,     .name = "hstrm_send",     .desc = "httpterm stream sending" },
+#define HS_EV_HSTRM_RECV     (1ULL << 3)
+       { .mask = HS_EV_HSTRM_RECV,     .name = "hstrm_recv",     .desc = "httpterm stream receiving" },
+#define HS_EV_HSTRM_IO_CB    (1ULL << 4)
+       { .mask = HS_EV_HSTRM_IO_CB,    .name = "hstrm_io_cb",    .desc = "httpterm stream I/O callback call" },
+#define HS_EV_HSTRM_RESP     (1ULL << 5)
+       { .mask = HS_EV_HSTRM_RESP,     .name = "hstrm_resp",     .desc = "build a HTTP response" },
+#define HS_EV_HSTRM_ADD_DATA (1ULL << 6)
+       { .mask = HS_EV_HSTRM_ADD_DATA, .name = "hstrm_add_data", .desc = "add data to HTX httpterm stream" },
+};
+
+static const struct name_desc hterm_trace_decoding[] = {
+#define HTERM_VERB_CLEAN 1
+       { .name = "clean", .desc = "only user-friendly stuff, generally suitable for level \"user\"" },
+};
+
+struct trace_source trace_httpterm = {
+       .name = IST("httpterm"),
+       .desc = "httpterm",
+       /* TRACE()'s first argument is always a httpterm stream */
+       .arg_def = TRC_ARG1_HSTRM,
+       .default_cb = hterm_trace,
+       .known_events = hterm_trace_events,
+       .lockon_args = hterm_trace_logon_args,
+       .decoding = hterm_trace_decoding,
+       .report_events = ~0, /* report everything by default */
+};
+
+INITCALL1(STG_REGISTER, trace_register_source, TRACE_SOURCE);
+
+static void hterm_trace(enum trace_level level, uint64_t mask, const struct trace_source *src,
+                        const struct ist where, const struct ist func,
+                        const void *a1, const void *a2, const void *a3, const void *a4)
+{
+       const struct hstream *hs = a1;
+
+       chunk_appendf(&trace_buf, " hs@%p ", hs);
+       if (hs) {
+               chunk_appendf(&trace_buf, " res=%u req_size=%llu to_write=%llu req_body=%llu",
+                             (unsigned int)b_data(&hs->res), hs->req_size, hs->to_write, hs->req_body);
+       }
+
+}
+
+/*
+ * This function prints the command line usage for httpterm and exits
+ */
+static void usage_httpterm(char *name)
+{
+       fprintf(stderr,
+               "Usage : %s -L [<ip>]:<clear port>:<TCP&QUIC SSL port> [-L...]\n"
+               "        -D goes daemon\n", name);
+       exit(1);
+}
+
+#define HTTPTERM_FRONTEND_NAME   "___httpterm_frontend___"
+#define HTTPTERM_RSA_CERT_NAME   "httpterm.pem.rsa"
+#define HTTPTERM_ECDSA_CERT_NAME "httpterm.pem.ecdsa"
+
+static const char *httpterm_cfg_str =
+        "defaults\n"
+            "\tmode httpterm\n"
+            "\ttimeout client 25s\n"
+        "\n"
+        "crt-store\n"
+            "\tload generate on keytype RSA crt "   HTTPTERM_RSA_CERT_NAME   " domains \"*.httpterm.local\" \n"
+            "\tload generate on keytype ECDSA crt " HTTPTERM_ECDSA_CERT_NAME " domains \"*.httpterm.local\" \n"
+        "\n"
+        "frontend " HTTPTERM_FRONTEND_NAME "\n";
+
+static const char *httpterm_cfg_traces_str =
+        "traces\n"
+        "    trace httpterm sink stderr level developer start now\n"
+        "    trace h1 sink stderr level developer start now\n"
+        "    trace h2 sink stderr level developer start now\n"
+        "    trace h3 sink stderr level developer start now\n"
+        "    trace qmux sink stderr level developer start now\n"
+        "    trace ssl sink stderr level developer start now\n";
+
+/* Simple function, to append <line> to <b> without without trailing '\0' character.
+ * Take into an account the '\t' and '\n' escaped sequeces.
+ */
+void hstream_str_buf_append(struct buffer *b, const char *line)
+{
+       const char *p, *end;
+       char *to = b_tail(b);
+
+       p = line;
+       end = line + strlen(line);
+       while (p < end && to < b_wrap(b)) {
+               if (*p == '\\') {
+                       if (!*++p || p >= end)
+                               break;
+                       if (*p == 'n')
+                               *to++ = '\n';
+                       else if (*p == 't')
+                               *to++ = '\t';
+                       p++;
+                       b_add(b, 1);
+               }
+               else {
+                       *to++ = *p++;
+                       b_add(b, 1);
+               }
+       }
+}
+
+/* Initialize <cfg> for <argv> arguments array for httpterm.
+ * Never fails.
+ */
+void init_httpterm_cfg(int argc, char **argv, struct cfgfile *cfg)
+{
+       struct buffer *global_buf = NULL, *fe_buf = NULL;
+
+       if (argc <= 1)
+               usage_httpterm(progname);
+
+       chunk_appendf(&trash, "%s", httpterm_cfg_str);
+       /* skip program name and start */
+       argc--; argv++;
+       while (argc > 0) {
+               char *opt;
+
+               if (**argv == '-') {
+                       opt = *argv + 1;
+                       if (*opt == 'd') {
+                               /* debug mode */
+                               httpterm_debug = 1;
+                       }
+                       else if (*opt == 'D') {
+                               global.mode |= MODE_DAEMON;
+                       }
+                       else if (*opt == 'F') {
+                               argv++; argc--;
+                               if (argc <= 0)
+                                       usage_httpterm(progname);
+
+                               if (!fe_buf) {
+                                       fe_buf = get_trash_chunk();
+                                       if (!fe_buf) {
+                                               ha_alert("failed to allocate a trash buffer.\n");
+                                               exit(1);
+                                       }
+                               }
+
+                               hstream_str_buf_append(fe_buf, *argv);
+                       }
+                       else if (*opt == 'G') {
+                               argv++; argc--;
+                               if (argc <= 0)
+                                       usage_httpterm(progname);
+
+                               if (!global_buf) {
+                                       global_buf = get_trash_chunk();
+                                       if (!global_buf) {
+                                               ha_alert("failed to allocate a trash buffer.\n");
+                                               exit(1);
+                                       }
+
+                                       chunk_appendf(global_buf, "global\n");
+                               }
+
+                               hstream_str_buf_append(global_buf, *argv);
+                       }
+                       else if (*opt == 'L') {
+                               /* binding */
+                               int i = 0, ipv6 = 0;
+                               char *ip, *port, *port1 = NULL, *port2 = NULL;
+
+                               argv++; argc--;
+                               if (argc == 0)
+                                       usage_httpterm(progname);
+
+                               port = ip = *argv;
+                               if (*ip == '[') {
+                                       /* IPv6 address */
+                                       ip++;
+                                       port = strchr(port, ']');
+                                       if (!port)
+                                               usage_httpterm(progname);
+                                       *port++ = '\0';
+                                       ipv6 = 1;
+                               }
+
+                               while ((port = strchr(port, ':'))) {
+                                       *port++ = '\0';
+                                       i++;
+                                       if (!port1)
+                                               port1 = port;
+                                       else
+                                               port2 = port;
+                               }
+
+                               if (i > 2)
+                                       usage_httpterm(progname);
+
+                               /* clear HTTP */
+                               chunk_appendf(&trash, "\tbind %s:%s\n", ip, port1);
+                               if (port2) {
+                                       chunk_appendf(&trash, "\tbind %s:%s alpn h2,http1.1,http1.0 ssl"
+                                                     " crt " HTTPTERM_RSA_CERT_NAME "\n", ip, port2);
+                                       chunk_appendf(&trash, "\tbind %s@%s:%s ssl"
+                                                     " crt " HTTPTERM_RSA_CERT_NAME "\n",
+                                                     ipv6 ? "quic6" : "quic4", ip, port2);
+                               }
+                       }
+                       else
+                               usage_httpterm(progname);
+               }
+               else
+                       usage_httpterm(progname);
+               argv++; argc--;
+       }
+
+       if (fe_buf)
+               chunk_appendf(&trash, "%.*s", (int)b_data(fe_buf), b_orig(fe_buf));
+       if (global_buf)
+               chunk_appendf(&trash, "%.*s", (int)b_data(global_buf), b_orig(global_buf));
+       if (httpterm_debug)
+               chunk_appendf(&trash, "%s", httpterm_cfg_traces_str);
+
+       cfg->filename = strdup("httpterm cfgfile");
+       cfg->content = strdup(trash.area);
+       cfg->size = b_data(&trash);
+}
+
+int hstream_buf_available(void *target)
+{
+       struct hstream *hs = target;
+
+       BUG_ON(!hs->sc);
+
+       if ((hs->flags & HS_ST_IN_ALLOC) && b_alloc(&hs->req, DB_CHANNEL)) {
+               hs->flags &= ~HS_ST_IN_ALLOC;
+               tasklet_wakeup(hs->sc->wait_event.tasklet);
+               return 1;
+       }
+
+       if ((hs->flags & HS_ST_OUT_ALLOC) && b_alloc(&hs->res, DB_CHANNEL)) {
+               hs->flags &= ~HS_ST_OUT_ALLOC;
+               tasklet_wakeup(hs->sc->wait_event.tasklet);
+               return 1;
+       }
+
+       return 0;
+}
+
+/*
+ * Allocate a buffer. If it fails, it adds the stream in buffer wait queue.
+ */
+struct buffer *hstream_get_buf(struct hstream *hs, struct buffer *bptr)
+{
+       struct buffer *buf = NULL;
+
+       if (likely(!LIST_INLIST(&hs->buf_wait.list)) &&
+           unlikely((buf = b_alloc(bptr, DB_CHANNEL)) == NULL)) {
+               b_queue(DB_CHANNEL, &hs->buf_wait, hs, hstream_buf_available);
+       }
+
+       return buf;
+}
+
+/*
+ * Release a buffer, if any, and try to wake up entities waiting in the buffer
+ * wait queue.
+ */
+void hstream_release_buf(struct hstream *hs, struct buffer *bptr)
+{
+       if (bptr->size) {
+               b_free(bptr);
+               offer_buffers(hs->buf_wait.target, 1);
+       }
+}
+
+/* Release <hs> httpterm stream */
+void hstream_free(struct hstream *hs)
+{
+       sc_destroy(hs->sc);
+       hstream_release_buf(hs, &hs->res);
+       hstream_release_buf(hs, &hs->req);
+       pool_free(pool_head_hstream, hs);
+}
+
+struct task *sc_hstream_io_cb(struct task *t, void *ctx, unsigned int state)
+{
+       struct stconn *sc = ctx;
+       struct connection *conn;
+       struct hstream *hs = __sc_hstream(sc);
+
+       TRACE_ENTER(HS_EV_HSTRM_IO_CB, hs);
+
+       conn = sc_conn(sc);
+       if (unlikely(!conn || conn->flags & CO_FL_ERROR || sc_ep_test(sc, SE_FL_ERROR))) {
+               TRACE_ERROR("connection error", HS_EV_HSTRM_IO_CB, hs);
+               hs->flags |= HS_ST_CONN_ERROR;
+               task_wakeup(hs->task, TASK_WOKEN_IO);
+       }
+
+       if (((hs->res_before_req || !hs->to_write) && hs->req_body) ||
+           !htx_is_empty(htxbuf(&hs->req)))
+               task_wakeup(hs->task, TASK_WOKEN_IO);
+
+       if (hs->to_write || !htx_is_empty(htxbuf(&hs->res)))
+               task_wakeup(hs->task, TASK_WOKEN_IO);
+
+       TRACE_LEAVE(HS_EV_HSTRM_IO_CB, hs);
+       return t;
+}
+
+static int hstream_htx_buf_rcv(struct connection *conn, struct hstream *hs)
+{
+       int ret = 0;
+       struct buffer *buf;
+       size_t max, read, cur_read = 0;
+       int is_empty, read_poll = MAX_READ_POLL_LOOPS;
+
+       TRACE_ENTER(HS_EV_HSTRM_RECV, hs);
+
+       if (hs->sc->wait_event.events & SUB_RETRY_RECV) {
+               TRACE_DEVEL("waiting for data", HS_EV_HSTRM_RECV, hs);
+               goto wait_more_data;
+       }
+
+       if (sc_ep_test(hs->sc, SE_FL_EOS))
+               goto end_recv;
+
+       if (hs->flags & HS_ST_IN_ALLOC) {
+               TRACE_STATE("waiting for input buffer", HS_EV_HSTRM_RECV, hs);
+               goto wait_more_data;
+       }
+
+       buf = hstream_get_buf(hs, &hs->req);
+       if (!buf) {
+               TRACE_STATE("waiting for input buffer", HS_EV_HSTRM_RECV, hs);
+               hs->flags |= HS_ST_IN_ALLOC;
+               goto wait_more_data;
+       }
+
+       /* prepare to detect if the mux needs more room */
+       sc_ep_clr(hs->sc, SE_FL_WANT_ROOM);
+
+       while (sc_ep_test(hs->sc, SE_FL_RCV_MORE) ||
+              (!(conn->flags & CO_FL_ERROR) && !sc_ep_test(hs->sc, SE_FL_ERROR | SE_FL_EOS))) {
+               max = (IS_HTX_SC(hs->sc) ?  htx_free_space(htxbuf(&hs->req)) : b_room(&hs->req));
+               read = conn->mux->rcv_buf(hs->sc, &hs->req, max, 0);
+               cur_read += read;
+               if (!read ||
+                   sc_ep_test(hs->sc, SE_FL_WANT_ROOM) ||
+                   (--read_poll <= 0) ||
+                   (read < max && read >= global.tune.recv_enough))
+                       break;
+       }
+
+  end_recv:
+       is_empty = (IS_HTX_SC(hs->sc) ? htx_is_empty(htxbuf(&hs->req)) : !b_data(&hs->req));
+       hs->req_body -= cur_read;
+
+       if (is_empty && ((conn->flags & CO_FL_ERROR) || sc_ep_test(hs->sc, SE_FL_ERROR))) {
+               /* Report network errors only if we got no other data. Otherwise
+                * we'll let the upper layers decide whether the response is OK
+                * or not. It is very common that an RST sent by the server is
+                * reported as an error just after the last data chunk.
+                */
+               TRACE_ERROR("connection error during recv", HS_EV_HSTRM_RECV, hs);
+               goto stop;
+       }
+       else if ((hs->req_body || !cur_read) && !sc_ep_test(hs->sc, SE_FL_ERROR | SE_FL_EOS)) {
+               TRACE_DEVEL("subscribing for read data", HS_EV_HSTRM_RECV, hs);
+               conn->mux->subscribe(hs->sc, SUB_RETRY_RECV, &hs->sc->wait_event);
+               goto wait_more_data;
+       }
+
+       ret = 1;
+ leave:
+       hstream_release_buf(hs, &hs->req);
+       TRACE_PRINTF(TRACE_LEVEL_PROTO, HS_EV_HSTRM_RECV, hs, 0, 0, 0,
+                    "data received (%llu) ret=%d", (unsigned long long)cur_read, ret);
+       TRACE_LEAVE(HS_EV_HSTRM_RECV, hs);
+       return ret;
+ stop:
+       ret = 2;
+ wait_more_data:
+       ret = 3;
+       goto leave;
+}
+
+/* Send HTX data prepared for <hs> httpterm stream from <conn> connection */
+static int hstream_htx_buf_snd(struct connection *conn, struct hstream *hs)
+{
+       struct stconn *sc = hs->sc;
+       int ret = 0;
+       int nret;
+
+       TRACE_ENTER(HS_EV_HSTRM_SEND, hs);
+
+       if (!htxbuf(&hs->res)->data) {
+               /* This is possible after having drained the body, so after
+                * having sent the response when res_before_req=0.
+                */
+               ret = 1;
+               goto out;
+       }
+
+       nret = conn->mux->snd_buf(hs->sc, &hs->res, htxbuf(&hs->res)->data, 0);
+       if (nret <= 0) {
+               if (hs->flags & HS_ST_CONN_ERROR ||
+                   conn->flags & CO_FL_ERROR || sc_ep_test(sc, SE_FL_ERROR)) {
+                       TRACE_DEVEL("connection error during send", HS_EV_HSTRM_SEND, hs);
+                       goto out;
+               }
+       }
+
+       /* The HTX data are not fully sent if the last HTX data
+        * were not fully transfered or if there are remaining data
+        * to send (->to_write > 0).
+        */
+       if (!htx_is_empty(htxbuf(&hs->res)) || hs->to_write > 0) {
+               TRACE_DEVEL("data not fully sent, wait", HS_EV_HSTRM_SEND, hs);
+               conn->mux->subscribe(sc, SUB_RETRY_SEND, &sc->wait_event);
+       }
+
+       ret = 1;
+ out:
+       if (htx_is_empty(htxbuf(&hs->res)) || ret == 0) {
+               TRACE_DEVEL("releasing underlying buffer", HS_EV_HSTRM_SEND, hs);
+               hstream_release_buf(hs, &hs->res);
+       }
+
+       /* XXX TODO check the condition to shut this sc */
+       if (!hs->to_write && !hs->req_body && !b_data(&hs->res) && !b_data(&hs->req)) {
+               TRACE_DEVEL("shutting down stream", HS_EV_HSTRM_SEND, hs);
+               conn->mux->shut(sc, SE_SHW_SILENT|SE_SHW_NORMAL, NULL);
+       }
+
+       TRACE_LEAVE(HS_EV_HSTRM_SEND, hs);
+       return ret;
+}
+
+/* Build the help response for <hs> httpterm stream.
+ * Return 1 if succeed, 0 if not.
+ */
+
+static int hstream_build_http_help_resp(struct hstream *hs)
+{
+       int ret = 0;
+       struct buffer *buf;
+       struct htx *htx;
+       unsigned int flags = HTX_SL_F_IS_RESP | HTX_SL_F_XFER_LEN;
+       struct htx_sl *sl;
+
+       TRACE_ENTER(HS_EV_HSTRM_SEND, hs);
+
+       buf = hstream_get_buf(hs, &hs->res);
+       if (!buf) {
+               TRACE_ERROR("waiting for output buffer", HS_EV_HSTRM_SEND, hs);
+               hs->flags |= HS_ST_OUT_ALLOC;
+               goto err;
+       }
+
+       htx = htx_from_buf(buf);
+       sl = htx_add_stline(htx, HTX_BLK_RES_SL, flags, ist("HTTP/1.0"),
+                           ist("200"), IST_NULL);
+       if (!sl)
+               goto err;
+
+       if (!htx_add_header(htx, ist("Cache-Control"), ist("no-cache")) ||
+               !htx_add_header(htx, ist("Connection"), ist("close")) ||
+               !htx_add_header(htx, ist("Content-type"), ist("text/plain"))) {
+               TRACE_ERROR("could not add connection HTX header", HS_EV_HSTRM_SEND, hs);
+               goto err;
+       }
+
+       if (!htx_add_endof(htx, HTX_BLK_EOH)) {
+               TRACE_ERROR("could not add EOH HTX", HS_EV_HSTRM_SEND, hs);
+               goto err;
+       }
+
+       if (!htx_add_data_atonce(htx, ist2(HTTP_HELP, strlen(HTTP_HELP)))) {
+               TRACE_ERROR("unable to add payload to HTX message", HS_EV_HSTRM_SEND, hs);
+               goto err;
+       }
+
+       htx->flags |= HTX_FL_EOM;
+       htx_to_buf(htx, buf);
+       sl->info.res.status = 200;
+       ret = 1;
+leave:
+       TRACE_LEAVE(HS_EV_HSTRM_SEND, hs);
+       return ret;
+err:
+       hs->flags |= HS_ST_CONN_ERROR;
+       TRACE_DEVEL("leaving on error", HS_EV_HSTRM_SEND, hs);
+       goto leave;
+}
+
+/* Build 100-continue HTX message.
+ * Return 1 if succeeded, 0 if not.
+ */
+static int hstream_build_http_100_continue_resp(struct hstream *hs)
+{
+       int ret = 0;
+       struct buffer *buf;
+       struct htx *htx;
+       unsigned int flags = HTX_SL_F_IS_RESP | HTX_SL_F_XFER_LEN;
+       struct htx_sl *sl;
+
+       TRACE_ENTER(HS_EV_HSTRM_SEND, hs);
+
+       buf = hstream_get_buf(hs, &hs->res);
+       if (!buf) {
+               TRACE_STATE("waiting for output buffer", HS_EV_HSTRM_SEND, hs);
+               hs->flags |= HS_ST_OUT_ALLOC;
+               goto err;
+       }
+
+       htx = htx_from_buf(buf);
+       sl = htx_add_stline(htx, HTX_BLK_RES_SL, flags, ist("HTTP/1.1"),
+                                               ist("100-continue"), IST_NULL);
+       if (!sl) {
+               TRACE_ERROR("could not add HTX start line", HS_EV_HSTRM_SEND, hs);
+               goto err;
+       }
+
+       if (!htx_add_endof(htx, HTX_BLK_EOH)) {
+               TRACE_ERROR("could not add EOH HTX", HS_EV_HSTRM_RESP, hs);
+               goto err;
+       }
+
+       htx->flags |= HTX_FL_EOM;
+       htx_to_buf(htx, buf);
+       sl->info.res.status = 100;
+       ret = 1;
+leave:
+       TRACE_LEAVE(HS_EV_HSTRM_SEND, hs);
+       return ret;
+err:
+       TRACE_DEVEL("leaving on error", HS_EV_HSTRM_SEND, hs);
+       goto leave;
+}
+
+int hstream_wake(struct stconn *sc)
+{
+       struct hstream *hs = __sc_hstream(sc);
+
+       tasklet_wakeup(hs->sc->wait_event.tasklet);
+       return 0;
+}
+
+static void hstream_add_data(struct htx *htx, struct hstream *hs)
+{
+       int ret;
+       char *data_ptr;
+       unsigned long long max;
+       unsigned int offset;
+       char *buffer;
+       size_t buffer_len;
+       int modulo;
+
+       TRACE_ENTER(HS_EV_HSTRM_ADD_DATA, hs);
+
+       if (hs->req_chunked) {
+               buffer = common_chunk_resp;
+               buffer_len = sizeof(common_chunk_resp);
+               modulo = sizeof(common_chunk_resp);
+       }
+       else if (hs->req_random) {
+               buffer = random_resp;
+               buffer_len = random_resp_len;
+               modulo = random_resp_len;
+       }
+       else {
+               buffer = common_response;
+               buffer_len = sizeof(common_response);
+               modulo = HS_COMMON_RESPONSE_LINE_SZ;
+       }
+
+       offset = (hs->req_size - hs->to_write) % modulo;
+       data_ptr = buffer + offset;
+       max = hs->to_write;
+       if (max > (unsigned long long)(buffer_len - offset))
+               max = (unsigned long long)(buffer_len - offset);
+
+       ret = htx_add_data(htx, ist2(data_ptr, max));
+       if (!ret)
+               TRACE_STATE("unable to add payload to HTX message", HS_EV_HSTRM_ADD_DATA, hs);
+
+       hs->to_write -= ret;
+leave:
+       TRACE_LEAVE(HS_EV_HSTRM_ADD_DATA, hs);
+       return;
+err:
+       TRACE_DEVEL("leaving on error", HS_EV_HSTRM_ADD_DATA);
+       goto leave;
+}
+
+static int hstream_build_http_resp(struct hstream *hs)
+{
+       int ret = 0;
+       struct buffer *buf;
+       struct htx *htx;
+       unsigned int flags = HTX_SL_F_IS_RESP | HTX_SL_F_XFER_LEN | (!hs->req_chunked ?  HTX_SL_F_CLEN : 0);
+       struct htx_sl *sl;
+       char hdrbuf[128];
+
+       TRACE_ENTER(HS_EV_HSTRM_RESP, hs);
+
+       snprintf(hdrbuf, sizeof(hdrbuf), "%d", hs->req_code);
+       buf = hstream_get_buf(hs, &hs->res);
+       if (!buf) {
+               TRACE_ERROR("could not allocate response buffer", HS_EV_HSTRM_RESP, hs);
+               goto err;
+       }
+
+       htx = htx_from_buf(buf);
+       sl = htx_add_stline(htx, HTX_BLK_RES_SL, flags,
+                           !(hs->ka & 4) ? ist("HTTP/1.0") : ist("HTTP/1.1"),
+                           ist(hdrbuf), IST_NULL);
+       if (!sl) {
+               TRACE_ERROR("could not add HTX start line", HS_EV_HSTRM_RESP, hs);
+               goto err;
+       }
+
+       if ((hs->ka & 5) == 1) {
+               // HTTP/1.0 + KA
+               if (!htx_add_header(htx, ist("Connection"), ist("keep-alive"))) {
+                       TRACE_ERROR("could not add connection HTX header", HS_EV_HSTRM_RESP, hs);
+                       goto err;
+               }
+       }
+       else if ((hs->ka & 5) == 4) {
+               // HTTP/1.1 + close
+               if (!htx_add_header(htx, ist("Connection"), ist("close"))) {
+                       TRACE_ERROR("could not add connection HTX header", HS_EV_HSTRM_RESP, hs);
+                       goto err;
+               }
+       }
+
+       if (!hs->req_chunked && (hs->ka & 1)) {
+               char *end = ultoa_o(hs->req_size, trash.area, trash.size);
+               if (!htx_add_header(htx, ist("Content-Length"), ist2(trash.area, end - trash.area))) {
+                       TRACE_ERROR("could not add content-length HTX header", HS_EV_HSTRM_RESP, hs);
+                       goto err;
+               }
+       }
+
+       if (!hs->req_cache && !htx_add_header(htx, ist("Cache-control"), ist("no-cache"))) {
+               TRACE_ERROR("could not add cache-control HTX header", HS_EV_HSTRM_RESP, hs);
+               goto err;
+       }
+
+       /* XXX TODO time?  XXX */
+       snprintf(hdrbuf, sizeof(hdrbuf), "time=%ld ms", 0L);
+       if (!htx_add_header(htx, ist("X-req"), ist(hdrbuf))) {
+               TRACE_ERROR("could not add x-req HTX header", HS_EV_HSTRM_RESP, hs);
+           goto err;
+       }
+
+       /* XXX TODO time? XXX */
+       snprintf(hdrbuf, sizeof(hdrbuf), "id=%s, code=%d, cache=%d,%s size=%lld, time=%d ms (%ld real)",
+                "dummy", hs->req_code, hs->req_cache,
+                        hs->req_chunked ? " chunked," : "",
+                        hs->req_size, 0, 0L);
+       if (!htx_add_header(htx, ist("X-rsp"), ist(hdrbuf))) {
+               TRACE_ERROR("could not add x-rsp HTX header", HS_EV_HSTRM_RESP, hs);
+           goto err;
+       }
+
+       if (!htx_add_endof(htx, HTX_BLK_EOH)) {
+               TRACE_ERROR("could not add EOH HTX", HS_EV_HSTRM_RESP, hs);
+               goto err;
+       }
+
+    if (hs->to_write > 0)
+           hstream_add_data(htx, hs);
+
+    if (!hs->to_write)
+               htx->flags |= HTX_FL_EOM;
+       htx_to_buf(htx, buf);
+
+       sl->info.res.status = hs->req_code;
+       ret = 1;
+ leave:
+       TRACE_LEAVE(HS_EV_HSTRM_RESP, hs);
+       return ret;
+ err:
+       TRACE_DEVEL("leaving on error", HS_EV_HSTRM_RESP, hs);
+       goto leave;
+}
+
+
+/* Parse <hs> httpterm stream <uri> URI. This has as side effect to initialize
+ * some <hs> members.
+ */
+static void hstream_parse_uri(struct ist uri, struct hstream *hs)
+{
+       char *next;
+       char *end = istptr(uri) + istlen(uri);
+       char *arg;
+       long result, mult;
+       int use_rand;
+
+       /* we'll check for the following URIs :
+        * /?{s=<size>|r=<resp>|t=<time>|c=<cache>}[&{...}]
+        * /? to get the help page.
+        */
+       if ((next = strchr(istptr(uri), '?'))) {
+               next += 1;
+               arg = next;
+               if (next == end || *next == ' ') {
+                       /* request for help */
+                       hs->flags |= HS_ST_HTTP_HELP;
+                       return;
+               }
+
+               while (arg + 2 <= end && arg[1] == '=') {
+                       use_rand = 0;
+                       result = strtol(arg + 2, &next, 0);
+                       if (next > arg + 2) {
+                               mult = 0;
+                               do {
+                                       if (*next == 'k' || *next == 'K')
+                                               mult += 10;
+                                       else if (*next == 'm' || *next == 'M')
+                                               mult += 20;
+                                       else if (*next == 'g' || *next == 'G')
+                                               mult += 30;
+                                       else if (*next == 'r' || *next == 'R')
+                                               use_rand=1;
+                                       else
+                                               break;
+                                       next++;
+                               } while (*next);
+
+                               if (use_rand)
+                                       result = ((long long)random() * result) / ((long long)RAND_MAX + 1);
+
+                               switch (*arg) {
+                               case 's':
+                                       if (hs->req_meth != HTTP_METH_HEAD)
+                                               hs->req_size = (long long)result << mult;
+                                       break;
+                               case 'r':
+                                       hs->req_code = result << mult;
+                                       break;
+                               case 't':
+                                       hs->req_time = result << mult;
+                                       break;
+                               case 'c':
+                                       hs->req_cache = result << mult;
+                                       break;
+                               case 'A':
+                                       hs->res_before_req = result;
+                                       break;
+                               case 'C':
+                                       hs->ka = (hs->ka & 4) | 2 | !result;  // forced OFF
+                                       break;
+                               case 'K':
+                                       hs->ka = (hs->ka & 4) | 2 | !!result; // forced ON
+                                       break;
+                               case 'k':
+                                       hs->req_chunked = result;
+                                       break;
+                               case 'R':
+                                       hs->req_random = result;
+                                       break;
+                               }
+                               arg = next;
+                       }
+
+                       if (*arg == '&' || *arg == ';' || *arg == '/' || *arg == '?' || *arg == ',')
+                               arg++;
+                       else
+                               break;
+               }
+       }
+
+    hs->to_write = hs->req_size;
+}
+
+/* Prepare start line and headers response and push them to HTX.
+ * Return 1 if succeeded, 0 if not.
+ */
+static inline int hstream_sl_hdrs_snd_buf(struct hstream *hs, struct connection *conn)
+{
+       int ret = 0;
+
+       if ((hs->flags & HS_ST_HTTP_RESP_SL_SENT))
+               return 1;
+
+       if (hs->flags & HS_ST_HTTP_HELP) {
+               if (!hstream_build_http_help_resp(hs))
+                       goto out;
+       }
+       else {
+               if (!hstream_build_http_resp(hs))
+                       goto out;
+       }
+
+       hstream_htx_buf_snd(conn, hs);
+       hs->flags |= HS_ST_HTTP_RESP_SL_SENT;
+       ret = 1;
+ out:
+       return ret;
+}
+
+static struct task *process_hstream(struct task *t, void *context, unsigned int state)
+{
+       struct hstream *hs = context;
+       struct ist uri;
+    struct connection *conn = __sc_conn(hs->sc);
+       int rcvd;
+
+    TRACE_ENTER(HS_EV_PROCESS_HSTRM, hs);
+
+       if (unlikely(hs->flags & HS_ST_CONN_ERROR ||
+                    !conn || conn->flags & CO_FL_ERROR || sc_ep_test(hs->sc, SE_FL_ERROR))) {
+               TRACE_ERROR("connection error", HS_EV_PROCESS_HSTRM, hs);
+               goto out;
+       }
+
+    if (!(hs->flags & HS_ST_HTTP_GOT_HDRS)) {
+               struct htx *htx = htx_from_buf(&hs->req);
+               struct htx_sl *sl = http_get_stline(htx);
+               struct http_hdr_ctx expect, clength;
+
+               if (sl->flags & HTX_SL_F_VER_11)
+                       hs->ka = 5;
+
+               hs->req_meth = sl->info.req.meth;
+               hs->flags |= HS_ST_HTTP_GOT_HDRS;
+               uri = htx_sl_req_uri(http_get_stline(htx));
+               hstream_parse_uri(uri, hs);
+
+               clength.blk = NULL;
+               if (http_find_header(htx, ist("content-length"), &clength, 0)) {
+                       if (isttest(clength.value)) {
+                               if (strl2llrc(istptr(clength.value), istlen(clength.value),
+                                             (long long *)&hs->req_body) != 0) {
+                                       TRACE_ERROR("could not parse the content length",
+                                                   HS_EV_PROCESS_HSTRM, hs);
+                                       goto err;
+                               }
+                       }
+               }
+
+               expect.blk = NULL;
+               if (http_find_header(htx, ist("expect"), &expect, 0)) {
+                       hs->flags |= HS_ST_HTTP_EXPECT;
+                       if (hstream_build_http_100_continue_resp(hs))
+                               hstream_htx_buf_snd(conn, hs);
+               }
+
+               hstream_release_buf(hs, &hs->req);
+               if ((hs->res_before_req || !hs->to_write) && hs->req_body) {
+                       rcvd = hstream_htx_buf_rcv(conn, hs);
+                       if (rcvd == 3) {
+                               TRACE_STATE("waiting for more data", HS_EV_HSTRM_RESP, hs);
+                               goto leave;
+                       }
+               }
+
+               if (!hstream_sl_hdrs_snd_buf(hs, conn))
+                       goto err;
+       }
+       else {
+               struct buffer *buf;
+               struct htx *htx;
+
+ receive:
+               if ((hs->res_before_req || !hs->to_write) && hs->req_body) {
+                       rcvd = hstream_htx_buf_rcv(conn, hs);
+                       if (rcvd == 3) {
+                               TRACE_STATE("waiting for more data", HS_EV_HSTRM_RESP, hs);
+                               goto leave;
+                       }
+               }
+
+               if (!hstream_sl_hdrs_snd_buf(hs, conn))
+                       goto err;
+
+               buf = hstream_get_buf(hs, &hs->res);
+               if (!buf) {
+                       TRACE_ERROR("could not allocate response buffer", HS_EV_HSTRM_RESP, hs);
+                       goto err;
+               }
+
+               htx = htx_from_buf(buf);
+               if (hs->to_write)
+                       hstream_add_data(htx, hs);
+
+               if (!hs->to_write)
+                       htx->flags |= HTX_FL_EOM;
+               htx_to_buf(htx, &hs->res);
+               hstream_htx_buf_snd(conn, hs);
+               if (!hs->to_write && !hs->res_before_req && hs->req_body &&
+                   !(hs->flags & HS_ST_HTTP_RESP_SENT)) {
+                        /* Flag this stream has having sent the response to
+                         * prevent an infinite loop here.
+                         */
+                       hs->flags |= HS_ST_HTTP_RESP_SENT;
+                       goto receive;
+               }
+       }
+
+ out:
+       if (hs->flags & HS_ST_CONN_ERROR ||
+           (!hs->to_write && !hs->req_body &&
+            htx_is_empty(htxbuf(&hs->res)) &&
+            htx_is_empty(htxbuf(&hs->req)))) {
+               TRACE_STATE("releasing hstream", HS_EV_PROCESS_HSTRM, hs);
+               hstream_free(hs);
+               hs = NULL;
+               task_destroy(t);
+               t = NULL;
+       }
+
+ leave:
+    TRACE_LEAVE(HS_EV_PROCESS_HSTRM, hs);
+       return t;
+ err:
+       TRACE_DEVEL("leaving on error", HS_EV_PROCESS_HSTRM);
+       goto leave;
+}
+
+/* Allocate a httpter stream as this is done for classical haproxy streams.
+ * This function is called as proxy callback from muxes.
+ * Return the httpterm stream object if succeede, NUL if not.
+ */
+void *hstream_new(struct session *sess, struct stconn *sc, struct buffer *input)
+{
+       struct hstream *hs = NULL;
+       struct task *t = NULL;
+
+       TRACE_ENTER(HS_EV_HSTRM_NEW);
+
+       if (unlikely((hs = pool_alloc(pool_head_hstream)) == NULL)) {
+               TRACE_ERROR("stream allocation failure", HS_EV_HSTRM_NEW);
+               goto err;
+       }
+
+       if ((t = task_new_here()) == NULL) {
+               TRACE_ERROR("task allocation failure", HS_EV_HSTRM_NEW, hs);
+               goto err;
+       }
+
+       hs->obj_type = OBJ_TYPE_HTTPTERM;
+       hs->sess = sess;
+       hs->sc = sc;
+       hs->task = t;
+       hs->to_write = 0;
+
+       LIST_INIT(&hs->buf_wait.list);
+       hs->flags = 0;
+
+       hs->req_cache = 1;
+       hs->req_size = 0;
+       hs->req_body = 0;
+       hs->req_code = 200;
+       hs->req_time = 0;
+       hs->req_chunked = 0;
+       hs->req_random = 0;
+       hs->res_before_req = 1;
+       hs->ka = 0;
+
+       if (sc_conn(sc)) {
+               const struct mux_ops *mux = sc_mux_ops(sc);
+               if (mux && !(mux->flags & MX_FL_HTX)) {
+                       TRACE_ERROR("mux without HTX not supported",
+                                   HS_EV_HSTRM_NEW, hs);
+                       goto err;
+               }
+       }
+
+       if (sc_attach_hstream(hs->sc, hs) < 0) {
+               TRACE_ERROR("could not attach to stream connector",
+                           HS_EV_HSTRM_NEW, hs);
+               goto err;
+       }
+
+       TRACE_PRINTF(TRACE_LEVEL_PROTO, HS_EV_HSTRM_NEW, hs, 0, 0, 0,
+                    "stream initialized @%p", hs);
+       hs->res = BUF_NULL;
+       /* Xfer the input buffer */
+       if (!b_is_null(input)) {
+               hs->req = *input;
+               *input = BUF_NULL;
+       }
+
+       t->process = process_hstream;
+       t->context = hs;
+       t->expire = TICK_ETERNITY;
+       task_wakeup(hs->task, TASK_WOKEN_INIT);
+
+       TRACE_LEAVE(HS_EV_HSTRM_NEW, hs);
+       return hs;
+
+ err:
+       task_destroy(t);
+       pool_free(pool_head_hstream, hs);
+       TRACE_DEVEL("leaving on error", HS_EV_HSTRM_NEW);
+       return NULL;
+}
+
+/* Build the response buffers.
+ * Return 1 if succeeded, -1 if failed.
+ */
+static int hstream_build_responses(void)
+{
+       int i;
+
+       for (i = 0; i < sizeof(common_response); i++) {
+               if (i % HS_COMMON_RESPONSE_LINE_SZ == HS_COMMON_RESPONSE_LINE_SZ - 1)
+                       common_response[i] = '\n';
+               else if (i % 10 == 0)
+                       common_response[i] = '.';
+               else
+                       common_response[i] = '0' + i % 10;
+       }
+
+       /* original httpterm chunk mode responses are made of 1-byte chunks
+        * but the haproxy muxes do not support this. At this time
+        * these reponses are handled the same way as for common
+        * responses with a pre-built buffer.
+        */
+       for (i = 0; i < sizeof(common_chunk_resp); i++)
+               common_chunk_resp[i] = '1';
+
+    random_resp = malloc(random_resp_len);
+    if (!random_resp)
+           return -1;
+
+    for (i = 0; i < random_resp_len; i++)
+               random_resp[i] = rand() >> 16;
+
+       return 1;
+}
+
+REGISTER_POST_CHECK(hstream_build_responses);
index dd7354352429814995dfcb3e0ec146ddde0ea717..55f0935c0a0c3428e9a880d9a8ec2b4f30002129 100644 (file)
@@ -31,6 +31,7 @@
 #include <haproxy/filters.h>
 #include <haproxy/global.h>
 #include <haproxy/guid.h>
+#include <haproxy/hstream.h>
 #include <haproxy/http_ana.h>
 #include <haproxy/http_htx.h>
 #include <haproxy/http_ext.h>
@@ -1544,6 +1545,7 @@ void init_new_proxy(struct proxy *p)
        /* Default to only allow L4 retries */
        p->retry_type = PR_RE_CONN_FAILED;
 
+       p->stream_new_from_sc = stream_new;
        guid_init(&p->guid);
 
        p->extra_counters_fe = NULL;
@@ -1909,6 +1911,7 @@ static int proxy_defproxy_cpy(struct proxy *curproxy, const struct proxy *defpro
                curproxy->clitcpka_cnt   = defproxy->clitcpka_cnt;
                curproxy->clitcpka_idle  = defproxy->clitcpka_idle;
                curproxy->clitcpka_intvl = defproxy->clitcpka_intvl;
+               curproxy->stream_new_from_sc = defproxy->stream_new_from_sc;
        }
 
        if (curproxy->cap & PR_CAP_BE) {
index 9d54dbff43c7ec41cb7957dfbb1028b54b0c8a9d..f098644002c2263c80e4f9f52d145324d765c263 100644 (file)
@@ -16,6 +16,7 @@
 #include <haproxy/connection.h>
 #include <haproxy/check.h>
 #include <haproxy/filters.h>
+#include <haproxy/hstream.h>
 #include <haproxy/http_ana.h>
 #include <haproxy/pipe.h>
 #include <haproxy/pool.h>
@@ -91,6 +92,15 @@ struct sc_app_ops sc_app_check_ops = {
        .name    = "CHCK",
 };
 
+struct sc_app_ops sc_app_hstream_ops = {
+       .chk_rcv = NULL,
+       .chk_snd = NULL,
+       .abort   = NULL,
+       .shutdown= NULL,
+       .wake    = hstream_wake,
+       .name    = "HTTPTERM",
+};
+
 /* Initializes an endpoint */
 void sedesc_init(struct sedesc *sedesc)
 {
@@ -244,7 +254,7 @@ struct stconn *sc_new_from_endp(struct sedesc *sd, struct session *sess, struct
        sc = sc_new(sd);
        if (unlikely(!sc))
                return NULL;
-       if (unlikely(!stream_new(sess, sc, input))) {
+       if (unlikely(!sess->fe->stream_new_from_sc(sess, sc, input))) {
                sd->sc = NULL;
                if (sc->sedesc != sd) {
                        /* none was provided so sc_new() allocated one */
@@ -415,6 +425,30 @@ int sc_attach_strm(struct stconn *sc, struct stream *strm)
        return 0;
 }
 
+/* Attach a stconn to a httpterm layer and sets the relevant
+ * callbacks. Returns -1 on error and 0 on success. SE_FL_ORPHAN flag is
+ * removed. This function is called by a httpterm stream when it is created
+ * to attach it on the stream connector on the client side.
+ */
+int sc_attach_hstream(struct stconn *sc, struct hstream *hs)
+{
+       BUG_ON(!sc_ep_test(sc, SE_FL_T_MUX));
+
+       sc->app = &hs->obj_type;
+       sc_ep_clr(sc, SE_FL_ORPHAN);
+       sc_ep_report_read_activity(sc);
+       sc->wait_event.tasklet = tasklet_new();
+       if (!sc->wait_event.tasklet)
+               return -1;
+
+       sc->wait_event.tasklet->process = sc_hstream_io_cb;
+       sc->wait_event.tasklet->context = sc;
+       sc->wait_event.events = 0;
+
+       sc->app_ops = &sc_app_hstream_ops;
+       return 0;
+}
+
 /* Detaches the stconn from the endpoint, if any. For a connecrion, if a
  * mux owns the connection ->detach() callback is called. Otherwise, it means
  * the stream connector owns the connection. In this case the connection is closed
index f7709072901edfc31579f2974e94fc9bb801f6ac..e103346508f8df66d7b63dc26bd6e95ae0e69036 100644 (file)
@@ -344,7 +344,7 @@ int stream_buf_available(void *arg)
  * transfer to the stream and <input> is set to BUF_NULL. On error, <input>
  * buffer is unchanged and it is the caller responsibility to release it.
  */
-struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer *input)
+void *stream_new(struct session *sess, struct stconn *sc, struct buffer *input)
 {
        struct stream *s;
        struct task *t;