]> git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
*) mod_http2: added support for bootstrapping WebSockets via HTTP/2, as
authorStefan Eissing <icing@apache.org>
Tue, 20 Jun 2023 12:01:09 +0000 (12:01 +0000)
committerStefan Eissing <icing@apache.org>
Tue, 20 Jun 2023 12:01:09 +0000 (12:01 +0000)
     described in RFC 8441. A new directive 'H2WebSockets on|off' has been
     added. The feature is by default not enabled.
     As also discussed in the manual, this feature should work for setups
     using "ProxyPass backend-url upgrade=websocket" without further changes.
     Special server modules for WebSockets will have to be adapted,
     most likely, as the handling if IO events is different with HTTP/2.
     HTTP/2 WebSockets are supported on platforms with native pipes. This
     excludes Windows.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1910507 13f79535-47bb-0310-9956-ffa450edef68

41 files changed:
.github/workflows/linux.yml
CMakeLists.txt
changes-entries/h2_websockets.txt [new file with mode: 0644]
configure.in
docs/manual/mod/mod_http2.xml
include/ap_mmn.h
include/http_core.h
modules/http2/config2.m4
modules/http2/h2.h
modules/http2/h2_bucket_beam.c
modules/http2/h2_bucket_beam.h
modules/http2/h2_c1_io.c
modules/http2/h2_c2.c
modules/http2/h2_c2_filter.c
modules/http2/h2_config.c
modules/http2/h2_config.h
modules/http2/h2_conn_ctx.h
modules/http2/h2_mplx.c
modules/http2/h2_proxy_util.c
modules/http2/h2_push.c
modules/http2/h2_request.c
modules/http2/h2_session.c
modules/http2/h2_stream.c
modules/http2/h2_util.c
modules/http2/h2_util.h
modules/http2/h2_version.h
modules/http2/h2_ws.c [new file with mode: 0644]
modules/http2/h2_ws.h [new file with mode: 0644]
modules/http2/mod_http2.c
modules/http2/mod_http2.dsp
modules/proxy/proxy_util.c
server/core.c
test/clients/.gitignore [new file with mode: 0644]
test/clients/Makefile.in [new file with mode: 0644]
test/clients/h2ws.c [new file with mode: 0644]
test/modules/http2/test_800_websockets.py [new file with mode: 0644]
test/modules/http2/ws_server.py [new file with mode: 0644]
test/pyhttpd/config.ini.in
test/pyhttpd/env.py
test/pyhttpd/ws_util.py [new file with mode: 0644]
test/travis_run_linux.sh

index 913a5bb36489ff9dbfaceb2585dfad62f228829e..beb84c0fdc37d217aed55de9c8052cd72d8cc1aa 100644 (file)
@@ -197,7 +197,7 @@ jobs:
           # -------------------------------------------------------------------------
           - name: HTTP/2 test suite
             config: --enable-mods-shared=reallyall --with-mpm=event --enable-mpms-shared=all
-            pkgs: curl python3-pytest nghttp2-client python3-cryptography python3-requests python3-multipart
+            pkgs: curl python3-pytest nghttp2-client python3-cryptography python3-requests python3-multipart python3-filelock python3-websockets
             env: |
               APR_VERSION=1.7.4
               APU_VERSION=1.6.3
@@ -228,7 +228,7 @@ jobs:
           ### TODO: fix caching here.
           - name: MOD_TLS test suite
             config: --enable-mods-shared=reallyall --with-mpm=event --enable-mpms-shared=event
-            pkgs: curl python3-pytest nghttp2-client python3-cryptography python3-requests python3-multipart cargo cbindgen
+            pkgs: curl python3-pytest nghttp2-client python3-cryptography python3-requests python3-multipart python3-filelock python3-websockets cargo cbindgen
             env: |
               APR_VERSION=1.7.4
               APU_VERSION=1.6.3
index daeccb397fa62b98867fffbddf74138231883b7e..869f72173f518442ea00dbb2159b11c0b9f6e021 100644 (file)
@@ -497,6 +497,7 @@ SET(mod_http2_extra_sources
   modules/http2/h2_request.c         modules/http2/h2_session.c
   modules/http2/h2_stream.c          modules/http2/h2_switch.c
   modules/http2/h2_util.c            modules/http2/h2_workers.c
+  modules/http2/h2_ws.c
 )
 SET(mod_ldap_extra_defines           LDAP_DECLARE_EXPORT)
 SET(mod_ldap_extra_libs              wldap32)
diff --git a/changes-entries/h2_websockets.txt b/changes-entries/h2_websockets.txt
new file mode 100644 (file)
index 0000000..0f2bc71
--- /dev/null
@@ -0,0 +1,10 @@
+  *) mod_http2: added support for bootstrapping WebSockets via HTTP/2, as
+     described in RFC 8441. A new directive 'H2WebSockets on|off' has been
+     added. The feature is by default not enabled.
+     As also discussed in the manual, this feature should work for setups
+     using "ProxyPass backend-url upgrade=websocket" without further changes.
+     Special server modules for WebSockets will have to be adapted,
+     most likely, as the handling if IO events is different with HTTP/2.
+     HTTP/2 WebSockets are supported on platforms with native pipes. This
+     excludes Windows.
+     [Stefan Eissing]
\ No newline at end of file
index daaae7d20044e1c2d1244bbec0652eb272a602f7..719f8ac5c303fce3af8800e4c9f29b5bdd9c719b 100644 (file)
@@ -999,6 +999,7 @@ APACHE_FAST_OUTPUT(support/Makefile)
 if test -d ./test; then
     APACHE_FAST_OUTPUT(test/Makefile)
     AC_CONFIG_FILES([test/pyhttpd/config.ini])
+    APACHE_FAST_OUTPUT(test/clients/Makefile)
 fi
 
 dnl ## Finalize the variables
index 926bb1ca5f2a6f8bc2d215e503c9870a9b1c6e8b..0b9590f05d2a9d678c9f008e53a4128f54be8c5d 100644 (file)
@@ -1082,4 +1082,42 @@ H2EarlyHint Link "&lt;/my.css&gt;;rel=preload;as=style"
         </usage>
     </directivesynopsis>
 
+    <directivesynopsis>
+        <name>H2WebSockets</name>
+        <description>En-/Disable WebSockets via HTTP/2</description>
+        <syntax>H2WebSockets  on|off</syntax>
+        <default>H2WebSockets off</default>
+        <contextlist>
+            <context>server config</context>
+            <context>virtual host</context>
+        </contextlist>
+        <compatibility>Available in version 2.5.1 and later.</compatibility>
+
+        <usage>
+            <p>
+                Use <directive>H2WebSockets</directive> to enable or disable
+                bootstrapping of WebSockets via the HTTP/2 protocol. This
+                protocol extension is defined in RFC 8441.
+            </p><p>
+                Such requests come as a CONNECT with an extra ':protocol'
+                header. Such requests are transformed inside the module to
+                their HTTP/1.1 equivalents before passing it to internal
+                processing.
+            </p><p>
+                This means that HTTP/2 WebSockets can be used for a
+                <directive module="mod_proxy">ProxyPass</directive> with
+                'upgrade=websocket' parameter without further changes.
+            </p><p>
+                For (3rd party) modules that handle WebSockets directly in the
+                server, the protocol bootstrapping itself will also work. However
+                the transfer of data does require extra support in case of HTTP/2.
+                The negotiated WebSocket will not be able to use the client connection
+                socket for polling IO related events.
+            </p><p>
+                Because enabling this feature might break backward compatibility
+                for such 3rd party modules, it is not enabled by default.
+            </p>
+        </usage>
+    </directivesynopsis>
+
 </modulesynopsis>
index b7acf6e4bfeb0c8704a8afc0ff006c3cb02ffbac..59af7ae08280cab0cdfb832fac52594fc28f5059 100644 (file)
  * 20211221.13 (2.5.1-dev) Add hook token_checker to check for authorization other
  *                         than username / password. Add autht_provider structure.
  * 20211221.14 (2.5.1-dev) Add request_rec->final_resp_passed bit
+ * 20211221.15 (2.5.1-dev) Add ap_get_pollfd_from_conn()
  */
 
 #define MODULE_MAGIC_COOKIE 0x41503235UL /* "AP25" */
 #ifndef MODULE_MAGIC_NUMBER_MAJOR
 #define MODULE_MAGIC_NUMBER_MAJOR 20211221
 #endif
-#define MODULE_MAGIC_NUMBER_MINOR 14             /* 0...n */
+#define MODULE_MAGIC_NUMBER_MINOR 15             /* 0...n */
 
 /**
  * Determine if the server's current MODULE_MAGIC_NUMBER is at least a
index 8610c98742912a63fd44370701e451716aa7ad57..7c4f166e09791df64e8ae242ee9a18cfb3ad52c7 100644 (file)
@@ -31,6 +31,7 @@
 #include "apr_optional.h"
 #include "util_filter.h"
 #include "ap_expr.h"
+#include "apr_poll.h"
 #include "apr_tables.h"
 
 #include "http_config.h"
@@ -1109,6 +1110,30 @@ AP_DECLARE(int) ap_state_query(int query_code);
  */
 AP_CORE_DECLARE(conn_rec *) ap_create_slave_connection(conn_rec *c);
 
+/** Get a apr_pollfd_t populated with descriptor and descriptor type
+ * and the timeout to use for it.
+ * @return APR_ENOTIMPL if not supported for a connection.
+ */
+AP_DECLARE_HOOK(apr_status_t, get_pollfd_from_conn,
+                (conn_rec *c, struct apr_pollfd_t *pfd,
+                 apr_interval_time_t *ptimeout))
+
+/**
+ * Pass in a `struct apr_pollfd_t*` and get `desc_type` and `desc`
+ * populated with a suitable value for polling connection input.
+ * For primary connection (c->master == NULL), this will be the connection
+ * socket. For secondary connections this may differ or not be available
+ * at all.
+ * Note that APR_NO_DESC may be set to indicate that the connection
+ * input is already closed.
+ *
+ * @param pfd  the pollfd to set the descriptor in
+ * @param ptimeout  != NULL to retrieve the timeout in effect
+ * @return ARP_SUCCESS when the information was assigned.
+ */
+AP_CORE_DECLARE(apr_status_t) ap_get_pollfd_from_conn(conn_rec *c,
+                                      struct apr_pollfd_t *pfd,
+                                      apr_interval_time_t *ptimeout);
 
 /** Macro to provide a default value if the pointer is not yet initialised
  */
index f89f5baa6c9b4ded2f06cc90c8fd93454caa7342..c4579c4dbbddda2a9b299f1c8aaf613fc2b92e65 100644 (file)
@@ -37,6 +37,7 @@ h2_stream.lo dnl
 h2_switch.lo dnl
 h2_util.lo dnl
 h2_workers.lo dnl
+h2_ws.lo dnl
 "
 
 dnl
index aa58b61a5a2201ce276bf1a24ef90edcdbfb06f9..4babbf81d6b5bc87d26c6bb5f060e6aa0db4798e 100644 (file)
@@ -62,6 +62,8 @@ extern const char *H2_MAGIC_TOKEN;
 #define H2_HEADER_AUTH_LEN   10
 #define H2_HEADER_PATH       ":path"
 #define H2_HEADER_PATH_LEN   5
+#define H2_HEADER_PROTO      ":protocol"
+#define H2_HEADER_PROTO_LEN  9
 #define H2_CRLF             "\r\n"
 
 /* Size of the frame header itself in HTTP/2 */
@@ -153,6 +155,7 @@ struct h2_request {
     const char *scheme;
     const char *authority;
     const char *path;
+    const char *protocol;
     apr_table_t *headers;
 
     apr_time_t request_time;
index 72baea3e337918049d76e44998096741886ee911..69782541aa498f5de00ecb15c7d63eb1a0bc3b99 100644 (file)
@@ -268,6 +268,7 @@ static void beam_shutdown(h2_bucket_beam *beam, apr_shutdown_how_e how)
     if (how == APR_SHUTDOWN_READWRITE) {
         beam->cons_io_cb = NULL;
         beam->recv_cb = NULL;
+        beam->eagain_cb = NULL;
     }
 
     /* shutdown sender (or both)? */
@@ -747,6 +748,9 @@ transfer:
 
 leave:
     H2_BEAM_LOG(beam, to, APLOG_TRACE2, rv, "end receive", bb);
+    if (rv == APR_EAGAIN && beam->eagain_cb) {
+        beam->eagain_cb(beam->eagain_ctx, beam);
+    }
     apr_thread_mutex_unlock(beam->lock);
     return rv;
 }
@@ -769,6 +773,15 @@ void h2_beam_on_received(h2_bucket_beam *beam,
     apr_thread_mutex_unlock(beam->lock);
 }
 
+void h2_beam_on_eagain(h2_bucket_beam *beam,
+                       h2_beam_ev_callback *eagain_cb, void *ctx)
+{
+    apr_thread_mutex_lock(beam->lock);
+    beam->eagain_cb = eagain_cb;
+    beam->eagain_ctx = ctx;
+    apr_thread_mutex_unlock(beam->lock);
+}
+
 void h2_beam_on_send(h2_bucket_beam *beam,
                      h2_beam_ev_callback *send_cb, void *ctx)
 {
@@ -846,3 +859,25 @@ int h2_beam_report_consumption(h2_bucket_beam *beam)
     apr_thread_mutex_unlock(beam->lock);
     return rv;
 }
+
+int h2_beam_is_complete(h2_bucket_beam *beam)
+{
+    int rv = 0;
+
+    apr_thread_mutex_lock(beam->lock);
+    if (beam->closed)
+        rv = 1;
+    else {
+        apr_bucket *b;
+        for (b = H2_BLIST_FIRST(&beam->buckets_to_send);
+             b != H2_BLIST_SENTINEL(&beam->buckets_to_send);
+             b = APR_BUCKET_NEXT(b)) {
+            if (APR_BUCKET_IS_EOS(b)) {
+                rv = 1;
+                break;
+            }
+        }
+    }
+    apr_thread_mutex_unlock(beam->lock);
+    return rv;
+}
index 94e788a03e88471b703c137d1b83027e9524507e..c58ce9823c6ff63c57773d007466f0e7840c865a 100644 (file)
@@ -67,6 +67,8 @@ struct h2_bucket_beam {
     void *recv_ctx;
     h2_beam_ev_callback *send_cb;      /* event: buckets were added in h2_beam_send() */
     void *send_ctx;
+    h2_beam_ev_callback *eagain_cb;    /* event: a receive results in ARP_EAGAIN */
+    void *eagain_ctx;
 
     apr_off_t recv_bytes;             /* amount of bytes transferred in h2_beam_receive() */
     apr_off_t recv_bytes_reported;    /* amount of bytes reported as received via callback */
@@ -205,6 +207,16 @@ void h2_beam_on_consumed(h2_bucket_beam *beam,
 void h2_beam_on_received(h2_bucket_beam *beam,
                          h2_beam_ev_callback *recv_cb, void *ctx);
 
+/**
+ * Register a callback to be invoked on the receiver side whenever
+ * APR_EAGAIN is being returned in h2_beam_receive().
+ * @param beam the beam to set the callback on
+ * @param egain_cb the callback or NULL, called before APR_EAGAIN is returned
+ * @param ctx  the context to use in callback invocation
+ */
+void h2_beam_on_eagain(h2_bucket_beam *beam,
+                       h2_beam_ev_callback *eagain_cb, void *ctx);
+
 /**
  * Register a call back from the sender side to be invoked when send
  * has added buckets to the beam.
@@ -246,4 +258,10 @@ apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam);
  */
 apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam);
 
+/**
+ * @return != 0 iff beam has been closed or has an EOS bucket buffered
+ *                  waiting to be received.
+ */
+int h2_beam_is_complete(h2_bucket_beam *beam);
+
 #endif /* h2_bucket_beam_h */
index c09e2deb40ad096ceedc095dd1add880c9840be2..1d5cf4f33d897ee181d0729531b9e65e9aadd268 100644 (file)
@@ -267,7 +267,7 @@ static apr_status_t pass_output(h2_c1_io *io, int flush)
         /* recursive call, may be triggered by an H2EOS bucket
          * being destroyed and triggering sending more data? */
         AP_DEBUG_ASSERT(0);
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c, APLOGNO(10456)
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, c, APLOGNO(10456)
                       "h2_c1_io(%ld): recursive call of h2_c1_io_pass. "
                       "Denied to prevent output corruption. This "
                       "points to a bug in the HTTP/2 implementation.",
index 0c2edba55bf14f59e4f8f34cce06a5a63f5f5613..537163bf79d989306af4815e1b3705f3f58ff507 100644 (file)
@@ -48,6 +48,7 @@
 #include "h2_headers.h"
 #include "h2_session.h"
 #include "h2_stream.h"
+#include "h2_ws.h"
 #include "h2_c2.h"
 #include "h2_util.h"
 
@@ -173,6 +174,7 @@ void h2_c2_abort(conn_rec *c2, conn_rec *from)
 
 typedef struct {
     apr_bucket_brigade *bb;       /* c2: data in holding area */
+    unsigned did_upgrade_eos:1;   /* for Upgrade, we added an extra EOS */
 } h2_c2_fctx_in_t;
 
 static apr_status_t h2_c2_filter_in(ap_filter_t* f,
@@ -216,7 +218,17 @@ static apr_status_t h2_c2_filter_in(ap_filter_t* f,
             APR_BRIGADE_INSERT_TAIL(fctx->bb, b);
         }
     }
-    
+
+    /* If this is a HTTP Upgrade, it means the request we process
+     * has not Content, although the stream is not necessarily closed.
+     * On first read, we insert an EOS to signal processing that it
+     * has the complete body. */
+    if (conn_ctx->is_upgrade && !fctx->did_upgrade_eos) {
+        b = apr_bucket_eos_create(f->c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(fctx->bb, b);
+        fctx->did_upgrade_eos = 1;
+    }
+
     while (APR_BRIGADE_EMPTY(fctx->bb)) {
         /* Get more input data for our request. */
         if (APLOGctrace2(f->c)) {
@@ -547,6 +559,31 @@ static int c2_hook_pre_connection(conn_rec *c2, void *csd)
     return OK;
 }
 
+static apr_status_t c2_get_pollfd_from_conn(conn_rec *c,
+                                            struct apr_pollfd_t *pfd,
+                                            apr_interval_time_t *ptimeout)
+{
+    if (c->master) {
+        h2_conn_ctx_t *ctx = h2_conn_ctx_get(c);
+        if (ctx) {
+            if (ctx->beam_in && ctx->pipe_in[H2_PIPE_OUT]) {
+                pfd->desc_type = APR_POLL_FILE;
+                pfd->desc.f = ctx->pipe_in[H2_PIPE_OUT];
+                if (ptimeout)
+                    *ptimeout = h2_beam_timeout_get(ctx->beam_in);
+            }
+            else {
+                /* no input */
+                pfd->desc_type = APR_NO_DESC;
+                if (ptimeout)
+                    *ptimeout = -1;
+            }
+            return APR_SUCCESS;
+        }
+    }
+    return APR_ENOTIMPL;
+}
+
 void h2_c2_register_hooks(void)
 {
     /* When the connection processing actually starts, we might
@@ -558,8 +595,14 @@ void h2_c2_register_hooks(void)
     /* We need to manipulate the standard HTTP/1.1 protocol filters and
      * install our own. This needs to be done very early. */
     ap_hook_pre_read_request(c2_pre_read_request, NULL, NULL, APR_HOOK_MIDDLE);
-    ap_hook_post_read_request(c2_post_read_request, NULL, NULL, APR_HOOK_REALLY_FIRST);
+    ap_hook_post_read_request(c2_post_read_request, NULL, NULL,
+                              APR_HOOK_REALLY_FIRST);
     ap_hook_fixups(c2_hook_fixups, NULL, NULL, APR_HOOK_LAST);
+#if AP_MODULE_MAGIC_AT_LEAST(20211221, 15)
+    ap_hook_get_pollfd_from_conn(c2_get_pollfd_from_conn, NULL, NULL,
+                                 APR_HOOK_MIDDLE);
+#endif
+
 
     c2_net_in_filter_handle =
         ap_register_input_filter("H2_C2_NET_IN", h2_c2_filter_in,
@@ -668,11 +711,21 @@ static apr_status_t c2_process(h2_conn_ctx_t *conn_ctx, conn_rec *c)
 {
     const h2_request *req = conn_ctx->request;
     conn_state_t *cs = c->cs;
-    request_rec *r;
+    request_rec *r = NULL;
     const char *tenc;
     apr_time_t timeout;
+    apr_status_t rv = APR_SUCCESS;
+
+    if(req->protocol && !strcmp("websocket", req->protocol)) {
+        req = h2_ws_rewrite_request(req, c, conn_ctx->beam_in == NULL);
+        if (!req) {
+            rv = APR_EGENERAL;
+            goto cleanup;
+        }
+    }
+
+    r = h2_create_request_rec(req, c, conn_ctx->beam_in == NULL);
 
-    r = h2_create_request_rec(conn_ctx->request, c, conn_ctx->beam_in == NULL);
     if (!r) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                       "h2_c2(%s-%d): create request_rec failed, r=NULL",
index 5b1838b773041f41f72ff398f6f49767c428d0dc..846344c6b474efef6508bd01c7512a083983b387 100644 (file)
@@ -39,6 +39,7 @@
 #include "h2_c2.h"
 #include "h2_mplx.h"
 #include "h2_request.h"
+#include "h2_ws.h"
 #include "h2_util.h"
 
 
@@ -108,15 +109,26 @@ apr_status_t h2_c2_filter_request_in(ap_filter_t *f,
     /* This filter is a one-time wonder */
     ap_remove_input_filter(f);
 
-    if (f->c->master && (conn_ctx = h2_conn_ctx_get(f->c)) && conn_ctx->stream_id) {
-        if (conn_ctx->request->http_status != H2_HTTP_STATUS_UNSET) {
+    if (f->c->master && (conn_ctx = h2_conn_ctx_get(f->c)) &&
+        conn_ctx->stream_id) {
+        const h2_request *req = conn_ctx->request;
+
+        if (req->http_status == H2_HTTP_STATUS_UNSET &&
+            req->protocol && !strcmp("websocket", req->protocol)) {
+            req = h2_ws_rewrite_request(req, f->c, conn_ctx->beam_in == NULL);
+            if (!req)
+                return APR_EGENERAL;
+        }
+
+        if (req->http_status != H2_HTTP_STATUS_UNSET) {
             /* error was encountered preparing this request */
-            b = ap_bucket_error_create(conn_ctx->request->http_status, NULL, f->r->pool,
+            b = ap_bucket_error_create(req->http_status, NULL, f->r->pool,
                                        f->c->bucket_alloc);
             APR_BRIGADE_INSERT_TAIL(bb, b);
             return APR_SUCCESS;
         }
-        b = h2_request_create_bucket(conn_ctx->request, f->r);
+
+        b = h2_request_create_bucket(req, f->r);
         APR_BRIGADE_INSERT_TAIL(bb, b);
         if (!conn_ctx->beam_in) {
             b = apr_bucket_eos_create(f->c->bucket_alloc);
@@ -184,7 +196,7 @@ static int uniq_field_values(void *d, const char *key, const char *val)
          */
         for (i = 0, strpp = (char **) values->elts; i < values->nelts;
              ++i, ++strpp) {
-            if (*strpp && apr_strnatcasecmp(*strpp, start) == 0) {
+            if (*strpp && ap_cstr_casecmp(*strpp, start) == 0) {
                 break;
             }
         }
@@ -292,7 +304,7 @@ static h2_headers *create_response(request_rec *r)
 
         while (field && (token = ap_get_list_item(r->pool, &field)) != NULL) {
             for (i = 0; i < r->content_languages->nelts; ++i) {
-                if (!apr_strnatcasecmp(token, languages[i]))
+                if (!ap_cstr_casecmp(token, languages[i]))
                     break;
             }
             if (i == r->content_languages->nelts) {
@@ -636,9 +648,11 @@ apr_status_t h2_c2_filter_catch_h1_out(ap_filter_t* f, apr_bucket_brigade* bb)
                 int result = ap_map_http_request_error(conn_ctx->last_err,
                                                        HTTP_INTERNAL_SERVER_ERROR);
                 request_rec *r = h2_create_request_rec(conn_ctx->request, f->c, 1);
-                ap_die((result >= 400)? result : HTTP_INTERNAL_SERVER_ERROR, r);
-                b = ap_bucket_eor_create(f->c->bucket_alloc, r);
-                APR_BRIGADE_INSERT_TAIL(bb, b);
+                if (r) {
+                    ap_die((result >= 400)? result : HTTP_INTERNAL_SERVER_ERROR, r);
+                    b = ap_bucket_eor_create(f->c->bucket_alloc, r);
+                    APR_BRIGADE_INSERT_TAIL(bb, b);
+                }
             }
         }
         /* There are cases where we need to parse a serialized http/1.1 response.
index 670833ec129918153481639b18396c55ad095d86..a8b19739026c556e9710c2268672756f45960899 100644 (file)
@@ -77,6 +77,7 @@ typedef struct h2_config {
     int output_buffered;
     apr_interval_time_t stream_timeout;/* beam timeout */
     int max_data_frame_len;          /* max # bytes in a single h2 DATA frame */
+    int h2_websockets;               /* if mod_h2 negotiating WebSockets */
 } h2_config;
 
 typedef struct h2_dir_config {
@@ -115,6 +116,7 @@ static h2_config defconf = {
     1,                      /* stream output buffered */
     -1,                     /* beam timeout */
     0,                      /* max DATA frame len, 0 == no extra limit */
+    0,                      /* WebSockets negotiation, enabled */
 };
 
 static h2_dir_config defdconf = {
@@ -161,6 +163,7 @@ void *h2_config_create_svr(apr_pool_t *pool, server_rec *s)
     conf->output_buffered      = DEF_VAL;
     conf->stream_timeout       = DEF_VAL;
     conf->max_data_frame_len   = DEF_VAL;
+    conf->h2_websockets        = DEF_VAL;
     return conf;
 }
 
@@ -210,6 +213,7 @@ static void *h2_config_merge(apr_pool_t *pool, void *basev, void *addv)
     n->padding_always       = H2_CONFIG_GET(add, base, padding_always);
     n->stream_timeout       = H2_CONFIG_GET(add, base, stream_timeout);
     n->max_data_frame_len   = H2_CONFIG_GET(add, base, max_data_frame_len);
+    n->h2_websockets        = H2_CONFIG_GET(add, base, h2_websockets);
     return n;
 }
 
@@ -301,6 +305,8 @@ static apr_int64_t h2_srv_config_geti64(const h2_config *conf, h2_config_var_t v
             return H2_CONFIG_GET(conf, &defconf, stream_timeout);
         case H2_CONF_MAX_DATA_FRAME_LEN:
             return H2_CONFIG_GET(conf, &defconf, max_data_frame_len);
+        case H2_CONF_WEBSOCKETS:
+            return H2_CONFIG_GET(conf, &defconf, h2_websockets);
         default:
             return DEF_VAL;
     }
@@ -363,6 +369,9 @@ static void h2_srv_config_seti(h2_config *conf, h2_config_var_t var, int val)
         case H2_CONF_MAX_DATA_FRAME_LEN:
             H2_CONFIG_SET(conf, max_data_frame_len, val);
             break;
+        case H2_CONF_WEBSOCKETS:
+            H2_CONFIG_SET(conf, h2_websockets, val);
+            break;
         default:
             break;
     }
@@ -681,6 +690,24 @@ static const char *h2_conf_set_push(cmd_parms *cmd, void *dirconf, const char *v
     return "value must be On or Off";
 }
 
+static const char *h2_conf_set_websockets(cmd_parms *cmd,
+                                          void *dirconf, const char *value)
+{
+    if (!strcasecmp(value, "On")) {
+#if H2_USE_PIPES
+        CONFIG_CMD_SET(cmd, dirconf, H2_CONF_WEBSOCKETS, 1);
+        return NULL;
+#else
+        return "HTTP/2 WebSockets are not supported on this platform";
+#endif
+    }
+    else if (!strcasecmp(value, "Off")) {
+        CONFIG_CMD_SET(cmd, dirconf, H2_CONF_WEBSOCKETS, 0);
+        return NULL;
+    }
+    return "value must be On or Off";
+}
+
 static const char *h2_conf_add_push_priority(cmd_parms *cmd, void *_cfg,
                                              const char *ctype, const char *sdependency,
                                              const char *sweight)
@@ -1021,6 +1048,8 @@ const command_rec h2_cmds[] = {
                   RSRC_CONF, "maximum number of bytes in a single HTTP/2 DATA frame"),
     AP_INIT_TAKE2("H2EarlyHint", h2_conf_add_early_hint, NULL,
                    OR_FILEINFO|OR_AUTHCFG, "add a a 'Link:' header for a 103 Early Hints response."),
+    AP_INIT_TAKE1("H2WebSockets", h2_conf_set_websockets, NULL,
+                  RSRC_CONF, "off to disable WebSockets over HTTP/2"),
     AP_END_CMD
 };
 
index 5a783712848ae108b8e701d5d1010fae8aca7534..1c8f86509ff61779b882fea1d708afe68207d494 100644 (file)
@@ -44,6 +44,7 @@ typedef enum {
     H2_CONF_OUTPUT_BUFFER,
     H2_CONF_STREAM_TIMEOUT,
     H2_CONF_MAX_DATA_FRAME_LEN,
+    H2_CONF_WEBSOCKETS,
 } h2_config_var_t;
 
 struct apr_hash_t;
index 90dc9f627dd62417615aa14e9a1f563b39bf85f8..3b44856f95af3581b1d5ae134e7c0dfcbc163fd4 100644 (file)
@@ -53,7 +53,8 @@ struct h2_conn_ctx_t {
     const struct h2_request *request; /* c2: the request to process */
     struct h2_bucket_beam *beam_out; /* c2: data out, created from req_pool */
     struct h2_bucket_beam *beam_in;  /* c2: data in or NULL, borrowed from request stream */
-    unsigned int input_chunked;      /* c2: if input needs HTTP/1.1 chunking applied */
+    unsigned input_chunked:1;        /* c2: if input needs HTTP/1.1 chunking applied */
+    unsigned is_upgrade:1;           /* c2: if requst is a HTTP Upgrade */
 
     apr_file_t *pipe_in[2];          /* c2: input produced notification pipe */
     apr_pollfd_t pfd;                /* c1: poll socket input, c2: NUL */
index ecb2b7c4d4e56a257067f8752b3ec4e61423249f..b3138dc4c18b4f21beb99078ecfed278280ef20e 100644 (file)
@@ -146,6 +146,7 @@ static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)
         if (c2_ctx->beam_in) {
             h2_beam_on_send(c2_ctx->beam_in, NULL, NULL);
             h2_beam_on_received(c2_ctx->beam_in, NULL, NULL);
+            h2_beam_on_eagain(c2_ctx->beam_in, NULL, NULL);
             h2_beam_on_consumed(c2_ctx->beam_in, NULL, NULL);
         }
     }
@@ -666,7 +667,9 @@ static apr_status_t c1_process_stream(h2_mplx *m,
     if (APLOGctrace1(m->c1)) {
         const h2_request *r = stream->request;
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
-                      H2_STRM_MSG(stream, "process %s %s%s%s%s"),
+                      H2_STRM_MSG(stream, "process %s%s%s %s%s%s%s"),
+                      r->protocol? r->protocol : "",
+                      r->protocol? " " : "",
                       r->method, r->scheme? r->scheme : "",
                       r->scheme? "://" : "",
                       r->authority, r->path? r->path: "");
@@ -780,6 +783,19 @@ static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam)
     }
 }
 
+static void c2_beam_input_read_eagain(void *ctx, h2_bucket_beam *beam)
+{
+    conn_rec *c = ctx;
+    h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
+    /* installed in the input bucket beams when we use pipes.
+     * Drain the pipe just before the beam returns APR_EAGAIN.
+     * A clean state for allowing polling on the pipe to rest
+     * when the beam is empty */
+    if (conn_ctx && conn_ctx->pipe_in[H2_PIPE_OUT]) {
+        h2_util_drain_pipe(conn_ctx->pipe_in[H2_PIPE_OUT]);
+    }
+}
+
 static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam)
 {
     conn_rec *c = ctx;
@@ -824,6 +840,7 @@ static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream, h2_
                                         c2->pool, c2->pool);
         if (APR_SUCCESS != rv) goto cleanup;
 #endif
+        h2_beam_on_eagain(stream->input, c2_beam_input_read_eagain, c2);
     }
 
 cleanup:
@@ -930,6 +947,15 @@ static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
                       "h2_c2(%s-%d): processing finished without final response",
                       conn_ctx->id, conn_ctx->stream_id);
         c2->aborted = 1;
+        if (conn_ctx->beam_out)
+          h2_beam_abort(conn_ctx->beam_out, c2);
+    }
+    else if (!conn_ctx->beam_out || !h2_beam_is_complete(conn_ctx->beam_out)) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, conn_ctx->last_err, c2,
+                      "h2_c2(%s-%d): processing finished with incomplete output",
+                      conn_ctx->id, conn_ctx->stream_id);
+        c2->aborted = 1;
+        h2_beam_abort(conn_ctx->beam_out, c2);
     }
     else if (!c2->aborted) {
         s_mplx_be_happy(m, c2, conn_ctx);
index dc69ec06368d22460711d1600289f40cdc043ac9..5e1ebe663db5e2951b7254cb8adaa0de74f64346 100644 (file)
@@ -381,7 +381,7 @@ static int iq_bubble_down(h2_proxy_iqueue *q, int i, int bottom,
  * h2_proxy_ngheader
  ******************************************************************************/
 #define H2_HD_MATCH_LIT_CS(l, name)  \
-    ((strlen(name) == sizeof(l) - 1) && !apr_strnatcasecmp(l, name))
+    ((strlen(name) == sizeof(l) - 1) && !ap_cstr_casecmp(l, name))
 
 static int h2_util_ignore_header(const char *name) 
 {
@@ -500,7 +500,7 @@ static int ignore_header(const literal *lits, size_t llen,
     
     for (i = 0; i < llen; ++i) {
         lit = &lits[i];
-        if (lit->len == nlen && !apr_strnatcasecmp(lit->name, name)) {
+        if (lit->len == nlen && !ap_cstr_casecmp(lit->name, name)) {
             return 1;
         }
     }
@@ -542,7 +542,7 @@ void h2_proxy_util_camel_case_header(char *s, size_t len)
 
 /** Match a header value against a string constance, case insensitive */
 #define H2_HD_MATCH_LIT(l, name, nlen)  \
-    ((nlen == sizeof(l) - 1) && !apr_strnatcasecmp(l, name))
+    ((nlen == sizeof(l) - 1) && !ap_cstr_casecmp(l, name))
 
 static apr_status_t h2_headers_add_h1(apr_table_t *headers, apr_pool_t *pool, 
                                       const char *name, size_t nlen,
index dd0928b7190b68d6560c4dde0cbc7eb532867bd2..e6a10c5ab102f3cd59e5d6db2be284b84627bba4 100644 (file)
@@ -426,7 +426,7 @@ static void inspect_link(link_ctx *ctx, const char *s, size_t slen)
 
 static int head_iter(void *ctx, const char *key, const char *value) 
 {
-    if (!apr_strnatcasecmp("link", key)) {
+    if (!ap_cstr_casecmp("link", key)) {
         inspect_link(ctx, value, strlen(value));
     }
     return 1;
index 3a6c42b0921cb773d10d4fbcccd66d923b8f0cfc..b55d5720a02d8f7991038bba4e55302f09702107 100644 (file)
@@ -166,6 +166,10 @@ apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool,
                  && !strncmp(H2_HEADER_AUTH, name, nlen)) {
             req->authority = apr_pstrndup(pool, value, vlen);
         }
+        else if (H2_HEADER_PROTO_LEN == nlen
+                 && !strncmp(H2_HEADER_PROTO, name, nlen)) {
+            req->protocol = apr_pstrndup(pool, value, vlen);
+        }
         else {
             char buffer[32];
             memset(buffer, 0, 32);
@@ -214,6 +218,7 @@ h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src)
     dst->scheme       = apr_pstrdup(p, src->scheme);
     dst->authority    = apr_pstrdup(p, src->authority);
     dst->path         = apr_pstrdup(p, src->path);
+    dst->protocol     = apr_pstrdup(p, src->protocol);
     dst->headers      = apr_table_clone(p, src->headers);
     return dst;
 }
@@ -299,13 +304,13 @@ apr_bucket *h2_request_create_bucket(const h2_request *req, request_rec *r)
 #endif
 
 static void assign_headers(request_rec *r, const h2_request *req,
-                           int no_body)
+                           int no_body, int is_connect)
 {
     const char *cl;
 
     r->headers_in = apr_table_clone(r->pool, req->headers);
 
-    if (req->authority) {
+    if (req->authority && !is_connect) {
         /* for internal handling, we have to simulate that :authority
          * came in as Host:, RFC 9113 ch. says that mismatches between
          * :authority and Host: SHOULD be rejected as malformed. However,
@@ -324,36 +329,40 @@ static void assign_headers(request_rec *r, const h2_request *req,
                       "set 'Host: %s' from :authority", req->authority);
     }
 
-    cl = apr_table_get(req->headers, "Content-Length");
-    if (no_body) {
-        if (!cl && apr_table_get(req->headers, "Content-Type")) {
-            /* If we have a content-type, but already seen eos, no more
-             * data will come. Signal a zero content length explicitly.
-             */
-            apr_table_setn(req->headers, "Content-Length", "0");
+    /* Unless we open a byte stream via CONNECT, apply content-length guards. */
+    if (!is_connect) {
+        cl = apr_table_get(req->headers, "Content-Length");
+        if (no_body) {
+            if (!cl && apr_table_get(req->headers, "Content-Type")) {
+                /* If we have a content-type, but already seen eos, no more
+                 * data will come. Signal a zero content length explicitly.
+                 */
+                apr_table_setn(req->headers, "Content-Length", "0");
+            }
         }
-    }
 #if !AP_HAS_RESPONSE_BUCKETS
-    else if (!cl) {
-        /* there may be a body and we have internal HTTP/1.1 processing.
-         * If the Content-Length is unspecified, we MUST simulate
-         * chunked Transfer-Encoding.
-         *
-         * HTTP/2 does not need a Content-Length for framing. Ideally
-         * all clients set the EOS flag on the header frame if they
-         * do not intent to send a body. However, forwarding proxies
-         * might just no know at the time and send an empty DATA
-         * frame with EOS much later.
-         */
-        apr_table_mergen(r->headers_in, "Transfer-Encoding", "chunked");
-    }
+        else if (!cl) {
+            /* there may be a body and we have internal HTTP/1.1 processing.
+             * If the Content-Length is unspecified, we MUST simulate
+             * chunked Transfer-Encoding.
+             *
+             * HTTP/2 does not need a Content-Length for framing. Ideally
+             * all clients set the EOS flag on the header frame if they
+             * do not intent to send a body. However, forwarding proxies
+             * might just no know at the time and send an empty DATA
+             * frame with EOS much later.
+             */
+            apr_table_mergen(r->headers_in, "Transfer-Encoding", "chunked");
+        }
 #endif /* else AP_HAS_RESPONSE_BUCKETS */
+  }
 }
 
 request_rec *h2_create_request_rec(const h2_request *req, conn_rec *c,
                                    int no_body)
 {
     int access_status = HTTP_OK;
+    int is_connect = !ap_cstr_casecmp("CONNECT", req->method);
 
 #if AP_MODULE_MAGIC_AT_LEAST(20120211, 106)
     request_rec *r = ap_create_request(c);
@@ -362,24 +371,43 @@ request_rec *h2_create_request_rec(const h2_request *req, conn_rec *c,
 #endif
 
 #if AP_MODULE_MAGIC_AT_LEAST(20120211, 107)
-    assign_headers(r, req, no_body);
+    assign_headers(r, req, no_body, is_connect);
     ap_run_pre_read_request(r, c);
 
     /* Time to populate r with the data we have. */
     r->request_time = req->request_time;
     AP_DEBUG_ASSERT(req->authority);
-    if (!apr_strnatcasecmp("CONNECT", req->method)) {
+    if (is_connect) {
       /* CONNECT MUST NOT have scheme or path */
+      if (req->scheme) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(10458)
+                      "':scheme: %s' header present in CONNECT request",
+                      req->scheme);
+        access_status = HTTP_BAD_REQUEST;
+        goto die;
+      }
+      if (req->path) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(10459)
+                      "':path: %s' header present in CONNECT request",
+                      req->path);
+        access_status = HTTP_BAD_REQUEST;
+        goto die;
+      }
       r->the_request = apr_psprintf(r->pool, "%s %s HTTP/2.0",
                                     req->method, req->authority);
     }
-    else if (req->scheme && ap_cstr_casecmp(req->scheme, "http")
-             && ap_cstr_casecmp(req->scheme, "https")) {
-        /* FIXME: we also need to create absolute uris when we are
-         * in a forward proxy configuration! But there is currently
-         * no way to detect that. */
+    else if (req->protocol) {
+      ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(10460)
+                    "':protocol: %s' header present in %s request",
+                    req->protocol, req->method);
+      access_status = HTTP_BAD_REQUEST;
+      goto die;
+    }
+    else if (req->scheme &&
+             ap_cstr_casecmp(req->scheme, ap_ssl_conn_is_ssl(c->master? c->master : c)?
+                             "https" : "http")) {
         /* Client sent a ':scheme' pseudo header for something else
-         * than what we handle by default. Make an absolute URI. */
+         * than what we have on this connection. Make an absolute URI. */
         r->the_request = apr_psprintf(r->pool, "%s %s://%s%s HTTP/2.0",
                                       req->method, req->scheme, req->authority,
                                       req->path ? req->path : "");
@@ -420,7 +448,7 @@ request_rec *h2_create_request_rec(const h2_request *req, conn_rec *c,
     {
         const char *s;
 
-        assign_headers(r, req, no_body);
+        assign_headers(r, req, no_body, is_connect);
         ap_run_pre_read_request(r, c);
 
         /* Time to populate r with the data we have. */
index 603e47ce85e1f85e0abb2b2e4891f47efb23eebb..c104cac18412311731163360ec96a7f48513837e 100644 (file)
@@ -621,9 +621,8 @@ static int on_invalid_header_cb(nghttp2_session *ngh2,
     
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c1, APLOGNO(03456)
                   H2_SSSN_STRM_MSG(session, frame->hd.stream_id,
-                  "invalid header '%s: %s'"),
-                  apr_pstrndup(session->pool, (const char *)name, namelen),
-                  apr_pstrndup(session->pool, (const char *)value, valuelen));
+                  "invalid header '%.*s: %.*s'"),
+                  (int)namelen, name, (int)valuelen, value);
     stream = get_stream(session, frame->hd.stream_id);
     if (stream) {
         h2_stream_rst(stream, NGHTTP2_PROTOCOL_ERROR);
@@ -1003,7 +1002,7 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec *
 static apr_status_t h2_session_start(h2_session *session, int *rv)
 {
     apr_status_t status = APR_SUCCESS;
-    nghttp2_settings_entry settings[3];
+    nghttp2_settings_entry settings[4];
     size_t slen;
     int win_size;
     
@@ -1070,7 +1069,12 @@ static apr_status_t h2_session_start(h2_session *session, int *rv)
         settings[slen].value = win_size;
         ++slen;
     }
-    
+    if (h2_config_sgeti(session->s, H2_CONF_WEBSOCKETS)) {
+      settings[slen].settings_id = NGHTTP2_SETTINGS_ENABLE_CONNECT_PROTOCOL;
+      settings[slen].value = 1;
+      ++slen;
+    }
+
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c1,
                   H2_SSSN_LOG(APLOGNO(03201), session, 
                   "start, INITIAL_WINDOW_SIZE=%ld, MAX_CONCURRENT_STREAMS=%d"), 
index 635caf7cb3c70790b08839e443573e11be94a314..24d0268f38b4cccc89bcd79bfdf6cb1c166d319e 100644 (file)
@@ -767,6 +767,9 @@ apr_status_t h2_stream_add_header(h2_stream *stream,
         status = h2_request_add_header(stream->rtmp, stream->pool,
                                        name, nlen, value, vlen,
                                        session->s->limit_req_fieldsize, &was_added);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c1,
+                      H2_STRM_MSG(stream, "add_header: '%.*s: %.*s"),
+                      (int)nlen, name, (int)vlen, value);
         if (was_added) ++stream->request_headers_added;
     }
     else if (H2_SS_OPEN == stream->state) {
@@ -897,7 +900,14 @@ apr_status_t h2_stream_end_headers(h2_stream *stream, int eos, size_t raw_bytes)
      *      of CONNECT requests (see [RFC7230], Section 5.3)).
      */
     if (!ap_cstr_casecmp(req->method, "CONNECT")) {
-        if (req->scheme || req->path) {
+        if (req->protocol && !strcmp("websocket", req->protocol)) {
+            if (!req->scheme || !req->path) {
+                ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, stream->session->c1,
+                              H2_STRM_LOG(APLOGNO(10457), stream, "Request to websocket CONNECT "
+                              "without :scheme or :path, sending 400 answer"));
+            }
+        }
+        else if (req->scheme || req->path) {
             ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, stream->session->c1,
                           H2_STRM_LOG(APLOGNO(10384), stream, "Request to CONNECT "
                           "with :scheme or :path specified, sending 400 answer"));
@@ -1459,8 +1469,8 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
                  * it is all fine. */
                  ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c1,
                                H2_SSSN_STRM_MSG(session, stream_id, "rst stream"));
-                 h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
-                 return NGHTTP2_ERR_CALLBACK_FAILURE;
+                 h2_stream_rst(stream, H2_ERR_STREAM_CLOSED);
+                 return NGHTTP2_ERR_DEFERRED;
             }
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c1,
                           H2_SSSN_STRM_MSG(session, stream_id,
@@ -1469,10 +1479,17 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
             eos = 1;
             rv = APR_SUCCESS;
         }
+        else if (APR_ECONNRESET == rv || APR_ECONNABORTED == rv) {
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c1,
+                          H2_STRM_LOG(APLOGNO(), stream, "data_cb, reading data"));
+            h2_stream_rst(stream, H2_ERR_STREAM_CLOSED);
+            return NGHTTP2_ERR_DEFERRED;
+        }
         else {
             ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c1,
                           H2_STRM_LOG(APLOGNO(02938), stream, "data_cb, reading data"));
-            return NGHTTP2_ERR_CALLBACK_FAILURE;
+            h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
+            return NGHTTP2_ERR_DEFERRED;
         }
     }
 
index 728cee95aa2ba64fc5daae13cf6ff5542a8d07e3..8e53cebdf92a586b61e1046ca523d9ada7be8e14 100644 (file)
@@ -1281,8 +1281,8 @@ apr_size_t h2_util_bucket_print(char *buffer, apr_size_t bmax,
     else if (bmax > off) {
         off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]",
                             b->type->name,
-                            (long)(b->length == ((apr_size_t)-1)?
-                                   -1 : b->length));
+                            (b->length == ((apr_size_t)-1)?
+                                   -1 : (long)b->length));
     }
     return off;
 }
@@ -1650,7 +1650,7 @@ static int contains_name(const literal *lits, size_t llen, nghttp2_nv *nv)
     for (i = 0; i < llen; ++i) {
         lit = &lits[i];
         if (lit->len == nv->namelen
-            && !apr_strnatcasecmp(lit->name, (const char *)nv->name)) {
+            && !ap_cstr_casecmp(lit->name, (const char *)nv->name)) {
             return 1;
         }
     }
@@ -1706,7 +1706,7 @@ static apr_status_t req_add_header(apr_table_t *headers, apr_pool_t *pool,
         return APR_SUCCESS;
     }
     else if (nv->namelen == sizeof("cookie")-1
-             && !apr_strnatcasecmp("cookie", (const char *)nv->name)) {
+             && !ap_cstr_casecmp("cookie", (const char *)nv->name)) {
         existing = apr_table_get(headers, "cookie");
         if (existing) {
             /* Cookie header come separately in HTTP/2, but need
@@ -1725,7 +1725,7 @@ static apr_status_t req_add_header(apr_table_t *headers, apr_pool_t *pool,
         }
     }
     else if (nv->namelen == sizeof("host")-1
-             && !apr_strnatcasecmp("host", (const char *)nv->name)) {
+             && !ap_cstr_casecmp("host", (const char *)nv->name)) {
         if (apr_table_get(headers, "Host")) {
             return APR_SUCCESS; /* ignore duplicate */
         }
@@ -1883,6 +1883,13 @@ void h2_util_drain_pipe(apr_file_t *pipe)
 {
     char rb[512];
     apr_size_t nr = sizeof(rb);
+    apr_interval_time_t timeout;
+    apr_status_t trv;
+
+    /* Make the pipe non-blocking if we can */
+    trv = apr_file_pipe_timeout_get(pipe, &timeout);
+    if (trv == APR_SUCCESS)
+      apr_file_pipe_timeout_set(pipe, 0);
 
     while (apr_file_read(pipe, rb, &nr) == APR_SUCCESS) {
         /* Although we write just one byte to the other end of the pipe
@@ -1893,6 +1900,8 @@ void h2_util_drain_pipe(apr_file_t *pipe)
         if (nr != sizeof(rb))
             break;
     }
+    if (trv == APR_SUCCESS)
+      apr_file_pipe_timeout_set(pipe, timeout);
 }
 
 apr_status_t h2_util_wait_on_pipe(apr_file_t *pipe)
index d2e6548ba879c91e4111edc10eebcf1d0c6be501..dcec73eaf2c294c92d4645298e389f59eece7ac3 100644 (file)
@@ -337,7 +337,7 @@ apr_size_t h2_util_table_bytes(apr_table_t *t, apr_size_t pair_extra);
 
 /** Match a header value against a string constance, case insensitive */
 #define H2_HD_MATCH_LIT(l, name, nlen)  \
-    ((nlen == sizeof(l) - 1) && !apr_strnatcasecmp(l, name))
+    ((nlen == sizeof(l) - 1) && !ap_cstr_casecmp(l, name))
 
 /*******************************************************************************
  * HTTP/2 header helpers
index 7de3144ec7f96109ee1079cb8c536dcd539bb730..59dc06c8b6a94c4733e0b92a466c07cae4563c71 100644 (file)
@@ -27,7 +27,7 @@
  * @macro
  * Version number of the http2 module as c string
  */
-#define MOD_HTTP2_VERSION "2.0.19-git"
+#define MOD_HTTP2_VERSION "2.0.20-git"
 
 /**
  * @macro
@@ -35,7 +35,7 @@
  * release. This is a 24 bit number with 8 bits for major number, 8 bits
  * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
  */
-#define MOD_HTTP2_VERSION_NUM 0x020013
+#define MOD_HTTP2_VERSION_NUM 0x020014
 
 
 #endif /* mod_h2_h2_version_h */
diff --git a/modules/http2/h2_ws.c b/modules/http2/h2_ws.c
new file mode 100644 (file)
index 0000000..e3bdadb
--- /dev/null
@@ -0,0 +1,326 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+
+#include "apr.h"
+#include "apr_strings.h"
+#include "apr_lib.h"
+#include "apr_encode.h"
+#include "apr_sha1.h"
+#include "apr_strmatch.h"
+
+#include <ap_mmn.h>
+
+#include <httpd.h>
+#include <http_core.h>
+#include <http_connection.h>
+#include <http_protocol.h>
+#include <http_request.h>
+#include <http_log.h>
+#include <http_ssl.h>
+#include <http_vhost.h>
+#include <util_filter.h>
+#include <ap_mpm.h>
+
+#include "h2_private.h"
+#include "h2_config.h"
+#include "h2_conn_ctx.h"
+#include "h2_headers.h"
+#include "h2_request.h"
+#include "h2_ws.h"
+
+static ap_filter_rec_t *c2_ws_out_filter_handle;
+
+struct ws_filter_ctx {
+    const char *ws_accept_base64;
+    int has_final_response;
+    int override_body;
+};
+
+/**
+ * Generate the "Sec-WebSocket-Accept" header field for the given key
+ * (base64 encoded) as defined in RFC 6455 ch. 4.2.2 step 5.3
+ */
+static const char *gen_ws_accept(conn_rec *c, const char *key_base64)
+{
+    apr_byte_t dgst[APR_SHA1_DIGESTSIZE];
+    const char ws_guid[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+    apr_sha1_ctx_t sha1_ctx;
+
+    apr_sha1_init(&sha1_ctx);
+    apr_sha1_update(&sha1_ctx, key_base64, (unsigned int)strlen(key_base64));
+    apr_sha1_update(&sha1_ctx, ws_guid, (unsigned int)strlen(ws_guid));
+    apr_sha1_final(dgst, &sha1_ctx);
+
+    return apr_pencode_base64_binary(c->pool, dgst, sizeof(dgst),
+                                     APR_ENCODE_NONE, NULL);
+}
+
+const h2_request *h2_ws_rewrite_request(const h2_request *req,
+                                        conn_rec *c2, int no_body)
+{
+    h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2);
+    h2_request *wsreq;
+    unsigned char key_raw[16];
+    const char *key_base64, *accept_base64;
+    struct ws_filter_ctx *ws_ctx;
+    apr_status_t rv;
+
+    if (!conn_ctx || !req->protocol || strcmp("websocket", req->protocol))
+        return req;
+
+    if (ap_cstr_casecmp("CONNECT", req->method)) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2,
+                      "h2_c2(%s-%d): websocket request with method %s",
+                      conn_ctx->id, conn_ctx->stream_id, req->method);
+        return req;
+    }
+    if (!req->scheme) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2,
+                      "h2_c2(%s-%d): websocket CONNECT without :scheme",
+                      conn_ctx->id, conn_ctx->stream_id);
+        return req;
+    }
+    if (!req->path) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2,
+                      "h2_c2(%s-%d): websocket CONNECT without :path",
+                      conn_ctx->id, conn_ctx->stream_id);
+        return req;
+    }
+
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2,
+                  "h2_c2(%s-%d): websocket CONNECT for %s",
+                  conn_ctx->id, conn_ctx->stream_id, req->path);
+    /* Transform the HTTP/2 extended CONNECT to an internal GET using
+     * the HTTP/1.1 version of websocket connection setup. */
+    wsreq = h2_request_clone(c2->pool, req);
+    wsreq->method = "GET";
+    wsreq->protocol = NULL;
+    apr_table_set(wsreq->headers, "Upgrade", "websocket");
+    /* add Sec-WebSocket-Key header */
+    rv = apr_generate_random_bytes(key_raw, sizeof(key_raw));
+    if (rv != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, APLOGNO(10461)
+                     "error generating secret");
+        return NULL;
+    }
+    key_base64 = apr_pencode_base64_binary(c2->pool, key_raw, sizeof(key_raw),
+                                           APR_ENCODE_NONE, NULL);
+    apr_table_set(wsreq->headers, "Sec-WebSocket-Key", key_base64);
+    /* This is now the request to process internally */
+
+    /* When this request gets processed and delivers a 101 response,
+     * we expect it to carry a "Sec-WebSocket-Accept" header with
+     * exactly the following value, as per RFC 6455. */
+    accept_base64 = gen_ws_accept(c2, key_base64);
+    /* Add an output filter that intercepts generated responses:
+     * - if a valid WebSocket negotiation happens, transform the
+     *   101 response to a 200
+     * - if a 2xx response happens, that does not pass the Accept test,
+     *   return a 502 indicating that the URI seems not support the websocket
+     *   protocol (RFC 8441 does not define this, but it seems the best
+     *   choice)
+     * - if a 3xx, 4xx or 5xx response happens, forward this unchanged.
+     */
+    ws_ctx = apr_pcalloc(c2->pool, sizeof(*ws_ctx));
+    ws_ctx->ws_accept_base64 = accept_base64;
+    /* insert our filter just before the C2 core filter */
+    ap_remove_output_filter_byhandle(c2->output_filters, "H2_C2_NET_OUT");
+    ap_add_output_filter("H2_C2_WS_OUT", ws_ctx, NULL, c2);
+    ap_add_output_filter("H2_C2_NET_OUT", NULL, NULL, c2);
+    /* Mark the connection as being an Upgrade, with some special handling
+     * since the request needs an EOS, without the stream being closed  */
+    conn_ctx->is_upgrade = 1;
+
+    return wsreq;
+}
+
+static apr_bucket *make_valid_resp(conn_rec *c2, int status,
+                                   apr_table_t *headers, apr_table_t *notes)
+{
+    apr_table_t *nheaders, *nnotes;
+
+    ap_assert(headers);
+    nheaders = apr_table_clone(c2->pool, headers);
+    apr_table_unset(nheaders, "Connection");
+    apr_table_unset(nheaders, "Upgrade");
+    apr_table_unset(nheaders, "Sec-WebSocket-Accept");
+    nnotes = notes? apr_table_clone(c2->pool, notes) :
+                    apr_table_make(c2->pool, 10);
+#if AP_HAS_RESPONSE_BUCKETS
+    return ap_bucket_response_create(status, NULL, nheaders, nnotes,
+                                     c2->pool, c2->bucket_alloc);
+#else
+    return h2_bucket_headers_create(c2->bucket_alloc,
+                                    h2_headers_create(status, nheaders,
+                                                      nnotes, 0, c2->pool));
+#endif
+}
+
+static apr_bucket *make_invalid_resp(conn_rec *c2, int status,
+                                     apr_table_t *notes)
+{
+    apr_table_t *nheaders, *nnotes;
+
+    nheaders = apr_table_make(c2->pool, 10);
+    apr_table_setn(nheaders, "Content-Length", "0");
+    nnotes = notes? apr_table_clone(c2->pool, notes) :
+                    apr_table_make(c2->pool, 10);
+#if AP_HAS_RESPONSE_BUCKETS
+    return ap_bucket_response_create(status, NULL, nheaders, nnotes,
+                                     c2->pool, c2->bucket_alloc);
+#else
+    return h2_bucket_headers_create(c2->bucket_alloc,
+                                    h2_headers_create(status, nheaders,
+                                                      nnotes, 0, c2->pool));
+#endif
+}
+
+static void ws_handle_resp(conn_rec *c2, h2_conn_ctx_t *conn_ctx,
+                           struct ws_filter_ctx *ws_ctx, apr_bucket *b)
+{
+#if AP_HAS_RESPONSE_BUCKETS
+    ap_bucket_response *resp = b->data;
+#else /* AP_HAS_RESPONSE_BUCKETS */
+    h2_headers *resp = h2_bucket_headers_get(b);
+#endif /* !AP_HAS_RESPONSE_BUCKETS */
+    apr_bucket *b_override = NULL;
+    int is_final = 0;
+    int override_body = 0;
+
+    if (ws_ctx->has_final_response) {
+        /* already did, nop */
+        return;
+    }
+
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
+                  "h2_c2(%s-%d): H2_C2_WS_OUT inspecting response %d",
+                  conn_ctx->id, conn_ctx->stream_id, resp->status);
+    if (resp->status == HTTP_SWITCHING_PROTOCOLS) {
+        /* The resource agreed to switch protocol. But this is only valid
+         * if it send back the correct Sec-WebSocket-Accept header value */
+        const char *hd = apr_table_get(resp->headers, "Sec-WebSocket-Accept");
+        if (hd && !strcmp(ws_ctx->ws_accept_base64, hd)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2,
+                          "h2_c2(%s-%d): websocket CONNECT, valid 101 Upgrade"
+                          ", converting to 200 response",
+                          conn_ctx->id, conn_ctx->stream_id);
+            b_override = make_valid_resp(c2, HTTP_OK, resp->headers, resp->notes);
+            is_final = 1;
+        }
+        else {
+            if (!hd) {
+                /* This points to someone being confused */
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c2, APLOGNO(10462)
+                              "h2_c2(%s-%d): websocket CONNECT, got 101 response "
+                              "without Sec-WebSocket-Accept header",
+                              conn_ctx->id, conn_ctx->stream_id);
+            }
+            else {
+                /* This points to a bug, either in our WebSockets negotiation
+                 * or in the request processings implementation of WebSockets */
+                ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, c2, APLOGNO(10463)
+                              "h2_c2(%s-%d): websocket CONNECT, 101 response "
+                              "without 'Sec-WebSocket-Accept: %s' but expected %s",
+                              conn_ctx->id, conn_ctx->stream_id, hd,
+                              ws_ctx->ws_accept_base64);
+            }
+            b_override = make_invalid_resp(c2, HTTP_BAD_GATEWAY, resp->notes);
+            override_body = is_final = 1;
+        }
+    }
+    else if (resp->status < 200) {
+        /* other intermediate response, pass through */
+    }
+    else if (resp->status < 300) {
+        /* Failure, we might be talking to a plain http resource */
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2,
+                      "h2_c2(%s-%d): websocket CONNECT, invalid response %d",
+                      conn_ctx->id, conn_ctx->stream_id, resp->status);
+        b_override = make_invalid_resp(c2, HTTP_BAD_GATEWAY, resp->notes);
+        override_body = is_final = 1;
+    }
+    else {
+        /* error response, pass through. */
+        ws_ctx->has_final_response = 1;
+    }
+
+    if (b_override) {
+        APR_BUCKET_INSERT_BEFORE(b, b_override);
+        apr_bucket_delete(b);
+        b = b_override;
+    }
+    if (override_body) {
+        APR_BUCKET_INSERT_AFTER(b, apr_bucket_eos_create(c2->bucket_alloc));
+        ws_ctx->override_body = 1;
+    }
+    if (is_final) {
+        ws_ctx->has_final_response = 1;
+        conn_ctx->has_final_response = 1;
+    }
+}
+
+static apr_status_t h2_c2_ws_filter_out(ap_filter_t* f, apr_bucket_brigade* bb)
+{
+    struct ws_filter_ctx *ws_ctx = f->ctx;
+    h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(f->c);
+    apr_bucket *b, *bnext;
+
+    ap_assert(conn_ctx);
+    if (ws_ctx->override_body) {
+        /* We have overridden the original response and also its body.
+         * If this filter is called again, we signal a hard abort to
+         * allow processing to terminate at the earliest. */
+        f->c->aborted = 1;
+        return APR_ECONNABORTED;
+    }
+
+    /* Inspect the brigade, looking for RESPONSE/HEADER buckets.
+     * Remember, this filter is only active for client websocket CONNECT
+     * requests that we translated to an internal GET with websocket
+     * headers.
+     * We inspect the repsone to see if the internal resource actually
+     * agrees to talk websocket or is "just" a normal HTTP resource that
+     * ignored the websocket request headers. */
+    for (b = APR_BRIGADE_FIRST(bb);
+         b != APR_BRIGADE_SENTINEL(bb);
+         b = bnext)
+    {
+        bnext = APR_BUCKET_NEXT(b);
+        if (APR_BUCKET_IS_METADATA(b)) {
+#if AP_HAS_RESPONSE_BUCKETS
+            if (AP_BUCKET_IS_RESPONSE(b)) {
+#else
+            if (H2_BUCKET_IS_HEADERS(b)) {
+#endif /* !AP_HAS_RESPONSE_BUCKETS */
+                ws_handle_resp(f->c, conn_ctx, ws_ctx, b);
+                continue;
+            }
+        }
+        else if (ws_ctx->override_body) {
+            apr_bucket_delete(b);
+        }
+    }
+    return ap_pass_brigade(f->next, bb);
+}
+
+void h2_ws_register_hooks(void)
+{
+    c2_ws_out_filter_handle =
+        ap_register_output_filter("H2_C2_WS_OUT", h2_c2_ws_filter_out,
+                                  NULL, AP_FTYPE_NETWORK);
+}
diff --git a/modules/http2/h2_ws.h b/modules/http2/h2_ws.h
new file mode 100644 (file)
index 0000000..4e8967c
--- /dev/null
@@ -0,0 +1,35 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __mod_h2__h2_ws__
+#define __mod_h2__h2_ws__
+
+#include "h2.h"
+
+/**
+ * Rewrite a websocket request.
+ *
+ * @param req the h2 request to rewrite
+ * @param conn the connection to process the request on
+ * @param no_body != 0 iff the request is known to have no body
+ * @return the websocket request for internal submit
+ */
+const h2_request *h2_ws_rewrite_request(const h2_request *req,
+                                        conn_rec *c2, int no_body);
+
+void h2_ws_register_hooks(void);
+
+#endif /* defined(__mod_h2__h2_ws__) */
index 8a1ee3faa5d0fa0b30a0a463e5ac1fee396b4ac1..1bd34b207dda06db0534987b63369ac38e2a2d78 100644 (file)
@@ -42,6 +42,7 @@
 #include "h2_switch.h"
 #include "h2_version.h"
 #include "h2_bucket_beam.h"
+#include "h2_ws.h"
 
 
 static void h2_hooks(apr_pool_t *pool);
@@ -199,6 +200,7 @@ static void h2_hooks(apr_pool_t *pool)
     h2_c1_register_hooks();
     h2_switch_register_hooks();
     h2_c2_register_hooks();
+    h2_ws_register_hooks();
 
     /* Setup subprocess env for certain variables
      */
index d9ff22203a8c4603396150138e97fa2884df47b7..977553436dac34a02e0426e17e4f2511b01bdcc3 100644 (file)
@@ -173,6 +173,10 @@ SOURCE=./h2_workers.c
 # End Source File
 # Begin Source File
 
+SOURCE=./h2_ws.c
+# End Source File
+# Begin Source File
+
 SOURCE=./mod_http2.c
 # End Source File
 # Begin Source File
index 614cdabb3531eefd74b96d560460ce5bd46fc2a6..750714560f05d87f6fad516308c4e41265dba3e9 100644 (file)
@@ -21,6 +21,7 @@
 #include "apr_version.h"
 #include "apr_strings.h"
 #include "apr_hash.h"
+#include "http_core.h"
 #include "proxy_util.h"
 #include "ajp.h"
 #include "scgi.h"
@@ -4871,7 +4872,7 @@ PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **ptunnel,
 {
     apr_status_t rv;
     conn_rec *c_i = r->connection;
-    apr_interval_time_t timeout = -1;
+    apr_interval_time_t client_timeout = -1, origin_timeout = -1;
     proxy_tunnel_rec *tunnel;
 
     *ptunnel = NULL;
@@ -4898,9 +4899,16 @@ PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **ptunnel,
     tunnel->client->bb = apr_brigade_create(c_i->pool, c_i->bucket_alloc);
     tunnel->client->pfd = &APR_ARRAY_PUSH(tunnel->pfds, apr_pollfd_t);
     tunnel->client->pfd->p = r->pool;
-    tunnel->client->pfd->desc_type = APR_POLL_SOCKET;
-    tunnel->client->pfd->desc.s = ap_get_conn_socket(c_i);
+    tunnel->client->pfd->desc_type = APR_NO_DESC;
+    rv = ap_get_pollfd_from_conn(tunnel->client->c,
+                                 tunnel->client->pfd, &client_timeout);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
     tunnel->client->pfd->client_data = tunnel->client;
+    if (tunnel->client->pfd->desc_type == APR_POLL_SOCKET) {
+        apr_socket_opt_set(tunnel->client->pfd->desc.s, APR_SO_NONBLOCK, 1);
+    }
 
     tunnel->origin->c = c_o;
     tunnel->origin->name = "origin";
@@ -4910,17 +4918,12 @@ PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **ptunnel,
     tunnel->origin->pfd->desc_type = APR_POLL_SOCKET;
     tunnel->origin->pfd->desc.s = ap_get_conn_socket(c_o);
     tunnel->origin->pfd->client_data = tunnel->origin;
+    apr_socket_timeout_get(tunnel->origin->pfd->desc.s, &origin_timeout);
+    apr_socket_opt_set(tunnel->origin->pfd->desc.s, APR_SO_NONBLOCK, 1);
 
     /* Defaults to the biggest timeout of both connections */
-    apr_socket_timeout_get(tunnel->client->pfd->desc.s, &timeout);
-    apr_socket_timeout_get(tunnel->origin->pfd->desc.s, &tunnel->timeout);
-    if (timeout >= 0 && (tunnel->timeout < 0 || tunnel->timeout < timeout)) {
-        tunnel->timeout = timeout;
-    }
-
-    /* We should be nonblocking from now on the sockets */
-    apr_socket_opt_set(tunnel->client->pfd->desc.s, APR_SO_NONBLOCK, 1);
-    apr_socket_opt_set(tunnel->origin->pfd->desc.s, APR_SO_NONBLOCK, 1);
+    tunnel->timeout = (origin_timeout >= 0 && origin_timeout > client_timeout)?
+                      origin_timeout : client_timeout;
 
     /* Bidirectional non-HTTP stream will confuse mod_reqtimeoout */
     ap_remove_input_filter_byhandle(c_i->input_filters, "reqtimeout");
@@ -4938,14 +4941,43 @@ PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **ptunnel,
         tunnel->nohalfclose = 1;
     }
 
-    /* Start with POLLOUT and let ap_proxy_tunnel_run() schedule both
-     * directions when there are no output data pending (anymore).
-     */
-    tunnel->client->pfd->reqevents = APR_POLLOUT | APR_POLLERR;
-    tunnel->origin->pfd->reqevents = APR_POLLOUT | APR_POLLERR;
-    if ((rv = apr_pollset_add(tunnel->pollset, tunnel->client->pfd))
-            || (rv = apr_pollset_add(tunnel->pollset, tunnel->origin->pfd))) {
-        return rv;
+    if (tunnel->client->pfd->desc_type == APR_POLL_SOCKET) {
+        /* Both ends are sockets, the poll strategy is:
+         * - poll both sides POLLOUT
+         * - when one side is writable, remove the POLLOUT
+         *   and add POLLIN to the other side.
+         * - tunnel arriving data, remove POLLIN from the source
+         *   again and add POLLOUT to the receiving side
+         * - on EOF on read, remove the POLLIN from that side
+         * Repeat until both sides are down */
+        tunnel->client->pfd->reqevents = APR_POLLOUT | APR_POLLERR;
+        tunnel->origin->pfd->reqevents = APR_POLLOUT | APR_POLLERR;
+        if ((rv = apr_pollset_add(tunnel->pollset, tunnel->origin->pfd)) ||
+            (rv = apr_pollset_add(tunnel->pollset, tunnel->client->pfd))) {
+            return rv;
+        }
+    }
+    else if (tunnel->client->pfd->desc_type == APR_POLL_FILE) {
+        /* Input is a PIPE fd, the poll strategy is:
+         * - always POLLIN on origin
+         * - use socket strategy described above for client only
+         * otherwise the same
+         */
+        tunnel->client->pfd->reqevents = 0;
+        tunnel->origin->pfd->reqevents = APR_POLLIN | APR_POLLHUP |
+                                         APR_POLLOUT | APR_POLLERR;
+        if ((rv = apr_pollset_add(tunnel->pollset, tunnel->origin->pfd))) {
+            return rv;
+        }
+    }
+    else {
+        /* input is already closed, unsual, but we know nothing about
+         * the tunneled protocol. */
+        tunnel->client->down_in = 1;
+        tunnel->origin->pfd->reqevents = APR_POLLIN | APR_POLLHUP;
+        if ((rv = apr_pollset_add(tunnel->pollset, tunnel->origin->pfd))) {
+            return rv;
+        }
     }
 
     *ptunnel = tunnel;
@@ -5054,7 +5086,23 @@ static int proxy_tunnel_transfer(proxy_tunnel_rec *tunnel,
         }
 
         del_pollset(tunnel->pollset, in->pfd, APR_POLLIN);
-        add_pollset(tunnel->pollset, out->pfd, APR_POLLOUT);
+        if (out->pfd->desc_type == APR_POLL_SOCKET) {
+            /* if the output is a SOCKET, we can stop polling the input
+             * until the output signals POLLOUT again. */
+            add_pollset(tunnel->pollset, out->pfd, APR_POLLOUT);
+        }
+        else {
+            /* We can't use POLLOUT in this direction for the only
+             * APR_POLL_FILE case we have so far (mod_h2's "signal" pipe),
+             * we assume that the client's ouput filters chain will block/flush
+             * if necessary (i.e. no pending data), hence that the origin
+             * is EOF when reaching here. This direction is over. */
+            ap_assert(in->down_in && APR_STATUS_IS_EOF(rv));
+            ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, tunnel->r,
+                          "proxy: %s: %s write shutdown",
+                          tunnel->scheme, out->name);
+            out->down_out = 1;
+        }
     }
 
     return OK;
index ef7868107ddb60ba0e36eb3ffa98c80770eae8ef..5c065121bee3676b2b5607ea79c97b19206de8c5 100644 (file)
@@ -92,6 +92,7 @@
 APR_HOOK_STRUCT(
     APR_HOOK_LINK(get_mgmt_items)
     APR_HOOK_LINK(insert_network_bucket)
+    APR_HOOK_LINK(get_pollfd_from_conn)
 )
 
 AP_IMPLEMENT_HOOK_RUN_ALL(int, get_mgmt_items,
@@ -103,6 +104,11 @@ AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, insert_network_bucket,
                              apr_socket_t *socket),
                             (c, bb, socket), AP_DECLINED)
 
+AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, get_pollfd_from_conn,
+                            (conn_rec *c, struct apr_pollfd_t *pfd,
+                             apr_interval_time_t *ptimeout),
+                              (c, pfd, ptimeout), APR_ENOTIMPL)
+
 /* Server core module... This module provides support for really basic
  * server operations, including options and commands which control the
  * operation of other modules.  Consider this the bureaucracy module.
@@ -5971,6 +5977,28 @@ static int core_upgrade_storage(request_rec *r)
     return DECLINED;
 }
 
+static apr_status_t core_get_pollfd_from_conn(conn_rec *c,
+                                              struct apr_pollfd_t *pfd,
+                                              apr_interval_time_t *ptimeout)
+{
+    if (c && !c->master) {
+        pfd->desc_type = APR_POLL_SOCKET;
+        pfd->desc.s = ap_get_conn_socket(c);
+        if (ptimeout) {
+            apr_socket_timeout_get(pfd->desc.s, ptimeout);
+        }
+        return APR_SUCCESS;
+    }
+    return APR_ENOTIMPL;
+}
+
+AP_CORE_DECLARE(apr_status_t) ap_get_pollfd_from_conn(conn_rec *c,
+                                      struct apr_pollfd_t *pfd,
+                                      apr_interval_time_t *ptimeout)
+{
+    return ap_run_get_pollfd_from_conn(c, pfd, ptimeout);
+}
+
 static void register_hooks(apr_pool_t *p)
 {
     errorlog_hash = apr_hash_make(p);
@@ -6016,6 +6044,8 @@ static void register_hooks(apr_pool_t *p)
     ap_hook_open_htaccess(ap_open_htaccess, NULL, NULL, APR_HOOK_REALLY_LAST);
     ap_hook_optional_fn_retrieve(core_optional_fn_retrieve, NULL, NULL,
                                  APR_HOOK_MIDDLE);
+    ap_hook_get_pollfd_from_conn(core_get_pollfd_from_conn, NULL, NULL,
+                                 APR_HOOK_REALLY_LAST);
 
     ap_hook_input_pending(ap_filter_input_pending, NULL, NULL,
                           APR_HOOK_MIDDLE);
diff --git a/test/clients/.gitignore b/test/clients/.gitignore
new file mode 100644 (file)
index 0000000..18b1263
--- /dev/null
@@ -0,0 +1 @@
+h2ws
\ No newline at end of file
diff --git a/test/clients/Makefile.in b/test/clients/Makefile.in
new file mode 100644 (file)
index 0000000..a322a58
--- /dev/null
@@ -0,0 +1,20 @@
+DISTCLEAN_TARGETS = h2ws
+
+CLEAN_TARGETS = h2ws
+
+bin_PROGRAMS = h2ws
+TARGETS  = $(bin_PROGRAMS)
+
+PROGRAM_LDADD        = $(UTIL_LDFLAGS) $(PROGRAM_DEPENDENCIES) $(EXTRA_LIBS) $(AP_LIBS)
+PROGRAM_DEPENDENCIES =
+
+include $(top_builddir)/build/rules.mk
+
+h2ws.lo: h2ws.c
+       $(LIBTOOL) --mode=compile $(CC) $(ab_CFLAGS) $(ALL_CFLAGS) $(ALL_CPPFLAGS) \
+           $(ALL_INCLUDES) $(PICFLAGS) $(LTCFLAGS) -c $< && touch $@
+h2ws_OBJECTS = h2ws.lo
+h2ws_LDADD = -lnghttp2
+h2ws: $(h2ws_OBJECTS)
+       $(LIBTOOL) --mode=link $(CC) $(ALL_CFLAGS) $(PILDFLAGS) \
+           $(LT_LDFLAGS) $(ALL_LDFLAGS) -o $@ $(h2ws_LTFLAGS) $(h2ws_OBJECTS) $(h2ws_LDADD)
diff --git a/test/clients/h2ws.c b/test/clients/h2ws.c
new file mode 100644 (file)
index 0000000..a090ac7
--- /dev/null
@@ -0,0 +1,1096 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr.h>
+
+#include <assert.h>
+#include <inttypes.h>
+#include <stdlib.h>
+#ifdef APR_HAVE_UNISTD_H
+#  include <unistd.h>
+#endif /* HAVE_UNISTD_H */
+#ifdef APR_HAVE_FCNTL_H
+#  include <fcntl.h>
+#endif /* HAVE_FCNTL_H */
+#include <sys/types.h>
+#include <sys/time.h>
+#ifdef APR_HAVE_SYS_SOCKET_H
+#  include <sys/socket.h>
+#endif /* HAVE_SYS_SOCKET_H */
+#ifdef APR_HAVE_NETDB_H
+#  include <netdb.h>
+#endif /* HAVE_NETDB_H */
+#ifdef APR_HAVE_NETINET_IN_H
+#  include <netinet/in.h>
+#endif /* HAVE_NETINET_IN_H */
+#include <netinet/tcp.h>
+#include <poll.h>
+#include <signal.h>
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+#include <errno.h>
+
+#include <nghttp2/nghttp2.h>
+
+#define MAKE_NV(NAME, VALUE)                                                   \
+  {                                                                            \
+    (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, sizeof(VALUE) - 1,    \
+        NGHTTP2_NV_FLAG_NONE                                                   \
+  }
+
+#define MAKE_NV_CS(NAME, VALUE)                                                \
+  {                                                                            \
+    (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, strlen(VALUE),        \
+        NGHTTP2_NV_FLAG_NONE                                                   \
+  }
+
+
+static int verbose;
+static const char *cmd;
+
+static void log_out(const char *level, const char *where, const char *msg)
+{
+    struct timespec tp;
+    struct tm tm;
+    char timebuf[128];
+
+    clock_gettime(CLOCK_REALTIME, &tp);
+    localtime_r(&tp.tv_sec, &tm);
+    strftime(timebuf, sizeof(timebuf)-1, "%H:%M:%S", &tm);
+    fprintf(stderr, "[%s.%09lu][%s][%s] %s\n", timebuf, tp.tv_nsec, level, where, msg);
+}
+
+static void log_err(const char *where, const char *msg)
+{
+    log_out("ERROR", where, msg);
+}
+
+static void log_info(const char *where, const char *msg)
+{
+    if (verbose)
+        log_out("INFO", where, msg);
+}
+
+static void log_debug(const char *where, const char *msg)
+{
+    if (verbose > 1)
+        log_out("DEBUG", where, msg);
+}
+
+#if defined(__GNUC__)
+    __attribute__((format(printf, 2, 3)))
+#endif
+static void log_errf(const char *where, const char *msg, ...)
+{
+    char buffer[8*1024];
+    va_list ap;
+
+    va_start(ap, msg);
+    vsnprintf(buffer, sizeof(buffer), msg, ap);
+    va_end(ap);
+    log_err(where, buffer);
+}
+
+#if defined(__GNUC__)
+    __attribute__((format(printf, 2, 3)))
+#endif
+static void log_infof(const char *where, const char *msg, ...)
+{
+    if (verbose) {
+        char buffer[8*1024];
+        va_list ap;
+
+        va_start(ap, msg);
+        vsnprintf(buffer, sizeof(buffer), msg, ap);
+        va_end(ap);
+        log_info(where, buffer);
+    }
+}
+
+#if defined(__GNUC__)
+    __attribute__((format(printf, 2, 3)))
+#endif
+static void log_debugf(const char *where, const char *msg, ...)
+{
+    if (verbose > 1) {
+        char buffer[8*1024];
+        va_list ap;
+
+        va_start(ap, msg);
+        vsnprintf(buffer, sizeof(buffer), msg, ap);
+        va_end(ap);
+        log_debug(where, buffer);
+    }
+}
+
+static int parse_host_port(const char **phost, uint16_t *pport,
+                           int *pipv6, size_t *pconsumed,
+                           const char *s, size_t len, uint16_t def_port)
+{
+    size_t i, offset;
+    char *host = NULL;
+    int port = 0;
+    int rv = 1, ipv6 = 0;
+
+    if (!len)
+        goto leave;
+    offset = 0;
+    if (s[offset] == '[') {
+        ipv6 = 1;
+        for (i = offset++; i < len; ++i) {
+            if (s[i] == ']')
+              break;
+        }
+        if (i >= len || i == offset)
+            goto leave;
+        host = strndup(s + offset, i - offset);
+        offset = i + 1;
+    }
+    else {
+        for (i = offset; i < len; ++i) {
+            if (strchr(":/?#", s[i]))
+              break;
+        }
+        if (i == offset) {
+            log_debugf("parse_uri", "empty host name in '%.*s", (int)len, s);
+            goto leave;
+        }
+        host = strndup(s + offset, i - offset);
+        offset = i;
+    }
+    if (offset < len && s[offset] == ':') {
+        port = 0;
+        ++offset;
+        for (i = offset; i < len; ++i) {
+            if (strchr("/?#", s[i]))
+                break;
+            if (s[i] < '0' || s[i] > '9') {
+                log_debugf("parse_uri", "invalid port char '%c'", s[i]);
+                goto leave;
+            }
+            port *= 10;
+            port += s[i] - '0';
+            if (port > 65535) {
+                log_debugf("parse_uri", "invalid port number '%d'", port);
+                goto leave;
+            }
+        }
+        offset = i;
+    }
+    rv = 0;
+
+leave:
+    *phost = rv? NULL : host;
+    *pport = rv? 0 : (port? (uint16_t)port : def_port);
+    if (pipv6)
+      *pipv6 = ipv6;
+    if (pconsumed)
+      *pconsumed = offset;
+    return rv;
+}
+
+struct uri {
+  const char *scheme;
+  const char *host;
+  const char *authority;
+  const char *path;
+  uint16_t port;
+  int ipv6;
+};
+
+static int parse_uri(struct uri *uri, const char *s, size_t len)
+{
+    char tmp[8192];
+    size_t n, offset = 0;
+    uint16_t def_port = 0;
+    int rv = 1;
+
+    /* NOT A REAL URI PARSER */
+    memset(uri, 0, sizeof(*uri));
+    if (len > 5 && !memcmp("ws://", s, 5)) {
+        uri->scheme = "ws";
+        def_port = 80;
+        offset = 5;
+    }
+    else if (len > 6 && !memcmp("wss://", s, 6)) {
+        uri->scheme = "wss";
+        def_port = 443;
+        offset = 6;
+    }
+    else {
+        /* not a scheme we process */
+        goto leave;
+    }
+
+    if (parse_host_port(&uri->host, &uri->port, &uri->ipv6, &n, s + offset,
+                        len - offset, def_port))
+        goto leave;
+    offset += n;
+
+    if (uri->port == def_port)
+      uri->authority = uri->host;
+    else if (uri->ipv6) {
+      snprintf(tmp, sizeof(tmp), "[%s]:%u", uri->host, uri->port);
+      uri->authority = strdup(tmp);
+    }
+    else {
+      snprintf(tmp, sizeof(tmp), "%s:%u", uri->host, uri->port);
+      uri->authority = strdup(tmp);
+    }
+
+    if (offset < len) {
+        uri->path = strndup(s + offset, len - offset);
+    }
+    rv = 0;
+
+leave:
+    return rv;
+}
+
+static int sock_nonblock_nodelay(int fd) {
+  int flags, rv;
+  int val = 1;
+
+  while ((flags = fcntl(fd, F_GETFL, 0)) == -1 && errno == EINTR)
+      ;
+  if (flags == -1) {
+      log_errf("sock_nonblock_nodelay", "fcntl get error %d (%s)",
+               errno, strerror(errno));
+      return -1;
+  }
+  while ((rv = fcntl(fd, F_SETFL, flags | O_NONBLOCK)) == -1 && errno == EINTR)
+    ;
+  if (rv == -1) {
+      log_errf("sock_nonblock_nodelay", "fcntl set error %d (%s)",
+               errno, strerror(errno));
+      return -1;
+  }
+  rv = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t)sizeof(val));
+  if (rv == -1) {
+      log_errf("sock_nonblock_nodelay", "set nodelay error %d (%s)",
+               errno, strerror(errno));
+      return -1;
+  }
+  return 0;
+}
+
+static int open_connection(const char *host, uint16_t port)
+{
+    char service[NI_MAXSERV];
+    struct addrinfo hints;
+    struct addrinfo *res = NULL, *rp;
+    int rv, fd = -1;
+
+    memset(&hints, 0, sizeof(hints));
+    snprintf(service, sizeof(service), "%u", port);
+    hints.ai_family = AF_UNSPEC;
+    hints.ai_socktype = SOCK_STREAM;
+    rv = getaddrinfo(host, service, &hints, &res);
+    if (rv) {
+      log_err("getaddrinfo", gai_strerror(rv));
+      goto leave;
+    }
+
+    for (rp = res; rp; rp = rp->ai_next) {
+      fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+      if (fd == -1) {
+        continue;
+      }
+      while ((rv = connect(fd, rp->ai_addr, rp->ai_addrlen)) == -1 &&
+             errno == EINTR)
+        ;
+      if (!rv) /* connected */
+          break;
+      close(fd);
+      fd = -1;
+    }
+
+leave:
+    if (res)
+      freeaddrinfo(res);
+    return fd;
+}
+
+struct h2_stream;
+
+#define IO_WANT_NONE   0
+#define IO_WANT_READ   1
+#define IO_WANT_WRITE  2
+
+struct h2_session {
+    const char *server_name;
+    const char *connect_host;
+    uint16_t connect_port;
+    int fd;
+    nghttp2_session *ngh2;
+    struct h2_stream *streams;
+    int aborted;
+    int want_io;
+};
+
+typedef void h2_stream_closed_cb(struct h2_stream *stream);
+typedef void h2_stream_recv_data(struct h2_stream *stream,
+                                 const uint8_t *data, size_t len);
+
+struct h2_stream {
+    struct h2_stream *next;
+    struct uri *uri;
+    int32_t id;
+    int fdin;
+    int http_status;
+    uint32_t error_code;
+    unsigned input_closed : 1;
+    unsigned closed : 1;
+    unsigned reset : 1;
+    h2_stream_closed_cb *on_close;
+    h2_stream_recv_data *on_recv_data;
+};
+
+static void h2_session_stream_add(struct h2_session *session,
+                                  struct h2_stream *stream)
+{
+    struct h2_stream *s;
+    for (s = session->streams; s; s = s->next) {
+        if (s == stream)  /* already there? */
+            return;
+    }
+    stream->next = session->streams;
+    session->streams = stream;
+}
+
+static void h2_session_stream_remove(struct h2_session *session,
+                                     struct h2_stream *stream)
+{
+    struct h2_stream *s, **pnext;
+    pnext = &session->streams;
+    s = session->streams;
+    while (s) {
+        if (s == stream) {
+            *pnext = s->next;
+            s->next = NULL;
+            break;
+        }
+        pnext = &s->next;
+        s = s->next;
+    }
+}
+
+static struct h2_stream *h2_session_stream_get(struct h2_session *session,
+                                               int32_t id)
+{
+    struct h2_stream *s;
+    for (s = session->streams; s; s = s->next) {
+        if (s->id == id)
+            return s;
+    }
+    return NULL;
+}
+
+static ssize_t h2_session_send(nghttp2_session *ngh2, const uint8_t *data,
+                               size_t length, int flags, void *user_data)
+{
+    struct h2_session *session = user_data;
+    ssize_t nwritten;
+    (void)ngh2;
+    (void)flags;
+
+    session->want_io = IO_WANT_NONE;
+    nwritten = send(session->fd, data, length, 0);
+    if (nwritten < 0) {
+      int err = errno;
+      if ((EWOULDBLOCK == err) || (EAGAIN == err) ||
+          (EINTR == err) || (EINPROGRESS == err)) {
+          return NGHTTP2_ERR_WOULDBLOCK;
+      }
+      log_errf("h2_session_send", "error sending %ld bytes: %d (%s)",
+               (long)length, err, strerror(err));
+      return NGHTTP2_ERR_CALLBACK_FAILURE;
+    }
+    return nwritten;
+}
+
+static ssize_t h2_session_recv(nghttp2_session *ngh2, uint8_t *buf,
+                               size_t length, int flags, void *user_data)
+{
+    struct h2_session *session = user_data;
+    ssize_t nread;
+    (void)ngh2;
+    (void)flags;
+
+    session->want_io = IO_WANT_NONE;
+    nread = recv(session->fd, buf, length, 0);
+    if (nread < 0) {
+      int err = errno;
+      if ((EWOULDBLOCK == err) || (EAGAIN == err) || (EINTR == err)) {
+          return NGHTTP2_ERR_WOULDBLOCK;
+      }
+      log_errf("h2_session_recv", "error reading %ld bytes: %d (%s)",
+               (long)length, err, strerror(err));
+      return NGHTTP2_ERR_CALLBACK_FAILURE;
+    }
+    return nread;
+}
+
+static int h2_session_on_frame_send(nghttp2_session *session,
+                                    const nghttp2_frame *frame,
+                                    void *user_data)
+{
+    size_t i;
+    (void)user_data;
+
+    switch (frame->hd.type) {
+    case NGHTTP2_HEADERS:
+      if (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)) {
+        const nghttp2_nv *nva = frame->headers.nva;
+        log_infof("frame send", "FRAME[HEADERS, stream=%d",
+                  frame->hd.stream_id);
+        for (i = 0; i < frame->headers.nvlen; ++i) {
+            log_infof("frame send", "  %.*s: %.*s",
+                      (int)nva[i].namelen, nva[i].name,
+                      (int)nva[i].valuelen, nva[i].value);
+        }
+        log_infof("frame send", "]");
+      }
+      break;
+    case NGHTTP2_DATA:
+        log_infof("frame send", "FRAME[DATA, stream=%d, length=%d, flags=%d]",
+                  frame->hd.stream_id, (int)frame->hd.length,
+                  (int)frame->hd.flags);
+        break;
+    case NGHTTP2_RST_STREAM:
+        log_infof("frame send", "FRAME[RST, stream=%d]",
+                  frame->hd.stream_id);
+        break;
+    case NGHTTP2_WINDOW_UPDATE:
+        log_infof("frame send", "FRAME[WINDOW_UPDATE, stream=%d]",
+                  frame->hd.stream_id);
+        break;
+    case NGHTTP2_GOAWAY:
+        log_infof("frame send", "FRAME[GOAWAY]");
+        break;
+    }
+    return 0;
+}
+
+static int h2_session_on_frame_recv(nghttp2_session *ngh2,
+                                    const nghttp2_frame *frame,
+                                    void *user_data)
+{
+    (void)user_data;
+
+    switch (frame->hd.type) {
+    case NGHTTP2_HEADERS:
+        if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
+          log_infof("frame recv", "FRAME[HEADERS, stream=%d]",
+                    frame->hd.stream_id);
+        }
+        break;
+    case NGHTTP2_DATA:
+        log_infof("frame recv", "FRAME[DATA, stream=%d, len=%lu, eof=%d]",
+                  frame->hd.stream_id, frame->hd.length,
+                  (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) != 0);
+        break;
+    case NGHTTP2_RST_STREAM:
+        log_infof("frame recv", "FRAME[RST, stream=%d]",
+                  frame->hd.stream_id);
+        fprintf(stdout, "[%d] RST\n", frame->hd.stream_id);
+        break;
+    case NGHTTP2_GOAWAY:
+        log_infof("frame recv", "FRAME[GOAWAY]");
+        break;
+    }
+    return 0;
+}
+
+static int h2_session_on_header(nghttp2_session *ngh2,
+                                const nghttp2_frame *frame,
+                                const uint8_t *name, size_t namelen,
+                                const uint8_t *value, size_t valuelen,
+                                uint8_t flags, void *user_data)
+{
+    struct h2_session *session = user_data;
+    struct h2_stream *stream;
+    (void)flags;
+    (void)user_data;
+    log_infof("frame recv", "stream=%d, HEADER   %.*s: %.*s",
+              frame->hd.stream_id, (int)namelen, name,
+              (int)valuelen, value);
+    stream = h2_session_stream_get(session, frame->hd.stream_id);
+    if (stream) {
+        if (namelen == 7 && !strncmp(":status", (const char *)name, namelen)) {
+            stream->http_status = 0;
+            if (valuelen < 10) {
+                char tmp[10], *endp;
+                memcpy(tmp, value, valuelen);
+                tmp[valuelen] = 0;
+                stream->http_status = (int)strtol(tmp, &endp, 10);
+            }
+            if (stream->http_status < 100 || stream->http_status >= 600) {
+                log_errf("on header recv", "stream=%d, invalid :status: %.*s",
+                          frame->hd.stream_id, (int)valuelen, value);
+                return NGHTTP2_ERR_CALLBACK_FAILURE;
+            }
+            else {
+                fprintf(stdout, "[%d] :status: %d\n", stream->id,
+                        stream->http_status);
+            }
+        }
+    }
+    return 0;
+}
+
+static int h2_session_on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
+                                      uint32_t error_code, void *user_data)
+{
+    struct h2_session *session = user_data;
+    struct h2_stream *stream;
+
+    stream = h2_session_stream_get(session, stream_id);
+    if (stream) {
+        /* closed known stream */
+        stream->error_code = error_code;
+        stream->closed = 1;
+        if (error_code)
+            stream->reset = 1;
+        if (error_code) {
+            log_errf("stream close", "stream %d closed with error %d",
+                     stream_id, error_code);
+        }
+
+        h2_session_stream_remove(session, stream);
+        if (stream->on_close)
+            stream->on_close(stream);
+        /* last one? */
+        if (!session->streams) {
+            int rv;
+            rv = nghttp2_session_terminate_session(ngh2, NGHTTP2_NO_ERROR);
+            if (rv) {
+                log_errf("terminate session", "error %d (%s)",
+                         rv, nghttp2_strerror(rv));
+                session->aborted = 1;
+            }
+        }
+    }
+    return 0;
+}
+
+static int h2_session_on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
+                                         int32_t stream_id, const uint8_t *data,
+                                         size_t len, void *user_data) {
+    struct h2_session *session = user_data;
+    struct h2_stream *stream;
+
+    stream = h2_session_stream_get(session, stream_id);
+    if (stream && stream->on_recv_data) {
+        stream->on_recv_data(stream, data, len);
+    }
+    return 0;
+}
+
+static int h2_session_open(struct h2_session *session, const char *server_name,
+                           const char *host, uint16_t port)
+{
+    nghttp2_session_callbacks *cbs = NULL;
+    int rv = -1;
+
+    memset(session, 0, sizeof(*session));
+    session->server_name = server_name;
+    session->connect_host = host;
+    session->connect_port = port;
+    /* establish socket */
+    session->fd = open_connection(session->connect_host, session->connect_port);
+    if (session->fd < 0) {
+      log_errf(cmd, "could not connect to %s:%u",
+               session->connect_host, session->connect_port);
+      goto leave;
+    }
+    if (sock_nonblock_nodelay(session->fd))
+        goto leave;
+    session->want_io = IO_WANT_NONE;
+
+    log_infof(cmd, "connected to %s via %s:%u", session->server_name,
+              session->connect_host, session->connect_port);
+
+    rv = nghttp2_session_callbacks_new(&cbs);
+    if (rv) {
+        log_errf("setup callbacks", "error_code=%d, msg=%s\n", rv,
+                 nghttp2_strerror(rv));
+        rv = -1;
+        goto leave;
+    }
+    /* setup session callbacks */
+    nghttp2_session_callbacks_set_send_callback(cbs, h2_session_send);
+    nghttp2_session_callbacks_set_recv_callback(cbs, h2_session_recv);
+    nghttp2_session_callbacks_set_on_frame_send_callback(
+        cbs, h2_session_on_frame_send);
+    nghttp2_session_callbacks_set_on_frame_recv_callback(
+        cbs, h2_session_on_frame_recv);
+    nghttp2_session_callbacks_set_on_header_callback(
+        cbs, h2_session_on_header);
+    nghttp2_session_callbacks_set_on_stream_close_callback(
+        cbs, h2_session_on_stream_close);
+    nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
+        cbs, h2_session_on_data_chunk_recv);
+    /* create the ngh2 session */
+    rv = nghttp2_session_client_new(&session->ngh2, cbs, session);
+    if (rv) {
+        log_errf("client new", "error_code=%d, msg=%s\n", rv,
+                 nghttp2_strerror(rv));
+        rv = -1;
+        goto leave;
+    }
+    /* submit initial settings */
+    rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, NULL, 0);
+    if (rv) {
+        log_errf("submit settings", "error_code=%d, msg=%s\n", rv,
+                 nghttp2_strerror(rv));
+        rv = -1;
+        goto leave;
+    }
+    rv = 0;
+
+leave:
+    if (cbs)
+        nghttp2_session_callbacks_del(cbs);
+    return rv;
+}
+
+static int h2_session_io(struct h2_session *session) {
+    int rv;
+    rv = nghttp2_session_recv(session->ngh2);
+    if (rv) {
+        log_errf("session recv", "error_code=%d, msg=%s\n", rv,
+                 nghttp2_strerror(rv));
+        return 1;
+    }
+    rv = nghttp2_session_send(session->ngh2);
+    if (rv) {
+        log_errf("session send", "error_code=%d, msg=%s\n", rv,
+                 nghttp2_strerror(rv));
+    }
+    return 0;
+}
+
+struct h2_poll_ctx;
+typedef int h2_poll_ev_cb(struct h2_poll_ctx *pctx, struct pollfd *pfd);
+
+struct h2_poll_ctx {
+    struct h2_session *session;
+    struct h2_stream *stream;
+    h2_poll_ev_cb *on_ev;
+};
+
+static int h2_session_ev(struct h2_poll_ctx *pctx, struct pollfd *pfd)
+{
+    if (pfd->revents & (POLLIN | POLLOUT)) {
+        h2_session_io(pctx->session);
+    }
+    else if (pfd->revents & POLLHUP) {
+        log_errf("session run", "connection closed");
+        return -1;
+    }
+    else if (pfd->revents & POLLERR) {
+        log_errf("session run", "connection error");
+        return -1;
+    }
+    return 0;
+}
+
+static int h2_stream_ev(struct h2_poll_ctx *pctx, struct pollfd *pfd)
+{
+    if (pfd->revents & (POLLIN | POLLHUP)) {
+        nghttp2_session_resume_data(pctx->session->ngh2, pctx->stream->id);
+    }
+    else if (pfd->revents & (POLLERR)) {
+        nghttp2_submit_rst_stream(pctx->session->ngh2, NGHTTP2_FLAG_NONE,
+                                  pctx->stream->id, NGHTTP2_STREAM_CLOSED);
+    }
+    return 0;
+}
+
+static nfds_t h2_session_set_poll(struct h2_session *session,
+                                  struct h2_poll_ctx *pollctxs,
+                                  struct pollfd *pfds)
+{
+    nfds_t n = 0;
+    int want_read, want_write;
+    struct h2_stream *stream;
+
+    want_read = (nghttp2_session_want_read(session->ngh2) ||
+                 session->want_io == IO_WANT_READ);
+    want_write = (nghttp2_session_want_write(session->ngh2) ||
+                  session->want_io == IO_WANT_WRITE);
+    if (want_read || want_write) {
+        pollctxs[n].session = session;
+        pollctxs[n].stream = NULL;
+        pollctxs[n].on_ev = h2_session_ev;
+        pfds[n].fd = session->fd;
+        pfds[n].events = pfds[n].revents = 0;
+        if (want_read)
+            pfds[n].events |= (POLLIN | POLLHUP);
+        if (want_write)
+            pfds[n].events |= (POLLOUT | POLLERR);
+        ++n;
+    }
+
+    for (stream = session->streams; stream; stream = stream->next) {
+        if (stream->fdin >= 0 && !stream->input_closed && !stream->closed) {
+            pollctxs[n].session = session;
+            pollctxs[n].stream = stream;
+            pollctxs[n].on_ev = h2_stream_ev;
+            pfds[n].fd = stream->fdin;
+            pfds[n].revents = 0;
+            pfds[n].events = (POLLIN | POLLHUP);
+            ++n;
+        }
+    }
+    return n;
+}
+
+static void h2_session_run(struct h2_session *session)
+{
+  struct h2_poll_ctx pollctxs[5];
+  struct pollfd pfds[5];
+  nfds_t npollfds, i;
+
+  npollfds  = h2_session_set_poll(session, pollctxs, pfds);
+  while (npollfds) {
+    if (poll(pfds, npollfds, -1) == -1) {
+        log_errf("session run", "poll error %d (%s)", errno, strerror(errno));
+        break;
+    }
+    for (i = 0; i < npollfds; ++i) {
+        if (pfds[i].revents) {
+            if (pollctxs[i].on_ev(&pollctxs[i], &pfds[i])) {
+                break;
+            }
+        }
+    }
+    npollfds = h2_session_set_poll(session, pollctxs, pfds);
+    if (!session->streams)
+        break;
+  }
+}
+
+static void h2_session_close(struct h2_session *session)
+{
+    log_infof(cmd, "closed session to %s:%u",
+              session->connect_host, session->connect_port);
+}
+
+/* websocket stream */
+
+struct ws_stream {
+  struct h2_stream s;
+};
+
+static void ws_stream_on_close(struct h2_stream *stream)
+{
+    log_infof("ws stream", "stream %d closed", stream->id);
+    if (!stream->reset)
+        fprintf(stdout, "[%d] EOF\n", stream->id);
+}
+
+static void ws_stream_on_recv_data(struct h2_stream *stream,
+                            const uint8_t *data, size_t len)
+{
+    size_t i;
+
+    log_infof("ws stream", "stream %d recv %lu data bytes",
+              stream->id, (unsigned long)len);
+    for (i = 0; i < len; ++i) {
+        fprintf(stdout, "%s%02x", (i&0xf)? " " : (i? "\n" : ""), data[i]);
+    }
+    fprintf(stdout, "\n");
+}
+
+static int ws_stream_create(struct ws_stream **pstream, struct uri *uri)
+{
+    struct ws_stream *stream;
+
+    stream = calloc(1, sizeof(*stream));
+    if (!stream) {
+        log_errf("ws stream create", "out of memory");
+        *pstream = NULL;
+        return -1;
+    }
+    stream->s.uri = uri;
+    stream->s.id = -1;
+    stream->s.on_close = ws_stream_on_close;
+    stream->s.on_recv_data = ws_stream_on_recv_data;
+    *pstream = stream;
+    return 0;
+}
+
+static ssize_t ws_stream_read_req_body(nghttp2_session *ngh2,
+                                       int32_t stream_id,
+                                       uint8_t *buf, size_t buflen,
+                                       uint32_t *pflags,
+                                       nghttp2_data_source *source,
+                                       void *user_data)
+{
+    struct h2_session *session = user_data;
+    struct ws_stream *stream;
+    ssize_t nread = 0;
+    int eof = 0;
+
+    stream = (struct ws_stream *)h2_session_stream_get(session, stream_id);
+    if (!stream) {
+         log_errf("stream req body", "stream not known");
+        return NGHTTP2_ERR_CALLBACK_FAILURE;
+    }
+
+    (void)source;
+    assert(stream->s.fdin >= 0);
+    nread = read(stream->s.fdin, buf, buflen);
+    log_debugf("stream req body", "fread(len=%lu) -> %ld",
+               (unsigned long)buflen, (long)nread);
+
+    if (nread < 0) {
+        if (errno == EAGAIN) {
+            nread = 0;
+        }
+        else {
+            log_errf("stream req body", "error on input");
+            return NGHTTP2_ERR_CALLBACK_FAILURE;
+        }
+    }
+    else if (nread == 0) {
+      eof = 1;
+      stream->s.input_closed = 1;
+    }
+
+    *pflags = stream->s.input_closed? NGHTTP2_DATA_FLAG_EOF : 0;
+    if (nread == 0 && !eof) {
+      return NGHTTP2_ERR_DEFERRED;
+    }
+    return nread;
+}
+
+static int ws_stream_submit(struct ws_stream *stream,
+                            struct h2_session *session,
+                            const nghttp2_nv *nva, size_t nvalen,
+                            int fdin)
+{
+    nghttp2_data_provider provider, *req_body = NULL;
+
+    if (fdin >= 0) {
+        sock_nonblock_nodelay(fdin);
+        stream->s.fdin = fdin;
+        provider.read_callback = ws_stream_read_req_body;
+        provider.source.ptr = NULL;
+        req_body = &provider;
+    }
+    else {
+        stream->s.input_closed = 1;
+    }
+
+    stream->s.id = nghttp2_submit_request(session->ngh2, NULL, nva, nvalen,
+                                          req_body, stream);
+    if (stream->s.id < 0) {
+        log_errf("ws stream submit", "nghttp2_submit_request: error %d",
+                 stream->s.id);
+        return -1;
+    }
+
+    h2_session_stream_add(session, &stream->s);
+    log_infof("ws stream submit", "stream %d opened for %s%s",
+              stream->s.id, stream->s.uri->authority, stream->s.uri->path);
+    return 0;
+}
+
+static void usage(const char *msg)
+{
+    if(msg)
+        fprintf(stderr, "%s\n", msg);
+    fprintf(stderr,
+        "usage: [options] ws-uri scenario\n"
+        "  run a websocket scenario to the ws-uri, options:\n"
+        "  -c host:port connect to host:port\n"
+        "  -v         increase verbosity\n"
+        "scenarios are:\n"
+        "  * fail-proto: CONNECT using wrong :protocol\n"
+        "  * miss-authority: CONNECT without :authority header\n"
+        "  * miss-path: CONNECT without :path header\n"
+        "  * miss-scheme: CONNECT without :scheme header\n"
+        "  * miss-version: CONNECT without sec-webSocket-version header\n"
+        "  * ws-empty: open valid websocket, do not send anything\n"
+    );
+}
+
+int main(int argc, char *argv[])
+{
+    const char *host = NULL, *scenario;
+    uint16_t port = 80;
+    struct uri uri;
+    struct h2_session session;
+    struct ws_stream *stream;
+    char ch;
+
+    cmd = argv[0];
+    while((ch = getopt(argc, argv, "c:vh")) != -1) {
+        switch(ch) {
+        case 'c':
+            if (parse_host_port(&host, &port, NULL, NULL,
+                                optarg, strlen(optarg), 80)) {
+                log_errf(cmd, "could not parse connect '%s'", optarg);
+                return 1;
+            }
+            break;
+        case 'h':
+            usage(NULL);
+            return 2;
+            break;
+        case 'v':
+            ++verbose;
+            break;
+        default:
+           usage("invalid option");
+           return 1;
+        }
+    }
+    argc -= optind;
+    argv += optind;
+
+    if (argc < 1) {
+        usage("need URL");
+        return 1;
+    }
+    if (argc < 2) {
+        usage("need scenario");
+        return 1;
+    }
+    if (parse_uri(&uri, argv[0], strlen(argv[0]))) {
+        log_errf(cmd, "could not parse uri '%s'", argv[0]);
+        return 1;
+    }
+    log_debugf(cmd, "normalized uri: %s://%s:%u%s", uri.scheme, uri.host,
+               uri.port, uri.path? uri.path : "");
+    scenario = argv[1];
+
+    if (!host) {
+        host = uri.host;
+        port = uri.port;
+    }
+
+    if (h2_session_open(&session, uri.host, host, port))
+        return 1;
+
+    if (ws_stream_create(&stream, &uri))
+        return 1;
+
+    if (!strcmp(scenario, "ws-stdin")) {
+        const nghttp2_nv nva[] = {
+            MAKE_NV(":method", "CONNECT"),
+            MAKE_NV_CS(":path", stream->s.uri->path),
+            MAKE_NV_CS(":scheme", "http"),
+            MAKE_NV_CS(":authority", stream->s.uri->authority),
+            MAKE_NV_CS(":protocol", "websocket"),
+            MAKE_NV("accept", "*/*"),
+            MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+            MAKE_NV("sec-webSocket-version", "13"),
+            MAKE_NV("sec-webSocket-protocol", "chat"),
+        };
+        if (ws_stream_submit(stream, &session,
+                             nva, sizeof(nva) / sizeof(nva[0]), 0))
+            return 1;
+    }
+    else if (!strcmp(scenario, "fail-proto")) {
+        const nghttp2_nv nva[] = {
+            MAKE_NV(":method", "CONNECT"),
+            MAKE_NV_CS(":path", stream->s.uri->path),
+            MAKE_NV_CS(":scheme", "http"),
+            MAKE_NV_CS(":authority", stream->s.uri->authority),
+            MAKE_NV_CS(":protocol", "websockets"),
+            MAKE_NV("accept", "*/*"),
+            MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+            MAKE_NV("sec-webSocket-version", "13"),
+            MAKE_NV("sec-webSocket-protocol", "chat"),
+        };
+        if (ws_stream_submit(stream, &session,
+                             nva, sizeof(nva) / sizeof(nva[0]), -1))
+            return 1;
+    }
+    else if (!strcmp(scenario, "miss-version")) {
+        const nghttp2_nv nva[] = {
+            MAKE_NV(":method", "CONNECT"),
+            MAKE_NV_CS(":path", stream->s.uri->path),
+            MAKE_NV_CS(":scheme", "http"),
+            MAKE_NV_CS(":authority", stream->s.uri->authority),
+            MAKE_NV_CS(":protocol", "websocket"),
+            MAKE_NV("accept", "*/*"),
+            MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+            MAKE_NV("sec-webSocket-protocol", "chat"),
+        };
+        if (ws_stream_submit(stream, &session,
+                             nva, sizeof(nva) / sizeof(nva[0]), -1))
+            return 1;
+    }
+    else if (!strcmp(scenario, "miss-path")) {
+        const nghttp2_nv nva[] = {
+            MAKE_NV(":method", "CONNECT"),
+            MAKE_NV_CS(":scheme", "http"),
+            MAKE_NV_CS(":authority", stream->s.uri->authority),
+            MAKE_NV_CS(":protocol", "websocket"),
+            MAKE_NV("accept", "*/*"),
+            MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+            MAKE_NV("sec-webSocket-version", "13"),
+            MAKE_NV("sec-webSocket-protocol", "chat"),
+        };
+        if (ws_stream_submit(stream, &session,
+                             nva, sizeof(nva) / sizeof(nva[0]), -1))
+            return 1;
+    }
+    else if (!strcmp(scenario, "miss-scheme")) {
+        const nghttp2_nv nva[] = {
+            MAKE_NV(":method", "CONNECT"),
+            MAKE_NV_CS(":path", stream->s.uri->path),
+            MAKE_NV_CS(":authority", stream->s.uri->authority),
+            MAKE_NV_CS(":protocol", "websocket"),
+            MAKE_NV("accept", "*/*"),
+            MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+            MAKE_NV("sec-webSocket-version", "13"),
+            MAKE_NV("sec-webSocket-protocol", "chat"),
+        };
+        if (ws_stream_submit(stream, &session,
+                             nva, sizeof(nva) / sizeof(nva[0]), -1))
+            return 1;
+    }
+    else if (!strcmp(scenario, "miss-authority")) {
+        const nghttp2_nv nva[] = {
+            MAKE_NV(":method", "CONNECT"),
+            MAKE_NV_CS(":path", stream->s.uri->path),
+            MAKE_NV_CS(":scheme", "http"),
+            MAKE_NV_CS(":protocol", "websocket"),
+            MAKE_NV("accept", "*/*"),
+            MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+            MAKE_NV("sec-webSocket-version", "13"),
+            MAKE_NV("sec-webSocket-protocol", "chat"),
+        };
+        if (ws_stream_submit(stream, &session,
+                             nva, sizeof(nva) / sizeof(nva[0]), -1))
+            return 1;
+    }
+    else {
+        log_errf(cmd, "unknown scenario: %s", scenario);
+        return 1;
+    }
+
+    h2_session_run(&session);
+    h2_session_close(&session);
+    return 0;
+}
diff --git a/test/modules/http2/test_800_websockets.py b/test/modules/http2/test_800_websockets.py
new file mode 100644 (file)
index 0000000..58ac4eb
--- /dev/null
@@ -0,0 +1,306 @@
+import inspect
+import logging
+import os
+import shutil
+import subprocess
+import time
+from datetime import timedelta, datetime
+from typing import Tuple, Union, List
+import packaging.version
+
+import pytest
+import websockets
+from pyhttpd.result import ExecResult
+from pyhttpd.ws_util import WsFrameReader, WsFrame
+
+from .env import H2Conf, H2TestEnv
+
+
+log = logging.getLogger(__name__)
+
+ws_version = packaging.version.parse(websockets.version.version)
+ws_version_min = packaging.version.Version('10.4')
+
+
+def ws_run(env: H2TestEnv, path, do_input=None,
+           inbytes=None, send_close=True,
+           timeout=5, scenario='ws-stdin',
+           wait_close: float = 0.0) -> Tuple[ExecResult, List[str], Union[List[WsFrame], bytes]]:
+    """ Run the h2ws test client in various scenarios with given input and
+        timings.
+    :param env: the test environment
+    :param path: the path on the Apache server to CONNECt to
+    :param do_input: a Callable for sending input to h2ws
+    :param inbytes: fixed bytes to send to h2ws, unless do_input is given
+    :param send_close: send a CLOSE WebSockets frame at the end
+    :param timeout: timeout for waiting on h2ws to finish
+    :param scenario: name of scenario h2ws should run in
+    :param wait_close: time to wait before closing input
+    :return: ExecResult with exit_code/stdout/stderr of run
+    """
+    h2ws = os.path.join(env.clients_dir, 'h2ws')
+    if not os.path.exists(h2ws):
+        pytest.fail(f'test client not build: {h2ws}')
+    args = [
+        h2ws, '-vv', '-c', f'localhost:{env.http_port}',
+        f'ws://cgi.{env.http_tld}:{env.http_port}{path}',
+        scenario
+    ]
+    # we write all output to files, because we manipulate input timings
+    # and would run in deadlock situations with h2ws blocking operations
+    # because its output is not consumed
+    with open(f'{env.gen_dir}/h2ws.stdout', 'w') as fdout:
+        with open(f'{env.gen_dir}/h2ws.stderr', 'w') as fderr:
+            proc = subprocess.Popen(args=args, stdin=subprocess.PIPE,
+                                    stdout=fdout, stderr=fderr)
+            if do_input is not None:
+                do_input(proc)
+            elif inbytes is not None:
+                proc.stdin.write(inbytes)
+                proc.stdin.flush()
+
+            if wait_close > 0:
+                time.sleep(wait_close)
+            try:
+                inbytes = WsFrame.client_close(code=1000).to_network() if send_close else None
+                proc.communicate(input=inbytes, timeout=timeout)
+            except subprocess.TimeoutExpired:
+                log.error(f'ws_run: timeout expired')
+                proc.kill()
+                proc.communicate(timeout=timeout)
+    lines = open(f'{env.gen_dir}/h2ws.stdout').read().splitlines()
+    infos = [line for line in lines if line.startswith('[1] ')]
+    hex_content = ' '.join([line for line in lines if not line.startswith('[1] ')])
+    if len(infos) > 0 and infos[0] == '[1] :status: 200':
+        frames = WsFrameReader.parse(bytearray.fromhex(hex_content))
+    else:
+        frames = bytearray.fromhex(hex_content)
+    return ExecResult(args=args, exit_code=proc.returncode,
+                      stdout=b'', stderr=b''), infos, frames
+
+
+@pytest.mark.skipif(condition=H2TestEnv.is_unsupported, reason="mod_http2 not supported here")
+@pytest.mark.skipif(condition=ws_version < ws_version_min,
+                    reason=f'websockets is {ws_version}, need at least {ws_version_min}')
+class TestWebSockets:
+
+    @pytest.fixture(autouse=True, scope='class')
+    def _class_scope(self, env):
+        # Apache config that CONNECT proxies a WebSocket server for paths starting
+        # with '/ws/'
+        # The WebSocket server is started in pytest fixture 'ws_server' below.
+        conf = H2Conf(env, extras={
+            f'cgi.{env.http_tld}': [
+              f'  H2WebSockets on',
+              f'  ProxyPass /ws/ http://127.0.0.1:{env.ws_port}/ \\',
+              f'           upgrade=websocket timeout=10',
+            ]
+        })
+        conf.add_vhost_cgi(proxy_self=True, h2proxy_self=True).install()
+        assert env.apache_restart() == 0
+
+    def ws_check_alive(self, env, timeout=5):
+        url = f'http://localhost:{env.ws_port}/'
+        end = datetime.now() + timedelta(seconds=timeout)
+        while datetime.now() < end:
+            r = env.curl_get(url, 5)
+            if r.exit_code == 0:
+                return True
+            time.sleep(.1)
+        return False
+
+    def _mkpath(self, path):
+        if not os.path.exists(path):
+            return os.makedirs(path)
+
+    def _rmrf(self, path):
+        if os.path.exists(path):
+            return shutil.rmtree(path)
+
+    @pytest.fixture(autouse=True, scope='class')
+    def ws_server(self, env):
+        # Run our python websockets server that has some special behaviour
+        # for the different path to CONNECT to.
+        run_dir = os.path.join(env.gen_dir, 'ws-server')
+        err_file = os.path.join(run_dir, 'stderr')
+        self._rmrf(run_dir)
+        self._mkpath(run_dir)
+        with open(err_file, 'w') as cerr:
+            cmd = os.path.join(os.path.dirname(inspect.getfile(TestWebSockets)),
+                               'ws_server.py')
+            args = ['python3', cmd, '--port', str(env.ws_port)]
+            p = subprocess.Popen(args=args, cwd=run_dir, stderr=cerr,
+                                 stdout=cerr)
+            if not self.ws_check_alive(env):
+                p.kill()
+                p.wait()
+                pytest.fail(f'ws_server did not start. stderr={open(err_file).readlines()}')
+            yield
+            p.terminate()
+
+    # a correct CONNECT, send CLOSE, expect CLOSE, basic success
+    def test_h2_800_01_ws_empty(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/echo/')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) == 1, f'{frames}'
+        assert frames[0].opcode == WsFrame.CLOSE, f'{frames}'
+
+    # CONNECT with invalid :protocol header, must fail
+    def test_h2_800_02_fail_proto(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/echo/', scenario='fail-proto')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 400', '[1] EOF'], f'{r}'
+
+    # CONNECT to a URL path that does not exist on the server
+    def test_h2_800_03_not_found(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/does-not-exist')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 404', '[1] EOF'], f'{r}'
+
+    # CONNECT to a URL path that is a normal HTTP file resource
+    # we do not want to receive the body of that
+    def test_h2_800_04_non_ws_resource(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/alive.json')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 502', '[1] EOF'], f'{r}'
+        assert frames == b''
+
+    # CONNECT to a URL path that sends a delayed HTTP response body
+    # we do not want to receive the body of that
+    def test_h2_800_05_non_ws_delay_resource(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/h2test/error?body_delay=100ms')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 502', '[1] EOF'], f'{r}'
+        assert frames == b''
+
+    # CONNECT missing the sec-webSocket-version header
+    def test_h2_800_06_miss_version(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-version')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 400', '[1] EOF'], f'{r}'
+
+    # CONNECT missing the :path header
+    def test_h2_800_07_miss_path(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-path')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] RST'], f'{r}'
+
+    # CONNECT missing the :scheme header
+    def test_h2_800_08_miss_scheme(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-scheme')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] RST'], f'{r}'
+
+    # CONNECT missing the :authority header
+    def test_h2_800_09_miss_authority(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-authority')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] RST'], f'{r}'
+
+    # CONNECT and exchange a PING
+    def test_h2_800_10_ws_ping(self, env: H2TestEnv, ws_server):
+        ping = WsFrame.client_ping(b'12345')
+        r, infos, frames = ws_run(env, path='/ws/echo/', inbytes=ping.to_network())
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) == 2, f'{frames}'
+        assert frames[0].opcode == WsFrame.PONG, f'{frames}'
+        assert frames[0].data == ping.data, f'{frames}'
+        assert frames[1].opcode == WsFrame.CLOSE, f'{frames}'
+
+    # CONNECT and send several PINGs with a delay of 200ms
+    def test_h2_800_11_ws_timed_pings(self, env: H2TestEnv, ws_server):
+        frame_count = 5
+        ping = WsFrame.client_ping(b'12345')
+
+        def do_send(proc):
+            for _ in range(frame_count):
+                try:
+                    proc.stdin.write(ping.to_network())
+                    proc.stdin.flush()
+                    proc.wait(timeout=0.2)
+                except subprocess.TimeoutExpired:
+                    pass
+
+        r, infos, frames = ws_run(env, path='/ws/echo/', do_input=do_send)
+        assert r.exit_code == 0
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) == frame_count + 1, f'{frames}'
+        assert frames[-1].opcode == WsFrame.CLOSE, f'{frames}'
+        for i in range(frame_count):
+            assert frames[i].opcode == WsFrame.PONG, f'{frames}'
+            assert frames[i].data == ping.data, f'{frames}'
+
+    # CONNECT to path that closes immediately
+    def test_h2_800_12_ws_unknown(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/unknown')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) == 1, f'{frames}'
+        # expect a CLOSE with code=4999, reason='path unknown'
+        assert frames[0].opcode == WsFrame.CLOSE, f'{frames}'
+        assert frames[0].data[2:].decode() == 'path unknown', f'{frames}'
+
+    # CONNECT to a path that sends us 1 TEXT frame
+    def test_h2_800_13_ws_text(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/text/')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) == 2, f'{frames}'
+        assert frames[0].opcode == WsFrame.TEXT, f'{frames}'
+        assert frames[0].data.decode() == 'hello!', f'{frames}'
+        assert frames[1].opcode == WsFrame.CLOSE, f'{frames}'
+
+    # CONNECT to a path that sends us a named file in BINARY frames
+    @pytest.mark.parametrize("fname,flen", [
+        ("data-1k", 1000),
+        ("data-10k", 10000),
+        ("data-100k", 100*1000),
+        ("data-1m", 1000*1000),
+    ])
+    def test_h2_800_14_ws_file(self, env: H2TestEnv, ws_server, fname, flen):
+        r, infos, frames = ws_run(env, path=f'/ws/file/{fname}', wait_close=0.5)
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) > 0
+        total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY])
+        assert total_len == flen, f'{frames}'
+
+    # CONNECT to path with 1MB file and trigger varying BINARY frame lengths
+    @pytest.mark.parametrize("frame_len", [
+        1000 * 1024,
+        100 * 1024,
+        10 * 1024,
+        1 * 1024,
+        512,
+    ])
+    def test_h2_800_15_ws_frame_len(self, env: H2TestEnv, ws_server, frame_len):
+        fname = "data-1m"
+        flen = 1000*1000
+        r, infos, frames = ws_run(env, path=f'/ws/file/{fname}/{frame_len}', wait_close=0.5)
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) > 0
+        total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY])
+        assert total_len == flen, f'{frames}'
+
+    # CONNECT to path with 1MB file and trigger delays between BINARY frame writes
+    @pytest.mark.parametrize("frame_delay", [
+        1,
+        10,
+        50,
+        100,
+    ])
+    def test_h2_800_16_ws_frame_delay(self, env: H2TestEnv, ws_server, frame_delay):
+        fname = "data-1m"
+        flen = 1000*1000
+        # adjust frame_len to allow for 1 second overall duration
+        frame_len = int(flen / (1000 / frame_delay))
+        r, infos, frames = ws_run(env, path=f'/ws/file/{fname}/{frame_len}/{frame_delay}',
+                                  wait_close=1.5)
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) > 0
+        total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY])
+        assert total_len == flen, f'{frames}\n{r}'
diff --git a/test/modules/http2/ws_server.py b/test/modules/http2/ws_server.py
new file mode 100644 (file)
index 0000000..bcb8d3b
--- /dev/null
@@ -0,0 +1,100 @@
+#!/usr/bin/env python3
+import argparse
+import asyncio
+import logging
+import os
+import sys
+import time
+
+import websockets.server as ws_server
+from websockets.exceptions import ConnectionClosedError
+
+log = logging.getLogger(__name__)
+
+logging.basicConfig(
+    format="[%(asctime)s] %(message)s",
+    level=logging.DEBUG,
+)
+
+
+async def echo(websocket):
+    try:
+        async for message in websocket:
+            try:
+                log.info(f'got request {message}')
+            except Exception as e:
+                log.error(f'error {e} getting path from {message}')
+            await websocket.send(message)
+    except ConnectionClosedError:
+        pass
+
+
+async def on_async_conn(conn):
+    rpath = str(conn.path)
+    pcomps = rpath[1:].split('/')
+    if len(pcomps) == 0:
+        pcomps = ['echo']  # default handler
+    log.info(f'connection for {pcomps}')
+    if pcomps[0] == 'echo':
+        log.info(f'/echo endpoint')
+        for message in await conn.recv():
+            await conn.send(message)
+    elif pcomps[0] == 'text':
+        await conn.send('hello!')
+    elif pcomps[0] == 'file':
+        if len(pcomps) < 2:
+            conn.close(code=4999, reason='unknown file')
+            return
+        fpath = os.path.join('../', pcomps[1])
+        if not os.path.exists(fpath):
+            conn.close(code=4999, reason='file not found')
+            return
+        bufsize = 0
+        if len(pcomps) > 2:
+            bufsize = int(pcomps[2])
+        if bufsize <= 0:
+            bufsize = 16*1024
+        delay_ms = 0
+        if len(pcomps) > 3:
+            delay_ms = int(pcomps[3])
+        with open(fpath, 'r+b') as fd:
+            while True:
+                buf = fd.read(bufsize)
+                if buf is None or len(buf) == 0:
+                    break
+                await conn.send(buf)
+                if delay_ms > 0:
+                    time.sleep(delay_ms/1000)
+    else:
+        log.info(f'unknown endpoint: {rpath}')
+        await conn.close(code=4999, reason='path unknown')
+    await conn.close(code=1000, reason='')
+
+
+async def run_server(port):
+    log.info(f'starting server on port {port}')
+    async with ws_server.serve(ws_handler=on_async_conn,
+                               host="localhost", port=port):
+        await asyncio.Future()
+
+
+async def main():
+    parser = argparse.ArgumentParser(prog='scorecard',
+                                     description="Run a websocket echo server.")
+    parser.add_argument("--port", type=int,
+                        default=0, help="port to listen on")
+    args = parser.parse_args()
+
+    if args.port == 0:
+        sys.stderr.write('need --port\n')
+        sys.exit(1)
+
+    logging.basicConfig(
+        format="%(asctime)s %(message)s",
+        level=logging.DEBUG,
+    )
+    await run_server(args.port)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
index e1ae0707ab0003497ed67eb47c666c3cd0a44d57..3f42248f65b34ea8320b7571175ed64e9fb9c127 100644 (file)
@@ -26,6 +26,7 @@ http_port = 5002
 https_port = 5001
 proxy_port = 5003
 http_port2 = 5004
+ws_port = 5100
 http_tld = tests.httpd.apache.org
 test_dir = @abs_srcdir@
 test_src_dir = @abs_srcdir@
index 842e369cbced47a8e4e15f283aa291a80667e670..51118833882468631fc735af73fb2a52b2048041 100644 (file)
@@ -250,8 +250,10 @@ class HttpdTestEnv:
         self._http_port2 = int(self.config.get('test', 'http_port2'))
         self._https_port = int(self.config.get('test', 'https_port'))
         self._proxy_port = int(self.config.get('test', 'proxy_port'))
+        self._ws_port = int(self.config.get('test', 'ws_port'))
         self._http_tld = self.config.get('test', 'http_tld')
         self._test_dir = self.config.get('test', 'test_dir')
+        self._clients_dir = os.path.join(os.path.dirname(self._test_dir), 'clients')
         self._gen_dir = self.config.get('test', 'gen_dir')
         self._server_dir = os.path.join(self._gen_dir, 'apache')
         self._server_conf_dir = os.path.join(self._server_dir, "conf")
@@ -366,6 +368,10 @@ class HttpdTestEnv:
     def proxy_port(self) -> int:
         return self._proxy_port
 
+    @property
+    def ws_port(self) -> int:
+        return self._ws_port
+
     @property
     def http_tld(self) -> str:
         return self._http_tld
@@ -390,6 +396,10 @@ class HttpdTestEnv:
     def test_dir(self) -> str:
         return self._test_dir
 
+    @property
+    def clients_dir(self) -> str:
+        return self._clients_dir
+
     @property
     def server_dir(self) -> str:
         return self._server_dir
@@ -519,12 +529,14 @@ class HttpdTestEnv:
         if not os.path.exists(path):
             return os.makedirs(path)
 
-    def run(self, args, stdout_list=False, intext=None, debug_log=True):
+    def run(self, args, stdout_list=False, intext=None, inbytes=None, debug_log=True):
         if debug_log:
             log.debug(f"run: {args}")
         start = datetime.now()
+        if intext is not None:
+            inbytes = intext.encode()
         p = subprocess.run(args, stderr=subprocess.PIPE, stdout=subprocess.PIPE,
-                           input=intext.encode() if intext else None)
+                           input=inbytes)
         stdout_as_list = None
         if stdout_list:
             try:
diff --git a/test/pyhttpd/ws_util.py b/test/pyhttpd/ws_util.py
new file mode 100644 (file)
index 0000000..38a3cf7
--- /dev/null
@@ -0,0 +1,137 @@
+import logging
+import struct
+
+
+log = logging.getLogger(__name__)
+
+
+class WsFrame:
+
+    CONT = 0
+    TEXT = 1
+    BINARY = 2
+    RSVD3 = 3
+    RSVD4 = 4
+    RSVD5 = 5
+    RSVD6 = 6
+    RSVD7 = 7
+    CLOSE = 8
+    PING = 9
+    PONG = 10
+    RSVD11 = 11
+    RSVD12 = 12
+    RSVD13 = 13
+    RSVD14 = 14
+    RSVD15 = 15
+
+    OP_NAMES = [
+        "CONT",
+        "TEXT",
+        "BINARY",
+        "RSVD3",
+        "RSVD4",
+        "RSVD5",
+        "RSVD6",
+        "RSVD7",
+        "CLOSE",
+        "PING",
+        "PONG",
+        "RSVD11",
+        "RSVD12",
+        "RSVD13",
+        "RSVD14",
+        "RSVD15",
+    ]
+
+    def __init__(self, opcode: int, fin: bool, mask: bytes, data: bytes):
+        self.opcode = opcode
+        self.fin = fin
+        self.mask = mask
+        self.data = data
+        self.length = len(data)
+
+    def __repr__(self):
+        return f'WsFrame[{self.OP_NAMES[self.opcode]} fin={self.fin}, mask={self.mask}, len={len(self.data)}]'
+
+    @property
+    def data_len(self) -> int:
+        return len(self.data) if self.data else 0
+
+    def to_network(self) -> bytes:
+        nd = bytearray()
+        h1 = self.opcode
+        if self.fin:
+            h1 |= 0x80
+        nd.extend(struct.pack("!B", h1))
+        mask_bit = 0x80 if self.mask is not None else 0x0
+        h2 = self.data_len
+        if h2 > 65535:
+            nd.extend(struct.pack("!BQ", 127|mask_bit, h2))
+        elif h2 > 126:
+            nd.extend(struct.pack("!BH", 126|mask_bit, h2))
+        else:
+            nd.extend(struct.pack("!B", h2|mask_bit))
+        if self.mask is not None:
+            nd.extend(self.mask)
+        if self.data is not None:
+            nd.extend(self.data)
+        return nd
+
+    @classmethod
+    def client_ping(cls, data: bytes, mask: bytes = None) -> 'WsFrame':
+        if mask is None:
+            mask = bytes.fromhex('00 00 00 00')
+        return WsFrame(opcode=WsFrame.PING, fin=True, mask=mask, data=data)
+
+    @classmethod
+    def client_close(cls, code: int, reason: str = None,
+                     mask: bytes = None) -> 'WsFrame':
+        data = bytearray(struct.pack("!H", code))
+        if reason is not None:
+            data.extend(reason.encode())
+        if mask is None:
+            mask = bytes.fromhex('00 00 00 00')
+        return WsFrame(opcode=WsFrame.CLOSE, fin=True, mask=mask, data=data)
+
+
+class WsFrameReader:
+
+    def __init__(self, data: bytes):
+        self.data = data
+
+    def _read(self, n: int):
+        if len(self.data) < n:
+            raise EOFError(f'have {len(self.data)} bytes left, but {n} requested')
+        elif n == 0:
+            return b''
+        chunk = self.data[:n]
+        del self.data[:n]
+        return chunk
+
+    def next_frame(self):
+        data = self._read(2)
+        h1, h2 = struct.unpack("!BB", data)
+        log.debug(f'parsed h1={h1} h2={h2} from {data}')
+        fin = True if h1 & 0x80 else False
+        opcode = h1 & 0xf
+        has_mask = True if h2 & 0x80 else False
+        mask = None
+        dlen = h2 & 0x7f
+        if dlen == 126:
+            (dlen,) = struct.unpack("!H", self._read(2))
+        elif dlen == 127:
+            (dlen,) = struct.unpack("!Q", self._read(8))
+        if has_mask:
+            mask = self._read(4)
+        return WsFrame(opcode=opcode, fin=fin, mask=mask, data=self._read(dlen))
+
+    def eof(self):
+        return len(self.data) == 0
+
+    @classmethod
+    def parse(cls, data: bytes):
+        frames = []
+        reader = WsFrameReader(data=data)
+        while not reader.eof():
+            frames.append(reader.next_frame())
+        return frames
index 3a2cdbd009f7cfd244bd4e738d2edf2a37bf4005..39c331da5cdf99940af0a213ea747aa72a143ee5 100755 (executable)
@@ -221,6 +221,8 @@ if ! test -v SKIP_TESTING; then
     fi
 
     if test -v TEST_H2 -a $RV -eq 0; then
+        # Build the test clients
+        (cd test/clients && make)
         # Run HTTP/2 tests.
         MPM=event py.test-3 test/modules/http2
         RV=$?