]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: haterm: add haterm HTTP server
authorFrederic Lecaille <flecaille@haproxy.com>
Wed, 11 Feb 2026 14:05:42 +0000 (15:05 +0100)
committerWilly Tarreau <w@1wt.eu>
Thu, 19 Feb 2026 14:10:37 +0000 (15:10 +0100)
Contrary to haproxy, httpterm does not support all the HTTP protocols.
Furthermore, it has become easier to handle inbound/outbound
connections / streams since the rework done at conn_stream level.

This patch implements httpterm HTTP server services into haproxy. To do
so, it proceeds the same way as for the TCP checks which use only one
stream connector, but on frontend side.

The makefile is modified to handle haterm.c in additions to all the C
files for haproxy to build new haterm program into haproxy, the haterm
server also instantiates a haterm stream (hstream struct) attached to a
stream connector for each incoming connection without backend stream
connector. This is the role of sc_new_from_endp() called by the muxes to
instantiate streams/hstreams.

As for stream_new(), hstream_new() instantiates a task named
process_hstream() (see haterm.c) which has the same role as
process_stream() but for haterm streams.

haterm into haproxy takes advantage of the HTTP muxes and HTX API to
support all the HTTP protocols supported by haproxy.

Makefile
doc/configuration.txt
include/haproxy/hstream-t.h [new file with mode: 0644]
include/haproxy/hstream.h [new file with mode: 0644]
include/haproxy/obj_type-t.h
include/haproxy/obj_type.h
include/haproxy/stconn.h
src/cfgparse-listen.c
src/haterm.c [new file with mode: 0644]
src/stconn.c

index 508bd75d72f8b927ec808fed85a5c50c88888dc0..da50e262563258279ca952178b2b311fc2248816 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1003,7 +1003,7 @@ OBJS += src/mux_h2.o src/mux_h1.o src/mux_fcgi.o src/log.o                \
         src/http_acl.o src/dict.o src/dgram.o src/pipe.o               \
         src/hpack-huff.o src/hpack-enc.o src/ebtree.o src/hash.o       \
         src/httpclient_cli.o src/version.o src/ncbmbuf.o src/ech.o     \
-        src/cfgparse-peers.o
+        src/cfgparse-peers.o src/haterm.o
 
 ifneq ($(TRACE),)
   OBJS += src/calltrace.o
index 3f351c8f0a4b1ee8b86918fead2b204347ef660f..b91c886c9bb828ad7bf9acdfaa8517561c316f0e 100644 (file)
@@ -9307,6 +9307,9 @@ mode { tcp|http|log|spop }
               processing and switching will be possible. This is the mode which
               brings HAProxy most of its value.
 
+    haterm    The frontend will work in haterm HTTP benchmark mode. This is
+              not supported by backends. See doc/haterm.txt for details.
+
     log       When used in a backend section, it will turn the backend into a
               log backend. Such backend can be used as a log destination for
               any "log" directive by using the "backend@<name>" syntax. Log
diff --git a/include/haproxy/hstream-t.h b/include/haproxy/hstream-t.h
new file mode 100644 (file)
index 0000000..e9ef52a
--- /dev/null
@@ -0,0 +1,36 @@
+#ifndef _HAPROXY_HSTREAM_T_H
+#define _HAPROXY_HSTREAM_T_H
+
+#include <haproxy/dynbuf-t.h>
+#include <haproxy/http-t.h>
+#include <haproxy/obj_type-t.h>
+
+/* hastream stream */
+struct hstream {
+       enum obj_type obj_type;
+       struct session *sess;
+
+       struct stconn *sc;
+       struct task *task;
+
+       struct buffer req;
+       struct buffer res;
+       unsigned long long to_write; /* #of response data bytes to write after headers */
+       struct buffer_wait buf_wait; /* Wait list for buffer allocation */
+
+       int flags;
+
+       int ka;                      /* .0: keep-alive  .1: forced  .2: http/1.1, .3: was_reused */
+       int req_cache;
+       unsigned long long req_size; /* values passed in the URI to override the server's */
+       unsigned long long req_body; /* remaining body to be consumed from the request */
+       int req_code;
+       int res_wait;                /* time to wait before replying in ms */
+       int res_time;
+       int req_chunked;
+       int req_random;
+       int req_after_res;           /* Drain the request body after having sent the response */
+       enum http_meth_t req_meth;
+};
+
+#endif /* _HAPROXY_HSTREAM_T_H */
diff --git a/include/haproxy/hstream.h b/include/haproxy/hstream.h
new file mode 100644 (file)
index 0000000..893adff
--- /dev/null
@@ -0,0 +1,12 @@
+#ifndef _HAPROXY_HSTREAM_H
+#define  _HAPROXY_HSTREAM_H
+
+#include <haproxy/cfgparse.h>
+#include <haproxy/hstream-t.h>
+
+struct task *sc_hstream_io_cb(struct task *t, void *ctx, unsigned int state);
+int hstream_wake(struct stconn *sc);
+void hstream_shutdown(struct stconn *sc);
+void *hstream_new(struct session *sess, struct stconn *sc, struct buffer *input);
+
+#endif /* _HAPROXY_HSTREAM_H */
index da2efbf86f3de8b8de788ece2a462d062b5a9139..fd232b34781085deac35c811e24c6b23dd0df7bc 100644 (file)
@@ -46,6 +46,7 @@ enum obj_type {
 #ifdef USE_QUIC
        OBJ_TYPE_DGRAM,        /* object is a struct quic_dgram */
 #endif
+       OBJ_TYPE_HATERM,       /* object is a struct hstream */
        OBJ_TYPE_ENTRIES       /* last one : number of entries */
 } __attribute__((packed)) ;
 
index 233f9d7ada096649f6d96aad432c6ddc35562b1c..bd850b9697fcbd76572737d436e247221c4dcfe0 100644 (file)
@@ -26,6 +26,7 @@
 #include <haproxy/applet-t.h>
 #include <haproxy/check-t.h>
 #include <haproxy/connection-t.h>
+#include <haproxy/hstream-t.h>
 #include <haproxy/listener-t.h>
 #include <haproxy/obj_type-t.h>
 #include <haproxy/pool.h>
@@ -189,6 +190,19 @@ static inline struct check *objt_check(enum obj_type *t)
        return __objt_check(t);
 }
 
+static inline struct hstream *__objt_hstream(enum obj_type *t)
+{
+       return container_of(t, struct hstream, obj_type);
+}
+
+static inline struct hstream *objt_hstream(enum obj_type *t)
+{
+       if (!t || *t != OBJ_TYPE_HATERM)
+               return NULL;
+
+       return __objt_hstream(t);
+}
+
 #ifdef USE_QUIC
 static inline struct quic_dgram *__objt_dgram(enum obj_type *t)
 {
index 3a2c5238c80af44105be5f872b287743a9783b14..5a906e623bfefcc1a55a43b0f4e51d2dc7506bb3 100644 (file)
@@ -24,6 +24,7 @@
 
 #include <haproxy/api.h>
 #include <haproxy/connection.h>
+#include <haproxy/hstream-t.h>
 #include <haproxy/htx-t.h>
 #include <haproxy/obj_type.h>
 #include <haproxy/stconn-t.h>
@@ -45,10 +46,12 @@ void se_shutdown(struct sedesc *sedesc, enum se_shut_mode mode);
 struct stconn *sc_new_from_endp(struct sedesc *sedesc, struct session *sess, struct buffer *input);
 struct stconn *sc_new_from_strm(struct stream *strm, unsigned int flags);
 struct stconn *sc_new_from_check(struct check *check, unsigned int flags);
+struct stconn *sc_new_from_haterm(struct sedesc *sd, struct session *sess, struct buffer *input);
 void sc_free(struct stconn *sc);
 
 int sc_attach_mux(struct stconn *sc, void *target, void *ctx);
 int sc_attach_strm(struct stconn *sc, struct stream *strm);
+int sc_attach_hstream(struct stconn *sc, struct hstream *hs);
 
 void sc_destroy(struct stconn *sc);
 int sc_reset_endp(struct stconn *sc);
@@ -331,6 +334,21 @@ static inline struct check *sc_check(const struct stconn *sc)
        return NULL;
 }
 
+/* Returns the haterm stream from a sc if the application is a
+ * haterm stream. Otherwise NULL is returned. __sc_hstream() returns the haterm
+ * stream without any control while sc_hstream() check the application type.
+ */
+static inline struct hstream *__sc_hstream(const struct stconn *sc)
+{
+       return __objt_hstream(sc->app);
+}
+static inline struct hstream *sc_hstream(const struct stconn *sc)
+{
+       if (obj_type(sc->app) == OBJ_TYPE_HATERM)
+               return __objt_hstream(sc->app);
+       return NULL;
+}
+
 /* Returns the name of the application layer's name for the stconn,
  * or "NONE" when none is attached.
  */
index 9a680ef09bfd2ab43720572a83c53ebec1483067..3bce9dd0b39e0d1f4b35ac6b16f09d0362e067b0 100644 (file)
@@ -18,6 +18,7 @@
 #include <haproxy/compression-t.h>
 #include <haproxy/connection.h>
 #include <haproxy/extcheck.h>
+#include <haproxy/hstream.h>
 #include <haproxy/http_ana.h>
 #include <haproxy/http_htx.h>
 #include <haproxy/http_ext.h>
@@ -645,9 +646,22 @@ int cfg_parse_listen(const char *file, int linenum, char **args, int kwm)
 
                mode = str_to_proxy_mode(args[1]);
                if (!mode) {
-                       ha_alert("parsing [%s:%d] : unknown proxy mode '%s'.\n", file, linenum, args[1]);
-                       err_code |= ERR_ALERT | ERR_FATAL;
-                       goto out;
+                       if (strcmp(args[1], "haterm") == 0) {
+                               if (!(curproxy->cap & PR_CAP_FE)) {
+                                       ha_alert("parsing [%s:%d] : mode haterm is only applicable"
+                                                " on proxies with frontend capability.\n", file, linenum);
+                                       err_code |= ERR_ALERT | ERR_FATAL;
+                                       goto out;
+                               }
+
+                               mode = PR_MODE_HTTP;
+                               curproxy->stream_new_from_sc = hstream_new;
+                       }
+                       else {
+                               ha_alert("parsing [%s:%d] : unknown proxy mode '%s'.\n", file, linenum, args[1]);
+                               err_code |= ERR_ALERT | ERR_FATAL;
+                               goto out;
+                       }
                }
                else if ((mode == PR_MODE_SYSLOG || mode == PR_MODE_SPOP) &&
                         !(curproxy->cap & PR_CAP_BE)) {
diff --git a/src/haterm.c b/src/haterm.c
new file mode 100644 (file)
index 0000000..672a699
--- /dev/null
@@ -0,0 +1,1028 @@
+#include <haproxy/buf.h>
+#include <haproxy/cfgparse.h>
+#include <haproxy/chunk.h>
+#include <haproxy/global.h>
+#include <haproxy/hstream-t.h>
+#include <haproxy/http_htx.h>
+#include <haproxy/http.h>
+#include <haproxy/pool.h>
+#include <haproxy/proxy-t.h>
+#include <haproxy/sc_strm.h>
+#include <haproxy/stconn-t.h>
+#include <haproxy/stream.h>
+#include <haproxy/task-t.h>
+#include <haproxy/trace.h>
+#include <haproxy/version.h>
+
+DECLARE_TYPED_POOL(pool_head_hstream, "hstream", struct hstream);
+
+/* haterm stream state flags */
+#define HS_ST_IN_ALLOC          0x0001
+#define HS_ST_OUT_ALLOC         0x0002
+#define HS_ST_CONN_ERROR        0x0004
+#define HS_ST_HTTP_GOT_HDRS     0x0008
+#define HS_ST_HTTP_HELP         0x0010
+#define HS_ST_HTTP_EXPECT       0x0020
+#define HS_ST_HTTP_RESP_SL_SENT 0x0040
+
+const char *HTTP_HELP =
+       "HAProxy's dummy HTTP server for benchmarks - version " HAPROXY_VERSION ".\n"
+        "All integer argument values are in the form [digits]*[kmgr] (r=random(0..1)).\n"
+        "The following arguments are supported to override the default objects :\n"
+        " - /?s=<size>        return <size> bytes.\n"
+        "                     E.g. /?s=20k\n"
+        " - /?r=<retcode>     present <retcode> as the HTTP return code.\n"
+        "                     E.g. /?r=404\n"
+        " - /?c=<cache>       set the return as not cacheable if <1.\n"
+        "                     E.g. /?c=0\n"
+        " - /?A=<req-after>   drain the request body after sending the response.\n"
+        "                     E.g. /?A=1\n"
+        " - /?C=<close>       force the response to use close if >0.\n"
+        "                     E.g. /?C=1\n"
+        " - /?K=<keep-alive>  force the response to use keep-alive if >0.\n"
+        "                     E.g. /?K=1\n"
+        " - /?t=<time>        wait <time> milliseconds before responding.\n"
+        "                     E.g. /?t=500\n"
+        " - /?k=<enable>      Enable transfer encoding chunked with only one chunk if >0.\n"
+        " - /?R=<enable>      Enable sending random data if >0.\n"
+        "\n"
+        "Note that those arguments may be cumulated on one line separated by a set of\n"
+        "delimitors among [&?,;/] :\n"
+        " -  GET /?s=20k&c=1&t=700&K=30r HTTP/1.0\n"
+        " -  GET /?r=500?s=0?c=0?t=1000 HTTP/1.0\n"
+        "\n";
+
+/* Size in bytes of the prebuilts response buffers */
+#define RESPSIZE 16384
+/* Number of bytes by body response line */
+#define HS_COMMON_RESPONSE_LINE_SZ 50
+static char common_response[RESPSIZE];
+static char common_chunk_resp[RESPSIZE];
+static char *random_resp;
+static int random_resp_len = RESPSIZE;
+
+#define TRACE_SOURCE &trace_haterm
+struct trace_source trace_haterm;
+static void hterm_trace(enum trace_level level, uint64_t mask, const struct trace_source *src,
+                        const struct ist where, const struct ist func,
+                        const void *a1, const void *a2, const void *a3, const void *a4);
+
+static const struct  name_desc hterm_trace_logon_args[4] = {
+       /* arg1 */ { /* already used by the haterm stream */ },
+       /* arg2 */ {
+               .name="haterm",
+               .desc="haterm server",
+       },
+       /* arg3 */ { },
+       /* arg4 */ { }
+};
+
+static const struct trace_event hterm_trace_events[] = {
+#define HS_EV_HSTRM_NEW      (1ULL << 0)
+       { .mask = HS_EV_HSTRM_NEW,      .name = "hstrm_new",      .desc = "new haterm stream" },
+#define HS_EV_PROCESS_HSTRM  (1ULL << 1)
+       { .mask = HS_EV_PROCESS_HSTRM,  .name = "process_hstrm",  .desc = "haterm stream processing" },
+#define HS_EV_HSTRM_SEND     (1ULL << 2)
+       { .mask = HS_EV_HSTRM_SEND,     .name = "hstrm_send",     .desc = "haterm stream sending" },
+#define HS_EV_HSTRM_RECV     (1ULL << 3)
+       { .mask = HS_EV_HSTRM_RECV,     .name = "hstrm_recv",     .desc = "haterm stream receiving" },
+#define HS_EV_HSTRM_IO_CB    (1ULL << 4)
+       { .mask = HS_EV_HSTRM_IO_CB,    .name = "hstrm_io_cb",    .desc = "haterm stream I/O callback call" },
+#define HS_EV_HSTRM_RESP     (1ULL << 5)
+       { .mask = HS_EV_HSTRM_RESP,     .name = "hstrm_resp",     .desc = "build a HTTP response" },
+#define HS_EV_HSTRM_ADD_DATA (1ULL << 6)
+       { .mask = HS_EV_HSTRM_ADD_DATA, .name = "hstrm_add_data", .desc = "add data to HTX haterm stream" },
+};
+
+static const struct name_desc hterm_trace_decoding[] = {
+#define HATERM_VERB_CLEAN 1
+       { .name = "clean", .desc = "only user-friendly stuff, generally suitable for level \"user\"" },
+};
+
+struct trace_source trace_haterm = {
+       .name = IST("haterm"),
+       .desc = "haterm",
+       /* TRACE()'s first argument is always a haterm stream */
+       .arg_def = TRC_ARG1_HSTRM,
+       .default_cb = hterm_trace,
+       .known_events = hterm_trace_events,
+       .lockon_args = hterm_trace_logon_args,
+       .decoding = hterm_trace_decoding,
+       .report_events = ~0, /* report everything by default */
+};
+
+INITCALL1(STG_REGISTER, trace_register_source, TRACE_SOURCE);
+
+static void hterm_trace(enum trace_level level, uint64_t mask, const struct trace_source *src,
+                        const struct ist where, const struct ist func,
+                        const void *a1, const void *a2, const void *a3, const void *a4)
+{
+       const struct hstream *hs = a1;
+
+       chunk_appendf(&trace_buf, " hs@%p ", hs);
+       if (hs) {
+               chunk_appendf(&trace_buf, " res=%u req=%u req_size=%llu to_write=%llu req_body=%llu",
+                             (unsigned int)b_data(&hs->res), (unsigned int)b_data(&hs->res),
+                             hs->req_size, hs->to_write, hs->req_body);
+       }
+
+}
+
+int hstream_buf_available(void *target)
+{
+       struct hstream *hs = target;
+
+       BUG_ON(!hs->sc);
+
+       if ((hs->flags & HS_ST_IN_ALLOC) && b_alloc(&hs->req, DB_CHANNEL)) {
+               hs->flags &= ~HS_ST_IN_ALLOC;
+               TRACE_STATE("waking up task", HS_EV_HSTRM_IO_CB, hs);
+               task_wakeup(hs->task, TASK_WOKEN_IO);
+               return 1;
+       }
+
+       if ((hs->flags & HS_ST_OUT_ALLOC) && b_alloc(&hs->res, DB_CHANNEL)) {
+               hs->flags &= ~HS_ST_OUT_ALLOC;
+               TRACE_STATE("waking up task", HS_EV_HSTRM_IO_CB, hs);
+               task_wakeup(hs->task, TASK_WOKEN_IO);
+               return 1;
+       }
+
+       return 0;
+}
+
+/* Allocate a buffer. If it fails, it adds the stream in buffer wait queue */
+struct buffer *hstream_get_buf(struct hstream *hs, struct buffer *bptr)
+{
+       struct buffer *buf = NULL;
+
+       if (likely(!LIST_INLIST(&hs->buf_wait.list)) &&
+           unlikely((buf = b_alloc(bptr, DB_CHANNEL)) == NULL)) {
+               b_queue(DB_CHANNEL, &hs->buf_wait, hs, hstream_buf_available);
+       }
+
+       return buf;
+}
+
+/* Release a buffer, if any, and try to wake up entities waiting in the buffer
+ * wait queue.
+ */
+void hstream_release_buf(struct hstream *hs, struct buffer *bptr)
+{
+       if (bptr->size) {
+               b_free(bptr);
+               offer_buffers(hs->buf_wait.target, 1);
+       }
+}
+
+/* Release <hs> haterm stream */
+void hstream_free(struct hstream *hs)
+{
+       sc_destroy(hs->sc);
+       hstream_release_buf(hs, &hs->res);
+       hstream_release_buf(hs, &hs->req);
+       pool_free(pool_head_hstream, hs);
+}
+
+struct task *sc_hstream_io_cb(struct task *t, void *ctx, unsigned int state)
+{
+       struct stconn *sc = ctx;
+       struct connection *conn;
+       struct hstream *hs = __sc_hstream(sc);
+
+       TRACE_ENTER(HS_EV_HSTRM_IO_CB, hs);
+
+       conn = sc_conn(sc);
+       if (unlikely(!conn || conn->flags & CO_FL_ERROR || sc_ep_test(sc, SE_FL_ERROR))) {
+               TRACE_ERROR("connection error", HS_EV_HSTRM_IO_CB, hs);
+               hs->flags |= HS_ST_CONN_ERROR;
+               task_wakeup(hs->task, TASK_WOKEN_IO);
+       }
+
+       if (((!hs->req_after_res || !hs->to_write) && hs->req_body) ||
+           !htx_is_empty(htxbuf(&hs->req))) {
+               TRACE_STATE("waking up task", HS_EV_HSTRM_IO_CB, hs);
+               task_wakeup(hs->task, TASK_WOKEN_IO);
+       }
+       else if (hs->to_write || !htx_is_empty(htxbuf(&hs->res))) {
+               TRACE_STATE("waking up task", HS_EV_HSTRM_IO_CB, hs);
+               task_wakeup(hs->task, TASK_WOKEN_IO);
+       }
+
+       TRACE_LEAVE(HS_EV_HSTRM_IO_CB, hs);
+       return t;
+}
+
+static int hstream_htx_buf_rcv(struct connection *conn, struct hstream *hs)
+{
+       int ret = 0;
+       struct buffer *buf;
+       size_t max, read = 0, cur_read = 0;
+       int is_empty;
+       int fin = 0;
+
+       TRACE_ENTER(HS_EV_HSTRM_RECV, hs);
+
+       if (hs->sc->wait_event.events & SUB_RETRY_RECV) {
+               TRACE_DEVEL("subscribed for RECV, waiting for data", HS_EV_HSTRM_RECV, hs);
+               goto wait_more_data;
+       }
+
+       if (sc_ep_test(hs->sc, SE_FL_EOS)) {
+               TRACE_STATE("end of stream", HS_EV_HSTRM_RECV, hs);
+               goto end_recv;
+       }
+
+       if (hs->flags & HS_ST_IN_ALLOC) {
+               TRACE_STATE("waiting for input buffer", HS_EV_HSTRM_RECV, hs);
+               goto wait_more_data;
+       }
+
+       buf = hstream_get_buf(hs, &hs->req);
+       if (!buf) {
+               TRACE_STATE("waiting for input buffer", HS_EV_HSTRM_RECV, hs);
+               hs->flags |= HS_ST_IN_ALLOC;
+               goto wait_more_data;
+       }
+
+       while (sc_ep_test(hs->sc, SE_FL_RCV_MORE) ||
+              (!(conn->flags & CO_FL_ERROR) && !sc_ep_test(hs->sc, SE_FL_ERROR | SE_FL_EOS))) {
+               htx_reset(htxbuf(&hs->req));
+               max = (IS_HTX_SC(hs->sc) ?  htx_free_space(htxbuf(&hs->req)) : b_room(&hs->req));
+               sc_ep_clr(hs->sc, SE_FL_WANT_ROOM);
+               read = conn->mux->rcv_buf(hs->sc, &hs->req, max, 0);
+               cur_read += read;
+               if (!htx_expect_more(htxbuf(&hs->req))) {
+                   fin = 1;
+                   break;
+               }
+
+               if (!read)
+                       break;
+       }
+
+ end_recv:
+       is_empty = (IS_HTX_SC(hs->sc) ? htx_is_empty(htxbuf(&hs->req)) : !b_data(&hs->req));
+       hs->req_body -= cur_read;
+
+       if (is_empty && ((conn->flags & CO_FL_ERROR) || sc_ep_test(hs->sc, SE_FL_ERROR))) {
+               /* Report network errors only if we got no other data. Otherwise
+                * we'll let the upper layers decide whether the response is OK
+                * or not. It is very common that an RST sent by the server is
+                * reported as an error just after the last data chunk.
+                */
+               TRACE_ERROR("connection error during recv", HS_EV_HSTRM_RECV, hs);
+               goto stop;
+       }
+       else if (!read && !fin && !sc_ep_test(hs->sc, SE_FL_ERROR | SE_FL_EOS)) {
+               TRACE_DEVEL("subscribing for read data", HS_EV_HSTRM_RECV, hs);
+               conn->mux->subscribe(hs->sc, SUB_RETRY_RECV, &hs->sc->wait_event);
+               goto wait_more_data;
+       }
+
+       ret = 1;
+ leave:
+       hstream_release_buf(hs, &hs->req);
+       TRACE_PRINTF(TRACE_LEVEL_PROTO, HS_EV_HSTRM_RECV, hs, 0, 0, 0,
+                    "data received (%llu) ret=%d read=%d fin=%d",
+                    (unsigned long long)cur_read, ret, (int)read, fin);
+       TRACE_LEAVE(HS_EV_HSTRM_RECV, hs);
+       return ret;
+ stop:
+       ret = 2;
+       goto leave;
+ wait_more_data:
+       ret = 3;
+       goto leave;
+}
+
+/* Send HTX data prepared for <hs> haterm stream from <conn> connection */
+static int hstream_htx_buf_snd(struct connection *conn, struct hstream *hs)
+{
+       struct stconn *sc = hs->sc;
+       int ret = 0;
+       int nret;
+
+       TRACE_ENTER(HS_EV_HSTRM_SEND, hs);
+
+       if (!htxbuf(&hs->res)->data) {
+               /* This is possible after having drained the body, so after
+                * having sent the response here when req_after_res=1.
+                */
+               ret = 1;
+               goto out;
+       }
+
+       nret = conn->mux->snd_buf(hs->sc, &hs->res, htxbuf(&hs->res)->data, 0);
+       if (nret <= 0) {
+               if (hs->flags & HS_ST_CONN_ERROR ||
+                   conn->flags & CO_FL_ERROR || sc_ep_test(sc, SE_FL_ERROR)) {
+                       TRACE_DEVEL("connection error during send", HS_EV_HSTRM_SEND, hs);
+                       goto out;
+               }
+       }
+
+       /* The HTX data are not fully sent if the last HTX data
+        * were not fully transfered or if there are remaining data
+        * to send (->to_write > 0).
+        */
+       if (!htx_is_empty(htxbuf(&hs->res))) {
+               TRACE_DEVEL("data not fully sent, wait", HS_EV_HSTRM_SEND, hs);
+               conn->mux->subscribe(sc, SUB_RETRY_SEND, &sc->wait_event);
+       }
+       else if (hs->to_write) {
+               TRACE_STATE("waking up task", HS_EV_HSTRM_IO_CB, hs);
+               task_wakeup(hs->task, TASK_WOKEN_IO);
+       }
+
+       ret = 1;
+ out:
+       if (htx_is_empty(htxbuf(&hs->res)) || ret == 0) {
+               TRACE_DEVEL("releasing underlying buffer", HS_EV_HSTRM_SEND, hs);
+               hstream_release_buf(hs, &hs->res);
+       }
+
+       TRACE_LEAVE(HS_EV_HSTRM_SEND, hs);
+       return ret;
+}
+
+/* Build the help response for <hs> haterm stream.
+ * Return 1 if succeed, 0 if not.
+ */
+static int hstream_build_http_help_resp(struct hstream *hs)
+{
+       int ret = 0;
+       struct buffer *buf;
+       struct htx *htx;
+       unsigned int flags = HTX_SL_F_IS_RESP | HTX_SL_F_XFER_LEN;
+       struct htx_sl *sl;
+
+       TRACE_ENTER(HS_EV_HSTRM_SEND, hs);
+
+       buf = hstream_get_buf(hs, &hs->res);
+       if (!buf) {
+               TRACE_ERROR("waiting for output buffer", HS_EV_HSTRM_SEND, hs);
+               hs->flags |= HS_ST_OUT_ALLOC;
+               goto err;
+       }
+
+       htx = htx_from_buf(buf);
+       sl = htx_add_stline(htx, HTX_BLK_RES_SL, flags, ist("HTTP/1.0"),
+                           ist("200"), IST_NULL);
+       if (!sl)
+               goto err;
+
+       if (!htx_add_header(htx, ist("Cache-Control"), ist("no-cache")) ||
+               !htx_add_header(htx, ist("Connection"), ist("close")) ||
+               !htx_add_header(htx, ist("Content-type"), ist("text/plain"))) {
+               TRACE_ERROR("could not add connection HTX header", HS_EV_HSTRM_SEND, hs);
+               goto err;
+       }
+
+       if (!htx_add_endof(htx, HTX_BLK_EOH)) {
+               TRACE_ERROR("could not add EOH HTX", HS_EV_HSTRM_SEND, hs);
+               goto err;
+       }
+
+       if (!htx_add_data_atonce(htx, ist2(HTTP_HELP, strlen(HTTP_HELP)))) {
+               TRACE_ERROR("unable to add payload to HTX message", HS_EV_HSTRM_SEND, hs);
+               goto err;
+       }
+
+       htx->flags |= HTX_FL_EOM;
+       htx_to_buf(htx, buf);
+       sl->info.res.status = 200;
+       ret = 1;
+leave:
+       TRACE_LEAVE(HS_EV_HSTRM_SEND, hs);
+       return ret;
+err:
+       hs->flags |= HS_ST_CONN_ERROR;
+       TRACE_DEVEL("leaving on error", HS_EV_HSTRM_SEND, hs);
+       goto leave;
+}
+
+/* Build 100-continue HTX message.
+ * Return 1 if succeeded, 0 if not.
+ */
+static int hstream_build_http_100_continue_resp(struct hstream *hs)
+{
+       int ret = 0;
+       struct buffer *buf;
+       struct htx *htx;
+       unsigned int flags = HTX_SL_F_IS_RESP | HTX_SL_F_XFER_LEN;
+       struct htx_sl *sl;
+
+       TRACE_ENTER(HS_EV_HSTRM_SEND, hs);
+
+       buf = hstream_get_buf(hs, &hs->res);
+       if (!buf) {
+               TRACE_STATE("waiting for output buffer", HS_EV_HSTRM_SEND, hs);
+               hs->flags |= HS_ST_OUT_ALLOC;
+               goto err;
+       }
+
+       htx = htx_from_buf(buf);
+       sl = htx_add_stline(htx, HTX_BLK_RES_SL, flags, ist("HTTP/1.1"),
+                                               ist("100-continue"), IST_NULL);
+       if (!sl) {
+               TRACE_ERROR("could not add HTX start line", HS_EV_HSTRM_SEND, hs);
+               goto err;
+       }
+
+       if (!htx_add_endof(htx, HTX_BLK_EOH)) {
+               TRACE_ERROR("could not add EOH HTX", HS_EV_HSTRM_RESP, hs);
+               goto err;
+       }
+
+       htx->flags |= HTX_FL_EOM;
+       htx_to_buf(htx, buf);
+       sl->info.res.status = 100;
+       ret = 1;
+leave:
+       TRACE_LEAVE(HS_EV_HSTRM_SEND, hs);
+       return ret;
+err:
+       TRACE_DEVEL("leaving on error", HS_EV_HSTRM_SEND, hs);
+       goto leave;
+}
+
+int hstream_wake(struct stconn *sc)
+{
+       struct hstream *hs = __sc_hstream(sc);
+
+       TRACE_STATE("waking up task", HS_EV_HSTRM_IO_CB, hs);
+       task_wakeup(hs->task, TASK_WOKEN_IO);
+       return 0;
+}
+
+/* Add data to HTX response buffer from pre-built responses */
+static void hstream_add_data(struct htx *htx, struct hstream *hs)
+{
+       int ret;
+       char *data_ptr;
+       unsigned long long max;
+       unsigned int offset;
+       char *buffer;
+       size_t buffer_len;
+       int modulo;
+
+       TRACE_ENTER(HS_EV_HSTRM_ADD_DATA, hs);
+
+       if (hs->req_chunked) {
+               buffer = common_chunk_resp;
+               buffer_len = sizeof(common_chunk_resp);
+               modulo = sizeof(common_chunk_resp);
+       }
+       else if (hs->req_random) {
+               buffer = random_resp;
+               buffer_len = random_resp_len;
+               modulo = random_resp_len;
+       }
+       else {
+               buffer = common_response;
+               buffer_len = sizeof(common_response);
+               modulo = HS_COMMON_RESPONSE_LINE_SZ;
+       }
+
+       offset = (hs->req_size - hs->to_write) % modulo;
+       data_ptr = buffer + offset;
+       max = hs->to_write;
+       if (max > (unsigned long long)(buffer_len - offset))
+               max = (unsigned long long)(buffer_len - offset);
+
+       ret = htx_add_data(htx, ist2(data_ptr, max));
+       if (!ret)
+               TRACE_STATE("unable to add payload to HTX message", HS_EV_HSTRM_ADD_DATA, hs);
+
+       hs->to_write -= ret;
+leave:
+       TRACE_LEAVE(HS_EV_HSTRM_ADD_DATA, hs);
+       return;
+err:
+       TRACE_DEVEL("leaving on error", HS_EV_HSTRM_ADD_DATA);
+       goto leave;
+}
+
+/* Build the HTTP response with eventually some BODY data depending on ->to_write
+ * value. Return 1 if succeeded, 0 if not.
+ */
+static int hstream_build_http_resp(struct hstream *hs)
+{
+       int ret = 0;
+       struct buffer *buf;
+       struct htx *htx;
+       unsigned int flags = HTX_SL_F_IS_RESP | HTX_SL_F_XFER_LEN | (!hs->req_chunked ?  HTX_SL_F_CLEN : 0);
+       struct htx_sl *sl;
+       char hdrbuf[128];
+
+       TRACE_ENTER(HS_EV_HSTRM_RESP, hs);
+
+       snprintf(hdrbuf, sizeof(hdrbuf), "%d", hs->req_code);
+       buf = hstream_get_buf(hs, &hs->res);
+       if (!buf) {
+               TRACE_ERROR("could not allocate response buffer", HS_EV_HSTRM_RESP, hs);
+               goto err;
+       }
+
+       htx = htx_from_buf(buf);
+       sl = htx_add_stline(htx, HTX_BLK_RES_SL, flags,
+                           !(hs->ka & 4) ? ist("HTTP/1.0") : ist("HTTP/1.1"),
+                           ist(hdrbuf), IST_NULL);
+       if (!sl) {
+               TRACE_ERROR("could not add HTX start line", HS_EV_HSTRM_RESP, hs);
+               goto err;
+       }
+
+       if ((hs->ka & 5) == 1) {
+               // HTTP/1.0 + KA
+               if (!htx_add_header(htx, ist("Connection"), ist("keep-alive"))) {
+                       TRACE_ERROR("could not add connection HTX header", HS_EV_HSTRM_RESP, hs);
+                       goto err;
+               }
+       }
+       else if ((hs->ka & 5) == 4) {
+               // HTTP/1.1 + close
+               if (!htx_add_header(htx, ist("Connection"), ist("close"))) {
+                       TRACE_ERROR("could not add connection HTX header", HS_EV_HSTRM_RESP, hs);
+                       goto err;
+               }
+       }
+
+       if (!hs->req_chunked && (hs->ka & 1)) {
+               char *end = ultoa_o(hs->req_size, trash.area, trash.size);
+               if (!htx_add_header(htx, ist("Content-Length"), ist2(trash.area, end - trash.area))) {
+                       TRACE_ERROR("could not add content-length HTX header", HS_EV_HSTRM_RESP, hs);
+                       goto err;
+               }
+       }
+
+       if (!hs->req_cache && !htx_add_header(htx, ist("Cache-control"), ist("no-cache"))) {
+               TRACE_ERROR("could not add cache-control HTX header", HS_EV_HSTRM_RESP, hs);
+               goto err;
+       }
+
+       /* XXX TODO time?  XXX */
+       snprintf(hdrbuf, sizeof(hdrbuf), "time=%ld ms", 0L);
+       if (!htx_add_header(htx, ist("X-req"), ist(hdrbuf))) {
+               TRACE_ERROR("could not add x-req HTX header", HS_EV_HSTRM_RESP, hs);
+           goto err;
+       }
+
+       /* XXX TODO time? XXX */
+       snprintf(hdrbuf, sizeof(hdrbuf), "id=%s, code=%d, cache=%d,%s size=%lld, time=%d ms (%ld real)",
+                "dummy", hs->req_code, hs->req_cache,
+                        hs->req_chunked ? " chunked," : "",
+                        hs->req_size, 0, 0L);
+       if (!htx_add_header(htx, ist("X-rsp"), ist(hdrbuf))) {
+               TRACE_ERROR("could not add x-rsp HTX header", HS_EV_HSTRM_RESP, hs);
+           goto err;
+       }
+
+       if (!htx_add_endof(htx, HTX_BLK_EOH)) {
+               TRACE_ERROR("could not add EOH HTX", HS_EV_HSTRM_RESP, hs);
+               goto err;
+       }
+
+       if (hs->to_write > 0)
+               hstream_add_data(htx, hs);
+       if (hs->to_write <= 0)
+               htx->flags |= HTX_FL_EOM;
+       htx_to_buf(htx, buf);
+
+       sl->info.res.status = hs->req_code;
+       ret = 1;
+ leave:
+       TRACE_LEAVE(HS_EV_HSTRM_RESP, hs);
+       return ret;
+ err:
+       TRACE_DEVEL("leaving on error", HS_EV_HSTRM_RESP, hs);
+       goto leave;
+}
+
+
+/* Parse <hs> haterm stream <uri> URI. This has as side effect to initialize
+ * some <hs> members.
+ */
+static void hstream_parse_uri(struct ist uri, struct hstream *hs)
+{
+       char *next = NULL, *arg;
+       char *p = istptr(uri);
+       char *end = p + istlen(uri);
+       long result, mult;
+       int use_rand;
+
+       /* we'll check for the following URIs :
+        * /?{s=<size>|r=<resp>|t=<time>|c=<cache>}[&{...}]
+        * /? to get the help page.
+        */
+       while (p < end)
+               if (*p++ == '?')
+                       next = p;
+
+       if (next) {
+               arg = next;
+               if (next == end || *next == ' ') {
+                       /* request for help */
+                       hs->flags |= HS_ST_HTTP_HELP;
+                       return;
+               }
+
+               while (arg + 2 <= end && arg[1] == '=') {
+                       use_rand = 0;
+                       next = NULL;
+                       result = strtol(arg + 2, &next, 0);
+                       if (next > arg + 2) {
+                               mult = 0;
+                               do {
+                                       if (*next == 'k' || *next == 'K')
+                                               mult += 10;
+                                       else if (*next == 'm' || *next == 'M')
+                                               mult += 20;
+                                       else if (*next == 'g' || *next == 'G')
+                                               mult += 30;
+                                       else if (*next == 'r' || *next == 'R')
+                                               use_rand=1;
+                                       else
+                                               break;
+                                       next++;
+                               } while (*next);
+
+                               if (use_rand)
+                                       result = ((long long)random() * result) / ((long long)RAND_MAX + 1);
+
+                               switch (*arg) {
+                               case 's':
+                                       if (hs->req_meth != HTTP_METH_HEAD)
+                                               hs->req_size = (long long)result << mult;
+                                       break;
+                               case 'r':
+                                       hs->req_code = result << mult;
+                                       break;
+                               case 't':
+                                       hs->res_wait = MS_TO_TICKS(result << mult);
+                                       break;
+                               case 'c':
+                                       hs->req_cache = result << mult;
+                                       break;
+                               case 'A':
+                                       hs->req_after_res = result;
+                                       break;
+                               case 'C':
+                                       hs->ka = (hs->ka & 4) | 2 | !result;  // forced OFF
+                                       break;
+                               case 'K':
+                                       hs->ka = (hs->ka & 4) | 2 | !!result; // forced ON
+                                       break;
+                               case 'k':
+                                       hs->req_chunked = result;
+                                       break;
+                               case 'R':
+                                       hs->req_random = result;
+                                       break;
+                               }
+                               arg = next;
+                       }
+
+                       if (*arg == '&' || *arg == ';' || *arg == '/' || *arg == '?' || *arg == ',')
+                               arg++;
+                       else
+                               break;
+               }
+       }
+
+       hs->to_write = hs->req_size;
+}
+
+/* Prepare start line and headers response and push them to HTX.
+ * Return 1 if succeeded, 0 if not.
+ */
+static inline int hstream_sl_hdrs_htx_buf_snd(struct hstream *hs,
+                                              struct connection *conn)
+{
+       int ret = 0;
+
+       if ((hs->flags & HS_ST_HTTP_RESP_SL_SENT))
+               return 1;
+
+       if (hs->flags & HS_ST_HTTP_HELP) {
+               if (!hstream_build_http_help_resp(hs))
+                       goto out;
+       }
+       else {
+               if (!hstream_build_http_resp(hs))
+                       goto out;
+       }
+
+       hstream_htx_buf_snd(conn, hs);
+       hs->flags |= HS_ST_HTTP_RESP_SL_SENT;
+       ret = 1;
+ out:
+       return ret;
+}
+
+/* Must be called before sending to determine if the body request must be
+ * drained asap before sending. Return 1 if this is the case, 0 if not.
+ * This is the case by default before sending the response except if
+ * the contrary has been asked with ->req_after_res=0.
+ * Return true if the body request has not been fully drained (->hs->req_body>0)
+ * and if the response has been sent (hs->to_write=0 &&
+ * htx_is_empty(htxbuf(&hs->res) or if it must not be drained after having
+ * sent the response (->req_after_res=0) or
+ */
+static inline int hstream_must_drain(struct hstream *hs)
+{
+       int ret;
+
+       TRACE_ENTER(HS_EV_PROCESS_HSTRM, hs);
+       ret = !(hs->flags & HS_ST_CONN_ERROR) && hs->req_body > 0 &&
+               ((!hs->to_write && htx_is_empty(htxbuf(&hs->res))) || !hs->req_after_res);
+       TRACE_LEAVE(HS_EV_PROCESS_HSTRM, hs);
+
+       return ret;
+}
+
+/* haterm stream processing task */
+static struct task *process_hstream(struct task *t, void *context, unsigned int state)
+{
+       struct hstream *hs = context;
+       struct ist uri;
+       struct connection *conn = __sc_conn(hs->sc);
+       int rcvd;
+
+       TRACE_ENTER(HS_EV_PROCESS_HSTRM, hs);
+
+       if (unlikely(hs->flags & HS_ST_CONN_ERROR ||
+                    !conn || conn->flags & CO_FL_ERROR || sc_ep_test(hs->sc, SE_FL_ERROR))) {
+               TRACE_ERROR("connection error", HS_EV_PROCESS_HSTRM, hs);
+               hs->flags |= HS_ST_CONN_ERROR;
+               goto out;
+       }
+
+       if (tick_isset(hs->res_time) && !tick_is_expired(hs->res_time, now_ms)) {
+               TRACE_STATE("waiting before responding", HS_EV_HSTRM_IO_CB, hs);
+               goto leave;
+       }
+
+       if (!(hs->flags & HS_ST_HTTP_GOT_HDRS)) {
+               struct htx *htx = htx_from_buf(&hs->req);
+               struct htx_sl *sl = http_get_stline(htx);
+               struct http_hdr_ctx expect, clength;
+
+               if (sl->flags & HTX_SL_F_VER_11)
+                       hs->ka = 5;
+
+               hs->req_meth = sl->info.req.meth;
+               hs->flags |= HS_ST_HTTP_GOT_HDRS;
+               uri = htx_sl_req_uri(http_get_stline(htx));
+               hstream_parse_uri(uri, hs);
+
+               clength.blk = NULL;
+               if (http_find_header(htx, ist("content-length"), &clength, 0)) {
+                       if (isttest(clength.value)) {
+                               if (strl2llrc(istptr(clength.value), istlen(clength.value),
+                                             (long long *)&hs->req_body) != 0) {
+                                       TRACE_ERROR("could not parse the content length",
+                                                   HS_EV_PROCESS_HSTRM, hs);
+                                       goto err;
+                               }
+                       }
+               }
+
+               expect.blk = NULL;
+               if (http_find_header(htx, ist("expect"), &expect, 0)) {
+                       hs->flags |= HS_ST_HTTP_EXPECT;
+                       if (hstream_build_http_100_continue_resp(hs))
+                               hstream_htx_buf_snd(conn, hs);
+               }
+
+               if (!htx_expect_more(htxbuf(&hs->req))) {
+                       /* The request body has always been fully received */
+                       TRACE_STATE("no more expected data", HS_EV_HSTRM_RESP, hs);
+                       hs->req_body = 0;
+               }
+
+               if (hstream_must_drain(hs)) {
+                       /* The request must be drained before sending the response (hs->req_after_res=0).
+                        * The body will be drained upon next wakeup.
+                        */
+                       TRACE_STATE("waking up task", HS_EV_HSTRM_IO_CB, hs);
+                       task_wakeup(hs->task, TASK_WOKEN_IO);
+                       goto out;
+               }
+
+               if (tick_isset(hs->res_wait)) {
+                       TRACE_STATE("task scheduled", HS_EV_HSTRM_IO_CB, hs);
+                       hs->res_time = tick_add(now_ms, hs->res_wait);
+                       task_schedule(t, hs->res_time);
+                       goto leave;
+               }
+
+               /* HTX send the start line and headers if not already sent */
+               if (!hstream_sl_hdrs_htx_buf_snd(hs, conn))
+                       goto err;
+
+               if (hstream_must_drain(hs)) {
+                       /* The request must be drained before sending the response (hs->req_after_res=0).
+                        * The body will be drained upon next wakeup.
+                        */
+                       TRACE_STATE("waking up task", HS_EV_HSTRM_IO_CB, hs);
+                       task_wakeup(hs->task, TASK_WOKEN_IO);
+                       goto out;
+               }
+       }
+       else {
+               struct buffer *buf;
+               struct htx *htx;
+
+               /* HTX RX part */
+               if (hstream_must_drain(hs)) {
+                       rcvd = hstream_htx_buf_rcv(conn, hs);
+                       if (rcvd == 3) {
+                               TRACE_STATE("waiting for more data", HS_EV_HSTRM_RESP, hs);
+                               goto out;
+                       }
+               }
+
+               if (tick_isset(hs->res_wait) && !tick_isset(hs->res_time)) {
+                       TRACE_STATE("task scheduled", HS_EV_HSTRM_IO_CB, hs);
+                       hs->res_time = tick_add(now_ms, hs->res_wait);
+                       task_schedule(t, hs->res_time);
+                       goto leave;
+               }
+
+               /* HTX send the start line and headers if not already sent */
+               if (!hstream_sl_hdrs_htx_buf_snd(hs, conn))
+                       goto err;
+
+               /* HTX TX part */
+               if (!hs->to_write && htx_is_empty(htxbuf(&hs->res)))
+                       goto out;
+
+               buf = hstream_get_buf(hs, &hs->res);
+               if (!buf) {
+                       TRACE_ERROR("could not allocate response buffer", HS_EV_HSTRM_RESP, hs);
+                       goto err;
+               }
+
+               htx = htx_from_buf(buf);
+               if (hs->to_write > 0)
+                       hstream_add_data(htx, hs);
+               if (hs->to_write <= 0)
+                       htx->flags |= HTX_FL_EOM;
+               htx_to_buf(htx, &hs->res);
+               hstream_htx_buf_snd(conn, hs);
+
+               if (hs->req_body && hs->req_after_res && !hs->to_write) {
+                       /* Response sending has just complete. The body will be drained upon
+                        * next wakeup.
+                        */
+                       TRACE_STATE("waking up task", HS_EV_HSTRM_IO_CB, hs);
+                       task_wakeup(hs->task, TASK_WOKEN_IO);
+                       goto out;
+               }
+       }
+
+ out:
+       if (!hs->to_write && !hs->req_body && htx_is_empty(htxbuf(&hs->res))) {
+               TRACE_DEVEL("shutting down stream", HS_EV_HSTRM_SEND, hs);
+               conn->mux->shut(hs->sc, SE_SHW_SILENT|SE_SHW_NORMAL, NULL);
+       }
+
+       if (hs->flags & HS_ST_CONN_ERROR ||
+           (!hs->to_write && !hs->req_body && htx_is_empty(htxbuf(&hs->res)))) {
+               TRACE_STATE("releasing hstream", HS_EV_PROCESS_HSTRM, hs);
+               hstream_free(hs);
+               hs = NULL;
+               task_destroy(t);
+               t = NULL;
+       }
+
+ leave:
+       TRACE_LEAVE(HS_EV_PROCESS_HSTRM, hs);
+       return t;
+ err:
+       TRACE_DEVEL("leaving on error", HS_EV_PROCESS_HSTRM);
+       goto leave;
+}
+
+/* Allocate a httpter stream as this is done for classical haproxy streams.
+ * This function is called as proxy callback from muxes.
+ * Return the haterm stream object if succeede, NUL if not.
+ */
+void *hstream_new(struct session *sess, struct stconn *sc, struct buffer *input)
+{
+       struct hstream *hs = NULL;
+       struct task *t = NULL;
+
+       TRACE_ENTER(HS_EV_HSTRM_NEW);
+
+       if (unlikely((hs = pool_alloc(pool_head_hstream)) == NULL)) {
+               TRACE_ERROR("stream allocation failure", HS_EV_HSTRM_NEW);
+               goto err;
+       }
+
+       if ((t = task_new_here()) == NULL) {
+               TRACE_ERROR("task allocation failure", HS_EV_HSTRM_NEW, hs);
+               goto err;
+       }
+
+       hs->obj_type = OBJ_TYPE_HATERM;
+       hs->sess = sess;
+       hs->sc = sc;
+       hs->task = t;
+       hs->req = BUF_NULL;
+       hs->res = BUF_NULL;
+       hs->to_write = 0;
+
+       LIST_INIT(&hs->buf_wait.list);
+       hs->flags = 0;
+
+       hs->ka = 0;
+       hs->req_cache = 1;
+       hs->req_size = 0;
+       hs->req_body = 0;
+       hs->req_code = 200;
+       hs->res_wait = TICK_ETERNITY;
+       hs->res_time = TICK_ETERNITY;
+       hs->req_chunked = 0;
+       hs->req_random = 0;
+       hs->req_after_res = 0;
+       hs->req_meth = HTTP_METH_OTHER;
+
+       if (sc_conn(sc)) {
+               const struct mux_ops *mux = sc_mux_ops(sc);
+               if (mux && !(mux->flags & MX_FL_HTX)) {
+                       TRACE_ERROR("mux without HTX not supported",
+                                   HS_EV_HSTRM_NEW, hs);
+                       goto err;
+               }
+       }
+
+       if (sc_attach_hstream(hs->sc, hs) < 0) {
+               TRACE_ERROR("could not attach to stream connector",
+                           HS_EV_HSTRM_NEW, hs);
+               goto err;
+       }
+
+       TRACE_PRINTF(TRACE_LEVEL_PROTO, HS_EV_HSTRM_NEW, hs, 0, 0, 0,
+                    "stream initialized @%p", hs);
+       hs->res = BUF_NULL;
+       /* Xfer the input buffer */
+       if (!b_is_null(input)) {
+               hs->req = *input;
+               *input = BUF_NULL;
+       }
+
+       t->process = process_hstream;
+       t->context = hs;
+       t->expire = TICK_ETERNITY;
+       task_wakeup(hs->task, TASK_WOKEN_INIT);
+
+       TRACE_LEAVE(HS_EV_HSTRM_NEW, hs);
+       return hs;
+
+ err:
+       task_destroy(t);
+       pool_free(pool_head_hstream, hs);
+       TRACE_DEVEL("leaving on error", HS_EV_HSTRM_NEW);
+       return NULL;
+}
+
+/* Build the response buffers.
+ * Return 1 if succeeded, -1 if failed.
+ */
+static int hstream_build_responses(void)
+{
+       int i;
+
+       for (i = 0; i < sizeof(common_response); i++) {
+               if (i % HS_COMMON_RESPONSE_LINE_SZ == HS_COMMON_RESPONSE_LINE_SZ - 1)
+                       common_response[i] = '\n';
+               else if (i % 10 == 0)
+                       common_response[i] = '.';
+               else
+                       common_response[i] = '0' + i % 10;
+       }
+
+       /* original haterm chunk mode responses are made of 1-byte chunks
+        * but the haproxy muxes do not support this. At this time
+        * these reponses are handled the same way as for common
+        * responses with a pre-built buffer.
+        */
+       for (i = 0; i < sizeof(common_chunk_resp); i++)
+               common_chunk_resp[i] = '1';
+
+       random_resp = malloc(random_resp_len);
+       if (!random_resp) {
+               ha_alert("not enough memore...\n");
+               return -1;
+       }
+
+       for (i = 0; i < random_resp_len; i++)
+               random_resp[i] = rand() >> 16;
+
+       return 1;
+}
+
+REGISTER_POST_CHECK(hstream_build_responses);
index 3f0c264af4c2733db70814a95efd13939cddb5ff..a07aa98f64415c30828a293584e4d062af23ba3d 100644 (file)
@@ -16,6 +16,7 @@
 #include <haproxy/connection.h>
 #include <haproxy/check.h>
 #include <haproxy/filters.h>
+#include <haproxy/hstream.h>
 #include <haproxy/http_ana.h>
 #include <haproxy/pipe.h>
 #include <haproxy/pool.h>
@@ -91,6 +92,15 @@ struct sc_app_ops sc_app_check_ops = {
        .name    = "CHCK",
 };
 
+struct sc_app_ops sc_app_hstream_ops = {
+       .chk_rcv = NULL,
+       .chk_snd = NULL,
+       .abort   = NULL,
+       .shutdown= NULL,
+       .wake    = hstream_wake,
+       .name    = "HTERM",
+};
+
 /* Initializes an endpoint */
 void sedesc_init(struct sedesc *sedesc)
 {
@@ -415,6 +425,30 @@ int sc_attach_strm(struct stconn *sc, struct stream *strm)
        return 0;
 }
 
+/* Attach a stconn to a haterm layer and sets the relevant
+ * callbacks. Returns -1 on error and 0 on success. SE_FL_ORPHAN flag is
+ * removed. This function is called by a haterm stream when it is created
+ * to attach it on the stream connector on the client side.
+ */
+int sc_attach_hstream(struct stconn *sc, struct hstream *hs)
+{
+       BUG_ON(!sc_ep_test(sc, SE_FL_T_MUX));
+
+       sc->app = &hs->obj_type;
+       sc_ep_clr(sc, SE_FL_ORPHAN);
+       sc_ep_report_read_activity(sc);
+       sc->wait_event.tasklet = tasklet_new();
+       if (!sc->wait_event.tasklet)
+               return -1;
+
+       sc->wait_event.tasklet->process = sc_hstream_io_cb;
+       sc->wait_event.tasklet->context = sc;
+       sc->wait_event.events = 0;
+
+       sc->app_ops = &sc_app_hstream_ops;
+       return 0;
+}
+
 /* Detaches the stconn from the endpoint, if any. For a connecrion, if a
  * mux owns the connection ->detach() callback is called. Otherwise, it means
  * the stream connector owns the connection. In this case the connection is closed